diff --git a/lib/mqtt/client.rb b/lib/mqtt/client.rb index def2619..ae971b6 100644 --- a/lib/mqtt/client.rb +++ b/lib/mqtt/client.rb @@ -165,7 +165,7 @@ def initialize(*args) @read_queue = Queue.new @pubacks = {} @read_thread = nil - @write_semaphore = Mutex.new + @socket_semaphore = Mutex.new @pubacks_semaphore = Mutex.new end @@ -227,7 +227,9 @@ def connect(clientid = nil) raise 'No MQTT server host set when attempting to connect' if @host.nil? - unless connected? + @socket_semaphore.synchronize do + break if socket_alive? + # Create network socket tcp_socket = TCPSocket.new(@host, @port) @@ -261,7 +263,7 @@ def connect(clientid = nil) ) # Send packet - send_packet(packet) + send_packet(packet, false) # Receive response receive_connack @@ -290,21 +292,16 @@ def disconnect(send_msg = true) @read_thread.kill if @read_thread && @read_thread.alive? @read_thread = nil - return unless connected? - - # Close the socket if it is open - if send_msg - packet = MQTT::Packet::Disconnect.new - send_packet(packet) + @socket_semaphore.synchronize do + close_socket(send_msg) end - @socket.close unless @socket.nil? - handle_close - @socket = nil end # Checks whether the client is connected to the server. def connected? - !@socket.nil? && !@socket.closed? + @socket_semaphore.synchronize do + socket_alive? + end end # Publish a message on a particular topic to the MQTT server. @@ -454,24 +451,28 @@ def unsubscribe(*topics) private + def socket_alive? + !@socket.nil? && !@socket.closed? + end + # Try to read a packet from the server # Also sends keep-alive ping packets. def receive_packet # Poll socket - is there data waiting? result = IO.select([@socket], [], [], SELECT_TIMEOUT) handle_timeouts - unless result.nil? - # Yes - read in the packet - packet = MQTT::Packet.read(@socket) - handle_packet packet + @socket_semaphore.synchronize do + unless result.nil? + # Yes - read in the packet + packet = MQTT::Packet.read(@socket) + handle_packet packet + end + keep_alive! end - keep_alive! # Pass exceptions up to parent thread rescue Exception => exp - unless @socket.nil? - @socket.close - @socket = nil - handle_close + @socket_semaphore.synchronize do + close_socket(false) end Thread.current[:parent].raise(exp) end @@ -509,6 +510,19 @@ def handle_close end end + def close_socket(send_msg = true) + return unless socket_alive? + + # Close the socket if it is open + if send_msg + packet = MQTT::Packet::Disconnect.new + send_packet(packet, false) + end + @socket.close unless @socket.nil? + handle_close + @socket = nil + end + if Process.const_defined? :CLOCK_MONOTONIC def current_time Process.clock_gettime(Process::CLOCK_MONOTONIC) @@ -521,12 +535,12 @@ def current_time end def keep_alive! - return unless @keep_alive > 0 && connected? + return unless @keep_alive > 0 && socket_alive? response_timeout = (@keep_alive * 1.5).ceil if current_time >= @last_ping_request + @keep_alive packet = MQTT::Packet::Pingreq.new - send_packet(packet) + send_packet(packet, false) @last_ping_request = current_time elsif current_time > @last_ping_response + response_timeout raise MQTT::ProtocolException, "No Ping Response received for #{response_timeout} seconds" @@ -556,12 +570,15 @@ def receive_connack end # Send a packet to server - def send_packet(data) - # Raise exception if we aren't connected - raise MQTT::NotConnectedException unless connected? - + def send_packet(data, with_lock = true) # Only allow one thread to write to socket at a time - @write_semaphore.synchronize do + if with_lock + @socket_semaphore.synchronize do + raise MQTT::NotConnectedException unless socket_alive? + @socket.write(data.to_s) + end + else + raise MQTT::NotConnectedException unless socket_alive? @socket.write(data.to_s) end end diff --git a/spec/mqtt_client_spec.rb b/spec/mqtt_client_spec.rb index 5ae92ab..5611ca2 100644 --- a/spec/mqtt_client_spec.rb +++ b/spec/mqtt_client_spec.rb @@ -285,7 +285,7 @@ def now end it "should not create a new TCP Socket if connected" do - allow(client).to receive(:connected?).and_return(true) + allow(client).to receive(:socket_alive?).and_return(true) expect(TCPSocket).to receive(:new).never client.connect('myclient') end @@ -558,19 +558,19 @@ def now end it "should not do anything if the socket is already disconnected" do - allow(client).to receive(:connected?).and_return(false) + allow(client).to receive(:socket_alive?).and_return(false) client.disconnect(true) expect(socket.string).to eq("") end it "should write a valid DISCONNECT packet to the socket if connected and the send_msg=true an" do - allow(client).to receive(:connected?).and_return(true) + allow(client).to receive(:socket_alive?).and_return(true) client.disconnect(true) expect(socket.string).to eq("\xE0\x00") end it "should not write anything to the socket if the send_msg=false" do - allow(client).to receive(:connected?).and_return(true) + allow(client).to receive(:socket_alive?).and_return(true) client.disconnect(false) expect(socket.string).to be_empty end