-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add a new adapter to work the job with multiple database #103
Conversation
d92a84a
to
59aab33
Compare
5c6353d
to
f29a8b8
Compare
e1f6f21
to
31bf0fe
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done a first pass of the implementation, looking sensible. Suggested some improvements
lib/que/adapters/yugabyte.rb
Outdated
|
||
if locked?(result.first['job_id']) | ||
return result | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An idea I had. We can do something like this to make this a bit more efficient, basically we use the row itself as a temporary lock to prevent a lot of race conditions. This way, a lot of the work is done within the main database itself.
Let me know what you think.
loop do
job_connection.transaction do
# This locks the row only while we are checking the advisory lock database.
# This saves duplicate attempts by different works to acquire the lock.
result = Que.execute("SELECT ... FOR UPDATE SKIPPED LOCK")
return result if result.empty?
if locked?(result.first['job_id'])
return result
end
# lock is released after the advisory lock check as the transaction completes
end
end
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we will have cross database transaction in this case. If that's not an issue then this would be good
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we will have cross database transaction in this case
Cross database transactions are mainly a problem when you're trying to maintain consistency of writes between two databases (eg. if one commits but the other rolls back).
I don't think this is a problem here, so should be safe to ignore: technically acquiring the a lock is a write, but the SELECT FOR UPDATE
is temporary and not the source of truth, and the lock is released at the end of the transaction anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tried this, but for some reason, all the specs are getting hanged. It would require more time to debug what's going on. Will address this separate PR
d872b0e
to
6b3bac9
Compare
30ee504
to
adbbcb6
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking great! Just some minor stylistic stuff and I think it's good to go
lib/que/locker.rb
Outdated
@@ -60,7 +60,6 @@ def initialize(queue:, cursor_expiry:, window: nil, budget: nil, secondary_queue | |||
@queue_expires_at = {} | |||
@secondary_queues = secondary_queues | |||
@consolidated_queues = Array.wrap(queue).concat(secondary_queues) | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm seeing a bunch of diffs that are just changing whitespace/formatting. Should we just get rubocop to standardise all of this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nitpick: let's keep this empty line? Keeps a separation from the comment
9fb3896
to
4623fab
Compare
cc8d50b
to
a0c4b58
Compare
e51cd42
to
b277323
Compare
b277323
to
22eca7b
Compare
lib/que/locker.rb
Outdated
@@ -60,7 +60,6 @@ def initialize(queue:, cursor_expiry:, window: nil, budget: nil, secondary_queue | |||
@queue_expires_at = {} | |||
@secondary_queues = secondary_queues | |||
@consolidated_queues = Array.wrap(queue).concat(secondary_queues) | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nitpick: let's keep this empty line? Keeps a separation from the comment
.github/workflows/tests.yml
Outdated
@@ -96,3 +114,5 @@ jobs: | |||
- name: Run specs | |||
run: | | |||
bundle exec rspec | |||
- name: Run Specs With ActiveRecordWithLock Adapter | |||
run: ADAPTER="ActiveRecordWithLock" bundle exec rspec |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we make this a different job? That way we can put the ADAPTER
in its own env, and it's useful to see that the specs passed for one adapter and not for the other, rather than failing the whole job
Let's also squash commits before merging, there's quite a few! |
fb78ba2
to
31aed1a
Compare
Can I ask why you're working on this? Do you have evidence that the advisory locking is significantly slowing down your database? Given advisory locking is in-memory, I'd've thought it'd be pretty inconsequential in terms of load. |
This change here is to support Que functionality on a database that does not support advisory locking. This PR adds a new Que adapter where the jobs will be processed on 1 database and an advisory lock will be taken on the small database instance that supports the advisory locking |
…t does not support advisory locking What? This is an attempt to use que with the database that does not support advisory locking. It will use 2 databases. The primary one will be processing the jobs and uses a 2nd database to acquire the advisory lock on the job.
31aed1a
to
99061e9
Compare
What?
Added a new adapter which uses 2 databases to work the job.
database 1 will have the job enqueued and will fetch the jobs to be worked
database 2 will be used to acquire the advisory lock on the job.
This adapter can be used with a database that does not support advisory locking. In the above scenario, database 1 does not support advisory locking.