1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 """
21 The proton module defines a suite of APIs that implement the AMQP 1.0
22 protocol.
23
24 The proton APIs consist of the following classes:
25
26 - L{Messenger} -- A messaging endpoint.
27 - L{Message} -- A class for creating and/or accessing AMQP message content.
28 - L{Data} -- A class for creating and/or accessing arbitrary AMQP encoded
29 data.
30
31 """
32
33 from cproton import *
34 try:
35 import uuid
36 except ImportError:
37 """
38 No 'native' UUID support. Provide a very basic UUID type that is a compatible subset of the uuid type provided by more modern python releases.
39 """
40 import struct
43 - def __init__(self, hex=None, bytes=None):
44 if [hex, bytes].count(None) != 1:
45 raise TypeError("need one of hex or bytes")
46 if bytes is not None:
47 self.bytes = bytes
48 elif hex is not None:
49 fields=hex.split("-")
50 fields[4:5] = [fields[4][:4], fields[4][4:]]
51 self.bytes = struct.pack("!LHHHHL", *[int(x,16) for x in fields])
52
54 if isinstance(other, uuid.UUID):
55 return cmp(self.bytes, other.bytes)
56 else:
57 return -1
58
60 return "%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack("!LHHHHL", self.bytes)
61
63 return "UUID(%r)" % str(self)
64
66 return self.bytes.__hash__()
67
68 import os, random, socket, time
69 rand = random.Random()
70 rand.seed((os.getpid(), time.time(), socket.gethostname()))
72 bytes = [rand.randint(0, 255) for i in xrange(16)]
73
74
75 bytes[7] &= 0x0F
76 bytes[7] |= 0x40
77
78
79 bytes[8] &= 0x3F
80 bytes[8] |= 0x80
81 return "".join(map(chr, bytes))
82
84 return uuid.UUID(bytes=random_uuid())
85
86 try:
87 bytes()
88 except NameError:
89 bytes = str
90
91 API_LANGUAGE = "C"
92 IMPLEMENTATION_LANGUAGE = "C"
101
103 """
104 The root of the proton exception hierarchy. All proton exception
105 classes derive from this exception.
106 """
107 pass
108
110 """
111 A timeout exception indicates that a blocking operation has timed
112 out.
113 """
114 pass
115
117 """
118 An interrupt exception indicaes that a blocking operation was interrupted.
119 """
120 pass
121
123 """
124 The root of the messenger exception hierarchy. All exceptions
125 generated by the messenger class derive from this exception.
126 """
127 pass
128
130 """
131 The MessageException class is the root of the message exception
132 hierarhcy. All exceptions generated by the Message class derive from
133 this exception.
134 """
135 pass
136
137 EXCEPTIONS = {
138 PN_TIMEOUT: Timeout,
139 PN_INTR: Interrupt
140 }
141
142 PENDING = Constant("PENDING")
143 ACCEPTED = Constant("ACCEPTED")
144 REJECTED = Constant("REJECTED")
145 RELEASED = Constant("RELEASED")
146 ABORTED = Constant("ABORTED")
147 SETTLED = Constant("SETTLED")
148
149 STATUSES = {
150 PN_STATUS_ABORTED: ABORTED,
151 PN_STATUS_ACCEPTED: ACCEPTED,
152 PN_STATUS_REJECTED: REJECTED,
153 PN_STATUS_RELEASED: RELEASED,
154 PN_STATUS_PENDING: PENDING,
155 PN_STATUS_SETTLED: SETTLED,
156 PN_STATUS_UNKNOWN: None
157 }
158
159 AUTOMATIC = Constant("AUTOMATIC")
160 MANUAL = Constant("MANUAL")
163 """
164 The L{Messenger} class defines a high level interface for sending
165 and receiving L{Messages<Message>}. Every L{Messenger} contains a
166 single logical queue of incoming messages and a single logical queue
167 of outgoing messages. These messages in these queues may be destined
168 for, or originate from, a variety of addresses.
169
170 The messenger interface is single-threaded. All methods
171 except one (L{interrupt}) are intended to be used from within
172 the messenger thread.
173
174
175 Address Syntax
176 ==============
177
178 An address has the following form::
179
180 [ amqp[s]:// ] [user[:password]@] domain [/[name]]
181
182 Where domain can be one of::
183
184 host | host:port | ip | ip:port | name
185
186 The following are valid examples of addresses:
187
188 - example.org
189 - example.org:1234
190 - amqp://example.org
191 - amqps://example.org
192 - example.org/incoming
193 - amqps://example.org/outgoing
194 - amqps://fred:trustno1@example.org
195 - 127.0.0.1:1234
196 - amqps://127.0.0.1:1234
197
198 Sending & Receiving Messages
199 ============================
200
201 The L{Messenger} class works in conjuction with the L{Message} class. The
202 L{Message} class is a mutable holder of message content.
203
204 The L{put} method copies its L{Message} to the outgoing queue, and may
205 send queued messages if it can do so without blocking. The L{send}
206 method blocks until it has sent the requested number of messages,
207 or until a timeout interrupts the attempt.
208
209
210 >>> message = Message()
211 >>> for i in range(3):
212 ... message.address = "amqp://host/queue"
213 ... message.subject = "Hello World %i" % i
214 ... messenger.put(message)
215 >>> messenger.send()
216
217 Similarly, the L{recv} method receives messages into the incoming
218 queue, and may block as it attempts to receive the requested number
219 of messages, or until timeout is reached. It may receive fewer
220 than the requested number. The L{get} method pops the
221 eldest L{Message} off the incoming queue and copies it into the L{Message}
222 object that you supply. It will not block.
223
224
225 >>> message = Message()
226 >>> messenger.recv(10):
227 >>> while messenger.incoming > 0:
228 ... messenger.get(message)
229 ... print message.subject
230 Hello World 0
231 Hello World 1
232 Hello World 2
233
234 The blocking flag allows you to turn off blocking behavior entirely,
235 in which case L{send} and L{recv} will do whatever they can without
236 blocking, and then return. You can then look at the number
237 of incoming and outgoing messages to see how much outstanding work
238 still remains.
239 """
240
242 """
243 Construct a new L{Messenger} with the given name. The name has
244 global scope. If a NULL name is supplied, a L{uuid.UUID} based
245 name will be chosen.
246
247 @type name: string
248 @param name: the name of the messenger or None
249 """
250 self._mng = pn_messenger(name)
251
253 """
254 Destroy the L{Messenger}. This will close all connections that
255 are managed by the L{Messenger}. Call the L{stop} method before
256 destroying the L{Messenger}.
257 """
258 if hasattr(self, "_mng"):
259 pn_messenger_free(self._mng)
260 del self._mng
261
263 if err < 0:
264 if (err == PN_INPROGRESS):
265 return
266 exc = EXCEPTIONS.get(err, MessengerException)
267 raise exc("[%s]: %s" % (err, pn_error_text(pn_messenger_error(self._mng))))
268 else:
269 return err
270
271 @property
273 """
274 The name of the L{Messenger}.
275 """
276 return pn_messenger_name(self._mng)
277
279 return pn_messenger_get_certificate(self._mng)
280
282 self._check(pn_messenger_set_certificate(self._mng, value))
283
284 certificate = property(_get_certificate, _set_certificate,
285 doc="""
286 Path to a certificate file for the L{Messenger}. This certificate is
287 used when the L{Messenger} accepts or establishes SSL/TLS connections.
288 This property must be specified for the L{Messenger} to accept
289 incoming SSL/TLS connections and to establish client authenticated
290 outgoing SSL/TLS connection. Non client authenticated outgoing SSL/TLS
291 connections do not require this property.
292 """)
293
295 return pn_messenger_get_private_key(self._mng)
296
298 self._check(pn_messenger_set_private_key(self._mng, value))
299
300 private_key = property(_get_private_key, _set_private_key,
301 doc="""
302 Path to a private key file for the L{Messenger's<Messenger>}
303 certificate. This property must be specified for the L{Messenger} to
304 accept incoming SSL/TLS connections and to establish client
305 authenticated outgoing SSL/TLS connection. Non client authenticated
306 SSL/TLS connections do not require this property.
307 """)
308
310 return pn_messenger_get_password(self._mng)
311
313 self._check(pn_messenger_set_password(self._mng, value))
314
315 password = property(_get_password, _set_password,
316 doc="""
317 This property contains the password for the L{Messenger.private_key}
318 file, or None if the file is not encrypted.
319 """)
320
322 return pn_messenger_get_trusted_certificates(self._mng)
323
325 self._check(pn_messenger_set_trusted_certificates(self._mng, value))
326
327 trusted_certificates = property(_get_trusted_certificates,
328 _set_trusted_certificates,
329 doc="""
330 A path to a database of trusted certificates for use in verifying the
331 peer on an SSL/TLS connection. If this property is None, then the peer
332 will not be verified.
333 """)
334
336 t = pn_messenger_get_timeout(self._mng)
337 if t == -1:
338 return None
339 else:
340 return float(t)/1000
341
343 if value is None:
344 t = -1
345 else:
346 t = long(1000*value)
347 self._check(pn_messenger_set_timeout(self._mng, t))
348
349 timeout = property(_get_timeout, _set_timeout,
350 doc="""
351 The timeout property contains the default timeout for blocking
352 operations performed by the L{Messenger}.
353 """)
354
356 return pn_messenger_is_blocking(self._mng)
357
359 self._check(pn_messenger_set_blocking(self._mng, b))
360
361 blocking = property(_is_blocking, _set_blocking,
362 doc="""
363 Enable or disable blocking behavior during L{Message} sending
364 and receiving. This affects every blocking call, with the
365 exception of L{work}. Currently, the affected calls are
366 L{send}, L{recv}, and L{stop}.
367 """)
368
370 return pn_messenger_get_incoming_window(self._mng)
371
373 self._check(pn_messenger_set_incoming_window(self._mng, window))
374
375 incoming_window = property(_get_incoming_window, _set_incoming_window,
376 doc="""
377 The incoming tracking window for the messenger. The messenger will
378 track the remote status of this many incoming deliveries after they
379 have been accepted or rejected. Defaults to zero.
380
381 L{Messages<Message>} enter this window only when you take them into your application
382 using L{get}. If your incoming window size is I{n}, and you get I{n}+1 L{messages<Message>}
383 without explicitly accepting or rejecting the oldest message, then the
384 message that passes beyond the edge of the incoming window will be assigned
385 the default disposition of its link.
386 """)
387
389 return pn_messenger_get_outgoing_window(self._mng)
390
392 self._check(pn_messenger_set_outgoing_window(self._mng, window))
393
394 outgoing_window = property(_get_outgoing_window, _set_outgoing_window,
395 doc="""
396 The outgoing tracking window for the messenger. The messenger will
397 track the remote status of this many outgoing deliveries after calling
398 send. Defaults to zero.
399
400 A L{Message} enters this window when you call the put() method with the
401 message. If your outgoing window size is I{n}, and you call L{put} I{n}+1
402 times, status information will no longer be available for the
403 first message.
404 """)
405
407 """
408 Currently a no-op placeholder.
409 For future compatibility, do not L{send} or L{recv} messages
410 before starting the L{Messenger}.
411 """
412 self._check(pn_messenger_start(self._mng))
413
415 """
416 Transitions the L{Messenger} to an inactive state. An inactive
417 L{Messenger} will not send or receive messages from its internal
418 queues. A L{Messenger} should be stopped before being discarded to
419 ensure a clean shutdown handshake occurs on any internally managed
420 connections.
421 """
422 self._check(pn_messenger_stop(self._mng))
423
424 @property
426 """
427 Returns true iff a L{Messenger} is in the stopped state.
428 This function does not block.
429 """
430 return pn_messenger_stopped(self._mng)
431
433 """
434 Subscribes the L{Messenger} to messages originating from the
435 specified source. The source is an address as specified in the
436 L{Messenger} introduction with the following addition. If the
437 domain portion of the address begins with the '~' character, the
438 L{Messenger} will interpret the domain as host/port, bind to it,
439 and listen for incoming messages. For example "~0.0.0.0",
440 "amqp://~0.0.0.0", and "amqps://~0.0.0.0" will all bind to any
441 local interface and listen for incoming messages with the last
442 variant only permitting incoming SSL connections.
443
444 @type source: string
445 @param source: the source of messages to subscribe to
446 """
447 sub_impl = pn_messenger_subscribe(self._mng, source)
448 if not sub_impl:
449 self._check(PN_ERR)
450 return Subscription(sub_impl)
451
452 - def put(self, message):
453 """
454 Places the content contained in the message onto the outgoing
455 queue of the L{Messenger}. This method will never block, however
456 it will send any unblocked L{Messages<Message>} in the outgoing
457 queue immediately and leave any blocked L{Messages<Message>}
458 remaining in the outgoing queue. The L{send} call may be used to
459 block until the outgoing queue is empty. The L{outgoing} property
460 may be used to check the depth of the outgoing queue.
461
462 When the content in a given L{Message} object is copied to the outgoing
463 message queue, you may then modify or discard the L{Message} object
464 without having any impact on the content in the outgoing queue.
465
466 This method returns an outgoing tracker for the L{Message}. The tracker
467 can be used to determine the delivery status of the L{Message}.
468
469 @type message: Message
470 @param message: the message to place in the outgoing queue
471 @return: a tracker
472 """
473 message._pre_encode()
474 self._check(pn_messenger_put(self._mng, message._msg))
475 return pn_messenger_outgoing_tracker(self._mng)
476
478 """
479 Gets the last known remote state of the delivery associated with
480 the given tracker.
481
482 @type tracker: tracker
483 @param tracker: the tracker whose status is to be retrieved
484
485 @return: one of None, PENDING, REJECTED, or ACCEPTED
486 """
487 disp = pn_messenger_status(self._mng, tracker);
488 return STATUSES.get(disp, disp)
489
491 """
492 Checks if the delivery associated with the given tracker is still
493 waiting to be sent.
494
495 @type tracker: tracker
496 @param tracker: the tracker whose status is to be retrieved
497
498 @return true if delivery is still buffered
499 """
500 return pn_messenger_buffered(self._mng, tracker);
501
502 - def settle(self, tracker=None):
503 """
504 Frees a L{Messenger} from tracking the status associated with a given
505 tracker. If you don't supply a tracker, all outgoing L{messages<Message>} up
506 to the most recent will be settled.
507 """
508 if tracker is None:
509 tracker = pn_messenger_outgoing_tracker(self._mng)
510 flags = PN_CUMULATIVE
511 else:
512 flags = 0
513 self._check(pn_messenger_settle(self._mng, tracker, flags))
514
515 - def send(self, n=-1):
516 """
517 This call will block until the indicated number of L{messages<Message>}
518 have been sent, or until the operation times out. If n is -1 this call will
519 block until all outgoing L{messages<Message>} have been sent. If n is 0 then
520 this call will send whatever it can without blocking.
521 """
522 self._check(pn_messenger_send(self._mng, n))
523
524 - def recv(self, n=None):
525 """
526 Receives up to I{n} L{messages<Message>} into the incoming queue. If no value
527 for I{n} is supplied, this call will receive as many L{messages<Message>} as it
528 can buffer internally. If the L{Messenger} is in blocking mode, this
529 call will block until at least one L{Message} is available in the
530 incoming queue.
531 """
532 if n is None:
533 n = -1
534 self._check(pn_messenger_recv(self._mng, n))
535
536 - def work(self, timeout=None):
537 """
538 Sends or receives any outstanding L{messages<Message>} queued for a L{Messenger}.
539 This will block for the indicated timeout.
540 This method may also do I/O work other than sending and receiving
541 L{messages<Message>}. For example, closing connections after messenger.L{stop}()
542 has been called.
543 """
544 if timeout is None:
545 t = -1
546 else:
547 t = long(1000*timeout)
548 err = pn_messenger_work(self._mng, t)
549 if (err == PN_TIMEOUT):
550 return False
551 else:
552 self._check(err)
553 return True
554
555 @property
557 return pn_messenger_receiving(self._mng)
558
560 """
561 The L{Messenger} interface is single-threaded.
562 This is the only L{Messenger} function intended to be called
563 from outside of the L{Messenger} thread.
564 Call this from a non-messenger thread to interrupt
565 a L{Messenger} that is blocking.
566 This will cause any in-progress blocking call to throw
567 the L{Interrupt} exception. If there is no currently blocking
568 call, then the next blocking call will be affected, even if it
569 is within the same thread that interrupt was called from.
570 """
571 self._check(pn_messenger_interrupt(self._mng))
572
573 - def get(self, message=None):
574 """
575 Moves the message from the head of the incoming message queue into
576 the supplied message object. Any content in the message will be
577 overwritten.
578
579 A tracker for the incoming L{Message} is returned. The tracker can
580 later be used to communicate your acceptance or rejection of the
581 L{Message}.
582
583 If None is passed in for the L{Message} object, the L{Message}
584 popped from the head of the queue is discarded.
585
586 @type message: Message
587 @param message: the destination message object
588 @return: a tracker
589 """
590 if message is None:
591 impl = None
592 else:
593 impl = message._msg
594 self._check(pn_messenger_get(self._mng, impl))
595 if message is not None:
596 message._post_decode()
597 return pn_messenger_incoming_tracker(self._mng)
598
599 - def accept(self, tracker=None):
600 """
601 Signal the sender that you have acted on the L{Message}
602 pointed to by the tracker. If no tracker is supplied,
603 then all messages that have been returned by the L{get}
604 method are accepted, except those that have already been
605 auto-settled by passing beyond your incoming window size.
606
607 @type tracker: tracker
608 @param tracker: a tracker as returned by get
609 """
610 if tracker is None:
611 tracker = pn_messenger_incoming_tracker(self._mng)
612 flags = PN_CUMULATIVE
613 else:
614 flags = 0
615 self._check(pn_messenger_accept(self._mng, tracker, flags))
616
617 - def reject(self, tracker=None):
618 """
619 Rejects the L{Message} indicated by the tracker. If no tracker
620 is supplied, all messages that have been returned by the L{get}
621 method are rejected, except those that have already been auto-settled
622 by passing beyond your outgoing window size.
623
624 @type tracker: tracker
625 @param tracker: a tracker as returned by get
626 """
627 if tracker is None:
628 tracker = pn_messenger_incoming_tracker(self._mng)
629 flags = PN_CUMULATIVE
630 else:
631 flags = 0
632 self._check(pn_messenger_reject(self._mng, tracker, flags))
633
634 @property
636 """
637 The outgoing queue depth.
638 """
639 return pn_messenger_outgoing(self._mng)
640
641 @property
643 """
644 The incoming queue depth.
645 """
646 return pn_messenger_incoming(self._mng)
647
648 - def route(self, pattern, address):
649 """
650 Adds a routing rule to a L{Messenger's<Messenger>} internal routing table.
651
652 The route procedure may be used to influence how a L{Messenger} will
653 internally treat a given address or class of addresses. Every call
654 to the route procedure will result in L{Messenger} appending a routing
655 rule to its internal routing table.
656
657 Whenever a L{Message} is presented to a L{Messenger} for delivery, it
658 will match the address of this message against the set of routing
659 rules in order. The first rule to match will be triggered, and
660 instead of routing based on the address presented in the message,
661 the L{Messenger} will route based on the address supplied in the rule.
662
663 The pattern matching syntax supports two types of matches, a '%'
664 will match any character except a '/', and a '*' will match any
665 character including a '/'.
666
667 A routing address is specified as a normal AMQP address, however it
668 may additionally use substitution variables from the pattern match
669 that triggered the rule.
670
671 Any message sent to "foo" will be routed to "amqp://foo.com":
672
673 >>> messenger.route("foo", "amqp://foo.com");
674
675 Any message sent to "foobar" will be routed to
676 "amqp://foo.com/bar":
677
678 >>> messenger.route("foobar", "amqp://foo.com/bar");
679
680 Any message sent to bar/<path> will be routed to the corresponding
681 path within the amqp://bar.com domain:
682
683 >>> messenger.route("bar/*", "amqp://bar.com/$1");
684
685 Route all L{messages<Message>} over TLS:
686
687 >>> messenger.route("amqp:*", "amqps:$1")
688
689 Supply credentials for foo.com:
690
691 >>> messenger.route("amqp://foo.com/*", "amqp://user:password@foo.com/$1");
692
693 Supply credentials for all domains:
694
695 >>> messenger.route("amqp://*", "amqp://user:password@$1");
696
697 Route all addresses through a single proxy while preserving the
698 original destination:
699
700 >>> messenger.route("amqp://%/*", "amqp://user:password@proxy/$1/$2");
701
702 Route any address through a single broker:
703
704 >>> messenger.route("*", "amqp://user:password@broker/$1");
705 """
706 self._check(pn_messenger_route(self._mng, pattern, address))
707
708 - def rewrite(self, pattern, address):
709 """
710 Similar to route(), except that the destination of
711 the L{Message} is determined before the message address is rewritten.
712
713 The outgoing address is only rewritten after routing has been
714 finalized. If a message has an outgoing address of
715 "amqp://0.0.0.0:5678", and a rewriting rule that changes its
716 outgoing address to "foo", it will still arrive at the peer that
717 is listening on "amqp://0.0.0.0:5678", but when it arrives there,
718 the receiver will see its outgoing address as "foo".
719
720 The default rewrite rule removes username and password from addresses
721 before they are transmitted.
722 """
723 self._check(pn_messenger_rewrite(self._mng, pattern, address))
724
726 """
727 The L{Message} class is a mutable holder of message content.
728
729 @ivar instructions: delivery instructions for the message
730 @type instructions: dict
731 @ivar annotations: infrastructure defined message annotations
732 @type annotations: dict
733 @ivar properties: application defined message properties
734 @type properties: dict
735 @ivar body: message body
736 @type body: bytes | unicode | dict | list | int | long | float | UUID
737 """
738
739 DATA = PN_DATA
740 TEXT = PN_TEXT
741 AMQP = PN_AMQP
742 JSON = PN_JSON
743
744 DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY
745
747 self._msg = pn_message()
748 self._id = Data(pn_message_id(self._msg))
749 self._correlation_id = Data(pn_message_correlation_id(self._msg))
750 self.instructions = None
751 self.annotations = None
752 self.properties = None
753 self.body = None
754
756 if hasattr(self, "_msg"):
757 pn_message_free(self._msg)
758 del self._msg
759
761 if err < 0:
762 exc = EXCEPTIONS.get(err, MessageException)
763 raise exc("[%s]: %s" % (err, pn_message_error(self._msg)))
764 else:
765 return err
766
785
786 - def _post_decode(self):
787 inst = Data(pn_message_instructions(self._msg))
788 ann = Data(pn_message_annotations(self._msg))
789 props = Data(pn_message_properties(self._msg))
790 body = Data(pn_message_body(self._msg))
791
792 if inst.next():
793 self.instructions = inst.get_object()
794 else:
795 self.instructions = None
796 if ann.next():
797 self.annotations = ann.get_object()
798 else:
799 self.annotations = None
800 if props.next():
801 self.properties = props.get_object()
802 else:
803 self.properties = None
804 if body.next():
805 self.body = body.get_object()
806 else:
807 self.body = None
808
810 """
811 Clears the contents of the L{Message}. All fields will be reset to
812 their default values.
813 """
814 pn_message_clear(self._msg)
815 self.instructions = None
816 self.annotations = None
817 self.properties = None
818 self.body = None
819
821 return pn_message_is_inferred(self._msg)
822
824 self._check(pn_message_set_inferred(self._msg, bool(value)))
825
826 inferred = property(_is_inferred, _set_inferred)
827
829 return pn_message_is_durable(self._msg)
830
832 self._check(pn_message_set_durable(self._msg, bool(value)))
833
834 durable = property(_is_durable, _set_durable,
835 doc="""
836 The durable property indicates that the message should be held durably
837 by any intermediaries taking responsibility for the message.
838 """)
839
841 return pn_message_get_priority(self._msg)
842
844 self._check(pn_message_set_priority(self._msg, value))
845
846 priority = property(_get_priority, _set_priority,
847 doc="""
848 The priority of the message.
849 """)
850
852 return pn_message_get_ttl(self._msg)
853
855 self._check(pn_message_set_ttl(self._msg, value))
856
857 ttl = property(_get_ttl, _set_ttl,
858 doc="""
859 The time to live of the message measured in milliseconds. Expired
860 messages may be dropped.
861 """)
862
864 return pn_message_is_first_acquirer(self._msg)
865
867 self._check(pn_message_set_first_acquirer(self._msg, bool(value)))
868
869 first_acquirer = property(_is_first_acquirer, _set_first_acquirer,
870 doc="""
871 True iff the recipient is the first to acquire the message.
872 """)
873
875 return pn_message_get_delivery_count(self._msg)
876
878 self._check(pn_message_set_delivery_count(self._msg, value))
879
880 delivery_count = property(_get_delivery_count, _set_delivery_count,
881 doc="""
882 The number of delivery attempts made for this message.
883 """)
884
885
893 id = property(_get_id, _set_id,
894 doc="""
895 The id of the message.
896 """)
897
899 return pn_message_get_user_id(self._msg)
900
902 self._check(pn_message_set_user_id(self._msg, value))
903
904 user_id = property(_get_user_id, _set_user_id,
905 doc="""
906 The user id of the message creator.
907 """)
908
910 return pn_message_get_address(self._msg)
911
913 self._check(pn_message_set_address(self._msg, value))
914
915 address = property(_get_address, _set_address,
916 doc="""
917 The address of the message.
918 """)
919
921 return pn_message_get_subject(self._msg)
922
924 self._check(pn_message_set_subject(self._msg, value))
925
926 subject = property(_get_subject, _set_subject,
927 doc="""
928 The subject of the message.
929 """)
930
932 return pn_message_get_reply_to(self._msg)
933
935 self._check(pn_message_set_reply_to(self._msg, value))
936
937 reply_to = property(_get_reply_to, _set_reply_to,
938 doc="""
939 The reply-to address for the message.
940 """)
941
945 if type(value) in (int, long):
946 value = ulong(value)
947 self._correlation_id.rewind()
948 self._correlation_id.put_object(value)
949
950 correlation_id = property(_get_correlation_id, _set_correlation_id,
951 doc="""
952 The correlation-id for the message.
953 """)
954
956 return pn_message_get_content_type(self._msg)
957
958 - def _set_content_type(self, value):
959 self._check(pn_message_set_content_type(self._msg, value))
960
961 content_type = property(_get_content_type, _set_content_type,
962 doc="""
963 The content-type of the message.
964 """)
965
967 return pn_message_get_content_encoding(self._msg)
968
969 - def _set_content_encoding(self, value):
970 self._check(pn_message_set_content_encoding(self._msg, value))
971
972 content_encoding = property(_get_content_encoding, _set_content_encoding,
973 doc="""
974 The content-encoding of the message.
975 """)
976
978 return pn_message_get_expiry_time(self._msg)
979
981 self._check(pn_message_set_expiry_time(self._msg, value))
982
983 expiry_time = property(_get_expiry_time, _set_expiry_time,
984 doc="""
985 The expiry time of the message.
986 """)
987
989 return pn_message_get_creation_time(self._msg)
990
992 self._check(pn_message_set_creation_time(self._msg, value))
993
994 creation_time = property(_get_creation_time, _set_creation_time,
995 doc="""
996 The creation time of the message.
997 """)
998
1000 return pn_message_get_group_id(self._msg)
1001
1003 self._check(pn_message_set_group_id(self._msg, value))
1004
1005 group_id = property(_get_group_id, _set_group_id,
1006 doc="""
1007 The group id of the message.
1008 """)
1009
1011 return pn_message_get_group_sequence(self._msg)
1012
1014 self._check(pn_message_set_group_sequence(self._msg, value))
1015
1016 group_sequence = property(_get_group_sequence, _set_group_sequence,
1017 doc="""
1018 The sequence of the message within its group.
1019 """)
1020
1022 return pn_message_get_reply_to_group_id(self._msg)
1023
1025 self._check(pn_message_set_reply_to_group_id(self._msg, value))
1026
1027 reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id,
1028 doc="""
1029 The group-id for any replies.
1030 """)
1031
1032
1035
1038
1039 format = property(_get_format, _set_format,
1040 doc="""
1041 The format of the message.
1042 """)
1043
1045 self._pre_encode()
1046 sz = 16
1047 while True:
1048 err, data = pn_message_encode(self._msg, sz)
1049 if err == PN_OVERFLOW:
1050 sz *= 2
1051 continue
1052 else:
1053 self._check(err)
1054 return data
1055
1057 self._check(pn_message_decode(self._msg, data, len(data)))
1058 self._post_decode()
1059
1060 - def load(self, data):
1061 self._check(pn_message_load(self._msg, data))
1062
1064 sz = 16
1065 while True:
1066 err, data = pn_message_save(self._msg, sz)
1067 if err == PN_OVERFLOW:
1068 sz *= 2
1069 continue
1070 else:
1071 self._check(err)
1072 return data
1073
1075 props = []
1076 for attr in ("inferred", "address", "reply_to", "durable", "ttl",
1077 "priority", "first_acquirer", "delivery_count", "id",
1078 "correlation_id", "user_id", "group_id", "group_sequence",
1079 "reply_to_group_id", "instructions", "annotations",
1080 "properties", "body"):
1081 value = getattr(self, attr)
1082 if value: props.append("%s=%r" % (attr, value))
1083 return "Message(%s)" % ", ".join(props)
1084
1086 tmp = pn_string(None)
1087 err = pn_inspect(self._msg, tmp)
1088 result = pn_string_get(tmp)
1089 pn_free(tmp)
1090 self._check(err)
1091 return result
1092
1094
1097
1098 @property
1100 return pn_subscription_address(self._impl)
1101
1103 """
1104 The DataException class is the root of the Data exception hierarchy.
1105 All exceptions raised by the Data class extend this exception.
1106 """
1107 pass
1108
1110
1113
1115 return "UnmappedType(%s)" % self.msg
1116
1118
1120 return "ulong(%s)" % long.__repr__(self)
1121
1123
1125 return "timestamp(%s)" % long.__repr__(self)
1126
1128
1130 return "symbol(%s)" % unicode.__repr__(self)
1131
1132 -class char(unicode):
1133
1135 return "char(%s)" % unicode.__repr__(self)
1136
1138
1139 - def __init__(self, descriptor, value):
1140 self.descriptor = descriptor
1141 self.value = value
1142
1144 return "Described(%r, %r)" % (self.descriptor, self.value)
1145
1147 if isinstance(o, Described):
1148 return self.descriptor == o.descriptor and self.value == o.value
1149 else:
1150 return False
1151
1152 UNDESCRIBED = Constant("UNDESCRIBED")
1153
1154 -class Array(object):
1155
1156 - def __init__(self, descriptor, type, *elements):
1157 self.descriptor = descriptor
1158 self.type = type
1159 self.elements = elements
1160
1162 if self.elements:
1163 els = ", %s" % (", ".join(map(repr, self.elements)))
1164 else:
1165 els = ""
1166 return "Array(%r, %r%s)" % (self.descriptor, self.type, els)
1167
1169 if isinstance(o, Array):
1170 return self.descriptor == o.descriptor and \
1171 self.type == o.type and self.elements == o.elements
1172 else:
1173 return False
1174
1176 """
1177 The L{Data} class provides an interface for decoding, extracting,
1178 creating, and encoding arbitrary AMQP data. A L{Data} object
1179 contains a tree of AMQP values. Leaf nodes in this tree correspond
1180 to scalars in the AMQP type system such as L{ints<INT>} or
1181 L{strings<STRING>}. Non-leaf nodes in this tree correspond to
1182 compound values in the AMQP type system such as L{lists<LIST>},
1183 L{maps<MAP>}, L{arrays<ARRAY>}, or L{described values<DESCRIBED>}.
1184 The root node of the tree is the L{Data} object itself and can have
1185 an arbitrary number of children.
1186
1187 A L{Data} object maintains the notion of the current sibling node
1188 and a current parent node. Siblings are ordered within their parent.
1189 Values are accessed and/or added by using the L{next}, L{prev},
1190 L{enter}, and L{exit} methods to navigate to the desired location in
1191 the tree and using the supplied variety of put_*/get_* methods to
1192 access or add a value of the desired type.
1193
1194 The put_* methods will always add a value I{after} the current node
1195 in the tree. If the current node has a next sibling the put_* method
1196 will overwrite the value on this node. If there is no current node
1197 or the current node has no next sibling then one will be added. The
1198 put_* methods always set the added/modified node to the current
1199 node. The get_* methods read the value of the current node and do
1200 not change which node is current.
1201
1202 The following types of scalar values are supported:
1203
1204 - L{NULL}
1205 - L{BOOL}
1206 - L{UBYTE}
1207 - L{USHORT}
1208 - L{SHORT}
1209 - L{UINT}
1210 - L{INT}
1211 - L{ULONG}
1212 - L{LONG}
1213 - L{FLOAT}
1214 - L{DOUBLE}
1215 - L{BINARY}
1216 - L{STRING}
1217 - L{SYMBOL}
1218
1219 The following types of compound values are supported:
1220
1221 - L{DESCRIBED}
1222 - L{ARRAY}
1223 - L{LIST}
1224 - L{MAP}
1225 """
1226
1227 NULL = PN_NULL; "A null value."
1228 BOOL = PN_BOOL; "A boolean value."
1229 UBYTE = PN_UBYTE; "An unsigned byte value."
1230 BYTE = PN_BYTE; "A signed byte value."
1231 USHORT = PN_USHORT; "An unsigned short value."
1232 SHORT = PN_SHORT; "A short value."
1233 UINT = PN_UINT; "An unsigned int value."
1234 INT = PN_INT; "A signed int value."
1235 CHAR = PN_CHAR; "A character value."
1236 ULONG = PN_ULONG; "An unsigned long value."
1237 LONG = PN_LONG; "A signed long value."
1238 TIMESTAMP = PN_TIMESTAMP; "A timestamp value."
1239 FLOAT = PN_FLOAT; "A float value."
1240 DOUBLE = PN_DOUBLE; "A double value."
1241 DECIMAL32 = PN_DECIMAL32; "A DECIMAL32 value."
1242 DECIMAL64 = PN_DECIMAL64; "A DECIMAL64 value."
1243 DECIMAL128 = PN_DECIMAL128; "A DECIMAL128 value."
1244 UUID = PN_UUID; "A UUID value."
1245 BINARY = PN_BINARY; "A binary string."
1246 STRING = PN_STRING; "A unicode string."
1247 SYMBOL = PN_SYMBOL; "A symbolic string."
1248 DESCRIBED = PN_DESCRIBED; "A described value."
1249 ARRAY = PN_ARRAY; "An array value."
1250 LIST = PN_LIST; "A list value."
1251 MAP = PN_MAP; "A map value."
1252
1253 type_names = {
1254 NULL: "null",
1255 BOOL: "bool",
1256 BYTE: "byte",
1257 UBYTE: "ubyte",
1258 SHORT: "short",
1259 USHORT: "ushort",
1260 INT: "int",
1261 UINT: "uint",
1262 CHAR: "char",
1263 LONG: "long",
1264 ULONG: "ulong",
1265 TIMESTAMP: "timestamp",
1266 FLOAT: "float",
1267 DOUBLE: "double",
1268 DECIMAL32: "decimal32",
1269 DECIMAL64: "decimal64",
1270 DECIMAL128: "decimal128",
1271 UUID: "uuid",
1272 BINARY: "binary",
1273 STRING: "string",
1274 SYMBOL: "symbol",
1275 DESCRIBED: "described",
1276 ARRAY: "array",
1277 LIST: "list",
1278 MAP: "map"
1279 }
1280
1281 @classmethod
1283
1285 if type(capacity) in (int, long):
1286 self._data = pn_data(capacity)
1287 self._free = True
1288 else:
1289 self._data = capacity
1290 self._free = False
1291
1293 if self._free and hasattr(self, "_data"):
1294 pn_data_free(self._data)
1295 del self._data
1296
1298 if err < 0:
1299 exc = EXCEPTIONS.get(err, DataException)
1300 raise exc("[%s]: %s" % (err, pn_error_text(pn_data_error(self._data))))
1301 else:
1302 return err
1303
1305 """
1306 Clears the data object.
1307 """
1308 pn_data_clear(self._data)
1309
1311 """
1312 Clears current node and sets the parent to the root node. Clearing the
1313 current node sets it _before_ the first node, calling next() will advance to
1314 the first node.
1315 """
1316 pn_data_rewind(self._data)
1317
1319 """
1320 Advances the current node to its next sibling and returns its
1321 type. If there is no next sibling the current node remains
1322 unchanged and None is returned.
1323 """
1324 found = pn_data_next(self._data)
1325 if found:
1326 return self.type()
1327 else:
1328 return None
1329
1331 """
1332 Advances the current node to its previous sibling and returns its
1333 type. If there is no previous sibling the current node remains
1334 unchanged and None is returned.
1335 """
1336 found = pn_data_prev(self._data)
1337 if found:
1338 return self.type()
1339 else:
1340 return None
1341
1343 """
1344 Sets the parent node to the current node and clears the current node.
1345 Clearing the current node sets it _before_ the first child,
1346 call next() advances to the first child.
1347 """
1348 return pn_data_enter(self._data)
1349
1351 """
1352 Sets the current node to the parent node and the parent node to
1353 its own parent.
1354 """
1355 return pn_data_exit(self._data)
1356
1358 return pn_data_lookup(self._data, name)
1359
1361 pn_data_narrow(self._data)
1362
1364 pn_data_widen(self._data)
1365
1367 """
1368 Returns the type of the current node.
1369 """
1370 dtype = pn_data_type(self._data)
1371 if dtype == -1:
1372 return None
1373 else:
1374 return dtype
1375
1377 """
1378 Returns a representation of the data encoded in AMQP format.
1379 """
1380 size = 1024
1381 while True:
1382 cd, enc = pn_data_encode(self._data, size)
1383 if cd == PN_OVERFLOW:
1384 size *= 2
1385 elif cd >= 0:
1386 return enc
1387 else:
1388 self._check(cd)
1389
1391 """
1392 Decodes the first value from supplied AMQP data and returns the
1393 number of bytes consumed.
1394
1395 @type encoded: binary
1396 @param encoded: AMQP encoded binary data
1397 """
1398 return self._check(pn_data_decode(self._data, encoded))
1399
1401 """
1402 Puts a list value. Elements may be filled by entering the list
1403 node and putting element values.
1404
1405 >>> data = Data()
1406 >>> data.put_list()
1407 >>> data.enter()
1408 >>> data.put_int(1)
1409 >>> data.put_int(2)
1410 >>> data.put_int(3)
1411 >>> data.exit()
1412 """
1413 self._check(pn_data_put_list(self._data))
1414
1416 """
1417 Puts a map value. Elements may be filled by entering the map node
1418 and putting alternating key value pairs.
1419
1420 >>> data = Data()
1421 >>> data.put_map()
1422 >>> data.enter()
1423 >>> data.put_string("key")
1424 >>> data.put_string("value")
1425 >>> data.exit()
1426 """
1427 self._check(pn_data_put_map(self._data))
1428
1429 - def put_array(self, described, element_type):
1430 """
1431 Puts an array value. Elements may be filled by entering the array
1432 node and putting the element values. The values must all be of the
1433 specified array element type. If an array is described then the
1434 first child value of the array is the descriptor and may be of any
1435 type.
1436
1437 >>> data = Data()
1438 >>>
1439 >>> data.put_array(False, Data.INT)
1440 >>> data.enter()
1441 >>> data.put_int(1)
1442 >>> data.put_int(2)
1443 >>> data.put_int(3)
1444 >>> data.exit()
1445 >>>
1446 >>> data.put_array(True, Data.DOUBLE)
1447 >>> data.enter()
1448 >>> data.put_symbol("array-descriptor")
1449 >>> data.put_double(1.1)
1450 >>> data.put_double(1.2)
1451 >>> data.put_double(1.3)
1452 >>> data.exit()
1453
1454 @type described: bool
1455 @param described: specifies whether the array is described
1456 @type element_type: int
1457 @param element_type: the type of the array elements
1458 """
1459 self._check(pn_data_put_array(self._data, described, element_type))
1460
1462 """
1463 Puts a described value. A described node has two children, the
1464 descriptor and the value. These are specified by entering the node
1465 and putting the desired values.
1466
1467 >>> data = Data()
1468 >>> data.put_described()
1469 >>> data.enter()
1470 >>> data.put_symbol("value-descriptor")
1471 >>> data.put_string("the value")
1472 >>> data.exit()
1473 """
1474 self._check(pn_data_put_described(self._data))
1475
1477 """
1478 Puts a null value.
1479 """
1480 self._check(pn_data_put_null(self._data))
1481
1483 """
1484 Puts a boolean value.
1485
1486 @param b: a boolean value
1487 """
1488 self._check(pn_data_put_bool(self._data, b))
1489
1491 """
1492 Puts an unsigned byte value.
1493
1494 @param ub: an integral value
1495 """
1496 self._check(pn_data_put_ubyte(self._data, ub))
1497
1499 """
1500 Puts a signed byte value.
1501
1502 @param b: an integral value
1503 """
1504 self._check(pn_data_put_byte(self._data, b))
1505
1507 """
1508 Puts an unsigned short value.
1509
1510 @param us: an integral value.
1511 """
1512 self._check(pn_data_put_ushort(self._data, us))
1513
1515 """
1516 Puts a signed short value.
1517
1518 @param s: an integral value
1519 """
1520 self._check(pn_data_put_short(self._data, s))
1521
1523 """
1524 Puts an unsigned int value.
1525
1526 @param ui: an integral value
1527 """
1528 self._check(pn_data_put_uint(self._data, ui))
1529
1531 """
1532 Puts a signed int value.
1533
1534 @param i: an integral value
1535 """
1536 self._check(pn_data_put_int(self._data, i))
1537
1539 """
1540 Puts a char value.
1541
1542 @param c: a single character
1543 """
1544 self._check(pn_data_put_char(self._data, ord(c)))
1545
1547 """
1548 Puts an unsigned long value.
1549
1550 @param ul: an integral value
1551 """
1552 self._check(pn_data_put_ulong(self._data, ul))
1553
1555 """
1556 Puts a signed long value.
1557
1558 @param l: an integral value
1559 """
1560 self._check(pn_data_put_long(self._data, l))
1561
1563 """
1564 Puts a timestamp value.
1565
1566 @param t: an integral value
1567 """
1568 self._check(pn_data_put_timestamp(self._data, t))
1569
1571 """
1572 Puts a float value.
1573
1574 @param f: a floating point value
1575 """
1576 self._check(pn_data_put_float(self._data, f))
1577
1579 """
1580 Puts a double value.
1581
1582 @param d: a floating point value.
1583 """
1584 self._check(pn_data_put_double(self._data, d))
1585
1587 """
1588 Puts a decimal32 value.
1589
1590 @param d: a decimal32 value
1591 """
1592 self._check(pn_data_put_decimal32(self._data, d))
1593
1595 """
1596 Puts a decimal64 value.
1597
1598 @param d: a decimal64 value
1599 """
1600 self._check(pn_data_put_decimal64(self._data, d))
1601
1603 """
1604 Puts a decimal128 value.
1605
1606 @param d: a decimal128 value
1607 """
1608 self._check(pn_data_put_decimal128(self._data, d))
1609
1611 """
1612 Puts a UUID value.
1613
1614 @param u: a uuid value
1615 """
1616 self._check(pn_data_put_uuid(self._data, u.bytes))
1617
1619 """
1620 Puts a binary value.
1621
1622 @type b: binary
1623 @param b: a binary value
1624 """
1625 self._check(pn_data_put_binary(self._data, b))
1626
1628 """
1629 Puts a unicode value.
1630
1631 @type s: unicode
1632 @param s: a unicode value
1633 """
1634 self._check(pn_data_put_string(self._data, s.encode("utf8")))
1635
1637 """
1638 Puts a symbolic value.
1639
1640 @type s: string
1641 @param s: the symbol name
1642 """
1643 self._check(pn_data_put_symbol(self._data, s))
1644
1646 """
1647 If the current node is a list, return the number of elements,
1648 otherwise return zero. List elements can be accessed by entering
1649 the list.
1650
1651 >>> count = data.get_list()
1652 >>> data.enter()
1653 >>> for i in range(count):
1654 ... type = data.next()
1655 ... if type == Data.STRING:
1656 ... print data.get_string()
1657 ... elif type == ...:
1658 ... ...
1659 >>> data.exit()
1660 """
1661 return pn_data_get_list(self._data)
1662
1664 """
1665 If the current node is a map, return the number of child elements,
1666 otherwise return zero. Key value pairs can be accessed by entering
1667 the map.
1668
1669 >>> count = data.get_map()
1670 >>> data.enter()
1671 >>> for i in range(count/2):
1672 ... type = data.next()
1673 ... if type == Data.STRING:
1674 ... print data.get_string()
1675 ... elif type == ...:
1676 ... ...
1677 >>> data.exit()
1678 """
1679 return pn_data_get_map(self._data)
1680
1682 """
1683 If the current node is an array, return a tuple of the element
1684 count, a boolean indicating whether the array is described, and
1685 the type of each element, otherwise return (0, False, None). Array
1686 data can be accessed by entering the array.
1687
1688 >>> # read an array of strings with a symbolic descriptor
1689 >>> count, described, type = data.get_array()
1690 >>> data.enter()
1691 >>> data.next()
1692 >>> print "Descriptor:", data.get_symbol()
1693 >>> for i in range(count):
1694 ... data.next()
1695 ... print "Element:", data.get_string()
1696 >>> data.exit()
1697 """
1698 count = pn_data_get_array(self._data)
1699 described = pn_data_is_array_described(self._data)
1700 type = pn_data_get_array_type(self._data)
1701 if type == -1:
1702 type = None
1703 return count, described, type
1704
1706 """
1707 Checks if the current node is a described value. The descriptor
1708 and value may be accessed by entering the described value.
1709
1710 >>> # read a symbolically described string
1711 >>> assert data.is_described() # will error if the current node is not described
1712 >>> data.enter()
1713 >>> print data.get_symbol()
1714 >>> print data.get_string()
1715 >>> data.exit()
1716 """
1717 return pn_data_is_described(self._data)
1718
1720 """
1721 Checks if the current node is a null.
1722 """
1723 return pn_data_is_null(self._data)
1724
1726 """
1727 If the current node is a boolean, returns its value, returns False
1728 otherwise.
1729 """
1730 return pn_data_get_bool(self._data)
1731
1733 """
1734 If the current node is an unsigned byte, returns its value,
1735 returns 0 otherwise.
1736 """
1737 return pn_data_get_ubyte(self._data)
1738
1740 """
1741 If the current node is a signed byte, returns its value, returns 0
1742 otherwise.
1743 """
1744 return pn_data_get_byte(self._data)
1745
1747 """
1748 If the current node is an unsigned short, returns its value,
1749 returns 0 otherwise.
1750 """
1751 return pn_data_get_ushort(self._data)
1752
1754 """
1755 If the current node is a signed short, returns its value, returns
1756 0 otherwise.
1757 """
1758 return pn_data_get_short(self._data)
1759
1761 """
1762 If the current node is an unsigned int, returns its value, returns
1763 0 otherwise.
1764 """
1765 return pn_data_get_uint(self._data)
1766
1768 """
1769 If the current node is a signed int, returns its value, returns 0
1770 otherwise.
1771 """
1772 return pn_data_get_int(self._data)
1773
1775 """
1776 If the current node is a char, returns its value, returns 0
1777 otherwise.
1778 """
1779 return char(unichr(pn_data_get_char(self._data)))
1780
1782 """
1783 If the current node is an unsigned long, returns its value,
1784 returns 0 otherwise.
1785 """
1786 return ulong(pn_data_get_ulong(self._data))
1787
1789 """
1790 If the current node is an signed long, returns its value, returns
1791 0 otherwise.
1792 """
1793 return pn_data_get_long(self._data)
1794
1796 """
1797 If the current node is a timestamp, returns its value, returns 0
1798 otherwise.
1799 """
1800 return timestamp(pn_data_get_timestamp(self._data))
1801
1803 """
1804 If the current node is a float, returns its value, raises 0
1805 otherwise.
1806 """
1807 return pn_data_get_float(self._data)
1808
1810 """
1811 If the current node is a double, returns its value, returns 0
1812 otherwise.
1813 """
1814 return pn_data_get_double(self._data)
1815
1816
1818 """
1819 If the current node is a decimal32, returns its value, returns 0
1820 otherwise.
1821 """
1822 return pn_data_get_decimal32(self._data)
1823
1824
1826 """
1827 If the current node is a decimal64, returns its value, returns 0
1828 otherwise.
1829 """
1830 return pn_data_get_decimal64(self._data)
1831
1832
1834 """
1835 If the current node is a decimal128, returns its value, returns 0
1836 otherwise.
1837 """
1838 return pn_data_get_decimal128(self._data)
1839
1841 """
1842 If the current node is a UUID, returns its value, returns None
1843 otherwise.
1844 """
1845 if pn_data_type(self._data) == Data.UUID:
1846 return uuid.UUID(bytes=pn_data_get_uuid(self._data))
1847 else:
1848 return None
1849
1851 """
1852 If the current node is binary, returns its value, returns ""
1853 otherwise.
1854 """
1855 return pn_data_get_binary(self._data)
1856
1858 """
1859 If the current node is a string, returns its value, returns ""
1860 otherwise.
1861 """
1862 return pn_data_get_string(self._data).decode("utf8")
1863
1865 """
1866 If the current node is a symbol, returns its value, returns ""
1867 otherwise.
1868 """
1869 return symbol(pn_data_get_symbol(self._data))
1870
1871 - def copy(self, src):
1872 self._check(pn_data_copy(self._data, src._data))
1873
1884
1886 pn_data_dump(self._data)
1887
1897
1899 if self.enter():
1900 try:
1901 result = {}
1902 while self.next():
1903 k = self.get_object()
1904 if self.next():
1905 v = self.get_object()
1906 else:
1907 v = None
1908 result[k] = v
1909 finally:
1910 self.exit()
1911 return result
1912
1921
1923 if self.enter():
1924 try:
1925 result = []
1926 while self.next():
1927 result.append(self.get_object())
1928 finally:
1929 self.exit()
1930 return result
1931
1942
1951
1953 """
1954 If the current node is an array, return an Array object
1955 representing the array and its contents. Otherwise return None.
1956 This is a convenience wrapper around get_array, enter, etc.
1957 """
1958
1959 count, described, type = self.get_array()
1960 if type is None: return None
1961 if self.enter():
1962 try:
1963 if described:
1964 self.next()
1965 descriptor = self.get_object()
1966 else:
1967 descriptor = UNDESCRIBED
1968 elements = []
1969 while self.next():
1970 elements.append(self.get_object())
1971 finally:
1972 self.exit()
1973 return Array(descriptor, type, *elements)
1974
1986
1987 put_mappings = {
1988 None.__class__: lambda s, _: s.put_null(),
1989 bool: put_bool,
1990 dict: put_dict,
1991 list: put_sequence,
1992 tuple: put_sequence,
1993 unicode: put_string,
1994 bytes: put_binary,
1995 symbol: put_symbol,
1996 int: put_long,
1997 char: put_char,
1998 long: put_long,
1999 ulong: put_ulong,
2000 timestamp: put_timestamp,
2001 float: put_double,
2002 uuid.UUID: put_uuid,
2003 Described: put_py_described,
2004 Array: put_py_array
2005 }
2006 get_mappings = {
2007 NULL: lambda s: None,
2008 BOOL: get_bool,
2009 BYTE: get_byte,
2010 UBYTE: get_ubyte,
2011 SHORT: get_short,
2012 USHORT: get_ushort,
2013 INT: get_int,
2014 UINT: get_uint,
2015 CHAR: get_char,
2016 LONG: get_long,
2017 ULONG: get_ulong,
2018 TIMESTAMP: get_timestamp,
2019 FLOAT: get_float,
2020 DOUBLE: get_double,
2021 DECIMAL32: get_decimal32,
2022 DECIMAL64: get_decimal64,
2023 DECIMAL128: get_decimal128,
2024 UUID: get_uuid,
2025 BINARY: get_binary,
2026 STRING: get_string,
2027 SYMBOL: get_symbol,
2028 DESCRIBED: get_py_described,
2029 ARRAY: get_py_array,
2030 LIST: get_sequence,
2031 MAP: get_dict
2032 }
2033
2034
2036 putter = self.put_mappings[obj.__class__]
2037 putter(self, obj)
2038
2040 type = self.type()
2041 if type is None: return None
2042 getter = self.get_mappings.get(type)
2043 if getter:
2044 return getter(self)
2045 else:
2046 return UnmappedType(str(type))
2047
2050
2052
2053 LOCAL_UNINIT = PN_LOCAL_UNINIT
2054 REMOTE_UNINIT = PN_REMOTE_UNINIT
2055 LOCAL_ACTIVE = PN_LOCAL_ACTIVE
2056 REMOTE_ACTIVE = PN_REMOTE_ACTIVE
2057 LOCAL_CLOSED = PN_LOCAL_CLOSED
2058 REMOTE_CLOSED = PN_REMOTE_CLOSED
2059
2062
2064 obj2cond(self.condition, self._get_cond_impl())
2065
2066 @property
2068 return cond2obj(self._get_remote_cond_impl())
2069
2070
2072 assert False, "Subclass must override this!"
2073
2075 assert False, "Subclass must override this!"
2076
2078
2079 - def __init__(self, name, description=None, info=None):
2080 self.name = name
2081 self.description = description
2082 self.info = info
2083
2085 return "Condition(%s)" % ", ".join([repr(x) for x in
2086 (self.name, self.description, self.info)
2087 if x])
2088
2090 if not isinstance(o, Condition): return False
2091 return self.name == o.name and \
2092 self.description == o.description and \
2093 self.info == o.info
2094
2096 pn_condition_clear(cond)
2097 if obj:
2098 pn_condition_set_name(cond, str(obj.name))
2099 pn_condition_set_description(cond, obj.description)
2100 info = Data(pn_condition_info(cond))
2101 if obj.info:
2102 info.put_object(obj.info)
2103
2105 if pn_condition_is_set(cond):
2106 return Condition(pn_condition_get_name(cond),
2107 pn_condition_get_description(cond),
2108 dat2obj(pn_condition_info(cond)))
2109 else:
2110 return None
2111
2119
2124
2126 if not conn: return None
2127 ctx = pn_connection_get_context(conn)
2128 if ctx: return ctx
2129 wrapper = Connection(_conn=conn)
2130 return wrapper
2131
2133
2135 Endpoint.__init__(self)
2136 if _conn:
2137 self._conn = _conn
2138 else:
2139 self._conn = pn_connection()
2140 pn_connection_set_context(self._conn, self)
2141 self.offered_capabilities = None
2142 self.desired_capabilities = None
2143 self.properties = None
2144
2146 if hasattr(self, "_conn"):
2147 pn_connection_free(self._conn)
2148 del self._conn
2149
2151 if err < 0:
2152 exc = EXCEPTIONS.get(err, ConnectionException)
2153 raise exc("[%s]: %s" % (err, pn_connection_error(self._conn)))
2154 else:
2155 return err
2156
2158 return pn_connection_condition(self._conn)
2159
2161 return pn_connection_remote_condition(self._conn)
2162
2164 return pn_connection_get_container(self._conn)
2166 return pn_connection_set_container(self._conn, name)
2167
2168 container = property(_get_container, _set_container)
2169
2171 return pn_connection_get_hostname(self._conn)
2173 return pn_connection_set_hostname(self._conn, name)
2174
2175 hostname = property(_get_hostname, _set_hostname)
2176
2177 @property
2179 return pn_connection_remote_container(self._conn)
2180
2181 @property
2183 return pn_connection_remote_hostname(self._conn)
2184
2185 @property
2187 return dat2obj(pn_connection_remote_offered_capabilities(self._conn))
2188
2189 @property
2191 return dat2obj(pn_connection_remote_desired_capabilities(self._conn))
2192
2193 @property
2195 return dat2obj(pn_connection_remote_properties(self._conn))
2196
2198 obj2dat(self.offered_capabilities,
2199 pn_connection_offered_capabilities(self._conn))
2200 obj2dat(self.desired_capabilities,
2201 pn_connection_desired_capabilities(self._conn))
2202 obj2dat(self.properties, pn_connection_properties(self._conn))
2203 pn_connection_open(self._conn)
2204
2206 self._update_cond()
2207 pn_connection_close(self._conn)
2208
2209 @property
2211 return pn_connection_state(self._conn)
2212
2214 return wrap_session(pn_session(self._conn))
2215
2217 return wrap_session(pn_session_head(self._conn, mask))
2218
2220 return wrap_link(pn_link_head(self._conn, mask))
2221
2222 @property
2224 return wrap_delivery(pn_work_head(self._conn))
2225
2226 @property
2228 return pn_error_code(pn_connection_error(self._conn))
2229
2232
2234 if ssn is None: return None
2235 ctx = pn_session_get_context(ssn)
2236 if ctx:
2237 return ctx
2238 else:
2239 wrapper = Session(ssn)
2240 pn_session_set_context(ssn, wrapper)
2241 return wrapper
2242
2244
2248
2250 if hasattr(self, "_ssn"):
2251 pn_session_free(self._ssn)
2252 del self._ssn
2253
2255 return pn_session_condition(self._ssn)
2256
2258 return pn_session_remote_condition(self._ssn)
2259
2261 return pn_session_get_incoming_capacity(self._ssn)
2262
2264 pn_session_set_incoming_capacity(self._ssn, capacity)
2265
2266 incoming_capacity = property(_get_incoming_capacity, _set_incoming_capacity)
2267
2268 @property
2270 return pn_session_outgoing_bytes(self._ssn)
2271
2272 @property
2274 return pn_session_incoming_bytes(self._ssn)
2275
2277 pn_session_open(self._ssn)
2278
2280 self._update_cond()
2281 pn_session_close(self._ssn)
2282
2283 - def next(self, mask):
2284 return wrap_session(pn_session_next(self._ssn, mask))
2285
2286 @property
2288 return pn_session_state(self._ssn)
2289
2290 @property
2292 return wrap_connection(pn_session_connection(self._ssn))
2293
2295 return wrap_link(pn_sender(self._ssn, name))
2296
2298 return wrap_link(pn_receiver(self._ssn, name))
2299
2302
2304 if link is None: return None
2305 ctx = pn_link_get_context(link)
2306 if ctx:
2307 return ctx
2308 else:
2309 if pn_link_is_sender(link):
2310 wrapper = Sender(link)
2311 else:
2312 wrapper = Receiver(link)
2313 pn_link_set_context(link, wrapper)
2314 return wrapper
2315
2316 -class Link(Endpoint):
2317
2318 SND_UNSETTLED = PN_SND_UNSETTLED
2319 SND_SETTLED = PN_SND_SETTLED
2320 SND_MIXED = PN_SND_MIXED
2321
2322 RCV_FIRST = PN_RCV_FIRST
2323 RCV_SECOND = PN_RCV_SECOND
2324
2328
2330 if hasattr(self, "_link"):
2331 pn_link_free(self._link)
2332 del self._link
2333
2335 if err < 0:
2336 exc = EXCEPTIONS.get(err, LinkException)
2337 raise exc("[%s]: %s" % (err, pn_link_error(self._link)))
2338 else:
2339 return err
2340
2342 return pn_link_condition(self._link)
2343
2345 return pn_link_remote_condition(self._link)
2346
2348 pn_link_open(self._link)
2349
2351 self._update_cond()
2352 pn_link_close(self._link)
2353
2354 @property
2356 return pn_link_state(self._link)
2357
2358 @property
2360 return Terminus(pn_link_source(self._link))
2361
2362 @property
2364 return Terminus(pn_link_target(self._link))
2365
2366 @property
2368 return Terminus(pn_link_remote_source(self._link))
2369 @property
2371 return Terminus(pn_link_remote_target(self._link))
2372
2373 @property
2375 return wrap_session(pn_link_session(self._link))
2376
2378 return wrap_delivery(pn_delivery(self._link, tag))
2379
2380 @property
2382 return wrap_delivery(pn_link_current(self._link))
2383
2385 return pn_link_advance(self._link)
2386
2387 @property
2389 return pn_link_unsettled(self._link)
2390
2391 @property
2393 return pn_link_credit(self._link)
2394
2395 @property
2397 return pn_link_available(self._link)
2398
2399 @property
2401 return pn_link_queued(self._link)
2402
2403 - def next(self, mask):
2404 return wrap_link(pn_link_next(self._link, mask))
2405
2406 @property
2408 return pn_link_name(self._link)
2409
2410 @property
2412 return pn_link_is_sender(self._link)
2413
2414 @property
2416 return pn_link_is_receiver(self._link)
2417
2418 @property
2420 return pn_link_remote_snd_settle_mode(self._link)
2421
2422 @property
2424 return pn_link_remote_rcv_settle_mode(self._link)
2425
2427 return pn_link_snd_settle_mode(self._link)
2429 pn_link_set_snd_settle_mode(self._link, mode)
2430 snd_settle_mode = property(_get_snd_settle_mode, _set_snd_settle_mode)
2431
2433 return pn_link_rcv_settle_mode(self._link)
2435 pn_link_set_rcv_settle_mode(self._link, mode)
2436 rcv_settle_mode = property(_get_rcv_settle_mode, _set_rcv_settle_mode)
2437
2439 return pn_link_drained(self._link)
2440
2442
2443 UNSPECIFIED = PN_UNSPECIFIED
2444 SOURCE = PN_SOURCE
2445 TARGET = PN_TARGET
2446 COORDINATOR = PN_COORDINATOR
2447
2448 NONDURABLE = PN_NONDURABLE
2449 CONFIGURATION = PN_CONFIGURATION
2450 DELIVERIES = PN_DELIVERIES
2451
2452 DIST_MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED
2453 DIST_MODE_COPY = PN_DIST_MODE_COPY
2454 DIST_MODE_MOVE = PN_DIST_MODE_MOVE
2455
2458
2460 if err < 0:
2461 exc = EXCEPTIONS.get(err, LinkException)
2462 raise exc("[%s]" % err)
2463 else:
2464 return err
2465
2467 return pn_terminus_get_type(self._impl)
2469 self._check(pn_terminus_set_type(self._impl, type))
2470 type = property(_get_type, _set_type)
2471
2473 return pn_terminus_get_address(self._impl)
2475 self._check(pn_terminus_set_address(self._impl, address))
2476 address = property(_get_address, _set_address)
2477
2479 return pn_terminus_get_durability(self._impl)
2481 self._check(pn_terminus_set_durability(self._impl, seconds))
2482 durability = property(_get_durability, _set_durability)
2483
2485 return pn_terminus_get_expiry_policy(self._impl)
2487 self._check(pn_terminus_set_expiry_policy(self._impl, seconds))
2488 expiry_policy = property(_get_expiry_policy, _set_expiry_policy)
2489
2491 return pn_terminus_get_timeout(self._impl)
2493 self._check(pn_terminus_set_timeout(self._impl, seconds))
2494 timeout = property(_get_timeout, _set_timeout)
2495
2497 return pn_terminus_is_dynamic(self._impl)
2499 self._check(pn_terminus_set_dynamic(self._impl, dynamic))
2500 dynamic = property(_is_dynamic, _set_dynamic)
2501
2503 return pn_terminus_get_distribution_mode(self._impl)
2505 self._check(pn_terminus_set_distribution_mode(self._impl, mode))
2506 distribution_mode = property(_get_distribution_mode, _set_distribution_mode)
2507
2508 @property
2510 return Data(pn_terminus_properties(self._impl))
2511
2512 @property
2514 return Data(pn_terminus_capabilities(self._impl))
2515
2516 @property
2518 return Data(pn_terminus_outcomes(self._impl))
2519
2520 @property
2522 return Data(pn_terminus_filter(self._impl))
2523
2524 - def copy(self, src):
2525 self._check(pn_terminus_copy(self._impl, src._impl))
2526
2529
2531 pn_link_offered(self._link, n)
2532
2533 - def send(self, bytes):
2534 return self._check(pn_link_send(self._link, bytes))
2535
2537
2538 - def flow(self, n):
2539 pn_link_flow(self._link, n)
2540
2541 - def recv(self, limit):
2542 n, bytes = pn_link_recv(self._link, limit)
2543 if n == PN_EOS:
2544 return None
2545 else:
2546 self._check(n)
2547 return bytes
2548
2550 pn_link_drain(self._link, n)
2551
2553 return pn_link_draining(self._link)
2554
2556 if not dlv: return None
2557 ctx = pn_delivery_get_context(dlv)
2558 if ctx: return ctx
2559 wrapper = Delivery(dlv)
2560 pn_delivery_set_context(dlv, wrapper)
2561 return wrapper
2562
2564
2565 RECEIVED = PN_RECEIVED
2566 ACCEPTED = PN_ACCEPTED
2567 REJECTED = PN_REJECTED
2568 RELEASED = PN_RELEASED
2569 MODIFIED = PN_MODIFIED
2570
2572 self._impl = impl
2573 self.local = local
2574 self._data = None
2575 self._condition = None
2576 self._annotations = None
2577
2578 @property
2580 return pn_disposition_type(self._impl)
2581
2583 return pn_disposition_get_section_number(self._impl)
2585 pn_disposition_set_section_number(self._impl, n)
2586 section_number = property(_get_section_number, _set_section_number)
2587
2589 return pn_disposition_get_section_offset(self._impl)
2591 pn_disposition_set_section_offset(self._impl, n)
2592 section_offset = property(_get_section_offset, _set_section_offset)
2593
2595 return pn_disposition_is_failed(self._impl)
2597 pn_disposition_set_failed(self._impl, b)
2598 failed = property(_get_failed, _set_failed)
2599
2601 return pn_disposition_is_undeliverable(self._impl)
2603 pn_disposition_set_undeliverable(self._impl, b)
2604 undeliverable = property(_get_undeliverable, _set_undeliverable)
2605
2607 if self.local:
2608 return self._data
2609 else:
2610 return dat2obj(pn_disposition_data(self._impl))
2612 if self.local:
2613 self._data = obj
2614 else:
2615 raise AttributeError("data attribute is read-only")
2616 data = property(_get_data, _set_data)
2617
2619 if self.local:
2620 return self._annotations
2621 else:
2622 return dat2obj(pn_disposition_annotations(self._impl))
2624 if self.local:
2625 self._annotations = obj
2626 else:
2627 raise AttributeError("annotations attribute is read-only")
2628 annotations = property(_get_annotations, _set_annotations)
2629
2631 if self.local:
2632 return self._condition
2633 else:
2634 return cond2obj(pn_disposition_condition(self._impl))
2636 if self.local:
2637 self._condition = obj
2638 else:
2639 raise AttributeError("condition attribute is read-only")
2640 condition = property(_get_condition, _set_condition)
2641
2643
2644 RECEIVED = Disposition.RECEIVED
2645 ACCEPTED = Disposition.ACCEPTED
2646 REJECTED = Disposition.REJECTED
2647 RELEASED = Disposition.RELEASED
2648 MODIFIED = Disposition.MODIFIED
2649
2651 self._dlv = dlv
2652 self.local = Disposition(pn_delivery_local(self._dlv), True)
2653 self.remote = Disposition(pn_delivery_remote(self._dlv), False)
2654
2655 @property
2657 return pn_delivery_tag(self._dlv)
2658
2659 @property
2661 return pn_delivery_writable(self._dlv)
2662
2663 @property
2665 return pn_delivery_readable(self._dlv)
2666
2667 @property
2669 return pn_delivery_updated(self._dlv)
2670
2672 obj2dat(self.local._data, pn_disposition_data(self.local._impl))
2673 obj2dat(self.local._annotations, pn_disposition_annotations(self.local._impl))
2674 obj2cond(self.local._condition, pn_disposition_condition(self.local._impl))
2675 pn_delivery_update(self._dlv, state)
2676
2677 @property
2679 return pn_delivery_pending(self._dlv)
2680
2681 @property
2683 return pn_delivery_partial(self._dlv)
2684
2685 @property
2687 return pn_delivery_local_state(self._dlv)
2688
2689 @property
2691 return pn_delivery_remote_state(self._dlv)
2692
2693 @property
2695 return pn_delivery_settled(self._dlv)
2696
2698 pn_delivery_settle(self._dlv)
2699
2700 @property
2702 return wrap_delivery(pn_work_next(self._dlv))
2703
2704 @property
2706 return wrap_link(pn_delivery_link(self._dlv))
2707
2710
2712
2713 TRACE_DRV = PN_TRACE_DRV
2714 TRACE_FRM = PN_TRACE_FRM
2715 TRACE_RAW = PN_TRACE_RAW
2716
2718 if not _trans:
2719 self._trans = pn_transport()
2720 else:
2721 self._shared_trans = True
2722 self._trans = _trans
2723 self._sasl = None
2724 self._ssl = None
2725
2727 if hasattr(self, "_trans"):
2728 if not hasattr(self, "_shared_trans"):
2729 pn_transport_free(self._trans)
2730 if hasattr(self, "_sasl") and self._sasl:
2731
2732
2733 self._sasl._sasl = None
2734 self._sasl = None
2735 if hasattr(self, "_ssl") and self._ssl:
2736
2737 self._ssl._ssl = None
2738 self._ssl = None
2739 del self._trans
2740
2742 if err < 0:
2743 exc = EXCEPTIONS.get(err, TransportException)
2744 raise exc("[%s]: %s" % (err, pn_error_text(pn_transport_error(self._trans))))
2745 else:
2746 return err
2747
2748 - def bind(self, connection):
2749 self._check(pn_transport_bind(self._trans, connection._conn))
2750
2752 pn_transport_trace(self._trans, n)
2753
2754 - def tick(self, now):
2755 """Process any timed events (like heartbeat generation).
2756 now = seconds since epoch (float).
2757 """
2758 next = pn_transport_tick(self._trans, long(now * 1000))
2759 return float(next) / 1000.0
2760
2762 c = pn_transport_capacity(self._trans)
2763 if c >= PN_EOS:
2764 return c
2765 else:
2766 return self._check(c)
2767
2768 - def push(self, bytes):
2769 self._check(pn_transport_push(self._trans, bytes))
2770
2772 self._check(pn_transport_close_tail(self._trans))
2773
2775 p = pn_transport_pending(self._trans)
2776 if p >= PN_EOS:
2777 return p
2778 else:
2779 return self._check(p)
2780
2781 - def peek(self, size):
2782 cd, out = pn_transport_peek(self._trans, size)
2783 if cd == PN_EOS:
2784 return None
2785 else:
2786 self._check(cd)
2787 return out
2788
2789 - def pop(self, size):
2790 pn_transport_pop(self._trans, size)
2791
2793 self._check(pn_transport_close_head(self._trans))
2794
2796 p = self.pending()
2797 if p < 0:
2798 return None
2799 else:
2800 out = self.peek(min(size, p))
2801 self.pop(len(out))
2802 return out
2803
2815
2816
2818 return pn_transport_get_max_frame(self._trans)
2819
2821 pn_transport_set_max_frame(self._trans, value)
2822
2823 max_frame_size = property(_get_max_frame_size, _set_max_frame_size,
2824 doc="""
2825 Sets the maximum size for received frames (in bytes).
2826 """)
2827
2828 @property
2830 return pn_transport_get_remote_max_frame(self._trans)
2831
2832
2834 msec = pn_transport_get_idle_timeout(self._trans)
2835 return float(msec)/1000.0
2836
2838 pn_transport_set_idle_timeout(self._trans, long(sec * 1000))
2839
2840 idle_timeout = property(_get_idle_timeout, _set_idle_timeout,
2841 doc="""
2842 The idle timeout of the connection (in milliseconds).
2843 """)
2844
2845 @property
2847 msec = pn_transport_get_remote_idle_timeout(self._trans)
2848 return float(msec)/1000.0
2849
2850 @property
2852 return pn_transport_get_frames_output(self._trans)
2853
2854 @property
2857
2859
2860 if not self._sasl:
2861 self._sasl = SASL(self)
2862 return self._sasl
2863
2864 - def ssl(self, domain=None, session_details=None):
2865
2866 if not self._ssl:
2867 self._ssl = SSL(self, domain, session_details)
2868 return self._ssl
2869
2872
2873 -class SASL(object):
2874
2875 OK = PN_SASL_OK
2876 AUTH = PN_SASL_AUTH
2877
2885
2887 if err < 0:
2888 exc = EXCEPTIONS.get(err, SASLException)
2889 raise exc("[%s]" % (err))
2890 else:
2891 return err
2892
2894 pn_sasl_mechanisms(self._sasl, mechs)
2895
2897 pn_sasl_client(self._sasl)
2898
2900 pn_sasl_server(self._sasl)
2901
2902 - def plain(self, user, password):
2903 pn_sasl_plain(self._sasl, user, password)
2904
2905 - def send(self, data):
2906 self._check(pn_sasl_send(self._sasl, data, len(data)))
2907
2909 sz = 16
2910 while True:
2911 n, data = pn_sasl_recv(self._sasl, sz)
2912 if n == PN_OVERFLOW:
2913 sz *= 2
2914 continue
2915 elif n == PN_EOS:
2916 return None
2917 else:
2918 self._check(n)
2919 return data
2920
2921 @property
2923 outcome = pn_sasl_outcome(self._sasl)
2924 if outcome == PN_SASL_NONE:
2925 return None
2926 else:
2927 return outcome
2928
2929 - def done(self, outcome):
2930 pn_sasl_done(self._sasl, outcome)
2931
2932 STATE_CONF = PN_SASL_CONF
2933 STATE_IDLE = PN_SASL_IDLE
2934 STATE_STEP = PN_SASL_STEP
2935 STATE_PASS = PN_SASL_PASS
2936 STATE_FAIL = PN_SASL_FAIL
2937
2938 @property
2940 return pn_sasl_state(self._sasl)
2941
2945
2948
2949 -class SSLDomain(object):
2950
2951 MODE_CLIENT = PN_SSL_MODE_CLIENT
2952 MODE_SERVER = PN_SSL_MODE_SERVER
2953 VERIFY_PEER = PN_SSL_VERIFY_PEER
2954 VERIFY_PEER_NAME = PN_SSL_VERIFY_PEER_NAME
2955 ANONYMOUS_PEER = PN_SSL_ANONYMOUS_PEER
2956
2957 - def __init__(self, mode):
2958 self._domain = pn_ssl_domain(mode)
2959 if self._domain is None:
2960 raise SSLUnavailable()
2961
2962 - def _check(self, err):
2963 if err < 0:
2964 exc = EXCEPTIONS.get(err, SSLException)
2965 raise exc("SSL failure.")
2966 else:
2967 return err
2968
2969 - def set_credentials(self, cert_file, key_file, password):
2970 return self._check( pn_ssl_domain_set_credentials(self._domain,
2971 cert_file, key_file,
2972 password) )
2973 - def set_trusted_ca_db(self, certificate_db):
2974 return self._check( pn_ssl_domain_set_trusted_ca_db(self._domain,
2975 certificate_db) )
2976 - def set_peer_authentication(self, verify_mode, trusted_CAs=None):
2977 return self._check( pn_ssl_domain_set_peer_authentication(self._domain,
2978 verify_mode,
2979 trusted_CAs) )
2980
2982 return self._check( pn_ssl_domain_allow_unsecured_client(self._domain) )
2983
2985
2992
2993 - def __new__(cls, transport, domain, session_details=None):
2994 """Enforce a singleton SSL object per Transport"""
2995 if transport._ssl:
2996
2997
2998
2999 ssl = transport._ssl
3000 if (domain and (ssl._domain is not domain) or
3001 session_details and (ssl._session_details is not session_details)):
3002 raise SSLException("Cannot re-configure existing SSL object!")
3003 else:
3004 obj = super(SSL, cls).__new__(cls)
3005 obj._domain = domain
3006 obj._session_details = session_details
3007 session_id = None
3008 if session_details:
3009 session_id = session_details.get_session_id()
3010 obj._ssl = pn_ssl( transport._trans )
3011 if obj._ssl is None:
3012 raise SSLUnavailable()
3013 pn_ssl_init( obj._ssl, domain._domain, session_id )
3014 transport._ssl = obj
3015 return transport._ssl
3016
3018 rc, name = pn_ssl_get_cipher_name( self._ssl, 128 )
3019 if rc:
3020 return name
3021 return None
3022
3024 rc, name = pn_ssl_get_protocol_name( self._ssl, 128 )
3025 if rc:
3026 return name
3027 return None
3028
3029 RESUME_UNKNOWN = PN_SSL_RESUME_UNKNOWN
3030 RESUME_NEW = PN_SSL_RESUME_NEW
3031 RESUME_REUSED = PN_SSL_RESUME_REUSED
3032
3034 return pn_ssl_resume_status( self._ssl )
3035
3037 self._check(pn_ssl_set_peer_hostname( self._ssl, hostname ))
3039 err, name = pn_ssl_get_peer_hostname( self._ssl, 1024 )
3040 self._check(err)
3041 return name
3042 peer_hostname = property(_get_peer_hostname, _set_peer_hostname,
3043 doc="""
3044 Manage the expected name of the remote peer. Used to authenticate the remote.
3045 """)
3046
3049 """ Unique identifier for the SSL session. Used to resume previous session on a new
3050 SSL connection.
3051 """
3052
3054 self._session_id = session_id
3055
3057 return self._session_id
3058
3065 """
3066 The DriverException class is the root of the driver exception hierarchy.
3067 """
3068 pass
3069
3072 if not cxtr: return None
3073 ctx = pn_connector_context(cxtr)
3074 if ctx: return ctx
3075 wrapper = Connector(_cxtr=cxtr)
3076 pn_connector_set_context(cxtr, wrapper)
3077 return wrapper
3078
3082
3084 return wrap_connector(pn_connector_next(self._cxtr))
3085
3087 pn_connector_process(self._cxtr)
3088
3090 return wrap_listener(pn_connector_listener(self._cxtr))
3091
3098
3099 @property
3101 trans = pn_connector_transport(self._cxtr)
3102 if trans:
3103 return Transport(trans)
3104 return None
3105
3107 return pn_connector_close(self._cxtr)
3108
3109 @property
3111 return pn_connector_closed(self._cxtr)
3112
3114 return wrap_connection(pn_connector_connection(self._cxtr))
3115
3117 pn_connector_set_connection(self._cxtr, conn._conn)
3118
3119 connection = property(_get_connection, _set_connection,
3120 doc="""
3121 Associate a Connection with this Connector.
3122 """)
3123
3125 if not lsnr: return None
3126 ctx = pn_listener_context(lsnr)
3127 if ctx: return ctx
3128 wrapper = Listener(_lsnr=lsnr)
3129 pn_listener_set_context(lsnr, wrapper)
3130 return wrapper
3131
3135
3137 return wrap_listener(pn_listener_next(self._lsnr))
3138
3140 cxtr = pn_listener_accept(self._lsnr)
3141 return wrap_connector(cxtr)
3142
3144 pn_listener_close(self._lsnr)
3145
3148 self._driver = pn_driver()
3149
3151 if hasattr(self, "_driver"):
3152 pn_driver_free(self._driver)
3153 del self._driver
3154
3155 - def wait(self, timeout_sec):
3156 if timeout_sec is None or timeout_sec < 0.0:
3157 t = -1
3158 else:
3159 t = long(1000*timeout_sec)
3160 return pn_driver_wait(self._driver, t)
3161
3163 return pn_driver_wakeup(self._driver)
3164
3166 return wrap_listener(pn_listener(self._driver, host, port, None))
3167
3169 return wrap_listener(pn_driver_listener(self._driver))
3170
3172 return wrap_listener(pn_listener_head(self._driver))
3173
3175 return wrap_connector(pn_connector(self._driver, host, port, None))
3176
3178 return wrap_connector(pn_connector_head(self._driver))
3179
3181 return wrap_connector(pn_driver_connector(self._driver))
3182
3183 __all__ = [
3184 "API_LANGUAGE",
3185 "IMPLEMENTATION_LANGUAGE",
3186 "ABORTED",
3187 "ACCEPTED",
3188 "AUTOMATIC",
3189 "PENDING",
3190 "MANUAL",
3191 "REJECTED",
3192 "RELEASED",
3193 "SETTLED",
3194 "UNDESCRIBED",
3195 "Array",
3196 "Condition",
3197 "Connection",
3198 "Connector",
3199 "Data",
3200 "Delivery",
3201 "Disposition",
3202 "Described",
3203 "Driver",
3204 "DriverException",
3205 "Endpoint",
3206 "Link",
3207 "Listener",
3208 "Message",
3209 "MessageException",
3210 "Messenger",
3211 "MessengerException",
3212 "ProtonException",
3213 "Receiver",
3214 "SASL",
3215 "Sender",
3216 "Session",
3217 "SSL",
3218 "SSLDomain",
3219 "SSLSessionDetails",
3220 "SSLUnavailable",
3221 "SSLException",
3222 "Terminus",
3223 "Timeout",
3224 "Interrupt",
3225 "Transport",
3226 "TransportException",
3227 "char",
3228 "symbol",
3229 "timestamp",
3230 "ulong"
3231 ]
3232