From c531a9d986d13dc9eae437a8a229422a0a1b96f2 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 2 Oct 2024 15:30:39 -0700 Subject: [PATCH] fix: cleanup ttl expire run graphile jobs (#1373) * fix: remove ttl expire run graphile jobs when a run is started or completed * Update expireEnqueuedRun.server.ts --- .../services/createTaskRunAttempt.server.ts | 16 ++-- .../v3/services/enqueueDelayedRun.server.ts | 10 +-- .../v3/services/expireEnqueuedRun.server.ts | 14 ++++ .../app/v3/services/finalizeTaskRun.server.ts | 5 ++ .../app/v3/services/triggerTask.server.ts | 7 +- references/v3-catalog/package.json | 2 +- references/v3-catalog/src/clientUsage.ts | 81 +++---------------- references/v3-catalog/src/trigger/simple.ts | 5 +- 8 files changed, 49 insertions(+), 91 deletions(-) diff --git a/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts b/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts index 0d7660e482..bab14b3356 100644 --- a/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts +++ b/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts @@ -1,15 +1,15 @@ import { parsePacket, TaskRunExecution } from "@trigger.dev/core/v3"; -import { $transaction, PrismaClientOrTransaction, prisma } from "~/db.server"; +import { TaskRun, TaskRunAttempt } from "@trigger.dev/database"; +import { MAX_TASK_RUN_ATTEMPTS } from "~/consts"; +import { $transaction, prisma, PrismaClientOrTransaction } from "~/db.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; +import { reportInvocationUsage } from "~/services/platform.v3.server"; import { generateFriendlyId } from "../friendlyIdentifiers"; -import { BaseService, ServiceValidationError } from "./baseService.server"; -import { TaskRun, TaskRunAttempt } from "@trigger.dev/database"; import { machinePresetFromConfig } from "../machinePresets.server"; -import { workerQueue } from "~/services/worker.server"; -import { MAX_TASK_RUN_ATTEMPTS } from "~/consts"; +import { BaseService, ServiceValidationError } from "./baseService.server"; import { CrashTaskRunService } from "./crashTaskRun.server"; -import { reportInvocationUsage } from "~/services/platform.v3.server"; +import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server"; export class CreateTaskRunAttemptService extends BaseService { public async call( @@ -139,6 +139,10 @@ export class CreateTaskRunAttemptService extends BaseService { status: "EXECUTING", }, }); + + if (taskRun.ttl) { + await ExpireEnqueuedRunService.dequeue(taskRun.id, tx); + } } return taskRunAttempt; diff --git a/apps/webapp/app/v3/services/enqueueDelayedRun.server.ts b/apps/webapp/app/v3/services/enqueueDelayedRun.server.ts index 12beb589f5..f8d8c82f76 100644 --- a/apps/webapp/app/v3/services/enqueueDelayedRun.server.ts +++ b/apps/webapp/app/v3/services/enqueueDelayedRun.server.ts @@ -1,9 +1,9 @@ +import { $transaction } from "~/db.server"; import { logger } from "~/services/logger.server"; import { marqs } from "~/v3/marqs/index.server"; import { BaseService } from "./baseService.server"; +import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server"; import { parseNaturalLanguageDuration } from "./triggerTask.server"; -import { workerQueue } from "~/services/worker.server"; -import { $transaction } from "~/db.server"; export class EnqueueDelayedRunService extends BaseService { public async call(runId: string) { @@ -52,11 +52,7 @@ export class EnqueueDelayedRunService extends BaseService { const expireAt = parseNaturalLanguageDuration(run.ttl); if (expireAt) { - await workerQueue.enqueue( - "v3.expireRun", - { runId: run.id }, - { tx, runAt: expireAt, jobKey: `v3.expireRun.${run.id}` } - ); + await ExpireEnqueuedRunService.enqueue(run.id, expireAt, tx); } } }); diff --git a/apps/webapp/app/v3/services/expireEnqueuedRun.server.ts b/apps/webapp/app/v3/services/expireEnqueuedRun.server.ts index 0f3cbfb4ab..0c4b2be1fb 100644 --- a/apps/webapp/app/v3/services/expireEnqueuedRun.server.ts +++ b/apps/webapp/app/v3/services/expireEnqueuedRun.server.ts @@ -2,8 +2,22 @@ import { logger } from "~/services/logger.server"; import { BaseService } from "./baseService.server"; import { eventRepository } from "../eventRepository.server"; import { FinalizeTaskRunService } from "./finalizeTaskRun.server"; +import { workerQueue } from "~/services/worker.server"; +import { PrismaClientOrTransaction } from "~/db.server"; export class ExpireEnqueuedRunService extends BaseService { + public static async dequeue(runId: string, tx?: PrismaClientOrTransaction) { + return await workerQueue.dequeue(`v3.expireRun:${runId}`, { tx }); + } + + public static async enqueue(runId: string, runAt?: Date, tx?: PrismaClientOrTransaction) { + return await workerQueue.enqueue( + "v3.expireRun", + { runId }, + { runAt, jobKey: `v3.expireRun:${runId}`, tx } + ); + } + public async call(runId: string) { const run = await this._prisma.taskRun.findUnique({ where: { diff --git a/apps/webapp/app/v3/services/finalizeTaskRun.server.ts b/apps/webapp/app/v3/services/finalizeTaskRun.server.ts index ea8cf87848..797e802382 100644 --- a/apps/webapp/app/v3/services/finalizeTaskRun.server.ts +++ b/apps/webapp/app/v3/services/finalizeTaskRun.server.ts @@ -7,6 +7,7 @@ import { FINAL_ATTEMPT_STATUSES, isFailedRunStatus, type FINAL_RUN_STATUSES } fr import { PerformTaskRunAlertsService } from "./alerts/performTaskRunAlerts.server"; import { BaseService } from "./baseService.server"; import { ResumeDependentParentsService } from "./resumeDependentParents.server"; +import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server"; type BaseInput = { id: string; @@ -62,6 +63,10 @@ export class FinalizeTaskRunService extends BaseService { ...(include ? { include } : {}), }); + if (run.ttl) { + await ExpireEnqueuedRunService.dequeue(run.id); + } + if (attemptStatus || error) { await this.finalizeAttempt({ attemptStatus, error, run }); } diff --git a/apps/webapp/app/v3/services/triggerTask.server.ts b/apps/webapp/app/v3/services/triggerTask.server.ts index b7743e4503..2eaf769b27 100644 --- a/apps/webapp/app/v3/services/triggerTask.server.ts +++ b/apps/webapp/app/v3/services/triggerTask.server.ts @@ -21,6 +21,7 @@ import { isFinalAttemptStatus, isFinalRunStatus } from "../taskStatus"; import { createTag, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server"; import { findCurrentWorkerFromEnvironment } from "../models/workerDeployment.server"; import { handleMetadataPacket } from "~/utils/packets"; +import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server"; export type TriggerTaskServiceOptions = { idempotencyKey?: string; @@ -435,11 +436,7 @@ export class TriggerTaskService extends BaseService { const expireAt = parseNaturalLanguageDuration(taskRun.ttl); if (expireAt) { - await workerQueue.enqueue( - "v3.expireRun", - { runId: taskRun.id }, - { tx, runAt: expireAt, jobKey: `v3.expireRun.${taskRun.id}` } - ); + await ExpireEnqueuedRunService.enqueue(taskRun.id, expireAt, tx); } } diff --git a/references/v3-catalog/package.json b/references/v3-catalog/package.json index ff1c63c5d7..8f23f7b6f2 100644 --- a/references/v3-catalog/package.json +++ b/references/v3-catalog/package.json @@ -11,7 +11,7 @@ "management": "ts-node -r dotenv/config -r tsconfig-paths/register ./src/management.ts", "queues": "ts-node -r dotenv/config -r tsconfig-paths/register ./src/queues.ts", "build:client": "tsup-node ./src/clientUsage.ts --format esm,cjs", - "client": "ts-node -r dotenv/config -r tsconfig-paths/register ./src/clientUsage.ts", + "client": "tsx -r dotenv/config ./src/clientUsage.ts", "triggerWithLargePayload": "ts-node -r dotenv/config -r tsconfig-paths/register ./src/triggerWithLargePayload.ts", "generate:prisma": "prisma generate --sql" }, diff --git a/references/v3-catalog/src/clientUsage.ts b/references/v3-catalog/src/clientUsage.ts index a7e018a47d..0535af0e85 100644 --- a/references/v3-catalog/src/clientUsage.ts +++ b/references/v3-catalog/src/clientUsage.ts @@ -1,12 +1,7 @@ -import { tasks, runs, TaskOutput, TaskPayload, TaskIdentifier } from "@trigger.dev/sdk/v3"; -import { createJsonHeroDoc } from "./trigger/simple.js"; - -type createJsonHeroDocPayload = TaskPayload; // retrieves the payload type of the task -type createJsonHeroDocOutput = TaskOutput; // retrieves the output type of the task -type createJsonHeroDocIdentifier = TaskIdentifier; // retrieves the identifier of the task +import { tasks } from "@trigger.dev/sdk/v3"; async function main() { - const anyHandle = await tasks.trigger( + await tasks.trigger( "create-jsonhero-doc", { title: "Hello World", @@ -15,78 +10,22 @@ async function main() { }, }, { - delay: "1m", ttl: "1m", } ); - const anyRun = await runs.retrieve(anyHandle); - - console.log(`Run ${anyHandle.id} status: ${anyRun.status}, ttl: ${anyRun.ttl}`, anyRun.output); - - const typedRun = await runs.retrieve(anyHandle.id); - - console.log(`Run ${anyHandle.id} status: ${typedRun.status}`, typedRun.output); - - await new Promise((resolve) => setTimeout(resolve, 121000)); // wait for 2 minutes - - const expiredRun = await runs.retrieve(anyRun.id); - - console.log( - `Run ${anyHandle.id} status: ${expiredRun.status}, expired at: ${expiredRun.expiredAt}`, - expiredRun.output - ); - - const handle = await tasks.trigger("create-jsonhero-doc", { - title: "Hello World", - content: { - message: "Hello, World!", - }, - }); - - console.log(handle); - - const typedRetrieveRun = await runs.retrieve(handle); - - console.log(`Run ${handle.id} status: ${typedRetrieveRun.status}`, typedRetrieveRun.output); - - const completedRun = await runs.poll(handle, { pollIntervalMs: 100 }); - - console.log(`Run ${handle.id} completed with output:`, completedRun.output); - - const run = await tasks.triggerAndPoll("create-jsonhero-doc", { - title: "Hello World", - content: { - message: "Hello, World!", - }, - }); - - console.log(`Run ${run.id} completed with output: `, run.output); - - const batchHandle = await tasks.batchTrigger("create-jsonhero-doc", [ + await tasks.trigger( + "create-jsonhero-doc", { - payload: { - title: "Hello World", - content: { - message: "Hello, World!", - }, + title: "Hello World", + content: { + message: "Hello, World!", }, }, { - payload: { - title: "Hello World 2", - content: { - message: "Hello, World 2!", - }, - }, - }, - ]); - - const firstRunHandle = batchHandle.runs[0]; - - const run2 = await runs.retrieve(firstRunHandle); - - console.log(`Run ${run2.id} completed with output: `, run2.output); + ttl: "1m", + } + ); } main().catch(console.error); diff --git a/references/v3-catalog/src/trigger/simple.ts b/references/v3-catalog/src/trigger/simple.ts index d6d46092e4..1d5786d727 100644 --- a/references/v3-catalog/src/trigger/simple.ts +++ b/references/v3-catalog/src/trigger/simple.ts @@ -71,9 +71,12 @@ export const taskWithSpecialCharacters = task({ export const createJsonHeroDoc = task({ id: "create-jsonhero-doc", + queue: { + concurrencyLimit: 1, + }, run: async (payload: { title: string; content: any }, { ctx }) => { // Sleep for 5 seconds - await wait.for({ seconds: 5 }); + await wait.for({ seconds: 30 }); const response = await fetch("https://jsonhero.io/api/create.json", { method: "POST",