Skip to content

Commit

Permalink
Merge pull request #1 from cerego/kevin_master_changes
Browse files Browse the repository at this point in the history
Kevin master changes
  • Loading branch information
Kevin Griffin committed Aug 23, 2012
2 parents e7fb1d9 + 50d1737 commit 1111d58
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 77 deletions.
105 changes: 44 additions & 61 deletions lib/data_fabric/connection_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,42 +66,38 @@ class ConnectionProxy
attr_accessor :status_checker

def initialize(model_class, options)
@model_class = model_class
@model_class = model_class
@replicated = options[:replicated]
@shard_group = options[:shard_by]
@prefix = options[:prefix]
@dynamic_toggle = options[:dynamic_toggle]
@environment = (defined?(Rails) && Rails.env) || ENV["RAILS_ENV"] || "test"
set_role('slave') if @replicated

if @dynamic_toggle && @replicated
@status_checker = DataFabricDynamicSwitching.status_for connection_name
@status_checker.poller = options[:poller] if options[:poller]
@status_checker.poller.checker = options[:checker] if options[:checker]
end

@model_class.send :include, ActiveRecordConnectionMethods if @replicated
end

delegate :insert, :update, :delete, :create_table, :rename_table, :drop_table, :add_column, :remove_column,
delegate :insert, :update, :delete, :create_table, :rename_table, :drop_table, :add_column, :remove_column,
:change_column, :change_column_default, :rename_column, :add_index, :remove_index, :initialize_schema_information,
:dump_schema_information, :execute, :execute_ignore_duplicate, :to => :master

delegate :insert_many, :to => :master # ar-extensions bulk insert support

def transaction(start_db_transaction = true, &block)
with_master do
connection.transaction(start_db_transaction, &block)
end
end

def respond_to?(method)
super || connection.respond_to?(method)
end

def locks
Thread.current["#{@model_class}_locks"] ||= []
end

def method_missing(method, *args, &block)
DataFabric.logger.debug { "Calling #{method} on #{connection}" }
Expand All @@ -112,69 +108,37 @@ def connection_name
connection_name_builder.join('_')
end

def with_master
# Allow nesting of with_master.
self.fixed_role = true
locks << true
old_role = current_role
set_role('master')
yield
ensure
locks.pop
set_role(old_role)
self.fixed_role = false if locks.empty?
def with_master(&block)
with_fixed_role('master', &block)
end

def with_current_db
# Allow nesting of with_current_db.
self.fixed_role = true
locks << true
yield
ensure
locks.pop
self.fixed_role = false if locks.empty?

def with_current_db(&block)
with_fixed_role(current_role, &block)
end

def with_slave
# Allow nesting of with_slave
self.fixed_role = true
locks << true
old_role = current_role
set_role('slave')
yield
ensure
set_role(old_role)
locks.pop
self.fixed_role = false if locks.empty?

def with_slave(&block)
with_fixed_role('slave', &block)
end

def connected?
current_pool.connected?
end

def fixed_role
Thread.current["#{@model_class}_fixed_role"]
end

def fixed_role=(arg)
Thread.current["#{@model_class}_fixed_role"] = arg
end

def connection
current_pool.connection
end

def current_pool
if @dynamic_toggle && !fixed_role
@status_checker.update_status

if @status_checker.master?
set_role('master')
set_role('master')
else
set_role('slave')
end
end

name = connection_name
self.class.shard_pools[name] ||= begin
config = ActiveRecord::Base.configurations[name]
Expand All @@ -185,6 +149,17 @@ def current_pool

private

def with_fixed_role(new_role, &block)
old_fixed_state = fixed_role
self.fixed_role = true
old_role = current_role
set_role(new_role)
yield
ensure
set_role(old_role)
self.fixed_role = old_fixed_state
end

def spec_for(config)
config = config.symbolize_keys
adapter_method = "#{config[:adapter]}_connection"
Expand Down Expand Up @@ -212,30 +187,38 @@ def connection_name_builder
clauses << @prefix if @prefix
clauses << @shard_group if @shard_group
clauses << StringProxy.new { DataFabric.active_shard(@shard_group) } if @shard_group
clauses << @environment
clauses << @environment
clauses << StringProxy.new { current_role } if @replicated
clauses
end
end

def set_role(role)
Thread.current["#{@model_class}_role"] = role
end

def current_role
Thread.current["#{@model_class}_role"] || 'slave'
end


def fixed_role=(arg)
Thread.current["#{@model_class}_fixed_role"] = arg
end

def fixed_role
Thread.current["#{@model_class}_fixed_role"] || false
end

def master
with_master { return connection }
end

def current_db
with_current_db { return connection }
end

def slave
with_slave { return connection }
end
end
end
end
42 changes: 26 additions & 16 deletions test/dynamic_switching_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ class PollerMock
def initialize(behind)
@behind = behind
end

def behind?
@behind
end

def check_server?
true
end
Expand All @@ -31,7 +31,7 @@ def initialize(interval = 2)
@checked = Time.now
@interval = interval
end

def check_server?
time = Time.now
if time > @checked + @interval
Expand All @@ -40,7 +40,7 @@ def check_server?
end
false
end

def behind?
true
end
Expand All @@ -50,34 +50,44 @@ class DynamicSwitchingTest < Test::Unit::TestCase
def setup
ActiveRecord::Base.configurations = @settings = load_database_yml
end
def test_reads_from_slave_when_below_threshold

def test_reads_from_slave_when_below_threshold
ReplicateModel.connection.status_checker.poller = PollerMock.new(false)
assert_equal "test_slave", ReplicateModel.find(1).name
end

def test_reads_from_master_when_above_threshold
ReplicateModel.connection.status_checker.poller = PollerMock.new(true)
assert_equal "test_master", ReplicateModel.find(1).name
end

def test_with_master_always_goes_to_master
ReplicateModel.connection.status_checker.poller = PollerMock.new(false)
assert_equal "test_master", ReplicateModel.with_master() { ReplicateModel.find(1).name }
end

def test_with_master_can_be_nested
ReplicateModel.connection.status_checker.poller = PollerMock.new(false)
AnotherReplicateModel.connection.status_checker.poller = PollerMock.new(false)
result = ReplicateModel.with_master do
ReplicateModel.find(1).name + ReplicateModel.with_master { ReplicateModel.find(1).name } + AnotherReplicateModel.with_master { AnotherReplicateModel.find(1).name}
AnotherReplicateModel.connection.status_checker.poller = PollerMock.new(false) # This overwrites the poller set above (it's the same connection).

result = ReplicateModel.with_master do
ReplicateModel.find(1).name + ReplicateModel.with_master { ReplicateModel.find(1).name } + AnotherReplicateModel.with_master { AnotherReplicateModel.find(1).name}
end
assert_equal "test_mastertest_mastertest_master", result
assert_equal "test_mastertest_mastertest_master", result
end

def test_with_slave_can_be_nested
ReplicateModel.connection.status_checker.poller = PollerMock.new(true)

result = ReplicateModel.first.name + ReplicateModel.with_slave do
ReplicateModel.with_slave { ReplicateModel.first.name } + ReplicateModel.first.name
end + ReplicateModel.first.name
assert_equal("test_mastertest_slavetest_slavetest_master", result, "Slave should not swap when in a fixed role block.")
end



def test_find_in_batches_doesnt_swap_during_a_find_when_inside_current_db
ReplicateModel.connection.status_checker.poller = TimedPollerMock.new(1)
ReplicateModel.with_current_db { ReplicateModel.find_in_batches(:batch_size => 1) { |batch| sleep 0.5; assert_equal "test_slave", batch.first.name } }
end
end
end

0 comments on commit 1111d58

Please sign in to comment.