diff --git a/lib/moped/connection.rb b/lib/moped/connection.rb index f1de386..fcd753a 100644 --- a/lib/moped/connection.rb +++ b/lib/moped/connection.rb @@ -174,11 +174,21 @@ def write(operations) # # @since 1.2.9 def read_data(socket, length) - data = socket.read(length) - unless data - raise Errors::ConnectionFailure.new( - "Attempted to read #{length} bytes from the socket but nothing was returned." - ) + # Block on data to read for op_timeout seconds + # using the suggested implementation of http://www.ruby-doc.org/core-2.1.3/Kernel.html#method-i-select + # to work with SSL connections + time_left = op_timeout = @options[:op_timeout] || timeout + begin + raise Errors::OperationTimeout.new("Took more than #{op_timeout} seconds to receive data.") if (time_left -= 0.1) <= 0 + data = socket.read_nonblock(length) + rescue IO::WaitReadable + Kernel::select([socket], nil, [socket], 0.1) + retry + rescue IO::WaitWritable + Kernel::select(nil, [socket], [socket], 0.1) + retry + rescue SystemCallError, IOError => e + raise Errors::ConnectionFailure.new("Attempted to read #{length} bytes from the socket but an error happend #{e.message}.") end if data.length < length data << read_data(socket, length - data.length) diff --git a/lib/moped/errors.rb b/lib/moped/errors.rb index 1c32607..59a63ab 100644 --- a/lib/moped/errors.rb +++ b/lib/moped/errors.rb @@ -8,6 +8,9 @@ module Errors # Generic error class for exceptions related to connection failures. class ConnectionFailure < StandardError; end + # Generic error class for exceptions related to read timeout failures. + class OperationTimeout < StandardError; end + # Raised when a database name is invalid. class InvalidDatabaseName < StandardError; end diff --git a/lib/moped/node.rb b/lib/moped/node.rb index 21fdb80..6e0af12 100644 --- a/lib/moped/node.rb +++ b/lib/moped/node.rb @@ -94,7 +94,7 @@ def command(database, cmd, options = {}) if reply.command_failure? if reply.unauthorized? && auth.has_key?(database) login(database, *auth[database]) - result = command(database, cmd, options) + raise Errors::ReplicaSetReconfigured.new(operation, result) else raise Errors::OperationFailure.new(operation, result) end @@ -372,14 +372,8 @@ def query(database, collection, selector, options = {}) process(operation) do |reply| if reply.query_failed? if reply.unauthorized? && auth.has_key?(database) - # If we got here, most likely this is the case of Moped - # authenticating successfully against the node originally, but the - # node has been reset or gone down and come back up. The most - # common case here is a rs.stepDown() which will reinitialize the - # connection. In this case we need to requthenticate and try again, - # otherwise we'll just raise the error to the user. login(database, *auth[database]) - reply = query(database, collection, selector, options) + raise Errors::ReplicaSetReconfigured.new(operation, reply.documents.first) else raise Errors::QueryFailure.new(operation, reply.documents.first) end diff --git a/spec/moped/cluster_spec.rb b/spec/moped/cluster_spec.rb index cdae2b0..3721ff6 100644 --- a/spec/moped/cluster_spec.rb +++ b/spec/moped/cluster_spec.rb @@ -398,7 +398,7 @@ end let(:node) do - stub + double end context "when a node has no peers" do @@ -459,3 +459,199 @@ end end end + +describe Moped::Cluster, "after a reconfiguration" do + let(:options) do + { + max_retries: 30, + retry_interval: 1, + timeout: 5, + database: 'test_db', + consistency: :strong, + safe: {w: 'majority'} + } + end + + let(:replica_set_name) { 'dev' } + + let(:session) do + Moped::Session.new([ "127.0.0.1:31100", "127.0.0.1:31101", "127.0.0.1:31102" ], options) + end + + def servers_status + auth = has_user_admin? ? "-u admin -p admin_pwd --authenticationDatabase admin" : "" + `echo 'rs.status().members[0].stateStr + "|" + rs.status().members[1].stateStr + "|" + rs.status().members[2].stateStr' | mongo --quiet --port 31100 #{auth} 2>/dev/null`.chomp.split("|") + end + + def has_user_admin? + auth = with_authentication? ? "-u admin -p admin_pwd --authenticationDatabase admin" : "" + `echo 'db.getSisterDB("admin").getUser("admin").user' | mongo --quiet --port 31100 #{auth} 2>/dev/null`.chomp == "admin" + end + + def step_down_servers + step_down_file = File.join(Dir.tmpdir, with_authentication? ? "step_down_with_authentication.js" : "step_down_without_authentication.js") + unless File.exists?(step_down_file) + File.open(step_down_file, "w") do |file| + user_data = with_authentication? ? ", 'admin', 'admin_pwd'" : "" + file.puts %{ + function stepDown(dbs) { + for (i in dbs) { + dbs[i].adminCommand({replSetFreeze:5}); + try { dbs[i].adminCommand({replSetStepDown:5}); } catch(e) { print(e) }; + } + }; + + var db1 = connect('localhost:31100/admin'#{user_data}); + var db2 = connect('localhost:31101/admin'#{user_data}); + var db3 = connect('localhost:31102/admin'#{user_data}); + + var dbs = [db1, db2, db3]; + stepDown(dbs); + + while (db1.adminCommand({ismaster:1}).ismaster || db2.adminCommand({ismaster:1}).ismaster || db2.adminCommand({ismaster:1}).ismaster) { + stepDown(dbs); + } + } + end + end + system "mongo --nodb #{step_down_file} 2>&1 > /dev/null" + end + + shared_examples_for "recover the session" do + it "should execute commands normally before the stepDown" do + time = Benchmark.realtime do + session[:foo].find().remove_all() + session[:foo].find().to_a.count.should eql(0) + session[:foo].insert({ name: "bar 1" }) + session[:foo].find().to_a.count.should eql(1) + expect { + session[:foo].insert({ name: "bar 1" }) + }.to raise_exception + end + time.should be < 2 + end + + it "should recover and execute a find" do + session[:foo].find().remove_all() + session[:foo].insert({ name: "bar 1" }) + step_down_servers + time = Benchmark.realtime do + session[:foo].find().to_a.count.should eql(1) + end + time.should be > 5 + time.should be < 29 + end + + it "should recover and execute an insert" do + session[:foo].find().remove_all() + session[:foo].insert({ name: "bar 1" }) + step_down_servers + time = Benchmark.realtime do + session[:foo].insert({ name: "bar 2" }) + session[:foo].find().to_a.count.should eql(2) + end + time.should be > 5 + time.should be < 29 + + session[:foo].insert({ name: "bar 3" }) + session[:foo].find().to_a.count.should eql(3) + end + + it "should recover and try an insert which hit a constraint" do + session[:foo].find().remove_all() + session[:foo].insert({ name: "bar 1" }) + step_down_servers + time = Benchmark.realtime do + expect { + session[:foo].insert({ name: "bar 1" }) + }.to raise_exception + end + time.should be > 5 + time.should be < 29 + + session[:foo].find().to_a.count.should eql(1) + + session[:foo].insert({ name: "bar 2" }) + session[:foo].find().to_a.count.should eql(2) + end + end + + describe "with authentication off" do + before do + unless servers_status.all?{|st| st == "PRIMARY" || st == "SECONDARY"} && !has_user_admin? + start_mongo_server(31100, "--replSet #{replica_set_name}") + start_mongo_server(31101, "--replSet #{replica_set_name}") + start_mongo_server(31102, "--replSet #{replica_set_name}") + + `echo "rs.initiate({_id : '#{replica_set_name}', 'members' : [{_id:0, host:'localhost:31100'},{_id:1, host:'localhost:31101'},{_id:2, host:'localhost:31102'}]})" | mongo --port 31100` + sleep 0.1 while !servers_status.all?{|st| st == "PRIMARY" || st == "SECONDARY"} + + master = `echo 'db.isMaster().primary' | mongo --quiet --port 31100`.chomp + + `echo " + use test_db; + db.foo.ensureIndex({name:1}, {unique:1}); + " | mongo #{master}` + end + end + + let(:with_authentication?) { false } + + it_should_behave_like "recover the session" + end + + describe "with authentication on" do + before do + unless servers_status.all?{|st| st == "PRIMARY" || st == "SECONDARY"} && has_user_admin? + keyfile = File.join(Dir.tmpdir, "31000", "keyfile") + FileUtils.mkdir_p(File.dirname(keyfile)) + File.open(keyfile, "w") do |f| f.puts "SyrfEmAevWPEbgRZoZx9qZcZtJAAfd269da+kzi0H/7OuowGLxM3yGGUHhD379qP +nw4X8TT2T6ecx6aqJgxG+biJYVOpNK3HHU9Dp5q6Jd0bWGHGGbgFHV32/z2FFiti +EFLimW/vfn2DcJwTW29nQWhz2wN+xfMuwA6hVxFczlQlz5hIY0+a+bQChKw8wDZk +rW1OjTQ//csqPbVA8fwB49ghLGp+o84VujhRxLJ+0sbs8dKoIgmVlX2kLeHGQSf0 +KmF9b8kAWRLwLneOR3ESovXpEoK0qpQb2ym6BNqP32JKyPA6Svb/smVONhjUI71f +/zQ2ETX7ylpxIzw2SMv/zOWcVHBqIbdP9Llrxb3X0EsB6J8PeI8qLjpS94FyEddw +ACMcAxbP+6BaLjXyJ2WsrEeqThAyUC3uF5YN/oQ9XiATqP7pDOTrmfn8LvryyzcB +ByrLRTPOicBaG7y13ATcCbBdrYH3BE4EeLkTUZOg7VzvRnATvDpt0wOkSnbqXow8 +GQ6iMUgd2XvUCuknQLD6gWyoUyHiPADKrLsgnd3Qo9BPxYJ9VWSKB4phK3N7Bic+ +BwxlcpDFzGI285GR4IjcJbRRjjywHq5XHOxrJfN+QrZ/6wy6yu2+4NTPj+BPC5iX +/dNllTEyn7V+pr6FiRv8rv8RcxJgf3nfn/Xz0t2zW2olcalEFxwKKmR20pZxPnSv +Kr6sVHEzh0mtA21LoK5G8bztXsgFgWU7hh9z8UUo7KQQnDfyPb6k4xroeeQtWBNo +TZF1pI5joLytNSEtT+BYA5wQSYm4WCbhG+j7ipcPIJw6Un4ZtAZs0aixDfVE0zo0 +w2FWrYH2dmmCMbz7cEXeqvQiHh9IU/hkTrKGY95STszGGFFjhtS2TbHAn2rRoFI0 +VwNxMJCC+9ZijTWBeGyQOuEupuI4C9IzA5Gz72048tpZ0qMJ9mOiH3lZFtNTg/5P +28Td2xzaujtXjRnP3aZ9z2lKytlr +" + end + + File.chmod(0600, keyfile) + + start_mongo_server(31100, "--replSet #{replica_set_name} --keyFile #{keyfile} --auth") + start_mongo_server(31101, "--replSet #{replica_set_name} --keyFile #{keyfile} --auth") + start_mongo_server(31102, "--replSet #{replica_set_name} --keyFile #{keyfile} --auth") + + `echo "rs.initiate({_id : '#{replica_set_name}', 'members' : [{_id:0, host:'localhost:31100'},{_id:1, host:'localhost:31101'},{_id:2, host:'localhost:31102'}]})" | mongo --port 31100` + sleep 0.1 while !servers_status.all?{|st| st == "PRIMARY" || st == "SECONDARY"} + + master = `echo 'db.isMaster().primary' | mongo --quiet --port 31100`.chomp + + `echo " + use admin; + db.addUser('admin', 'admin_pwd'); + " | mongo #{master}` + + `echo " + use test_db; + db.addUser('common', 'common_pwd'); + db.foo.ensureIndex({name:1}, {unique:1}); + " | mongo #{master} -u admin -p admin_pwd --authenticationDatabase admin` + end + + session.login('common', 'common_pwd') + end + + let(:with_authentication?) { true } + + it_should_behave_like "recover the session" + end +end diff --git a/spec/moped/query_spec.rb b/spec/moped/query_spec.rb index 11ea0aa..3a8f252 100644 --- a/spec/moped/query_spec.rb +++ b/spec/moped/query_spec.rb @@ -334,7 +334,7 @@ it "raises an error when hinting an invalid index" do expect { users.find(scope: scope).hint(scope: 1).to_a - }.to raise_error(Moped::Errors::QueryFailure, %r{failed with error 10113: "bad hint"}) + }.to raise_error(Moped::Errors::QueryFailure, %r{bad hint}) end end @@ -919,6 +919,42 @@ end end + context "with test commands enabled" do + + let(:session) do + Moped::Session.new([ "127.0.0.1:#{port}" ], database: "moped_test") + end + + let(:users) do + session.with(safe: true)[:users] + end + + describe "when a query take too long" do + let(:port) { 31100 } + + before do + start_mongo_server(port, "--setParameter enableTestCommands=1") + Process.detach(spawn("echo 'db.adminCommand({sleep: 1, w: true, secs: 10})' | mongo localhost:#{port} 2>&1 > /dev/null")) + sleep(1) # to sleep command on mongodb begins work + end + + after do + stop_mongo_server(port) + end + + it "raises a operation timeout exception" do + time = Benchmark.realtime do + expect { + Timeout::timeout(7) do + users.find("age" => { "$gte" => 65 }).first + end + }.to raise_exception("Took more than 5 seconds to receive data.") + end + expect(time).to be < 5.5 + end + end + end + context "with a remote connection", mongohq: :auth do before(:all) do diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 6209e43..16a0eab 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -15,6 +15,10 @@ $:.unshift((Pathname(__FILE__).dirname.parent + "lib").to_s) +require "benchmark" +require "fileutils" +require "tmpdir" +require "tempfile" require "moped" require "support/examples" require "support/mongohq" @@ -35,7 +39,29 @@ return true if value == :auth && !Support::MongoHQ.auth_node_configured? end + config.after(:suite) do + stop_mongo_server(31100) + stop_mongo_server(31101) + stop_mongo_server(31102) + end + unless Support::MongoHQ.replica_set_configured? || Support::MongoHQ.auth_node_configured? $stderr.puts Support::MongoHQ.message end end + +def start_mongo_server(port, extra_options=nil) + stop_mongo_server(port) + dbpath = File.join(Dir.tmpdir, port.to_s) + FileUtils.mkdir_p(dbpath) + `mongod --oplogSize 40 --noprealloc --smallfiles --port #{port} --dbpath #{dbpath} --logpath #{dbpath}/log --pidfilepath #{dbpath}/pid --fork #{extra_options}` + + sleep 0.1 while `echo 'db.runCommand({ping:1}).ok' | mongo --quiet --port #{port}`.chomp != "1" +end + +def stop_mongo_server(port) + dbpath = File.join(Dir.tmpdir, port.to_s) + pidfile = File.join(dbpath, "pid") + `kill #{File.read(pidfile).chomp}` if File.exists?(pidfile) + FileUtils.rm_rf(dbpath) +end diff --git a/spec/support/replica_set_simulator.rb b/spec/support/replica_set_simulator.rb index bdbf0ea..3d482cd 100644 --- a/spec/support/replica_set_simulator.rb +++ b/spec/support/replica_set_simulator.rb @@ -308,7 +308,13 @@ def next_client servers.each do |server| Moped.logger.debug "replica_set: accepting new client for #{server.port}" - @clients << server.accept + begin + @clients << server.accept + rescue IOError, Errno::EBADF, TypeError + # Looks like we hit a bad file descriptor or closed connection. + Moped.logger.debug "replica_set: io error, retrying" + retry + end end Moped.logger.debug "replica_set: closing dead clients"