Package proton :: Module handlers
[frames | no frames]

Source Code for Module proton.handlers

  1  # 
  2  # Licensed to the Apache Software Foundation (ASF) under one 
  3  # or more contributor license agreements.  See the NOTICE file 
  4  # distributed with this work for additional information 
  5  # regarding copyright ownership.  The ASF licenses this file 
  6  # to you under the Apache License, Version 2.0 (the 
  7  # "License"); you may not use this file except in compliance 
  8  # with the License.  You may obtain a copy of the License at 
  9  # 
 10  #   http://www.apache.org/licenses/LICENSE-2.0 
 11  # 
 12  # Unless required by applicable law or agreed to in writing, 
 13  # software distributed under the License is distributed on an 
 14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 15  # KIND, either express or implied.  See the License for the 
 16  # specific language governing permissions and limitations 
 17  # under the License. 
 18  # 
 19  import heapq, logging, os, re, socket, time, types 
 20   
 21  from proton import dispatch, generate_uuid, PN_ACCEPTED, SASL, symbol, ulong, Url 
 22  from proton import Collector, Connection, Delivery, Described, Endpoint, Event, Link, Terminus, Timeout 
 23  from proton import Message, Handler, ProtonException, Transport, TransportException, ConnectionException 
 24  from select import select 
25 26 27 -class OutgoingMessageHandler(Handler):
28 """ 29 A utility for simpler and more intuitive handling of delivery 30 events related to outgoing i.e. sent messages. 31 """
32 - def __init__(self, auto_settle=True, delegate=None):
33 self.auto_settle = auto_settle 34 self.delegate = delegate
35 39
def on_delivery(self, event):
    """
    Translate an updated delivery on a sender link into the
    on_accepted / on_rejected / on_released / on_settled callbacks,
    then settle it locally when ``auto_settle`` is set.
    """
    # NOTE(review): the listing this was taken from lost its
    # indentation; the settled / auto_settle checks are placed inside
    # the "sender and updated" guard -- confirm against the real file.
    delivery = event.delivery
    if not (delivery.link.is_sender and delivery.updated):
        return

    outcome = delivery.remote_state
    if outcome == Delivery.ACCEPTED:
        self.on_accepted(event)
    elif outcome == Delivery.REJECTED:
        self.on_rejected(event)
    elif outcome == Delivery.RELEASED or outcome == Delivery.MODIFIED:
        # both outcomes are reported through the same callback
        self.on_released(event)

    if delivery.settled:
        self.on_settled(event)
    if self.auto_settle:
        delivery.settle()
53
def on_sendable(self, event):
    """
    Called when the sender link has credit and messages can
    therefore be transferred.

    Forwards the event to the delegate's ``on_sendable`` when a
    delegate was supplied; otherwise does nothing.
    """
    # identity test: a delegate's own __eq__/__ne__ must not decide this
    if self.delegate is not None:
        dispatch(self.delegate, 'on_sendable', event)
61
def on_accepted(self, event):
    """
    Called when the remote peer accepts an outgoing message.

    Forwards the event to the delegate's ``on_accepted`` when a
    delegate was supplied; otherwise does nothing.
    """
    # identity test: a delegate's own __eq__/__ne__ must not decide this
    if self.delegate is not None:
        dispatch(self.delegate, 'on_accepted', event)
68
def on_rejected(self, event):
    """
    Called when the remote peer rejects an outgoing message.

    Forwards the event to the delegate's ``on_rejected`` when a
    delegate was supplied; otherwise does nothing.
    """
    # identity test: a delegate's own __eq__/__ne__ must not decide this
    if self.delegate is not None:
        dispatch(self.delegate, 'on_rejected', event)
75
def on_released(self, event):
    """
    Called when the remote peer releases an outgoing message. Note
    that this may be in response to either the RELEASED or MODIFIED
    state as defined by the AMQP specification.

    Forwards the event to the delegate's ``on_released`` when a
    delegate was supplied; otherwise does nothing.
    """
    # identity test: a delegate's own __eq__/__ne__ must not decide this
    if self.delegate is not None:
        dispatch(self.delegate, 'on_released', event)
84
def on_settled(self, event):
    """
    Called when the remote peer has settled the outgoing
    message. This is the point at which it should never be
    retransmitted.

    Forwards the event to the delegate's ``on_settled`` when a
    delegate was supplied; otherwise does nothing.
    """
    # identity test: a delegate's own __eq__/__ne__ must not decide this
    if self.delegate is not None:
        dispatch(self.delegate, 'on_settled', event)
93
def recv_msg(delivery):
    """
    Read the pending bytes of ``delivery`` from its link, decode them
    into a new Message, advance the link and return the message.
    """
    link = delivery.link
    message = Message()
    encoded = link.recv(delivery.pending)
    message.decode(encoded)
    # advance only after the bytes have been read and decoded
    delivery.link.advance()
    return message
99
class Reject(ProtonException):
    """
    An exception that indicates a message should be rejected.
    """
105
class Release(ProtonException):
    """
    An exception that indicates a message should be released.

    IncomingMessageHandler.on_delivery answers it by updating the
    delivery to Delivery.MODIFIED and settling it.
    """
111
class Acking(object):
    """
    Mixin with helpers that set the outcome of a received delivery
    and settle it.
    """

    def accept(self, delivery):
        """
        Accepts a received message.
        """
        self.settle(delivery, Delivery.ACCEPTED)

    def reject(self, delivery):
        """
        Rejects a received message that is considered invalid or
        unprocessable.
        """
        self.settle(delivery, Delivery.REJECTED)

    def release(self, delivery, delivered=True):
        """
        Releases a received message, making it available at the source
        for any (other) interested receiver. The ``delivered``
        parameter indicates whether this should be considered a
        delivery attempt (and the delivery count updated) or not.
        """
        # MODIFIED when it counts as a delivery attempt, RELEASED otherwise
        outcome = Delivery.MODIFIED if delivered else Delivery.RELEASED
        self.settle(delivery, outcome)

    def settle(self, delivery, state=None):
        """
        Settle ``delivery``; when ``state`` is truthy the delivery is
        first updated to that state.
        """
        if state:
            delivery.update(state)
        delivery.settle()
142
class IncomingMessageHandler(Handler, Acking):
    """
    A utility for simpler and more intuitive handling of delivery
    events related to incoming i.e. received messages.
    """

    def __init__(self, auto_accept=True, delegate=None):
        """
        Store the optional ``delegate`` and the ``auto_accept`` policy
        consulted by ``on_delivery``.
        """
        self.delegate = delegate
        self.auto_accept = auto_accept

    def on_delivery(self, event):
        """
        Decode a fully received delivery into ``event.message`` and
        hand it to ``on_message``; report settlement of other updated
        deliveries through ``on_settled``.
        """
        # NOTE(review): nesting reconstructed from a listing that lost
        # its indentation -- confirm against the real file.
        dlv = event.delivery
        if not dlv.link.is_receiver:
            return
        if dlv.readable and not dlv.partial:
            event.message = recv_msg(dlv)
            if event.link.state & Endpoint.LOCAL_CLOSED:
                # link already closed locally: on_message is not called
                if self.auto_accept:
                    dlv.update(Delivery.RELEASED)
                    dlv.settle()
            else:
                try:
                    self.on_message(event)
                    if self.auto_accept:
                        dlv.update(Delivery.ACCEPTED)
                        dlv.settle()
                except Reject:
                    dlv.update(Delivery.REJECTED)
                    dlv.settle()
                except Release:
                    dlv.update(Delivery.MODIFIED)
                    dlv.settle()
        elif dlv.updated and dlv.settled:
            self.on_settled(event)

    def on_message(self, event):
        """
        Called when a message is received. The message itself can be
        obtained as a property on the event. For the purpose of
        referring to this message in further actions (e.g. if
        explicitly accepting it), the ``delivery`` should be used, also
        obtainable via a property on the event.
        """
        # identity test: a delegate's own __eq__/__ne__ must not decide this
        if self.delegate is not None:
            dispatch(self.delegate, 'on_message', event)

    def on_settled(self, event):
        """
        Forward the event to the delegate's ``on_settled``, if a
        delegate was supplied.
        """
        if self.delegate is not None:
            dispatch(self.delegate, 'on_settled', event)
191
192 -class EndpointStateHandler(Handler):
193 """ 194 A utility that exposes 'endpoint' events i.e. the open/close for 195 links, sessions and connections in a more intuitive manner. A 196 XXX_opened method will be called when both local and remote peers 197 have opened the link, session or connection. This can be used to 198 confirm a locally initiated action for example. A XXX_opening 199 method will be called when the remote peer has requested an open 200 that was not initiated locally. By default this will simply open 201 locally, which then triggers the XXX_opened call. The same applies 202 to close. 203 """ 204
205 - def __init__(self, peer_close_is_error=False, delegate=None):
206 self.delegate = delegate 207 self.peer_close_is_error = peer_close_is_error
@classmethod
def is_local_open(cls, endpoint):
    """
    Return ``endpoint.state`` masked with Endpoint.LOCAL_ACTIVE
    (non-zero when that bit is set).
    """
    state = endpoint.state
    return state & Endpoint.LOCAL_ACTIVE
@classmethod
def is_local_uninitialised(cls, endpoint):
    """
    Return ``endpoint.state`` masked with Endpoint.LOCAL_UNINIT
    (non-zero when that bit is set).
    """
    state = endpoint.state
    return state & Endpoint.LOCAL_UNINIT
@classmethod
def is_local_closed(cls, endpoint):
    """
    Return ``endpoint.state`` masked with Endpoint.LOCAL_CLOSED
    (non-zero when that bit is set).
    """
    state = endpoint.state
    return state & Endpoint.LOCAL_CLOSED
@classmethod
def is_remote_open(cls, endpoint):
    """
    Return ``endpoint.state`` masked with Endpoint.REMOTE_ACTIVE
    (non-zero when that bit is set).
    """
    state = endpoint.state
    return state & Endpoint.REMOTE_ACTIVE
@classmethod
def is_remote_closed(cls, endpoint):
    """
    Return ``endpoint.state`` masked with Endpoint.REMOTE_CLOSED
    (non-zero when that bit is set).
    """
    state = endpoint.state
    return state & Endpoint.REMOTE_CLOSED
@classmethod
def print_error(cls, endpoint, endpoint_type):
    """
    Log why ``endpoint`` was closed: the remote condition's
    description when one is set, otherwise a generic "closed by peer"
    message if the endpoint is still open locally but closed remotely.
    """
    if endpoint.remote_condition:
        logging.error(endpoint.remote_condition.description)
        return
    if cls.is_local_open(endpoint) and cls.is_remote_closed(endpoint):
        logging.error("%s closed by peer" % endpoint_type)
235 244
def on_session_remote_close(self, event):
    """
    React to the peer closing a session: report an error, a completed
    close or a peer-initiated close, then close the session locally.
    """
    # NOTE(review): the flattened listing does not show whether close()
    # belongs to the else branch; it is placed at method level here
    # (runs on every path) -- confirm against the real file.
    session = event.session
    if session.remote_condition:
        self.on_session_error(event)
    elif self.is_local_closed(session):
        self.on_session_closed(event)
    else:
        self.on_session_closing(event)
    event.session.close()
253
def on_connection_remote_close(self, event):
    """
    React to the peer closing the connection: report an error, a
    completed close or a peer-initiated close, then close the
    connection locally.
    """
    # NOTE(review): the flattened listing does not show whether close()
    # belongs to the else branch; it is placed at method level here
    # (runs on every path) -- confirm against the real file.
    connection = event.connection
    if connection.remote_condition:
        self.on_connection_error(event)
    elif self.is_local_closed(connection):
        self.on_connection_closed(event)
    else:
        self.on_connection_closing(event)
    event.connection.close()
262
def on_connection_local_open(self, event):
    """
    After a local open, report ``on_connection_opened`` if the remote
    side is already open.
    """
    remote_is_open = self.is_remote_open(event.connection)
    if remote_is_open:
        self.on_connection_opened(event)
266
def on_connection_remote_open(self, event):
    """
    After a remote open: report ``on_connection_opened`` if already
    open locally; if still uninitialised locally, report
    ``on_connection_opening`` and open the connection.
    """
    # NOTE(review): open() is placed inside the "uninitialised" branch;
    # the flattened listing cannot confirm the nesting.
    connection = event.connection
    if self.is_local_open(connection):
        self.on_connection_opened(event)
        return
    if self.is_local_uninitialised(connection):
        self.on_connection_opening(event)
        event.connection.open()
273
def on_session_local_open(self, event):
    """
    After a local open, report ``on_session_opened`` if the remote
    side is already open.
    """
    remote_is_open = self.is_remote_open(event.session)
    if remote_is_open:
        self.on_session_opened(event)
277
def on_session_remote_open(self, event):
    """
    After a remote open: report ``on_session_opened`` if already open
    locally; if still uninitialised locally, report
    ``on_session_opening`` and open the session.
    """
    # NOTE(review): open() is placed inside the "uninitialised" branch;
    # the flattened listing cannot confirm the nesting.
    session = event.session
    if self.is_local_open(session):
        self.on_session_opened(event)
        return
    if self.is_local_uninitialised(session):
        self.on_session_opening(event)
        event.session.open()
284 288 295
def on_connection_opened(self, event):
    """
    Forward the event to the delegate's ``on_connection_opened``,
    if a delegate was supplied.
    """
    # identity test: a delegate's own __eq__/__ne__ must not decide this
    if self.delegate is not None:
        dispatch(self.delegate, 'on_connection_opened', event)
299
def on_session_opened(self, event):
    """
    Forward the event to the delegate's ``on_session_opened``,
    if a delegate was supplied.
    """
    # identity test: a delegate's own __eq__/__ne__ must not decide this
    if self.delegate is not None:
        dispatch(self.delegate, 'on_session_opened', event)
303 307
def on_connection_opening(self, event):
    """
    Forward the event to the delegate's ``on_connection_opening``,
    if a delegate was supplied.
    """
    # identity test: a delegate's own __eq__/__ne__ must not decide this
    if self.delegate is not None:
        dispatch(self.delegate, 'on_connection_opening', event)
311
def on_session_opening(self, event):
    """
    Forward the event to the delegate's ``on_session_opening``,
    if a delegate was supplied.
    """
    # identity test: a delegate's own __eq__/__ne__ must not decide this
    if self.delegate is not None:
        dispatch(self.delegate, 'on_session_opening', event)
315 319
def on_connection_error(self, event):
    """
    Forward a connection error to the delegate's
    ``on_connection_error``; without a delegate, log it via
    ``print_error``.
    """
    if self.delegate is not None:
        dispatch(self.delegate, 'on_connection_error', event)
    else:
        # The original called self.log_error, which no visible code
        # defines; print_error is the classmethod with this signature.
        self.print_error(event.connection, "connection")
325
def on_session_error(self, event):
    """
    Forward a session error to the delegate's ``on_session_error``;
    without a delegate, log it via ``print_error`` and close the
    connection.
    """
    # NOTE(review): connection.close() is placed inside the else branch;
    # the flattened listing cannot confirm the nesting.
    if self.delegate is not None:
        dispatch(self.delegate, 'on_session_error', event)
    else:
        # The original called self.log_error, which no visible code
        # defines; print_error is the classmethod with this signature.
        self.print_error(event.session, "session")
        event.connection.close()
332 339
def on_connection_closed(self, event):
    """
    Forward the event to the delegate's ``on_connection_closed``,
    if a delegate was supplied.
    """
    # identity test: a delegate's own __eq__/__ne__ must not decide this
    if self.delegate is not None:
        dispatch(self.delegate, 'on_connection_closed', event)
343
def on_session_closed(self, event):
    """
    Forward the event to the delegate's ``on_session_closed``,
    if a delegate was supplied.
    """
    # identity test: a delegate's own __eq__/__ne__ must not decide this
    if self.delegate is not None:
        dispatch(self.delegate, 'on_session_closed', event)
347 351
def on_connection_closing(self, event):
    """
    Forward a peer-initiated connection close to the delegate's
    ``on_connection_closing``; without a delegate, treat it as an
    error when ``peer_close_is_error`` is set.
    """
    if self.delegate is not None:
        dispatch(self.delegate, 'on_connection_closing', event)
    elif self.peer_close_is_error:
        self.on_connection_error(event)
357
def on_session_closing(self, event):
    """
    Forward a peer-initiated session close to the delegate's
    ``on_session_closing``; without a delegate, treat it as an error
    when ``peer_close_is_error`` is set.
    """
    if self.delegate is not None:
        dispatch(self.delegate, 'on_session_closing', event)
    elif self.peer_close_is_error:
        self.on_session_error(event)
363 369
def on_transport_tail_closed(self, event):
    """
    Handle a transport-tail-closed event exactly like
    ``on_transport_closed``.
    """
    handle_closed = self.on_transport_closed
    handle_closed(event)
372
def on_transport_closed(self, event):
    """
    Dispatch ``on_disconnected`` to the delegate when the transport
    closes while the event's connection is still open locally.

    Does nothing without a delegate or without a connection on the
    event.
    """
    # identity test: a delegate's own __eq__/__ne__ must not decide this
    if self.delegate is not None and event.connection and self.is_local_open(event.connection):
        dispatch(self.delegate, 'on_disconnected', event)
376
377 -class MessagingHandler(Handler, Acking):
378 """ 379 A general purpose handler that makes the proton-c events somewhat 380 simpler to deal with and/or avoids repetitive tasks for common use 381 cases. 382 """
def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False):
    """
    Assemble the child handlers that feed events back to this object.

    A CFlowController is added only when ``prefetch`` is truthy; the
    endpoint-state, incoming and outgoing handlers are always added,
    each with this object as delegate.
    """
    self.handlers = []
    if prefetch:
        self.handlers.append(CFlowController(prefetch))
    self.handlers += [
        EndpointStateHandler(peer_close_is_error, self),
        IncomingMessageHandler(auto_accept, self),
        OutgoingMessageHandler(auto_settle, self),
    ]
    # transport conditions on which on_transport_error closes the connection
    self.fatal_conditions = ["amqp:unauthorized-access"]
391
def on_transport_error(self, event):
    """
    Called when some error is encountered with the transport over
    which the AMQP connection is to be established. This includes
    authentication errors as well as socket errors.

    Logs the transport condition (name, description and, when
    present, info) and closes the connection if the condition name is
    listed in ``self.fatal_conditions``.
    """
    # NOTE(review): the fatal-condition check is placed inside the
    # "condition is set" branch; the flattened listing cannot confirm
    # the nesting.
    condition = event.transport.condition
    if condition:
        if condition.info:
            # The original format string had two placeholders for three
            # values and raised TypeError whenever info was present.
            logging.error("%s: %s: %s", condition.name, condition.description, condition.info)
        else:
            logging.error("%s: %s", condition.name, condition.description)
        if condition.name in self.fatal_conditions:
            event.connection.close()
    else:
        logging.error("Unspecified transport error")
407
def on_connection_error(self, event):
    """
    Called when the peer closes the connection with an error condition.

    Logs the error through EndpointStateHandler.print_error.
    """
    failed = event.connection
    EndpointStateHandler.print_error(failed, "connection")
413
def on_session_error(self, event):
    """
    Called when the peer closes the session with an error condition.

    Logs the error through EndpointStateHandler.print_error, then
    closes the connection.
    """
    failed = event.session
    EndpointStateHandler.print_error(failed, "session")
    event.connection.close()
420 427
def on_reactor_init(self, event):
    """
    Called when the event loop - the reactor - starts.

    When the reactor has a ``subclass`` attribute, the reactor is also
    attached to the event under that class's lower-cased name. Then
    ``on_start`` is invoked.
    """
    reactor = event.reactor
    if hasattr(reactor, 'subclass'):
        alias = event.reactor.subclass.__name__.lower()
        setattr(event, alias, event.reactor)
    self.on_start(event)
435
def on_start(self, event):
    """
    Called when the event loop starts; invoked from on_reactor_init.
    Does nothing by default.
    """
def on_connection_closed(self, event):
    """
    Called when the connection is closed. Does nothing by default.
    """
def on_session_closed(self, event):
    """
    Called when the session is closed. Does nothing by default.
    """
def on_connection_closing(self, event):
    """
    Called when the peer initiates the closing of the connection.
    Does nothing by default.
    """
def on_session_closing(self, event):
    """
    Called when the peer initiates the closing of the session.
    Does nothing by default.
    """
def on_disconnected(self, event):
    """
    Called when the socket is disconnected. Does nothing by default.
    """
476
def on_sendable(self, event):
    """
    Called when the sender link has credit and messages can
    therefore be transferred. Does nothing by default.
    """
483
def on_accepted(self, event):
    """
    Called when the remote peer accepts an outgoing message.
    Does nothing by default.
    """
489
def on_rejected(self, event):
    """
    Called when the remote peer rejects an outgoing message.
    Does nothing by default.
    """
495
def on_released(self, event):
    """
    Called when the remote peer releases an outgoing message. Note
    that this may be in response to either the RELEASED or MODIFIED
    state as defined by the AMQP specification. Does nothing by
    default.
    """
503
def on_settled(self, event):
    """
    Called when the remote peer has settled the outgoing
    message. This is the point at which it should never be
    retransmitted. Does nothing by default.
    """
def on_message(self, event):
    """
    Called when a message is received. The message itself can be
    obtained as a property on the event. For the purpose of
    referring to this message in further actions (e.g. if
    explicitly accepting it), the ``delivery`` should be used, also
    obtainable via a property on the event. Does nothing by default.
    """
520
class TransactionHandler(object):
    """
    The interface for transaction handlers, i.e. objects that want to
    be notified of state changes related to a transaction.

    Every callback is a no-op here.
    """

    def on_transaction_declared(self, event):
        """Hook for a declared transaction; does nothing."""

    def on_transaction_committed(self, event):
        """Hook for a committed transaction; does nothing."""

    def on_transaction_aborted(self, event):
        """Hook for an aborted transaction; does nothing."""

    def on_transaction_declare_failed(self, event):
        """Hook for a failed declare; does nothing."""

    def on_transaction_commit_failed(self, event):
        """Hook for a failed commit; does nothing."""
540
class TransactionalClientHandler(MessagingHandler, TransactionHandler):
    """
    An extension to the MessagingHandler for applications using
    transactions.
    """

    def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False):
        """
        Same arguments as MessagingHandler.__init__, except that
        ``auto_accept`` defaults to False here.
        """
        parent = super(TransactionalClientHandler, self)
        parent.__init__(prefetch, auto_accept, auto_settle, peer_close_is_error)

    def accept(self, delivery, transaction=None):
        """
        Accept ``delivery`` through ``transaction`` when one is given
        (truthy); otherwise accept it the non-transactional way.
        """
        if not transaction:
            super(TransactionalClientHandler, self).accept(delivery)
            return
        transaction.accept(delivery)
555 556 from proton import WrappedHandler 557 from cproton import pn_flowcontroller, pn_handshaker, pn_iohandler
class CFlowController(WrappedHandler):
    """
    WrappedHandler around ``pn_flowcontroller``, created with the
    given ``window``.
    """

    def __init__(self, window=1024):
        def make_controller():
            # called by WrappedHandler; NOTE(review): when it is called
            # is not visible from this file
            return pn_flowcontroller(window)

        WrappedHandler.__init__(self, make_controller)
563
class CHandshaker(WrappedHandler):
    """
    WrappedHandler around ``pn_handshaker``.
    """

    def __init__(self):
        constructor = pn_handshaker
        WrappedHandler.__init__(self, constructor)
568
class IOHandler(WrappedHandler):
    """
    WrappedHandler around ``pn_iohandler``.
    """

    def __init__(self):
        constructor = pn_iohandler
        WrappedHandler.__init__(self, constructor)
573
class PythonIO:
    """
    Handler that tracks the reactor's selectables and drives them with
    ``select()`` whenever the reactor is quiesced. Events it does not
    handle itself are passed to an IOHandler delegate.
    """

    def __init__(self):
        self.delegate = IOHandler()
        self.selectables = []

    def on_unhandled(self, method, event):
        """Pass any event without a dedicated method to the delegate."""
        event.dispatch(self.delegate)

    def on_selectable_init(self, event):
        """Start tracking the selectable carried as the event context."""
        self.selectables.append(event.context)

    def on_selectable_updated(self, event):
        """Nothing to do; state is re-read on every quiesce."""
        pass

    def on_selectable_final(self, event):
        """Stop tracking and release a selectable that is terminal."""
        selectable = event.context
        if not selectable.is_terminal:
            return
        self.selectables.remove(selectable)
        selectable.release()

    def on_reactor_quiesced(self, event):
        """
        Wait in ``select()`` for the tracked selectables, then notify
        those that are readable, writable or past their deadline, and
        yield the reactor.
        """
        reactor = event.reactor
        # check if we are still quiesced, other handlers of
        # on_reactor_quiesced could have produced events to process
        if not reactor.quiesced:
            return

        readers = []
        writers = []
        deadlines = []
        for sel in self.selectables:
            if sel.reading:
                readers.append(sel)
            if sel.writing:
                writers.append(sel)
            if sel.deadline:
                deadlines.append(sel.deadline)
        earliest = min(deadlines) if deadlines else None

        # wait until the earliest deadline, never longer than the
        # reactor's own timeout and never a negative amount
        timeout = reactor.timeout if earliest is None else earliest - time.time()
        if timeout < 0:
            timeout = 0
        timeout = min(timeout, reactor.timeout)
        ready_to_read, ready_to_write, _ = select(readers, writers, [], timeout)

        reactor.mark()

        now = time.time()

        for sel in ready_to_read:
            sel.readable()
        for sel in ready_to_write:
            sel.writable()
        for sel in self.selectables:
            if sel.deadline and now > sel.deadline:
                sel.expired()

        reactor.yield_()
636