class Stomp::Connection
Low level connection which maps commands and supports synchronous receives
Attributes
Autoflush forces a flush on each transmit. This may be changed dynamically by calling code.
The CONNECTED frame from the broker.
Any disconnect RECEIPT frame if requested.
Heartbeat receive has been on time.
Heartbeat send has been successful.
The Stomp Protocol version.
A unique session ID, assigned by the broker.
Public Class Methods
::default_port returns the default port used by the gem for TCP or SSL.
# File lib/stomp/connection.rb, line 38 def self.default_port(ssl) ssl ? 61612 : 61613 end
A new Connection object can be initialized using two forms:
Hash (this is the recommended Connection initialization method):
hash = { :hosts => [ {:login => "login1", :passcode => "passcode1", :host => "localhost", :port => 61616, :ssl => false}, {:login => "login2", :passcode => "passcode2", :host => "remotehost", :port => 61617, :ssl => false} ], :reliable => true, :initial_reconnect_delay => 0.01, :max_reconnect_delay => 30.0, :use_exponential_back_off => true, :back_off_multiplier => 2, :max_reconnect_attempts => 0, :randomize => false, :connect_timeout => 0, :connect_headers => {}, :parse_timeout => 5, :logger => nil, :dmh => false, :closed_check => true, :hbser => false, :stompconn => false, :usecrlf => false, } e.g. c = Stomp::Connection.new(hash)
Positional parameters:
login (String, default : '') passcode (String, default : '') host (String, default : 'localhost') port (Integer, default : 61613) reliable (Boolean, default : false) reconnect_delay (Integer, default : 5) e.g. c = Stomp::Connection.new("username", "password", "localhost", 61613, true)
# File lib/stomp/connection.rb, line 82 def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {}) @protocol = Stomp::SPL_10 # Assumed at first @hb_received = true # Assumed at first @hb_sent = true # Assumed at first @hbs = @hbr = false # Sending/Receiving heartbeats. Assume no for now. if login.is_a?(Hash) hashed_initialize(login) else @host = host @port = port @login = login @passcode = passcode @reliable = reliable @reconnect_delay = reconnect_delay @connect_headers = connect_headers @ssl = false @parameters = nil @parse_timeout = 5 # To override, use hashed parameters @connect_timeout = 0 # To override, use hashed parameters @logger = nil # To override, use hashed parameters @autoflush = false # To override, use hashed parameters or setter @closed_check = true # Run closed check in each protocol method @hbser = false # Raise if heartbeat send exception @stompconn = false # If true, use STOMP rather than CONNECT @usecrlf = false # If true, use \r\n as line ends (1.2 only) warn "login looks like a URL, do you have the correct parameters?" if @login =~ /:\/\// end # Use Mutexes: only one lock per each thread. # Reverted to original implementation attempt using Mutex. @transmit_semaphore = Mutex.new @read_semaphore = Mutex.new @socket_semaphore = Mutex.new @subscriptions = {} @failure = nil @connection_attempts = 0 socket end
open is syntactic sugar for 'Connection.new', see 'initialize' for usage.
# File lib/stomp/connection.rb, line 145 def Connection.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {}) Connection.new(login, passcode, host, port, reliable, reconnect_delay, connect_headers) end
Public Instance Methods
Abort aborts a transaction by name.
# File lib/stomp/connection.rb, line 231 def abort(name, headers = {}) raise Stomp::Error::NoCurrentConnection if @closed_check && closed? headers = headers.symbolize_keys headers[:transaction] = name _headerCheck(headers) transmit(Stomp::CMD_ABORT, headers) end
Acknowledge a message, used when a subscription has specified client acknowledgement i.e. connection.subscribe(“/queue/a”, :ack => 'client'). Accepts an optional transaction header ( :transaction => 'some_transaction_id' ) Behavior is protocol level dependent, see the specifications or comments below.
# File lib/stomp/connection.rb, line 172 def ack(message_id, headers = {}) raise Stomp::Error::NoCurrentConnection if @closed_check && closed? raise Stomp::Error::MessageIDRequiredError if message_id.nil? || message_id == "" headers = headers.symbolize_keys case @protocol when Stomp::SPL_12 # The ACK frame MUST include an id header matching the ack header # of the MESSAGE being acknowledged. headers[:id] = message_id when Stomp::SPL_11 # ACK has two REQUIRED headers: message-id, which MUST contain a value # matching the message-id for the MESSAGE being acknowledged and # subscription, which MUST be set to match the value of the subscription's # id header. headers[:'message-id'] = message_id raise Stomp::Error::SubscriptionRequiredError unless headers[:subscription] else # Stomp::SPL_10 # ACK has one required header, message-id, which must contain a value # matching the message-id for the MESSAGE being acknowledged. headers[:'message-id'] = message_id end _headerCheck(headers) transmit(Stomp::CMD_ACK, headers) end
Begin starts a transaction, and requires a name for the transaction
# File lib/stomp/connection.rb, line 160 def begin(name, headers = {}) raise Stomp::Error::NoCurrentConnection if @closed_check && closed? headers = headers.symbolize_keys headers[:transaction] = name _headerCheck(headers) transmit(Stomp::CMD_BEGIN, headers) end
client_ack? determines if headers contain :ack => “client”.
# File lib/stomp/connection.rb, line 335 def client_ack?(message) headers = @subscriptions[message.headers[:destination]] !headers.nil? && headers[:ack] == "client" end
closed? tests if this connection is closed.
# File lib/stomp/connection.rb, line 155 def closed? @closed end
Commit commits a transaction by name.
# File lib/stomp/connection.rb, line 222 def commit(name, headers = {}) raise Stomp::Error::NoCurrentConnection if @closed_check && closed? headers = headers.symbolize_keys headers[:transaction] = name _headerCheck(headers) transmit(Stomp::CMD_COMMIT, headers) end
disconnect closes this connection. If requested, a disconnect RECEIPT will be received.
# File lib/stomp/connection.rb, line 342 def disconnect(headers = {}) raise Stomp::Error::NoCurrentConnection if @closed_check && closed? headers = headers.symbolize_keys _headerCheck(headers) if @protocol >= Stomp::SPL_11 @st.kill if @st # Kill ticker thread if any @rt.kill if @rt # Kill ticker thread if any end transmit(Stomp::CMD_DISCONNECT, headers) @disconnect_receipt = receive if headers[:receipt] if @logger && @logger.respond_to?(:on_disconnect) @logger.on_disconnect(log_params) end close_socket end
#hashed_initialize prepares a new connection with a Hash of initialization parameters.
# File lib/stomp/connection.rb, line 126 def hashed_initialize(params) @parameters = refine_params(params) @reliable = @parameters[:reliable] @reconnect_delay = @parameters[:initial_reconnect_delay] @connect_headers = @parameters[:connect_headers] @parse_timeout = @parameters[:parse_timeout] @connect_timeout = @parameters[:connect_timeout] @logger = @parameters[:logger] @autoflush = @parameters[:autoflush] @closed_check = @parameters[:closed_check] @hbser = @parameters[:hbser] @stompconn = @parameters[:stompconn] @usecrlf = @parameters[:usecrlf] #sets the first host to connect change_host end
#hbrecv_count returns the current connection's heartbeat receive count.
# File lib/stomp/connection.rb, line 443 def hbrecv_count() return 0 unless @hbrecv_count @hbrecv_count end
#hbrecv_interval returns the connection's heartbeat receive interval.
# File lib/stomp/connection.rb, line 431 def hbrecv_interval() return 0 unless @hbrecv_interval @hbrecv_interval / 1000.0 # ms end
#hbsend_count returns the current connection's heartbeat send count.
# File lib/stomp/connection.rb, line 437 def hbsend_count() return 0 unless @hbsend_count @hbsend_count end
#hbsend_interval returns the connection's heartbeat send interval.
# File lib/stomp/connection.rb, line 425 def hbsend_interval() return 0 unless @hbsend_interval @hbsend_interval / 1000.0 # ms end
STOMP 1.1+ NACK.
# File lib/stomp/connection.rb, line 199 def nack(message_id, headers = {}) raise Stomp::Error::NoCurrentConnection if @closed_check && closed? raise Stomp::Error::UnsupportedProtocolError if @protocol == Stomp::SPL_10 raise Stomp::Error::MessageIDRequiredError if message_id.nil? || message_id == "" headers = headers.symbolize_keys case @protocol when Stomp::SPL_12 # The ACK frame MUST include an id header matching the ack header # of the MESSAGE being acknowledged. headers[:id] = message_id else # Stomp::SPL_11 only # ACK has two REQUIRED headers: message-id, which MUST contain a value # matching the message-id for the MESSAGE being acknowledged and # subscription, which MUST be set to match the value of the subscription's # id header. headers[:'message-id'] = message_id raise Stomp::Error::SubscriptionRequiredError unless headers[:subscription] end _headerCheck(headers) transmit(Stomp::CMD_NACK, headers) end
open? tests if this connection is open.
# File lib/stomp/connection.rb, line 150 def open? !@closed end
poll returns a pending message if one is available, otherwise returns nil.
# File lib/stomp/connection.rb, line 360 def poll() raise Stomp::Error::NoCurrentConnection if @closed_check && closed? # No need for a read lock here. The receive method eventually fulfills # that requirement. return nil if @socket.nil? || !@socket.ready? receive() end
Publish message to destination. To disable content length header use header ( :suppress_content_length => true ). Accepts a transaction header ( :transaction => 'some_transaction_id' ).
# File lib/stomp/connection.rb, line 284 def publish(destination, message, headers = {}) raise Stomp::Error::NoCurrentConnection if @closed_check && closed? headers = headers.symbolize_keys headers[:destination] = destination _headerCheck(headers) if @logger && @logger.respond_to?(:on_publish) @logger.on_publish(log_params, message, headers) end transmit(Stomp::CMD_SEND, headers, message) end
receive returns the next Message off of the wire.
# File lib/stomp/connection.rb, line 369 def receive() raise Stomp::Error::NoCurrentConnection if @closed_check && closed? super_result = __old_receive if super_result.nil? && @reliable && !closed? errstr = "connection.receive returning EOF as nil - resetting connection.\n" if @logger && @logger.respond_to?(:on_miscerr) @logger.on_miscerr(log_params, "es_recv: " + errstr) else $stderr.print errstr end @socket = nil super_result = __old_receive end # if @logger && @logger.respond_to?(:on_receive) @logger.on_receive(log_params, super_result) end return super_result end
#set_logger selects a new callback logger instance.
# File lib/stomp/connection.rb, line 390 def set_logger(logger) @logger = logger end
sha1 returns a SHA1 digest for arbitrary string data.
# File lib/stomp/connection.rb, line 406 def sha1(data) Digest::SHA1.hexdigest(data) end
Subscribe subscribes to a destination. A subscription name is required. For Stomp 1.1+ a session unique subscription ID is also required.
# File lib/stomp/connection.rb, line 241 def subscribe(name, headers = {}, subId = nil) raise Stomp::Error::NoCurrentConnection if @closed_check && closed? headers = headers.symbolize_keys headers[:destination] = name if @protocol >= Stomp::SPL_11 raise Stomp::Error::SubscriptionRequiredError if (headers[:id].nil? && subId.nil?) headers[:id] = subId if headers[:id].nil? end _headerCheck(headers) if @logger && @logger.respond_to?(:on_subscribe) @logger.on_subscribe(log_params, headers) end # Store the subscription so that we can replay if we reconnect. if @reliable subId = name if subId.nil? raise Stomp::Error::DuplicateSubscription if @subscriptions[subId] @subscriptions[subId] = headers end transmit(Stomp::CMD_SUBSCRIBE, headers) end
Send a message back to the source or to the dead letter queue. Accepts a dead letter queue option ( :dead_letter_queue => “/queue/DLQ” ). Accepts a limit number of redeliveries option ( :max_redeliveries => 6 ). Accepts a force client acknowledgement option (:force_client_ack => true).
# File lib/stomp/connection.rb, line 299 def unreceive(message, options = {}) raise Stomp::Error::NoCurrentConnection if @closed_check && closed? options = { :dead_letter_queue => "/queue/DLQ", :max_redeliveries => 6 }.merge(options) # Lets make sure all keys are symbols message.headers = message.headers.symbolize_keys retry_count = message.headers[:retry_count].to_i || 0 message.headers[:retry_count] = retry_count + 1 transaction_id = "transaction-#{message.headers[:'message-id']}-#{retry_count}" message_id = message.headers.delete(:'message-id') begin self.begin transaction_id if client_ack?(message) || options[:force_client_ack] self.ack(message_id, :transaction => transaction_id) end if retry_count <= options[:max_redeliveries] self.publish(message.headers[:destination], message.body, message.headers.merge(:transaction => transaction_id)) else # Poison ack, sending the message to the DLQ self.publish(options[:dead_letter_queue], message.body, message.headers.merge(:transaction => transaction_id, :original_destination => message.headers[:destination], :persistent => true)) end self.commit transaction_id rescue Exception => exception self.abort transaction_id raise exception end end
Unsubscribe from a destination. A subscription name is required. For Stomp 1.1+ a session unique subscription ID is also required.
# File lib/stomp/connection.rb, line 266 def unsubscribe(dest, headers = {}, subId = nil) raise Stomp::Error::NoCurrentConnection if @closed_check && closed? headers = headers.symbolize_keys headers[:destination] = dest if @protocol >= Stomp::SPL_11 raise Stomp::Error::SubscriptionRequiredError if (headers[:id].nil? && subId.nil?) end _headerCheck(headers) transmit(Stomp::CMD_UNSUBSCRIBE, headers) if @reliable subId = dest if subId.nil? @subscriptions.delete(subId) end end
uuid returns a type 4 UUID.
# File lib/stomp/connection.rb, line 411 def uuid() b = [] 0.upto(15) do |i| b << rand(255) end b[6] = (b[6] & 0x0F) | 0x40 b[8] = (b[8] & 0xbf) | 0x80 # 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 rs = sprintf("%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%02x%02x%02x%02x%02x", b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7], b[8], b[9], b[10], b[11], b[12], b[13], b[14], b[15]) rs end
valid_utf8? returns an indicator if the given string is a valid UTF8 string.
# File lib/stomp/connection.rb, line 395 def valid_utf8?(s) case RUBY_VERSION when /1\.8/ rv = _valid_utf8?(s) else rv = s.encoding.name != Stomp::UTF8 ? false : s.valid_encoding? end rv end
Private Instance Methods
#__old_receive receives a frame, blocks until the frame is received.
# File lib/connection/utils.rb, line 216 def __old_receive() # The receive may fail so we may need to retry. while TRUE begin used_socket = socket() return _receive(used_socket) rescue @failure = $! raise unless @reliable errstr = "receive failed: #{$!}" if @logger && @logger.respond_to?(:on_miscerr) @logger.on_miscerr(log_params, "es_oldrecv: " + errstr) else $stderr.print errstr end end end end
decode returns a Hash of decoded headers per the Stomp 1.1 specification.
# File lib/connection/utf8.rb, line 272 def _decodeHeaders(h) dh = {} h.each_pair do |k,v| # Keys here are NOT! symbolized if v.is_a?(Array) kdec = Stomp::HeaderCodec::decode(k) dh[kdec] = [] v.each do |e| dh[kdec] << Stomp::HeaderCodec::decode(e) end else vs = v.to_s dh[Stomp::HeaderCodec::decode(k)] = Stomp::HeaderCodec::decode(vs) end end dh end
encode returns a Hash of encoded headers per the Stomp 1.1 specification.
# File lib/connection/utf8.rb, line 252 def _encodeHeaders(h) eh = {} h.each_pair do |k,v| # Keys are symbolized ks = k.to_s if v.is_a?(Array) kenc = Stomp::HeaderCodec::encode(ks) eh[kenc] = [] v.each do |e| eh[kenc] << Stomp::HeaderCodec::encode(e) end else vs = v.to_s eh[Stomp::HeaderCodec::encode(ks)] = Stomp::HeaderCodec::encode(vs) end end eh end
Support multi-homed servers.
# File lib/connection/utils.rb, line 14 def _expand_hosts(hash) new_hash = hash.clone new_hash[:hosts_cloned] = hash[:hosts].clone new_hash[:hosts] = [] # hash[:hosts].each do |host_parms| ai = Socket.getaddrinfo(host_parms[:host], nil, nil, Socket::SOCK_STREAM) next if ai.nil? || ai.size == 0 info6 = ai.detect {|info| info[4] == Socket::AF_INET6} info4 = ai.detect {|info| info[4] == Socket::AF_INET} if info6 new_hostp = host_parms.clone new_hostp[:host] = info6[3] new_hash[:hosts] << new_hostp end if info4 new_hostp = host_parms.clone new_hostp[:host] = info4[3] new_hash[:hosts] << new_hostp end end return new_hash end
Stomp 1.1+ header check for UTF8 validity. Raises Stomp::Error::UTF8ValidationError if header data is not valid UTF8.
# File lib/connection/utf8.rb, line 228 def _headerCheck(h) return if @protocol == Stomp::SPL_10 # Do nothing for this environment # h.each_pair do |k,v| # Keys here are symbolized ks = k.to_s ks.force_encoding(Stomp::UTF8) if ks.respond_to?(:force_encoding) raise Stomp::Error::UTF8ValidationError unless valid_utf8?(ks) # if v.is_a?(Array) v.each do |e| e.force_encoding(Stomp::UTF8) if e.respond_to?(:force_encoding) raise Stomp::Error::UTF8ValidationError unless valid_utf8?(e) end else vs = v.to_s + "" # Values are usually Strings, but could be TrueClass or Symbol # The + "" above forces an 'unfreeze' if necessary vs.force_encoding(Stomp::UTF8) if vs.respond_to?(:force_encoding) raise Stomp::Error::UTF8ValidationError unless valid_utf8?(vs) end end end
# File lib/connection/heartbeats.rb, line 21 def _init_heartbeats() return if @connect_headers[:"heart-beat"] == "0,0" # Caller does not want heartbeats. OK. # Init. # @cx = @cy = @sx = @sy = 0 # Variable names as in spec # @hbsend_interval = @hbrecv_interval = 0.0 # Send/Receive ticker interval. # @hbsend_count = @hbrecv_count = 0 # Send/Receive ticker counts. # @ls = @lr = -1.0 # Last send/receive time (from Time.now.to_f) # @st = @rt = nil # Send/receive ticker thread # Handle current client / server capabilities. # cfh = @connection_frame.headers.symbolize_keys return if cfh[:"heart-beat"] == "0,0" # Server does not want heartbeats # Conect header parts. parts = @connect_headers[:"heart-beat"].split(",") @cx = parts[0].to_i @cy = parts[1].to_i # Connected frame header parts. parts = cfh[:"heart-beat"].split(",") @sx = parts[0].to_i @sy = parts[1].to_i # Catch odd situations like server has used => heart-beat:000,00000 return if (@cx == 0 && @cy == 0) || (@sx == 0 && @sy == 0) # See if we are doing anything at all. @hbs = @hbr = true # Sending/Receiving heartbeats. Assume yes at first. # Check if sending is possible. @hbs = false if @cx == 0 || @sy == 0 # Reset if neither side wants # Check if receiving is possible. @hbr = false if @sx == 0 || @cy == 0 # Reset if neither side wants # Check if we should not do heartbeats at all return if (!@hbs && !@hbr) # If sending if @hbs sm = @cx >= @sy ? @cx : @sy # ticker interval, ms @hbsend_interval = 1000.0 * sm # ticker interval, μs @ls = Time.now.to_f # best guess at start _start_send_ticker() end # If receiving if @hbr rm = @sx >= @cy ? @sx : @cy # ticker interval, ms @hbrecv_interval = 1000.0 * rm # ticker interval, μs @lr = Time.now.to_f # best guess at start _start_receive_ticker() end end
Normalize line ends because 1.2+ brokers can send 'mixed mode' headers, i.e.:
-
Some headers end with 'n'
-
Other headers end with 'rn'
# File lib/connection/netio.rb, line 97 def _normalize_line_end(line) return line unless @usecrlf # p [ "nleln", line ] line_len = line.respond_to?(:bytesize) ? line.bytesize : line.length last2 = line[line_len-2...line_len] # p [ "nlel2", last2 ] return line unless last2 == "\r\n" return line[0...line_len-2] + "\n" end
#_post_connect handles low level logic just after a physical connect.
# File lib/connection/utils.rb, line 84 def _post_connect() return unless (@connect_headers[:"accept-version"] && @connect_headers[:host]) # 1.0 if @connection_frame.command == Stomp::CMD_ERROR @connection_frame.headers = _decodeHeaders(@connection_frame.headers) return end # We are CONNECTed cfh = @connection_frame.headers.symbolize_keys @protocol = cfh[:version] if @protocol # Should not happen, but check anyway raise Stomp::Error::UnsupportedProtocolError unless Stomp::SUPPORTED.index(@protocol) else # CONNECTed to a 1.0 server that does not return *any* 1.1 type headers @protocol = Stomp::SPL_10 # reset return end # Heartbeats return unless @connect_headers[:"heart-beat"] _init_heartbeats() end
#_pre_connect handles low level logic just prior to a physical connect.
# File lib/connection/utils.rb, line 61 def _pre_connect() @connect_headers = @connect_headers.symbolize_keys raise Stomp::Error::ProtocolErrorConnect if (@connect_headers[:"accept-version"] && !@connect_headers[:host]) raise Stomp::Error::ProtocolErrorConnect if (!@connect_headers[:"accept-version"] && @connect_headers[:host]) return unless (@connect_headers[:"accept-version"] && @connect_headers[:host]) # 1.0 # Try 1.1 or greater @hhas10 = false okvers = [] avers = @connect_headers[:"accept-version"].split(",") avers.each do |nver| if Stomp::SUPPORTED.index(nver) okvers << nver @hhas10 = true if nver == Stomp::SPL_10 end end raise Stomp::Error::UnsupportedProtocolError if okvers == [] @connect_headers[:"accept-version"] = okvers.join(",") # This goes to server # Heartbeats - pre connect return unless @connect_headers[:"heart-beat"] _validate_hbheader() end
Really read from the wire.
# File lib/connection/netio.rb, line 14 def _receive(read_socket) @read_semaphore.synchronize do line = '' if @protocol == Stomp::SPL_10 || (@protocol >= Stomp::SPL_11 && !@hbr) line = read_socket.gets # The old way else # We are >= 1.1 *AND* receiving heartbeats. while true line = read_socket.gets # Data from wire break unless line == "\n" line = '' @lr = Time.now.to_f end end return nil if line.nil? # p [ "wiredatain_01", line ] line = _normalize_line_end(line) if @protocol >= Stomp::SPL_12 # If the reading hangs for more than X seconds, abort the parsing process. # X defaults to 5. Override allowed in connection hash parameters. Timeout::timeout(@parse_timeout, Stomp::Error::PacketParsingTimeout) do # Reads the beginning of the message until it runs into a empty line message_header = '' begin message_header += line line = read_socket.gets # p [ "wiredatain_02", line ] raise Stomp::Error::StompServerError if line.nil? line = _normalize_line_end(line) if @protocol >= Stomp::SPL_12 end until line =~ /^\s?\n$/ # Checks if it includes content_length header content_length = message_header.match /content-length\s?:\s?(\d+)\s?\n/ message_body = '' # If content_length is present, read the specified amount of bytes if content_length message_body = read_socket.read content_length[1].to_i raise Stomp::Error::InvalidMessageLength unless parse_char(read_socket.getc) == "\0" # Else read the rest of the message until the first \0 else message_body = read_socket.readline("\0") message_body.chop! end # If the buffer isn't empty, reads trailing new lines. # # Note: experiments with JRuby seem to show that .ready? never # returns true. This means that this code to drain trailing new # lines never runs using JRuby. # # Note 2: the draining of new lines must be done _after_ a message # is read. Do _not_ leave them on the wire and attempt to drain them # at the start of the next read. Attempting to do that breaks the # asynchronous nature of the 'poll' method. while read_socket.ready? last_char = read_socket.getc break unless last_char if parse_char(last_char) != "\n" read_socket.ungetc(last_char) break end end # And so, a JRuby hack. Remove any new lines at the start of the # next buffer. message_header.gsub!(/^\n?/, "") if @protocol >= Stomp::SPL_11 @lr = Time.now.to_f if @hbr end # Adds the excluded \n and \0 and tries to create a new message with it msg = Message.new(message_header + "\n" + message_body + "\0", @protocol >= Stomp::SPL_11) # if @protocol >= Stomp::SPL_11 && msg.command != Stomp::CMD_CONNECTED msg.headers = _decodeHeaders(msg.headers) end msg end end end
#_start_receive_ticker starts a thread that receives heartbeats when required.
# File lib/connection/heartbeats.rb, line 129 def _start_receive_ticker() sleeptime = @hbrecv_interval / 1000000.0 # Sleep time secs @rt = Thread.new { while true do sleep sleeptime curt = Time.now.to_f if @logger && @logger.respond_to?(:on_hbfire) @logger.on_hbfire(log_params, "receive_fire", curt) end delta = curt - @lr if delta > ((@hbrecv_interval + (@hbrecv_interval/5.0)) / 1000000.0) # Be tolerant (plus) if @logger && @logger.respond_to?(:on_hbfire) @logger.on_hbfire(log_params, "receive_heartbeat", curt) end # Client code could be off doing something else (that is, no reading of # the socket has been requested by the caller). Try to handle that case. lock = @read_semaphore.try_lock if lock last_char = @socket.getc plc = parse_char(last_char) if plc == "\n" # Server Heartbeat @lr = Time.now.to_f else @socket.ungetc(last_char) end @read_semaphore.unlock @hbrecv_count += 1 else # Shrug. Have not received one. Just set warning flag. @hb_received = false if @logger && @logger.respond_to?(:on_hbread_fail) @logger.on_hbread_fail(log_params, {"ticker_interval" => @hbrecv_interval}) end end else @hb_received = true # Reset if necessary end Thread.pass end } end
#_start_send_ticker starts a thread to send heartbeats when required.
# File lib/connection/heartbeats.rb, line 90 def _start_send_ticker() sleeptime = @hbsend_interval / 1000000.0 # Sleep time secs @st = Thread.new { while true do sleep sleeptime curt = Time.now.to_f if @logger && @logger.respond_to?(:on_hbfire) @logger.on_hbfire(log_params, "send_fire", curt) end delta = curt - @ls if delta > (@hbsend_interval - (@hbsend_interval/5.0)) / 1000000.0 # Be tolerant (minus) if @logger && @logger.respond_to?(:on_hbfire) @logger.on_hbfire(log_params, "send_heartbeat", curt) end # Send a heartbeat @transmit_semaphore.synchronize do begin @socket.puts @ls = curt # Update last send @hb_sent = true # Reset if necessary @hbsend_count += 1 rescue Exception => sendex @hb_sent = false # Set the warning flag if @logger && @logger.respond_to?(:on_hbwrite_fail) @logger.on_hbwrite_fail(log_params, {"ticker_interval" => @hbsend_interval, "exception" => sendex}) end if @hbser raise # Re-raise if user requested this, otherwise ignore end end end end Thread.pass end } end
_transmit is the real wire write logic.
# File lib/connection/netio.rb, line 131 def _transmit(used_socket, command, headers = {}, body = '') if @protocol >= Stomp::SPL_11 && command != Stomp::CMD_CONNECT headers = _encodeHeaders(headers) end @transmit_semaphore.synchronize do # Handle nil body body = '' if body.nil? # The content-length should be expressed in bytes. # Ruby 1.8: String#length => # of bytes; Ruby 1.9: String#length => # of characters # With Unicode strings, # of bytes != # of characters. So, use String#bytesize when available. body_length_bytes = body.respond_to?(:bytesize) ? body.bytesize : body.length # ActiveMQ interprets every message as a BinaryMessage # if content_length header is included. # Using :suppress_content_length => true will suppress this behaviour # and ActiveMQ will interpret the message as a TextMessage. # For more information refer to http://juretta.com/log/2009/05/24/activemq-jms-stomp/ # Lets send this header in the message, so it can maintain state when using unreceive headers[:'content-length'] = "#{body_length_bytes}" unless headers[:suppress_content_length] headers[:'content-type'] = "text/plain; charset=UTF-8" unless headers[:'content-type'] _wire_write(used_socket,command) headers.each do |k,v| if v.is_a?(Array) v.each do |e| _wire_write(used_socket,"#{k}:#{e}") end else _wire_write(used_socket,"#{k}:#{v}") end end _wire_write(used_socket,"") used_socket.write body used_socket.write "\0" used_socket.flush if autoflush if @protocol >= Stomp::SPL_11 @ls = Time.now.to_f if @hbs end end end
Ref: unicode.org/mail-arch/unicode-ml/y2003-m02/att-0467/01-The_Algorithm_to_Valide_an_UTF-8_String
CONSIDER replacing this with a dependency on the utf8_validator gem. This code has been copied from there.
# File lib/connection/utf8.rb, line 19 def _valid_utf8?(string) case RUBY_VERSION when /1\.8\.[56]/ bytes = [] 0.upto(string.length-1) {|i| bytes << string[i] } else bytes = string.bytes end # valid = true index = -1 nb_hex = nil ni_hex = nil state = "start" next_byte_save = nil # bytes.each do |next_byte| index += 1 next_byte_save = next_byte ni_hex = sprintf "%x", index nb_hex = sprintf "%x", next_byte # puts "Top: #{next_byte}(0x#{nb_hex}), index: #{index}(0x#{ni_hex})" if DEBUG case state # State: 'start' # The 'start' state: # * handles all occurrences of valid single byte characters i.e., the ASCII character set # * provides state transition logic for start bytes of valid characters with 2-4 bytes # * signals a validation failure for all other single bytes # when "start" # puts "state: start" if DEBUG case next_byte # ASCII # * Input = 0x00-0x7F : change state to START when (0x00..0x7f) # puts "state: start 1" if DEBUG state = "start" # Start byte of two byte characters # * Input = 0xC2-0xDF: change state to A when (0xc2..0xdf) # puts "state: start 2" if DEBUG state = "a" # Start byte of some three byte characters # * Input = 0xE1-0xEC, 0xEE-0xEF: change state to B when (0xe1..0xec) # puts "state: start 3" if DEBUG state = "b" when (0xee..0xef) # puts "state: start 4" if DEBUG state = "b" # Start byte of special three byte characters # * Input = 0xE0: change state to C when 0xe0 # puts "state: start 5" if DEBUG state = "c" # Start byte of the remaining three byte characters # * Input = 0xED: change state to D when 0xed # puts "state: start 6" if DEBUG state = "d" # Start byte of some four byte characters # * Input = 0xF1-0xF3:change state to E when (0xf1..0xf3) # puts "state: start 7" if DEBUG state = "e" # Start byte of special four byte characters # * Input = 0xF0: change state to F when 0xf0 # puts "state: start 8" if DEBUG state = "f" # Start byte of very special four byte characters # * Input = 0xF4: change state to G when 0xf4 # puts "state: start 9" if DEBUG state = "g" # All other single characters are invalid # * Input = Others (0x80-0xBF,0xC0-0xC1, 0xF5-0xFF): ERROR else valid = false break end # of the inner case, the 'start' state # The last continuation byte of a 2, 3, or 4 byte character # State: 'a' # o Input = 0x80-0xBF: change state to START # o Others: ERROR when "a" # puts "state: a" if DEBUG if (0x80..0xbf) === next_byte state = "start" else valid = false break end # The first continuation byte for most 3 byte characters # (those with start bytes in: 0xe1-0xec or 0xee-0xef) # State: 'b' # o Input = 0x80-0xBF: change state to A # o Others: ERROR when "b" # puts "state: b" if DEBUG if (0x80..0xbf) === next_byte state = "a" else valid = false break end # The first continuation byte for some special 3 byte characters # (those with start byte 0xe0) # State: 'c' # o Input = 0xA0-0xBF: change state to A # o Others: ERROR when "c" # puts "state: c" if DEBUG if (0xa0..0xbf) === next_byte state = "a" else valid = false break end # The first continuation byte for the remaining 3 byte characters # (those with start byte 0xed) # State: 'd' # o Input = 0x80-0x9F: change state to A # o Others: ERROR when "d" # puts "state: d" if DEBUG if (0x80..0x9f) === next_byte state = "a" else valid = false break end # The first continuation byte for some 4 byte characters # (those with start bytes in: 0xf1-0xf3) # State: 'e' # o Input = 0x80-0xBF: change state to B # o Others: ERROR when "e" # puts "state: e" if DEBUG if (0x80..0xbf) === next_byte state = "b" else valid = false break end # The first continuation byte for some special 4 byte characters # (those with start byte 0xf0) # State: 'f' # o Input = 0x90-0xBF: change state to B # o Others: ERROR when "f" # puts "state: f" if DEBUG if (0x90..0xbf) === next_byte state = "b" else valid = false break end # The first continuation byte for the remaining 4 byte characters # (those with start byte 0xf4) # State: 'g' # o Input = 0x80-0x8F: change state to B # o Others: ERROR when "g" # puts "state: g" if DEBUG if (0x80..0x8f) === next_byte state = "b" else valid = false break end # else raise RuntimeError, "state: default" end end # # puts "State at end: #{state}" if DEBUG # Catch truncation at end of string if valid and state != 'start' # puts "Resetting valid value" if DEBUG valid = false end # valid end
# File lib/connection/heartbeats.rb, line 13 def _validate_hbheader() return if @connect_headers[:"heart-beat"] == "0,0" # Caller does not want heartbeats. OK. parts = @connect_headers[:"heart-beat"].split(",") if (parts.size != 2) || (parts[0] != parts[0].to_i.to_s) || (parts[1] != parts[1].to_i.to_s) raise Stomp::Error::InvalidHeartBeatHeaderError end end
Use CRLF if protocol is >= 1.2, and the client requested CRLF
# File lib/connection/netio.rb, line 174 def _wire_write(sock, data) # p [ "debug_01", @protocol, @usecrlf ] if @protocol >= Stomp::SPL_12 && @usecrlf wiredata = "#{data}#{Stomp::CR}#{Stomp::LF}" # p [ "wiredataout_01:", wiredata ] sock.write(wiredata) else # p [ "wiredataout_02:", "#{data}\n" ] sock.puts data end end
#change_host selects the next host for retires.
# File lib/connection/utils.rb, line 184 def change_host @parameters[:hosts] = @parameters[:hosts].sort_by { rand } if @parameters[:randomize] # Set first as master and send it to the end of array current_host = @parameters[:hosts].shift @parameters[:hosts] << current_host @ssl = current_host[:ssl] @host = current_host[:host] @port = current_host[:port] || Connection::default_port(@ssl) @login = current_host[:login] || "" @passcode = current_host[:passcode] || "" end
#close_socket closes the current open socket, and hence the connection.
# File lib/connection/netio.rb, line 307 def close_socket() begin # Need to set @closed = true before closing the socket # within the @read_semaphore thread @closed = true @read_semaphore.synchronize do @socket.close end rescue #Ignoring if already closed end @closed end
connect performs a basic STOMP CONNECT operation.
# File lib/connection/netio.rb, line 337 def connect(used_socket) @connect_headers = {} unless @connect_headers # Caller said nil/false headers = @connect_headers.clone headers[:login] = @login headers[:passcode] = @passcode _pre_connect if !@hhas10 && @stompconn _transmit(used_socket, Stomp::CMD_STOMP, headers) else _transmit(used_socket, Stomp::CMD_CONNECT, headers) end @connection_frame = _receive(used_socket) _post_connect @disconnect_receipt = nil @session = @connection_frame.headers["session"] if @connection_frame # replay any subscriptions. @subscriptions.each { |k,v| _transmit(used_socket, Stomp::CMD_SUBSCRIBE, v) } end
#increase_reconnect_delay increases the reconnect delay for the next connection attempt.
# File lib/connection/utils.rb, line 207 def increase_reconnect_delay @reconnect_delay *= @parameters[:back_off_multiplier] if @parameters[:use_exponential_back_off] @reconnect_delay = @parameters[:max_reconnect_delay] if @reconnect_delay > @parameters[:max_reconnect_delay] @reconnect_delay end
Create parameters for any callback logger.
# File lib/connection/utils.rb, line 44 def log_params() lparms = @parameters.clone if @parameters lparms = {} unless lparms lparms[:cur_host] = @host lparms[:cur_port] = @port lparms[:cur_login] = @login lparms[:cur_passcode] = @passcode lparms[:cur_ssl] = @ssl lparms[:cur_recondelay] = @reconnect_delay lparms[:cur_parseto] = @parse_timeout lparms[:cur_conattempts] = @connection_attempts lparms[:openstat] = open? # lparms end
max_reconnect_attempts? returns nil or the number of maximum reconnect attempts.
# File lib/connection/utils.rb, line 201 def max_reconnect_attempts? !(@parameters.nil? || @parameters[:max_reconnect_attempts].nil?) && @parameters[:max_reconnect_attempts] != 0 && @connection_attempts >= @parameters[:max_reconnect_attempts] end
#open_socket opens a TCP or SSL soclet as required.
# File lib/connection/netio.rb, line 322 def open_socket() used_socket = @ssl ? open_ssl_socket : open_tcp_socket # try to close the old connection if any close_socket @closed = false if @parameters # nil in some rspec tests @reconnect_delay = @parameters[:initial_reconnect_delay] ? @parameters[:initial_reconnect_delay] : 0.01 end # Use keepalive used_socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true) used_socket end
#open_ssl_socket opens an SSL socket.
# File lib/connection/netio.rb, line 202 def open_ssl_socket() require 'openssl' unless defined?(OpenSSL) begin # Any raised SSL exceptions ctx = OpenSSL::SSL::SSLContext.new ctx.verify_mode = OpenSSL::SSL::VERIFY_NONE # Assume for now # # Note: if a client uses :ssl => true this results in the gem using # the _default_ Ruby ciphers list. This is _known_ to fail in later # Ruby releases. The gem provides a default cipher list that may # function in these cases. To use this connect with: # * :ssl => Stomp::SSLParams.new # * :ssl => Stomp::SSLParams.new(..., :ciphers => Stomp::DEFAULT_CIPHERS) # # If connecting with an SSLParams instance, and the _default_ Ruby # ciphers list is required, use: # * :ssl => Stomp::SSLParams.new(..., :use_ruby_ciphers => true) # # If a custom ciphers list is required, connect with: # * :ssl => Stomp::SSLParams.new(..., :ciphers => custom_ciphers_list) # if @ssl != true # # Here @ssl is: # * an instance of Stomp::SSLParams # Control would not be here if @ssl == false or @ssl.nil?. # # Back reference the SSLContext @ssl.ctx = ctx # Server authentication parameters if required if @ssl.ts_files ctx.verify_mode = OpenSSL::SSL::VERIFY_PEER truststores = OpenSSL::X509::Store.new fl = @ssl.ts_files.split(",") fl.each do |fn| # Add next cert file listed raise Stomp::Error::SSLNoTruststoreFileError if !File::exists?(fn) raise Stomp::Error::SSLUnreadableTruststoreFileError if !File::readable?(fn) truststores.add_file(fn) end ctx.cert_store = truststores end # Client authentication parameters. # Both cert file and key file must be present or not, it can not be a mix. raise Stomp::Error::SSLClientParamsError if @ssl.cert_file.nil? && !@ssl.key_file.nil? raise Stomp::Error::SSLClientParamsError if !@ssl.cert_file.nil? && @ssl.key_file.nil? if @ssl.cert_file # Any check will do here raise Stomp::Error::SSLNoCertFileError if !File::exists?(@ssl.cert_file) raise Stomp::Error::SSLUnreadableCertFileError if !File::readable?(@ssl.cert_file) ctx.cert = OpenSSL::X509::Certificate.new(File.read(@ssl.cert_file)) raise Stomp::Error::SSLNoKeyFileError if !File::exists?(@ssl.key_file) raise Stomp::Error::SSLUnreadableKeyFileError if !File::readable?(@ssl.key_file) ctx.key = OpenSSL::PKey::RSA.new(File.read(@ssl.key_file), @ssl.key_password) end # Cipher list if !@ssl.use_ruby_ciphers # No Ruby ciphers (the default) if @ssl.ciphers # User ciphers list? ctx.ciphers = @ssl.ciphers # Accept user supplied ciphers else ctx.ciphers = Stomp::DEFAULT_CIPHERS # Just use Stomp defaults end end end # ssl = nil if @logger && @logger.respond_to?(:on_ssl_connecting) @logger.on_ssl_connecting(log_params) end Timeout::timeout(@connect_timeout, Stomp::Error::SocketOpenTimeout) do ssl = OpenSSL::SSL::SSLSocket.new(open_tcp_socket, ctx) ssl.connect end def ssl.ready? ! @rbuffer.empty? || @io.ready? end if @ssl != true # Pass back results if possible if RUBY_VERSION =~ /1\.8\.[56]/ @ssl.verify_result = "N/A for Ruby #{RUBY_VERSION}" else @ssl.verify_result = ssl.verify_result end @ssl.peer_cert = ssl.peer_cert end if @logger && @logger.respond_to?(:on_ssl_connected) @logger.on_ssl_connected(log_params) end ssl rescue Exception => ex if @logger && @logger.respond_to?(:on_ssl_connectfail) lp = log_params.clone lp[:ssl_exception] = ex @logger.on_ssl_connectfail(lp) end # raise # Reraise end end
#open_tcp_socket opens a TCP socket.
# File lib/connection/netio.rb, line 187 def open_tcp_socket() tcp_socket = nil if @logger && @logger.respond_to?(:on_connecting) @logger.on_connecting(log_params) end Timeout::timeout(@connect_timeout, Stomp::Error::SocketOpenTimeout) do tcp_socket = TCPSocket.open(@host, @port) end tcp_socket end
Handle 1.9+ character representation.
# File lib/connection/utils.rb, line 39 def parse_char(char) RUBY_VERSION > '1.9' ? char : char.chr end
#refine_params sets up defaults for a Hash initialize.
# File lib/connection/utils.rb, line 154 def refine_params(params) params = params.uncamelize_and_symbolize_keys default_params = { :connect_headers => {}, :reliable => true, # Failover parameters :initial_reconnect_delay => 0.01, :max_reconnect_delay => 30.0, :use_exponential_back_off => true, :back_off_multiplier => 2, :max_reconnect_attempts => 0, :randomize => false, :connect_timeout => 0, # Parse Timeout :parse_timeout => 5, :dmh => false, # Closed check logic :closed_check => true, :hbser => false, :stompconn => false, } res_params = default_params.merge(params) if res_params[:dmh] res_params = _expand_hosts(res_params) end return res_params end
socket creates and returns a new socket for use by the connection.
# File lib/connection/utils.rb, line 106 def socket() @socket_semaphore.synchronize do used_socket = @socket used_socket = nil if closed? while used_socket.nil? || !@failure.nil? @failure = nil begin used_socket = open_socket() # sets @closed = false if OK # Open is complete connect(used_socket) if @logger && @logger.respond_to?(:on_connected) @logger.on_connected(log_params) end @connection_attempts = 0 rescue @failure = $! used_socket = nil raise unless @reliable raise if @failure.is_a?(Stomp::Error::LoggerConnectionError) @closed = true if @logger && @logger.respond_to?(:on_connectfail) # on_connectfail may raise begin @logger.on_connectfail(log_params) rescue Exception => aex raise if aex.is_a?(Stomp::Error::LoggerConnectionError) end else $stderr.print "connect to #{@host} failed: #{$!} will retry(##{@connection_attempts}) in #{@reconnect_delay}\n" end raise Stomp::Error::MaxReconnectAttempts if max_reconnect_attempts? sleep(@reconnect_delay) @connection_attempts += 1 if @parameters change_host() increase_reconnect_delay() end end end @socket = used_socket end end
transmit logically puts a Message on the wire.
# File lib/connection/netio.rb, line 108 def transmit(command, headers = {}, body = '') # The transmit may fail so we may need to retry. while TRUE begin used_socket = socket() _transmit(used_socket, command, headers, body) return rescue Stomp::Error::MaxReconnectAttempts => e raise rescue @failure = $! raise unless @reliable errstr = "transmit to #{@host} failed: #{$!}\n" if @logger && @logger.respond_to?(:on_miscerr) @logger.on_miscerr(log_params, "es_trans: " + errstr) else $stderr.print errstr end end end end