Skip to content
This repository has been archived by the owner on Jan 15, 2024. It is now read-only.

2.0.0 fix retries operations #315

Closed
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
source "https://rubygems.org"

group :test do
gem "popen4"
gem "rspec", "~> 2.14.1"
if ENV["CI"]
gem "coveralls", :require => false
Expand Down
6 changes: 6 additions & 0 deletions lib/moped/authenticatable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -94,5 +94,11 @@ def logout(database)
end
credentials.delete(database)
end

def refresh_authentication
credentials.each do |database, (username, password)|
login(database, username, password)
end
end
end
end
4 changes: 4 additions & 0 deletions lib/moped/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,10 @@ def refresh(nodes_to_refresh = seeds)
refreshed_nodes
end

def refresh_authentication(nodes_to_refresh = seeds)
nodes_to_refresh.each(&:refresh_authentication)
end

# Get the interval in which the node list should be refreshed.
#
# @example Get the refresh interval, in seconds.
Expand Down
10 changes: 7 additions & 3 deletions lib/moped/collection.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# encoding: utf-8
require "moped/query"
require "moped/retryable"

module Moped

Expand All @@ -8,6 +9,7 @@ module Moped
# @since 1.0.0
class Collection
include Readable
include Retryable

# @!attribute database
# @return [ Database ] The database for the collection.
Expand Down Expand Up @@ -120,9 +122,11 @@ def initialize(database, name)
#
# @since 1.0.0
def insert(documents, flags = nil)
docs = documents.is_a?(Array) ? documents : [ documents ]
cluster.with_primary do |node|
node.insert(database.name, name, docs, write_concern, flags: flags || [])
with_retry(cluster) do
docs = documents.is_a?(Array) ? documents : [ documents ]
cluster.with_primary do |node|
node.insert(database.name, name, docs, write_concern, flags: flags || [])
end
end
end

Expand Down
22 changes: 17 additions & 5 deletions lib/moped/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,24 @@ def write(operations)
#
# @since 1.2.9
def read_data(socket, length)
data = socket.read(length)
unless data
raise Errors::ConnectionFailure.new(
"Attempted to read #{length} bytes from the socket but nothing was returned."
)
# Block on data to read for op_timeout seconds
begin
op_timeout = @options[:op_timeout] || timeout
ready = IO.select([socket], nil, [socket], op_timeout)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just deployed this to our staging environment and this part always raises Errors::OperationTimeout.new("Took more than #{op_timeout} seconds to receive data.") when you use an SSL connection.

It has to do with IO.select and the way Ruby's SSL implements buffers (https://bugs.ruby-lang.org/issues/8875)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @matsimitsu
can you do a small test for me?
replace the line

ready = IO.select([socket], nil, [socket], op_timeout)

by

ready = Kernel::select([socket], nil, [socket], op_timeout)

and see if works under ssl, please.
Kernel::select is the same call already in use in Connectable module, so it should fix the problem.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @wandenberg
I just tested it on staging, but it raises the same error. I added some debugging, so the method looks like this:

    def read_data(socket, length)
      puts '-----'
       puts socket.inspect
      # Block on data to read for op_timeout seconds
      begin
        op_timeout = @options[:op_timeout] || timeout
        ready = Kernel::select([socket], nil, [socket], timeout)
        puts ready.inspect
        puts (ready == nil).inspect
        if ready == nil
          raise Errors::OperationTimeout.new("Took more than #{op_timeout} seconds to receive data.")
        end
      rescue IOError => e
        raise Errors::ConnectionFailure
      end

      # Read data from socket
      begin
        data = socket.read(length)
      rescue SystemCallError, IOError => e
        raise Errors::ConnectionFailure.new("Attempted to read #{length} bytes from the socket but an error happend #{e.message}.")
      end

      if data.length < length
        data << read_data(socket, length - data.length)
      end
      data
    end

This yields the following output:

-----
#<Moped::Connection::Socket::SSL:0x007f38646753d8 @port=27017, @host="xxx", @socket=#<TCPSocket:fd 8>, @io=#<TCPSocket:fd 8>, @context=#<OpenSSL::SSL::SSLContext:0x007f3864675388 @cert=#<OpenSSL::X509::Certificate subject=#<OpenSSL::X509::Name:0x007f3864674500>, issuer=#<OpenSSL::X509::Name:0x007f3864674488>, serial=#<OpenSSL::BN:0x007f3864674410>, not_before=2014-08-25 14:47:18 UTC, not_after=2020-05-21 14:47:18 UTC>, @key=#<OpenSSL::PKey::RSA:0x007f3864675180>, @client_ca=nil, @ca_file="/etc/ssl/certs/root_ca.crt", @ca_path=nil, @timeout=nil, @verify_mode=nil, @verify_depth=nil, @renegotiation_cb=nil, @verify_callback=nil, @options=nil, @cert_store=nil, @extra_chain_cert=nil, @client_cert_cb=nil, @tmp_dh_callback=nil, @session_id_context=nil, @session_get_cb=nil, @session_new_cb=nil, @session_remove_cb=nil, @servername_cb=nil, @npn_protocols=nil, @npn_select_cb=nil>, @sync_close=true, @hostname=nil, @eof=false, @rbuffer="", @sync=true, @callback_state=nil, @wbuffer="">
[[#<Moped::Connection::Socket::SSL:0x007f38646753d8 @port=27017, @host="xxx", @socket=#<TCPSocket:fd 8>, @io=#<TCPSocket:fd 8>, @context=#<OpenSSL::SSL::SSLContext:0x007f3864675388 @cert=#<OpenSSL::X509::Certificate subject=#<OpenSSL::X509::Name:0x007f386467b968>, issuer=#<OpenSSL::X509::Name:0x007f386467b8f0>, serial=#<OpenSSL::BN:0x007f386467b878>, not_before=2014-08-25 14:47:18 UTC, not_after=2020-05-21 14:47:18 UTC>, @key=#<OpenSSL::PKey::RSA:0x007f3864675180>, @client_ca=nil, @ca_file="/etc/ssl/certs/root_ca.crt", @ca_path=nil, @timeout=nil, @verify_mode=nil, @verify_depth=nil, @renegotiation_cb=nil, @verify_callback=nil, @options=nil, @cert_store=nil, @extra_chain_cert=nil, @client_cert_cb=nil, @tmp_dh_callback=nil, @session_id_context=nil, @session_get_cb=nil, @session_new_cb=nil, @session_remove_cb=nil, @servername_cb=nil, @npn_protocols=nil, @npn_select_cb=nil>, @sync_close=true, @hostname=nil, @eof=false, @rbuffer="", @sync=true, @callback_state=nil, @wbuffer="">], [], []]
false
-----
#<Moped::Connection::Socket::SSL:0x007f38646753d8 @port=27017, @host="xxx", @socket=#<TCPSocket:fd 8>, @io=#<TCPSocket:fd 8>, @context=#<OpenSSL::SSL::SSLContext:0x007f3864675388 @cert=#<OpenSSL::X509::Certificate subject=#<OpenSSL::X509::Name:0x007f386467ac70>, issuer=#<OpenSSL::X509::Name:0x007f386467abf8>, serial=#<OpenSSL::BN:0x007f386467ab80>, not_before=2014-08-25 14:47:18 UTC, not_after=2020-05-21 14:47:18 UTC>, @key=#<OpenSSL::PKey::RSA:0x007f3864675180>, @client_ca=nil, @ca_file="/etc/ssl/certs/root_ca.crt", @ca_path=nil, @timeout=nil, @verify_mode=nil, @verify_depth=nil, @renegotiation_cb=nil, @verify_callback=nil, @options=nil, @cert_store=nil, @extra_chain_cert=nil, @client_cert_cb=nil, @tmp_dh_callback=nil, @session_id_context=nil, @session_get_cb=nil, @session_new_cb=nil, @session_remove_cb=nil, @servername_cb=nil, @npn_protocols=nil, @npn_select_cb=nil>, @sync_close=true, @hostname=nil, @eof=false, @rbuffer="-\x00\x00\x00\x02nonce\x00\x11\x00\x00\x004fdcde36d3f5e32d\x00\x01ok\x00\x00\x00\x00\x00\x00\x00\xF0?\x00", @sync=true, @callback_state=nil, @wbuffer="">
nil
true

The Kernel::select returns nil, because data is in the buffer.
After checking the Connectable module i noticed that alive? returns true if Kernel::select returns nil and only checks for eof? if Kernel::select returns something

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @matsimitsu,

I set up a mongod with SSL and was able to reproduce the problem.
Following the instructions on Kernel::select doc changed the code to the version bellow.
Can you test on your server before I commit it, please?

  def read_data(socket, length)
      # Block on data to read for op_timeout seconds
      time_left = op_timeout = @options[:op_timeout] || timeout
      begin
        raise Errors::OperationTimeout.new("Took more than #{op_timeout} seconds to receive data.") if (time_left -= 0.1) <= 0
        data = socket.read_nonblock(length)
      rescue IO::WaitReadable
        Kernel::select([socket], nil, [socket], 0.1)
        retry
      rescue IO::WaitWritable
        Kernel::select(nil, [socket], [socket], 0.1)
        retry
      rescue SystemCallError, IOError => e
        raise Errors::ConnectionFailure.new("Attempted to read #{length} bytes from the socket but an error happend #{e.message}.")
      end

      if data.length < length
        data << read_data(socket, length - data.length)
      end
      data
    end

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes! This works!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

unless ready
raise Errors::OperationTimeout.new("Took more than #{op_timeout} seconds to receive data.")
end
rescue IOError => e
raise Errors::ConnectionFailure
end

# Read data from socket
begin
data = socket.read(length)
rescue SystemCallError, IOError => e
raise Errors::ConnectionFailure.new("Attempted to read #{length} bytes from the socket but an error happend #{e.message}.")
end

if data.length < length
data << read_data(socket, length - data.length)
end
Expand Down
3 changes: 3 additions & 0 deletions lib/moped/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ class PoolTimeout < RuntimeError; end
# Generic error class for exceptions related to connection failures.
class ConnectionFailure < StandardError; end

# Generic error class for exceptions related to read timeout failures.
class OperationTimeout < StandardError; end

# Raised when a database name is invalid.
class InvalidDatabaseName < StandardError; end

Expand Down
6 changes: 6 additions & 0 deletions lib/moped/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,12 @@ def refresh
end
end

def refresh_authentication
connection do |conn|
conn.refresh_authentication
end
end

# Execute a remove command for the provided selector.
#
# @example Remove documents.
Expand Down
55 changes: 31 additions & 24 deletions lib/moped/query.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ module Moped
# people.find.count # => 1
class Query
include Enumerable
include Retryable

# @attribute [r] collection The collection to execute the query on.
# @attribute [r] operation The query operation.
Expand Down Expand Up @@ -321,14 +322,16 @@ def modify(change, options = {})
#
# @since 1.0.0
def remove
cluster.with_primary do |node|
node.remove(
operation.database,
operation.collection,
operation.basic_selector,
write_concern,
flags: [ :remove_first ]
)
with_retry(cluster) do
cluster.with_primary do |node|
node.remove(
operation.database,
operation.collection,
operation.basic_selector,
write_concern,
flags: [ :remove_first ]
)
end
end
end

Expand All @@ -341,13 +344,15 @@ def remove
#
# @since 1.0.0
def remove_all
cluster.with_primary do |node|
node.remove(
operation.database,
operation.collection,
operation.basic_selector,
write_concern
)
with_retry(cluster) do
cluster.with_primary do |node|
node.remove(
operation.database,
operation.collection,
operation.basic_selector,
write_concern
)
end
end
end

Expand Down Expand Up @@ -423,15 +428,17 @@ def tailable
#
# @since 1.0.0
def update(change, flags = nil)
cluster.with_primary do |node|
node.update(
operation.database,
operation.collection,
operation.selector["$query"] || operation.selector,
change,
write_concern,
flags: flags
)
with_retry(cluster) do
cluster.with_primary do |node|
node.update(
operation.database,
operation.collection,
operation.selector["$query"] || operation.selector,
change,
write_concern,
flags: flags
)
end
end
end

Expand Down
37 changes: 2 additions & 35 deletions lib/moped/read_preference/selectable.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# encoding: utf-8
require "moped/retryable"
module Moped
module ReadPreference

Expand All @@ -7,6 +8,7 @@ module ReadPreference
#
# @since 2.0.0
module Selectable
include Retryable

# @!attribute tags
# @return [ Array<Hash> ] The tag sets.
Expand Down Expand Up @@ -39,41 +41,6 @@ def query_options(options)
options[:flags] |= [ :slave_ok ]
options
end

private

# Execute the provided block on the cluster and retry if the execution
# fails.
#
# @api private
#
# @example Execute with retry.
# preference.with_retry(cluster) do
# cluster.with_primary do |node|
# node.refresh
# end
# end
#
# @param [ Cluster ] cluster The cluster.
# @param [ Integer ] retries The number of times to retry.
#
# @return [ Object ] The result of the block.
#
# @since 2.0.0
def with_retry(cluster, retries = cluster.max_retries, &block)
begin
block.call
rescue Errors::ConnectionFailure => e
if retries > 0
Loggable.warn(" MOPED:", "Retrying connection attempt #{retries} more time(s).", "n/a")
sleep(cluster.retry_interval)
cluster.refresh
with_retry(cluster, retries - 1, &block)
else
raise e
end
end
end
end
end
end
47 changes: 47 additions & 0 deletions lib/moped/retryable.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# encoding: utf-8
module Moped
# Provides the shared behaviour for retry failed operations.
#
# @since 2.0.0
module Retryable

private

# Execute the provided block on the cluster and retry if the execution
# fails.
#
# @api private
#
# @example Execute with retry.
# preference.with_retry(cluster) do
# cluster.with_primary do |node|
# node.refresh
# end
# end
#
# @param [ Cluster ] cluster The cluster.
# @param [ Integer ] retries The number of times to retry.
#
# @return [ Object ] The result of the block.
#
# @since 2.0.0
def with_retry(cluster, retries = cluster.max_retries, &block)
begin
block.call
rescue Errors::ConnectionFailure, Errors::PotentialReconfiguration => e
raise e if e.is_a?(Errors::PotentialReconfiguration) &&
!(e.message.include?("not master") || e.message.include?("not authorized") || e.message.include?("unauthorized"))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've run load tests against application backed by local replica-set. During those tests I was killing or steppingDown nodes multiple times. It all worked fine except one exception:

Sometimes when killing node it replies with error: 11600 interrupted at shutdown. I think in case of this error we should retry as well.

As a side note it would be nice to have error recognition in one place, but this probably isn't a concern for this pull-request.


if retries > 0
Loggable.warn(" MOPED:", "Retrying connection attempt #{retries} more time(s).", "n/a")
sleep(cluster.retry_interval)
cluster.refresh
cluster.refresh_authentication if e.message.include?("not authorized") || e.message.include?("unauthorized")
with_retry(cluster, retries - 1, &block)
else
raise e
end
end
end
end
end
5 changes: 5 additions & 0 deletions lib/moped/session.rb
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,11 @@ def logout
# @since 2.0.0
option(:timeout).allow(Optionable.any(Numeric))

# Setup validation of allowed timeout options. (Any numeric)
#
# @since 2.0.0
option(:op_timeout).allow(Optionable.any(Numeric))

# Pass an object that responds to instrument as an instrumenter.
#
# @since 2.0.0
Expand Down
Loading