Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed global variable #364

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ pkg/*
tmp/*
.*.sw[a-z]
database.log
.idea
47 changes: 38 additions & 9 deletions lib/octopus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,41 @@
require 'erb'

module Octopus
DEFAULT_ENVIRONMENTS = ['production']

def self.env
return @env if @env
@env ||= 'octopus'
end

def self.rails_env
return @rails_env if @rails_env
@rails_env ||= defined?(::Rails.env) ? Rails.env.to_s : 'shards'
end

def self.config
@config ||= begin
return @config if @config

@config = begin
file_name = File.join(Octopus.directory, 'config/shards.yml').to_s

if File.exist?(file_name) || File.symlink?(file_name)
config ||= HashWithIndifferentAccess.new(YAML.load(ERB.new(File.read(file_name)).result))[Octopus.env]
else
config ||= HashWithIndifferentAccess.new
hash = HashWithIndifferentAccess.new(YAML.load(ERB.new(File.read(file_name)).result)) || HashWithIndifferentAccess.new
config ||= hash[Octopus.env]
end

config
config ||= HashWithIndifferentAccess.new
end

@config
end

def self.load_balancer=(balancer)
@load_balancer = balancer
end

def self.load_balancer
return @load_balancer if @load_balancer
@load_balancer ||= Octopus::LoadBalancing::RoundRobin
end

Expand All @@ -58,6 +66,7 @@ def self.enabled?
# Returns the Rails.root_to_s when you are using rails
# Running the current directory in a generic Ruby process
def self.directory
return @directory if @directory
@directory ||= defined?(::Rails.root) ? Rails.root.to_s : Dir.pwd
end

Expand All @@ -73,7 +82,15 @@ def self.environments=(environments)
end

def self.environments
@environments ||= config['environments'] || ['production']
return @environments if @environments

if config
environments = (config['environments'] || [])
else
environments = []
end

@environments = environments.compact.uniq
end

def self.robust_environments=(environments)
Expand All @@ -83,7 +100,15 @@ def self.robust_environments=(environments)
# Environments in which to swallow failures from a single shard
# when iterating through all.
def self.robust_environments
@robust_environments ||= config['robust_environments'] || ['production']
return @robust_environments if @robust_environments

if config
robust_environments = (config['robust_environments'] || []) + DEFAULT_ENVIRONMENTS
else
robust_environments = DEFAULT_ENVIRONMENTS
end

@robust_environments = robust_environments.compact.uniq
end

def self.robust_environment?
Expand All @@ -105,11 +130,15 @@ def self.rails41?
attr_writer :logger

def self.logger
return @logger if @logger

if defined?(Rails.logger)
@logger ||= Rails.logger
@logger = Rails.logger
else
@logger ||= Logger.new($stderr)
@logger = Logger.new($stderr)
end

@logger
end

def self.shards=(shards)
Expand Down
6 changes: 5 additions & 1 deletion lib/octopus/load_balancing/round_robin.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'octopus/load_balancing'
require 'thread'

# The round-robin load balancing of slaves belonging to the same shard.
# It is a pool that contains slaves which queries are distributed to.
Expand All @@ -8,11 +9,14 @@ class RoundRobin
def initialize(slaves_list)
@slaves_list = slaves_list
@slave_index = 0
@semaphore = Mutex.new
end

# Returns the next available slave in the pool
def next(options)
@slaves_list[@slave_index = (@slave_index + 1) % @slaves_list.length]
@semaphore.synchronize {
@slaves_list[@slave_index = (@slave_index + 1) % @slaves_list.length]
}
end
end
end
Expand Down
40 changes: 26 additions & 14 deletions lib/octopus/proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ def initialize_shards(config)
end

@shards_config ||= []

@shards_config.each do |key, value|
if value.is_a?(String)
value = resolve_string_connection(value).merge(:octopus_shard => key)
Expand Down Expand Up @@ -86,7 +85,12 @@ def initialize_shards(config)
end

def initialize_replication(config)
@replicated = true
if config.key?('replicated')
@replicated = config['replicated']
else
@replicated = true
end

if config.key?('fully_replicated')
@fully_replicated = config['fully_replicated']
else
Expand Down Expand Up @@ -190,7 +194,8 @@ def last_current_shard=(last_current_shard)
end

def fully_replicated?
@fully_replicated || Thread.current[FULLY_REPLICATED_KEY]
return @fully_replicated if Thread.current[FULLY_REPLICATED_KEY].nil?
Thread.current[FULLY_REPLICATED_KEY]
end

# Public: Whether or not a group exists with the given name converted to a
Expand Down Expand Up @@ -293,12 +298,16 @@ def method_missing(method, *args, &block)
self.last_current_shard = current_shard
clean_connection_proxy
conn.send(method, *args, &block)
elsif should_send_queries_to_shard_slave_group?(method)
send_queries_to_shard_slave_group(method, *args, &block)
elsif should_send_queries_to_slave_group?(method)
send_queries_to_slave_group(method, *args, &block)
elsif should_send_queries_to_replicated_databases?(method)
send_queries_to_selected_slave(method, *args, &block)
elsif method_contains_select?(method)
if should_send_queries_to_shard_slave_group?(method)
send_queries_to_shard_slave_group(method, *args, &block)
elsif should_send_queries_to_slave_group?(method)
send_queries_to_slave_group(method, *args, &block)
elsif should_send_queries_to_replicated_databases?(method)
send_queries_to_selected_slave(method, *args, &block)
else
select_connection.send(method, *args, &block)
end
else
select_connection.send(method, *args, &block)
end
Expand Down Expand Up @@ -426,13 +435,17 @@ def resolve_string_connection(spec)
end
end

def method_contains_select?(method)
!(method.to_s =~ /select/).nil?
end

def should_clean_connection_proxy?(method)
method.to_s =~ /insert|select|execute/ && !current_model_replicated? && (!block || block != current_shard)
!(method.to_s =~ /insert|select|execute/).nil? && !current_model_replicated? && (!block || block != current_shard)
end

# Try to use slaves if and only if `replicated: true` is specified in `shards.yml` and no slaves groups are defined
def should_send_queries_to_replicated_databases?(method)
@replicated && method.to_s =~ /select/ && !block && !slaves_grouped?
@replicated && !block && !slaves_grouped?
end

def current_model_replicated?
Expand All @@ -459,11 +472,11 @@ def send_queries_to_selected_slave(method, *args, &block)
# while ensuring that we revert `current_shard` from the selected slave to the (shard's) master
# not to make queries other than SELECT leak to the slave.
def should_use_slaves_for_method?(method)
current_model_replicated? && method.to_s =~ /select/
current_model_replicated?
end

def slaves_grouped?
@slave_groups.present?
current_group && @slave_groups[current_group]
end

# Temporarily switch `current_shard` to the next slave in a slave group and send queries to it
Expand Down Expand Up @@ -501,7 +514,6 @@ def using_shard(shard, &_block)
older_slave_group = current_slave_group
older_load_balance_options = current_load_balance_options


begin
unless current_model && !current_model.allowed_shard?(shard)
self.current_shard = shard
Expand Down
2 changes: 1 addition & 1 deletion spec/octopus/octopus_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@

describe '#setup' do
it 'should have the default octopus environment as production' do
expect(Octopus.environments).to eq(['production'])
expect(Octopus.environments).to eq([])
end

it 'should allow the user to configure the octopus environments' do
Expand Down
14 changes: 8 additions & 6 deletions spec/octopus/replicated_slave_grouped_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,13 @@
# In `database.yml` and `shards.yml`, we have configured 1 master and 4 slaves.
# So we can ensure Octopus is not distributing queries between them
# by asserting 1 + 4 = 5 queries go to :master(`octopus_shard_1`)
expect(Cat.count).to eq(2)
expect(Cat.count).to eq(2)
expect(Cat.count).to eq(2)
expect(Cat.count).to eq(2)
expect(Cat.count).to eq(2)
Octopus.using(:master) do
expect(Cat.count).to eq(2)
expect(Cat.count).to eq(2)
expect(Cat.count).to eq(2)
expect(Cat.count).to eq(2)
expect(Cat.count).to eq(2)
end
end
end

Expand All @@ -74,7 +76,7 @@
end
end
end

it 'should restore previous slave group after a using block' do
OctopusHelper.using_environment :replicated_slave_grouped do
Cat.create!(:name => 'Thiago1')
Expand Down