Skip to content

Commit

Permalink
fix: cleanup ttl expire run graphile jobs (#1373)
Browse files Browse the repository at this point in the history
* fix: remove ttl expire run graphile jobs when a run is started or completed

* Update expireEnqueuedRun.server.ts
  • Loading branch information
ericallam authored Oct 2, 2024
1 parent 0bf500f commit c531a9d
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 91 deletions.
16 changes: 10 additions & 6 deletions apps/webapp/app/v3/services/createTaskRunAttempt.server.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import { parsePacket, TaskRunExecution } from "@trigger.dev/core/v3";
import { $transaction, PrismaClientOrTransaction, prisma } from "~/db.server";
import { TaskRun, TaskRunAttempt } from "@trigger.dev/database";
import { MAX_TASK_RUN_ATTEMPTS } from "~/consts";
import { $transaction, prisma, PrismaClientOrTransaction } from "~/db.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { reportInvocationUsage } from "~/services/platform.v3.server";
import { generateFriendlyId } from "../friendlyIdentifiers";
import { BaseService, ServiceValidationError } from "./baseService.server";
import { TaskRun, TaskRunAttempt } from "@trigger.dev/database";
import { machinePresetFromConfig } from "../machinePresets.server";
import { workerQueue } from "~/services/worker.server";
import { MAX_TASK_RUN_ATTEMPTS } from "~/consts";
import { BaseService, ServiceValidationError } from "./baseService.server";
import { CrashTaskRunService } from "./crashTaskRun.server";
import { reportInvocationUsage } from "~/services/platform.v3.server";
import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server";

export class CreateTaskRunAttemptService extends BaseService {
public async call(
Expand Down Expand Up @@ -139,6 +139,10 @@ export class CreateTaskRunAttemptService extends BaseService {
status: "EXECUTING",
},
});

if (taskRun.ttl) {
await ExpireEnqueuedRunService.dequeue(taskRun.id, tx);
}
}

return taskRunAttempt;
Expand Down
10 changes: 3 additions & 7 deletions apps/webapp/app/v3/services/enqueueDelayedRun.server.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { $transaction } from "~/db.server";
import { logger } from "~/services/logger.server";
import { marqs } from "~/v3/marqs/index.server";
import { BaseService } from "./baseService.server";
import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server";
import { parseNaturalLanguageDuration } from "./triggerTask.server";
import { workerQueue } from "~/services/worker.server";
import { $transaction } from "~/db.server";

export class EnqueueDelayedRunService extends BaseService {
public async call(runId: string) {
Expand Down Expand Up @@ -52,11 +52,7 @@ export class EnqueueDelayedRunService extends BaseService {
const expireAt = parseNaturalLanguageDuration(run.ttl);

if (expireAt) {
await workerQueue.enqueue(
"v3.expireRun",
{ runId: run.id },
{ tx, runAt: expireAt, jobKey: `v3.expireRun.${run.id}` }
);
await ExpireEnqueuedRunService.enqueue(run.id, expireAt, tx);
}
}
});
Expand Down
14 changes: 14 additions & 0 deletions apps/webapp/app/v3/services/expireEnqueuedRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,22 @@ import { logger } from "~/services/logger.server";
import { BaseService } from "./baseService.server";
import { eventRepository } from "../eventRepository.server";
import { FinalizeTaskRunService } from "./finalizeTaskRun.server";
import { workerQueue } from "~/services/worker.server";
import { PrismaClientOrTransaction } from "~/db.server";

export class ExpireEnqueuedRunService extends BaseService {
public static async dequeue(runId: string, tx?: PrismaClientOrTransaction) {
return await workerQueue.dequeue(`v3.expireRun:${runId}`, { tx });
}

public static async enqueue(runId: string, runAt?: Date, tx?: PrismaClientOrTransaction) {
return await workerQueue.enqueue(
"v3.expireRun",
{ runId },
{ runAt, jobKey: `v3.expireRun:${runId}`, tx }
);
}

public async call(runId: string) {
const run = await this._prisma.taskRun.findUnique({
where: {
Expand Down
5 changes: 5 additions & 0 deletions apps/webapp/app/v3/services/finalizeTaskRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { FINAL_ATTEMPT_STATUSES, isFailedRunStatus, type FINAL_RUN_STATUSES } fr
import { PerformTaskRunAlertsService } from "./alerts/performTaskRunAlerts.server";
import { BaseService } from "./baseService.server";
import { ResumeDependentParentsService } from "./resumeDependentParents.server";
import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server";

type BaseInput = {
id: string;
Expand Down Expand Up @@ -62,6 +63,10 @@ export class FinalizeTaskRunService extends BaseService {
...(include ? { include } : {}),
});

if (run.ttl) {
await ExpireEnqueuedRunService.dequeue(run.id);
}

if (attemptStatus || error) {
await this.finalizeAttempt({ attemptStatus, error, run });
}
Expand Down
7 changes: 2 additions & 5 deletions apps/webapp/app/v3/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { isFinalAttemptStatus, isFinalRunStatus } from "../taskStatus";
import { createTag, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
import { findCurrentWorkerFromEnvironment } from "../models/workerDeployment.server";
import { handleMetadataPacket } from "~/utils/packets";
import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server";

export type TriggerTaskServiceOptions = {
idempotencyKey?: string;
Expand Down Expand Up @@ -435,11 +436,7 @@ export class TriggerTaskService extends BaseService {
const expireAt = parseNaturalLanguageDuration(taskRun.ttl);

if (expireAt) {
await workerQueue.enqueue(
"v3.expireRun",
{ runId: taskRun.id },
{ tx, runAt: expireAt, jobKey: `v3.expireRun.${taskRun.id}` }
);
await ExpireEnqueuedRunService.enqueue(taskRun.id, expireAt, tx);
}
}

Expand Down
2 changes: 1 addition & 1 deletion references/v3-catalog/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"management": "ts-node -r dotenv/config -r tsconfig-paths/register ./src/management.ts",
"queues": "ts-node -r dotenv/config -r tsconfig-paths/register ./src/queues.ts",
"build:client": "tsup-node ./src/clientUsage.ts --format esm,cjs",
"client": "ts-node -r dotenv/config -r tsconfig-paths/register ./src/clientUsage.ts",
"client": "tsx -r dotenv/config ./src/clientUsage.ts",
"triggerWithLargePayload": "ts-node -r dotenv/config -r tsconfig-paths/register ./src/triggerWithLargePayload.ts",
"generate:prisma": "prisma generate --sql"
},
Expand Down
81 changes: 10 additions & 71 deletions references/v3-catalog/src/clientUsage.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
import { tasks, runs, TaskOutput, TaskPayload, TaskIdentifier } from "@trigger.dev/sdk/v3";
import { createJsonHeroDoc } from "./trigger/simple.js";

type createJsonHeroDocPayload = TaskPayload<typeof createJsonHeroDoc>; // retrieves the payload type of the task
type createJsonHeroDocOutput = TaskOutput<typeof createJsonHeroDoc>; // retrieves the output type of the task
type createJsonHeroDocIdentifier = TaskIdentifier<typeof createJsonHeroDoc>; // retrieves the identifier of the task
import { tasks } from "@trigger.dev/sdk/v3";

async function main() {
const anyHandle = await tasks.trigger(
await tasks.trigger(
"create-jsonhero-doc",
{
title: "Hello World",
Expand All @@ -15,78 +10,22 @@ async function main() {
},
},
{
delay: "1m",
ttl: "1m",
}
);

const anyRun = await runs.retrieve(anyHandle);

console.log(`Run ${anyHandle.id} status: ${anyRun.status}, ttl: ${anyRun.ttl}`, anyRun.output);

const typedRun = await runs.retrieve<typeof createJsonHeroDoc>(anyHandle.id);

console.log(`Run ${anyHandle.id} status: ${typedRun.status}`, typedRun.output);

await new Promise((resolve) => setTimeout(resolve, 121000)); // wait for 2 minutes

const expiredRun = await runs.retrieve(anyRun.id);

console.log(
`Run ${anyHandle.id} status: ${expiredRun.status}, expired at: ${expiredRun.expiredAt}`,
expiredRun.output
);

const handle = await tasks.trigger<typeof createJsonHeroDoc>("create-jsonhero-doc", {
title: "Hello World",
content: {
message: "Hello, World!",
},
});

console.log(handle);

const typedRetrieveRun = await runs.retrieve(handle);

console.log(`Run ${handle.id} status: ${typedRetrieveRun.status}`, typedRetrieveRun.output);

const completedRun = await runs.poll(handle, { pollIntervalMs: 100 });

console.log(`Run ${handle.id} completed with output:`, completedRun.output);

const run = await tasks.triggerAndPoll<typeof createJsonHeroDoc>("create-jsonhero-doc", {
title: "Hello World",
content: {
message: "Hello, World!",
},
});

console.log(`Run ${run.id} completed with output: `, run.output);

const batchHandle = await tasks.batchTrigger<typeof createJsonHeroDoc>("create-jsonhero-doc", [
await tasks.trigger(
"create-jsonhero-doc",
{
payload: {
title: "Hello World",
content: {
message: "Hello, World!",
},
title: "Hello World",
content: {
message: "Hello, World!",
},
},
{
payload: {
title: "Hello World 2",
content: {
message: "Hello, World 2!",
},
},
},
]);

const firstRunHandle = batchHandle.runs[0];

const run2 = await runs.retrieve(firstRunHandle);

console.log(`Run ${run2.id} completed with output: `, run2.output);
ttl: "1m",
}
);
}

main().catch(console.error);
5 changes: 4 additions & 1 deletion references/v3-catalog/src/trigger/simple.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,12 @@ export const taskWithSpecialCharacters = task({

export const createJsonHeroDoc = task({
id: "create-jsonhero-doc",
queue: {
concurrencyLimit: 1,
},
run: async (payload: { title: string; content: any }, { ctx }) => {
// Sleep for 5 seconds
await wait.for({ seconds: 5 });
await wait.for({ seconds: 30 });

const response = await fetch("https://jsonhero.io/api/create.json", {
method: "POST",
Expand Down

0 comments on commit c531a9d

Please sign in to comment.