Skip to content
This repository has been archived by the owner on Jan 15, 2024. It is now read-only.

Fix retries and failover #320

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
38 changes: 33 additions & 5 deletions lib/moped/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,8 @@ def retry_interval
# @since 1.0.0
def with_primary(&block)
if node = nodes.find(&:primary?)
begin
node.ensure_primary do
return yield(node)
end
rescue Errors::ConnectionFailure, Errors::ReplicaSetReconfigured
node.ensure_primary do
return yield(node)
end
end
raise Errors::ConnectionFailure, "Could not connect to a primary node for replica set #{inspect}"
Expand Down Expand Up @@ -275,6 +272,37 @@ def with_secondary(&block)
raise Errors::ConnectionFailure, "Could not connect to a secondary node for replica set #{inspect}"
end

# Execute the provided block on the cluster and retry if the execution
# fails.
#
# @example Execute with retry.
# cluster.with_retry do
# cluster.with_primary do |node|
# node.refresh
# end
# end
#
# @param [ Integer ] retries The number of times to retry.
#
# @return [ Object ] The result of the block.
#
# @since 2.0.0
def with_retry(retries = max_retries, &block)
  begin
    block.call
  rescue StandardError => error
    # Only exceptions whose class maps to the Retry failover strategy are
    # retried, and only while attempts remain; anything else goes straight
    # back to the caller.
    retryable = Failover::STRATEGIES[error.class] == Failover::Retry
    raise error unless retryable && retries > 0

    Loggable.warn(" MOPED:", "Retrying connection attempt #{retries} more time(s). Exception: #{ error.class.name }, #{ error.message }", "n/a")
    # Wait, refresh the cluster, then run the block again with one fewer
    # attempt available.
    sleep(retry_interval)
    refresh
    with_retry(retries - 1, &block)
  end
end

private

# Apply the credentials on all nodes
Expand Down
6 changes: 4 additions & 2 deletions lib/moped/collection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,10 @@ def initialize(database, name)
# @since 1.0.0
def insert(documents, flags = nil)
docs = documents.is_a?(Array) ? documents : [ documents ]
cluster.with_primary do |node|
node.insert(database.name, name, docs, write_concern, flags: flags || [])
cluster.with_retry do
cluster.with_primary do |node|
node.insert(database.name, name, docs, write_concern, flags: flags || [])
end
end
end

Expand Down
49 changes: 14 additions & 35 deletions lib/moped/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ class InvalidMongoURI < StandardError; end

# Raised when providing an invalid string from an object id.
class InvalidObjectId < StandardError

# Create the new error.
#
# @example Create the new error.
Expand Down Expand Up @@ -105,29 +104,22 @@ def error_message
end
end

# Classes of errors that should not disconnect connections.
class DoNotDisconnect < MongoError; end

# Classes of errors that could be caused by a replica set reconfiguration.
class PotentialReconfiguration < MongoError
# @api private
#
# Exception indicating that replica set was most likely reconfigured
class ReplicaSetReconfigured < MongoError; end

# Not master error codes.
NOT_MASTER = [ 13435, 13436, 10009 ]
# Exception raised when database responds with 'not master' error
class NotMaster < ReplicaSetReconfigured; end

# Error codes received around reconfiguration
CONNECTION_ERRORS_RECONFIGURATION = [ 15988, 10276, 11600, 9001, 13639, 10009 ]
# Exception raised when authentication fails.
class AuthenticationFailure < MongoError; end

# Replica set reconfigurations can be either in the form of an operation
# error with code 13435, or with an error message stating the server is
# not a master. (This encapsulates codes 10054, 10056, 10058)
def reconfiguring_replica_set?
err = details["err"] || details["errmsg"] || details["$err"] || ""
NOT_MASTER.include?(details["code"]) || err.include?("not master")
end
# Exception raised when authorization fails.
class AuthorizationFailure < MongoError; end

def connection_failure?
CONNECTION_ERRORS_RECONFIGURATION.include?(details["code"])
end
# Exception raised when operation fails
class OperationFailure < MongoError

# Is the error due to a namespace not being found?
#
Expand All @@ -154,29 +146,16 @@ def ns_not_exists?
end
end

# Exception raised when authentication fails.
class AuthenticationFailure < DoNotDisconnect; end

# Exception class for exceptions generated as a direct result of an
# operation, such as a failed insert or an invalid command.
class OperationFailure < PotentialReconfiguration; end

# Exception raised on invalid queries.
class QueryFailure < PotentialReconfiguration; end
class QueryFailure < MongoError; end

# Exception raised if the cursor could not be found.
class CursorNotFound < DoNotDisconnect
class CursorNotFound < MongoError
def initialize(operation, cursor_id)
super(operation, {"errmsg" => "cursor #{cursor_id} not found"})
end
end

# @api private
#
# Internal exception raised by Node#ensure_primary and captured by
# Cluster#with_primary.
class ReplicaSetReconfigured < DoNotDisconnect; end

# Tag applied to unhandled exceptions on a node.
module SocketError; end
end
Expand Down
8 changes: 5 additions & 3 deletions lib/moped/failover.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# encoding: utf-8
require "moped/failover/disconnect"
require "moped/failover/ignore"
require "moped/failover/reconfigure"
require "moped/failover/retry"

module Moped
Expand All @@ -18,10 +17,13 @@ module Failover
# @since 2.0.0
STRATEGIES = {
Errors::AuthenticationFailure => Ignore,
Errors::AuthorizationFailure => Retry,
Errors::ConnectionFailure => Retry,
Errors::CursorNotFound => Ignore,
Errors::OperationFailure => Reconfigure,
Errors::QueryFailure => Reconfigure
Errors::OperationFailure => Ignore,
Errors::QueryFailure => Ignore,
Errors::NotMaster => Retry,
Errors::ReplicaSetReconfigured => Retry,
}.freeze

# Get the appropriate failover handler given the provided exception.
Expand Down
36 changes: 0 additions & 36 deletions lib/moped/failover/reconfigure.rb

This file was deleted.

2 changes: 1 addition & 1 deletion lib/moped/failover/retry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module Failover
module Retry
extend self

# Executes the failover strategy. In the case of retyr, we disconnect and
# Executes the failover strategy. In the case of retry, we disconnect and
# reconnect, then try the operation one more time.
#
# @example Execute the retry strategy.
Expand Down
26 changes: 13 additions & 13 deletions lib/moped/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -169,16 +169,16 @@ def down!
#
# @since 1.0.0
def ensure_connected(&block)
unless (conn = stack(:connection)).empty?
return yield(conn.first)
end

begin
connection do |conn|
stack(:connection) << conn
connect(conn) unless conn.connected?
conn.apply_credentials(@credentials)
yield(conn)
if (conn = stack(:connection)).length > 0
yield(conn.first)
else
connection do |conn|
stack(:connection) << conn
connect(conn) unless conn.connected?
conn.apply_credentials(@credentials)
yield(conn)
end
end
rescue Exception => e
Failover.get(e).execute(e, self, &block)
Expand Down Expand Up @@ -548,6 +548,7 @@ def configure(settings)
@arbiter = settings["arbiterOnly"]
@passive = settings["passive"]
@primary = settings["ismaster"]
@down_at = nil
@secondary = settings["secondary"]
discover(settings["hosts"]) if auto_discovering?
end
Expand Down Expand Up @@ -585,14 +586,13 @@ def discover(*nodes)
def flush(ops = queue)
operations, callbacks = ops.transpose
logging(operations) do
replies = nil
ensure_connected do |conn|
conn.write(operations)
replies = conn.receive_replies(operations)
replies.zip(callbacks).map do |reply, callback|
callback ? callback.call(reply) : reply
end.last
end
replies.zip(callbacks).map do |reply, callback|
callback ? callback[reply] : reply
end.last
end
ensure
ops.clear
Expand Down
2 changes: 1 addition & 1 deletion lib/moped/protocol/command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def failure?(reply)
#
# @since 2.0.0
def failure_exception(reply)
# NOTE(review): the value of the next expression is discarded. It looks like
# the pre-change side of this diff hunk rendered without its "-" marker;
# confirm against the real file before relying on it.
Errors::OperationFailure.new(self, reply.documents.first)
# Use whatever reply.failure_exception returns when it is truthy; otherwise
# fall back to an OperationFailure built from the first reply document.
reply.failure_exception || Errors::OperationFailure.new(self, reply.documents.first)
end

# Instantiate the new command.
Expand Down
7 changes: 2 additions & 5 deletions lib/moped/protocol/get_more.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,8 @@ def failure?(reply)
#
# @since 2.0.0
def failure_exception(reply)
# NOTE(review): the result of this if/else is discarded. It looks like the
# pre-change side of this diff hunk rendered without its "-" markers;
# confirm against the real file before relying on it.
if reply.cursor_not_found?
Errors::CursorNotFound.new(self, cursor_id)
else
Errors::QueryFailure.new(self, reply.documents.first)
end
# A missing cursor takes precedence over any other failure classification.
return Errors::CursorNotFound.new(self, cursor_id) if reply.cursor_not_found?
# Otherwise use whatever reply.failure_exception returns when it is truthy,
# falling back to a QueryFailure built from the first reply document.
reply.failure_exception || Errors::QueryFailure.new(self, reply.documents.first)
end

# Create a new GetMore command. The database and collection arguments
Expand Down
2 changes: 1 addition & 1 deletion lib/moped/protocol/query.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def basic_selector
#
# @since 2.0.0
def failure_exception(reply)
# NOTE(review): the value of the next expression is discarded. It looks like
# the pre-change side of this diff hunk rendered without its "-" marker;
# confirm against the real file before relying on it.
Errors::QueryFailure.new(self, reply.documents.first)
# Use whatever reply.failure_exception returns when it is truthy; otherwise
# fall back to a QueryFailure built from the first reply document.
reply.failure_exception || Errors::QueryFailure.new(self, reply.documents.first)
end

# Determine if the provided reply message is a failure with respect to a
Expand Down
45 changes: 43 additions & 2 deletions lib/moped/protocol/reply.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,24 @@ module Protocol
class Reply
include Message

# Unauthorized assertion errors.
UNAUTHORIZED = [ 10057, 16550 ]
# Server error codes that indicate the operation was not authorized.
# Frozen so the shared list cannot be mutated at runtime.
UNAUTHORIZED = [
  13,    # not authorized for query on ...
  10057,
  16550, # not authorized for query on ...
  16544, # not authorized for insert on ...
].freeze

# Server error codes treated as a "not master" response (see #not_master?).
NOT_MASTER = [ 13435, 13436, 10009, 10054, 10056, 10058, 10107 ].freeze

# Server error codes treated as connection errors seen around a replica set
# reconfiguration (see #connection_failure?).
CONNECTION_ERRORS_RECONFIGURATION = [
  9001,
  10009,
  10276, # DBClientBase::findN: transport error
  11600, # interrupted at shutdown
  13639, # can't connect to new replica set master
  15988, # error querying server
].freeze


# @attribute
# @return [Number] the length of the message
Expand Down Expand Up @@ -71,6 +87,31 @@ def command_failure?
(result["ok"] != 1.0 && result["ok"] != true) || error?
end

# Pick the most specific exception for this reply, based on the checks
# unauthorized?, not_master? and connection_failure? (tried in that order).
#
# @return [ Errors::AuthorizationFailure, Errors::NotMaster,
#   Errors::ReplicaSetReconfigured, nil ] The matching exception, built from
#   the first reply document, or nil when none of the checks applies.
def failure_exception
  error_class =
    if unauthorized?
      Errors::AuthorizationFailure
    elsif not_master?
      Errors::NotMaster
    elsif connection_failure?
      Errors::ReplicaSetReconfigured
    end
  error_class && error_class.new(self, documents.first)
end

# Does the first reply document carry one of the error codes listed in
# CONNECTION_ERRORS_RECONFIGURATION?
#
# @return [ true, false ] false when the reply has no first document.
def connection_failure?
  first_doc = documents[0]
  first_doc.nil? ? false : CONNECTION_ERRORS_RECONFIGURATION.include?(first_doc["code"])
end

# Does the reply say the server is not the master?
#
# This holds when the first reply document's code is listed in NOT_MASTER,
# or when its error message contains "not master".
#
# @return [ true, false, nil ] false when the reply has no first document;
#   nil when the code does not match and there is no error message.
def not_master?
  first_doc = documents[0]
  return false if first_doc.nil?

  message = error_message(first_doc)
  return true if NOT_MASTER.include?(first_doc["code"])

  message && message.include?("not master")
end

# Was the provided cursor id not found on the server?
#
# @example Is the cursor not on the server?
Expand Down
Loading