class Qpid::Proton::Reactor::Container

A representation of the AMQP concept of a container which, loosely speaking, is something that establishes links to or from another container on which messages are transferred.

This is an extension to the Reactor class that adds convenience methods for creating instances of Qpid::Proton::Connection, Qpid::Proton::Sender and Qpid::Proton::Receiver.

Attributes

container_id[RW]
global_handler[RW]

Public Class Methods

new(handlers, options = {}) click to toggle source
Calls superclass method
# File lib/reactor/container.rb, line 55
# Builds the container on top of the superclass constructor.
#
# @param handlers passed through unchanged to the superclass constructor
# @param options [Hash] passed through to the superclass constructor; when it
#   has no :impl key the container-specific state below is also initialised,
#   and a truthy :global_handler entry is wrapped in GlobalOverrides
def initialize(handlers, options = {})
  super(handlers, options)

  # The remaining setup only applies when a brand new instance is being
  # created, which is signalled by the absence of an :impl option.
  return if options.has_key?(:impl)

  @ssl = SSLConfig.new
  if options[:global_handler]
    self.global_handler = GlobalOverrides.new(options[:global_handler])
  else
    # Very ugly, but using self.global_handler doesn't work in the
    # constructor, so Reactor's accessor methods are bound and invoked
    # explicitly to wrap the existing handler in GlobalOverrides.
    wrapped = GlobalOverrides.new(Reactor.instance_method(:global_handler).bind(self).call)
    Reactor.instance_method(:global_handler=).bind(self).call(wrapped)
  end
  @trigger = nil
  @container_id = generate_uuid
end

Public Instance Methods

connect(url, opts = {}) click to toggle source

Initiate an AMQP connection.

@param url [String] Connect to URL host:port, using user:password@ if present @param opts [Hash] Named options

For backwards compatibility, can be called with a single parameter opts.

@option opts [String] :url Connect to URL host:port using user:password@ if present. @option opts [String] :user user name for authentication if not given by URL @option opts [String] :password password for authentication if not given by URL @option opts [Numeric] :idle_timeout seconds before closing an idle connection,

can be a fractional value.

@option opts [Boolean] :sasl_enabled Enable or disable SASL. @option opts [Boolean] :sasl_allow_insecure_mechs Allow mechanisms that disclose clear text

passwords, even over an insecure connection. By default, such mechanisms are only allowed
when SSL is enabled.

@option opts [String] :sasl_allowed_mechs the allowed SASL mechanisms for use on the connection.

@option opts [String] :address deprecated use the :url option @option opts [Numeric] :heartbeat milliseconds before closing an idle connection.

*deprecated* use :idle_timeout => heartbeat/1000

@return [Connection] the new connection

# File lib/reactor/container.rb, line 97
# Initiate an AMQP connection.
#
# @param url [String, Hash] the URL to connect to; for backwards
#   compatibility a Hash of options may be passed here as the only argument
# @param opts [Hash] named options; opts[:handler] is handed to #connection,
#   and the whole hash is handed to the Connector
# @return the connection object obtained from #connection
def connect(url, opts = {})
  # Old calling convention: connect(options). Treat the lone Hash as the
  # options and leave the URL unset.
  if url.is_a?(Hash) && opts.empty?
    opts = url
    url = nil
  end

  conn = connection(opts[:handler])
  conn.container = container_id || generate_uuid
  # The Connector is constructed for its effect; the instance is not kept here.
  Connector.new(conn, url, opts)
  conn
end
create_receiver(context, opts = {}) click to toggle source

Initiates the establishment of a link over which messages can be received.

There are two accepted arguments for the context

1. If a Connection is supplied then the link is established using that

object. The source, and optionally the target, address can be supplied

2. If it is a String or a URL then a new Connection is created on which

the link will be attached. If a path is specified, but not the source address, then the path of the URL is used as the source address.

The name will be generated for the link if one is not specified.

@param context [Connection, URL, String] The connection or the address. @param opts [Hash] Additional options. @option opts [String, Qpid::Proton::URL] :source The source address. @option opts [String] :target The target address @option opts [String] :name The link name. @option opts [Boolean] :dynamic @option opts [Object] :handler @option opts [Hash] :options Additional link options.

@return [Receiver] The receiver.

# File lib/reactor/container.rb, line 186
# Initiates the establishment of a link over which messages can be received.
#
# A String context is first converted to a Qpid::Proton::URL. When the
# context is a URL and no :source option is given, the URL's path becomes
# the source address.
#
# @param context the connection, URL or String address to attach the link to
# @param opts [Hash] :source, :target, :name, :dynamic, :handler, :options
# @return the opened receiver
def create_receiver(context, opts = {})
  context = Qpid::Proton::URL.new(context) if context.is_a?(::String)

  source = opts[:source]
  source = context.path if context.is_a?(Qpid::Proton::URL) && source.nil?

  session = _session(context)

  # Use the caller's link name, or derive one from the container and addresses.
  link_name = opts[:name] || id(session.connection.container, source, opts[:target])
  receiver = session.receiver(link_name)

  receiver.source.address = source if source
  receiver.source.dynamic = true if opts.has_key?(:dynamic) && opts[:dynamic]
  receiver.target.address = opts[:target] unless opts[:target].nil?
  receiver.handler = opts[:handler] unless opts[:handler].nil?
  _apply_link_options(opts[:options], receiver)
  receiver.open
  receiver
end
create_sender(context, opts = {}) click to toggle source

Initiates the establishment of a link over which messages can be sent.

@param context [String, URL] The context. @param opts [Hash] Additional options. @option opts [String, Qpid::Proton::URL] :target The target address. @option opts [String] :source The source address. @option opts [Boolean] :dynamic @option opts [Object] :handler @option opts [Object] :tag_generator The tag generator. @option opts [Hash] :options Additional link options

@return [Sender] The sender.

# File lib/reactor/container.rb, line 140
# Initiates the establishment of a link over which messages can be sent.
#
# A String context is first converted to a Qpid::Proton::URL. When the
# context is a URL and no :target option is given, the URL's path becomes
# the target address.
#
# @param context the connection, URL or String address to attach the link to
# @param opts [Hash] :target, :source, :name, :handler, :tag_generator,
#   :options
# @return the opened sender
def create_sender(context, opts = {})
  context = Qpid::Proton::URL.new(context) if context.is_a?(::String)

  target = opts[:target]
  target = context.path if context.is_a?(Qpid::Proton::URL) && target.nil?

  session = _session(context)

  # Use the caller's link name, or derive one from the container and addresses.
  link_name = opts[:name] || id(session.connection.container, target, opts[:source])
  sender = session.sender(link_name)

  sender.source.address = opts[:source] unless opts[:source].nil?
  sender.target.address = target if target
  sender.handler = opts[:handler] unless opts[:handler].nil?
  # The guard previously looked up a misspelled key (:tag_gnenerator), which
  # is never present, so a supplied :tag_generator was silently ignored.
  sender.tag_generator = opts[:tag_generator] unless opts[:tag_generator].nil?
  _apply_link_options(opts[:options], sender)
  sender.open
  sender
end
declare_transaction(context, handler = nil, settle_before_discharge = false) click to toggle source
# File lib/reactor/container.rb, line 210
# Declares a transaction on the given context, creating the "txn-ctl"
# control sender for that context on first use and reusing it afterwards.
#
# @param context the object the transaction control link is attached to; a
#   txn_ctl accessor is added to it when it does not already have one
# @param handler passed through to Transaction.new
# @param settle_before_discharge [Boolean] passed through to Transaction.new
# @return [Transaction] the new transaction
def declare_transaction(context, handler = nil, settle_before_discharge = false)
  # Parentheses matter here: `respond_to? :txn_ctl && ...` would pass the
  # result of the && expression to respond_to? instead of the symbol.
  unless context.respond_to?(:txn_ctl) && context.respond_to?(:txn_ctl=)
    class << context
      attr_accessor :txn_ctl
    end
  end

  # Create the control sender only when the context has none yet.
  # create_sender takes (context, opts), so name and handler go in the hash.
  if context.txn_ctl.nil?
    context.txn_ctl = create_sender(context,
                                    :name => "txn-ctl",
                                    :handler => InternalTransactionHandler.new)
  end

  Transaction.new(context.txn_ctl, handler, settle_before_discharge)
end
listen(url, ssl_domain = nil) click to toggle source

Initiates a server socket, accepting incoming AMQP connections on the interface and port specified.

@param url [] @param ssl_domain []

# File lib/reactor/container.rb, line 227
# Creates an acceptor for incoming connections on the host and port taken
# from the given URL.
#
# @param url the address to listen on; wrapped in Qpid::Proton::URL
# @param ssl_domain the SSL domain to apply to the acceptor; when nil and the
#   URL scheme is 'amqps', the container's own server SSL configuration is
#   used if the container has one
# @return the acceptor
def listen(url, ssl_domain = nil)
  endpoint = Qpid::Proton::URL.new(url)
  listener = acceptor(endpoint.host, endpoint.port)

  domain = ssl_domain
  # Fall back to the container's server-side SSL settings for amqps URLs.
  domain = @ssl.server if domain.nil? && (endpoint.scheme == 'amqps') && @ssl

  listener.ssl_domain(domain) unless domain.nil?
  listener
end

Private Instance Methods

_session(context) click to toggle source
# File lib/reactor/container.rb, line 110
# Resolves a context to a session.
#
# * a Qpid::Proton::URL is connected to first, then resolved again
# * a Qpid::Proton::Session is returned unchanged
# * a Qpid::Proton::Connection yields a session from its session policy
#   when it has one, otherwise a newly created session
# * anything else is asked for its own #session
#
# @param context the URL, session, connection or other session-bearing object
# @return the session for the context
def _session(context)
  case context
  when Qpid::Proton::URL
    _session(connect(:url => context))
  when Qpid::Proton::Session
    context
  when Qpid::Proton::Connection
    return create_session(context) unless context.session_policy?

    context.session_policy.session(context)
  else
    context.session
  end
end
id(container, remote, local) click to toggle source
# File lib/reactor/container.rb, line 242
# Builds a link name from the container name and whichever of the remote and
# local addresses are present; when neither is given a generated UUID is
# used instead.
#
# @param container the container name used as prefix
# @param remote the remote address, or nil
# @param local the local address, or nil
# @return [String] the generated name
def id(container, remote, local)
  suffix =
    if remote.nil? && local.nil?
      generate_uuid
    elsif remote.nil?
      local
    elsif local.nil?
      remote
    else
      "#{remote}-#{local}"
    end
  "#{container}-#{suffix}"
end
to_s() click to toggle source
# File lib/reactor/container.rb, line 264
def to_s
  "#{self.class}<@impl=#{Cproton.pni_address_of(@impl)}>"
end