Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

re-enqueue pending workflows during recovery #739

Open
wants to merge 22 commits into
base: main
Choose a base branch
from

Conversation

maxdml
Copy link
Contributor

@maxdml maxdml commented Jan 29, 2025

When performing recovery, we now re-enqueue workflows that came from a queue. This allows tasks from a queue to respect the concurrency limits.
Re-enqueue = reset the start time and executor assignment in the queue table. This ensures the task is re-inserted in the same position in the queue.

src/dbos-executor.ts Outdated Show resolved Hide resolved
@maxdml maxdml marked this pull request as ready for review January 29, 2025 23:12
src/wfqueue.ts Outdated
Comment on lines 125 to 130
logger.info("Workflow queues:");
for (const [qn, q] of this.wfQueuesByName) {
const conc = q.concurrency !== undefined ? `${q.concurrency}` : 'No concurrency limit set';
const conc = q.concurrency !== undefined ? `global concurrency limit: ${q.concurrency}` : 'No concurrency limit set';
logger.info(` ${qn}: ${conc}`);
const workerconc = q.workerConcurrency !== undefined ? `worker concurrency limit: ${q.workerConcurrency}` : 'No worker concurrency limit set';
logger.info(` ${qn}: ${workerconc}`);
Copy link
Contributor Author

@maxdml maxdml Jan 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@apoliakov btw I noticed these logs are not in order in the dashboard. I assume due to timestamps granularity. (And also concurrency because the workflow queue runner microTask goes to sleep periodically.)

Screenshot 2025-01-29 at 22 08 52

src/system_database.ts Outdated Show resolved Hide resolved
@qianl15 qianl15 requested a review from chuck-dbos January 30, 2025 19:46
src/system_database.ts Outdated Show resolved Hide resolved
@chuck-dbos
Copy link
Collaborator

I am having a real problem understanding what this is supposed to accomplish, and that the test shows that it is accomplished.

OK, so it says it will change the database record contents and it does. (But why though?) This is a reverse state change (PENDING->ENQUEUED, where the prior invariant was that it always went the other way) and the consequences of that are not really tested.

@maxdml
Copy link
Contributor Author

maxdml commented Jan 30, 2025

I am having a real problem understanding what this is supposed to accomplish, and that the test shows that it is accomplished.

OK, so it says it will change the database record contents and it does. (But why though?) This is a reverse state change (PENDING->ENQUEUED, where the prior invariant was that it always went the other way) and the consequences of that are not really tested.

What this is doing is ensuring that, upon explicit runs of the recovery logic, we don't immediately start executing workflows otherwise assigned to a queue. This effectively re-enqueue them at the exact place they were. The reason we are doing that is that the current logic violates concurrency limits for a queue (specifically, with a given worker being able to process queue tasks from two threads: the queue thread itself and the recovery thread.)

Are worried about the FIFO properties of the queue? At this point the task has already been dequeued and workers might have dequeued other tasks (within the limits of concurrency if any). So we have no way to enforce FIFO for recovered workflows, with the current queue semantics.

What this does is re-enqueue with the "highest" priority (i.e., leave the enqueue time unchanged such that the task will be dequeued first at the next iteration.) We could decide to re-enqueue with a new created_at value (place the recovered task at the end of the queue.)

What the test does is verify that the task is indeed cleared from its assignment by the recovery code, dequeued again and executed. We could make the test more complex and ensuring that concurrency is respected, but I trust this is already covered by the existing worker concurrency tests.

Are you suggesting I missed places of the code which rely on PENDING workflows never being ENQUEUED again?

@maxdml maxdml marked this pull request as draft February 4, 2025 02:05
@maxdml maxdml force-pushed the fix-queue-recovery branch from 77b11dc to 1229320 Compare February 5, 2025 18:51
@maxdml maxdml force-pushed the fix-queue-recovery branch from 0dba40b to 0f21f36 Compare February 6, 2025 22:07
@@ -325,7 +327,7 @@ export class PostgresSystemDatabase implements SystemDatabase {
// Every time we init the status, we increment `recovery_attempts` by 1.
// Thus, when this number becomes equal to `maxRetries + 1`, we should mark the workflow as `RETRIES_EXCEEDED`.
const attempts = resRow.recovery_attempts;
if (attempts > initStatus.maxRetries) {
if (attempts > initStatus.maxRetries + 1) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I implemented "max attempts", instead of "max recoveries" in the last TS PR.

@maxdml maxdml marked this pull request as ready for review February 7, 2025 02:27
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants