-
Notifications
You must be signed in to change notification settings - Fork 37
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
re-enqueue pending workflows during recovery #739
base: main
Are you sure you want to change the base?
Conversation
src/wfqueue.ts
Outdated
logger.info("Workflow queues:"); | ||
for (const [qn, q] of this.wfQueuesByName) { | ||
const conc = q.concurrency !== undefined ? `${q.concurrency}` : 'No concurrency limit set'; | ||
const conc = q.concurrency !== undefined ? `global concurrency limit: ${q.concurrency}` : 'No concurrency limit set'; | ||
logger.info(` ${qn}: ${conc}`); | ||
const workerconc = q.workerConcurrency !== undefined ? `worker concurrency limit: ${q.workerConcurrency}` : 'No worker concurrency limit set'; | ||
logger.info(` ${qn}: ${workerconc}`); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@apoliakov btw I noticed these logs are not in order in the dashboard. I assume due to timestamps granularity. (And also concurrency because the workflow queue runner microTask goes to sleep periodically.)
I am having a real problem understanding what this is supposed to accomplish, and that the test shows that it is accomplished. OK, so it says it will change the database record contents and it does. (But why though?) This is a reverse state change (PENDING->ENQUEUED, where the prior invariant was that it always went the other way) and the consequences of that are not really tested. |
What this is doing is ensuring that, upon explicit runs of the recovery logic, we don't immediately start executing workflows otherwise assigned to a queue. This effectively re-enqueue them at the exact place they were. The reason we are doing that is that the current logic violates concurrency limits for a queue (specifically, with a given worker being able to process queue tasks from two threads: the queue thread itself and the recovery thread.) Are worried about the FIFO properties of the queue? At this point the task has already been dequeued and workers might have dequeued other tasks (within the limits of concurrency if any). So we have no way to enforce FIFO for recovered workflows, with the current queue semantics. What this does is re-enqueue with the "highest" priority (i.e., leave the enqueue time unchanged such that the task will be dequeued first at the next iteration.) We could decide to re-enqueue with a new created_at value (place the recovered task at the end of the queue.) What the test does is verify that the task is indeed cleared from its assignment by the recovery code, dequeued again and executed. We could make the test more complex and ensuring that concurrency is respected, but I trust this is already covered by the existing worker concurrency tests. Are you suggesting I missed places of the code which rely on PENDING workflows never being ENQUEUED again? |
77b11dc
to
1229320
Compare
0dba40b
to
0f21f36
Compare
@@ -325,7 +327,7 @@ export class PostgresSystemDatabase implements SystemDatabase { | |||
// Every time we init the status, we increment `recovery_attempts` by 1. | |||
// Thus, when this number becomes equal to `maxRetries + 1`, we should mark the workflow as `RETRIES_EXCEEDED`. | |||
const attempts = resRow.recovery_attempts; | |||
if (attempts > initStatus.maxRetries) { | |||
if (attempts > initStatus.maxRetries + 1) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I implemented "max attempts", instead of "max recoveries" in the last TS PR.
When performing recovery, we now re-enqueue workflows that came from a queue. This allows tasks from a queue to respect the concurrency limits.
Re-enqueue = reset the start time and executor assignment in the queue table. This ensures the task is re-inserted in the same position in the queue.