From 188a1be9dfddd9a9e72bda945f3fe97814e24ca2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Lipka?= Date: Wed, 29 Apr 2015 10:35:08 +0200 Subject: [PATCH 1/4] Add support for tag sets --- lib/moped/cluster.rb | 6 +- lib/moped/node.rb | 6 +- spec/moped/cluster_spec.rb | 117 +++++++++++++++++++++++++- spec/support/replica_set_simulator.rb | 6 +- 4 files changed, 128 insertions(+), 7 deletions(-) diff --git a/lib/moped/cluster.rb b/lib/moped/cluster.rb index fa19d96..24e3102 100644 --- a/lib/moped/cluster.rb +++ b/lib/moped/cluster.rb @@ -284,7 +284,11 @@ def with_secondary(&block) private def available_secondary_nodes - nodes.select(&:secondary?).shuffle! + secondary =nodes.select(&:secondary?) + if options[:tags] && !options[:tags].empty? + secondary = secondary.select{|node| node.tags.merge(options[:tags]) == node.tags} + end + secondary end # Apply the credentials on all nodes diff --git a/lib/moped/node.rb b/lib/moped/node.rb index 0cafd4e..e74288d 100644 --- a/lib/moped/node.rb +++ b/lib/moped/node.rb @@ -26,7 +26,7 @@ class Node # @return [ Hash ] The node options. # @!attribute refreshed_at # @return [ Time ] The last time the node did a refresh. - attr_reader :address, :down_at, :latency, :options, :refreshed_at + attr_reader :address, :down_at, :latency, :options, :refreshed_at, :tags # @!attribute credentials # @return [ Hash ] The credentials of the node. @@ -251,6 +251,7 @@ def initialize(address, options = {}) @latency = nil @primary = nil @secondary = nil + @tags = nil @credentials = {} @instrumenter = options[:instrumenter] || Instrumentable::Log @address = Address.new(address, timeout) @@ -511,7 +512,7 @@ def update(database, collection, selector, change, concern, options = {}) # # @since 1.0.0 def inspect - "<#{self.class.name} resolved_address=#{address.resolved.inspect}>" + "<#{self.class.name} resolved_address=#{address.resolved.inspect} tags=#{tags.inspect}>" end private @@ -549,6 +550,7 @@ def configure(settings) @passive = settings["passive"] @primary = settings["ismaster"] @secondary = settings["secondary"] + @tags = settings["tags"] ? settings["tags"].to_h.inject({}){|memo,(k,v)| memo[k.to_sym] = v; memo} : {} discover(settings["hosts"]) if auto_discovering? end diff --git a/spec/moped/cluster_spec.rb b/spec/moped/cluster_spec.rb index baa4d30..7de50e9 100644 --- a/spec/moped/cluster_spec.rb +++ b/spec/moped/cluster_spec.rb @@ -397,6 +397,118 @@ end end + context "with tagsets" do + let(:cluster) do + Moped::Cluster.new(seeds,{tags: {tag: "test"}}) + end + + it "reads tags from all nodes" do + cluster.nodes.map(&:tags).reject{|x| x.empty?}.size.should eq(@secondaries.size) + cluster.nodes.map(&:tags).reject{|x| x.empty?}.uniq.size.should eq(1) + cluster.nodes.select {|n| n.primary? }.map(&:tags).reject{|x| x.empty?}.size.should eq(0) + cluster.nodes.select {|n| n.secondary?}.map(&:tags)[0].should eq({tag: "test"}) + end + + context "when only one secondary is tagged" do + before do + @secondaries[0].tags = nil + @secondaries[0].restart + cluster.refresh + end + + it "filters out untagged secondary" do + cluster.send(:available_secondary_nodes).size.should eq(@secondaries.size - 1) + end + + describe "#with_primary" do + it "connects and yields the primary node" do + cluster.with_primary do |node| + node.address.original.should eq @primary.address + end + end + end + + describe "#with_secondary" do + it "connects and yields a tagged secondary node" do + cluster.with_secondary do |node| + @secondaries.map(&:address).should include node.address.original + node.tags.has_key?(:tag).should eq(true) + node.tags[:tag].should eq("test") + end + end + end + end + + context "when a node is added to the set" do + context " and the new node is tagged" do + before do + @secondaries.each do |s| + s.tags = {tag: "test"} + s.restart + end + node = @replica_set.add_node + node.demote + node.restart + @secondaries.push(node) + cluster.refresh + end + + it "includes the newly added tagged node in the available set" do + cluster.send(:available_secondary_nodes).size.should eq(@secondaries.size) + end + end + + context" and the new node is untagged" do + before do + @secondaries.each do |s| + s.tags = {tag: "test"} + s.restart + end + node = @replica_set.add_node + node.demote + node.tags = nil + node.restart + @secondaries.push(node) + cluster.refresh + end + + it "excludes the newly added untagged node from the available set" do + cluster.send(:available_secondary_nodes).size.should eq(@secondaries.size - 1) + end + end + + end + + context "when there are no tagged nodes" do + before do + @secondaries.each do |s| + s.tags = nil + s.restart + end + cluster.refresh + end + + describe "#with_primary" do + it "connects and yields the primary node" do + cluster.with_primary do |node| + node.address.original == @primary.address + end + end + end + + describe "#with_secondary" do + it "raises an error" do + expect { + cluster.with_secondary do |node| + node.command "admin", ping: 1 + end + }.to raise_error(Moped::Errors::ConnectionFailure) + end + end + end + end + + describe "#refresh" do let(:cluster) do @@ -411,8 +523,8 @@ end it "gets removed from the available nodes and configured nodes" do - cluster.nodes.size.should eq(2) - cluster.seeds.size.should eq(2) + cluster.nodes.size.should eq(@secondaries.size + 1) + cluster.seeds.size.should eq(@secondaries.size + 1) end end end @@ -441,6 +553,7 @@ end end + describe Moped::Cluster, "authentication", mongohq: :auth do shared_examples_for "authenticable session" do diff --git a/spec/support/replica_set_simulator.rb b/spec/support/replica_set_simulator.rb index cfd89d4..9136721 100644 --- a/spec/support/replica_set_simulator.rb +++ b/spec/support/replica_set_simulator.rb @@ -101,19 +101,20 @@ class Node attr_reader :port attr_reader :host + attr_accessor :tags def initialize(set, port) @set = set @primary = false @secondary = false - + @tags = {"tag" => "test"} @host = "127.0.0.1" @port = port @hiccup_on_next_message = nil end def ==(other) - @host == other.host && @port == other.port + @host == other.host && @port == other.port && @tags == other.tags end def address @@ -132,6 +133,7 @@ def status { "ismaster" => @primary, "secondary" => @secondary, + "tags" => @primary ? nil : @tags, "hosts" => @set.nodes.map(&:address), "me" => address, "maxBsonObjectSize" => 16777216, From 998c513340260552b3726759419adb8ab942ccd0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Lipka?= Date: Wed, 29 Apr 2015 13:13:30 +0200 Subject: [PATCH 2/4] attempt to fix broken tests --- lib/moped/node.rb | 2 +- lib/moped/session.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/moped/node.rb b/lib/moped/node.rb index e74288d..338ecc0 100644 --- a/lib/moped/node.rb +++ b/lib/moped/node.rb @@ -550,7 +550,7 @@ def configure(settings) @passive = settings["passive"] @primary = settings["ismaster"] @secondary = settings["secondary"] - @tags = settings["tags"] ? settings["tags"].to_h.inject({}){|memo,(k,v)| memo[k.to_sym] = v; memo} : {} + @tags = settings["tags"] ? settings["tags"].to_hash.inject({}){|memo,(k,v)| memo[k.to_sym] = v; memo} : {} discover(settings["hosts"]) if auto_discovering? end diff --git a/lib/moped/session.rb b/lib/moped/session.rb index 54203fa..0bcc9ff 100644 --- a/lib/moped/session.rb +++ b/lib/moped/session.rb @@ -319,7 +319,7 @@ def new(options = {}) # # @since 2.0.0 def read_preference - @read_preference ||= ReadPreference.get(options[:read] || :primary) + @read_preference ||= ReadPreference.get((options[:read] || :primary), options[:tags]) end # Switch the session's current database. From 218ab3592f11f713c1a054357ac53c4bb29567e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Lipka?= Date: Wed, 29 Apr 2015 14:36:40 +0200 Subject: [PATCH 3/4] Allow options[:tags] Hash when creating a session --- lib/moped/session.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/moped/session.rb b/lib/moped/session.rb index 0bcc9ff..9134608 100644 --- a/lib/moped/session.rb +++ b/lib/moped/session.rb @@ -250,6 +250,8 @@ def logout # @since 1.5.0 option(:auto_discover).allow(true, false) + option(:tags).allow(Optionable.any(Hash)) + # Initialize a new database session. # # @example Initialize a new session. From 9363c7d1efa48cca688b54e633295c2acb72b6c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Lipka?= Date: Wed, 29 Apr 2015 14:58:14 +0200 Subject: [PATCH 4/4] do not symbolize tags keys --- lib/moped/node.rb | 2 +- spec/moped/cluster_spec.rb | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/lib/moped/node.rb b/lib/moped/node.rb index 338ecc0..00da2ff 100644 --- a/lib/moped/node.rb +++ b/lib/moped/node.rb @@ -550,7 +550,7 @@ def configure(settings) @passive = settings["passive"] @primary = settings["ismaster"] @secondary = settings["secondary"] - @tags = settings["tags"] ? settings["tags"].to_hash.inject({}){|memo,(k,v)| memo[k.to_sym] = v; memo} : {} + @tags = settings["tags"] ? settings["tags"].to_hash : {} discover(settings["hosts"]) if auto_discovering? end diff --git a/spec/moped/cluster_spec.rb b/spec/moped/cluster_spec.rb index 7de50e9..6c15c0b 100644 --- a/spec/moped/cluster_spec.rb +++ b/spec/moped/cluster_spec.rb @@ -399,14 +399,14 @@ context "with tagsets" do let(:cluster) do - Moped::Cluster.new(seeds,{tags: {tag: "test"}}) + Moped::Cluster.new(seeds,{tags: {"tag" => "test"}}) end it "reads tags from all nodes" do cluster.nodes.map(&:tags).reject{|x| x.empty?}.size.should eq(@secondaries.size) cluster.nodes.map(&:tags).reject{|x| x.empty?}.uniq.size.should eq(1) cluster.nodes.select {|n| n.primary? }.map(&:tags).reject{|x| x.empty?}.size.should eq(0) - cluster.nodes.select {|n| n.secondary?}.map(&:tags)[0].should eq({tag: "test"}) + cluster.nodes.select {|n| n.secondary?}.map(&:tags)[0].should eq({"tag" => "test"}) end context "when only one secondary is tagged" do @@ -432,8 +432,8 @@ it "connects and yields a tagged secondary node" do cluster.with_secondary do |node| @secondaries.map(&:address).should include node.address.original - node.tags.has_key?(:tag).should eq(true) - node.tags[:tag].should eq("test") + node.tags.has_key?("tag").should eq(true) + node.tags["tag"].should eq("test") end end end @@ -443,7 +443,7 @@ context " and the new node is tagged" do before do @secondaries.each do |s| - s.tags = {tag: "test"} + s.tags = {"tag" => "test"} s.restart end node = @replica_set.add_node @@ -461,7 +461,7 @@ context" and the new node is untagged" do before do @secondaries.each do |s| - s.tags = {tag: "test"} + s.tags = {"tag" => "test"} s.restart end node = @replica_set.add_node