class Qpid::Proton::Reactor::Connector

Public Class Methods

new(connection, url, opts) click to toggle source
# File lib/reactor/connector.rb, line 24
# Set up a connector for +connection+ and open the connection.
#
# The address list is built from +url+ when it is given; otherwise from the
# first :url, :urls or :address entry encountered in +opts+. A :reconnect
# entry in +opts+ supplies the reconnect policy; when it is absent (or
# falsy) a new Backoff is used instead.
#
# Raises ::ArgumentError when no address can be found in +url+ or +opts+.
def initialize(connection, url, opts)
  @connection = connection
  @opts = opts
  @urls = URLs.new(url) if url

  opts.each do |key, value|
    case key
    when :reconnect
      @reconnect = value
    when :url, :urls, :address
      # Never replace an address list that has already been established.
      @urls ||= URLs.new(value)
    end
  end
  raise ::ArgumentError, "no url for connect" unless @urls

  # TODO aconway 2017-08-17: review reconnect configuration and defaults
  @reconnect ||= Backoff.new
  @ssl_domain = SessionPerConnection.new # TODO seems this should be configurable

  # Register this connector on the connection before opening it.
  @connection.overrides = self
  @connection.open
end

Public Instance Methods

connect(connection) click to toggle source
# File lib/reactor/connector.rb, line 80
# Create a new transport, configure it and bind it to +connection+.
#
# Takes the next address from @urls and applies the recognised entries of
# @opts (:user, :password, :heartbeat, :idle_timeout, :sasl_enabled,
# :sasl_allow_insecure_mechs, :sasl_allowed_mechs / :sasl_mechanisms).
# A non-empty username or password embedded in the URL then overrides the
# corresponding value taken from @opts. For "amqps" URLs an SSL layer is
# created on the transport when @ssl_domain is set.
#
# connection:: the connection whose user, password and hostname are set and
#              to which the new transport is bound.
def connect(connection)
  url = @urls.next
  transport = Qpid::Proton::Transport.new
  @opts.each do |k, v|
    case k
    when :user
      connection.user = v
    when :password
      connection.password = v
    when :heartbeat
      # Passed through unscaled (presumably already in milliseconds).
      transport.idle_timeout = v.to_i
    when :idle_timeout
      # Scale by 1000 before truncating (presumably seconds -> milliseconds)
      # so that fractional values such as 1.5 are not lost.
      transport.idle_timeout = (v * 1000).to_i
    when :sasl_enabled
      # Calling #sasl is done only for its effect on the transport.
      transport.sasl if v
    when :sasl_allow_insecure_mechs
      transport.sasl.allow_insecure_mechs = v
    when :sasl_allowed_mechs, :sasl_mechanisms
      transport.sasl.allowed_mechs = v
    end
  end

  # TODO aconway 2017-08-11: hostname setting is incorrect, reactor only
  connection.hostname = "#{url.host}:#{url.port}"
  connection.user = url.username if url.username && !url.username.empty?
  connection.password = url.password if url.password && !url.password.empty?

  transport.bind(connection)

  if (url.scheme == "amqps") && @ssl_domain
    @ssl = Qpid::Proton::SSL.new(transport, @ssl_domain)
    @ssl.peer_hostname = url.host
  end
end
on_connection_local_open(event) click to toggle source
# File lib/reactor/connector.rb, line 44
# Handler for the connection being opened locally: configures and binds a
# transport for the event's connection via #connect.
def on_connection_local_open(event)
  opened = event.connection
  connect(opened)
end
on_connection_remote_close(event) click to toggle source
# File lib/reactor/connector.rb, line 76
# Handler for the remote end closing the connection: forgets the connection.
# The event itself is not inspected.
def on_connection_remote_close(event)
  # With @connection cleared, on_transport_closed takes no reconnect action.
  @connection = nil
end
on_connection_remote_open(event) click to toggle source
# File lib/reactor/connector.rb, line 48
# Handler for the remote end opening the connection: resets the reconnect
# policy, if one is set. The event itself is not inspected.
def on_connection_remote_open(event)
  return unless @reconnect

  @reconnect.reset
end
on_timer_task(event) click to toggle source
# File lib/reactor/connector.rb, line 72
# Handler for a timer task firing: runs #connect on the stored connection.
# on_transport_closed schedules this handler when the reconnect policy
# returns a non-zero delay. The event itself is not inspected.
def on_timer_task(event)
  pending = @connection
  connect(pending)
end
on_transport_closed(event) click to toggle source
# File lib/reactor/connector.rb, line 56
# Handler for the transport closing: decides whether to reconnect.
#
# Nothing happens when the connection has been forgotten or is no longer
# locally active. Without a reconnect policy the connection is simply
# forgotten. Otherwise the old transport is unbound and the policy is asked
# for the next delay: zero reconnects right away, anything else schedules
# this handler on the event's reactor after that delay.
def on_transport_closed(event)
  return if @connection.nil?
  return if (@connection.state & Qpid::Proton::Endpoint::LOCAL_ACTIVE).zero?

  # No policy: give up on this connection.
  return @connection = nil if @reconnect.nil?

  event.transport.unbind
  wait = @reconnect.next
  return connect(@connection) if wait == 0

  event.reactor.schedule(wait, self)
end
on_transport_tail_closed(event) click to toggle source
# File lib/reactor/connector.rb, line 52
# Handler for the transport tail closing: handled exactly like a full
# transport close by delegating to #on_transport_closed.
def on_transport_tail_closed(event)
  on_transport_closed(event)
end