Skip to content
This repository has been archived by the owner on Jan 15, 2024. It is now read-only.

Add support for tag sets #371

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion lib/moped/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,11 @@ def with_secondary(&block)
private

def available_secondary_nodes
nodes.select(&:secondary?).shuffle!
secondary =nodes.select(&:secondary?)
if options[:tags] && !options[:tags].empty?
secondary = secondary.select{|node| node.tags.merge(options[:tags]) == node.tags}
end
secondary
end

# Apply the credentials on all nodes
Expand Down
6 changes: 4 additions & 2 deletions lib/moped/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class Node
# @return [ Hash ] The node options.
# @!attribute refreshed_at
# @return [ Time ] The last time the node did a refresh.
attr_reader :address, :down_at, :latency, :options, :refreshed_at
attr_reader :address, :down_at, :latency, :options, :refreshed_at, :tags

# @!attribute credentials
# @return [ Hash ] The credentials of the node.
Expand Down Expand Up @@ -251,6 +251,7 @@ def initialize(address, options = {})
@latency = nil
@primary = nil
@secondary = nil
@tags = nil
@credentials = {}
@instrumenter = options[:instrumenter] || Instrumentable::Log
@address = Address.new(address, timeout)
Expand Down Expand Up @@ -511,7 +512,7 @@ def update(database, collection, selector, change, concern, options = {})
#
# @since 1.0.0
def inspect
"<#{self.class.name} resolved_address=#{address.resolved.inspect}>"
"<#{self.class.name} resolved_address=#{address.resolved.inspect} tags=#{tags.inspect}>"
end

private
Expand Down Expand Up @@ -549,6 +550,7 @@ def configure(settings)
@passive = settings["passive"]
@primary = settings["ismaster"]
@secondary = settings["secondary"]
@tags = settings["tags"] ? settings["tags"].to_hash : {}
discover(settings["hosts"]) if auto_discovering?
end

Expand Down
4 changes: 3 additions & 1 deletion lib/moped/session.rb
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ def logout
# @since 1.5.0
option(:auto_discover).allow(true, false)

option(:tags).allow(Optionable.any(Hash))

# Initialize a new database session.
#
# @example Initialize a new session.
Expand Down Expand Up @@ -319,7 +321,7 @@ def new(options = {})
#
# @since 2.0.0
def read_preference
@read_preference ||= ReadPreference.get(options[:read] || :primary)
@read_preference ||= ReadPreference.get((options[:read] || :primary), options[:tags])
end

# Switch the session's current database.
Expand Down
117 changes: 115 additions & 2 deletions spec/moped/cluster_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,118 @@
end
end

context "with tagsets" do
let(:cluster) do
Moped::Cluster.new(seeds,{tags: {"tag" => "test"}})
end

it "reads tags from all nodes" do
cluster.nodes.map(&:tags).reject{|x| x.empty?}.size.should eq(@secondaries.size)
cluster.nodes.map(&:tags).reject{|x| x.empty?}.uniq.size.should eq(1)
cluster.nodes.select {|n| n.primary? }.map(&:tags).reject{|x| x.empty?}.size.should eq(0)
cluster.nodes.select {|n| n.secondary?}.map(&:tags)[0].should eq({"tag" => "test"})
end

context "when only one secondary is tagged" do
before do
@secondaries[0].tags = nil
@secondaries[0].restart
cluster.refresh
end

it "filters out untagged secondary" do
cluster.send(:available_secondary_nodes).size.should eq(@secondaries.size - 1)
end

describe "#with_primary" do
it "connects and yields the primary node" do
cluster.with_primary do |node|
node.address.original.should eq @primary.address
end
end
end

describe "#with_secondary" do
it "connects and yields a tagged secondary node" do
cluster.with_secondary do |node|
@secondaries.map(&:address).should include node.address.original
node.tags.has_key?("tag").should eq(true)
node.tags["tag"].should eq("test")
end
end
end
end

context "when a node is added to the set" do
context " and the new node is tagged" do
before do
@secondaries.each do |s|
s.tags = {"tag" => "test"}
s.restart
end
node = @replica_set.add_node
node.demote
node.restart
@secondaries.push(node)
cluster.refresh
end

it "includes the newly added tagged node in the available set" do
cluster.send(:available_secondary_nodes).size.should eq(@secondaries.size)
end
end

context" and the new node is untagged" do
before do
@secondaries.each do |s|
s.tags = {"tag" => "test"}
s.restart
end
node = @replica_set.add_node
node.demote
node.tags = nil
node.restart
@secondaries.push(node)
cluster.refresh
end

it "excludes the newly added untagged node from the available set" do
cluster.send(:available_secondary_nodes).size.should eq(@secondaries.size - 1)
end
end

end

context "when there are no tagged nodes" do
before do
@secondaries.each do |s|
s.tags = nil
s.restart
end
cluster.refresh
end

describe "#with_primary" do
it "connects and yields the primary node" do
cluster.with_primary do |node|
node.address.original == @primary.address
end
end
end

describe "#with_secondary" do
it "raises an error" do
expect {
cluster.with_secondary do |node|
node.command "admin", ping: 1
end
}.to raise_error(Moped::Errors::ConnectionFailure)
end
end
end
end


describe "#refresh" do

let(:cluster) do
Expand All @@ -411,8 +523,8 @@
end

it "gets removed from the available nodes and configured nodes" do
cluster.nodes.size.should eq(2)
cluster.seeds.size.should eq(2)
cluster.nodes.size.should eq(@secondaries.size + 1)
cluster.seeds.size.should eq(@secondaries.size + 1)
end
end
end
Expand Down Expand Up @@ -441,6 +553,7 @@
end
end


describe Moped::Cluster, "authentication", mongohq: :auth do

shared_examples_for "authenticable session" do
Expand Down
6 changes: 4 additions & 2 deletions spec/support/replica_set_simulator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -101,19 +101,20 @@ class Node

attr_reader :port
attr_reader :host
attr_accessor :tags

def initialize(set, port)
@set = set
@primary = false
@secondary = false

@tags = {"tag" => "test"}
@host = "127.0.0.1"
@port = port
@hiccup_on_next_message = nil
end

def ==(other)
@host == other.host && @port == other.port
@host == other.host && @port == other.port && @tags == other.tags
end

def address
Expand All @@ -132,6 +133,7 @@ def status
{
"ismaster" => @primary,
"secondary" => @secondary,
"tags" => @primary ? nil : @tags,
"hosts" => @set.nodes.map(&:address),
"me" => address,
"maxBsonObjectSize" => 16777216,
Expand Down