Home | Trees | Indices | Help |
---|
|
object --+ | Messenger
The Messenger class defines a high level interface for sending and receiving Messages. Every Messenger contains a single logical queue of incoming messages and a single logical queue of outgoing messages. These messages in these queues may be destined for, or originate from, a variety of addresses.
The messenger interface is single-threaded. All methods except one (interrupt) are intended to be used from within the messenger thread.
An address has the following form:
[ amqp[s]:// ] [user[:password]@] domain [/[name]]
Where domain can be one of:
host | host:port | ip | ip:port | name
The following are valid examples of addresses:
The Messenger class works in conjunction with the Message class. The Message class is a mutable holder of message content.
The put method copies its Message to the outgoing queue, and may send queued messages if it can do so without blocking. The send method blocks until it has sent the requested number of messages, or until a timeout interrupts the attempt.
>>> message = Message() >>> for i in range(3): ... message.address = "amqp://host/queue" ... message.subject = "Hello World %i" % i ... messenger.put(message) >>> messenger.send()
Similarly, the recv method receives messages into the incoming queue, and may block as it attempts to receive the requested number of messages, or until timeout is reached. It may receive fewer than the requested number. The get method pops the eldest Message off the incoming queue and copies it into the Message object that you supply. It will not block.
>>> message = Message() >>> messenger.recv(10): >>> while messenger.incoming > 0: ... messenger.get(message) ... print message.subject Hello World 0 Hello World 1 Hello World 2
The blocking flag allows you to turn off blocking behavior entirely, in which case send and recv will do whatever they can without blocking, and then return. You can then look at the number of incoming and outgoing messages to see how much outstanding work still remains.
Instance Methods | |||
|
|||
|
|||
|
|||
|
|||
|
|||
|
|||
|
|||
|
|||
|
|||
|
|||
|
|||
|
|||
|
|||
|
|||
|
|||
|
|||
|
|||
|
|||
|
|||
|
|||
|
|||
|
|||
|
|||
|
|||
|
|||
Inherited from |
Class Variables | |
certificate = property(_get_certificate, _set_certificate, doc
|
|
private_key = property(_get_private_key, _set_private_key, doc
|
|
password = property(_get_password, _set_password, doc=
|
|
trusted_certificates = property(_get_trusted_certificates, _se
|
|
timeout = property(_get_timeout, _set_timeout, doc=
|
|
blocking = property(_is_blocking, _set_blocking, doc=
|
|
passive = property(_is_passive, _set_passive, doc=
|
|
incoming_window = property(_get_incoming_window, _set_incoming
|
|
outgoing_window = property(_get_outgoing_window, _set_outgoing
|
Properties | |
Inherited from |
Method Details |
Construct a new Messenger with the given name. The name has global scope. If a NULL name is supplied, a UUID based name will be chosen.
|
Destroy the Messenger. This will close all connections that are managed by the Messenger. Call the stop method before destroying the Messenger. |
The name of the Messenger.
|
Currently a no-op placeholder. For future compatibility, do not send or recv messages before starting the Messenger. |
Transitions the Messenger to an inactive state. An inactive Messenger will not send or receive messages from its internal queues. A Messenger should be stopped before being discarded to ensure a clean shutdown handshake occurs on any internally managed connections. |
Returns true iff a Messenger is in the stopped state. This function does not block.
|
Subscribes the Messenger to messages originating from the specified source. The source is an address as specified in the Messenger introduction with the following addition. If the domain portion of the address begins with the '~' character, the Messenger will interpret the domain as host/port, bind to it, and listen for incoming messages. For example "~0.0.0.0", "amqp://~0.0.0.0", and "amqps://~0.0.0.0" will all bind to any local interface and listen for incoming messages with the last variant only permitting incoming SSL connections.
|
Places the content contained in the message onto the outgoing queue of the Messenger. This method will never block, however it will send any unblocked Messages in the outgoing queue immediately and leave any blocked Messages remaining in the outgoing queue. The send call may be used to block until the outgoing queue is empty. The outgoing property may be used to check the depth of the outgoing queue. When the content in a given Message object is copied to the outgoing message queue, you may then modify or discard the Message object without having any impact on the content in the outgoing queue. This method returns an outgoing tracker for the Message. The tracker can be used to determine the delivery status of the Message.
|
Gets the last known remote state of the delivery associated with the given tracker.
|
Checks if the delivery associated with the given tracker is still waiting to be sent.
|
Frees a Messenger from tracking the status associated with a given tracker. If you don't supply a tracker, all outgoing messages up to the most recent will be settled. |
This call will block until the indicated number of messages have been sent, or until the operation times out. If n is -1 this call will block until all outgoing messages have been sent. If n is 0 then this call will send whatever it can without blocking. |
Receives up to n messages into the incoming queue. If no value for n is supplied, this call will receive as many messages as it can buffer internally. If the Messenger is in blocking mode, this call will block until at least one Message is available in the incoming queue. |
Sends or receives any outstanding messages queued for a Messenger. This will block for the indicated timeout. This method may also do I/O work other than sending and receiving messages. For example, closing connections after messenger.stop() has been called. |
|
The Messenger interface is single-threaded. This is the only Messenger function intended to be called from outside of the Messenger thread. Call this from a non-messenger thread to interrupt a Messenger that is blocking. This will cause any in-progress blocking call to throw the Interrupt exception. If there is no currently blocking call, then the next blocking call will be affected, even if it is within the same thread that interrupt was called from. |
Moves the message from the head of the incoming message queue into the supplied message object. Any content in the message will be overwritten. A tracker for the incoming Message is returned. The tracker can later be used to communicate your acceptance or rejection of the Message. If None is passed in for the Message object, the Message popped from the head of the queue is discarded.
|
Signal the sender that you have acted on the Message pointed to by the tracker. If no tracker is supplied, then all messages that have been returned by the get method are accepted, except those that have already been auto-settled by passing beyond your incoming window size.
|
Rejects the Message indicated by the tracker. If no tracker is supplied, all messages that have been returned by the get method are rejected, except those that have already been auto-settled by passing beyond your outgoing window size.
|
The outgoing queue depth.
|
The incoming queue depth.
|
Adds a routing rule to a Messenger's internal routing table. The route procedure may be used to influence how a Messenger will internally treat a given address or class of addresses. Every call to the route procedure will result in Messenger appending a routing rule to its internal routing table. Whenever a Message is presented to a Messenger for delivery, it will match the address of this message against the set of routing rules in order. The first rule to match will be triggered, and instead of routing based on the address presented in the message, the Messenger will route based on the address supplied in the rule. The pattern matching syntax supports two types of matches, a '%' will match any character except a '/', and a '*' will match any character including a '/'. A routing address is specified as a normal AMQP address, however it may additionally use substitution variables from the pattern match that triggered the rule. Any message sent to "foo" will be routed to "amqp://foo.com": >>> messenger.route("foo", "amqp://foo.com"); Any message sent to "foobar" will be routed to "amqp://foo.com/bar": >>> messenger.route("foobar", "amqp://foo.com/bar"); Any message sent to bar/<path> will be routed to the corresponding path within the amqp://bar.com domain: >>> messenger.route("bar/*", "amqp://bar.com/$1"); Route all messages over TLS: >>> messenger.route("amqp:*", "amqps:$1") Supply credentials for foo.com: >>> messenger.route("amqp://foo.com/*", "amqp://user:password@foo.com/$1"); Supply credentials for all domains: >>> messenger.route("amqp://*", "amqp://user:password@$1"); Route all addresses through a single proxy while preserving the original destination: >>> messenger.route("amqp://%/*", "amqp://user:password@proxy/$1/$2"); Route any address through a single broker: >>> messenger.route("*", "amqp://user:password@broker/$1"); |
Similar to route(), except that the destination of the Message is determined before the message address is rewritten. The outgoing address is only rewritten after routing has been finalized. If a message has an outgoing address of "amqp://0.0.0.0:5678", and a rewriting rule that changes its outgoing address to "foo", it will still arrive at the peer that is listening on "amqp://0.0.0.0:5678", but when it arrives there, the receiver will see its outgoing address as "foo". The default rewrite rule removes username and password from addresses before they are transmitted. |
|
Class Variable Details |
certificate
|
private_key
|
password
|
trusted_certificates
|
timeout
|
blocking
|
passive
|
incoming_window
|
outgoing_window
|
Home | Trees | Indices | Help |
---|
Generated by Epydoc 3.0.1 on Thu Mar 29 15:54:02 2018 | http://epydoc.sourceforge.net |