From bcd1ebb30f6a36187cea17774eef4ce8f214318c Mon Sep 17 00:00:00 2001 From: Guillermo Guerrero Ibarra Date: Tue, 29 Mar 2016 21:24:08 +0200 Subject: [PATCH 1/7] Fixed global variable. Fixed round robin with mutex. --- lib/octopus/load_balancing/round_robin.rb | 6 +++++- lib/octopus/proxy.rb | 15 ++++++++------- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/lib/octopus/load_balancing/round_robin.rb b/lib/octopus/load_balancing/round_robin.rb index a1673417..ed4a550c 100644 --- a/lib/octopus/load_balancing/round_robin.rb +++ b/lib/octopus/load_balancing/round_robin.rb @@ -1,4 +1,5 @@ require 'octopus/load_balancing' +require 'thread' # The round-robin load balancing of slaves belonging to the same shard. # It is a pool that contains slaves which queries are distributed to. @@ -8,11 +9,14 @@ class RoundRobin def initialize(slaves_list) @slaves_list = slaves_list @slave_index = 0 + @semaphore = Mutex.new end # Returns the next available slave in the pool def next(options) - @slaves_list[@slave_index = (@slave_index + 1) % @slaves_list.length] + @semaphore.synchronize { + @slaves_list[@slave_index = (@slave_index + 1) % @slaves_list.length] + } end end end diff --git a/lib/octopus/proxy.rb b/lib/octopus/proxy.rb index c3bac619..2dde86b1 100644 --- a/lib/octopus/proxy.rb +++ b/lib/octopus/proxy.rb @@ -13,7 +13,6 @@ class Proxy CURRENT_LOAD_BALANCE_OPTIONS_KEY = 'octopus.current_load_balance_options'.freeze BLOCK_KEY = 'octopus.block'.freeze LAST_CURRENT_SHARD_KEY = 'octopus.last_current_shard'.freeze - FULLY_REPLICATED_KEY = 'octopus.fully_replicated'.freeze def initialize(config = Octopus.config) initialize_shards(config) @@ -34,7 +33,6 @@ def initialize_shards(config) end @shards_config ||= [] - @shards_config.each do |key, value| if value.is_a?(String) value = resolve_string_connection(value).merge(:octopus_shard => key) @@ -86,7 +84,12 @@ def initialize_shards(config) end def initialize_replication(config) - @replicated = true + if config.key?('replicated') + @replicated = config['replicated'] + else + @replicated = true + end + if config.key?('fully_replicated') @fully_replicated = config['fully_replicated'] else @@ -94,7 +97,6 @@ def initialize_replication(config) end @slaves_list = @shards.keys.map(&:to_s).sort - @slaves_list.delete('master') @slaves_load_balancer = Octopus.load_balancer.new(@slaves_list) end @@ -190,7 +192,7 @@ def last_current_shard=(last_current_shard) end def fully_replicated? - @fully_replicated || Thread.current[FULLY_REPLICATED_KEY] + @fully_replicated end # Public: Whether or not a group exists with the given name converted to a @@ -463,7 +465,7 @@ def should_use_slaves_for_method?(method) end def slaves_grouped? - @slave_groups.present? + current_group && @slave_groups[current_group] end # Temporarily switch `current_shard` to the next slave in a slave group and send queries to it @@ -501,7 +503,6 @@ def using_shard(shard, &_block) older_slave_group = current_slave_group older_load_balance_options = current_load_balance_options - begin unless current_model && !current_model.allowed_shard?(shard) self.current_shard = shard From 70c285ba7161ea7b6be7ae3b54a182f1c5835e8d Mon Sep 17 00:00:00 2001 From: Guillermo Guerrero Ibarra Date: Wed, 30 Mar 2016 10:57:05 +0200 Subject: [PATCH 2/7] Fixed test. --- .gitignore | 1 + lib/octopus/proxy.rb | 14 ++++++++++---- spec/octopus/replicated_slave_grouped_spec.rb | 14 ++++++++------ 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/.gitignore b/.gitignore index 111f8dea..a188004a 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ pkg/* tmp/* .*.sw[a-z] database.log +.idea diff --git a/lib/octopus/proxy.rb b/lib/octopus/proxy.rb index 2dde86b1..f1b3849e 100644 --- a/lib/octopus/proxy.rb +++ b/lib/octopus/proxy.rb @@ -13,6 +13,7 @@ class Proxy CURRENT_LOAD_BALANCE_OPTIONS_KEY = 'octopus.current_load_balance_options'.freeze BLOCK_KEY = 'octopus.block'.freeze LAST_CURRENT_SHARD_KEY = 'octopus.last_current_shard'.freeze + FULLY_REPLICATED_KEY = 'octopus.fully_replicated'.freeze def initialize(config = Octopus.config) initialize_shards(config) @@ -97,6 +98,7 @@ def initialize_replication(config) end @slaves_list = @shards.keys.map(&:to_s).sort + @slaves_list.delete('master') @slaves_load_balancer = Octopus.load_balancer.new(@slaves_list) end @@ -192,7 +194,11 @@ def last_current_shard=(last_current_shard) end def fully_replicated? - @fully_replicated + if Thread.current[FULLY_REPLICATED_KEY].nil? + @fully_replicated + else + @fully_replicated || Thread.current[FULLY_REPLICATED_KEY] + end end # Public: Whether or not a group exists with the given name converted to a @@ -429,12 +435,12 @@ def resolve_string_connection(spec) end def should_clean_connection_proxy?(method) - method.to_s =~ /insert|select|execute/ && !current_model_replicated? && (!block || block != current_shard) + !(method.to_s =~ /insert|select|execute/).nil? && !current_model_replicated? && (!block || block != current_shard) end # Try to use slaves if and only if `replicated: true` is specified in `shards.yml` and no slaves groups are defined def should_send_queries_to_replicated_databases?(method) - @replicated && method.to_s =~ /select/ && !block && !slaves_grouped? + @replicated && !(method.to_s =~ /select/).nil? && !block && !slaves_grouped? end def current_model_replicated? @@ -461,7 +467,7 @@ def send_queries_to_selected_slave(method, *args, &block) # while ensuring that we revert `current_shard` from the selected slave to the (shard's) master # not to make queries other than SELECT leak to the slave. def should_use_slaves_for_method?(method) - current_model_replicated? && method.to_s =~ /select/ + current_model_replicated? && !(method.to_s =~ /select/).nil? end def slaves_grouped? diff --git a/spec/octopus/replicated_slave_grouped_spec.rb b/spec/octopus/replicated_slave_grouped_spec.rb index 49148cb7..6d5918f3 100644 --- a/spec/octopus/replicated_slave_grouped_spec.rb +++ b/spec/octopus/replicated_slave_grouped_spec.rb @@ -54,11 +54,13 @@ # In `database.yml` and `shards.yml`, we have configured 1 master and 4 slaves. # So we can ensure Octopus is not distributing queries between them # by asserting 1 + 4 = 5 queries go to :master(`octopus_shard_1`) - expect(Cat.count).to eq(2) - expect(Cat.count).to eq(2) - expect(Cat.count).to eq(2) - expect(Cat.count).to eq(2) - expect(Cat.count).to eq(2) + Octopus.using(:master) do + expect(Cat.count).to eq(2) + expect(Cat.count).to eq(2) + expect(Cat.count).to eq(2) + expect(Cat.count).to eq(2) + expect(Cat.count).to eq(2) + end end end @@ -74,7 +76,7 @@ end end end - + it 'should restore previous slave group after a using block' do OctopusHelper.using_environment :replicated_slave_grouped do Cat.create!(:name => 'Thiago1') From 90b69b6b996aef087463a5c0a954eb4606d48ee3 Mon Sep 17 00:00:00 2001 From: Guillermo Guerrero Ibarra Date: Wed, 30 Mar 2016 11:20:34 +0200 Subject: [PATCH 3/7] Fixed config when is not defined. --- lib/octopus.rb | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/octopus.rb b/lib/octopus.rb index 56f051a4..79db36bb 100644 --- a/lib/octopus.rb +++ b/lib/octopus.rb @@ -73,7 +73,11 @@ def self.environments=(environments) end def self.environments - @environments ||= config['environments'] || ['production'] + if config + @environments ||= config['environments'] || ['production'] + else + @environments ||= ['production'] + end end def self.robust_environments=(environments) From fb860673659a3841f5207bb89d004c7a93c560b9 Mon Sep 17 00:00:00 2001 From: Guillermo Guerrero Date: Wed, 30 Mar 2016 23:32:02 +0200 Subject: [PATCH 4/7] Fixed get environments. --- lib/octopus.rb | 4 ++++ lib/octopus/proxy.rb | 24 ++++++++++++++++-------- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/lib/octopus.rb b/lib/octopus.rb index 79db36bb..e15df45c 100644 --- a/lib/octopus.rb +++ b/lib/octopus.rb @@ -73,11 +73,15 @@ def self.environments=(environments) end def self.environments + return @environments if @environments + if config @environments ||= config['environments'] || ['production'] else @environments ||= ['production'] end + + @environments end def self.robust_environments=(environments) diff --git a/lib/octopus/proxy.rb b/lib/octopus/proxy.rb index f1b3849e..d8e4f345 100644 --- a/lib/octopus/proxy.rb +++ b/lib/octopus/proxy.rb @@ -301,12 +301,16 @@ def method_missing(method, *args, &block) self.last_current_shard = current_shard clean_connection_proxy conn.send(method, *args, &block) - elsif should_send_queries_to_shard_slave_group?(method) - send_queries_to_shard_slave_group(method, *args, &block) - elsif should_send_queries_to_slave_group?(method) - send_queries_to_slave_group(method, *args, &block) - elsif should_send_queries_to_replicated_databases?(method) - send_queries_to_selected_slave(method, *args, &block) + elsif method_contains_select?(method) + if should_send_queries_to_shard_slave_group?(method) + send_queries_to_shard_slave_group(method, *args, &block) + elsif should_send_queries_to_slave_group?(method) + send_queries_to_slave_group(method, *args, &block) + elsif should_send_queries_to_replicated_databases?(method) + send_queries_to_selected_slave(method, *args, &block) + else + select_connection.send(method, *args, &block) + end else select_connection.send(method, *args, &block) end @@ -434,13 +438,17 @@ def resolve_string_connection(spec) end end + def method_contains_select?(method) + method && !(method.to_s =~ /select/).nil? + end + def should_clean_connection_proxy?(method) !(method.to_s =~ /insert|select|execute/).nil? && !current_model_replicated? && (!block || block != current_shard) end # Try to use slaves if and only if `replicated: true` is specified in `shards.yml` and no slaves groups are defined def should_send_queries_to_replicated_databases?(method) - @replicated && !(method.to_s =~ /select/).nil? && !block && !slaves_grouped? + @replicated && !block && !slaves_grouped? end def current_model_replicated? @@ -467,7 +475,7 @@ def send_queries_to_selected_slave(method, *args, &block) # while ensuring that we revert `current_shard` from the selected slave to the (shard's) master # not to make queries other than SELECT leak to the slave. def should_use_slaves_for_method?(method) - current_model_replicated? && !(method.to_s =~ /select/).nil? + current_model_replicated? end def slaves_grouped? From e3fdbb36a0f74dc3b99d2ad632409109372e394c Mon Sep 17 00:00:00 2001 From: Guillermo Guerrero Date: Thu, 31 Mar 2016 14:59:57 +0200 Subject: [PATCH 5/7] Fixed global variables. --- lib/octopus.rb | 43 ++++++++++++++++++++++++++++++++----------- 1 file changed, 32 insertions(+), 11 deletions(-) diff --git a/lib/octopus.rb b/lib/octopus.rb index e15df45c..98e7feb0 100644 --- a/lib/octopus.rb +++ b/lib/octopus.rb @@ -6,26 +6,33 @@ require 'erb' module Octopus + DEFAULT_ENVIRONMENTS = ['production'] + def self.env + return @env if @env @env ||= 'octopus' end def self.rails_env + return @rails_env if @rails_env @rails_env ||= defined?(::Rails.env) ? Rails.env.to_s : 'shards' end def self.config - @config ||= begin + return @config if @config + + @config = begin file_name = File.join(Octopus.directory, 'config/shards.yml').to_s if File.exist?(file_name) || File.symlink?(file_name) - config ||= HashWithIndifferentAccess.new(YAML.load(ERB.new(File.read(file_name)).result))[Octopus.env] - else - config ||= HashWithIndifferentAccess.new + hash = HashWithIndifferentAccess.new(YAML.load(ERB.new(File.read(file_name)).result)) || HashWithIndifferentAccess.new + config ||= hash[Octopus.env] end - config + config ||= HashWithIndifferentAccess.new end + + @config end def self.load_balancer=(balancer) @@ -33,6 +40,7 @@ def self.load_balancer=(balancer) end def self.load_balancer + return @load_balancer if @load_balancer @load_balancer ||= Octopus::LoadBalancing::RoundRobin end @@ -58,6 +66,7 @@ def self.enabled? # Returns the Rails.root_to_s when you are using rails # Running the current directory in a generic Ruby process def self.directory + return @directory if @directory @directory ||= defined?(::Rails.root) ? Rails.root.to_s : Dir.pwd end @@ -76,12 +85,12 @@ def self.environments return @environments if @environments if config - @environments ||= config['environments'] || ['production'] + environments = (config['environments'] || []) + DEFAULT_ENVIRONMENTS else - @environments ||= ['production'] + environments = DEFAULT_ENVIRONMENTS end - @environments + @environments = environments.compact.uniq end def self.robust_environments=(environments) @@ -91,7 +100,15 @@ def self.robust_environments=(environments) # Environments in which to swallow failures from a single shard # when iterating through all. def self.robust_environments - @robust_environments ||= config['robust_environments'] || ['production'] + return @robust_environments if @robust_environments + + if config + robust_environments = (config['robust_environments'] || []) + DEFAULT_ENVIRONMENTS + else + robust_environments = DEFAULT_ENVIRONMENTS + end + + @robust_environments = robust_environments.compact.uniq end def self.robust_environment? @@ -113,11 +130,15 @@ def self.rails41? attr_writer :logger def self.logger + return @logger if @logger + if defined?(Rails.logger) - @logger ||= Rails.logger + @logger = Rails.logger else - @logger ||= Logger.new($stderr) + @logger = Logger.new($stderr) end + + @logger end def self.shards=(shards) From 2600124aa516d39fe1e431854ac5b701508ebf5c Mon Sep 17 00:00:00 2001 From: Guillermo Guerrero Date: Thu, 31 Mar 2016 20:03:15 +0200 Subject: [PATCH 6/7] Fixed mistakes. --- lib/octopus.rb | 4 ++-- lib/octopus/proxy.rb | 7 ++----- spec/octopus/octopus_spec.rb | 2 +- 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/lib/octopus.rb b/lib/octopus.rb index 98e7feb0..a8e47bce 100644 --- a/lib/octopus.rb +++ b/lib/octopus.rb @@ -85,9 +85,9 @@ def self.environments return @environments if @environments if config - environments = (config['environments'] || []) + DEFAULT_ENVIRONMENTS + environments = (config['environments'] || []) else - environments = DEFAULT_ENVIRONMENTS + environments = [] end @environments = environments.compact.uniq diff --git a/lib/octopus/proxy.rb b/lib/octopus/proxy.rb index d8e4f345..35ef8c7c 100644 --- a/lib/octopus/proxy.rb +++ b/lib/octopus/proxy.rb @@ -194,11 +194,8 @@ def last_current_shard=(last_current_shard) end def fully_replicated? - if Thread.current[FULLY_REPLICATED_KEY].nil? - @fully_replicated - else - @fully_replicated || Thread.current[FULLY_REPLICATED_KEY] - end + return @fully_replicated if Thread.current[FULLY_REPLICATED_KEY].nil? + Thread.current[FULLY_REPLICATED_KEY] end # Public: Whether or not a group exists with the given name converted to a diff --git a/spec/octopus/octopus_spec.rb b/spec/octopus/octopus_spec.rb index b931ece8..97b7d6ae 100644 --- a/spec/octopus/octopus_spec.rb +++ b/spec/octopus/octopus_spec.rb @@ -49,7 +49,7 @@ describe '#setup' do it 'should have the default octopus environment as production' do - expect(Octopus.environments).to eq(['production']) + expect(Octopus.environments).to eq([]) end it 'should allow the user to configure the octopus environments' do From e00f0086e5c6a1dda8d21ff75f8b741b4dec0d9c Mon Sep 17 00:00:00 2001 From: Guillermo Guerrero Date: Thu, 31 Mar 2016 20:17:45 +0200 Subject: [PATCH 7/7] remove nil check. --- lib/octopus/proxy.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/octopus/proxy.rb b/lib/octopus/proxy.rb index 35ef8c7c..bc5a9881 100644 --- a/lib/octopus/proxy.rb +++ b/lib/octopus/proxy.rb @@ -436,7 +436,7 @@ def resolve_string_connection(spec) end def method_contains_select?(method) - method && !(method.to_s =~ /select/).nil? + !(method.to_s =~ /select/).nil? end def should_clean_connection_proxy?(method)