diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 1113189..57d40ba 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -8,32 +8,31 @@ jobs: strategy: fail-fast: false matrix: - ruby_version: ["2.6", "2.7", "3.0"] - postgres-version: ["12", "13", "14", "15", "16"] + ruby_version: ["3.0.2"] runs-on: ubuntu-latest services: postgres: - image: postgres:${{ matrix.postgres-version }} + image: postgres:14.2 env: POSTGRES_DB: que-test POSTGRES_USER: ubuntu POSTGRES_PASSWORD: password ports: - - 5432:5432 + - 5437:5432 options: >- --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 10 lock_database: - image: postgres:${{ matrix.postgres-version }} + image: postgres:14.2 env: POSTGRES_DB: lock-test POSTGRES_USER: ubuntu POSTGRES_PASSWORD: password ports: - - 5434:5432 + - 5438:5432 options: >- --health-cmd pg_isready --health-interval 10s @@ -58,4 +57,4 @@ jobs: ruby-version: "${{ matrix.ruby-version }}" - name: Run specs run: | - bundle exec rspec \ No newline at end of file + bundle exec rspec \ No newline at end of file diff --git a/lib/que/locker.rb b/lib/que/locker.rb index d93815b..85a51b1 100644 --- a/lib/que/locker.rb +++ b/lib/que/locker.rb @@ -172,7 +172,9 @@ def lock_job_with_lock_database(queue, cursor) .where(retryable: true) .order(:priority, :run_at, :job_id) .limit(1).to_sql - + + puts "code" + puts query result = Que.execute(query).first return result if result.nil? @@ -188,8 +190,10 @@ def locked?(job_id) def handle_expired_cursors! @consolidated_queues.each do |queue| - queue_cursor_expires_at = @queue_expires_at.fetch(queue, monotonic_now) - reset_cursor_for!(queue) if queue_cursor_expires_at < monotonic_now + queue_cursor_expires_at = @queue_expires_at.fetch(queue, monotonic_now) + + + reset_cursor_for!(queue) if queue_cursor_expires_at <= monotonic_now end end diff --git a/spec/lib/que/locker_spec.rb b/spec/lib/que/locker_spec.rb index 5b4066d..8bbc58f 100644 --- a/spec/lib/que/locker_spec.rb +++ b/spec/lib/que/locker_spec.rb @@ -59,6 +59,9 @@ def expect_to_lock_with(cursor:) end context "with just one job to lock" do + before do + described_class.instance_variable_set(:@queue_cursors, [0]) + end let!(:job_1) { FakeJob.enqueue(1, queue: queue, priority: 1).attrs } let(:cursor_expiry) { 60 } @@ -95,7 +98,9 @@ def expect_to_lock_with(cursor:) context "with non-zero cursor expiry" do let(:cursor_expiry) { 5 } - before { allow(locker).to receive(:monotonic_now) { @epoch } } + before do + allow(locker).to receive(:monotonic_now) { @epoch } + end # This test simulates the repeated locking of jobs. We're trying to prove that # the locker will use the previous jobs ID as a cursor until the expiry has @@ -112,7 +117,8 @@ def expect_to_lock_with(cursor:) expect_to_lock_with(cursor: job_1[:job_id]) expect_to_work(job_2) - @epoch += cursor_expiry # our cursor should now expire + @epoch += (cursor_expiry) # our cursor should now expire + # puts @epoch expect_to_lock_with(cursor: 0) expect_to_work(job_3) end @@ -139,8 +145,10 @@ def expect_to_lock_with(cursor:) .order(:priority, :run_at, :job_id) .limit(1).to_sql end + let(:now) {Time.now} + before do - allow(Time).to receive(:now).and_return(Time.now + 1) + allow(Time).to receive(:now).and_return(now) end describe ".with_locked_job" do @@ -173,7 +181,8 @@ def expect_to_work(job) end def query(cursor = 0) QueJob.select(:job_id, :queue, :priority, :run_at, :job_class, :retryable, :args, :error_count) - .where("queue = ? AND job_id >= ? AND run_at <= ?", queue, cursor, Time.now) + .select("extract(epoch from (now() - run_at)) as latency") + .where("queue = ? AND job_id >= ? AND run_at <= ?", queue, cursor, now) .where(retryable: true) .order(:priority, :run_at, :job_id) .limit(1).to_sql @@ -191,7 +200,7 @@ def expect_to_lock_with(cursor:, locked_job: nil) context "with no jobs to lock" do it "scans entire table and calls block with nil job" do expect(Que).to receive(:execute).with(query).and_call_original - + puts query with_locked_job do |job| expect(job).to be_nil end @@ -199,7 +208,7 @@ def expect_to_lock_with(cursor:, locked_job: nil) end context "with just one job to lock" do - let!(:job_1) { FakeJob.enqueue(1, queue: queue, priority: 1).attrs } + let!(:job_1) { FakeJob.enqueue(1, queue: queue, priority: 1, run_at: now - 1).attrs } let(:cursor_expiry) { 60 } # Pretend time isn't moving, as we don't want to test cursor expiry here @@ -220,9 +229,9 @@ def expect_to_lock_with(cursor:, locked_job: nil) end context "with jobs to lock" do - let!(:job_1) { FakeJob.enqueue(1, queue: queue, priority: 1).attrs } - let!(:job_2) { FakeJob.enqueue(2, queue: queue, priority: 2).attrs } - let!(:job_3) { FakeJob.enqueue(3, queue: queue, priority: 3).attrs } + let!(:job_1) { FakeJob.enqueue(1, queue: queue, priority: 1, run_at: now - 1).attrs } + let!(:job_2) { FakeJob.enqueue(2, queue: queue, priority: 2, run_at: now - 1).attrs } + let!(:job_3) { FakeJob.enqueue(3, queue: queue, priority: 3, run_at: now - 1).attrs } it "locks and then unlocks the most important job" do expect_to_lock_with(cursor: 0, locked_job: job_1)