The Messenger
class defines a high level interface for sending
and receiving Messages. Every Messenger
contains a single logical queue of incoming messages and a single logical
queue of outgoing messages. These messages in these queues may be destined
for, or originate from, a variety of addresses.
The messenger interface is single-threaded. All methods except one ( interrupt ) are intended to be used from within the messenger thread.
The Messenger class works in conjuction with the Message class. The Message class is a mutable holder of message content.
The put method copies its Message to the outgoing queue, and may send queued messages if it can do so without blocking. The send method blocks until it has sent the requested number of messages, or until a timeout interrupts the attempt.
Similarly, the recv method receives messages into the incoming queue, and may block as it attempts to receive the requested number of messages, or until timeout is reached. It may receive fewer than the requested number. The get method pops the eldest Message off the incoming queue and copies it into the Message object that you supply. It will not block.
The blocking attribute allows you to turn off blocking behavior entirely, in which case send and recv will do whatever they can without blocking, and then return. You can then look at the number of incoming and outgoing messages to see how much outstanding work still remains.
Creates a new Messenger
.
The name
parameter is optional. If one is not provided then a
unique name is generated.
name - the name (def. nil)
# File lib/qpid_proton/messenger.rb, line 71 def initialize(name = nil) @impl = Cproton.pn_messenger(name) ObjectSpace.define_finalizer(self, self.class.finalize!(@impl)) end
Signal the sender that you have acted on the Message pointed to by the tracker. If no tracker is supplied, then all messages that have been returned by the get method are accepted, except those that have already been auto-settled by passing beyond your incoming window size.
tracker - the tracker
# File lib/qpid_proton/messenger.rb, line 486 def accept(tracker = nil) raise TypeError.new("invalid tracker: #{tracker}") unless tracker.nil? or valid_tracker?(tracker) if tracker.nil? then tracker = self.incoming_tracker flag = Cproton::PN_CUMULATIVE else flag = 0 end check_for_error(Cproton.pn_messenger_accept(@impl, tracker.impl, flag)) end
Blocking Attribute
Enable or disable blocking behavior during message sending and receiving. This affects every blocking call, with the exception of work(). Currently, the affected calls are send, recv, and stop.
# File lib/qpid_proton/messenger.rb, line 130 def blocking Cproton.pn_mesenger_is_blocking(@impl) end
# File lib/qpid_proton/messenger.rb, line 134 def blocking=(blocking) Cproton.pn_messenger_set_blocking(@impl, blocking) end
Returns the path to a certificate file.
# File lib/qpid_proton/messenger.rb, line 213 def certificate Cproton.pn_messenger_get_certificate(@impl) end
Path to a certificate file for the Messenger
.
This certificate is used when the Messenger
accepts or
establishes SSL/TLS connections. This property must be specified for the
Messenger to accept incoming SSL/TLS
connections and to establish client authenticated outgoing SSL/TLS
connection. Non client authenticated outgoing SSL/TLS connections do not
require this property.
certificate - the certificate
# File lib/qpid_proton/messenger.rb, line 207 def certificate=(certificate) Cproton.pn_messenger_set_certificate(@impl, certificate) end
Returns the most recent error number.
# File lib/qpid_proton/messenger.rb, line 146 def errno Cproton.pn_messenger_errno(@impl) end
Returns the most recent error message.
# File lib/qpid_proton/messenger.rb, line 152 def error Cproton.pn_error_text(Cproton.pn_messenger_error(@impl)) end
Reports whether an error occurred.
# File lib/qpid_proton/messenger.rb, line 140 def error? !Cproton.pn_messenger_errno(@impl).zero? end
Moves the message from the head of the incoming message queue into the supplied message object. Any content in the supplied message will be overwritten. A tracker for the incoming Message is returned. The tracker can later be used to communicate your acceptance or rejection of the Message.
If no message is provided in the argument, then one is created. In either case, the one returned will be the fetched message.
msg - the (optional) Message
instance to be used
# File lib/qpid_proton/messenger.rb, line 303 def get(msg = nil) msg_impl = nil if msg.nil? then msg_impl = nil else msg_impl = msg.impl end check_for_error(Cproton.pn_messenger_get(@impl, msg_impl)) msg.post_decode unless msg.nil? return incoming_tracker end
Returns the number of messages in the incoming queue that have not been retrieved.
# File lib/qpid_proton/messenger.rb, line 375 def incoming Cproton.pn_messenger_incoming(@impl) end
Returns a Tracker
for the most recently received message.
# File lib/qpid_proton/messenger.rb, line 470 def incoming_tracker impl = Cproton.pn_messenger_incoming_tracker(@impl) return nil if impl == -1 Qpid::Proton::Tracker.new(impl) end
Returns the incoming window.
# File lib/qpid_proton/messenger.rb, line 575 def incoming_window Cproton.pn_messenger_get_incoming_window(@impl) end
Sets the incoming window.
The Messenger will track the remote status of this many incoming deliveries after they have been accepted or rejected.
Messages enter this window only when you take them into your application using get(). If your incoming window size is n, and you get n+1 messages without explicitly accepting or rejecting the oldest message, then the message that passes beyond the edge of the incoming window will be assigned the default disposition of its link.
window - the window size
# File lib/qpid_proton/messenger.rb, line 568 def incoming_window=(window) raise TypeError.new("invalid window: #{window}") unless valid_window?(window) check_for_error(Cproton.pn_messenger_set_incoming_window(@impl, window)) end
Attempts interrupting of the messenger thread.
The Messenger interface is single-threaded, and this is the only function intended to be called from outside of is thread.
Call this from a non-Messenger thread to interrupt it while it is blocking. This will cause a ::InterruptError to be raised.
If there is no currently blocking call, then the next blocking call will be affected, even if it is within the same thread that originated the interrupt.
# File lib/qpid_proton/messenger.rb, line 345 def interrupt check_for_error(Cproton.pn_messenger_interrupt(@impl)) end
Returns the name.
# File lib/qpid_proton/messenger.rb, line 84 def name Cproton.pn_messenger_name(@impl) end
Returns the number messages in the outgoing queue that have not been transmitted.
# File lib/qpid_proton/messenger.rb, line 368 def outgoing Cproton.pn_messenger_outgoing(@impl) end
Returns a Tracker
for the message most recently sent via the
put method.
# File lib/qpid_proton/messenger.rb, line 462 def outgoing_tracker impl = Cproton.pn_messenger_outgoing_tracker(@impl) return nil if impl == -1 Qpid::Proton::Tracker.new(impl) end
Returns the outgoing window.
# File lib/qpid_proton/messenger.rb, line 599 def outgoing_window Cproton.pn_messenger_get_outgoing_window(@impl) end
Sets the outgoing window.
The Messenger will track the remote status of this many outgoing deliveries after calling send. A Message enters this window when you call the put() method with the message. If your outgoing window size is n, and you call put n+1 times, status information will no longer be available for the first message.
window - the window size
# File lib/qpid_proton/messenger.rb, line 592 def outgoing_window=(window) raise TypeError.new("invalid window: #{window}") unless valid_window?(window) check_for_error(Cproton.pn_messenger_set_outgoing_window(@impl, window)) end
Returns the password property for the #private_key file.
# File lib/qpid_proton/messenger.rb, line 101 def password Cproton.pn_messenger_get_password(@impl) end
This property contains the password for the #private_key file, or
nil
if the file is not encrypted.
password - the password
# File lib/qpid_proton/messenger.rb, line 95 def password=(password) check_for_error(Cproton.pn_messenger_set_password(@impl, password)) end
Returns the path to a private key file.
# File lib/qpid_proton/messenger.rb, line 234 def private_key Cproton.pn_messenger_get_private_key(@impl) end
Path to a private key file for the Messenger
.
The property must be specified for the Messenger
to accept
incoming SSL/TLS connections and to establish client authenticated outgoing
SSL/TLS connections. Non client authenticated SSL/TLS connections do not
require this property.
key - the key file
# File lib/qpid_proton/messenger.rb, line 228 def private_key=(key) Cproton.pn_messenger_set_private_key(@impl, key) end
Places the content contained in the message onto the outgoing queue of the Messenger.
This method will never block, however it will send any unblocked Messages in the outgoing queue immediately and leave any blocked Messages remaining in the outgoing queue. The send call may then be used to block until the outgoing queue is empty. The outgoing attribute may be used to check the depth of the outgoing queue.
message - the message
# File lib/qpid_proton/messenger.rb, line 270 def put(message) raise TypeError.new("invalid message: #{message}") if message.nil? raise ArgumentError.new("invalid message type: #{message.class}") unless message.kind_of?(Message) # encode the message first message.pre_encode check_for_error(Cproton.pn_messenger_put(@impl, message.impl)) return outgoing_tracker end
Receives up to limit messages into the incoming queue. If no value for limit is supplied, this call will receive as many messages as it can buffer internally. If the Messenger is in blocking mode, this call will block until at least one Message is available in the incoming queue.
Options ====
limit - the maximum number of messages to receive
# File lib/qpid_proton/messenger.rb, line 325 def receive(limit = -1) check_for_error(Cproton.pn_messenger_recv(@impl, limit)) end
# File lib/qpid_proton/messenger.rb, line 329 def receiving Cproton.pn_messenger_receiving(@impl) end
Rejects the incoming message identified by the tracker. If no tracker is supplied, all messages that have been returned by the get method are rejected, except those that have already been auto-settled by passing beyond your outgoing window size.
tracker - the tracker
# File lib/qpid_proton/messenger.rb, line 506 def reject(tracker) raise TypeError.new("invalid tracker: #{tracker}") unless tracker.nil? or valid_tracker?(tracker) if tracker.nil? then tracker = self.incoming_tracker flag = Cproton::PN_CUMULATIVE else flag = 0 end check_for_error(Cproton.pn_messenger_reject(@impl, tracker.impl, flag)) end
Similar to route, except that the destination of the Message is determined before the message address is rewritten.
The outgoing address is only rewritten after routing has been finalized. If a message has an outgoing address of “amqp://0.0.0.0:5678”, and a rewriting rule that changes its outgoing address to “foo”, it will still arrive at the peer that is listening on “amqp://0.0.0.0:5678”, but when it arrives there, the receiver will see its outgoing address as “foo”.
The default rewrite rule removes username and password from addresses before they are transmitted.
pattern - the outgoing address
address - the target address
# File lib/qpid_proton/messenger.rb, line 455 def rewrite(pattern, address) check_for_error(Cproton.pn_messenger_rewrite(@impl, pattern, address)) end
Adds a routing rule to the Messenger's internal routing table.
The route procedure may be used to influence how a Messenger will internally treat a given address or class of addresses. Every call to the route procedure will result in Messenger appending a routing rule to its internal routing table.
Whenever a Message is presented to a Messenger for delivery, it will match the address of this message against the set of routing rules in order. The first rule to match will be triggered, and instead of routing based on the address presented in the message, the Messenger will route based on the address supplied in the rule.
The pattern matching syntax supports two types of matches, a '%' will match any character except a '/', and a '*' will match any character including a '/'.
A routing address is specified as a normal AMQP address, however it may additionally use substitution variables from the pattern match that triggered the rule.
pattern - the address pattern
address - the target address
# route messages sent to foo to the destionaty amqp://foo.com messenger.route("foo", "amqp://foo.com") # any message to foobar will be routed to amqp://foo.com/bar messenger.route("foobar", "amqp://foo.com/bar") # any message to bar/<path> will be routed to the same path within # the amqp://bar.com domain messenger.route("bar/*", "amqp://bar.com/$1") # route all Message objects over TLS messenger.route("amqp:*", "amqps:$1") # supply credentials for foo messenger.route("amqp://foo.com/*", "amqp://user:password@foo.com/$1") # supply credentials for all domains messenger.route("amqp://*", "amqp://user:password@$1") # route all addresses through a single proxy while preserving the # original destination messenger.route("amqp://%$/*", "amqp://user:password@proxy/$1/$2") # route any address through a single broker messenger.route("*", "amqp://user:password@broker/$1")
# File lib/qpid_proton/messenger.rb, line 433 def route(pattern, address) check_for_error(Cproton.pn_messenger_route(@impl, pattern, address)) end
This call will block until the indicated number of messages have been sent, or until the operation times out. If n is -1 this call will block until all outgoing messages have been sent. If n is 0 then this call will send whatever it can without blocking.
# File lib/qpid_proton/messenger.rb, line 285 def send(n = -1) check_for_error(Cproton.pn_messenger_send(@impl, n)) end
Frees a Messenger from tracking the status associated with a given tracker. If you don't supply a tracker, all outgoing messages up to the most recent will be settled.
tracker - the tracker
# File lib/qpid_proton/messenger.rb, line 542 def settle(tracker) raise TypeError.new("invalid tracker: #{tracker}") unless valid_tracker?(tracker) if tracker.nil? then tracker = self.incoming_tracker flag = Cproton::PN_CUMULATIVE else flag = 0 end Cproton.pn_messenger_settle(@impl, tracker.impl, flag) end
Currently a no-op placeholder. For future compatibility, do not send or
recv messages before starting the Messenger
.
# File lib/qpid_proton/messenger.rb, line 160 def start check_for_error(Cproton.pn_messenger_start(@impl)) end
Gets the last known remote state of the delivery associated with the given tracker, as long as the Message is still within your outgoing window. (Also works on incoming messages that are still within your incoming queue. See TrackerStatus for details on the values returned.
tracker - the tracker
# File lib/qpid_proton/messenger.rb, line 527 def status(tracker) raise TypeError.new("invalid tracker: #{tracker}") unless valid_tracker?(tracker) Qpid::Proton::TrackerStatus.by_value(Cproton.pn_messenger_status(@impl, tracker.impl)) end
Stops the Messenger
, preventing it from sending or receiving
any more messages.
# File lib/qpid_proton/messenger.rb, line 167 def stop check_for_error(Cproton.pn_messenger_stop(@impl)) end
Returns true iff a Messenger is in the stopped state. This function does not block.
# File lib/qpid_proton/messenger.rb, line 174 def stopped Cproton.pn_messenger_stopped(@impl) end
Subscribes the Messenger to messages originating from the specified source. The source is an address as specified in the Messenger introduction with the following addition. If the domain portion of the address begins with the '~' character, the Messenger will interpret the domain as host/port, bind to it, and listen for incoming messages. For example “~0.0.0.0”, “amqp://~0.0.0.0” will all bind to any local interface and listen for incoming messages. Ad address of # “amqps://~0.0.0.0” will only permit incoming SSL connections.
# File lib/qpid_proton/messenger.rb, line 188 def subscribe(address) raise TypeError.new("invalid address: #{address}") if address.nil? subscription = Cproton.pn_messenger_subscribe(@impl, address) raise Qpid::Proton::ProtonError.new("Subscribe failed") if subscription.nil? Qpid::Proton::Subscription.new(subscription) end
Returns the timeout period
# File lib/qpid_proton/messenger.rb, line 120 def timeout Cproton.pn_messenger_get_timeout(@impl) end
Sets the timeout period, in milliseconds.
A negative timeout period implies an infinite timeout.
timeout - the timeout period
# File lib/qpid_proton/messenger.rb, line 113 def timeout=(timeout) raise TypeError.new("invalid timeout: #{timeout}") if timeout.nil? Cproton.pn_messenger_set_timeout(@impl, timeout) end
The path to the databse of trusted certificates.
# File lib/qpid_proton/messenger.rb, line 252 def trusted_certificates Cproton.pn_messenger_get_trusted_certificates(@impl) end
A path to a database of trusted certificates for use in verifying the peer
on an SSL/TLS connection. If this property is nil
, then the
peer will not be verified.
certificates - the certificates path
# File lib/qpid_proton/messenger.rb, line 246 def trusted_certificates=(certificates) Cproton.pn_messenger_set_trusted_certificates(@impl,certificates) end
Sends or receives any outstanding messages queued for a Messenger.
This will block for the indicated timeout. This method may also do I/O other than sending and receiving messages. For example, closing connections after stop() has been called.
# File lib/qpid_proton/messenger.rb, line 355 def work(timeout=-1) err = Cproton.pn_messenger_work(@impl, timeout) if (err == Cproton::PN_TIMEOUT) then return false else check_for_error(err) return true end end
# File lib/qpid_proton/messenger.rb, line 605 def valid_tracker?(tracker) !tracker.nil? && tracker.is_a?(Qpid::Proton::Tracker) end
# File lib/qpid_proton/messenger.rb, line 609 def valid_window?(window) !window.nil? && [Float, Fixnum].include?(window.class) end