From eac12ae0ac07a1fe7ce8dc51ee940f0a0a052492 Mon Sep 17 00:00:00 2001 From: Joel Niedfeldt Date: Thu, 11 Dec 2014 14:55:53 -0500 Subject: [PATCH] Rescue connection pool saturation with PoolSaturated and message, make disconnect shutdown a pool/connections to start fresh, add logging around failover events and better debug messaging in places. --- lib/moped/connection/manager.rb | 56 +++++++++++++++++++++++++++++++++ lib/moped/failover.rb | 4 ++- lib/moped/node.rb | 47 +++++++++++++++++---------- 3 files changed, 89 insertions(+), 18 deletions(-) diff --git a/lib/moped/connection/manager.rb b/lib/moped/connection/manager.rb index 4939853..e4d7f3d 100644 --- a/lib/moped/connection/manager.rb +++ b/lib/moped/connection/manager.rb @@ -13,6 +13,9 @@ module Manager # Used for synchronization of pools access. MUTEX = Mutex.new + # Used for synchronization of pool shutdown. + SHUTDOWN_MUTEX = Mutex.new + # The default max size for the connection pool. POOL_SIZE = 5 @@ -35,6 +38,41 @@ def pool(node) end end + # Shut down a pool for a node while immediately clearing + # the cached pool so a new one can be created by another + # thread. + # + # @example Shut down a pool for a node + # Manager.shutdown_pool(node, pool) + # + # @param [ Node ] The node. + # @param [ ConnectionPool ] The current pool to shutdown. + # + # @return [ Boolean ] true. + # + # @since 2.0.3 + def shutdown_pool(node, pool) + pool_id = "#{node.address.resolved}-#{pool.object_id}" + + SHUTDOWN_MUTEX.synchronize do + return if !!shutting_down[pool_id] + Moped.logger.debug("MOPED: Shutting down connection pool:#{pool.object_id} for node:#{node.inspect}") + shutting_down[pool_id] = true + MUTEX.synchronize do + pools[node.address.resolved] = nil + end + end + + begin + if pool + pool.shutdown {|conn| conn.disconnect } + end + ensure + shutting_down[pool_id] = false + end + true + end + private # Create a new connection pool for the provided node. @@ -50,6 +88,8 @@ def pool(node) # # @since 2.0.0 def create_pool(node) + Moped.logger.debug("MOPED: Creating new connection pool for #{node.inspect}") + ConnectionPool.new( size: node.options[:pool_size] || POOL_SIZE, timeout: node.options[:pool_timeout] || TIMEOUT @@ -77,6 +117,22 @@ def create_pool(node) def pools @pools ||= {} end + + # Used for tracking whether the current pool is already being shutdown + # by another thread. + # + # @api private + # + # @example Determine if a pool is already being shutdown. + # Manager.shutting_down + # + # @return [ Hash ] The state of a pool shutting down. + # + # @since 2.0.3 + def shutting_down + @shutting_down ||= {} + end + end end end diff --git a/lib/moped/failover.rb b/lib/moped/failover.rb index d893adc..95180e6 100644 --- a/lib/moped/failover.rb +++ b/lib/moped/failover.rb @@ -35,7 +35,9 @@ module Failover # # @since 2.0.0 def get(exception) - STRATEGIES.fetch(exception.class, Disconnect) + strategy=STRATEGIES.fetch(exception.class, Disconnect) + Moped.logger.warn("MOPED: Failover strategy for exception #{exception.class} executing #{strategy}") + strategy end end end diff --git a/lib/moped/node.rb b/lib/moped/node.rb index c0959f2..40edb79 100644 --- a/lib/moped/node.rb +++ b/lib/moped/node.rb @@ -107,12 +107,27 @@ def connected? # @example Get the node's connection. # node.connection # + # @rescue [ Timeout::Error ] When there are no available connections in the queue + # + # @raise [ PoolSaturated ] Raise the underlying Timeout::Error as PoolSaturated + # with a message to configure pool_size. + # # @return [ Connection ] The connection. # # @since 2.0.0 def connection - pool.with do |conn| - yield(conn) + begin + pool.with do |conn| + yield(conn) + end + rescue Timeout::Error + raise Errors::PoolSaturated, "#{$!}. Try increasing pool_size or pool_timeout." + + # Could possibly rescue this here as it is one of 2 exceptions that connection_pool + # will raise, but we already have retry code other places that will wait and hopefully + # get a fresh pool if the current one is being shutdown? + # rescue ConnectionPool::PoolShuttingDownError + end end @@ -128,7 +143,10 @@ def down? @down_at end - # Force the node to disconnect from the server. + # Force the node to disconnect from the server by shutting + # down the pool (and disconnecting all connections as they + # are available/checked back in). A new pool is immediately + # made available to other threads while this one shuts down. # # @example Disconnect the node. # node.disconnect @@ -137,7 +155,7 @@ def down? # # @since 1.2.0 def disconnect - connection{ |conn| conn.disconnect } if address.resolved + Connection::Manager.shutdown_pool(self,pool) true end @@ -185,7 +203,6 @@ def ensure_connected(&block) ensure end_execution(:connection) end - end # Set a flag on the node for the duration of provided block so that an @@ -427,17 +444,13 @@ def query(database, collection, selector, options = {}) # @since 1.0.0 def refresh if address.resolve(self) - begin - @refreshed_at = Time.now - configure(command("admin", ismaster: 1)) - if !primary? && executing?(:ensure_primary) - raise Errors::ReplicaSetReconfigured.new("#{inspect} is no longer the primary node.", {}) - elsif !messagable? - # not primary or secondary so mark it as down, since it's probably - # a recovering node withing the replica set - down! - end - rescue Timeout::Error + @refreshed_at = Time.now + configure(command("admin", ismaster: 1)) + if !primary? && executing?(:ensure_primary) + raise Errors::ReplicaSetReconfigured.new("#{inspect} is no longer the primary node.", {}) + elsif !messagable? + # not primary or secondary so mark it as down, since it's probably + # a recovering node withing the replica set down! end end @@ -629,7 +642,7 @@ def logging(operations) # # @since 2.0.0 def pool - @pool ||= Connection::Manager.pool(self) + Connection::Manager.pool(self) end # Execute a read operation.