diff --git a/lib/data_fabric/connection_proxy.rb b/lib/data_fabric/connection_proxy.rb index 6e3978e..afcbac7 100644 --- a/lib/data_fabric/connection_proxy.rb +++ b/lib/data_fabric/connection_proxy.rb @@ -66,42 +66,38 @@ class ConnectionProxy attr_accessor :status_checker def initialize(model_class, options) - @model_class = model_class + @model_class = model_class @replicated = options[:replicated] @shard_group = options[:shard_by] @prefix = options[:prefix] @dynamic_toggle = options[:dynamic_toggle] @environment = (defined?(Rails) && Rails.env) || ENV["RAILS_ENV"] || "test" set_role('slave') if @replicated - + if @dynamic_toggle && @replicated @status_checker = DataFabricDynamicSwitching.status_for connection_name @status_checker.poller = options[:poller] if options[:poller] @status_checker.poller.checker = options[:checker] if options[:checker] end - + @model_class.send :include, ActiveRecordConnectionMethods if @replicated end - delegate :insert, :update, :delete, :create_table, :rename_table, :drop_table, :add_column, :remove_column, + delegate :insert, :update, :delete, :create_table, :rename_table, :drop_table, :add_column, :remove_column, :change_column, :change_column_default, :rename_column, :add_index, :remove_index, :initialize_schema_information, :dump_schema_information, :execute, :execute_ignore_duplicate, :to => :master delegate :insert_many, :to => :master # ar-extensions bulk insert support - + def transaction(start_db_transaction = true, &block) with_master do connection.transaction(start_db_transaction, &block) end end - + def respond_to?(method) super || connection.respond_to?(method) end - - def locks - Thread.current["#{@model_class}_locks"] ||= [] - end def method_missing(method, *args, &block) DataFabric.logger.debug { "Calling #{method} on #{connection}" } @@ -112,69 +108,37 @@ def connection_name connection_name_builder.join('_') end - def with_master - # Allow nesting of with_master. - self.fixed_role = true - locks << true - old_role = current_role - set_role('master') - yield - ensure - locks.pop - set_role(old_role) - self.fixed_role = false if locks.empty? + def with_master(&block) + with_fixed_role('master', &block) end - - def with_current_db - # Allow nesting of with_current_db. - self.fixed_role = true - locks << true - yield - ensure - locks.pop - self.fixed_role = false if locks.empty? + + def with_current_db(&block) + with_fixed_role(current_role, &block) end - - def with_slave - # Allow nesting of with_slave - self.fixed_role = true - locks << true - old_role = current_role - set_role('slave') - yield - ensure - set_role(old_role) - locks.pop - self.fixed_role = false if locks.empty? + + def with_slave(&block) + with_fixed_role('slave', &block) end - + def connected? current_pool.connected? end - - def fixed_role - Thread.current["#{@model_class}_fixed_role"] - end - - def fixed_role=(arg) - Thread.current["#{@model_class}_fixed_role"] = arg - end def connection current_pool.connection end - + def current_pool if @dynamic_toggle && !fixed_role @status_checker.update_status - + if @status_checker.master? - set_role('master') + set_role('master') else set_role('slave') end end - + name = connection_name self.class.shard_pools[name] ||= begin config = ActiveRecord::Base.configurations[name] @@ -185,6 +149,17 @@ def current_pool private + def with_fixed_role(new_role, &block) + old_fixed_state = fixed_role + self.fixed_role = true + old_role = current_role + set_role(new_role) + yield + ensure + set_role(old_role) + self.fixed_role = old_fixed_state + end + def spec_for(config) config = config.symbolize_keys adapter_method = "#{config[:adapter]}_connection" @@ -212,12 +187,12 @@ def connection_name_builder clauses << @prefix if @prefix clauses << @shard_group if @shard_group clauses << StringProxy.new { DataFabric.active_shard(@shard_group) } if @shard_group - clauses << @environment + clauses << @environment clauses << StringProxy.new { current_role } if @replicated clauses end end - + def set_role(role) Thread.current["#{@model_class}_role"] = role end @@ -225,17 +200,25 @@ def set_role(role) def current_role Thread.current["#{@model_class}_role"] || 'slave' end - + + def fixed_role=(arg) + Thread.current["#{@model_class}_fixed_role"] = arg + end + + def fixed_role + Thread.current["#{@model_class}_fixed_role"] || false + end + def master with_master { return connection } end - + def current_db with_current_db { return connection } end - + def slave with_slave { return connection } end end -end \ No newline at end of file +end diff --git a/test/dynamic_switching_test.rb b/test/dynamic_switching_test.rb index d21ac7b..e3ab692 100644 --- a/test/dynamic_switching_test.rb +++ b/test/dynamic_switching_test.rb @@ -16,11 +16,11 @@ class PollerMock def initialize(behind) @behind = behind end - + def behind? @behind end - + def check_server? true end @@ -31,7 +31,7 @@ def initialize(interval = 2) @checked = Time.now @interval = interval end - + def check_server? time = Time.now if time > @checked + @interval @@ -40,7 +40,7 @@ def check_server? end false end - + def behind? true end @@ -50,34 +50,44 @@ class DynamicSwitchingTest < Test::Unit::TestCase def setup ActiveRecord::Base.configurations = @settings = load_database_yml end - - def test_reads_from_slave_when_below_threshold + + def test_reads_from_slave_when_below_threshold ReplicateModel.connection.status_checker.poller = PollerMock.new(false) assert_equal "test_slave", ReplicateModel.find(1).name end - + def test_reads_from_master_when_above_threshold ReplicateModel.connection.status_checker.poller = PollerMock.new(true) assert_equal "test_master", ReplicateModel.find(1).name end - + def test_with_master_always_goes_to_master ReplicateModel.connection.status_checker.poller = PollerMock.new(false) assert_equal "test_master", ReplicateModel.with_master() { ReplicateModel.find(1).name } end - + def test_with_master_can_be_nested ReplicateModel.connection.status_checker.poller = PollerMock.new(false) - AnotherReplicateModel.connection.status_checker.poller = PollerMock.new(false) - - result = ReplicateModel.with_master do - ReplicateModel.find(1).name + ReplicateModel.with_master { ReplicateModel.find(1).name } + AnotherReplicateModel.with_master { AnotherReplicateModel.find(1).name} + AnotherReplicateModel.connection.status_checker.poller = PollerMock.new(false) # This overwrites the poller set above (it's the same connection). + + result = ReplicateModel.with_master do + ReplicateModel.find(1).name + ReplicateModel.with_master { ReplicateModel.find(1).name } + AnotherReplicateModel.with_master { AnotherReplicateModel.find(1).name} end - assert_equal "test_mastertest_mastertest_master", result + assert_equal "test_mastertest_mastertest_master", result + end + + def test_with_slave_can_be_nested + ReplicateModel.connection.status_checker.poller = PollerMock.new(true) + + result = ReplicateModel.first.name + ReplicateModel.with_slave do + ReplicateModel.with_slave { ReplicateModel.first.name } + ReplicateModel.first.name + end + ReplicateModel.first.name + assert_equal("test_mastertest_slavetest_slavetest_master", result, "Slave should not swap when in a fixed role block.") end - + + def test_find_in_batches_doesnt_swap_during_a_find_when_inside_current_db ReplicateModel.connection.status_checker.poller = TimedPollerMock.new(1) ReplicateModel.with_current_db { ReplicateModel.find_in_batches(:batch_size => 1) { |batch| sleep 0.5; assert_equal "test_slave", batch.first.name } } end -end \ No newline at end of file +end