Package proton ::
Module reactor
|
|
1 from __future__ import absolute_import
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 import logging, os, socket, time, types
21 from heapq import heappush, heappop, nsmallest
22 from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch
23 from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler, Link, Message
24 from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, SSL, SSLDomain, SSLUnavailable, symbol
25 from proton import Terminus, Timeout, Transport, TransportException, ulong, Url
26 from select import select
27 from proton.handlers import OutgoingMessageHandler
28 from proton import unicode2utf8, utf82unicode
29
30 import traceback
31 from proton import WrappedHandler, _chandler, secs2millis, millis2secs, timeout2millis, millis2timeout, Selectable
32 from .wrapper import Wrapper, PYCTX
33 from cproton import *
34 from . import _compat
35
36 try:
37 import Queue
38 except ImportError:
39 import queue as Queue
40
41 -class Task(Wrapper):
42
43 @staticmethod
45 if impl is None:
46 return None
47 else:
48 return Task(impl)
49
52
55
57 pn_task_cancel(self._impl)
58
60
63
def set_ssl_domain(self, ssl_domain):
    """
    Attach an SSL domain to this acceptor.

    The wrapped ``_domain`` object of ``ssl_domain`` is handed to
    ``pn_acceptor_set_ssl_domain`` together with this acceptor's
    underlying implementation object.
    """
    acceptor_impl = self._impl
    domain_impl = ssl_domain._domain
    pn_acceptor_set_ssl_domain(acceptor_impl, domain_impl)
66
68 pn_acceptor_close(self._impl)
69
71
72 @staticmethod
74 if impl is None:
75 return None
76 else:
77 record = pn_reactor_attachments(impl)
78 attrs = pn_void2py(pn_record_get(record, PYCTX))
79 if attrs and 'subclass' in attrs:
80 return attrs['subclass'](impl=impl)
81 else:
82 return Reactor(impl=impl)
83
84 - def __init__(self, *handlers, **kwargs):
88
91
93 self.errors.append(info)
94 self.yield_()
95
97 return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error)
98
100 impl = _chandler(handler, self.on_error)
101 pn_reactor_set_global_handler(self._impl, impl)
102 pn_decref(impl)
103
104 global_handler = property(_get_global, _set_global)
105
107 return millis2timeout(pn_reactor_get_timeout(self._impl))
108
110 return pn_reactor_set_timeout(self._impl, timeout2millis(secs))
111
112 timeout = property(_get_timeout, _set_timeout)
113
115 pn_reactor_yield(self._impl)
116
118 return pn_reactor_mark(self._impl)
119
121 return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error)
122
124 impl = _chandler(handler, self.on_error)
125 pn_reactor_set_handler(self._impl, impl)
126 pn_decref(impl)
127
128 handler = property(_get_handler, _set_handler)
129
135
137 n = pn_reactor_wakeup(self._impl)
138 if n: raise IOError(pn_error_text(pn_io_error(pn_reactor_io(self._impl))))
139
141 pn_reactor_start(self._impl)
142
143 @property
145 return pn_reactor_quiesced(self._impl)
146
148 if self.errors:
149 for exc, value, tb in self.errors[:-1]:
150 traceback.print_exception(exc, value, tb)
151 exc, value, tb = self.errors[-1]
152 _compat.raise_(exc, value, tb)
153
155 result = pn_reactor_process(self._impl)
156 self._check_errors()
157 return result
158
160 pn_reactor_stop(self._impl)
161 self._check_errors()
162 self.global_handler = None
163 self.handler = None
164
166 impl = _chandler(task, self.on_error)
167 task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl))
168 pn_decref(impl)
169 return task
170
def acceptor(self, host, port, handler=None):
    """
    Create an acceptor for the given host and port on this reactor.

    @param host: host/interface name; converted with unicode2utf8
    before being passed to the C layer

    @param port: port to accept on; passed to the C layer as str(port)

    @param handler: optional handler, wrapped with _chandler using this
    reactor's on_error callback

    Returns an Acceptor wrapping the new implementation object.

    @raise IOError: if pn_reactor_acceptor returns nothing; the message
    carries the reactor's I/O error text followed by "(host:port)"
    """
    chandler = _chandler(handler, self.on_error)
    acceptor_impl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), chandler)
    # pn_decref is applied to the wrapped handler whether or not the
    # acceptor was created.
    pn_decref(chandler)
    if not acceptor_impl:
        io_error = pn_io_error(pn_reactor_io(self._impl))
        raise IOError("%s (%s:%s)" % (pn_error_text(io_error), host, port))
    return Acceptor(acceptor_impl)
179
181 impl = _chandler(handler, self.on_error)
182 result = Connection.wrap(pn_reactor_connection(self._impl, impl))
183 pn_decref(impl)
184 return result
185
187 impl = _chandler(handler, self.on_error)
188 result = Selectable.wrap(pn_reactor_selectable(self._impl))
189 if impl:
190 record = pn_selectable_attachments(result._impl)
191 pn_record_set_handler(record, impl)
192 pn_decref(impl)
193 return result
194
196 pn_reactor_update(self._impl, sel._impl)
197
199 pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
200
201 from proton import wrappers as _wrappers
202 _wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x))
203 _wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
207 """
208 Can be added to a reactor to allow events to be triggered by an
209 external thread but handled on the event thread associated with
210 the reactor. An instance of this class can be passed to the
211 Reactor.selectable() method of the reactor in order to activate
212 it. The close() method should be called when it is no longer
213 needed, to allow the event loop to end if needed.
214 """
216 self.queue = Queue.Queue()
217 self.pipe = os.pipe()
218 self._closed = False
219
221 """
222 Request that the given event be dispatched on the event thread
223 of the reactor to which this EventInjector was added.
224 """
225 self.queue.put(event)
226 os.write(self.pipe[1], _compat.str2bin("!"))
227
229 """
230 Request that this EventInjector be closed. Existing events
231 will be dispatched on the reactor's event dispatch thread,
232 then this will be removed from the set of interest.
233 """
234 self._closed = True
235 os.write(self.pipe[1], _compat.str2bin("!"))
236
239
245
247 os.read(self.pipe[0], 512)
248 while not self.queue.empty():
249 requested = self.queue.get()
250 event.reactor.push_event(requested.context, requested.type)
251 if self._closed:
252 s = event.context
253 s.terminate()
254 event.reactor.update(s)
255
258 """
259 Application defined event, which can optionally be associated with
260 an engine object and/or an arbitrary subject
261 """
262 - def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
275
277 objects = [self.connection, self.session, self.link, self.delivery, self.subject]
278 return "%s(%s)" % (self.type, ", ".join([str(o) for o in objects if o is not None]))
279
281 """
282 Class to track state of an AMQP 1.0 transaction.
283 """
def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
    """
    Set up the transaction's bookkeeping and immediately declare it.

    @param txn_ctrl: object stored as ``self.txn_ctrl``

    @param handler: object stored as ``self.handler``

    @param settle_before_discharge: flag stored as
    ``self.settle_before_discharge``

    The id, the declare/discharge placeholders, the failed flag and the
    list of pending deliveries all start out empty before
    ``self.declare()`` is called.
    """
    # Values supplied by the caller.
    self.txn_ctrl = txn_ctrl
    self.handler = handler
    self.settle_before_discharge = settle_before_discharge

    # State that starts out unset.
    self.id = None
    self.failed = False
    self._declare = self._discharge = None
    self._pending = []

    self.declare()
294
297
300
302 self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])
303
305 self.failed = failed
306 self._discharge = self._send_ctrl(symbol(u'amqp:discharge:list'), [self.id, failed])
307
309 delivery = self.txn_ctrl.send(Message(body=Described(descriptor, value)))
310 delivery.transaction = self
311 return delivery
312
def send(self, sender, msg, tag=None):
    """
    Send ``msg`` on ``sender`` and associate the resulting delivery
    with this transaction.

    @param sender: object whose ``send(msg, tag=tag)`` produces the
    delivery

    @param msg: the message to send

    @param tag: optional tag forwarded to ``sender.send``

    Returns the delivery produced by ``sender.send``.
    """
    delivery = sender.send(msg, tag=tag)
    # Record this transaction's id in the delivery's local state data.
    delivery.local.data = [self.id]
    # NOTE(review): 0x34 is presumably the AMQP transactional-state
    # code - confirm against the specification.
    delivery.update(0x34)
    return delivery
318
320 self.update(delivery, PN_ACCEPTED)
321 if self.settle_before_discharge:
322 delivery.settle()
323 else:
324 self._pending.append(delivery)
325
326 - def update(self, delivery, state=None):
327 if state:
328 delivery.local.data = [self.id, Described(ulong(state), [])]
329 delivery.update(0x34)
330
332 for d in self._pending:
333 d.update(Delivery.RELEASED)
334 d.settle()
335 self._clear_pending()
336
339
362
364 """
365 Abstract interface for link configuration options
366 """
368 """
369 Subclasses will implement any configuration logic in this
370 method
371 """
372 pass
373 - def test(self, link):
374 """
375 Subclasses can override this to selectively apply an option
376 e.g. based on some link criteria
377 """
378 return True
379
382 link.snd_settle_mode = Link.SND_SETTLED
383
386 link.snd_settle_mode = Link.SND_UNSETTLED
387 link.rcv_settle_mode = Link.RCV_FIRST
388
390 - def apply(self, sender): pass
391 - def test(self, link): return link.is_sender
392
394 - def apply(self, receiver): pass
395 - def test(self, link): return link.is_receiver
396
399 self.properties = {}
400 for k in props:
401 if isinstance(k, symbol):
402 self.properties[k] = props[k]
403 else:
404 self.properties[symbol(k)] = props[k]
405
407 if link.is_receiver:
408 link.source.properties.put_dict(self.properties)
409 else:
410 link.target.properties.put_dict(self.properties)
411
414 self.filter_set = filter_set
415
416 - def apply(self, receiver):
417 receiver.source.filter.put_dict(self.filter_set)
418
420 """
421 Configures a link with a message selector filter
422 """
423 - def __init__(self, value, name='selector'):
424 super(Selector, self).__init__({symbol(name): Described(symbol('apache.org:selector-filter:string'), value)})
425
def apply(self, receiver):
    """
    Configure the receiver's source with ``Terminus.DELIVERIES``
    durability and the ``Terminus.EXPIRE_NEVER`` expiry policy.
    """
    settings = (
        ("durability", Terminus.DELIVERIES),
        ("expiry_policy", Terminus.EXPIRE_NEVER),
    )
    for attribute, value in settings:
        setattr(receiver.source, attribute, value)
430
class Move(ReceiverOption):
    """
    Receiver option that sets the source's distribution mode to
    ``Terminus.DIST_MODE_MOVE``.
    """

    def apply(self, receiver):
        """Set the distribution mode on ``receiver.source``."""
        mode = Terminus.DIST_MODE_MOVE
        receiver.source.distribution_mode = mode
434
class Copy(ReceiverOption):
    """
    Receiver option that sets the source's distribution mode to
    ``Terminus.DIST_MODE_COPY``.
    """

    def apply(self, receiver):
        """Set the distribution mode on ``receiver.source``."""
        mode = Terminus.DIST_MODE_COPY
        receiver.source.distribution_mode = mode
438
440 if options:
441 if isinstance(options, list):
442 for o in options:
443 if o.test(link): o.apply(link)
444 else:
445 if options.test(link): options.apply(link)
446
451
454 if hasattr(target, name):
455 return getattr(target, name)
456 else:
457 return None
458
461 self._default_session = None
462
464 if not self._default_session:
465 self._default_session = _create_session(connection)
466 self._default_session.context = self
467 return self._default_session
468
472
474 """
475 Internal handler that first offers each event to the overrides
476 attached to its connection (if any) and otherwise uses the base handler.
477 """
480
482 if not self._override(event):
483 event.dispatch(self.base)
484
486 conn = event.connection
487 return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides)
488
490 """
491 Internal handler that triggers the necessary socket connect for an
492 opened connection.
493 """
495 self.connection = connection
496 self.address = None
497 self.heartbeat = None
498 self.reconnect = None
499 self.ssl_domain = None
500 self.allow_insecure_mechs = True
501 self.allowed_mechs = None
502 self.sasl_enabled = True
503 self.user = None
504 self.password = None
505
507 url = self.address.next()
508
509 connection.hostname = "%s:%s" % (url.host, url.port)
510 logging.info("connecting to %s..." % connection.hostname)
511
512 transport = Transport()
513 if self.sasl_enabled:
514 sasl = transport.sasl()
515 sasl.allow_insecure_mechs = self.allow_insecure_mechs
516 if url.username:
517 connection.user = url.username
518 elif self.user:
519 connection.user = self.user
520 if url.password:
521 connection.password = url.password
522 elif self.password:
523 connection.password = self.password
524 if self.allowed_mechs:
525 sasl.allowed_mechs(self.allowed_mechs)
526 transport.bind(connection)
527 if self.heartbeat:
528 transport.idle_timeout = self.heartbeat
529 if url.scheme == 'amqps':
530 if not self.ssl_domain:
531 raise SSLUnavailable("amqps: SSL libraries not found")
532 self.ssl = SSL(transport, self.ssl_domain)
533 self.ssl.peer_hostname = url.host
534
537
539 logging.info("connected to %s" % event.connection.hostname)
540 if self.reconnect:
541 self.reconnect.reset()
542 self.transport = None
543
546
548 if self.connection and self.connection.state & Endpoint.LOCAL_ACTIVE:
549 if self.reconnect:
550 event.transport.unbind()
551 delay = self.reconnect.next()
552 if delay == 0:
553 logging.info("Disconnected, reconnecting...")
554 self._connect(self.connection)
555 else:
556 logging.info("Disconnected will try to reconnect after %s seconds" % delay)
557 event.reactor.schedule(delay, self)
558 else:
559 logging.info("Disconnected")
560 self.connection = None
561
564
567
569 """
570 A reconnect strategy involving an increasing delay between
571 retries, up to a maximum of 10 seconds.
572 """
575
578
580 current = self.delay
581 if current == 0:
582 self.delay = 0.1
583 else:
584 self.delay = min(10, 2*current)
585 return current
586
589 self.values = [Url(v) for v in values]
590 self.i = iter(self.values)
591
594
596 try:
597 return next(self.i)
598 except StopIteration:
599 self.i = iter(self.values)
600 return next(self.i)
601
604 self.client = SSLDomain(SSLDomain.MODE_CLIENT)
605 self.server = SSLDomain(SSLDomain.MODE_SERVER)
606
610
614
617 """A representation of the AMQP concept of a 'container', which
618 loosely speaking is something that establishes links to or from
619 another container, over which messages are transferred. This is
620 an extension to the Reactor class that adds convenience methods
621 for creating connections and sender- or receiver- links.
622 """
623 - def __init__(self, *handlers, **kwargs):
624 super(Container, self).__init__(*handlers, **kwargs)
625 if "impl" not in kwargs:
626 try:
627 self.ssl = SSLConfig()
628 except SSLUnavailable:
629 self.ssl = None
630 self.global_handler = GlobalOverrides(kwargs.get('global_handler', self.global_handler))
631 self.trigger = None
632 self.container_id = str(generate_uuid())
633 self.allow_insecure_mechs = True
634 self.allowed_mechs = None
635 self.sasl_enabled = True
636 self.user = None
637 self.password = None
638 Wrapper.__setattr__(self, 'subclass', self.__class__)
639
def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs):
    """
    Initiates the establishment of an AMQP connection. Returns an
    instance of proton.Connection.

    The destination is taken from the first of url, urls and address
    that is set; only one of them should be specified.

    @param url: URL string of process to connect to

    @param urls: list of URL strings of process to try to connect to

    @param address: object used directly as the connector's address
    instead of one built from url or urls

    @param handler: a connection scoped handler that will be
    called to process any events in the scope of this connection
    or its child links

    @param reconnect: A value of False will prevent the library
    from automatically trying to reconnect if the underlying
    socket is disconnected before the connection has been closed.
    When left as None a Backoff() strategy is used; any other
    true value is used as the reconnect strategy itself.

    @param heartbeat: the desired frequency of heartbeats used to
    test the underlying socket is alive (documented as being in
    milliseconds - TODO confirm the unit against the transport's
    idle_timeout).

    @param ssl_domain: SSL configuration in the form of an
    instance of proton.SSLDomain. When not given, the container's
    own client domain is used if the container has SSL support.

    @param kwargs: sasl_enabled, which determines whether a sasl
    layer is used for the connection; allowed_mechs, an optional
    list of SASL mechanisms to allow if sasl is enabled;
    allow_insecure_mechs, a flag indicating whether insecure
    mechanisms, such as PLAIN over a non-encrypted socket, are
    allowed; user and password. Each of these falls back to the
    value set at container scope.

    @raise ValueError: if none of url, urls or address is given
    """
    conn = self.connection(handler)
    conn.container = self.container_id or str(generate_uuid())

    connector = Connector(conn)
    # Per-connection keyword arguments win over the container-wide values.
    for option in ('allow_insecure_mechs', 'allowed_mechs', 'sasl_enabled', 'user', 'password'):
        setattr(connector, option, kwargs.get(option, getattr(self, option)))
    conn._overrides = connector

    if url:
        connector.address = Urls([url])
    elif urls:
        connector.address = Urls(urls)
    elif address:
        connector.address = address
    else:
        raise ValueError("One of url, urls or address required")

    if heartbeat:
        connector.heartbeat = heartbeat

    # None selects the default strategy; a false value other than None
    # leaves the connector without a reconnect strategy.
    if reconnect is None:
        connector.reconnect = Backoff()
    elif reconnect:
        connector.reconnect = reconnect

    connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client)
    conn._session_policy = SessionPerConnection()
    conn.open()
    return conn
700
def _get_id(self, container, remote, local):
    """
    Build a link name from the container id and whichever of the
    remote and local addresses are set.

    @param container: the container id used as the name's prefix

    @param remote: the remote address, or a false value if unset

    @param local: the local address, or a false value if unset

    Returns "container-remote-local" when both addresses are set,
    "container-local" or "container-remote" when only one is, and
    the container id followed by a freshly generated uuid when
    neither is.
    """
    if local and remote:
        # This branch previously built the string without returning it,
        # so the method fell through and produced None.
        return "%s-%s-%s" % (container, remote, local)
    elif local:
        return "%s-%s" % (container, local)
    elif remote:
        return "%s-%s" % (container, remote)
    else:
        return "%s-%s" % (container, str(generate_uuid()))
706
708 if isinstance(context, Url):
709 return self._get_session(self.connect(url=context))
710 elif isinstance(context, Session):
711 return context
712 elif isinstance(context, Connection):
713 if hasattr(context, '_session_policy'):
714 return context._session_policy.session(context)
715 else:
716 return _create_session(context)
717 else:
718 return context.session()
719
def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
    """
    Initiates the establishment of a link over which messages can
    be sent. Returns an instance of proton.Sender.

    There are two patterns of use. (1) A connection can be passed
    as the first argument, in which case the link is established
    on that connection. In this case the target address can be
    specified as the second argument (or as a keyword
    argument). The source address can also be specified if
    desired. (2) Alternatively a URL can be passed as the first
    argument. In this case a new connection will be established on
    which the link will be attached. If a path is specified and
    the target is not, then the path of the URL is used as the
    target address.

    The name of the link may be specified if desired, otherwise a
    unique name will be generated.

    Various LinkOptions can be specified to further control the
    attachment.

    @param handler: if not None, stored as the sender's handler

    @param tags: if set, stored as the sender's tag_generator

    @param options: a single option or a list of options applied to
    the sender before it is opened
    """
    if isinstance(context, _compat.STRING_TYPES):
        context = Url(context)
    if isinstance(context, Url) and not target:
        target = context.path
    session = self._get_session(context)
    snd = session.sender(name or self._get_id(session.connection.container, target, source))
    if source:
        snd.source.address = source
    if target:
        snd.target.address = target
    # Identity test: only None means "no handler supplied".
    if handler is not None:
        snd.handler = handler
    if tags:
        snd.tag_generator = tags
    _apply_link_options(options, snd)
    snd.open()
    return snd
759
def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
    """
    Initiates the establishment of a link over which messages can
    be received (aka a subscription). Returns an instance of
    proton.Receiver.

    There are two patterns of use. (1) A connection can be passed
    as the first argument, in which case the link is established
    on that connection. In this case the source address can be
    specified as the second argument (or as a keyword
    argument). The target address can also be specified if
    desired. (2) Alternatively a URL can be passed as the first
    argument. In this case a new connection will be established on
    which the link will be attached. If a path is specified and
    the source is not, then the path of the URL is used as the
    source address.

    The name of the link may be specified if desired, otherwise a
    unique name will be generated.

    Various LinkOptions can be specified to further control the
    attachment.

    @param dynamic: if true, the receiver's source is marked dynamic

    @param handler: if not None, stored as the receiver's handler

    @param options: a single option or a list of options applied to
    the receiver before it is opened
    """
    if isinstance(context, _compat.STRING_TYPES):
        context = Url(context)
    if isinstance(context, Url) and not source:
        source = context.path
    session = self._get_session(context)
    rcv = session.receiver(name or self._get_id(session.connection.container, source, target))
    if source:
        rcv.source.address = source
    if dynamic:
        rcv.source.dynamic = True
    if target:
        rcv.target.address = target
    # Identity test: only None means "no handler supplied".
    if handler is not None:
        rcv.handler = handler
    _apply_link_options(options, rcv)
    rcv.open()
    return rcv
800
802 if not _get_attr(context, '_txn_ctrl'):
803 class InternalTransactionHandler(OutgoingMessageHandler):
804 def __init__(self):
805 super(InternalTransactionHandler, self).__init__(auto_settle=True)
806
807 def on_settled(self, event):
808 if hasattr(event.delivery, "transaction"):
809 event.transaction = event.delivery.transaction
810 event.delivery.transaction.handle_outcome(event)
811 context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler())
812 context._txn_ctrl.target.type = Terminus.COORDINATOR
813 context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
814 return Transaction(context._txn_ctrl, handler, settle_before_discharge)
815
def listen(self, url, ssl_domain=None):
    """
    Initiates a server socket, accepting incoming AMQP connections
    on the interface and port specified.

    @param url: URL (or URL string) giving the host and port to
    accept on; an 'amqps' scheme requests SSL

    @param ssl_domain: optional SSL configuration for the acceptor.
    When omitted and the scheme is 'amqps', the container's own
    server domain is used.

    Returns the acceptor.

    @raise SSLUnavailable: if the scheme is 'amqps', no ssl_domain
    was given and the container has no SSL configuration
    """
    parsed = Url(url)
    acceptor = self.acceptor(parsed.host, parsed.port)

    domain = ssl_domain
    if not domain and parsed.scheme == 'amqps':
        # No explicit domain supplied: fall back to the container's.
        if not self.ssl:
            raise SSLUnavailable("amqps: SSL libraries not found")
        domain = self.ssl.server

    if domain:
        acceptor.set_ssl_domain(domain)
    return acceptor
833
838