diff --git a/.github/workflows/development.yml b/.github/workflows/development.yml index 6b842918..34e09d2f 100644 --- a/.github/workflows/development.yml +++ b/.github/workflows/development.yml @@ -6,19 +6,19 @@ jobs: test: runs-on: ${{matrix.os}}-latest continue-on-error: ${{matrix.experimental}} - + strategy: matrix: os: - ubuntu - + ruby: - 2.6 - 2.7 - + experimental: [false] env: [""] - + include: - os: ubuntu ruby: head @@ -30,14 +30,14 @@ jobs: with: ruby-version: ${{matrix.ruby}} bundler-cache: true - + - name: Start server timeout-minutes: 5 env: TERM: dumb run: .github/workflows/start_cluster.sh 2 - + - name: Run tests timeout-minutes: 30 env: diff --git a/aerospike.gemspec b/aerospike.gemspec index 3ba10f22..ba0b1283 100644 --- a/aerospike.gemspec +++ b/aerospike.gemspec @@ -8,7 +8,7 @@ Gem::Specification.new do |s| s.name = "aerospike" s.version = Aerospike::VERSION s.platform = Gem::Platform::RUBY - s.authors = [ "Khosrow Afroozeh", "Jan Hecking" ] + s.authors = ["Khosrow Afroozeh", "Jan Hecking", "Sachin Venkatesha Murthy"] s.email = [ "khosrow@aerospike.com", "jhecking@aerospike.com" ] s.homepage = "http://www.github.com/aerospike/aerospike-client-ruby" s.summary = "An Aerospike driver for Ruby." diff --git a/lib/aerospike/aerospike_exception.rb b/lib/aerospike/aerospike_exception.rb index 9650eaae..15c328a9 100644 --- a/lib/aerospike/aerospike_exception.rb +++ b/lib/aerospike/aerospike_exception.rb @@ -96,5 +96,11 @@ def initialize(msg=nil) super(ResultCode::INVALID_NAMESPACE, msg) end end + + class MaxConnectionsExceeded < Aerospike + def initialize(msg = nil) + super(ResultCode::MAX_CONNECTION_EXCEEDED, msg) + end + end end end diff --git a/lib/aerospike/client.rb b/lib/aerospike/client.rb index 7bc43acd..e43037c7 100644 --- a/lib/aerospike/client.rb +++ b/lib/aerospike/client.rb @@ -225,7 +225,6 @@ def truncate(namespace, set_name = nil, before_last_update = nil, options = {}) policy = create_policy(options, Policy, default_info_policy) node = @cluster.random_node - conn = node.get_connection(policy.timeout) if set_name && !set_name.to_s.strip.empty? str_cmd = "truncate:namespace=#{namespace}" diff --git a/lib/aerospike/cluster.rb b/lib/aerospike/cluster.rb index 7e403ba3..c4c52b40 100644 --- a/lib/aerospike/cluster.rb +++ b/lib/aerospike/cluster.rb @@ -321,14 +321,24 @@ def update_partitions(parser) def request_info(policy, *commands) node = random_node - conn = node.get_connection(policy.timeout) + begin + conn = node.get_connection(policy.timeout) + rescue => e + Aerospike.logger.error("Get connection failed with exception: #{e}") + raise e + end Info.request(conn, *commands).tap do node.put_connection(conn) end end def request_node_info(node, policy, *commands) - conn = node.get_connection(policy.timeout) + begin + conn = node.get_connection(policy.timeout) + rescue => e + Aerospike.logger.error("Get connection failed with exception: #{e}") + raise e + end Info.request(conn, *commands).tap do node.put_connection(conn) end diff --git a/lib/aerospike/command/admin_command.rb b/lib/aerospike/command/admin_command.rb index 4840b111..4a1ceb19 100644 --- a/lib/aerospike/command/admin_command.rb +++ b/lib/aerospike/command/admin_command.rb @@ -345,14 +345,13 @@ def execute_command(cluster, policy) timeout = 1 timeout = policy.timeout if policy && policy.timeout > 0 - conn = node.get_connection(timeout) - begin + conn = node.get_connection(timeout) conn.write(@data_buffer, @data_offset) conn.read(@data_buffer, HEADER_SIZE) node.put_connection(conn) rescue => e - conn.close if conn + node.close_connection(conn) if conn raise e end @@ -377,13 +376,11 @@ def read_users(cluster, policy) status, list = read_user_blocks(conn) node.put_connection(conn) rescue => e - conn.close if conn + node.close_connection(conn) if conn raise e end - raise Exceptions::Aerospike.new(status) if status > 0 - - return list + list end def read_user_blocks(conn) @@ -512,13 +509,13 @@ def read_roles(cluster, policy) status, list = read_role_blocks(conn) node.put_connection(conn) rescue => e - conn.close if conn + node.close_connection(conn) if conn raise e end raise Exceptions::Aerospike.new(status) if status > 0 - return list + list end def read_role_blocks(conn) diff --git a/lib/aerospike/command/command.rb b/lib/aerospike/command/command.rb index a8088d9e..47004f36 100644 --- a/lib/aerospike/command/command.rb +++ b/lib/aerospike/command/command.rb @@ -487,6 +487,9 @@ def execute @node = get_node @conn = @node.get_connection(@policy.timeout) rescue => e + if e.is_a?(Aerospike::Exceptions::MaxConnectionsExceeded) + Aerospike.logger.error("Maximum connections established. No new connection can be created. #{e}") + end if @node # Socket connection error has occurred. Decrease health and retry. @node.decrease_health @@ -510,7 +513,7 @@ def execute # All runtime exceptions are considered fatal. Do not retry. # Close socket to flush out possible garbage. Do not put back in pool. - @conn.close if @conn + @node.close_connection(@conn) if @conn raise e end @@ -523,7 +526,7 @@ def execute rescue => e # IO errors are considered temporary anomalies. Retry. # Close socket to flush out possible garbage. Do not put back in pool. - @conn.close if @conn + @node.close_connection(@conn) if @conn Aerospike.logger.error("Node #{@node.to_s}: #{e}") # IO error means connection to server @node is unhealthy. @@ -548,7 +551,7 @@ def execute # cancelling/closing the batch/multi commands will return an error, which will # close the connection to throw away its data and signal the server about the # situation. We will not put back the connection in the buffer. - @conn.close if @conn + @node.close_connection(@conn) if @conn raise e end diff --git a/lib/aerospike/connection/authenticate.rb b/lib/aerospike/connection/authenticate.rb index 7f206dce..4bd6ff5f 100644 --- a/lib/aerospike/connection/authenticate.rb +++ b/lib/aerospike/connection/authenticate.rb @@ -31,10 +31,11 @@ def call(conn, user, hashed_pass) end end end + module AuthenticateNew class << self INVALID_SESSION_ERR = [ResultCode::INVALID_CREDENTIAL, - ResultCode::EXPIRED_SESSION] + ResultCode::EXPIRED_SESSION] def call(conn, cluster) command = LoginCommand.new @@ -65,4 +66,3 @@ def call(conn, cluster) end end end - diff --git a/lib/aerospike/node.rb b/lib/aerospike/node.rb index d5dc976d..a4f9560e 100644 --- a/lib/aerospike/node.rb +++ b/lib/aerospike/node.rb @@ -84,14 +84,14 @@ def get_connection(timeout) # Put back a connection to the cache. If cache is full, the connection will be # closed and discarded def put_connection(conn) - conn.close if !active? + @connections.cleanup(conn) unless active? @connections.offer(conn) end # Separate connection for refreshing def tend_connection if @tend_connection.nil? || @tend_connection.closed? - @tend_connection = Cluster::CreateConnection.(cluster, host) + @tend_connection = @connections.create end @tend_connection end @@ -177,7 +177,7 @@ def aliases @aliases.value end - # Marks node as inactice and closes all cached connections + # Marks node as inactive and closes all cached connections def close inactive! close_connections @@ -224,14 +224,18 @@ def refresh_reset Node::Refresh::Reset.(self) end + def close_connection(conn) + @connections.cleanup(conn) + end + private def close_connections - @tend_connection.close if @tend_connection + @connections.cleanup(@tend_connection) if @tend_connection # drain connections and close all of them # non-blocking, does not call create_block when passed false - while conn = @connections.poll(false) - conn.close if conn + while (conn = @connections.poll(create_new: false)) + @connections.cleanup(conn) end end end # class Node diff --git a/lib/aerospike/node/refresh/info.rb b/lib/aerospike/node/refresh/info.rb index 358504f7..beef6c29 100644 --- a/lib/aerospike/node/refresh/info.rb +++ b/lib/aerospike/node/refresh/info.rb @@ -51,7 +51,7 @@ def call(node, peers) peers.refresh_count += 1 node.reset_failures! rescue ::Aerospike::Exceptions::Aerospike => e - conn.close if conn + node.close_connection(conn) if conn node.decrease_health peers.generation_changed = true if peers.use_peers? Refresh::Failed.(node, e) diff --git a/lib/aerospike/node/refresh/partitions.rb b/lib/aerospike/node/refresh/partitions.rb index b0aa81c6..38c4fe6b 100644 --- a/lib/aerospike/node/refresh/partitions.rb +++ b/lib/aerospike/node/refresh/partitions.rb @@ -30,7 +30,7 @@ def call(node, peers) parser = PartitionParser.new(node, conn) node.cluster.update_partitions(parser) rescue ::Aerospike::Exceptions::Aerospike => e - conn.close + node.close_connection(conn) Refresh::Failed.(node, e) end diff --git a/lib/aerospike/node/refresh/racks.rb b/lib/aerospike/node/refresh/racks.rb index 0f5a1e36..8c2cc4d2 100644 --- a/lib/aerospike/node/refresh/racks.rb +++ b/lib/aerospike/node/refresh/racks.rb @@ -30,7 +30,7 @@ def call(node) parser = RackParser.new(node, conn) node.update_racks(parser) rescue ::Aerospike::Exceptions::Aerospike => e - conn.close + node.close_connection(conn) Refresh::Failed.(node, e) end diff --git a/lib/aerospike/node_validator.rb b/lib/aerospike/node_validator.rb index 471c8bc9..d1fa0765 100644 --- a/lib/aerospike/node_validator.rb +++ b/lib/aerospike/node_validator.rb @@ -101,4 +101,4 @@ def is_ip?(hostname) end end # class -end # module +end # module \ No newline at end of file diff --git a/lib/aerospike/result_code.rb b/lib/aerospike/result_code.rb index 2b410456..ca5719ce 100644 --- a/lib/aerospike/result_code.rb +++ b/lib/aerospike/result_code.rb @@ -20,6 +20,9 @@ module ResultCode attr_reader :code + #Maximum connections established. New connections cannot be created. + MAX_CONNECTION_EXCEEDED = -21 + # One or more keys failed in a batch. BATCH_FAILED = -20 @@ -299,6 +302,7 @@ module ResultCode # Internal error. QUERY_DUPLICATE = 215 + def self.message(code) case code when BATCH_FAILED @@ -577,6 +581,9 @@ def self.message(code) when QUERY_DUPLICATE "Internal query error" + when MAX_CONNECTION_EXCEEDED + "Maximum new connections exceeded." + else "ResultCode #{code} unknown in the client. Please file a github issue." end # case diff --git a/lib/aerospike/task/execute_task.rb b/lib/aerospike/task/execute_task.rb index 7e8a2b62..1e564ddf 100644 --- a/lib/aerospike/task/execute_task.rb +++ b/lib/aerospike/task/execute_task.rb @@ -45,8 +45,12 @@ def all_nodes_done? elsif node.supports_feature?(Aerospike::Features::QUERY_SHOW) command = cmd2 end - - conn = node.get_connection(0) + begin + conn = node.get_connection(0) + rescue => e + Aerospike.logger.error("Get connection failed with exception: #{e}") + raise e + end responseMap, _ = Info.request(conn, command) node.put_connection(conn) diff --git a/lib/aerospike/task/index_task.rb b/lib/aerospike/task/index_task.rb index e263d0ae..198e059a 100644 --- a/lib/aerospike/task/index_task.rb +++ b/lib/aerospike/task/index_task.rb @@ -42,7 +42,12 @@ def all_nodes_done? nodes = @cluster.nodes nodes.each do |node| - conn = node.get_connection(1) + begin + conn = node.get_connection(1) + rescue => e + Aerospike.logger.error("Get connection failed with exception: #{e}") + raise e + end response_map = Info.request(conn, command) _, response = response_map.first match = response.to_s.match(MATCHER) diff --git a/lib/aerospike/task/udf_register_task.rb b/lib/aerospike/task/udf_register_task.rb index 2619fbc2..989a8c5c 100644 --- a/lib/aerospike/task/udf_register_task.rb +++ b/lib/aerospike/task/udf_register_task.rb @@ -39,7 +39,12 @@ def all_nodes_done? nodes = @cluster.nodes nodes.each do |node| - conn = node.get_connection(1) + begin + conn = node.get_connection(1) + rescue => e + Aerospike.logger.error("Get connection failed with exception: #{e}") + raise e + end response_map = Info.request(conn, command) _, response = response_map.first index = response.to_s.index("filename=#{@package_name}") @@ -47,7 +52,7 @@ def all_nodes_done? return false if index.nil? end - return true + true end end # class diff --git a/lib/aerospike/task/udf_remove_task.rb b/lib/aerospike/task/udf_remove_task.rb index 8ad4c52f..d977371a 100644 --- a/lib/aerospike/task/udf_remove_task.rb +++ b/lib/aerospike/task/udf_remove_task.rb @@ -39,7 +39,12 @@ def all_nodes_done? nodes = @cluster.nodes nodes.each do |node| - conn = node.get_connection(1) + begin + conn = node.get_connection(1) + rescue => e + Aerospike.logger.error("Get connection failed with exception: #{e}") + raise e + end response_map = Info.request(conn, command) _, response = response_map.first index = response.to_s.index("filename=#{@package_name}") diff --git a/lib/aerospike/utils/connection_pool.rb b/lib/aerospike/utils/connection_pool.rb index c923fc73..8b01a344 100644 --- a/lib/aerospike/utils/connection_pool.rb +++ b/lib/aerospike/utils/connection_pool.rb @@ -17,29 +17,75 @@ module Aerospike class ConnectionPool < Pool - attr_accessor :cluster, :host + attr_accessor :cluster, :host, :total_connections + # Creates a new connection pool. + # @param cluster [Aerospike::Cluster] The Aerospike cluster that this connection pool belongs to. + # @param host [Aerospike::Host] The host that this connection pool connects to. def initialize(cluster, host) self.cluster = cluster self.host = host + @total_connections = 0 + @mutex = Mutex.new super(cluster.connection_queue_size) end + # Creates a new connection to the Aerospike server node. + # @return [Aerospike::Connection] A new connection to the Aerospike server node. + # @raise [Aerospike::Exceptions::MaxConnectionsExceeded] if the maximum number of connections has been reached. def create conn = nil - loop do - conn = cluster.create_connection(host) - break if conn.connected? + @mutex.synchronize do + if @total_connections >= @max_size + raise Aerospike::Exceptions::MaxConnectionsExceeded + else + conn = cluster.create_connection(host) + if conn.connected? + @total_connections += 1 + end + end end conn end + # Checks if the given connection is alive. + # @param conn [Aerospike::Connection] The connection to check. + # @return [Boolean] `true` if the connection is alive, `false` otherwise. def check(conn) conn.alive? end + # Cleans up the given connection by closing it and decrementing the connection count. + # @param conn [Aerospike::Connection] The connection to clean up. def cleanup(conn) - conn.close if conn + @mutex.synchronize do + begin + if conn&.connected? + conn.close + end + rescue => e + Aerospike.logger.error("Error occurred while closing a connection") + raise e + end + @total_connections -= 1 + end + end + + def connections + @pool.keys.map { |id| @pool[id].object } + end + + # Closes all the connections in the pool. + def close_all + connections.each do |conn| + conn.close if conn.connected? + end + @total_connections = 0 + end + + # Destroys the connection pool and closes all the connections. + def self.finalize(id) + ObjectSpace._id2ref(id).close_all end end end diff --git a/lib/aerospike/utils/pool.rb b/lib/aerospike/utils/pool.rb index 43bc1268..8ce983b0 100644 --- a/lib/aerospike/utils/pool.rb +++ b/lib/aerospike/utils/pool.rb @@ -38,11 +38,11 @@ def offer(obj) end alias_method :<<, :offer - def poll(create_new=true) + def poll(create_new: true) non_block = true begin obj = @pool.pop(non_block) - if !check(obj) + unless check(obj) cleanup(obj) obj = nil end diff --git a/lib/aerospike/version.rb b/lib/aerospike/version.rb index 16f0de48..d9d23c90 100644 --- a/lib/aerospike/version.rb +++ b/lib/aerospike/version.rb @@ -1,4 +1,4 @@ # encoding: utf-8 module Aerospike - VERSION = "2.26.0" + VERSION = "2.26.1" end diff --git a/spec/aerospike/cdt/cdt_bit_spec.rb b/spec/aerospike/cdt/cdt_bit_spec.rb index c73c121a..0976fdc8 100644 --- a/spec/aerospike/cdt/cdt_bit_spec.rb +++ b/spec/aerospike/cdt/cdt_bit_spec.rb @@ -53,7 +53,7 @@ def assert_bit_modify_region(bin_sz, offset, set_sz, expected, is_insert, *ops) int_sz = set_sz if set_sz < int_sz bin_bit_sz = bin_sz * 8 - bin_bit_sz += set_sz if is_insert + bin_bit_sz += set_sz if is_insert full_ops = ops.dup full_ops << BitOperation.lscan(bin_name, offset, set_sz, true) @@ -121,9 +121,9 @@ def assert_bit_modify_operations(initial, expected, *ops) end def assert_throws(code, *ops) - expect { + expect do client.operate(key, ops) - }.to raise_error (Aerospike::Exceptions::Aerospike){ |error| + end.to raise_error(Aerospike::Exceptions::Aerospike) { |error| error.result_code == code } end @@ -521,7 +521,7 @@ def assert_throws(code, *ops) [0x55, 0x54], ] - results = record.bins[bin_name].map{ |elem| elem.bytes} + results = record.bins[bin_name].map { |elem| elem.bytes} expect(results).to eq expected end @@ -665,7 +665,7 @@ def assert_throws(code, *ops) (0..(bin_bit_sz - set_sz)).each do |offset| limit = 16 - limit = set_sz + 1 if set_sz < 16 + limit = set_sz + 1 if set_sz < 16 (0..limit).each do |n_bits| assert_bit_modify_region_not_insert(bin_sz, offset, set_sz, set_data, @@ -695,7 +695,7 @@ def assert_throws(code, *ops) (0..(bin_bit_sz - set_sz)).each do |offset| limit = 16 limit = set_sz + 1 if set_sz < 16 - + (0..limit).each do |n_bits| assert_bit_modify_region_not_insert(bin_sz, offset, set_sz, set_data, BitOperation.set(bin_name, offset, set_sz, diff --git a/spec/aerospike/node/refresh/info_spec.rb b/spec/aerospike/node/refresh/info_spec.rb index 1e292e60..52e65965 100644 --- a/spec/aerospike/node/refresh/info_spec.rb +++ b/spec/aerospike/node/refresh/info_spec.rb @@ -20,17 +20,17 @@ RSpec.describe Aerospike::Node::Refresh::Info, skip: Support.is_jruby? do let(:node) { double } let(:cluster) { double } - let(:peers) { ::Aerospike::Peers.new } + let(:peers) { Aerospike::Peers.new } let(:connection) { spy } before do - allow(::Aerospike::Node::Verify::PeersGeneration).to receive(:call) - allow(::Aerospike::Node::Verify::PartitionGeneration).to receive(:call) - allow(::Aerospike::Node::Verify::RebalanceGeneration).to receive(:call) - allow(::Aerospike::Node::Verify::ClusterName).to receive(:call) - allow(::Aerospike::Node::Verify::Name).to receive(:call) - allow(::Aerospike::Node::Refresh::Failed).to receive(:call) - allow(::Aerospike::Node::Refresh::Friends).to receive(:call) + allow(Aerospike::Node::Verify::PeersGeneration).to receive(:call) + allow(Aerospike::Node::Verify::PartitionGeneration).to receive(:call) + allow(Aerospike::Node::Verify::RebalanceGeneration).to receive(:call) + allow(Aerospike::Node::Verify::ClusterName).to receive(:call) + allow(Aerospike::Node::Verify::Name).to receive(:call) + allow(Aerospike::Node::Refresh::Failed).to receive(:call) + allow(Aerospike::Node::Refresh::Friends).to receive(:call) allow(node).to receive(:tend_connection).and_return(connection) allow(node).to receive(:decrease_health) allow(node).to receive(:restore_health) @@ -39,6 +39,7 @@ allow(node).to receive(:get_connection).and_return(connection) allow(node).to receive(:cluster).and_return(cluster) allow(cluster).to receive(:rack_aware).and_return(true) + allow(node).to receive(:close_connection) end describe '::call' do @@ -50,9 +51,9 @@ call! end - it { expect(::Aerospike::Node::Verify::PeersGeneration).to have_received(:call) } - it { expect(::Aerospike::Node::Verify::PartitionGeneration).to have_received(:call) } - it { expect(::Aerospike::Node::Refresh::Friends).not_to have_received(:call) } + it { expect(Aerospike::Node::Verify::PeersGeneration).to have_received(:call) } + it { expect(Aerospike::Node::Verify::PartitionGeneration).to have_received(:call) } + it { expect(Aerospike::Node::Refresh::Friends).not_to have_received(:call) } it { expect(node).to have_received(:reset_failures!) } end @@ -62,22 +63,22 @@ call! end - it { expect(::Aerospike::Node::Verify::PartitionGeneration).to have_received(:call) } - it { expect(::Aerospike::Node::Refresh::Friends).to have_received(:call) } + it { expect(Aerospike::Node::Verify::PartitionGeneration).to have_received(:call) } + it { expect(Aerospike::Node::Refresh::Friends).to have_received(:call) } it { expect(node).to have_received(:reset_failures!) } end context 'when node name verification fails' do before do - allow(::Aerospike::Node::Verify::Name).to receive(:call).and_raise( - ::Aerospike::Exceptions::Aerospike.new(0, '') + allow(Aerospike::Node::Verify::Name).to receive(:call).and_raise( + Aerospike::Exceptions::Aerospike.new(0, '') ) call! end - it { expect(connection).to have_received(:close) } + it { expect(node).to have_received(:close_connection) } it { expect(node).to have_received(:decrease_health) } - it { expect(::Aerospike::Node::Refresh::Failed).to have_received(:call) } + it { expect(Aerospike::Node::Refresh::Failed).to have_received(:call) } end end end diff --git a/spec/aerospike/node/refresh/partitions_spec.rb b/spec/aerospike/node/refresh/partitions_spec.rb index f67bf4e5..a72025f1 100644 --- a/spec/aerospike/node/refresh/partitions_spec.rb +++ b/spec/aerospike/node/refresh/partitions_spec.rb @@ -20,7 +20,7 @@ RSpec.describe Aerospike::Node::Refresh::Partitions do let(:node) { double } let(:cluster) { double } - let(:conn) { double} + let(:conn) { double } let(:peers) { double } let(:refresh_count) { 10 } let(:healthy) { true } @@ -29,9 +29,10 @@ allow(node).to receive(:name).and_return('dummy') allow(node).to receive(:tend_connection).and_return(conn) allow(node).to receive(:cluster).and_return(cluster) + allow(node).to receive(:close_connection) allow(cluster).to receive(:update_partitions) allow(conn).to receive(:close) - allow(::Aerospike::Node::Refresh::Failed).to receive(:call) + allow(Aerospike::Node::Refresh::Failed).to receive(:call) end describe '::call' do @@ -41,7 +42,7 @@ allow(described_class).to receive(:should_refresh?).and_return(healthy) end - context 'with healty node' do + context 'with healthy node' do before { refresh } it { expect(cluster).to have_received(:update_partitions) } @@ -57,13 +58,13 @@ context 'when cluster.update_partitions fails' do before do - allow(cluster).to receive(:update_partitions).and_raise(::Aerospike::Exceptions::Aerospike.new(0)) + allow(cluster).to receive(:update_partitions).and_raise(Aerospike::Exceptions::Aerospike.new(0)) refresh end - it { expect(conn).to have_received(:close) } - it { expect(::Aerospike::Node::Refresh::Failed).to have_received(:call) } + it { expect(node).to have_received(:close_connection) } + it { expect(Aerospike::Node::Refresh::Failed).to have_received(:call) } end end end diff --git a/spec/aerospike/util/connection_pool_spec.rb b/spec/aerospike/util/connection_pool_spec.rb index 02daa4be..70150600 100644 --- a/spec/aerospike/util/connection_pool_spec.rb +++ b/spec/aerospike/util/connection_pool_spec.rb @@ -16,13 +16,17 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations under # the License. - +require 'rspec' +require 'aerospike' RSpec.describe Aerospike::ConnectionPool do let(:pool_size) { 5 } let(:cluster) { double(connection_queue_size: pool_size) } - let(:host) { double() } + let(:host) { double } let(:instance) { described_class.new(cluster, host) } let(:good_connection) { double(:connected? => true, :alive? => true) } + let(:connection_queue) { [] } + let(:connection) { double('connection') } + describe ".poll" do context "when pool is empty" do @@ -31,8 +35,7 @@ end it "creates a new connection" do - connection = instance.poll() - + connection = instance.poll expect(connection).to be(good_connection) end end @@ -43,12 +46,14 @@ end it "returns the idle connection" do - connection = instance.poll() + connection = instance.poll expect(connection).to be(good_connection) end end + + context "when pool contains a dead connection" do let(:dead_connection) { spy(:connected? => true, :alive? => false) } @@ -60,11 +65,72 @@ end it "discards the dead connection" do - connection = instance.poll() + connection = instance.poll expect(connection).to be(good_connection) expect(dead_connection).to have_received(:close) end end end + + + describe 'enforce max_connection' do + let(:connection_to_be_closed) { double('Aerospike::Connection', connected?: true, close: true) } + + it 'closes the connection and decrements total_connections count' do + instance.total_connections = pool_size + instance.cleanup(connection_to_be_closed) + expect(connection_to_be_closed).to have_received(:close) + expect(instance.total_connections).to eq(pool_size - 1) + end + + before do + allow(cluster).to receive(:create_connection).with(host).and_return(good_connection) + end + + it 'creates a connection and increments total_connections count' do + instance.total_connections = 0 + conn = instance.poll + expect(conn).to be(good_connection) + expect(instance.total_connections).to eq(1) + end + + context "enforce max connections as a hard limit" do + before do + allow(cluster).to receive(:create_connection).with(host).and_return(good_connection) + allow(instance).to receive(:cleanup) + pool_size.times do + conn = instance.poll + connection_queue << conn + end + end + + it "raise an max connection exceeded exception" do + expect { instance.poll }.to raise_aerospike_error(-21) + end + end + + context "create a new connection after close" + before do + allow(cluster).to receive(:create_connection).with(host).and_return(good_connection) + end + + it 'creates maximum number of connections and closes one of them' do + connection_queue << connection_to_be_closed + pool = Aerospike::ConnectionPool.new(cluster, host) + (1..pool_size).each do + new_conn = pool.create + connection_queue << new_conn + end + expect { pool.create }.to raise_aerospike_error(-21) + connection_to_close = connection_queue[0] + pool.cleanup(connection_to_close) + expect(pool.total_connections).to eq(pool_size - 1) + + new_connection = pool.create + expect(new_connection).to be(good_connection) + expect(new_connection.connected?).to be true + expect(pool.total_connections).to eq(pool_size) + end + end end