Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lock Improvements #129

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions lib/rufus/lock/flock.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
require "fileutils"

module Rufus
module Lock

# Returns true if the scheduler has acquired the [exclusive] lock and
# thus may run.
#
# Most of the time, a scheduler is run alone and this method should
# return true. It is useful in cases where among a group of applications
# only one of them should run the scheduler. For schedulers that should
# not run, the method should return false.
#
# Out of the box, rufus-scheduler proposes the
# :lockfile => 'path/to/lock/file' scheduler start option. It makes
# it easy for schedulers on the same machine to determine which should
# run (to first to write the lockfile and lock it). It uses "man 2 flock"
# so it probably won't work reliably on distributed file systems.
#
# If one needs to use a special/different locking mechanism, providing
# overriding implementation for this #lock and the #unlock complement is
# easy.
class Flock
attr_reader :path

def initialize(path)

@path = path.to_s
end

def lock
return true if locked?

@lockfile = nil

FileUtils.mkdir_p(::File.dirname(@path))

file = File.new(@path, File::RDWR | File::CREAT)
locked = file.flock(File::LOCK_NB | File::LOCK_EX)

return false unless locked

now = Time.now

file.print("pid: #{$$}, ")
file.print("scheduler.object_id: #{self.object_id}, ")
file.print("time: #{now}, ")
file.print("timestamp: #{now.to_f}")
file.flush

@lockfile = file

true
end

def unlock
!!(@lockfile.flock(File::LOCK_UN) if @lockfile)
end

def locked?
!!(@lockfile.flock(File::LOCK_NB | File::LOCK_EX) if @lockfile)
end

end
end
end

10 changes: 10 additions & 0 deletions lib/rufus/lock/null.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
module Rufus
module Lock
# A lock that can always be acquired
class Null
def lock; true; end
def locked?; true; end
def unlock; true; end
end
end
end
104 changes: 45 additions & 59 deletions lib/rufus/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ module Rufus

class Scheduler

require 'rufus/lock/null'
require 'rufus/lock/flock'
require 'rufus/scheduler/util'
require 'rufus/scheduler/jobs'
require 'rufus/scheduler/cronline'
Expand Down Expand Up @@ -93,7 +95,14 @@ def initialize(opts={})

@thread_key = "rufus_scheduler_#{self.object_id}"

lock || return
if lockfile = opts[:lockfile]
@lock = Rufus::Lock::Flock.new(lockfile)
else
@lock = opts[:lock] || Rufus::Lock::Null.new
end

# Preemptively attempt to grab the lock
@lock.lock

start
end
Expand Down Expand Up @@ -136,7 +145,7 @@ def shutdown(opt=nil)
kill_all_work_threads
end

unlock
@lock.unlock
end

alias stop shutdown
Expand Down Expand Up @@ -324,6 +333,39 @@ def job(job_id)
@jobs[job_id]
end

# Returns true if the scheduler has acquired the [exclusive] lock and
# thus may run.
#
# Most of the time, a scheduler is run alone and this method should
# return true. It is useful in cases where among a group of applications
# only one of them should run the scheduler. For schedulers that should
# not run, the method should return false.
#
# Out of the box, rufus-scheduler proposes the
# :lockfile => 'path/to/lock/file' scheduler start option. It makes
# it easy for schedulers on the same machine to determine which should
# run (to first to write the lockfile and lock it). It uses "man 2 flock"
# so it probably won't work reliably on distributed file systems.
#
# If one needs to use a special/different locking mechanism, providing
# overriding implementation for this #lock and the #unlock complement is
# easy.
#
def lock
@lock.lock
end
# Sister method to #lock, is called when the scheduler shuts down.
#
def unlock
@lock.unlock
end

# Callback called when a job is triggered. If the lock cannot be confirmed,
# the job won't run (though it'll still be scheduled to run again if necessary).
def confirm_lock
@lock.lock
end

# Returns true if this job is currently scheduled.
#
# Takes extra care to answer true if the job is a repeat job
Expand Down Expand Up @@ -423,7 +465,7 @@ def on_error(job, err)
stderr.puts(" #{pre} opts:")
stderr.puts(" #{pre} #{@opts.inspect}")
stderr.puts(" #{pre} frequency: #{self.frequency}")
stderr.puts(" #{pre} lockfile: #{@lockfile.inspect}")
stderr.puts(" #{pre} lock: #{@lock.inspect}")
stderr.puts(" #{pre} uptime: #{uptime} (#{uptime_s})")
stderr.puts(" #{pre} down?: #{down?}")
stderr.puts(" #{pre} threads: #{self.threads.size}")
Expand Down Expand Up @@ -468,61 +510,6 @@ def fetch(job_or_job_id)
end
end

# Returns true if the scheduler has acquired the [exclusive] lock and
# thus may run.
#
# Most of the time, a scheduler is run alone and this method should
# return true. It is useful in cases where among a group of applications
# only one of them should run the scheduler. For schedulers that should
# not run, the method should return false.
#
# Out of the box, rufus-scheduler proposes the
# :lockfile => 'path/to/lock/file' scheduler start option. It makes
# it easy for schedulers on the same machine to determine which should
# run (to first to write the lockfile and lock it). It uses "man 2 flock"
# so it probably won't work reliably on distributed file systems.
#
# If one needs to use a special/different locking mechanism, providing
# overriding implementation for this #lock and the #unlock complement is
# easy.
#
def lock

@lockfile = nil

return true unless f = @opts[:lockfile]

raise ArgumentError.new(
":lockfile argument must be a string, not a #{f.class}"
) unless f.is_a?(String)

FileUtils.mkdir_p(File.dirname(f))

f = File.new(f, File::RDWR | File::CREAT)
locked = f.flock(File::LOCK_NB | File::LOCK_EX)

return false unless locked

now = Time.now

f.print("pid: #{$$}, ")
f.print("scheduler.object_id: #{self.object_id}, ")
f.print("time: #{now}, ")
f.print("timestamp: #{now.to_f}")
f.flush

@lockfile = f

true
end

# Sister method to #lock, is called when the scheduler shuts down.
#
def unlock

@lockfile.flock(File::LOCK_UN) if @lockfile
end

def terminate_all_jobs

jobs.each { |j| j.unschedule }
Expand Down Expand Up @@ -606,7 +593,6 @@ def timeout_jobs
end

def do_schedule(job_type, t, callable, opts, return_job_instance, block)

fail NotRunningError.new(
'cannot schedule, scheduler is down or shutting down'
) if @started_at == nil
Expand Down
2 changes: 1 addition & 1 deletion lib/rufus/scheduler/job_array.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def delete_unscheduled

@mutex.synchronize {

@array.delete_if { |j| j.next_time.nil? || j.unscheduled_at }
@array.delete_if { |j| !j.next_time || j.unscheduled_at }
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jmettraux am I right to think this is a bug?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just code, not a bug.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if we have:

job = proc { }
scheduler = Rufus::Scheduler.new
scheduler.schedule_in(0, job)

Once the job is triggered, its @next_time value will be false (since it'll only run once). Shouldn't it get deleted from the jobs array at that point?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, that makes sense.

My example is what happens when confirm_lock returns true, so the job triggers but it doesn't run its block:

callable = proc { }
scheduler = Rufus::Scheduler.new
scheduler.pause # let's do some setup first

# jobs will still be scheduled and triggered, but the trigger method will return early
def scheduler.confirm_lock
  false
end

job = scheduler.schedule_in(0, callable)

job.next_time # => Time object

# let's trigger jobs again
scheduler.resume 

sleep 1

job.next_time # => false

def scheduler.confirm_lock
  true
end

# job is still not run, but stays in scheduler.jobs

So my question: what does it mean when @next_time is false? Doesn't seem like the job is ever run.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@next_time == false, means the job was about to be triggered (pre trigger) but didn't. @next_time == nil is when post trigger.

I'll investigate what you found in gh-130

Thanks a lot!

}
end

Expand Down
43 changes: 43 additions & 0 deletions spec/lock_flock_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@

#
# Specifying rufus-scheduler
#
# Tue Aug 13 05:58:48 JST 2013
#

require 'spec_helper'


describe Rufus::Lock::Flock do

before :each do
@lock_path = '.rufus-scheduler.lock'
@lock = Rufus::Lock::Flock.new(@lock_path)
end

after :each do

FileUtils.rm_f(@lock_path)
FileUtils.rm_f('lock.txt')
end

context ':lock => Rufus::Lock::File.new(path)' do

it 'writes down a .rufus-scheduler.lock file' do
@lock.lock

line = File.read(@lock_path)

expect(line).to match(/pid: #{$$}/)
end

it '"flocks" the lock file' do
@lock.lock

f = File.new(@lock_path, 'a')

expect(f.flock(File::LOCK_NB | File::LOCK_EX)).to eq(false)
end
end
end

14 changes: 11 additions & 3 deletions spec/lockfile_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,21 @@
expect(f.flock(File::LOCK_NB | File::LOCK_EX)).to eq(false)
end

it 'prevents newer schedulers from starting' do
it 'prevents newer schedulers from running jobs' do

s0 = Rufus::Scheduler.new :lockfile => '.rufus-scheduler.lock'
s1 = Rufus::Scheduler.new :lockfile => '.rufus-scheduler.lock'

expect(s0.started_at).not_to eq(nil)
expect(s1.started_at).to eq(nil)
counter = 0
job = proc { counter += 1 }
s0.schedule_in(0, job)
s1.schedule_in(0, job)

expect(s0).to be_up
expect(s1).to be_up

loop until s0.jobs.empty? && s1.jobs.empty?
expect(counter).to be(1)
end

it 'releases the lockfile when shutting down' do
Expand Down