This repository has been archived by the owner on Jan 15, 2024. It is now read-only.

Message for pool saturated, shutdown pool instead of disconnect, logging around failover, etc #338

Open · wants to merge 1 commit into master
56 changes: 56 additions & 0 deletions lib/moped/connection/manager.rb
@@ -13,6 +13,9 @@ module Manager
# Used for synchronization of pools access.
MUTEX = Mutex.new

# Used for synchronization of pool shutdown.
SHUTDOWN_MUTEX = Mutex.new
Author:
Separate mutex for shutdown so we don't block other threads trying to get or create a new pool while this thread shuts the old one down.
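
To make the intent concrete, a minimal sketch (illustrative only, not part of the diff; assumes a `node` and a stale `pool` are in scope):

```ruby
shutter = Thread.new do
  # Takes SHUTDOWN_MUTEX (and briefly MUTEX) only for bookkeeping, then does
  # the slow pool.shutdown outside both locks.
  Moped::Connection::Manager.shutdown_pool(node, pool)
end

getter = Thread.new do
  # Only needs MUTEX for the hash lookup, so it isn't stuck behind the
  # shutdown above and can build a fresh pool right away.
  Moped::Connection::Manager.pool(node)
end

[shutter, getter].each(&:join)
```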


# The default max size for the connection pool.
POOL_SIZE = 5

@@ -35,6 +38,41 @@ def pool(node)
end
end

# Shut down a pool for a node while immediately clearing
# the cached pool so a new one can be created by another
# thread.
#
# @example Shut down a pool for a node
# Manager.shutdown_pool(node, pool)
#
# @param [ Node ] node The node.
# @param [ ConnectionPool ] pool The current pool to shut down.
#
# @return [ Boolean ] true.
#
# @since 2.0.3
def shutdown_pool(node, pool)
pool_id = "#{node.address.resolved}-#{pool.object_id}"

SHUTDOWN_MUTEX.synchronize do
return if !!shutting_down[pool_id]
Author:
Since other nodes in other threads might be trying to shut down a pool at the same time, let's make sure that only one thread is trying to shut down a given pool (and do this part of the code quickly, so only this thread blocks on the main shutdown block, which, based on ConnectionPool logic, will wait for all connections to get checked back in before disconnecting them).
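
For context, a standalone sketch of the connection_pool gem behaviour this leans on (assumes the gem's `ConnectionPool#shutdown` API; not part of the diff):

```ruby
require "connection_pool"

class FakeConn
  def disconnect
    @closed = true
  end
end

pool = ConnectionPool.new(size: 2, timeout: 1) { FakeConn.new }
pool.with { |conn| } # exercise the pool so at least one connection exists

# Idle connections get the block applied right away; a connection still
# checked out by another thread is handled once it is checked back in.
pool.shutdown { |conn| conn.disconnect }
```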

Moped.logger.debug("MOPED: Shutting down connection pool:#{pool.object_id} for node:#{node.inspect}")
shutting_down[pool_id] = true
MUTEX.synchronize do
pools[node.address.resolved] = nil
end
end

begin
if pool
pool.shutdown {|conn| conn.disconnect }
end
ensure
shutting_down[pool_id] = false
end
true
end

private

# Create a new connection pool for the provided node.
@@ -50,6 +88,8 @@ def pool(node)
#
# @since 2.0.0
def create_pool(node)
Moped.logger.debug("MOPED: Creating new connection pool for #{node.inspect}")

ConnectionPool.new(
size: node.options[:pool_size] || POOL_SIZE,
timeout: node.options[:pool_timeout] || TIMEOUT
@@ -77,6 +117,22 @@ def create_pool(node)
def pools
@pools ||= {}
end

# Used for tracking whether the current pool is already being shut down
# by another thread.
#
# @api private
#
# @example Determine if a pool is already being shut down.
# Manager.shutting_down
#
# @return [ Hash ] The state of a pool shutting down.
#
# @since 2.0.3
def shutting_down
@shutting_down ||= {}
end

end
end
end
4 changes: 3 additions & 1 deletion lib/moped/failover.rb
@@ -35,7 +35,9 @@ module Failover
#
# @since 2.0.0
def get(exception)
STRATEGIES.fetch(exception.class, Disconnect)
strategy = STRATEGIES.fetch(exception.class, Disconnect)
Moped.logger.warn("MOPED: Failover strategy for exception #{exception.class} executing #{strategy}")
Author:
Let's log any failover event so we know what's going on and can see how it's being handled.
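
For reference, a rough sketch of how the strategy lookup is used (the caller shape is approximate; Moped's strategy modules respond to `execute(exception, node)`, and `exception`/`node` are assumed to be in scope):

```ruby
strategy = Moped::Failover.get(exception) # emits the warn line added above
strategy.execute(exception, node)         # apply the chosen strategy (Disconnect by default)
```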

strategy
end
end
end
47 changes: 30 additions & 17 deletions lib/moped/node.rb
@@ -107,12 +107,27 @@ def connected?
# @example Get the node's connection.
# node.connection
#
# @rescue [ Timeout::Error ] When there are no available connections in the queue
#
# @raise [ PoolSaturated ] Raise the underlying Timeout::Error as PoolSaturated
# with a message to configure pool_size.
#
# @return [ Connection ] The connection.
#
# @since 2.0.0
def connection
pool.with do |conn|
yield(conn)
begin
pool.with do |conn|
yield(conn)
end
rescue Timeout::Error
Author:
Rescue the Timeout::Error here so we don't have to worry about it everywhere else.
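
The raise below converts the timeout into `Errors::PoolSaturated` with a hint about the pool options. A hedged example of those knobs (host and database names are made up, and it assumes session options propagate to `node.options` as in Moped 2.x):

```ruby
session = Moped::Session.new(
  ["127.0.0.1:27017"],
  database:     "app_db",
  pool_size:    10,  # picked up in create_pool via node.options[:pool_size]
  pool_timeout: 0.5  # seconds to wait for a checkout, via node.options[:pool_timeout]
)
```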

raise Errors::PoolSaturated, "#{$!}. Try increasing pool_size or pool_timeout."

# Could possibly rescue this here as it is one of the 2 exceptions that connection_pool
# will raise, but we already have retry code in other places that will wait and hopefully
# get a fresh pool if the current one is being shut down?
# rescue ConnectionPool::PoolShuttingDownError

end
end

@@ -128,7 +143,10 @@ def down?
@down_at
end

# Force the node to disconnect from the server.
# Force the node to disconnect from the server by shutting
# down the pool (and disconnecting all connections as they
# are available/checked back in). A new pool is immediately
# made available to other threads while this one shuts down.
#
# @example Disconnect the node.
# node.disconnect
@@ -137,7 +155,7 @@ def down?
#
# @since 1.2.0
def disconnect
connection{ |conn| conn.disconnect } if address.resolved
Connection::Manager.shutdown_pool(self, pool)
Author:
Need to go straight to the manager to shut it down, sending along the current pool; otherwise it's too hard to track the reference across threads/nodes/shutdown events.

true
end

@@ -185,7 +203,6 @@ def ensure_connected(&block)
ensure
end_execution(:connection)
end

end

# Set a flag on the node for the duration of provided block so that an
Expand Down Expand Up @@ -427,17 +444,13 @@ def query(database, collection, selector, options = {})
# @since 1.0.0
def refresh
if address.resolve(self)
begin
@refreshed_at = Time.now
configure(command("admin", ismaster: 1))
if !primary? && executing?(:ensure_primary)
raise Errors::ReplicaSetReconfigured.new("#{inspect} is no longer the primary node.", {})
elsif !messagable?
# not primary or secondary so mark it as down, since it's probably
# a recovering node within the replica set
down!
end
rescue Timeout::Error
@refreshed_at = Time.now
configure(command("admin", ismaster: 1))
if !primary? && executing?(:ensure_primary)
raise Errors::ReplicaSetReconfigured.new("#{inspect} is no longer the primary node.", {})
elsif !messagable?
# not primary or secondary so mark it as down, since it's probably
# a recovering node within the replica set
down!
end
end
@@ -629,7 +642,7 @@ def logging(operations)
#
# @since 2.0.0
def pool
@pool ||= Connection::Manager.pool(self)
Connection::Manager.pool(self)
Author:
Don't cache the pool reference here; we can't manage the reference across nodes when trying to shut down an existing pool (and create a new one).
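
A quick sketch of the stale-reference problem the memoization caused (illustrative only; assumes a `node` is in scope):

```ruby
old_pool = Moped::Connection::Manager.pool(node)          # previously memoized by the node as @pool
Moped::Connection::Manager.shutdown_pool(node, old_pool)  # another thread shuts it down and clears the cache
Moped::Connection::Manager.pool(node)                     # new code: returns a freshly created pool
# With `@pool ||= ...`, this node would keep handing out the shut-down old_pool.
```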

end

# Execute a read operation.