Skip to content

Commit

Permalink
Avoid to close @socket of MQTT::Client unexpectedly
Browse files Browse the repository at this point in the history
Since it is locked only when writing, it may be closed by anothre thread
while reading or writing. In actual we often get the following error
when we are processing massive messages by fluent-plugin-mqtt-io:

  IOError,stream closed in another thread

This commit ensure to protect all atomic operations of the socket.

Signed-off-by: Takuro Ashie <[email protected]>
  • Loading branch information
ashie committed Mar 18, 2021
1 parent 5a406eb commit e9ddf6f
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 33 deletions.
75 changes: 46 additions & 29 deletions lib/mqtt/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ def initialize(*args)
@read_queue = Queue.new
@pubacks = {}
@read_thread = nil
@write_semaphore = Mutex.new
@socket_semaphore = Mutex.new
@pubacks_semaphore = Mutex.new
end

Expand Down Expand Up @@ -227,7 +227,9 @@ def connect(clientid = nil)

raise 'No MQTT server host set when attempting to connect' if @host.nil?

unless connected?
@socket_semaphore.synchronize do
break if socket_alive?

# Create network socket
tcp_socket = TCPSocket.new(@host, @port)

Expand Down Expand Up @@ -261,7 +263,7 @@ def connect(clientid = nil)
)

# Send packet
send_packet(packet)
send_packet(packet, false)

# Receive response
receive_connack
Expand Down Expand Up @@ -290,21 +292,16 @@ def disconnect(send_msg = true)
@read_thread.kill if @read_thread && @read_thread.alive?
@read_thread = nil

return unless connected?

# Close the socket if it is open
if send_msg
packet = MQTT::Packet::Disconnect.new
send_packet(packet)
@socket_semaphore.synchronize do
close_socket(send_msg)
end
@socket.close unless @socket.nil?
handle_close
@socket = nil
end

# Checks whether the client is connected to the server.
def connected?
!@socket.nil? && !@socket.closed?
@socket_semaphore.synchronize do
socket_alive?
end
end

# Publish a message on a particular topic to the MQTT server.
Expand Down Expand Up @@ -454,24 +451,28 @@ def unsubscribe(*topics)

private

def socket_alive?
!@socket.nil? && !@socket.closed?
end

# Try to read a packet from the server
# Also sends keep-alive ping packets.
def receive_packet
# Poll socket - is there data waiting?
result = IO.select([@socket], [], [], SELECT_TIMEOUT)
handle_timeouts
unless result.nil?
# Yes - read in the packet
packet = MQTT::Packet.read(@socket)
handle_packet packet
@socket_semaphore.synchronize do
unless result.nil?
# Yes - read in the packet
packet = MQTT::Packet.read(@socket)
handle_packet packet
end
keep_alive!
end
keep_alive!
# Pass exceptions up to parent thread
rescue Exception => exp
unless @socket.nil?
@socket.close
@socket = nil
handle_close
@socket_semaphore.synchronize do
close_socket(false)
end
Thread.current[:parent].raise(exp)
end
Expand Down Expand Up @@ -509,6 +510,19 @@ def handle_close
end
end

def close_socket(send_msg = true)
return unless socket_alive?

# Close the socket if it is open
if send_msg
packet = MQTT::Packet::Disconnect.new
send_packet(packet, false)
end
@socket.close unless @socket.nil?
handle_close
@socket = nil
end

if Process.const_defined? :CLOCK_MONOTONIC
def current_time
Process.clock_gettime(Process::CLOCK_MONOTONIC)
Expand All @@ -521,12 +535,12 @@ def current_time
end

def keep_alive!
return unless @keep_alive > 0 && connected?
return unless @keep_alive > 0 && socket_alive?

response_timeout = (@keep_alive * 1.5).ceil
if current_time >= @last_ping_request + @keep_alive
packet = MQTT::Packet::Pingreq.new
send_packet(packet)
send_packet(packet, false)
@last_ping_request = current_time
elsif current_time > @last_ping_response + response_timeout
raise MQTT::ProtocolException, "No Ping Response received for #{response_timeout} seconds"
Expand Down Expand Up @@ -556,12 +570,15 @@ def receive_connack
end

# Send a packet to server
def send_packet(data)
# Raise exception if we aren't connected
raise MQTT::NotConnectedException unless connected?

def send_packet(data, with_lock = true)
# Only allow one thread to write to socket at a time
@write_semaphore.synchronize do
if with_lock
@socket_semaphore.synchronize do
raise MQTT::NotConnectedException unless socket_alive?
@socket.write(data.to_s)
end
else
raise MQTT::NotConnectedException unless socket_alive?
@socket.write(data.to_s)
end
end
Expand Down
8 changes: 4 additions & 4 deletions spec/mqtt_client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ def now
end

it "should not create a new TCP Socket if connected" do
allow(client).to receive(:connected?).and_return(true)
allow(client).to receive(:socket_alive?).and_return(true)
expect(TCPSocket).to receive(:new).never
client.connect('myclient')
end
Expand Down Expand Up @@ -558,19 +558,19 @@ def now
end

it "should not do anything if the socket is already disconnected" do
allow(client).to receive(:connected?).and_return(false)
allow(client).to receive(:socket_alive?).and_return(false)
client.disconnect(true)
expect(socket.string).to eq("")
end

it "should write a valid DISCONNECT packet to the socket if connected and the send_msg=true an" do
allow(client).to receive(:connected?).and_return(true)
allow(client).to receive(:socket_alive?).and_return(true)
client.disconnect(true)
expect(socket.string).to eq("\xE0\x00")
end

it "should not write anything to the socket if the send_msg=false" do
allow(client).to receive(:connected?).and_return(true)
allow(client).to receive(:socket_alive?).and_return(true)
client.disconnect(false)
expect(socket.string).to be_empty
end
Expand Down

0 comments on commit e9ddf6f

Please sign in to comment.