1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 """
21 The proton module defines a suite of APIs that implement the AMQP 1.0
22 protocol.
23
24 The proton APIs consist of the following classes:
25
26 - L{Messenger} -- A messaging endpoint.
27 - L{Message} -- A class for creating and/or accessing AMQP message content.
28 - L{Data} -- A class for creating and/or accessing arbitrary AMQP encoded
29 data.
30
31 """
32 from __future__ import absolute_import
33
34 from cproton import *
35 from .wrapper import Wrapper
36 from proton import _compat
37
38 import logging, weakref, socket, sys, threading
39
40 try:
41 handler = logging.NullHandler()
42 except AttributeError:
46
47 - def emit(self, record):
49
52
53 handler = NullHandler()
54
55 log = logging.getLogger("proton")
56 log.addHandler(handler)
57
58 try:
59 import uuid
63
64 except ImportError:
65 """
66 No 'native' UUID support. Provide a very basic UUID type that is a compatible subset of the uuid type provided by more modern python releases.
67 """
68 import struct
def __init__(self, hex=None, bytes=None):
    """
    Build the fallback UUID from exactly one of its two representations.

    @type hex: string
    @param hex: dash separated hexadecimal form; the code splits it on
      "-" and expects the fifth group to hold twelve hex digits
    @param bytes: the raw value, stored unchanged as C{self.bytes}
    @raise TypeError: if both or neither of hex and bytes are supplied
    """
    if (hex, bytes).count(None) != 1:
        raise TypeError("need one of hex or bytes")
    if bytes is not None:
        self.bytes = bytes
        return
    groups = hex.split("-")
    # Split the trailing group into 4 + 8 hex digits so that the six
    # pieces line up with the "!LHHHHL" struct layout.
    tail = groups[4]
    groups[4:5] = [tail[:4], tail[4:]]
    numbers = [int(group, 16) for group in groups]
    self.bytes = struct.pack("!LHHHHL", *numbers)
80
82 if isinstance(other, uuid.UUID):
83 return cmp(self.bytes, other.bytes)
84 else:
85 return -1
86
88 return "%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack("!LHHHHL", self.bytes)
89
91 return "UUID(%r)" % str(self)
92
95
96 import os, random, time
97 rand = random.Random()
98 rand.seed((os.getpid(), time.time(), socket.gethostname()))
100 data = [rand.randint(0, 255) for i in xrange(16)]
101
102
103 data[6] &= 0x0F
104 data[6] |= 0x40
105
106
107 data[8] &= 0x3F
108 data[8] |= 0x80
109 return "".join(map(chr, data))
110
112 return uuid.UUID(bytes=random_uuid())
113
116
117
118
119
# Compatibility shims: make sure the names bytes, long and unicode can be
# used throughout the module. Each probe calls the name; when the lookup
# fails with NameError the name is aliased to an available equivalent.
try:
    bytes()
except NameError:
    # No bytes type available: fall back to str.
    bytes = str
try:
    long()
except NameError:
    # No separate long type available: fall back to int.
    long = int
try:
    unicode()
except NameError:
    # No separate unicode type available: fall back to str.
    unicode = str
132
133
# Version of the library, re-exported from the PN_VERSION_* constants
# (presumably provided by the cproton star import -- TODO confirm).
VERSION_MAJOR = PN_VERSION_MAJOR
VERSION_MINOR = PN_VERSION_MINOR
VERSION_POINT = PN_VERSION_POINT
# Convenience tuple in (major, minor, point) order.
VERSION = (VERSION_MAJOR, VERSION_MINOR, VERSION_POINT)
API_LANGUAGE = "C"
IMPLEMENTATION_LANGUAGE = "C"
148
150 """
151 The root of the proton exception hierarchy. All proton exception
152 classes derive from this exception.
153 """
154 pass
155
157 """
158 A timeout exception indicates that a blocking operation has timed
159 out.
160 """
161 pass
162
164 """
165 An interrupt exception indicates that a blocking operation was interrupted.
166 """
167 pass
168
170 """
171 The root of the messenger exception hierarchy. All exceptions
172 generated by the messenger class derive from this exception.
173 """
174 pass
175
177 """
178 The MessageException class is the root of the message exception
179 hierarchy. All exceptions generated by the Message class derive from
180 this exception.
181 """
182 pass
183
# Maps error codes to the exception class raised for them. The _check
# helpers look codes up here and fall back to a more general exception
# class when a code is not listed.
EXCEPTIONS = {
    PN_TIMEOUT: Timeout,
    PN_INTR: Interrupt
}

# Symbolic delivery states exposed to callers.
PENDING = Constant("PENDING")
ACCEPTED = Constant("ACCEPTED")
REJECTED = Constant("REJECTED")
RELEASED = Constant("RELEASED")
MODIFIED = Constant("MODIFIED")
ABORTED = Constant("ABORTED")
SETTLED = Constant("SETTLED")

# Translates PN_STATUS_* codes into the constants above; the unknown
# status maps to None.
STATUSES = {
    PN_STATUS_ABORTED: ABORTED,
    PN_STATUS_ACCEPTED: ACCEPTED,
    PN_STATUS_REJECTED: REJECTED,
    PN_STATUS_RELEASED: RELEASED,
    PN_STATUS_MODIFIED: MODIFIED,
    PN_STATUS_PENDING: PENDING,
    PN_STATUS_SETTLED: SETTLED,
    PN_STATUS_UNKNOWN: None
}

AUTOMATIC = Constant("AUTOMATIC")
MANUAL = Constant("MANUAL")
212 """
213 The L{Messenger} class defines a high level interface for sending
214 and receiving L{Messages<Message>}. Every L{Messenger} contains a
215 single logical queue of incoming messages and a single logical queue
216 of outgoing messages. The messages in these queues may be destined
217 for, or originate from, a variety of addresses.
218
219 The messenger interface is single-threaded. All methods
220 except one (L{interrupt}) are intended to be used from within
221 the messenger thread.
222
223
224 Address Syntax
225 ==============
226
227 An address has the following form::
228
229 [ amqp[s]:// ] [user[:password]@] domain [/[name]]
230
231 Where domain can be one of::
232
233 host | host:port | ip | ip:port | name
234
235 The following are valid examples of addresses:
236
237 - example.org
238 - example.org:1234
239 - amqp://example.org
240 - amqps://example.org
241 - example.org/incoming
242 - amqps://example.org/outgoing
243 - amqps://fred:trustno1@example.org
244 - 127.0.0.1:1234
245 - amqps://127.0.0.1:1234
246
247 Sending & Receiving Messages
248 ============================
249
250 The L{Messenger} class works in conjunction with the L{Message} class. The
251 L{Message} class is a mutable holder of message content.
252
253 The L{put} method copies its L{Message} to the outgoing queue, and may
254 send queued messages if it can do so without blocking. The L{send}
255 method blocks until it has sent the requested number of messages,
256 or until a timeout interrupts the attempt.
257
258
259 >>> message = Message()
260 >>> for i in range(3):
261 ... message.address = "amqp://host/queue"
262 ... message.subject = "Hello World %i" % i
263 ... messenger.put(message)
264 >>> messenger.send()
265
266 Similarly, the L{recv} method receives messages into the incoming
267 queue, and may block as it attempts to receive the requested number
268 of messages, or until timeout is reached. It may receive fewer
269 than the requested number. The L{get} method pops the
270 eldest L{Message} off the incoming queue and copies it into the L{Message}
271 object that you supply. It will not block.
272
273
274 >>> message = Message()
275 >>> messenger.recv(10)
276 >>> while messenger.incoming > 0:
277 ... messenger.get(message)
278 ... print(message.subject)
279 Hello World 0
280 Hello World 1
281 Hello World 2
282
283 The blocking flag allows you to turn off blocking behavior entirely,
284 in which case L{send} and L{recv} will do whatever they can without
285 blocking, and then return. You can then look at the number
286 of incoming and outgoing messages to see how much outstanding work
287 still remains.
288 """
289
291 """
292 Construct a new L{Messenger} with the given name. The name has
293 global scope. If None is supplied as the name, a UUID based name will
294 be chosen.
295
296 @type name: string
297 @param name: the name of the messenger or None
298
299 """
300 self._mng = pn_messenger(name)
301 self._selectables = {}
302
304 """
305 Destroy the L{Messenger}. This will close all connections that
306 are managed by the L{Messenger}. Call the L{stop} method before
307 destroying the L{Messenger}.
308 """
309 if hasattr(self, "_mng"):
310 pn_messenger_free(self._mng)
311 del self._mng
312
314 if err < 0:
315 if (err == PN_INPROGRESS):
316 return
317 exc = EXCEPTIONS.get(err, MessengerException)
318 raise exc("[%s]: %s" % (err, pn_error_text(pn_messenger_error(self._mng))))
319 else:
320 return err
321
322 @property
324 """
325 The name of the L{Messenger}.
326 """
327 return pn_messenger_name(self._mng)
328
330 return pn_messenger_get_certificate(self._mng)
331
333 self._check(pn_messenger_set_certificate(self._mng, value))
334
335 certificate = property(_get_certificate, _set_certificate,
336 doc="""
337 Path to a certificate file for the L{Messenger}. This certificate is
338 used when the L{Messenger} accepts or establishes SSL/TLS connections.
339 This property must be specified for the L{Messenger} to accept
340 incoming SSL/TLS connections and to establish client authenticated
341 outgoing SSL/TLS connection. Non client authenticated outgoing SSL/TLS
342 connections do not require this property.
343 """)
344
346 return pn_messenger_get_private_key(self._mng)
347
349 self._check(pn_messenger_set_private_key(self._mng, value))
350
351 private_key = property(_get_private_key, _set_private_key,
352 doc="""
353 Path to a private key file for the L{Messenger's<Messenger>}
354 certificate. This property must be specified for the L{Messenger} to
355 accept incoming SSL/TLS connections and to establish client
356 authenticated outgoing SSL/TLS connection. Non client authenticated
357 SSL/TLS connections do not require this property.
358 """)
359
361 return pn_messenger_get_password(self._mng)
362
364 self._check(pn_messenger_set_password(self._mng, value))
365
366 password = property(_get_password, _set_password,
367 doc="""
368 This property contains the password for the L{Messenger.private_key}
369 file, or None if the file is not encrypted.
370 """)
371
373 return pn_messenger_get_trusted_certificates(self._mng)
374
376 self._check(pn_messenger_set_trusted_certificates(self._mng, value))
377
378 trusted_certificates = property(_get_trusted_certificates,
379 _set_trusted_certificates,
380 doc="""
381 A path to a database of trusted certificates for use in verifying the
382 peer on an SSL/TLS connection. If this property is None, then the peer
383 will not be verified.
384 """)
385
387 t = pn_messenger_get_timeout(self._mng)
388 if t == -1:
389 return None
390 else:
391 return millis2secs(t)
392
394 if value is None:
395 t = -1
396 else:
397 t = secs2millis(value)
398 self._check(pn_messenger_set_timeout(self._mng, t))
399
400 timeout = property(_get_timeout, _set_timeout,
401 doc="""
402 The timeout property contains the default timeout for blocking
403 operations performed by the L{Messenger}.
404 """)
405
407 return pn_messenger_is_blocking(self._mng)
408
410 self._check(pn_messenger_set_blocking(self._mng, b))
411
412 blocking = property(_is_blocking, _set_blocking,
413 doc="""
414 Enable or disable blocking behavior during L{Message} sending
415 and receiving. This affects every blocking call, with the
416 exception of L{work}. Currently, the affected calls are
417 L{send}, L{recv}, and L{stop}.
418 """)
419
421 return pn_messenger_is_passive(self._mng)
422
424 self._check(pn_messenger_set_passive(self._mng, b))
425
426 passive = property(_is_passive, _set_passive,
427 doc="""
428 When passive is set to true, Messenger will not attempt to perform I/O
429 internally. In this mode it is necessary to use the selectables API to
430 drive any I/O needed to perform requested actions. In this mode
431 Messenger will never block.
432 """)
433
435 return pn_messenger_get_incoming_window(self._mng)
436
438 self._check(pn_messenger_set_incoming_window(self._mng, window))
439
440 incoming_window = property(_get_incoming_window, _set_incoming_window,
441 doc="""
442 The incoming tracking window for the messenger. The messenger will
443 track the remote status of this many incoming deliveries after they
444 have been accepted or rejected. Defaults to zero.
445
446 L{Messages<Message>} enter this window only when you take them into your application
447 using L{get}. If your incoming window size is I{n}, and you get I{n}+1 L{messages<Message>}
448 without explicitly accepting or rejecting the oldest message, then the
449 message that passes beyond the edge of the incoming window will be assigned
450 the default disposition of its link.
451 """)
452
454 return pn_messenger_get_outgoing_window(self._mng)
455
457 self._check(pn_messenger_set_outgoing_window(self._mng, window))
458
459 outgoing_window = property(_get_outgoing_window, _set_outgoing_window,
460 doc="""
461 The outgoing tracking window for the messenger. The messenger will
462 track the remote status of this many outgoing deliveries after calling
463 send. Defaults to zero.
464
465 A L{Message} enters this window when you call the put() method with the
466 message. If your outgoing window size is I{n}, and you call L{put} I{n}+1
467 times, status information will no longer be available for the
468 first message.
469 """)
470
472 """
473 Currently a no-op placeholder.
474 For future compatibility, do not L{send} or L{recv} messages
475 before starting the L{Messenger}.
476 """
477 self._check(pn_messenger_start(self._mng))
478
480 """
481 Transitions the L{Messenger} to an inactive state. An inactive
482 L{Messenger} will not send or receive messages from its internal
483 queues. A L{Messenger} should be stopped before being discarded to
484 ensure a clean shutdown handshake occurs on any internally managed
485 connections.
486 """
487 self._check(pn_messenger_stop(self._mng))
488
489 @property
491 """
492 Returns true iff a L{Messenger} is in the stopped state.
493 This function does not block.
494 """
495 return pn_messenger_stopped(self._mng)
496
498 """
499 Subscribes the L{Messenger} to messages originating from the
500 specified source. The source is an address as specified in the
501 L{Messenger} introduction with the following addition. If the
502 domain portion of the address begins with the '~' character, the
503 L{Messenger} will interpret the domain as host/port, bind to it,
504 and listen for incoming messages. For example "~0.0.0.0",
505 "amqp://~0.0.0.0", and "amqps://~0.0.0.0" will all bind to any
506 local interface and listen for incoming messages with the last
507 variant only permitting incoming SSL connections.
508
509 @type source: string
510 @param source: the source of messages to subscribe to
511 """
512 sub_impl = pn_messenger_subscribe(self._mng, source)
513 if not sub_impl:
514 self._check(pn_error_code(pn_messenger_error(self._mng)))
515 raise MessengerException("Cannot subscribe to %s"%source)
516 return Subscription(sub_impl)
517
def put(self, message):
    """
    Copy the content of a L{Message} onto the outgoing queue of the
    L{Messenger}.

    This call never blocks. Any L{Messages<Message>} in the outgoing
    queue that are not blocked are sent immediately; blocked ones stay
    queued. Use L{send} to block until the outgoing queue has drained,
    and the L{outgoing} property to inspect its current depth.

    Because the content is copied, the L{Message} object may be modified
    or discarded afterwards without affecting what has been queued.

    @type message: Message
    @param message: the message to place in the outgoing queue
    @return: an outgoing tracker that can be used to determine the
      delivery status of the L{Message}
    """
    # Let the message prepare its native representation before the copy.
    message._pre_encode()
    status = pn_messenger_put(self._mng, message._msg)
    self._check(status)
    tracker = pn_messenger_outgoing_tracker(self._mng)
    return tracker
542
544 """
545 Gets the last known remote state of the delivery associated with
546 the given tracker.
547
548 @type tracker: tracker
549 @param tracker: the tracker whose status is to be retrieved
550
551 @return: one of None, PENDING, ACCEPTED, REJECTED, RELEASED, MODIFIED, ABORTED, or SETTLED
552 """
553 disp = pn_messenger_status(self._mng, tracker);
554 return STATUSES.get(disp, disp)
555
557 """
558 Checks if the delivery associated with the given tracker is still
559 waiting to be sent.
560
561 @type tracker: tracker
562 @param tracker: the tracker whose status is to be retrieved
563
564 @return: true if delivery is still buffered
565 """
566 return pn_messenger_buffered(self._mng, tracker);
567
def settle(self, tracker=None):
    """
    Stop the L{Messenger} from tracking the status associated with a
    tracker.

    @type tracker: tracker
    @param tracker: the tracker to settle; when omitted, the most recent
      outgoing tracker is used with the cumulative flag, so every
      outgoing L{message<Message>} up to the most recent is settled
    """
    cumulative = tracker is None
    if cumulative:
        tracker = pn_messenger_outgoing_tracker(self._mng)
    flags = PN_CUMULATIVE if cumulative else 0
    self._check(pn_messenger_settle(self._mng, tracker, flags))
580
def send(self, n=-1):
    """
    Block until the requested number of L{messages<Message>} have been
    sent, or until the operation times out.

    @type n: int
    @param n: how many messages to wait for; -1 (the default) waits for
      every outgoing L{message<Message>}, 0 sends whatever is possible
      without blocking
    """
    result = pn_messenger_send(self._mng, n)
    self._check(result)
589
def recv(self, n=None):
    """
    Receive up to I{n} L{messages<Message>} into the incoming queue.

    When the L{Messenger} is in blocking mode this call blocks until at
    least one L{Message} is available in the incoming queue.

    @type n: int
    @param n: the maximum number of messages to receive; when omitted,
      as many L{messages<Message>} as can be buffered internally are
      received
    """
    # The native call uses -1 to mean "no explicit limit".
    limit = -1 if n is None else n
    self._check(pn_messenger_recv(self._mng, limit))
601
def work(self, timeout=None):
    """
    Send or receive any outstanding L{messages<Message>} queued for the
    L{Messenger}, blocking for at most the indicated timeout.

    This method may also perform I/O other than sending and receiving
    L{messages<Message>}, for example closing connections after
    messenger.L{stop}() has been called.

    @type timeout: number
    @param timeout: the timeout in seconds, or None to pass -1 to the
      native call
    @return: False if the native call reported a timeout, True otherwise
    """
    millis = -1 if timeout is None else secs2millis(timeout)
    err = pn_messenger_work(self._mng, millis)
    # A timeout is an expected outcome here and is reported through the
    # return value; every other code goes through the usual check.
    if err == PN_TIMEOUT:
        return False
    self._check(err)
    return True
620
621 @property
623 return pn_messenger_receiving(self._mng)
624
626 """
627 The L{Messenger} interface is single-threaded.
628 This is the only L{Messenger} function intended to be called
629 from outside of the L{Messenger} thread.
630 Call this from a non-messenger thread to interrupt
631 a L{Messenger} that is blocking.
632 This will cause any in-progress blocking call to throw
633 the L{Interrupt} exception. If there is no currently blocking
634 call, then the next blocking call will be affected, even if it
635 is within the same thread that interrupt was called from.
636 """
637 self._check(pn_messenger_interrupt(self._mng))
638
def get(self, message=None):
    """
    Move the message at the head of the incoming queue into the supplied
    L{Message} object, overwriting any content it already holds.

    If no L{Message} object is supplied, the message popped from the
    head of the queue is discarded.

    @type message: Message
    @param message: the destination message object, or None
    @return: a tracker for the incoming L{Message}, which can later be
      used to communicate its acceptance or rejection
    """
    discard = message is None
    target = None if discard else message._msg
    self._check(pn_messenger_get(self._mng, target))
    if not discard:
        # Refresh the Python-side fields from the freshly filled message.
        message._post_decode()
    return pn_messenger_incoming_tracker(self._mng)
664
def accept(self, tracker=None):
    """
    Signal the sender that you have acted on the L{Message} pointed to
    by the tracker.

    @type tracker: tracker
    @param tracker: a tracker as returned by L{get}; when omitted, all
      messages that have been returned by L{get} are accepted, except
      those already auto-settled by passing beyond your incoming window
      size
    """
    if tracker is not None:
        flags = 0
    else:
        # No tracker: act cumulatively up to the latest incoming one.
        tracker = pn_messenger_incoming_tracker(self._mng)
        flags = PN_CUMULATIVE
    self._check(pn_messenger_accept(self._mng, tracker, flags))
682
def reject(self, tracker=None):
    """
    Reject the L{Message} indicated by the tracker.

    @type tracker: tracker
    @param tracker: a tracker as returned by L{get}; when omitted, all
      messages that have been returned by L{get} are rejected, except
      those already auto-settled by passing beyond your incoming window
      size
    """
    if tracker is not None:
        flags = 0
    else:
        # No tracker: act cumulatively up to the latest incoming one.
        tracker = pn_messenger_incoming_tracker(self._mng)
        flags = PN_CUMULATIVE
    self._check(pn_messenger_reject(self._mng, tracker, flags))
699
700 @property
702 """
703 The outgoing queue depth.
704 """
705 return pn_messenger_outgoing(self._mng)
706
707 @property
709 """
710 The incoming queue depth.
711 """
712 return pn_messenger_incoming(self._mng)
713
def route(self, pattern, address):
    """
    Append a routing rule to the internal routing table of the
    L{Messenger}.

    Routing rules influence how a L{Messenger} treats a given address or
    class of addresses. Whenever a L{Message} is presented for delivery,
    its address is matched against the rules in the order they were
    added. The first matching rule is triggered, and the L{Messenger}
    routes based on the address supplied in that rule instead of the
    address presented in the message.

    The pattern syntax supports two wildcards: '%' matches any character
    except a '/', and '*' matches any character including a '/'.

    The routing address is a normal AMQP address that may additionally
    use substitution variables from the pattern match that triggered the
    rule.

    Any message sent to "foo" will be routed to "amqp://foo.com":

      >>> messenger.route("foo", "amqp://foo.com")

    Any message sent to "foobar" will be routed to
    "amqp://foo.com/bar":

      >>> messenger.route("foobar", "amqp://foo.com/bar")

    Any message sent to bar/<path> will be routed to the corresponding
    path within the amqp://bar.com domain:

      >>> messenger.route("bar/*", "amqp://bar.com/$1")

    Route all L{messages<Message>} over TLS:

      >>> messenger.route("amqp:*", "amqps:$1")

    Supply credentials for foo.com:

      >>> messenger.route("amqp://foo.com/*", "amqp://user:password@foo.com/$1")

    Supply credentials for all domains:

      >>> messenger.route("amqp://*", "amqp://user:password@$1")

    Route all addresses through a single proxy while preserving the
    original destination:

      >>> messenger.route("amqp://%/*", "amqp://user:password@proxy/$1/$2")

    Route any address through a single broker:

      >>> messenger.route("*", "amqp://user:password@broker/$1")

    @type pattern: string
    @param pattern: the pattern that message addresses are matched against
    @type address: string
    @param address: the address to route matching messages to
    """
    encoded_pattern = unicode2utf8(pattern)
    encoded_address = unicode2utf8(address)
    self._check(pn_messenger_route(self._mng, encoded_pattern, encoded_address))
773
def rewrite(self, pattern, address):
    """
    Add a rewrite rule. This is similar to L{route}, except that the
    destination of the L{Message} is determined before the message
    address is rewritten.

    The outgoing address is only rewritten after routing has been
    finalized. For example, a message with an outgoing address of
    "amqp://0.0.0.0:5678" and a rewrite rule that changes that address
    to "foo" still arrives at the peer listening on
    "amqp://0.0.0.0:5678", but the receiver sees its outgoing address
    as "foo".

    The default rewrite rule removes the username and password from
    addresses before they are transmitted.

    @type pattern: string
    @param pattern: the pattern that message addresses are matched against
    @type address: string
    @param address: the replacement address
    """
    encoded_pattern = unicode2utf8(pattern)
    encoded_address = unicode2utf8(address)
    self._check(pn_messenger_rewrite(self._mng, encoded_pattern, encoded_address))
790
792 return Selectable.wrap(pn_messenger_selectable(self._mng))
793
794 @property
796 tstamp = pn_messenger_deadline(self._mng)
797 if tstamp:
798 return millis2secs(tstamp)
799 else:
800 return None
801
803 """The L{Message} class is a mutable holder of message content.
804
805 @ivar instructions: delivery instructions for the message
806 @type instructions: dict
807 @ivar annotations: infrastructure defined message annotations
808 @type annotations: dict
809 @ivar properties: application defined message properties
810 @type properties: dict
811 @ivar body: message body
812 @type body: bytes | unicode | dict | list | int | long | float | UUID
813 """
814
815 DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY
816
def __init__(self, body=None, **kwargs):
    """
    Create a new L{Message}.

    @param body: the initial message body
    @param kwargs: Message property name/value pairs to initialise the
      Message; each name must already be an attribute of the Message
    """
    self._msg = pn_message()
    self._id = Data(pn_message_id(self._msg))
    self._correlation_id = Data(pn_message_correlation_id(self._msg))
    self.instructions = self.annotations = self.properties = None
    self.body = body
    for name, value in _compat.iteritems(kwargs):
        # Reading the attribute first makes an unknown name fail with
        # AttributeError instead of silently creating a new attribute.
        getattr(self, name)
        setattr(self, name, value)
831
833 if hasattr(self, "_msg"):
834 pn_message_free(self._msg)
835 del self._msg
836
838 if err < 0:
839 exc = EXCEPTIONS.get(err, MessageException)
840 raise exc("[%s]: %s" % (err, pn_error_text(pn_message_error(self._msg))))
841 else:
842 return err
843
850
870
def _post_decode(self):
    """
    Refresh the instructions, annotations, properties and body
    attributes from the corresponding sections of the underlying
    message. A section with no value to read resets its attribute to
    None.
    """
    sections = (
        ("instructions", Data(pn_message_instructions(self._msg))),
        ("annotations", Data(pn_message_annotations(self._msg))),
        ("properties", Data(pn_message_properties(self._msg))),
        ("body", Data(pn_message_body(self._msg))),
    )
    for attr, section in sections:
        decoded = section.get_object() if section.next() else None
        setattr(self, attr, decoded)
893
895 """
896 Clears the contents of the L{Message}. All fields will be reset to
897 their default values.
898 """
899 pn_message_clear(self._msg)
900 self.instructions = None
901 self.annotations = None
902 self.properties = None
903 self.body = None
904
906 return pn_message_is_inferred(self._msg)
907
909 self._check(pn_message_set_inferred(self._msg, bool(value)))
910
911 inferred = property(_is_inferred, _set_inferred, doc="""
912 The inferred flag for a message indicates how the message content
913 is encoded into AMQP sections. If inferred is true then binary and
914 list values in the body of the message will be encoded as AMQP DATA
915 and AMQP SEQUENCE sections, respectively. If inferred is false,
916 then all values in the body of the message will be encoded as AMQP
917 VALUE sections regardless of their type.
918 """)
919
921 return pn_message_is_durable(self._msg)
922
924 self._check(pn_message_set_durable(self._msg, bool(value)))
925
926 durable = property(_is_durable, _set_durable,
927 doc="""
928 The durable property indicates that the message should be held durably
929 by any intermediaries taking responsibility for the message.
930 """)
931
933 return pn_message_get_priority(self._msg)
934
936 self._check(pn_message_set_priority(self._msg, value))
937
938 priority = property(_get_priority, _set_priority,
939 doc="""
940 The priority of the message.
941 """)
942
944 return millis2secs(pn_message_get_ttl(self._msg))
945
947 self._check(pn_message_set_ttl(self._msg, secs2millis(value)))
948
949 ttl = property(_get_ttl, _set_ttl,
950 doc="""
951 The time to live of the message measured in seconds. Expired messages
952 may be dropped.
953 """)
954
956 return pn_message_is_first_acquirer(self._msg)
957
959 self._check(pn_message_set_first_acquirer(self._msg, bool(value)))
960
961 first_acquirer = property(_is_first_acquirer, _set_first_acquirer,
962 doc="""
963 True iff the recipient is the first to acquire the message.
964 """)
965
967 return pn_message_get_delivery_count(self._msg)
968
970 self._check(pn_message_set_delivery_count(self._msg, value))
971
972 delivery_count = property(_get_delivery_count, _set_delivery_count,
973 doc="""
974 The number of delivery attempts made for this message.
975 """)
976
977
985 id = property(_get_id, _set_id,
986 doc="""
987 The id of the message.
988 """)
989
991 return pn_message_get_user_id(self._msg)
992
994 self._check(pn_message_set_user_id(self._msg, value))
995
996 user_id = property(_get_user_id, _set_user_id,
997 doc="""
998 The user id of the message creator.
999 """)
1000
1002 return utf82unicode(pn_message_get_address(self._msg))
1003
1005 self._check(pn_message_set_address(self._msg, unicode2utf8(value)))
1006
1007 address = property(_get_address, _set_address,
1008 doc="""
1009 The address of the message.
1010 """)
1011
1013 return utf82unicode(pn_message_get_subject(self._msg))
1014
1016 self._check(pn_message_set_subject(self._msg, unicode2utf8(value)))
1017
1018 subject = property(_get_subject, _set_subject,
1019 doc="""
1020 The subject of the message.
1021 """)
1022
1024 return utf82unicode(pn_message_get_reply_to(self._msg))
1025
1027 self._check(pn_message_set_reply_to(self._msg, unicode2utf8(value)))
1028
1029 reply_to = property(_get_reply_to, _set_reply_to,
1030 doc="""
1031 The reply-to address for the message.
1032 """)
1033
1041
1042 correlation_id = property(_get_correlation_id, _set_correlation_id,
1043 doc="""
1044 The correlation-id for the message.
1045 """)
1046
1048 return symbol(utf82unicode(pn_message_get_content_type(self._msg)))
1049
def _set_content_type(self, value):
    """
    Store the content-type on the underlying message.

    @param value: the content-type; it is passed through unicode2utf8
      before being handed to the native setter
    """
    encoded = unicode2utf8(value)
    status = pn_message_set_content_type(self._msg, encoded)
    self._check(status)
1052
1053 content_type = property(_get_content_type, _set_content_type,
1054 doc="""
1055 The content-type of the message.
1056 """)
1057
1059 return symbol(utf82unicode(pn_message_get_content_encoding(self._msg)))
1060
def _set_content_encoding(self, value):
    """
    Store the content-encoding on the underlying message.

    @param value: the content-encoding; it is passed through
      unicode2utf8 before being handed to the native setter
    """
    encoded = unicode2utf8(value)
    status = pn_message_set_content_encoding(self._msg, encoded)
    self._check(status)
1063
1064 content_encoding = property(_get_content_encoding, _set_content_encoding,
1065 doc="""
1066 The content-encoding of the message.
1067 """)
1068
1070 return millis2secs(pn_message_get_expiry_time(self._msg))
1071
1073 self._check(pn_message_set_expiry_time(self._msg, secs2millis(value)))
1074
1075 expiry_time = property(_get_expiry_time, _set_expiry_time,
1076 doc="""
1077 The expiry time of the message.
1078 """)
1079
1081 return millis2secs(pn_message_get_creation_time(self._msg))
1082
1084 self._check(pn_message_set_creation_time(self._msg, secs2millis(value)))
1085
1086 creation_time = property(_get_creation_time, _set_creation_time,
1087 doc="""
1088 The creation time of the message.
1089 """)
1090
1092 return utf82unicode(pn_message_get_group_id(self._msg))
1093
1095 self._check(pn_message_set_group_id(self._msg, unicode2utf8(value)))
1096
1097 group_id = property(_get_group_id, _set_group_id,
1098 doc="""
1099 The group id of the message.
1100 """)
1101
1103 return pn_message_get_group_sequence(self._msg)
1104
1106 self._check(pn_message_set_group_sequence(self._msg, value))
1107
1108 group_sequence = property(_get_group_sequence, _set_group_sequence,
1109 doc="""
1110 The sequence of the message within its group.
1111 """)
1112
1114 return utf82unicode(pn_message_get_reply_to_group_id(self._msg))
1115
1117 self._check(pn_message_set_reply_to_group_id(self._msg, unicode2utf8(value)))
1118
1119 reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id,
1120 doc="""
1121 The group-id for any replies.
1122 """)
1123
1125 self._pre_encode()
1126 sz = 16
1127 while True:
1128 err, data = pn_message_encode(self._msg, sz)
1129 if err == PN_OVERFLOW:
1130 sz *= 2
1131 continue
1132 else:
1133 self._check(err)
1134 return data
1135
1137 self._check(pn_message_decode(self._msg, data))
1138 self._post_decode()
1139
1140 - def send(self, sender, tag=None):
1148
def recv(self, link):
    """
    Receive and decode the message content for the current delivery
    from the link.

    @type link: Link
    @param link: the link to receive a message from
    @return: the delivery associated with the decoded message, or None
      if the link is a sender, has no current delivery, or its current
      delivery is still partial
    """
    if link.is_sender:
        return None
    delivery = link.current
    if not delivery or delivery.partial:
        return None
    # Keep the raw bytes on the delivery, then move the link on to its
    # next delivery.
    delivery.encoded = link.recv(delivery.pending)
    link.advance()
    # When the remote sender uses the settled mode, settle the delivery
    # locally as well before decoding.
    if link.remote_snd_settle_mode == Link.SND_SETTLED:
        delivery.settle()
    self.decode(delivery.encoded)
    return delivery
1173
1175 props = []
1176 for attr in ("inferred", "address", "reply_to", "durable", "ttl",
1177 "priority", "first_acquirer", "delivery_count", "id",
1178 "correlation_id", "user_id", "group_id", "group_sequence",
1179 "reply_to_group_id", "instructions", "annotations",
1180 "properties", "body"):
1181 value = getattr(self, attr)
1182 if value: props.append("%s=%r" % (attr, value))
1183 return "Message(%s)" % ", ".join(props)
1184
1186 tmp = pn_string(None)
1187 err = pn_inspect(self._msg, tmp)
1188 result = pn_string_get(tmp)
1189 pn_free(tmp)
1190 self._check(err)
1191 return result
1192
1194
1197
1198 @property
1200 return pn_subscription_address(self._impl)
1201
1202 _DEFAULT = object()
1205
1206 @staticmethod
1208 if impl is None:
1209 return None
1210 else:
1211 return Selectable(impl)
1212
1215
1218
1220 if fd is _DEFAULT:
1221 return pn_selectable_get_fd(self._impl)
1222 elif fd is None:
1223 pn_selectable_set_fd(self._impl, PN_INVALID_SOCKET)
1224 else:
1225 pn_selectable_set_fd(self._impl, fd)
1226
1228 return pn_selectable_is_reading(self._impl)
1229
1231 pn_selectable_set_reading(self._impl, bool(val))
1232
1233 reading = property(_is_reading, _set_reading)
1234
1236 return pn_selectable_is_writing(self._impl)
1237
1239 pn_selectable_set_writing(self._impl, bool(val))
1240
1241 writing = property(_is_writing, _set_writing)
1242
1244 tstamp = pn_selectable_get_deadline(self._impl)
1245 if tstamp:
1246 return millis2secs(tstamp)
1247 else:
1248 return None
1249
1251 pn_selectable_set_deadline(self._impl, secs2millis(deadline))
1252
1253 deadline = property(_get_deadline, _set_deadline)
1254
1256 pn_selectable_readable(self._impl)
1257
1259 pn_selectable_writable(self._impl)
1260
1262 pn_selectable_expired(self._impl)
1263
1265 return pn_selectable_is_registered(self._impl)
1266
1268 pn_selectable_set_registered(self._impl, registered)
1269
1270 registered = property(_is_registered, _set_registered,
1271 doc="""
1272 The registered property may be get/set by an I/O polling system to
1273 indicate whether the fd has been registered or not.
1274 """)
1275
1276 @property
1278 return pn_selectable_is_terminal(self._impl)
1279
1281 pn_selectable_terminate(self._impl)
1282
1284 pn_selectable_release(self._impl)
1285
1287 """
1288 The DataException class is the root of the Data exception hierarchy.
1289 All exceptions raised by the Data class extend this exception.
1290 """
1291 pass
1292
1294
1297
1299 return "UnmappedType(%s)" % self.msg
1300
1302
1304 return "ulong(%s)" % long.__repr__(self)
1305
1307
1309 return "timestamp(%s)" % long.__repr__(self)
1310
1312
1314 return "symbol(%s)" % unicode.__repr__(self)
1315
1316 -class char(unicode):
1317
1319 return "char(%s)" % unicode.__repr__(self)
1320
1325
1330
1335
1340
1345
1347
1349 return "uint(%s)" % long.__repr__(self)
1350
1352
1354 return "float32(%s)" % float.__repr__(self)
1355
1360
1362
1364 return "decimal64(%s)" % long.__repr__(self)
1365
1367
1369 return "decimal128(%s)" % bytes.__repr__(self)
1370
1372
1373 - def __init__(self, descriptor, value):
1374 self.descriptor = descriptor
1375 self.value = value
1376
1378 return "Described(%r, %r)" % (self.descriptor, self.value)
1379
1381 if isinstance(o, Described):
1382 return self.descriptor == o.descriptor and self.value == o.value
1383 else:
1384 return False
1385
1386 UNDESCRIBED = Constant("UNDESCRIBED")
1387
class Array(object):
    """
    A python-side representation of an array value: a descriptor, an
    element type code and the elements themselves.

    Two arrays compare equal when their descriptor, type and elements
    are all equal; an Array never compares equal to a non-Array.
    """

    def __init__(self, descriptor, type, *elements):
        """
        @param descriptor: the array descriptor
        @param type: the type code shared by the elements
        @param elements: the array elements, kept as a tuple
        """
        self.descriptor = descriptor
        self.type = type
        self.elements = elements

    def __iter__(self):
        """Iterates over the elements (the descriptor is not included)."""
        return iter(self.elements)

    def __repr__(self):
        # Elements are appended after the type only when there are any,
        # so an empty array renders as "Array(descriptor, type)".
        if self.elements:
            els = ", %s" % (", ".join(map(repr, self.elements)))
        else:
            els = ""
        return "Array(%r, %r%s)" % (self.descriptor, self.type, els)

    def __eq__(self, o):
        if isinstance(o, Array):
            return self.descriptor == o.descriptor and \
                self.type == o.type and self.elements == o.elements
        else:
            return False

    def __ne__(self, o):
        # Python 2 does not derive != from ==; without this, != would
        # fall back to identity and disagree with __eq__.
        return not self.__eq__(o)
1411
1413 """
1414 The L{Data} class provides an interface for decoding, extracting,
1415 creating, and encoding arbitrary AMQP data. A L{Data} object
1416 contains a tree of AMQP values. Leaf nodes in this tree correspond
1417 to scalars in the AMQP type system such as L{ints<INT>} or
1418 L{strings<STRING>}. Non-leaf nodes in this tree correspond to
1419 compound values in the AMQP type system such as L{lists<LIST>},
1420 L{maps<MAP>}, L{arrays<ARRAY>}, or L{described values<DESCRIBED>}.
1421 The root node of the tree is the L{Data} object itself and can have
1422 an arbitrary number of children.
1423
1424 A L{Data} object maintains the notion of the current sibling node
1425 and a current parent node. Siblings are ordered within their parent.
1426 Values are accessed and/or added by using the L{next}, L{prev},
1427 L{enter}, and L{exit} methods to navigate to the desired location in
1428 the tree and using the supplied variety of put_*/get_* methods to
1429 access or add a value of the desired type.
1430
1431 The put_* methods will always add a value I{after} the current node
1432 in the tree. If the current node has a next sibling the put_* method
1433 will overwrite the value on this node. If there is no current node
1434 or the current node has no next sibling then one will be added. The
1435 put_* methods always set the added/modified node to the current
1436 node. The get_* methods read the value of the current node and do
1437 not change which node is current.
1438
1439 The following types of scalar values are supported:
1440
1441 - L{NULL}
1442 - L{BOOL}
1443 - L{UBYTE}
1444 - L{USHORT}
1445 - L{SHORT}
1446 - L{UINT}
1447 - L{INT}
1448 - L{ULONG}
1449 - L{LONG}
1450 - L{FLOAT}
1451 - L{DOUBLE}
1452 - L{BINARY}
1453 - L{STRING}
1454 - L{SYMBOL}
1455
1456 The following types of compound values are supported:
1457
1458 - L{DESCRIBED}
1459 - L{ARRAY}
1460 - L{LIST}
1461 - L{MAP}
1462 """
1463
1464 NULL = PN_NULL; "A null value."
1465 BOOL = PN_BOOL; "A boolean value."
1466 UBYTE = PN_UBYTE; "An unsigned byte value."
1467 BYTE = PN_BYTE; "A signed byte value."
1468 USHORT = PN_USHORT; "An unsigned short value."
1469 SHORT = PN_SHORT; "A short value."
1470 UINT = PN_UINT; "An unsigned int value."
1471 INT = PN_INT; "A signed int value."
1472 CHAR = PN_CHAR; "A character value."
1473 ULONG = PN_ULONG; "An unsigned long value."
1474 LONG = PN_LONG; "A signed long value."
1475 TIMESTAMP = PN_TIMESTAMP; "A timestamp value."
1476 FLOAT = PN_FLOAT; "A float value."
1477 DOUBLE = PN_DOUBLE; "A double value."
1478 DECIMAL32 = PN_DECIMAL32; "A DECIMAL32 value."
1479 DECIMAL64 = PN_DECIMAL64; "A DECIMAL64 value."
1480 DECIMAL128 = PN_DECIMAL128; "A DECIMAL128 value."
1481 UUID = PN_UUID; "A UUID value."
1482 BINARY = PN_BINARY; "A binary string."
1483 STRING = PN_STRING; "A unicode string."
1484 SYMBOL = PN_SYMBOL; "A symbolic string."
1485 DESCRIBED = PN_DESCRIBED; "A described value."
1486 ARRAY = PN_ARRAY; "An array value."
1487 LIST = PN_LIST; "A list value."
1488 MAP = PN_MAP; "A map value."
1489
1490 type_names = {
1491 NULL: "null",
1492 BOOL: "bool",
1493 BYTE: "byte",
1494 UBYTE: "ubyte",
1495 SHORT: "short",
1496 USHORT: "ushort",
1497 INT: "int",
1498 UINT: "uint",
1499 CHAR: "char",
1500 LONG: "long",
1501 ULONG: "ulong",
1502 TIMESTAMP: "timestamp",
1503 FLOAT: "float",
1504 DOUBLE: "double",
1505 DECIMAL32: "decimal32",
1506 DECIMAL64: "decimal64",
1507 DECIMAL128: "decimal128",
1508 UUID: "uuid",
1509 BINARY: "binary",
1510 STRING: "string",
1511 SYMBOL: "symbol",
1512 DESCRIBED: "described",
1513 ARRAY: "array",
1514 LIST: "list",
1515 MAP: "map"
1516 }
1517
1518 @classmethod
1520
1528
1530 if self._free and hasattr(self, "_data"):
1531 pn_data_free(self._data)
1532 del self._data
1533
1535 if err < 0:
1536 exc = EXCEPTIONS.get(err, DataException)
1537 raise exc("[%s]: %s" % (err, pn_error_text(pn_data_error(self._data))))
1538 else:
1539 return err
1540
1542 """
1543 Clears the data object.
1544 """
1545 pn_data_clear(self._data)
1546
1548 """
1549 Clears current node and sets the parent to the root node. Clearing the
1550 current node sets it _before_ the first node, calling next() will advance to
1551 the first node.
1552 """
1553 assert self._data is not None
1554 pn_data_rewind(self._data)
1555
1557 """
1558 Advances the current node to its next sibling and returns its
1559 type. If there is no next sibling the current node remains
1560 unchanged and None is returned.
1561 """
1562 found = pn_data_next(self._data)
1563 if found:
1564 return self.type()
1565 else:
1566 return None
1567
1569 """
1570 Advances the current node to its previous sibling and returns its
1571 type. If there is no previous sibling the current node remains
1572 unchanged and None is returned.
1573 """
1574 found = pn_data_prev(self._data)
1575 if found:
1576 return self.type()
1577 else:
1578 return None
1579
1581 """
1582 Sets the parent node to the current node and clears the current node.
1583 Clearing the current node sets it _before_ the first child,
1584 calling next() advances to the first child.
1585 """
1586 return pn_data_enter(self._data)
1587
1589 """
1590 Sets the current node to the parent node and the parent node to
1591 its own parent.
1592 """
1593 return pn_data_exit(self._data)
1594
1596 return pn_data_lookup(self._data, name)
1597
1599 pn_data_narrow(self._data)
1600
1602 pn_data_widen(self._data)
1603
1605 """
1606 Returns the type of the current node.
1607 """
1608 dtype = pn_data_type(self._data)
1609 if dtype == -1:
1610 return None
1611 else:
1612 return dtype
1613
1615 """
1616 Returns the size in bytes needed to encode the data in AMQP format.
1617 """
1618 return pn_data_encoded_size(self._data)
1619
1621 """
1622 Returns a representation of the data encoded in AMQP format.
1623 """
1624 size = 1024
1625 while True:
1626 cd, enc = pn_data_encode(self._data, size)
1627 if cd == PN_OVERFLOW:
1628 size *= 2
1629 elif cd >= 0:
1630 return enc
1631 else:
1632 self._check(cd)
1633
1635 """
1636 Decodes the first value from supplied AMQP data and returns the
1637 number of bytes consumed.
1638
1639 @type encoded: binary
1640 @param encoded: AMQP encoded binary data
1641 """
1642 return self._check(pn_data_decode(self._data, encoded))
1643
1645 """
1646 Puts a list value. Elements may be filled by entering the list
1647 node and putting element values.
1648
1649 >>> data = Data()
1650 >>> data.put_list()
1651 >>> data.enter()
1652 >>> data.put_int(1)
1653 >>> data.put_int(2)
1654 >>> data.put_int(3)
1655 >>> data.exit()
1656 """
1657 self._check(pn_data_put_list(self._data))
1658
1660 """
1661 Puts a map value. Elements may be filled by entering the map node
1662 and putting alternating key value pairs.
1663
1664 >>> data = Data()
1665 >>> data.put_map()
1666 >>> data.enter()
1667 >>> data.put_string("key")
1668 >>> data.put_string("value")
1669 >>> data.exit()
1670 """
1671 self._check(pn_data_put_map(self._data))
1672
def put_array(self, described, element_type):
    """
    Puts an array value. The elements are supplied afterwards by
    entering the array node and putting each element value; every
    element must be of the given element type. When the array is
    described, its first child value is the descriptor, which may be
    of any type.

    >>> data = Data()
    >>>
    >>> data.put_array(False, Data.INT)
    >>> data.enter()
    >>> data.put_int(1)
    >>> data.put_int(2)
    >>> data.put_int(3)
    >>> data.exit()
    >>>
    >>> data.put_array(True, Data.DOUBLE)
    >>> data.enter()
    >>> data.put_symbol("array-descriptor")
    >>> data.put_double(1.1)
    >>> data.put_double(1.2)
    >>> data.put_double(1.3)
    >>> data.exit()

    @type described: bool
    @param described: specifies whether the array is described
    @type element_type: int
    @param element_type: the type of the array elements
    """
    # _check raises when the status code is negative.
    status = pn_data_put_array(self._data, described, element_type)
    self._check(status)
1704
1706 """
1707 Puts a described value. A described node has two children, the
1708 descriptor and the value. These are specified by entering the node
1709 and putting the desired values.
1710
1711 >>> data = Data()
1712 >>> data.put_described()
1713 >>> data.enter()
1714 >>> data.put_symbol("value-descriptor")
1715 >>> data.put_string("the value")
1716 >>> data.exit()
1717 """
1718 self._check(pn_data_put_described(self._data))
1719
1721 """
1722 Puts a null value.
1723 """
1724 self._check(pn_data_put_null(self._data))
1725
1727 """
1728 Puts a boolean value.
1729
1730 @param b: a boolean value
1731 """
1732 self._check(pn_data_put_bool(self._data, b))
1733
1735 """
1736 Puts an unsigned byte value.
1737
1738 @param ub: an integral value
1739 """
1740 self._check(pn_data_put_ubyte(self._data, ub))
1741
1743 """
1744 Puts a signed byte value.
1745
1746 @param b: an integral value
1747 """
1748 self._check(pn_data_put_byte(self._data, b))
1749
1751 """
1752 Puts an unsigned short value.
1753
1754 @param us: an integral value.
1755 """
1756 self._check(pn_data_put_ushort(self._data, us))
1757
1759 """
1760 Puts a signed short value.
1761
1762 @param s: an integral value
1763 """
1764 self._check(pn_data_put_short(self._data, s))
1765
1767 """
1768 Puts an unsigned int value.
1769
1770 @param ui: an integral value
1771 """
1772 self._check(pn_data_put_uint(self._data, ui))
1773
1775 """
1776 Puts a signed int value.
1777
1778 @param i: an integral value
1779 """
1780 self._check(pn_data_put_int(self._data, i))
1781
1783 """
1784 Puts a char value.
1785
1786 @param c: a single character
1787 """
1788 self._check(pn_data_put_char(self._data, ord(c)))
1789
1791 """
1792 Puts an unsigned long value.
1793
1794 @param ul: an integral value
1795 """
1796 self._check(pn_data_put_ulong(self._data, ul))
1797
1799 """
1800 Puts a signed long value.
1801
1802 @param l: an integral value
1803 """
1804 self._check(pn_data_put_long(self._data, l))
1805
1807 """
1808 Puts a timestamp value.
1809
1810 @param t: an integral value
1811 """
1812 self._check(pn_data_put_timestamp(self._data, t))
1813
1815 """
1816 Puts a float value.
1817
1818 @param f: a floating point value
1819 """
1820 self._check(pn_data_put_float(self._data, f))
1821
1823 """
1824 Puts a double value.
1825
1826 @param d: a floating point value.
1827 """
1828 self._check(pn_data_put_double(self._data, d))
1829
1831 """
1832 Puts a decimal32 value.
1833
1834 @param d: a decimal32 value
1835 """
1836 self._check(pn_data_put_decimal32(self._data, d))
1837
1839 """
1840 Puts a decimal64 value.
1841
1842 @param d: a decimal64 value
1843 """
1844 self._check(pn_data_put_decimal64(self._data, d))
1845
1847 """
1848 Puts a decimal128 value.
1849
1850 @param d: a decimal128 value
1851 """
1852 self._check(pn_data_put_decimal128(self._data, d))
1853
1855 """
1856 Puts a UUID value.
1857
1858 @param u: a uuid value
1859 """
1860 self._check(pn_data_put_uuid(self._data, u.bytes))
1861
1863 """
1864 Puts a binary value.
1865
1866 @type b: binary
1867 @param b: a binary value
1868 """
1869 self._check(pn_data_put_binary(self._data, b))
1870
1872 """Put a python memoryview object as an AMQP binary value"""
1873 self.put_binary(mv.tobytes())
1874
1876 """Put a python buffer object as an AMQP binary value"""
1877 self.put_binary(bytes(buff))
1878
1880 """
1881 Puts a unicode value.
1882
1883 @type s: unicode
1884 @param s: a unicode value
1885 """
1886 self._check(pn_data_put_string(self._data, s.encode("utf8")))
1887
1889 """
1890 Puts a symbolic value.
1891
1892 @type s: string
1893 @param s: the symbol name
1894 """
1895 self._check(pn_data_put_symbol(self._data, s.encode('ascii')))
1896
1898 """
1899 If the current node is a list, return the number of elements,
1900 otherwise return zero. List elements can be accessed by entering
1901 the list.
1902
1903 >>> count = data.get_list()
1904 >>> data.enter()
1905 >>> for i in range(count):
1906 ... type = data.next()
1907 ... if type == Data.STRING:
1908 ... print data.get_string()
1909 ... elif type == ...:
1910 ... ...
1911 >>> data.exit()
1912 """
1913 return pn_data_get_list(self._data)
1914
1916 """
1917 If the current node is a map, return the number of child elements,
1918 otherwise return zero. Key value pairs can be accessed by entering
1919 the map.
1920
1921 >>> count = data.get_map()
1922 >>> data.enter()
1923 >>> for i in range(count/2):
1924 ... type = data.next()
1925 ... if type == Data.STRING:
1926 ... print data.get_string()
1927 ... elif type == ...:
1928 ... ...
1929 >>> data.exit()
1930 """
1931 return pn_data_get_map(self._data)
1932
1934 """
1935 If the current node is an array, return a tuple of the element
1936 count, a boolean indicating whether the array is described, and
1937 the type of each element, otherwise return (0, False, None). Array
1938 data can be accessed by entering the array.
1939
1940 >>> # read an array of strings with a symbolic descriptor
1941 >>> count, described, type = data.get_array()
1942 >>> data.enter()
1943 >>> data.next()
1944 >>> print "Descriptor:", data.get_symbol()
1945 >>> for i in range(count):
1946 ... data.next()
1947 ... print "Element:", data.get_string()
1948 >>> data.exit()
1949 """
1950 count = pn_data_get_array(self._data)
1951 described = pn_data_is_array_described(self._data)
1952 type = pn_data_get_array_type(self._data)
1953 if type == -1:
1954 type = None
1955 return count, described, type
1956
1958 """
1959 Checks if the current node is a described value. The descriptor
1960 and value may be accessed by entering the described value.
1961
1962 >>> # read a symbolically described string
1963 >>> assert data.is_described() # will error if the current node is not described
1964 >>> data.enter()
1965 >>> data.next()
1966 >>> print data.get_symbol()
1967 >>> data.next()
1968 >>> print data.get_string()
1969 >>> data.exit()
1970 """
1971 return pn_data_is_described(self._data)
1972
1974 """
1975 Checks if the current node is a null.
1976 """
1977 return pn_data_is_null(self._data)
1978
1980 """
1981 If the current node is a boolean, returns its value, returns False
1982 otherwise.
1983 """
1984 return pn_data_get_bool(self._data)
1985
1987 """
1988 If the current node is an unsigned byte, returns its value,
1989 returns 0 otherwise.
1990 """
1991 return ubyte(pn_data_get_ubyte(self._data))
1992
1994 """
1995 If the current node is a signed byte, returns its value, returns 0
1996 otherwise.
1997 """
1998 return byte(pn_data_get_byte(self._data))
1999
2001 """
2002 If the current node is an unsigned short, returns its value,
2003 returns 0 otherwise.
2004 """
2005 return ushort(pn_data_get_ushort(self._data))
2006
2008 """
2009 If the current node is a signed short, returns its value, returns
2010 0 otherwise.
2011 """
2012 return short(pn_data_get_short(self._data))
2013
2015 """
2016 If the current node is an unsigned int, returns its value, returns
2017 0 otherwise.
2018 """
2019 return uint(pn_data_get_uint(self._data))
2020
2022 """
2023 If the current node is a signed int, returns its value, returns 0
2024 otherwise.
2025 """
2026 return int32(pn_data_get_int(self._data))
2027
2029 """
2030 If the current node is a char, returns its value, returns 0
2031 otherwise.
2032 """
2033 return char(_compat.unichar(pn_data_get_char(self._data)))
2034
2036 """
2037 If the current node is an unsigned long, returns its value,
2038 returns 0 otherwise.
2039 """
2040 return ulong(pn_data_get_ulong(self._data))
2041
2043 """
2044 If the current node is a signed long, returns its value, returns
2045 0 otherwise.
2046 """
2047 return long(pn_data_get_long(self._data))
2048
2050 """
2051 If the current node is a timestamp, returns its value, returns 0
2052 otherwise.
2053 """
2054 return timestamp(pn_data_get_timestamp(self._data))
2055
2057 """
2058 If the current node is a float, returns its value, returns 0
2059 otherwise.
2060 """
2061 return float32(pn_data_get_float(self._data))
2062
2064 """
2065 If the current node is a double, returns its value, returns 0
2066 otherwise.
2067 """
2068 return pn_data_get_double(self._data)
2069
2070
2072 """
2073 If the current node is a decimal32, returns its value, returns 0
2074 otherwise.
2075 """
2076 return decimal32(pn_data_get_decimal32(self._data))
2077
2078
2080 """
2081 If the current node is a decimal64, returns its value, returns 0
2082 otherwise.
2083 """
2084 return decimal64(pn_data_get_decimal64(self._data))
2085
2086
2088 """
2089 If the current node is a decimal128, returns its value, returns 0
2090 otherwise.
2091 """
2092 return decimal128(pn_data_get_decimal128(self._data))
2093
2095 """
2096 If the current node is a UUID, returns its value, returns None
2097 otherwise.
2098 """
2099 if pn_data_type(self._data) == Data.UUID:
2100 return uuid.UUID(bytes=pn_data_get_uuid(self._data))
2101 else:
2102 return None
2103
2105 """
2106 If the current node is binary, returns its value, returns ""
2107 otherwise.
2108 """
2109 return pn_data_get_binary(self._data)
2110
2112 """
2113 If the current node is a string, returns its value, returns ""
2114 otherwise.
2115 """
2116 return pn_data_get_string(self._data).decode("utf8")
2117
2119 """
2120 If the current node is a symbol, returns its value, returns ""
2121 otherwise.
2122 """
2123 return symbol(pn_data_get_symbol(self._data).decode('ascii'))
2124
def copy(self, src):
    """
    Copies from another data object into this one by handing both
    underlying handles to pn_data_copy.

    @param src: the object to copy from; its C{_data} handle is read
    """
    # _check raises when the status code is negative.
    status = pn_data_copy(self._data, src._data)
    self._check(status)
2127
2138
2140 pn_data_dump(self._data)
2141
2151
2153 if self.enter():
2154 try:
2155 result = {}
2156 while self.next():
2157 k = self.get_object()
2158 if self.next():
2159 v = self.get_object()
2160 else:
2161 v = None
2162 result[k] = v
2163 finally:
2164 self.exit()
2165 return result
2166
2175
2177 if self.enter():
2178 try:
2179 result = []
2180 while self.next():
2181 result.append(self.get_object())
2182 finally:
2183 self.exit()
2184 return result
2185
2196
2205
2207 """
2208 If the current node is an array, return an Array object
2209 representing the array and its contents. Otherwise return None.
2210 This is a convenience wrapper around get_array, enter, etc.
2211 """
2212
2213 count, described, type = self.get_array()
2214 if type is None: return None
2215 if self.enter():
2216 try:
2217 if described:
2218 self.next()
2219 descriptor = self.get_object()
2220 else:
2221 descriptor = UNDESCRIBED
2222 elements = []
2223 while self.next():
2224 elements.append(self.get_object())
2225 finally:
2226 self.exit()
2227 return Array(descriptor, type, *elements)
2228
2240
2241 put_mappings = {
2242 None.__class__: lambda s, _: s.put_null(),
2243 bool: put_bool,
2244 ubyte: put_ubyte,
2245 ushort: put_ushort,
2246 uint: put_uint,
2247 ulong: put_ulong,
2248 byte: put_byte,
2249 short: put_short,
2250 int32: put_int,
2251 long: put_long,
2252 float32: put_float,
2253 float: put_double,
2254 decimal32: put_decimal32,
2255 decimal64: put_decimal64,
2256 decimal128: put_decimal128,
2257 char: put_char,
2258 timestamp: put_timestamp,
2259 uuid.UUID: put_uuid,
2260 bytes: put_binary,
2261 unicode: put_string,
2262 symbol: put_symbol,
2263 list: put_sequence,
2264 tuple: put_sequence,
2265 dict: put_dict,
2266 Described: put_py_described,
2267 Array: put_py_array
2268 }
2269
2270
2271 if int not in put_mappings:
2272 put_mappings[int] = put_int
2273
2274 try: put_mappings[memoryview] = put_memoryview
2275 except NameError: pass
2276 try: put_mappings[buffer] = put_buffer
2277 except NameError: pass
2278 get_mappings = {
2279 NULL: lambda s: None,
2280 BOOL: get_bool,
2281 BYTE: get_byte,
2282 UBYTE: get_ubyte,
2283 SHORT: get_short,
2284 USHORT: get_ushort,
2285 INT: get_int,
2286 UINT: get_uint,
2287 CHAR: get_char,
2288 LONG: get_long,
2289 ULONG: get_ulong,
2290 TIMESTAMP: get_timestamp,
2291 FLOAT: get_float,
2292 DOUBLE: get_double,
2293 DECIMAL32: get_decimal32,
2294 DECIMAL64: get_decimal64,
2295 DECIMAL128: get_decimal128,
2296 UUID: get_uuid,
2297 BINARY: get_binary,
2298 STRING: get_string,
2299 SYMBOL: get_symbol,
2300 DESCRIBED: get_py_described,
2301 ARRAY: get_py_array,
2302 LIST: get_sequence,
2303 MAP: get_dict
2304 }
2305
2306
2308 putter = self.put_mappings[obj.__class__]
2309 putter(self, obj)
2310
2312 type = self.type()
2313 if type is None: return None
2314 getter = self.get_mappings.get(type)
2315 if getter:
2316 return getter(self)
2317 else:
2318 return UnmappedType(str(type))
2319
2322
2324
2325 LOCAL_UNINIT = PN_LOCAL_UNINIT
2326 REMOTE_UNINIT = PN_REMOTE_UNINIT
2327 LOCAL_ACTIVE = PN_LOCAL_ACTIVE
2328 REMOTE_ACTIVE = PN_REMOTE_ACTIVE
2329 LOCAL_CLOSED = PN_LOCAL_CLOSED
2330 REMOTE_CLOSED = PN_REMOTE_CLOSED
2331
2334
2336 obj2cond(self.condition, self._get_cond_impl())
2337
2338 @property
2340 return cond2obj(self._get_remote_cond_impl())
2341
2342
2344 assert False, "Subclass must override this!"
2345
2347 assert False, "Subclass must override this!"
2348
2358
2370
2371 handler = property(_get_handler, _set_handler)
2372
2373 @property
2376
2378
2379 - def __init__(self, name, description=None, info=None):
2380 self.name = name
2381 self.description = description
2382 self.info = info
2383
2385 return "Condition(%s)" % ", ".join([repr(x) for x in
2386 (self.name, self.description, self.info)
2387 if x])
2388
2390 if not isinstance(o, Condition): return False
2391 return self.name == o.name and \
2392 self.description == o.description and \
2393 self.info == o.info
2394
2396 pn_condition_clear(cond)
2397 if obj:
2398 pn_condition_set_name(cond, str(obj.name))
2399 pn_condition_set_description(cond, obj.description)
2400 info = Data(pn_condition_info(cond))
2401 if obj.info:
2402 info.put_object(obj.info)
2403
2405 if pn_condition_is_set(cond):
2406 return Condition(pn_condition_get_name(cond),
2407 pn_condition_get_description(cond),
2408 dat2obj(pn_condition_info(cond)))
2409 else:
2410 return None
2411
2420
2425
2427 return long(secs*1000)
2428
2430 return float(millis)/1000.0
2431
2433 if secs is None: return PN_MILLIS_MAX
2434 return secs2millis(secs)
2435
2437 if millis == PN_MILLIS_MAX: return None
2438 return millis2secs(millis)
2439
2441 """Some Proton APIs expect a null terminated string. Convert python text
2442 types to UTF8 to avoid zero bytes introduced by other multi-byte encodings.
2443 This method will throw if the string cannot be converted.
2444 """
2445 if string is None:
2446 return None
2447 if _compat.IS_PY2:
2448 if isinstance(string, unicode):
2449 return string.encode('utf-8')
2450 elif isinstance(string, str):
2451 return string
2452 else:
2453
2454 if isinstance(string, str):
2455 string = string.encode('utf-8')
2456
2457 if isinstance(string, bytes):
2458 return string.decode('utf-8')
2459 raise TypeError("Unrecognized string type: %r (%s)" % (string, type(string)))
2460
2462 """Convert C strings returned from proton-c into python unicode"""
2463 if string is None:
2464 return None
2465 if isinstance(string, _compat.TEXT_TYPES):
2466
2467 return string
2468 elif isinstance(string, _compat.BINARY_TYPES):
2469 return string.decode('utf8')
2470 else:
2471 raise TypeError("Unrecognized string type")
2472
2474 """
2475 A representation of an AMQP connection
2476 """
2477
2478 @staticmethod
2480 if impl is None:
2481 return None
2482 else:
2483 return Connection(impl)
2484
2485 - def __init__(self, impl = pn_connection):
2487
2489 Endpoint._init(self)
2490 self.offered_capabilities = None
2491 self.desired_capabilities = None
2492 self.properties = None
2493
2495 return pn_connection_attachments(self._impl)
2496
2497 @property
2500
2501 @property
2504
2506 if err < 0:
2507 exc = EXCEPTIONS.get(err, ConnectionException)
2508 raise exc("[%s]: %s" % (err, pn_connection_error(self._impl)))
2509 else:
2510 return err
2511
2513 return pn_connection_condition(self._impl)
2514
2516 return pn_connection_remote_condition(self._impl)
2517
2519 if collector is None:
2520 pn_connection_collect(self._impl, None)
2521 else:
2522 pn_connection_collect(self._impl, collector._impl)
2523 self._collector = weakref.ref(collector)
2524
2526 return utf82unicode(pn_connection_get_container(self._impl))
2528 return pn_connection_set_container(self._impl, unicode2utf8(name))
2529
2530 container = property(_get_container, _set_container)
2531
2533 return utf82unicode(pn_connection_get_hostname(self._impl))
2535 return pn_connection_set_hostname(self._impl, unicode2utf8(name))
2536
2537 hostname = property(_get_hostname, _set_hostname,
2538 doc="""
2539 Set the name of the host (either fully qualified or relative) to which this
2540 connection is connecting to. This information may be used by the remote
2541 peer to determine the correct back-end service to connect the client to.
2542 This value will be sent in the Open performative, and will be used by SSL
2543 and SASL layers to identify the peer.
2544 """)
2545
2547 return utf82unicode(pn_connection_get_user(self._impl))
2549 return pn_connection_set_user(self._impl, unicode2utf8(name))
2550
2551 user = property(_get_user, _set_user)
2552
2556 return pn_connection_set_password(self._impl, unicode2utf8(name))
2557
2558 password = property(_get_password, _set_password)
2559
2560 @property
2562 """The container identifier specified by the remote peer for this connection."""
2563 return pn_connection_remote_container(self._impl)
2564
2565 @property
2567 """The hostname specified by the remote peer for this connection."""
2568 return pn_connection_remote_hostname(self._impl)
2569
2570 @property
2572 """The capabilities offered by the remote peer for this connection."""
2573 return dat2obj(pn_connection_remote_offered_capabilities(self._impl))
2574
2575 @property
2577 """The capabilities desired by the remote peer for this connection."""
2578 return dat2obj(pn_connection_remote_desired_capabilities(self._impl))
2579
2580 @property
2582 """The properties specified by the remote peer for this connection."""
2583 return dat2obj(pn_connection_remote_properties(self._impl))
2584
2586 """
2587 Opens the connection.
2588
2589 In more detail, this moves the local state of the connection to
2590 the ACTIVE state and triggers an open frame to be sent to the
2591 peer. A connection is fully active once both peers have opened it.
2592 """
2593 obj2dat(self.offered_capabilities,
2594 pn_connection_offered_capabilities(self._impl))
2595 obj2dat(self.desired_capabilities,
2596 pn_connection_desired_capabilities(self._impl))
2597 obj2dat(self.properties, pn_connection_properties(self._impl))
2598 pn_connection_open(self._impl)
2599
2601 """
2602 Closes the connection.
2603
2604 In more detail, this moves the local state of the connection to
2605 the CLOSED state and triggers a close frame to be sent to the
2606 peer. A connection is fully closed once both peers have closed it.
2607 """
2608 self._update_cond()
2609 pn_connection_close(self._impl)
2610 if hasattr(self, '_session_policy'):
2611
2612 del self._session_policy
2613
2614 @property
2616 """
2617 The state of the connection as a bit field. The state has a local
2618 and a remote component. Each of these can be in one of three
2619 states: UNINIT, ACTIVE or CLOSED. These can be tested by masking
2620 against LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT,
2621 REMOTE_ACTIVE and REMOTE_CLOSED.
2622 """
2623 return pn_connection_state(self._impl)
2624
2626 """
2627 Returns a new session on this connection.
2628 """
2629 ssn = pn_session(self._impl)
2630 if ssn is None:
2631 raise(SessionException("Session allocation failed."))
2632 else:
2633 return Session(ssn)
2634
2636 return Session.wrap(pn_session_head(self._impl, mask))
2637
2639 return Link.wrap(pn_link_head(self._impl, mask))
2640
2641 @property
2644
2645 @property
2647 return pn_error_code(pn_connection_error(self._impl))
2648
2650 pn_connection_release(self._impl)
2651
2654
2656
2657 @staticmethod
2659 if impl is None:
2660 return None
2661 else:
2662 return Session(impl)
2663
2666
2668 return pn_session_attachments(self._impl)
2669
2671 return pn_session_condition(self._impl)
2672
2674 return pn_session_remote_condition(self._impl)
2675
2677 return pn_session_get_incoming_capacity(self._impl)
2678
2680 pn_session_set_incoming_capacity(self._impl, capacity)
2681
2682 incoming_capacity = property(_get_incoming_capacity, _set_incoming_capacity)
2683
2685 return pn_session_get_outgoing_window(self._impl)
2686
2688 pn_session_set_outgoing_window(self._impl, window)
2689
2690 outgoing_window = property(_get_outgoing_window, _set_outgoing_window)
2691
2692 @property
2694 return pn_session_outgoing_bytes(self._impl)
2695
2696 @property
2698 return pn_session_incoming_bytes(self._impl)
2699
2701 pn_session_open(self._impl)
2702
2704 self._update_cond()
2705 pn_session_close(self._impl)
2706
def next(self, mask):
    """
    Fetch the result of pn_session_next() for this session and wrap it.

    @param mask: value forwarded unchanged to pn_session_next
    @return: whatever L{Session.wrap} yields for the returned handle
    """
    following = pn_session_next(self._impl, mask)
    return Session.wrap(following)
2709
2710 @property
2712 return pn_session_state(self._impl)
2713
2714 @property
2717
2719 return Sender(pn_sender(self._impl, unicode2utf8(name)))
2720
2722 return Receiver(pn_receiver(self._impl, unicode2utf8(name)))
2723
2725 pn_session_free(self._impl)
2726
2729
2730 -class Link(Wrapper, Endpoint):
2731 """
2732 A representation of an AMQP link, of which there are two concrete
2733 implementations, Sender and Receiver.
2734 """
2735
2736 SND_UNSETTLED = PN_SND_UNSETTLED
2737 SND_SETTLED = PN_SND_SETTLED
2738 SND_MIXED = PN_SND_MIXED
2739
2740 RCV_FIRST = PN_RCV_FIRST
2741 RCV_SECOND = PN_RCV_SECOND
2742
2743 @staticmethod
2745 if impl is None: return None
2746 if pn_link_is_sender(impl):
2747 return Sender(impl)
2748 else:
2749 return Receiver(impl)
2750
2753
2755 return pn_link_attachments(self._impl)
2756
2758 if err < 0:
2759 exc = EXCEPTIONS.get(err, LinkException)
2760 raise exc("[%s]: %s" % (err, pn_error_text(pn_link_error(self._impl))))
2761 else:
2762 return err
2763
2765 return pn_link_condition(self._impl)
2766
2768 return pn_link_remote_condition(self._impl)
2769
2771 """
2772 Opens the link.
2773
2774 In more detail, this moves the local state of the link to the
2775 ACTIVE state and triggers an attach frame to be sent to the
2776 peer. A link is fully active once both peers have attached it.
2777 """
2778 pn_link_open(self._impl)
2779
2781 """
2782 Closes the link.
2783
2784 In more detail, this moves the local state of the link to the
2785 CLOSED state and triggers an detach frame (with the closed flag
2786 set) to be sent to the peer. A link is fully closed once both
2787 peers have detached it.
2788 """
2789 self._update_cond()
2790 pn_link_close(self._impl)
2791
2792 @property
2794 """
2795 The state of the link as a bit field. The state has a local
2796 and a remote component. Each of these can be in one of three
2797 states: UNINIT, ACTIVE or CLOSED. These can be tested by masking
2798 against LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT,
2799 REMOTE_ACTIVE and REMOTE_CLOSED.
2800 """
2801 return pn_link_state(self._impl)
2802
2803 @property
2805 """The source of the link as described by the local peer."""
2806 return Terminus(pn_link_source(self._impl))
2807
2808 @property
2810 """The target of the link as described by the local peer."""
2811 return Terminus(pn_link_target(self._impl))
2812
2813 @property
2815 """The source of the link as described by the remote peer."""
2816 return Terminus(pn_link_remote_source(self._impl))
2817 @property
2819 """The target of the link as described by the remote peer."""
2820 return Terminus(pn_link_remote_target(self._impl))
2821
2822 @property
2825
2826 @property
2828 """The connection on which this link was attached."""
2829 return self.session.connection
2830
2833
2834 @property
2837
2839 return pn_link_advance(self._impl)
2840
2841 @property
2843 return pn_link_unsettled(self._impl)
2844
2845 @property
2847 """The amount of outstanding credit on this link."""
2848 return pn_link_credit(self._impl)
2849
2850 @property
2852 return pn_link_available(self._impl)
2853
2854 @property
2856 return pn_link_queued(self._impl)
2857
def next(self, mask):
    """
    Fetch the result of pn_link_next() for this link and wrap it.

    @param mask: value forwarded unchanged to pn_link_next
    @return: whatever L{Link.wrap} yields for the returned handle
    """
    following = pn_link_next(self._impl, mask)
    return Link.wrap(following)
2860
2861 @property
2863 """Returns the name of the link"""
2864 return utf82unicode(pn_link_name(self._impl))
2865
2866 @property
2868 """Returns true if this link is a sender."""
2869 return pn_link_is_sender(self._impl)
2870
2871 @property
2873 """Returns true if this link is a receiver."""
2874 return pn_link_is_receiver(self._impl)
2875
2876 @property
2878 return pn_link_remote_snd_settle_mode(self._impl)
2879
2880 @property
2882 return pn_link_remote_rcv_settle_mode(self._impl)
2883
2885 return pn_link_snd_settle_mode(self._impl)
2887 pn_link_set_snd_settle_mode(self._impl, mode)
2888 snd_settle_mode = property(_get_snd_settle_mode, _set_snd_settle_mode)
2889
2891 return pn_link_rcv_settle_mode(self._impl)
2893 pn_link_set_rcv_settle_mode(self._impl, mode)
2894 rcv_settle_mode = property(_get_rcv_settle_mode, _set_rcv_settle_mode)
2895
2897 return pn_link_get_drain(self._impl)
2898
2900 pn_link_set_drain(self._impl, bool(b))
2901
2902 drain_mode = property(_get_drain, _set_drain)
2903
2905 return pn_link_drained(self._impl)
2906
2907 @property
2909 return pn_link_remote_max_message_size(self._impl)
2910
2912 return pn_link_max_message_size(self._impl)
2914 pn_link_set_max_message_size(self._impl, mode)
2915 max_message_size = property(_get_max_message_size, _set_max_message_size)
2916
2918 return pn_link_detach(self._impl)
2919
2921 pn_link_free(self._impl)
2922
2924
2925 UNSPECIFIED = PN_UNSPECIFIED
2926 SOURCE = PN_SOURCE
2927 TARGET = PN_TARGET
2928 COORDINATOR = PN_COORDINATOR
2929
2930 NONDURABLE = PN_NONDURABLE
2931 CONFIGURATION = PN_CONFIGURATION
2932 DELIVERIES = PN_DELIVERIES
2933
2934 DIST_MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED
2935 DIST_MODE_COPY = PN_DIST_MODE_COPY
2936 DIST_MODE_MOVE = PN_DIST_MODE_MOVE
2937
2938 EXPIRE_WITH_LINK = PN_EXPIRE_WITH_LINK
2939 EXPIRE_WITH_SESSION = PN_EXPIRE_WITH_SESSION
2940 EXPIRE_WITH_CONNECTION = PN_EXPIRE_WITH_CONNECTION
2941 EXPIRE_NEVER = PN_EXPIRE_NEVER
2942
2945
2947 if err < 0:
2948 exc = EXCEPTIONS.get(err, LinkException)
2949 raise exc("[%s]" % err)
2950 else:
2951 return err
2952
2954 return pn_terminus_get_type(self._impl)
2956 self._check(pn_terminus_set_type(self._impl, type))
2957 type = property(_get_type, _set_type)
2958
2960 """The address that identifies the source or target node"""
2961 return utf82unicode(pn_terminus_get_address(self._impl))
2963 self._check(pn_terminus_set_address(self._impl, unicode2utf8(address)))
2964 address = property(_get_address, _set_address)
2965
2967 return pn_terminus_get_durability(self._impl)
2969 self._check(pn_terminus_set_durability(self._impl, seconds))
2970 durability = property(_get_durability, _set_durability)
2971
2973 return pn_terminus_get_expiry_policy(self._impl)
2975 self._check(pn_terminus_set_expiry_policy(self._impl, seconds))
2976 expiry_policy = property(_get_expiry_policy, _set_expiry_policy)
2977
2979 return pn_terminus_get_timeout(self._impl)
2981 self._check(pn_terminus_set_timeout(self._impl, seconds))
2982 timeout = property(_get_timeout, _set_timeout)
2983
2985 """Indicates whether the source or target node was dynamically
2986 created"""
2987 return pn_terminus_is_dynamic(self._impl)
2989 self._check(pn_terminus_set_dynamic(self._impl, dynamic))
2990 dynamic = property(_is_dynamic, _set_dynamic)
2991
2993 return pn_terminus_get_distribution_mode(self._impl)
2995 self._check(pn_terminus_set_distribution_mode(self._impl, mode))
2996 distribution_mode = property(_get_distribution_mode, _set_distribution_mode)
2997
2998 @property
3000 """Properties of a dynamic source or target."""
3001 return Data(pn_terminus_properties(self._impl))
3002
3003 @property
3005 """Capabilities of the source or target."""
3006 return Data(pn_terminus_capabilities(self._impl))
3007
3008 @property
3010 return Data(pn_terminus_outcomes(self._impl))
3011
3012 @property
3014 """A filter on a source allows the set of messages transfered over
3015 the link to be restricted"""
3016 return Data(pn_terminus_filter(self._impl))
3017
def copy(self, src):
    """
    Copy the terminus C{src} into this one using pn_terminus_copy.

    @param src: object whose C{_impl} handle is used as the copy source
    """
    status = pn_terminus_copy(self._impl, src._impl)
    # the status code is handed to _check for validation
    self._check(status)
3020
3022 """
3023 A link over which messages are sent.
3024 """
3025
3027 pn_link_offered(self._impl, n)
3028
3030 """
3031 Send specified data as part of the current delivery
3032
3033 @type data: binary
3034 @param data: data to send
3035 """
3036 return self._check(pn_link_send(self._impl, data))
3037
def send(self, obj, tag=None):
    """
    Send the given object over this sender.

    An object that exposes a send() attribute is asked to send itself:
    its send() is called with this sender and the optional tag, and the
    result is returned. For a Message this sends the message over this
    link, creating a new delivery for the purpose.

    Any other object is handed to self.stream() and that result is
    returned.

    @param obj: the object to send
    @param tag: optional tag passed through to obj.send()
    """
    if not hasattr(obj, 'send'):
        # no send() on the object: pass it to stream() as-is
        return self.stream(obj)
    return obj.send(self, tag=tag)
3052
3054 if not hasattr(self, 'tag_generator'):
3055 def simple_tags():
3056 count = 1
3057 while True:
3058 yield str(count)
3059 count += 1
3060 self.tag_generator = simple_tags()
3061 return next(self.tag_generator)
3062
3064 """
3065 A link over which messages are received.
3066 """
3067
def flow(self, n):
    """
    Increase the credit issued to the remote sender.

    @param n: number of messages by which to increase the credit
    """
    credit = n
    pn_link_flow(self._impl, credit)
3071
def recv(self, limit):
    """
    Read from the link via pn_link_recv.

    @param limit: value forwarded to pn_link_recv
    @return: None when pn_link_recv reports PN_EOS; otherwise the binary
      data it returned, after the status code has passed self._check
    """
    status, payload = pn_link_recv(self._impl, limit)
    if status != PN_EOS:
        self._check(status)
        return payload
    return None
3079
3081 pn_link_drain(self._impl, n)
3082
3084 return pn_link_draining(self._impl)
3085
3087
3088 values = {}
3089
3091 ni = super(NamedInt, cls).__new__(cls, i)
3092 cls.values[i] = ni
3093 return ni
3094
3097
3100
3103
3104 @classmethod
3106 return cls.values.get(i, i)
3107
3110
3112
3113 RECEIVED = DispositionType(PN_RECEIVED, "RECEIVED")
3114 ACCEPTED = DispositionType(PN_ACCEPTED, "ACCEPTED")
3115 REJECTED = DispositionType(PN_REJECTED, "REJECTED")
3116 RELEASED = DispositionType(PN_RELEASED, "RELEASED")
3117 MODIFIED = DispositionType(PN_MODIFIED, "MODIFIED")
3118
3120 self._impl = impl
3121 self.local = local
3122 self._data = None
3123 self._condition = None
3124 self._annotations = None
3125
3126 @property
3128 return DispositionType.get(pn_disposition_type(self._impl))
3129
3131 return pn_disposition_get_section_number(self._impl)
3133 pn_disposition_set_section_number(self._impl, n)
3134 section_number = property(_get_section_number, _set_section_number)
3135
3137 return pn_disposition_get_section_offset(self._impl)
3139 pn_disposition_set_section_offset(self._impl, n)
3140 section_offset = property(_get_section_offset, _set_section_offset)
3141
3143 return pn_disposition_is_failed(self._impl)
3145 pn_disposition_set_failed(self._impl, b)
3146 failed = property(_get_failed, _set_failed)
3147
3149 return pn_disposition_is_undeliverable(self._impl)
3151 pn_disposition_set_undeliverable(self._impl, b)
3152 undeliverable = property(_get_undeliverable, _set_undeliverable)
3153
3155 if self.local:
3156 return self._data
3157 else:
3158 return dat2obj(pn_disposition_data(self._impl))
3160 if self.local:
3161 self._data = obj
3162 else:
3163 raise AttributeError("data attribute is read-only")
3164 data = property(_get_data, _set_data)
3165
3167 if self.local:
3168 return self._annotations
3169 else:
3170 return dat2obj(pn_disposition_annotations(self._impl))
3172 if self.local:
3173 self._annotations = obj
3174 else:
3175 raise AttributeError("annotations attribute is read-only")
3176 annotations = property(_get_annotations, _set_annotations)
3177
3179 if self.local:
3180 return self._condition
3181 else:
3182 return cond2obj(pn_disposition_condition(self._impl))
3184 if self.local:
3185 self._condition = obj
3186 else:
3187 raise AttributeError("condition attribute is read-only")
3188 condition = property(_get_condition, _set_condition)
3189
3191 """
3192 Tracks and/or records the delivery of a message over a link.
3193 """
3194
3195 RECEIVED = Disposition.RECEIVED
3196 ACCEPTED = Disposition.ACCEPTED
3197 REJECTED = Disposition.REJECTED
3198 RELEASED = Disposition.RELEASED
3199 MODIFIED = Disposition.MODIFIED
3200
3201 @staticmethod
3203 if impl is None:
3204 return None
3205 else:
3206 return Delivery(impl)
3207
3210
3212 self.local = Disposition(pn_delivery_local(self._impl), True)
3213 self.remote = Disposition(pn_delivery_remote(self._impl), False)
3214
3215 @property
3217 """The identifier for the delivery."""
3218 return pn_delivery_tag(self._impl)
3219
3220 @property
3222 """Returns true for an outgoing delivery to which data can now be written."""
3223 return pn_delivery_writable(self._impl)
3224
3225 @property
3227 """Returns true for an incoming delivery that has data to read."""
3228 return pn_delivery_readable(self._impl)
3229
3230 @property
3232 """Returns true if the state of the delivery has been updated
3233 (e.g. it has been settled and/or accepted, rejected etc)."""
3234 return pn_delivery_updated(self._impl)
3235
3237 """
3238 Set the local state of the delivery e.g. ACCEPTED, REJECTED, RELEASED.
3239 """
3240 obj2dat(self.local._data, pn_disposition_data(self.local._impl))
3241 obj2dat(self.local._annotations, pn_disposition_annotations(self.local._impl))
3242 obj2cond(self.local._condition, pn_disposition_condition(self.local._impl))
3243 pn_delivery_update(self._impl, state)
3244
3245 @property
3247 return pn_delivery_pending(self._impl)
3248
3249 @property
3251 """
3252 Returns true for an incoming delivery if not all the data is
3253 yet available.
3254 """
3255 return pn_delivery_partial(self._impl)
3256
3257 @property
3259 """Returns the local state of the delivery."""
3260 return DispositionType.get(pn_delivery_local_state(self._impl))
3261
3262 @property
3264 """
3265 Returns the state of the delivery as indicated by the remote
3266 peer.
3267 """
3268 return DispositionType.get(pn_delivery_remote_state(self._impl))
3269
3270 @property
3272 """
3273 Returns true if the delivery has been settled by the remote peer.
3274 """
3275 return pn_delivery_settled(self._impl)
3276
3278 """
3279 Settles the delivery locally. This indicates the application
3280 considers the delivery complete and does not wish to receive any
3281 further events about it. Every delivery should be settled locally.
3282 """
3283 pn_delivery_settle(self._impl)
3284
3285 @property
3287 """Returns true if the delivery has been aborted."""
3288 return pn_delivery_aborted(self._impl)
3289
3291 """
3292 Aborts the delivery. This indicates the application wishes to
3293 invalidate any data that may have already been sent on this delivery.
3294 The delivery cannot be aborted after it has been completely delivered.
3295 """
3296 pn_delivery_abort(self._impl)
3297
3298 @property
3301
3302 @property
3304 """
3305 Returns the link on which the delivery was sent or received.
3306 """
3307 return Link.wrap(pn_delivery_link(self._impl))
3308
3309 @property
3311 """
3312 Returns the session over which the delivery was sent or received.
3313 """
3314 return self.link.session
3315
3316 @property
3318 """
3319 Returns the connection over which the delivery was sent or received.
3320 """
3321 return self.session.connection
3322
3323 @property
3326
3329
3331
3334
3335 - def __call__(self, trans_impl, message):
3337
3339
3340 TRACE_OFF = PN_TRACE_OFF
3341 TRACE_DRV = PN_TRACE_DRV
3342 TRACE_FRM = PN_TRACE_FRM
3343 TRACE_RAW = PN_TRACE_RAW
3344
3345 CLIENT = 1
3346 SERVER = 2
3347
3348 @staticmethod
3350 if impl is None:
3351 return None
3352 else:
3353 return Transport(_impl=impl)
3354
3355 - def __init__(self, mode=None, _impl = pn_transport):
3363
3365 self._sasl = None
3366 self._ssl = None
3367
3369 if err < 0:
3370 exc = EXCEPTIONS.get(err, TransportException)
3371 raise exc("[%s]: %s" % (err, pn_error_text(pn_transport_error(self._impl))))
3372 else:
3373 return err
3374
3376 pn_transport_set_pytracer(self._impl, TraceAdapter(tracer));
3377
3379 adapter = pn_transport_get_pytracer(self._impl)
3380 if adapter:
3381 return adapter.tracer
3382 else:
3383 return None
3384
3385 tracer = property(_get_tracer, _set_tracer,
3386 doc="""
3387 A callback for trace logging. The callback is passed the transport and log message.
3388 """)
3389
def log(self, message):
    """
    Hand C{message} to pn_transport_log for this transport.

    @param message: the message to log
    """
    impl = self._impl
    pn_transport_log(impl, message)
3392
3394 pn_transport_require_auth(self._impl, bool)
3395
3396 @property
3398 return pn_transport_is_authenticated(self._impl)
3399
3401 pn_transport_require_encryption(self._impl, bool)
3402
3403 @property
3405 return pn_transport_is_encrypted(self._impl)
3406
3407 @property
3409 return pn_transport_get_user(self._impl)
3410
def bind(self, connection):
    """
    Assign a connection to the transport.

    @param connection: object whose C{_impl} handle is bound to this
      transport; the status code from pn_transport_bind goes to _check
    """
    status = pn_transport_bind(self._impl, connection._impl)
    self._check(status)
3414
3416 """Release the connection"""
3417 self._check(pn_transport_unbind(self._impl))
3418
3420 pn_transport_trace(self._impl, n)
3421
def tick(self, now):
    """
    Process any timed events (like heartbeat generation).

    @param now: seconds since epoch (float); converted with secs2millis
      before being passed to pn_transport_tick
    @return: the pn_transport_tick result converted with millis2secs
    """
    now_ms = secs2millis(now)
    result_ms = pn_transport_tick(self._impl, now_ms)
    return millis2secs(result_ms)
3427
3429 c = pn_transport_capacity(self._impl)
3430 if c >= PN_EOS:
3431 return c
3432 else:
3433 return self._check(c)
3434
def push(self, binary):
    """
    Push C{binary} into the transport with pn_transport_push.

    @param binary: the data to push
    @raise OverflowError: when the checked count returned by
      pn_transport_push differs from len(binary)
    """
    expected = len(binary)
    accepted = self._check(pn_transport_push(self._impl, binary))
    if accepted == expected:
        return
    raise OverflowError("unable to process all bytes: %s, %s" % (accepted, len(binary)))
3439
3441 self._check(pn_transport_close_tail(self._impl))
3442
3444 p = pn_transport_pending(self._impl)
3445 if p >= PN_EOS:
3446 return p
3447 else:
3448 return self._check(p)
3449
def peek(self, size):
    """
    Look at pending transport output via pn_transport_peek.

    @param size: value forwarded to pn_transport_peek
    @return: None when pn_transport_peek reports PN_EOS; otherwise the
      data it returned, after the status code has passed self._check
    """
    status, data = pn_transport_peek(self._impl, size)
    if status != PN_EOS:
        self._check(status)
        return data
    return None
3457
def pop(self, size):
    """
    Call pn_transport_pop on this transport.

    @param size: value forwarded unchanged to pn_transport_pop
    """
    impl = self._impl
    pn_transport_pop(impl, size)
3460
3462 self._check(pn_transport_close_head(self._impl))
3463
3464 @property
3466 return pn_transport_closed(self._impl)
3467
3468
3470 return pn_transport_get_max_frame(self._impl)
3471
3473 pn_transport_set_max_frame(self._impl, value)
3474
3475 max_frame_size = property(_get_max_frame_size, _set_max_frame_size,
3476 doc="""
3477 Sets the maximum size for received frames (in bytes).
3478 """)
3479
3480 @property
3482 return pn_transport_get_remote_max_frame(self._impl)
3483
3485 return pn_transport_get_channel_max(self._impl)
3486
3488 if pn_transport_set_channel_max(self._impl, value):
3489 raise SessionException("Too late to change channel max.")
3490
3491 channel_max = property(_get_channel_max, _set_channel_max,
3492 doc="""
3493 Sets the maximum channel that may be used on the transport.
3494 """)
3495
3496 @property
3498 return pn_transport_remote_channel_max(self._impl)
3499
3500
3502 return millis2secs(pn_transport_get_idle_timeout(self._impl))
3503
3505 pn_transport_set_idle_timeout(self._impl, secs2millis(sec))
3506
3507 idle_timeout = property(_get_idle_timeout, _set_idle_timeout,
3508 doc="""
3509 The idle timeout of the connection (float, in seconds).
3510 """)
3511
3512 @property
3514 return millis2secs(pn_transport_get_remote_idle_timeout(self._impl))
3515
3516 @property
3518 return pn_transport_get_frames_output(self._impl)
3519
3520 @property
3523
3526
def ssl(self, domain=None, session_details=None):
    """
    Return the SSL object cached on this transport, creating it on first use.

    @param domain: passed to the SSL constructor when no SSL object is
      cached yet
    @param session_details: passed to the SSL constructor when no SSL
      object is cached yet
    @return: the value held in self._ssl
    """
    if self._ssl:
        return self._ssl
    # nothing cached yet: build one and remember it
    self._ssl = SSL(self, domain, session_details)
    return self._ssl
3532
3533 @property
3535 return cond2obj(pn_transport_condition(self._impl))
3536
3537 @property
3540
3543
3544 -class SASL(Wrapper):
3545
3546 OK = PN_SASL_OK
3547 AUTH = PN_SASL_AUTH
3548 SYS = PN_SASL_SYS
3549 PERM = PN_SASL_PERM
3550 TEMP = PN_SASL_TEMP
3551
3552 @staticmethod
3554 return pn_sasl_extended()
3555
3559
3561 if err < 0:
3562 exc = EXCEPTIONS.get(err, SASLException)
3563 raise exc("[%s]" % (err))
3564 else:
3565 return err
3566
3567 @property
3569 return pn_sasl_get_user(self._sasl)
3570
3571 @property
3573 return pn_sasl_get_mech(self._sasl)
3574
3575 @property
3577 outcome = pn_sasl_outcome(self._sasl)
3578 if outcome == PN_SASL_NONE:
3579 return None
3580 else:
3581 return outcome
3582
3584 pn_sasl_allowed_mechs(self._sasl, unicode2utf8(mechs))
3585
3587 return pn_sasl_get_allow_insecure_mechs(self._sasl)
3588
3590 pn_sasl_set_allow_insecure_mechs(self._sasl, insecure)
3591
3592 allow_insecure_mechs = property(_get_allow_insecure_mechs, _set_allow_insecure_mechs,
3593 doc="""
3594 Allow unencrypted cleartext passwords (PLAIN mech)
3595 """)
3596
def done(self, outcome):
    """
    Call pn_sasl_done on this object's SASL handle.

    @param outcome: value forwarded unchanged to pn_sasl_done
    """
    sasl = self._sasl
    pn_sasl_done(sasl, outcome)
3599
3601 pn_sasl_config_name(self._sasl, name)
3602
3604 pn_sasl_config_path(self._sasl, path)
3605
3608
3611
3612 -class SSLDomain(object):
3613
3614 MODE_CLIENT = PN_SSL_MODE_CLIENT
3615 MODE_SERVER = PN_SSL_MODE_SERVER
3616 VERIFY_PEER = PN_SSL_VERIFY_PEER
3617 VERIFY_PEER_NAME = PN_SSL_VERIFY_PEER_NAME
3618 ANONYMOUS_PEER = PN_SSL_ANONYMOUS_PEER
3619
def __init__(self, mode):
    """
    Create the underlying SSL domain with pn_ssl_domain.

    @param mode: value forwarded to pn_ssl_domain
    @raise SSLUnavailable: when pn_ssl_domain returns None
    """
    handle = pn_ssl_domain(mode)
    self._domain = handle
    if handle is None:
        raise SSLUnavailable()
3624
def _check(self, err):
    """
    Validate a status code.

    @param err: status code to inspect
    @return: C{err} unchanged when it is not negative
    @raise SSLException: (or the class registered for C{err} in
      EXCEPTIONS) when C{err} is negative
    """
    if not (err < 0):
        return err
    error_class = EXCEPTIONS.get(err, SSLException)
    raise error_class("SSL failure.")
3631
def set_credentials(self, cert_file, key_file, password):
    """
    Call pn_ssl_domain_set_credentials on this domain.

    @param cert_file: forwarded unchanged
    @param key_file: forwarded unchanged
    @param password: forwarded unchanged
    @return: the status code, after it has passed self._check
    """
    status = pn_ssl_domain_set_credentials(self._domain, cert_file,
                                           key_file, password)
    return self._check(status)
def set_trusted_ca_db(self, certificate_db):
    """
    Call pn_ssl_domain_set_trusted_ca_db on this domain.

    @param certificate_db: forwarded unchanged
    @return: the status code, after it has passed self._check
    """
    status = pn_ssl_domain_set_trusted_ca_db(self._domain, certificate_db)
    return self._check(status)
def set_peer_authentication(self, verify_mode, trusted_CAs=None):
    """
    Call pn_ssl_domain_set_peer_authentication on this domain.

    @param verify_mode: forwarded unchanged
    @param trusted_CAs: forwarded unchanged; defaults to None
    @return: the status code, after it has passed self._check
    """
    status = pn_ssl_domain_set_peer_authentication(self._domain,
                                                   verify_mode,
                                                   trusted_CAs)
    return self._check(status)
3643
3645 return self._check( pn_ssl_domain_allow_unsecured_client(self._domain) )
3646
def __del__(self):
    """
    Free the underlying SSL domain handle, if one was obtained.

    The finalizer also runs on instances whose construction failed, so
    C{_domain} may be None or may never have been assigned; in either
    case there is nothing to release and the free call is skipped.
    """
    domain = getattr(self, '_domain', None)
    if domain is not None:
        pn_ssl_domain_free(domain)
3649
3651
3652 @staticmethod
3654 return pn_ssl_present()
3655
3662
def __new__(cls, transport, domain, session_details=None):
    """
    Enforce a singleton SSL object per Transport.

    When the transport has no SSL object yet, one is allocated with
    pn_ssl, initialised with pn_ssl_init if a domain was given, and
    stored on the transport. When one already exists, a request naming a
    different domain or different session details is rejected.

    @param transport: transport whose C{_ssl} attribute holds the singleton
    @param domain: object whose C{_domain} handle is given to pn_ssl_init
    @param session_details: optional object providing get_session_id()
    @return: the SSL object stored on the transport
    @raise SSLUnavailable: when pn_ssl returns None
    @raise SSLException: when an existing SSL object would be re-configured
    """
    if not transport._ssl:
        instance = super(SSL, cls).__new__(cls)
        instance._domain = domain
        instance._session_details = session_details
        session_id = None
        if session_details:
            session_id = session_details.get_session_id()
        instance._ssl = pn_ssl(transport._impl)
        if instance._ssl is None:
            raise SSLUnavailable()
        if domain:
            pn_ssl_init(instance._ssl, domain._domain, session_id)
        transport._ssl = instance
    else:
        # allocation and configuration happen in one step, so a second
        # call may only repeat the configuration already in place
        current = transport._ssl
        if ((domain and (current._domain is not domain)) or
                (session_details and
                 (current._session_details is not session_details))):
            raise SSLException("Cannot re-configure existing SSL object!")
    return transport._ssl
3687
3689 rc, name = pn_ssl_get_cipher_name( self._ssl, 128 )
3690 if rc:
3691 return name
3692 return None
3693
3695 rc, name = pn_ssl_get_protocol_name( self._ssl, 128 )
3696 if rc:
3697 return name
3698 return None
3699
3700 SHA1 = PN_SSL_SHA1
3701 SHA256 = PN_SSL_SHA256
3702 SHA512 = PN_SSL_SHA512
3703 MD5 = PN_SSL_MD5
3704
3705 CERT_COUNTRY_NAME = PN_SSL_CERT_SUBJECT_COUNTRY_NAME
3706 CERT_STATE_OR_PROVINCE = PN_SSL_CERT_SUBJECT_STATE_OR_PROVINCE
3707 CERT_CITY_OR_LOCALITY = PN_SSL_CERT_SUBJECT_CITY_OR_LOCALITY
3708 CERT_ORGANIZATION_NAME = PN_SSL_CERT_SUBJECT_ORGANIZATION_NAME
3709 CERT_ORGANIZATION_UNIT = PN_SSL_CERT_SUBJECT_ORGANIZATION_UNIT
3710 CERT_COMMON_NAME = PN_SSL_CERT_SUBJECT_COMMON_NAME
3711
3713 subfield_value = pn_ssl_get_remote_subject_subfield(self._ssl, subfield_name)
3714 return subfield_value
3715
3717 subject = pn_ssl_get_remote_subject(self._ssl)
3718 return subject
3719
3723
3724
3727
3730
3733
3736
3739
3742
3744 rc, fingerprint_str = pn_ssl_get_cert_fingerprint(self._ssl, fingerprint_length, digest_name)
3745 if rc == PN_OK:
3746 return fingerprint_str
3747 return None
3748
3749
3752
3755
3759
3763
3766
3767 @property
3769 return pn_ssl_get_remote_subject( self._ssl )
3770
3771 RESUME_UNKNOWN = PN_SSL_RESUME_UNKNOWN
3772 RESUME_NEW = PN_SSL_RESUME_NEW
3773 RESUME_REUSED = PN_SSL_RESUME_REUSED
3774
3776 return pn_ssl_resume_status( self._ssl )
3777
3779 self._check(pn_ssl_set_peer_hostname( self._ssl, unicode2utf8(hostname) ))
3781 err, name = pn_ssl_get_peer_hostname( self._ssl, 1024 )
3782 self._check(err)
3783 return utf82unicode(name)
3784 peer_hostname = property(_get_peer_hostname, _set_peer_hostname,
3785 doc="""
3786 Manage the expected name of the remote peer. Used to authenticate the remote.
3787 """)
3788
3791 """ Unique identifier for the SSL session. Used to resume previous session on a new
3792 SSL connection.
3793 """
3794
3796 self._session_id = session_id
3797
3799 return self._session_id
3800
3801
3802 wrappers = {
3803 "pn_void": lambda x: pn_void2py(x),
3804 "pn_pyref": lambda x: pn_void2py(x),
3805 "pn_connection": lambda x: Connection.wrap(pn_cast_pn_connection(x)),
3806 "pn_session": lambda x: Session.wrap(pn_cast_pn_session(x)),
3807 "pn_link": lambda x: Link.wrap(pn_cast_pn_link(x)),
3808 "pn_delivery": lambda x: Delivery.wrap(pn_cast_pn_delivery(x)),
3809 "pn_transport": lambda x: Transport.wrap(pn_cast_pn_transport(x)),
3810 "pn_selectable": lambda x: Selectable.wrap(pn_cast_pn_selectable(x))
3811 }
3814
3816 self._impl = pn_collector()
3817
def put(self, obj, etype):
    """
    Put C{obj} into the collector as a PN_PYREF entry.

    @param obj: object converted with pn_py2void before being stored
    @param etype: object whose C{number} attribute is passed as the
      event type
    """
    reference = pn_py2void(obj)
    pn_collector_put(self._impl, PN_PYREF, reference, etype.number)
3820
3822 return Event.wrap(pn_collector_peek(self._impl))
3823
3825 ev = self.peek()
3826 pn_collector_pop(self._impl)
3827
3829 pn_collector_free(self._impl)
3830 del self._impl
3831
3832 if "TypeExtender" not in globals():
3835 self.number = number
3837 try:
3838 return self.number
3839 finally:
3840 self.number += 1
3841
3843
3844 _lock = threading.Lock()
3845 _extended = TypeExtender(10000)
3846 TYPES = {}
3847
def __init__(self, name=None, number=None, method=None):
    """
    Register an event type.

    Missing values are filled in under the class lock: the name is
    looked up with pn_event_type_name(number), the number is taken from
    the extended-number allocator, and the method defaults to
    "on_<name>". The finished instance is recorded in TYPES under its
    number.

    @param name: event type name; required when C{number} is None
    @param number: event type number; allocated when None
    @param method: handler method name; derived from the name when None
    @raise TypeError: when both C{name} and C{number} are None
    """
    if name is None and number is None:
        raise TypeError("extended events require a name")
    # Acquire before entering the try block: if acquire() itself fails,
    # the finally clause must not release a lock that was never held.
    self._lock.acquire()
    try:
        if name is None:
            name = pn_event_type_name(number)

        if number is None:
            number = self._extended.next()

        if method is None:
            method = "on_%s" % name

        self.name = name
        self.number = number
        self.method = method

        self.TYPES[number] = self
    finally:
        self._lock.release()
3869
3872
3879
3881
3882 - def __init__(self, clazz, context, type):
3886
3889
3890 -def _none(x): return None
3891
3892 DELEGATED = Constant("DELEGATED")
3893
def _core(number, method):
    """
    Build an EventType from an explicit number and handler method name.

    @param number: passed as the C{number} keyword to EventType
    @param method: passed as the C{method} keyword to EventType
    @return: the new EventType
    """
    event_type = EventType(number=number, method=method)
    return event_type
3896
3897 -class Event(Wrapper, EventBase):
3898
3899 REACTOR_INIT = _core(PN_REACTOR_INIT, "on_reactor_init")
3900 REACTOR_QUIESCED = _core(PN_REACTOR_QUIESCED, "on_reactor_quiesced")
3901 REACTOR_FINAL = _core(PN_REACTOR_FINAL, "on_reactor_final")
3902
3903 TIMER_TASK = _core(PN_TIMER_TASK, "on_timer_task")
3904
3905 CONNECTION_INIT = _core(PN_CONNECTION_INIT, "on_connection_init")
3906 CONNECTION_BOUND = _core(PN_CONNECTION_BOUND, "on_connection_bound")
3907 CONNECTION_UNBOUND = _core(PN_CONNECTION_UNBOUND, "on_connection_unbound")
3908 CONNECTION_LOCAL_OPEN = _core(PN_CONNECTION_LOCAL_OPEN, "on_connection_local_open")
3909 CONNECTION_LOCAL_CLOSE = _core(PN_CONNECTION_LOCAL_CLOSE, "on_connection_local_close")
3910 CONNECTION_REMOTE_OPEN = _core(PN_CONNECTION_REMOTE_OPEN, "on_connection_remote_open")
3911 CONNECTION_REMOTE_CLOSE = _core(PN_CONNECTION_REMOTE_CLOSE, "on_connection_remote_close")
3912 CONNECTION_FINAL = _core(PN_CONNECTION_FINAL, "on_connection_final")
3913
3914 SESSION_INIT = _core(PN_SESSION_INIT, "on_session_init")
3915 SESSION_LOCAL_OPEN = _core(PN_SESSION_LOCAL_OPEN, "on_session_local_open")
3916 SESSION_LOCAL_CLOSE = _core(PN_SESSION_LOCAL_CLOSE, "on_session_local_close")
3917 SESSION_REMOTE_OPEN = _core(PN_SESSION_REMOTE_OPEN, "on_session_remote_open")
3918 SESSION_REMOTE_CLOSE = _core(PN_SESSION_REMOTE_CLOSE, "on_session_remote_close")
3919 SESSION_FINAL = _core(PN_SESSION_FINAL, "on_session_final")
3920
3921 LINK_INIT = _core(PN_LINK_INIT, "on_link_init")
3922 LINK_LOCAL_OPEN = _core(PN_LINK_LOCAL_OPEN, "on_link_local_open")
3923 LINK_LOCAL_CLOSE = _core(PN_LINK_LOCAL_CLOSE, "on_link_local_close")
3924 LINK_LOCAL_DETACH = _core(PN_LINK_LOCAL_DETACH, "on_link_local_detach")
3925 LINK_REMOTE_OPEN = _core(PN_LINK_REMOTE_OPEN, "on_link_remote_open")
3926 LINK_REMOTE_CLOSE = _core(PN_LINK_REMOTE_CLOSE, "on_link_remote_close")
3927 LINK_REMOTE_DETACH = _core(PN_LINK_REMOTE_DETACH, "on_link_remote_detach")
3928 LINK_FLOW = _core(PN_LINK_FLOW, "on_link_flow")
3929 LINK_FINAL = _core(PN_LINK_FINAL, "on_link_final")
3930
3931 DELIVERY = _core(PN_DELIVERY, "on_delivery")
3932
3933 TRANSPORT = _core(PN_TRANSPORT, "on_transport")
3934 TRANSPORT_ERROR = _core(PN_TRANSPORT_ERROR, "on_transport_error")
3935 TRANSPORT_HEAD_CLOSED = _core(PN_TRANSPORT_HEAD_CLOSED, "on_transport_head_closed")
3936 TRANSPORT_TAIL_CLOSED = _core(PN_TRANSPORT_TAIL_CLOSED, "on_transport_tail_closed")
3937 TRANSPORT_CLOSED = _core(PN_TRANSPORT_CLOSED, "on_transport_closed")
3938
3939 SELECTABLE_INIT = _core(PN_SELECTABLE_INIT, "on_selectable_init")
3940 SELECTABLE_UPDATED = _core(PN_SELECTABLE_UPDATED, "on_selectable_updated")
3941 SELECTABLE_READABLE = _core(PN_SELECTABLE_READABLE, "on_selectable_readable")
3942 SELECTABLE_WRITABLE = _core(PN_SELECTABLE_WRITABLE, "on_selectable_writable")
3943 SELECTABLE_EXPIRED = _core(PN_SELECTABLE_EXPIRED, "on_selectable_expired")
3944 SELECTABLE_ERROR = _core(PN_SELECTABLE_ERROR, "on_selectable_error")
3945 SELECTABLE_FINAL = _core(PN_SELECTABLE_FINAL, "on_selectable_final")
3946
@staticmethod
def wrap(impl, number=None):
    """
    Wrap an event handle.

    @param impl: the event handle, or None
    @param number: event type number; read with pn_event_type(impl)
      when not supplied
    @return: None when C{impl} is None; the event's own context when the
      event class is PN_PYREF and that context is an EventBase;
      otherwise a new Event
    """
    if impl is None:
        return None

    if number is None:
        number = pn_event_type(impl)

    event = Event(impl, number)

    # a python-reference event whose context is itself an event object
    # is represented by that context rather than by the new wrapper
    is_pyref = pn_event_class(impl) == PN_PYREF
    if is_pyref and isinstance(event.context, EventBase):
        return event.context
    return event
3964
3968
3971
3975
3976 @property
3978 cls = pn_event_class(self._impl)
3979 if cls:
3980 return pn_class_name(cls)
3981 else:
3982 return None
3983
3984 @property
3986 return WrappedHandler.wrap(pn_event_root(self._impl))
3987
@property
def context(self):
    """Returns the context object associated with the event. The type of
    this depends on the type of event."""
    # pick the conversion registered for this event's class name
    to_wrapper = wrappers[self.clazz]
    return to_wrapper(pn_event_context(self._impl))
3992
3993 - def dispatch(self, handler, type=None):
4002
4003
4004 @property
4006 """Returns the reactor associated with the event."""
4007 return wrappers.get("pn_reactor", _none)(pn_event_reactor(self._impl))
4008
4010 r = self.reactor
4011 if r and hasattr(r, 'subclass') and r.subclass.__name__.lower() == name:
4012 return r
4013 else:
4014 return super(Event, self).__getattr__(name)
4015
4016 @property
4018 """Returns the transport associated with the event, or null if none is associated with it."""
4019 return Transport.wrap(pn_event_transport(self._impl))
4020
4021 @property
4023 """Returns the connection associated with the event, or null if none is associated with it."""
4024 return Connection.wrap(pn_event_connection(self._impl))
4025
4026 @property
4028 """Returns the session associated with the event, or null if none is associated with it."""
4029 return Session.wrap(pn_event_session(self._impl))
4030
4031 @property
4033 """Returns the link associated with the event, or null if none is associated with it."""
4034 return Link.wrap(pn_event_link(self._impl))
4035
4036 @property
4038 """Returns the sender link associated with the event, or null if
4039 none is associated with it. This is essentially an alias for
4040 link(), that does an additional checkon the type of the
4041 link."""
4042 l = self.link
4043 if l and l.is_sender:
4044 return l
4045 else:
4046 return None
4047
4048 @property
4050 """Returns the receiver link associated with the event, or null if
4051 none is associated with it. This is essentially an alias for
4052 link(), that does an additional checkon the type of the link."""
4053 l = self.link
4054 if l and l.is_receiver:
4055 return l
4056 else:
4057 return None
4058
4059 @property
4061 """Returns the delivery associated with the event, or null if none is associated with it."""
4062 return Delivery.wrap(pn_event_delivery(self._impl))
4063
4066
4069 if obj is None:
4070 return self
4071 ret = []
4072 obj.__dict__['handlers'] = ret
4073 return ret
4074
4080
4082
4083 - def __init__(self, handler, on_error=None):
4086
4090
4096
4099 self.handlers = []
4100 self.delegate = weakref.ref(delegate)
4101
4103 delegate = self.delegate()
4104 if delegate:
4105 dispatch(delegate, method, event)
4106
4110 if obj is None:
4111 return None
4112 return self.surrogate(obj).handlers
4113
4115 self.surrogate(obj).handlers = value
4116
def surrogate(self, obj):
    """Return the child surrogate cached on obj, creating it on first use.

    The surrogate is kept in obj.__dict__ under "_surrogate".  When nothing
    (or None) is stored there, a WrappedHandlersChildSurrogate is built for
    obj, cached under that key and registered through obj.add().

    @param obj: the object that owns the surrogate.
    @return: the cached or newly created surrogate.
    """
    attrs = obj.__dict__
    existing = attrs.get("_surrogate", None)
    if existing is not None:
        return existing
    created = WrappedHandlersChildSurrogate(obj)
    attrs["_surrogate"] = created
    obj.add(created)
    return created
4125
4127
4128 handlers = WrappedHandlersProperty()
4129
4130 @classmethod
4131 - def wrap(cls, impl, on_error=None):
4138
def __init__(self, impl_or_constructor):
    """Initialise the wrapper around impl_or_constructor.

    @param impl_or_constructor: passed unchanged to Wrapper.__init__.
    """
    Wrapper.__init__(self, impl_or_constructor)
    lineage = list(self.__class__.__mro__)
    if lineage.index(WrappedHandler) > 1:
        # Extending with an empty list adds nothing; the point is the
        # read of self.handlers itself.
        # NOTE(review): presumably this forces the handlers property to
        # set up its surrogate for subclasses more than one level below
        # WrappedHandler -- confirm against WrappedHandlersProperty.
        self.handlers.extend([])
4144
4151
4152 - def add(self, handler, on_error=None):
4158
4160 pn_handler_clear(self._impl)
4161
4163 if obj is None:
4164 return None
4165 elif isinstance(obj, WrappedHandler):
4166 impl = obj._impl
4167 pn_incref(impl)
4168 return impl
4169 else:
4170 return pn_pyhandler(_cadapter(obj, on_error))
4171
4173 """
4174 Simple URL parser/constructor, handles URLs of the form:
4175
4176 <scheme>://<user>:<password>@<host>:<port>/<path>
4177
4178 All components can be None if not specified in the URL string.
4179
4180 The port can be specified as a service name, e.g. 'amqp' in the
4181 URL string but Url.port always gives the integer value.
4182
4183 Warning: The placement of user and password in URLs is not
4184 recommended. It can result in credentials leaking out in program
4185 logs. Use connection configuration attributes instead.
4186
4187 @ivar scheme: Url scheme e.g. 'amqp' or 'amqps'
4188 @ivar user: Username
4189 @ivar password: Password
4190 @ivar host: Host name, ipv6 literal or ipv4 dotted quad.
4191 @ivar port: Integer port.
4192 @ivar host_port: Returns host:port
4193 """
4194
4195 AMQPS = "amqps"
4196 AMQP = "amqp"
4197
4199 """An integer port number that can be constructed from a service name string"""
4200
def __new__(cls, value):
    """Create a port from a number or a service name.

    @param value: integer port number or string service name.
    @return: the new instance; its integer value is cls._port_int(value)
        and its 'name' attribute is str(value).
    """
    number = cls._port_int(value)
    port = super(Url.Port, cls).__new__(cls, number)
    # Keep the value exactly as it was given, in string form.
    port.name = str(value)
    return port
4206
def __eq__(self, x):
    """Equal when x matches either str(self) or int(self)."""
    matches_text = str(self) == x
    return matches_text or int(self) == x
def __ne__(self, x):
    """Negation of the == comparison between self and x."""
    is_equal = self == x
    return not is_equal
4210
@staticmethod
def _port_int(value):
    """Convert service, an integer or a service name, into an integer port number.

    @param value: an integer, a numeric string or a service name.
    @return: the integer port number.
    @raise ValueError: if value is neither numeric nor a service name that
        can be resolved.
    """
    try:
        return int(value)
    except ValueError:
        pass
    try:
        return socket.getservbyname(value)
    except socket.error:
        pass
    # The service lookup failed: fall back to fixed numbers for the two
    # AMQP scheme names.
    if value == Url.AMQPS:
        return 5671
    if value == Url.AMQP:
        return 5672
    raise ValueError("Not a valid port number or service name: '%s'" % value)
4225
def __init__(self, url=None, defaults=True, **kwargs):
    """
    @param url: URL string to parse.
    @param defaults: If true, fill in missing default values in the URL.
        If false, you can fill them in later by calling self.defaults()
    @param kwargs: URL parts to set after parsing; each one replaces the
        corresponding part of the url string.  Every key must name an
        existing attribute (this class defines scheme, username, password,
        host, path and port); an unknown key raises AttributeError.
    @raise ValueError: if url is given and pn_url_parse() returns a false
        value for it.
    """
    if not url:
        self._url = pn_url()
    else:
        self._url = pn_url_parse(unicode2utf8(str(url)))
        if not self._url:
            raise ValueError("Invalid URL '%s'" % url)
    for part, value in kwargs.items():
        # Read the attribute first so that an unknown keyword fails with
        # AttributeError instead of silently creating a new attribute.
        getattr(self, part)
        setattr(self, part, value)
    if defaults:
        self.defaults()
4243
4246 self.getter = globals()["pn_url_get_%s" % part]
4247 self.setter = globals()["pn_url_set_%s" % part]
def __get__(self, obj, type=None):
    """Return this URL part, read from obj._url with the stored getter."""
    read_part = self.getter
    return read_part(obj._url)
def __set__(self, obj, value):
    """Store str(value) as this URL part on obj._url with the stored setter.

    @return: whatever the setter returns.
    """
    write_part = self.setter
    return write_part(obj._url, str(value))
4250
4251 scheme = PartDescriptor('scheme')
4252 username = PartDescriptor('username')
4253 password = PartDescriptor('password')
4254 host = PartDescriptor('host')
4255 path = PartDescriptor('path')
4256
def _get_port(self):
    """Getter for the port property.

    @return: a Url.Port built from the stored port string, or the false
        value returned by pn_url_get_port() when no port is set.
    """
    portstr = pn_url_get_port(self._url)
    if not portstr:
        return portstr
    return Url.Port(portstr)
4260
def _set_port(self, value):
    """Setter for the port property.

    @param value: None to clear the port; any other value is converted
        with Url.Port and stored in its string form.
    """
    if value is None:
        port_text = None
    else:
        port_text = str(Url.Port(value))
    pn_url_set_port(self._url, port_text)
4264
4265 port = property(_get_port, _set_port)
4266
def __str__(self):
    """Return the URL as a string, as produced by pn_url_str()."""
    text = pn_url_str(self._url)
    return text
4268
4271
def __eq__(self, x):
    """Two values are equal when their string forms are equal."""
    mine = str(self)
    theirs = str(x)
    return mine == theirs
def __ne__(self, x):
    """Negation of the == comparison between self and x."""
    is_equal = self == x
    return not is_equal
4274
4276 pn_url_free(self._url);
4277 del self._url
4278
def defaults(self):
    """
    Fill in missing values (scheme, host or port) with defaults.

    A missing scheme becomes self.AMQP, a missing host becomes '0.0.0.0',
    and a missing port is derived from the scheme via self.Port.  Each of
    the three attributes is assigned even when it already had a value.

    @return: self
    """
    scheme = self.scheme or self.AMQP
    self.scheme = scheme
    host = self.host or '0.0.0.0'
    self.host = host
    # The port default depends on the scheme, which is settled above.
    port = self.port or self.Port(self.scheme)
    self.port = port
    return self
4288
# Public names of this module: what a star-import of it binds.
__all__ = [
    "API_LANGUAGE", "IMPLEMENTATION_LANGUAGE",
    "ABORTED", "ACCEPTED", "AUTOMATIC", "PENDING", "MANUAL",
    "REJECTED", "RELEASED", "MODIFIED", "SETTLED", "UNDESCRIBED",
    "Array", "Collector", "Condition", "Connection", "Data",
    "Delivery", "Disposition", "Described", "Endpoint",
    "Event", "EventType", "Handler", "Link",
    "Message", "MessageException",
    "Messenger", "MessengerException",
    "ProtonException",
    "VERSION_MAJOR", "VERSION_MINOR",
    "Receiver", "SASL", "Sender",
    "Session", "SessionException",
    "SSL", "SSLDomain", "SSLSessionDetails", "SSLUnavailable",
    "SSLException",
    "Terminus", "Timeout", "Interrupt",
    "Transport", "TransportException",
    "Url",
    "char", "dispatch", "symbol", "timestamp", "ulong",
    "byte", "short", "int32",
    "ubyte", "ushort", "uint",
    "float32", "decimal32", "decimal64", "decimal128",
]
4354