-
Notifications
You must be signed in to change notification settings - Fork 151
Message for pool saturated, shutdown pool instead of disconnect, logging around failover, etc #338
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,9 @@ module Manager | |
# Used for synchronization of pools access. | ||
MUTEX = Mutex.new | ||
|
||
# Used for synchronization of pool shutdown. | ||
SHUTDOWN_MUTEX = Mutex.new | ||
|
||
# The default max size for the connection pool. | ||
POOL_SIZE = 5 | ||
|
||
|
@@ -35,6 +38,41 @@ def pool(node) | |
end | ||
end | ||
|
||
# Shut down a pool for a node while immediately clearing | ||
# the cached pool so a new one can be created by another | ||
# thread. | ||
# | ||
# @example Shut down a pool for a node | ||
# Manager.shutdown_pool(node, pool) | ||
# | ||
# @param [ Node ] The node. | ||
# @param [ ConnectionPool ] The current pool to shutdown. | ||
# | ||
# @return [ Boolean ] true. | ||
# | ||
# @since 2.0.3 | ||
def shutdown_pool(node, pool) | ||
pool_id = "#{node.address.resolved}-#{pool.object_id}" | ||
|
||
SHUTDOWN_MUTEX.synchronize do | ||
return if !!shutting_down[pool_id] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since other nodes in other threads might be trying to shutdown a pool at the same time, let's make sure that only one thread is shutting trying to shutdown the same pool (and do this part of code quickly so only this thread is blocking on the main shutdown block, which, based on ConnectionPool logic, will wait for all connections to get checked back in to disconnect them. |
||
Moped.logger.debug("MOPED: Shutting down connection pool:#{pool.object_id} for node:#{node.inspect}") | ||
shutting_down[pool_id] = true | ||
MUTEX.synchronize do | ||
pools[node.address.resolved] = nil | ||
end | ||
end | ||
|
||
begin | ||
if pool | ||
pool.shutdown {|conn| conn.disconnect } | ||
end | ||
ensure | ||
shutting_down[pool_id] = false | ||
end | ||
true | ||
end | ||
|
||
private | ||
|
||
# Create a new connection pool for the provided node. | ||
|
@@ -50,6 +88,8 @@ def pool(node) | |
# | ||
# @since 2.0.0 | ||
def create_pool(node) | ||
Moped.logger.debug("MOPED: Creating new connection pool for #{node.inspect}") | ||
|
||
ConnectionPool.new( | ||
size: node.options[:pool_size] || POOL_SIZE, | ||
timeout: node.options[:pool_timeout] || TIMEOUT | ||
|
@@ -77,6 +117,22 @@ def create_pool(node) | |
def pools | ||
@pools ||= {} | ||
end | ||
|
||
# Used for tracking whether the current pool is already being shutdown | ||
# by another thread. | ||
# | ||
# @api private | ||
# | ||
# @example Determine if a pool is already being shutdown. | ||
# Manager.shutting_down | ||
# | ||
# @return [ Hash ] The state of a pool shutting down. | ||
# | ||
# @since 2.0.3 | ||
def shutting_down | ||
@shutting_down ||= {} | ||
end | ||
|
||
end | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,7 +35,9 @@ module Failover | |
# | ||
# @since 2.0.0 | ||
def get(exception) | ||
STRATEGIES.fetch(exception.class, Disconnect) | ||
strategy=STRATEGIES.fetch(exception.class, Disconnect) | ||
Moped.logger.warn("MOPED: Failover strategy for exception #{exception.class} executing #{strategy}") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's log any failover event, so we know what's going on, and see how it's trying to handle it. |
||
strategy | ||
end | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -107,12 +107,27 @@ def connected? | |
# @example Get the node's connection. | ||
# node.connection | ||
# | ||
# @rescue [ Timeout::Error ] When there are no available connections in the queue | ||
# | ||
# @raise [ PoolSaturated ] Raise the underlying Timeout::Error as PoolSaturated | ||
# with a message to configure pool_size. | ||
# | ||
# @return [ Connection ] The connection. | ||
# | ||
# @since 2.0.0 | ||
def connection | ||
pool.with do |conn| | ||
yield(conn) | ||
begin | ||
pool.with do |conn| | ||
yield(conn) | ||
end | ||
rescue Timeout::Error | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Rescue the Timeout::Error here so we don't have to worry about it everywhere else. |
||
raise Errors::PoolSaturated, "#{$!}. Try increasing pool_size or pool_timeout." | ||
|
||
# Could possibly rescue this here as it is one of 2 exceptions that connection_pool | ||
# will raise, but we already have retry code other places that will wait and hopefully | ||
# get a fresh pool if the current one is being shutdown? | ||
# rescue ConnectionPool::PoolShuttingDownError | ||
|
||
end | ||
end | ||
|
||
|
@@ -128,7 +143,10 @@ def down? | |
@down_at | ||
end | ||
|
||
# Force the node to disconnect from the server. | ||
# Force the node to disconnect from the server by shutting | ||
# down the pool (and disconnecting all connections as they | ||
# are available/checked back in). A new pool is immediately | ||
# made available to other threads while this one shuts down. | ||
# | ||
# @example Disconnect the node. | ||
# node.disconnect | ||
|
@@ -137,7 +155,7 @@ def down? | |
# | ||
# @since 1.2.0 | ||
def disconnect | ||
connection{ |conn| conn.disconnect } if address.resolved | ||
Connection::Manager.shutdown_pool(self,pool) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Need to go straight to the manager to shut it down, sending along the current pool otherwise it's too hard to track across threads/nodes/shutdown events. |
||
true | ||
end | ||
|
||
|
@@ -185,7 +203,6 @@ def ensure_connected(&block) | |
ensure | ||
end_execution(:connection) | ||
end | ||
|
||
end | ||
|
||
# Set a flag on the node for the duration of provided block so that an | ||
|
@@ -427,17 +444,13 @@ def query(database, collection, selector, options = {}) | |
# @since 1.0.0 | ||
def refresh | ||
if address.resolve(self) | ||
begin | ||
@refreshed_at = Time.now | ||
configure(command("admin", ismaster: 1)) | ||
if !primary? && executing?(:ensure_primary) | ||
raise Errors::ReplicaSetReconfigured.new("#{inspect} is no longer the primary node.", {}) | ||
elsif !messagable? | ||
# not primary or secondary so mark it as down, since it's probably | ||
# a recovering node withing the replica set | ||
down! | ||
end | ||
rescue Timeout::Error | ||
@refreshed_at = Time.now | ||
configure(command("admin", ismaster: 1)) | ||
if !primary? && executing?(:ensure_primary) | ||
raise Errors::ReplicaSetReconfigured.new("#{inspect} is no longer the primary node.", {}) | ||
elsif !messagable? | ||
# not primary or secondary so mark it as down, since it's probably | ||
# a recovering node withing the replica set | ||
down! | ||
end | ||
end | ||
|
@@ -629,7 +642,7 @@ def logging(operations) | |
# | ||
# @since 2.0.0 | ||
def pool | ||
@pool ||= Connection::Manager.pool(self) | ||
Connection::Manager.pool(self) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't cache the pool reference here, can't manage the reference across nodes when trying to shutdown an existing one (and create a new one). |
||
end | ||
|
||
# Execute a read operation. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Separate Mutex for shutdown so we don't block other threads trying to get/create a new one while this thread shuts down.