# File lib/reactor/connector.rb, line 28 def initialize(connection) @connection = connection @address = nil @heartbeat = nil @reconnect = nil @ssl_domain = nil end
# File lib/reactor/connector.rb, line 75 def connect(connection) url = @address.next connection.hostname = "#{url.host}:#{url.port}" transport = Qpid::Proton::Transport.new transport.bind(connection) if !@heartbeat.nil? transport.idle_timeout = @heartbeat elsif (url.scheme == "amqps") && !@ssl_domain.nil? @ssl = Qpid::Proton::SSL.new(transport, @ssl_domain) @ss.peer_hostname = url.host elsif !url.username.nil? sasl = transport.sasl if url.username == "anonymous" sasl.mechanisms("ANONYMOUS") else sasl.plain(url.username, url.password) end end end
# File lib/reactor/connector.rb, line 36 def on_connection_local_open(event) self.connect(event.connection) end
# File lib/reactor/connector.rb, line 71 def on_connection_remote_close(event) @connection = nil end
# File lib/reactor/connector.rb, line 40 def on_connection_remote_open(event) if !@reconnect.nil? @reconnect.reset @transport = nil end end
# File lib/reactor/connector.rb, line 67 def on_timer_task(event) self.connect(@connection) end
# File lib/reactor/connector.rb, line 51 def on_transport_closed(event) if !@connection.nil? && !(@connection.state & Qpid::Proton::Endpoint::LOCAL_ACTIVE).zero? if !@reconnect.nil? event.transport.unbind delay = @reconnect.next if delay == 0 self.connect(@connection) else event.reactor.schedule(delay, self) end else @connection = nil end end end
# File lib/reactor/connector.rb, line 47 def on_transport_tail_closed(event) self.on_transport_closed(event) end