diff --git a/.gitignore b/.gitignore index 111f8dea..a188004a 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ pkg/* tmp/* .*.sw[a-z] database.log +.idea diff --git a/lib/octopus.rb b/lib/octopus.rb index 56f051a4..a8e47bce 100644 --- a/lib/octopus.rb +++ b/lib/octopus.rb @@ -6,26 +6,33 @@ require 'erb' module Octopus + DEFAULT_ENVIRONMENTS = ['production'] + def self.env + return @env if @env @env ||= 'octopus' end def self.rails_env + return @rails_env if @rails_env @rails_env ||= defined?(::Rails.env) ? Rails.env.to_s : 'shards' end def self.config - @config ||= begin + return @config if @config + + @config = begin file_name = File.join(Octopus.directory, 'config/shards.yml').to_s if File.exist?(file_name) || File.symlink?(file_name) - config ||= HashWithIndifferentAccess.new(YAML.load(ERB.new(File.read(file_name)).result))[Octopus.env] - else - config ||= HashWithIndifferentAccess.new + hash = HashWithIndifferentAccess.new(YAML.load(ERB.new(File.read(file_name)).result)) || HashWithIndifferentAccess.new + config ||= hash[Octopus.env] end - config + config ||= HashWithIndifferentAccess.new end + + @config end def self.load_balancer=(balancer) @@ -33,6 +40,7 @@ def self.load_balancer=(balancer) end def self.load_balancer + return @load_balancer if @load_balancer @load_balancer ||= Octopus::LoadBalancing::RoundRobin end @@ -58,6 +66,7 @@ def self.enabled? # Returns the Rails.root_to_s when you are using rails # Running the current directory in a generic Ruby process def self.directory + return @directory if @directory @directory ||= defined?(::Rails.root) ? Rails.root.to_s : Dir.pwd end @@ -73,7 +82,15 @@ def self.environments=(environments) end def self.environments - @environments ||= config['environments'] || ['production'] + return @environments if @environments + + if config + environments = (config['environments'] || []) + else + environments = [] + end + + @environments = environments.compact.uniq end def self.robust_environments=(environments) @@ -83,7 +100,15 @@ def self.robust_environments=(environments) # Environments in which to swallow failures from a single shard # when iterating through all. def self.robust_environments - @robust_environments ||= config['robust_environments'] || ['production'] + return @robust_environments if @robust_environments + + if config + robust_environments = (config['robust_environments'] || []) + DEFAULT_ENVIRONMENTS + else + robust_environments = DEFAULT_ENVIRONMENTS + end + + @robust_environments = robust_environments.compact.uniq end def self.robust_environment? @@ -105,11 +130,15 @@ def self.rails41? attr_writer :logger def self.logger + return @logger if @logger + if defined?(Rails.logger) - @logger ||= Rails.logger + @logger = Rails.logger else - @logger ||= Logger.new($stderr) + @logger = Logger.new($stderr) end + + @logger end def self.shards=(shards) diff --git a/lib/octopus/load_balancing/round_robin.rb b/lib/octopus/load_balancing/round_robin.rb index a1673417..ed4a550c 100644 --- a/lib/octopus/load_balancing/round_robin.rb +++ b/lib/octopus/load_balancing/round_robin.rb @@ -1,4 +1,5 @@ require 'octopus/load_balancing' +require 'thread' # The round-robin load balancing of slaves belonging to the same shard. # It is a pool that contains slaves which queries are distributed to. @@ -8,11 +9,14 @@ class RoundRobin def initialize(slaves_list) @slaves_list = slaves_list @slave_index = 0 + @semaphore = Mutex.new end # Returns the next available slave in the pool def next(options) - @slaves_list[@slave_index = (@slave_index + 1) % @slaves_list.length] + @semaphore.synchronize { + @slaves_list[@slave_index = (@slave_index + 1) % @slaves_list.length] + } end end end diff --git a/lib/octopus/proxy.rb b/lib/octopus/proxy.rb index c3bac619..bc5a9881 100644 --- a/lib/octopus/proxy.rb +++ b/lib/octopus/proxy.rb @@ -34,7 +34,6 @@ def initialize_shards(config) end @shards_config ||= [] - @shards_config.each do |key, value| if value.is_a?(String) value = resolve_string_connection(value).merge(:octopus_shard => key) @@ -86,7 +85,12 @@ def initialize_shards(config) end def initialize_replication(config) - @replicated = true + if config.key?('replicated') + @replicated = config['replicated'] + else + @replicated = true + end + if config.key?('fully_replicated') @fully_replicated = config['fully_replicated'] else @@ -190,7 +194,8 @@ def last_current_shard=(last_current_shard) end def fully_replicated? - @fully_replicated || Thread.current[FULLY_REPLICATED_KEY] + return @fully_replicated if Thread.current[FULLY_REPLICATED_KEY].nil? + Thread.current[FULLY_REPLICATED_KEY] end # Public: Whether or not a group exists with the given name converted to a @@ -293,12 +298,16 @@ def method_missing(method, *args, &block) self.last_current_shard = current_shard clean_connection_proxy conn.send(method, *args, &block) - elsif should_send_queries_to_shard_slave_group?(method) - send_queries_to_shard_slave_group(method, *args, &block) - elsif should_send_queries_to_slave_group?(method) - send_queries_to_slave_group(method, *args, &block) - elsif should_send_queries_to_replicated_databases?(method) - send_queries_to_selected_slave(method, *args, &block) + elsif method_contains_select?(method) + if should_send_queries_to_shard_slave_group?(method) + send_queries_to_shard_slave_group(method, *args, &block) + elsif should_send_queries_to_slave_group?(method) + send_queries_to_slave_group(method, *args, &block) + elsif should_send_queries_to_replicated_databases?(method) + send_queries_to_selected_slave(method, *args, &block) + else + select_connection.send(method, *args, &block) + end else select_connection.send(method, *args, &block) end @@ -426,13 +435,17 @@ def resolve_string_connection(spec) end end + def method_contains_select?(method) + !(method.to_s =~ /select/).nil? + end + def should_clean_connection_proxy?(method) - method.to_s =~ /insert|select|execute/ && !current_model_replicated? && (!block || block != current_shard) + !(method.to_s =~ /insert|select|execute/).nil? && !current_model_replicated? && (!block || block != current_shard) end # Try to use slaves if and only if `replicated: true` is specified in `shards.yml` and no slaves groups are defined def should_send_queries_to_replicated_databases?(method) - @replicated && method.to_s =~ /select/ && !block && !slaves_grouped? + @replicated && !block && !slaves_grouped? end def current_model_replicated? @@ -459,11 +472,11 @@ def send_queries_to_selected_slave(method, *args, &block) # while ensuring that we revert `current_shard` from the selected slave to the (shard's) master # not to make queries other than SELECT leak to the slave. def should_use_slaves_for_method?(method) - current_model_replicated? && method.to_s =~ /select/ + current_model_replicated? end def slaves_grouped? - @slave_groups.present? + current_group && @slave_groups[current_group] end # Temporarily switch `current_shard` to the next slave in a slave group and send queries to it @@ -501,7 +514,6 @@ def using_shard(shard, &_block) older_slave_group = current_slave_group older_load_balance_options = current_load_balance_options - begin unless current_model && !current_model.allowed_shard?(shard) self.current_shard = shard diff --git a/spec/octopus/octopus_spec.rb b/spec/octopus/octopus_spec.rb index b931ece8..97b7d6ae 100644 --- a/spec/octopus/octopus_spec.rb +++ b/spec/octopus/octopus_spec.rb @@ -49,7 +49,7 @@ describe '#setup' do it 'should have the default octopus environment as production' do - expect(Octopus.environments).to eq(['production']) + expect(Octopus.environments).to eq([]) end it 'should allow the user to configure the octopus environments' do diff --git a/spec/octopus/replicated_slave_grouped_spec.rb b/spec/octopus/replicated_slave_grouped_spec.rb index 49148cb7..6d5918f3 100644 --- a/spec/octopus/replicated_slave_grouped_spec.rb +++ b/spec/octopus/replicated_slave_grouped_spec.rb @@ -54,11 +54,13 @@ # In `database.yml` and `shards.yml`, we have configured 1 master and 4 slaves. # So we can ensure Octopus is not distributing queries between them # by asserting 1 + 4 = 5 queries go to :master(`octopus_shard_1`) - expect(Cat.count).to eq(2) - expect(Cat.count).to eq(2) - expect(Cat.count).to eq(2) - expect(Cat.count).to eq(2) - expect(Cat.count).to eq(2) + Octopus.using(:master) do + expect(Cat.count).to eq(2) + expect(Cat.count).to eq(2) + expect(Cat.count).to eq(2) + expect(Cat.count).to eq(2) + expect(Cat.count).to eq(2) + end end end @@ -74,7 +76,7 @@ end end end - + it 'should restore previous slave group after a using block' do OctopusHelper.using_environment :replicated_slave_grouped do Cat.create!(:name => 'Thiago1')