Skip to content

Commit

Permalink
Do not kill the worker thread on connection timeouts
Browse files Browse the repository at this point in the history
Just like #66, we've seen instances of a que worker using fewer threads
than desired after failing to checking out a connection from AR. In some
cases, this can lead to a worker doing nothing (specifically, this can
happen if the que worker is configured to run with a single thread -- if
that thread dies, the worker will sit idle doing nothing forever).

This commit lets the ActiveRecord adapter handle the case where the
connection could not be estabilished for reasons other than a timeout,
which was already handled. Additionally, the internal
`UnavailableConnection` exception will also now include the original
exception as a nested `.cause` attribute to aid debugging.
  • Loading branch information
ivgiuliani committed Apr 22, 2021
1 parent 2964682 commit 872d49d
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 4 deletions.
9 changes: 7 additions & 2 deletions lib/que/adapters/active_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@
module Que
module Adapters
class ActiveRecord < Base
AR_UNAVAILABLE_CONNECTION_ERRORS = [
::ActiveRecord::ConnectionTimeoutError,
::ActiveRecord::ConnectionNotEstablished,
].freeze

def checkout
checkout_activerecord_adapter { |conn| yield conn.raw_connection }
rescue ::ActiveRecord::ConnectionTimeoutError => e
raise UnavailableConnection
rescue *AR_UNAVAILABLE_CONNECTION_ERRORS => e
raise UnavailableConnection.new(e)
end

def wake_worker_after_commit
Expand Down
15 changes: 13 additions & 2 deletions spec/lib/que/worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,24 @@
end
end

context "when we can't checkout a new connection" do
context "when we time out checking out a new connection" do
it "rescues it and returns an error" do
FakeJob.enqueue(1)

expect(Que).
to receive(:execute).with(:lock_job, ["default", 0]).
and_raise(ActiveRecord::ConnectionTimeoutError)
and_raise(ActiveRecord::ConnectionTimeoutError)
expect(subject).to eq(:postgres_error)
end
end

context "when we can't connect to postgres" do
it "rescues it and returns an error" do
FakeJob.enqueue(1)

expect(Que).
to receive(:execute).with(:lock_job, ["default", 0]).
and_raise(ActiveRecord::ConnectionNotEstablished)
expect(subject).to eq(:postgres_error)
end
end
Expand Down

0 comments on commit 872d49d

Please sign in to comment.