diff --git a/.changeset/tiny-forks-remember.md b/.changeset/tiny-forks-remember.md new file mode 100644 index 0000000000..f194551c12 --- /dev/null +++ b/.changeset/tiny-forks-remember.md @@ -0,0 +1,7 @@ +--- +"@trigger.dev/sdk": patch +"trigger.dev": patch +"@trigger.dev/core": patch +--- + +Adding maxDuration to tasks to allow timing out runs after they exceed a certain number of seconds diff --git a/apps/webapp/app/assets/icons/TimedOutIcon.tsx b/apps/webapp/app/assets/icons/TimedOutIcon.tsx new file mode 100644 index 0000000000..3ad34e698c --- /dev/null +++ b/apps/webapp/app/assets/icons/TimedOutIcon.tsx @@ -0,0 +1,19 @@ +export function TimedOutIcon({ className }: { className?: string }) { + return ( + + + + ); +} diff --git a/apps/webapp/app/components/primitives/Select.tsx b/apps/webapp/app/components/primitives/Select.tsx index 058354e1b8..127b744aa1 100644 --- a/apps/webapp/app/components/primitives/Select.tsx +++ b/apps/webapp/app/components/primitives/Select.tsx @@ -613,7 +613,7 @@ export function SelectPopover({ "z-50 flex flex-col overflow-clip rounded border border-charcoal-700 bg-background-bright shadow-md outline-none animate-in fade-in-40", "min-w-[max(180px,calc(var(--popover-anchor-width)+0.5rem))]", "max-w-[min(480px,var(--popover-available-width))]", - "max-h-[min(480px,var(--popover-available-height))]", + "max-h-[min(520px,var(--popover-available-height))]", "origin-[var(--popover-transform-origin)]", className )} diff --git a/apps/webapp/app/components/runs/v3/RunInspector.tsx b/apps/webapp/app/components/runs/v3/RunInspector.tsx index e057b94265..16e98e5b46 100644 --- a/apps/webapp/app/components/runs/v3/RunInspector.tsx +++ b/apps/webapp/app/components/runs/v3/RunInspector.tsx @@ -324,6 +324,12 @@ export function RunInspector({ )} + + Max duration + + {run.maxDurationInSeconds ? `${run.maxDurationInSeconds}s` : "–"} + + Run invocation cost diff --git a/apps/webapp/app/components/runs/v3/TaskRunStatus.tsx b/apps/webapp/app/components/runs/v3/TaskRunStatus.tsx index 880093e2eb..cd80e17ee4 100644 --- a/apps/webapp/app/components/runs/v3/TaskRunStatus.tsx +++ b/apps/webapp/app/components/runs/v3/TaskRunStatus.tsx @@ -8,12 +8,14 @@ import { NoSymbolIcon, PauseCircleIcon, RectangleStackIcon, + StopIcon, TrashIcon, XCircleIcon, } from "@heroicons/react/20/solid"; import { TaskRunStatus } from "@trigger.dev/database"; import assertNever from "assert-never"; import { SnowflakeIcon } from "lucide-react"; +import { TimedOutIcon } from "~/assets/icons/TimedOutIcon"; import { Spinner } from "~/components/primitives/Spinner"; import { cn } from "~/utils/cn"; @@ -27,6 +29,7 @@ export const allTaskRunStatuses = [ "COMPLETED_SUCCESSFULLY", "CANCELED", "COMPLETED_WITH_ERRORS", + "TIMED_OUT", "CRASHED", "PAUSED", "INTERRUPTED", @@ -44,6 +47,7 @@ export const filterableTaskRunStatuses = [ "COMPLETED_SUCCESSFULLY", "CANCELED", "COMPLETED_WITH_ERRORS", + "TIMED_OUT", "CRASHED", "INTERRUPTED", "SYSTEM_FAILURE", @@ -65,6 +69,7 @@ const taskRunStatusDescriptions: Record = { PAUSED: "Task has been paused by the user", CRASHED: "Task has crashed and won't be retried", EXPIRED: "Task has surpassed its ttl and won't be executed", + TIMED_OUT: "Task has reached its maxDuration and has been stopped", }; export const QUEUED_STATUSES = [ @@ -140,6 +145,8 @@ export function TaskRunStatusIcon({ return ; case "EXPIRED": return ; + case "TIMED_OUT": + return ; default: { assertNever(status); @@ -174,6 +181,8 @@ export function runStatusClassNameColor(status: TaskRunStatus): string { return "text-error"; case "CRASHED": return "text-error"; + case "TIMED_OUT": + return "text-error"; default: { assertNever(status); } @@ -210,6 +219,8 @@ export function runStatusTitle(status: TaskRunStatus): string { return "Crashed"; case "EXPIRED": return "Expired"; + case "TIMED_OUT": + return "Timed out"; default: { assertNever(status); } diff --git a/apps/webapp/app/components/runs/v3/TaskRunsTable.tsx b/apps/webapp/app/components/runs/v3/TaskRunsTable.tsx index 002fa8c233..e31b30f0c5 100644 --- a/apps/webapp/app/components/runs/v3/TaskRunsTable.tsx +++ b/apps/webapp/app/components/runs/v3/TaskRunsTable.tsx @@ -9,11 +9,13 @@ import { BeakerIcon, BookOpenIcon, CheckIcon } from "@heroicons/react/24/solid"; import { useLocation } from "@remix-run/react"; import { formatDuration, formatDurationMilliseconds } from "@trigger.dev/core/v3"; import { useCallback, useRef } from "react"; +import { Badge } from "~/components/primitives/Badge"; import { Button, LinkButton } from "~/components/primitives/Buttons"; import { Checkbox } from "~/components/primitives/Checkbox"; import { Dialog, DialogTrigger } from "~/components/primitives/Dialog"; import { Header3 } from "~/components/primitives/Headers"; import { useSelectedItems } from "~/components/primitives/SelectedItemsProvider"; +import { SimpleTooltip } from "~/components/primitives/Tooltip"; import { useEnvironments } from "~/hooks/useEnvironments"; import { useFeatures } from "~/hooks/useFeatures"; import { useOrganization } from "~/hooks/useOrganizations"; @@ -39,9 +41,12 @@ import { import { CancelRunDialog } from "./CancelRunDialog"; import { LiveTimer } from "./LiveTimer"; import { ReplayRunDialog } from "./ReplayRunDialog"; -import { TaskRunStatusCombo } from "./TaskRunStatus"; import { RunTag } from "./RunTag"; -import { Badge } from "~/components/primitives/Badge"; +import { + descriptionForTaskRunStatus, + filterableTaskRunStatuses, + TaskRunStatusCombo, +} from "./TaskRunStatus"; type RunsTableProps = { total: number; @@ -126,7 +131,27 @@ export function TaskRunsTable({ Env Task Version - Status + + {filterableTaskRunStatuses.map((status) => ( +
+
+ +
+ + {descriptionForTaskRunStatus(status)} + +
+ ))} + + } + > + Status +
Started {run.version ?? "–"} - + } + /> {run.startedAt ? : "–"} diff --git a/apps/webapp/app/database-types.ts b/apps/webapp/app/database-types.ts index 7ea0766ca4..aa1ac1d3c9 100644 --- a/apps/webapp/app/database-types.ts +++ b/apps/webapp/app/database-types.ts @@ -42,6 +42,7 @@ export const TaskRunStatus = { CRASHED: "CRASHED", DELAYED: "DELAYED", EXPIRED: "EXPIRED", + TIMED_OUT: "TIMED_OUT", } as const satisfies Record; export const JobRunStatus = { diff --git a/apps/webapp/app/models/taskRun.server.ts b/apps/webapp/app/models/taskRun.server.ts index 0c78b25ba3..f458dcb573 100644 --- a/apps/webapp/app/models/taskRun.server.ts +++ b/apps/webapp/app/models/taskRun.server.ts @@ -119,6 +119,7 @@ export function batchTaskRunItemStatusForRunStatus( case TaskRunStatus.SYSTEM_FAILURE: case TaskRunStatus.CRASHED: case TaskRunStatus.EXPIRED: + case TaskRunStatus.TIMED_OUT: return BatchTaskRunItemStatus.FAILED; case TaskRunStatus.PENDING: case TaskRunStatus.WAITING_FOR_DEPLOY: diff --git a/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts b/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts index 90977f3442..dd9d024609 100644 --- a/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts @@ -270,6 +270,9 @@ export class ApiRetrieveRunPresenter extends BasePresenter { case "EXPIRED": { return "EXPIRED"; } + case "TIMED_OUT": { + return "TIMED_OUT"; + } default: { assertNever(status); } diff --git a/apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts b/apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts index 9d87918ba2..43400a8bb7 100644 --- a/apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts @@ -310,6 +310,9 @@ export class ApiRunListPresenter extends BasePresenter { case "EXPIRED": { return "EXPIRED"; } + case "TIMED_OUT": { + return "TIMED_OUT"; + } default: { assertNever(status); } diff --git a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts index fd0c2ce4a5..6af0f09de1 100644 --- a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts @@ -9,6 +9,7 @@ import { eventRepository } from "~/v3/eventRepository.server"; import { machinePresetFromName } from "~/v3/machinePresets.server"; import { FINAL_ATTEMPT_STATUSES, isFinalRunStatus } from "~/v3/taskStatus"; import { BasePresenter } from "./basePresenter.server"; +import { getMaxDuration } from "~/v3/utils/maxDuration"; type Result = Awaited>; export type Span = NonNullable["span"]>; @@ -69,6 +70,7 @@ export class SpanPresenter extends BasePresenter { taskIdentifier: true, friendlyId: true, isTest: true, + maxDurationInSeconds: true, tags: { select: { name: true, @@ -229,6 +231,7 @@ export class SpanPresenter extends BasePresenter { baseCostInCents: run.baseCostInCents, maxAttempts: run.maxAttempts ?? undefined, version: run.lockedToVersion?.version, + maxDuration: run.maxDurationInSeconds ?? undefined, }, queue: { name: run.queue, @@ -307,6 +310,7 @@ export class SpanPresenter extends BasePresenter { }, context: JSON.stringify(context, null, 2), metadata, + maxDurationInSeconds: getMaxDuration(run.maxDurationInSeconds), }; } diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.v3.$projectParam.runs.$runParam.spans.$spanParam/route.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.v3.$projectParam.runs.$runParam.spans.$spanParam/route.tsx index cb69864b54..b1d3a7f11e 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.v3.$projectParam.runs.$runParam.spans.$spanParam/route.tsx +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.v3.$projectParam.runs.$runParam.spans.$spanParam/route.tsx @@ -677,7 +677,12 @@ function RunBody({ )}
- + + Max duration + + {run.maxDurationInSeconds ? `${run.maxDurationInSeconds}s` : "–"} + + Run invocation cost diff --git a/apps/webapp/app/v3/marqs/devQueueConsumer.server.ts b/apps/webapp/app/v3/marqs/devQueueConsumer.server.ts index 4e80c9804c..0dc162c638 100644 --- a/apps/webapp/app/v3/marqs/devQueueConsumer.server.ts +++ b/apps/webapp/app/v3/marqs/devQueueConsumer.server.ts @@ -24,6 +24,7 @@ import { tracer, } from "../tracer.server"; import { DevSubscriber, devPubSub } from "./devPubSub.server"; +import { getMaxDuration } from "../utils/maxDuration"; const MessageBody = z.discriminatedUnion("type", [ z.object({ @@ -378,6 +379,10 @@ export class DevQueueConsumer { status: "EXECUTING", lockedToVersionId: backgroundWorker.id, startedAt: existingTaskRun.startedAt ?? new Date(), + maxDurationInSeconds: getMaxDuration( + existingTaskRun.maxDurationInSeconds, + backgroundTask.maxDurationInSeconds + ), }, include: { attempts: { diff --git a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts index c4d5282f30..92762629e2 100644 --- a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts +++ b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts @@ -44,6 +44,7 @@ import { EnvironmentVariable } from "../environmentVariables/repository"; import { machinePresetFromConfig } from "../machinePresets.server"; import { env } from "~/env.server"; import { isFinalAttemptStatus, isFinalRunStatus } from "../taskStatus"; +import { getMaxDuration } from "../utils/maxDuration"; const WithTraceContext = z.object({ traceparent: z.string().optional(), @@ -403,6 +404,10 @@ export class SharedQueueConsumer { startedAt: existingTaskRun.startedAt ?? new Date(), baseCostInCents: env.CENTS_PER_RUN, machinePreset: machinePresetFromConfig(backgroundTask.machineConfig ?? {}).name, + maxDurationInSeconds: getMaxDuration( + existingTaskRun.maxDurationInSeconds, + backgroundTask.maxDurationInSeconds + ), }, include: { runtimeEnvironment: true, @@ -1067,6 +1072,7 @@ class SharedQueueTasks { costInCents: taskRun.costInCents, baseCostInCents: taskRun.baseCostInCents, metadata, + maxDuration: taskRun.maxDurationInSeconds ?? undefined, }, queue: { id: queue.friendlyId, diff --git a/apps/webapp/app/v3/requeueTaskRun.server.ts b/apps/webapp/app/v3/requeueTaskRun.server.ts index 0ca8497e0b..8228829e87 100644 --- a/apps/webapp/app/v3/requeueTaskRun.server.ts +++ b/apps/webapp/app/v3/requeueTaskRun.server.ts @@ -70,6 +70,7 @@ export class RequeueTaskRunService extends BaseService { case "COMPLETED_WITH_ERRORS": case "COMPLETED_SUCCESSFULLY": case "EXPIRED": + case "TIMED_OUT": case "CANCELED": { logger.debug("[RequeueTaskRunService] Task run is completed", { taskRun }); diff --git a/apps/webapp/app/v3/services/completeAttempt.server.ts b/apps/webapp/app/v3/services/completeAttempt.server.ts index 7ec8a4c03a..3df51c5cf9 100644 --- a/apps/webapp/app/v3/services/completeAttempt.server.ts +++ b/apps/webapp/app/v3/services/completeAttempt.server.ts @@ -363,10 +363,16 @@ export class CompleteAttemptService extends BaseService { }, }); + const status = + sanitizedError.type === "INTERNAL_ERROR" && + sanitizedError.code === "MAX_DURATION_EXCEEDED" + ? "TIMED_OUT" + : "COMPLETED_WITH_ERRORS"; + const finalizeService = new FinalizeTaskRunService(); await finalizeService.call({ id: taskRunAttempt.taskRunId, - status: "COMPLETED_WITH_ERRORS", + status, completedAt: new Date(), }); } diff --git a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts index 478e827812..c9d19fd7f2 100644 --- a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts +++ b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts @@ -15,6 +15,7 @@ import { projectPubSub } from "./projectPubSub.server"; import { RegisterNextTaskScheduleInstanceService } from "./registerNextTaskScheduleInstance.server"; import cronstrue from "cronstrue"; import { CheckScheduleService } from "./checkSchedule.server"; +import { clampMaxDuration } from "../utils/maxDuration"; export class CreateBackgroundWorkerService extends BaseService { public async call( @@ -156,6 +157,7 @@ export async function createBackgroundTasks( machineConfig: task.machine, triggerSource: task.triggerSource === "schedule" ? "SCHEDULED" : "STANDARD", fileId: tasksToBackgroundFiles?.get(task.id) ?? null, + maxDurationInSeconds: task.maxDuration ? clampMaxDuration(task.maxDuration) : null, }, }); diff --git a/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts b/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts index bab14b3356..32f400d85a 100644 --- a/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts +++ b/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts @@ -196,6 +196,7 @@ export class CreateTaskRunAttemptService extends BaseService { maxAttempts: taskRun.maxAttempts ?? undefined, version: taskRun.lockedBy.worker.version, metadata, + maxDuration: taskRun.maxDurationInSeconds ?? undefined, }, queue: { id: queue.friendlyId, diff --git a/apps/webapp/app/v3/services/triggerTask.server.ts b/apps/webapp/app/v3/services/triggerTask.server.ts index 477c502388..63b07d285a 100644 --- a/apps/webapp/app/v3/services/triggerTask.server.ts +++ b/apps/webapp/app/v3/services/triggerTask.server.ts @@ -23,6 +23,7 @@ import { findCurrentWorkerFromEnvironment } from "../models/workerDeployment.ser import { handleMetadataPacket } from "~/utils/packets"; import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server"; import { guardQueueSizeLimitsForEnv } from "../queueSizeLimits.server"; +import { clampMaxDuration } from "../utils/maxDuration"; export type TriggerTaskServiceOptions = { idempotencyKey?: string; @@ -373,6 +374,9 @@ export class TriggerTaskService extends BaseService { metadataType: metadataPacket?.dataType, seedMetadata: metadataPacket?.data, seedMetadataType: metadataPacket?.dataType, + maxDurationInSeconds: body.options?.maxDuration + ? clampMaxDuration(body.options.maxDuration) + : undefined, }, }); diff --git a/apps/webapp/app/v3/taskStatus.ts b/apps/webapp/app/v3/taskStatus.ts index a5147fa953..498df8a267 100644 --- a/apps/webapp/app/v3/taskStatus.ts +++ b/apps/webapp/app/v3/taskStatus.ts @@ -41,6 +41,7 @@ export const FINAL_RUN_STATUSES = [ "SYSTEM_FAILURE", "EXPIRED", "CRASHED", + "TIMED_OUT", ] satisfies TaskRunStatus[]; export type FINAL_RUN_STATUSES = (typeof FINAL_RUN_STATUSES)[number]; @@ -96,6 +97,7 @@ export const FAILED_RUN_STATUSES = [ "COMPLETED_WITH_ERRORS", "SYSTEM_FAILURE", "CRASHED", + "TIMED_OUT", ] satisfies TaskRunStatus[]; export function isFailedRunStatus(status: TaskRunStatus): boolean { diff --git a/apps/webapp/app/v3/utils/maxDuration.ts b/apps/webapp/app/v3/utils/maxDuration.ts new file mode 100644 index 0000000000..b19d2786fd --- /dev/null +++ b/apps/webapp/app/v3/utils/maxDuration.ts @@ -0,0 +1,22 @@ +const MINIMUM_MAX_DURATION = 5; +const MAXIMUM_MAX_DURATION = 2_147_483_647; // largest 32-bit signed integer + +export function clampMaxDuration(maxDuration: number): number { + return Math.min(Math.max(maxDuration, MINIMUM_MAX_DURATION), MAXIMUM_MAX_DURATION); +} + +export function getMaxDuration( + maxDuration?: number | null, + defaultMaxDuration?: number | null +): number | undefined { + if (!maxDuration) { + return defaultMaxDuration ?? undefined; + } + + // Setting the maxDuration to MAXIMUM_MAX_DURATION means we don't want to use the default maxDuration + if (maxDuration === MAXIMUM_MAX_DURATION) { + return; + } + + return maxDuration; +} diff --git a/packages/cli-v3/src/commands/init.ts b/packages/cli-v3/src/commands/init.ts index 90a99ffcee..c5c7ba5a5d 100644 --- a/packages/cli-v3/src/commands/init.ts +++ b/packages/cli-v3/src/commands/init.ts @@ -22,7 +22,6 @@ import { wrapCommandAction, } from "../cli/common.js"; import { loadConfig } from "../config.js"; -import { CLOUD_API_URL } from "../consts.js"; import { cliLink } from "../utilities/cliOutput.js"; import { createFileFromTemplate, diff --git a/packages/cli-v3/src/dev/workerRuntime.ts b/packages/cli-v3/src/dev/workerRuntime.ts index 88636363a8..89b311b43b 100644 --- a/packages/cli-v3/src/dev/workerRuntime.ts +++ b/packages/cli-v3/src/dev/workerRuntime.ts @@ -310,6 +310,8 @@ class DevWorkerRuntime implements WorkerRuntime { const execution = attemptResponse.data; + logger.debug("Executing task run lazy attempt", { execution }); + const completion = await this.backgroundWorkerCoordinator.executeTaskRun( id, { execution, traceContext: payload.traceContext, environment: payload.environment }, diff --git a/packages/cli-v3/src/entryPoints/deploy-index-worker.ts b/packages/cli-v3/src/entryPoints/deploy-index-worker.ts index 5bc2dadebe..dfaacb5d73 100644 --- a/packages/cli-v3/src/entryPoints/deploy-index-worker.ts +++ b/packages/cli-v3/src/entryPoints/deploy-index-worker.ts @@ -115,6 +115,20 @@ if (typeof config.machine === "string") { }); } +// If the config has a maxDuration, we need to apply it to all tasks that don't have a maxDuration +if (typeof config.maxDuration === "number") { + tasks = tasks.map((task) => { + if (typeof task.maxDuration !== "number") { + return { + ...task, + maxDuration: config.maxDuration, + }; + } + + return task; + }); +} + await sendMessageInCatalog( indexerToWorkerMessages, "INDEX_COMPLETE", diff --git a/packages/cli-v3/src/entryPoints/deploy-run-worker.ts b/packages/cli-v3/src/entryPoints/deploy-run-worker.ts index 9a03922ee9..a30e1c3725 100644 --- a/packages/cli-v3/src/entryPoints/deploy-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/deploy-run-worker.ts @@ -14,6 +14,7 @@ import { TriggerTracer, WorkerManifest, ExecutorToWorkerMessageCatalog, + timeout, } from "@trigger.dev/core/v3"; import { ProdRuntimeManager } from "@trigger.dev/core/v3/prod"; import { @@ -29,6 +30,7 @@ import { TracingDiagnosticLogLevel, TracingSDK, usage, + UsageTimeoutManager, } from "@trigger.dev/core/v3/workers"; import { ZodIpcConnection } from "@trigger.dev/core/v3/zodIpc"; import { readFile } from "node:fs/promises"; @@ -82,13 +84,15 @@ const usageEventUrl = getEnvVar("USAGE_EVENT_URL"); const triggerJWT = getEnvVar("TRIGGER_JWT"); const heartbeatIntervalMs = getEnvVar("HEARTBEAT_INTERVAL_MS"); -const prodUsageManager = new ProdUsageManager(new DevUsageManager(), { +const devUsageManager = new DevUsageManager(); +const prodUsageManager = new ProdUsageManager(devUsageManager, { heartbeatIntervalMs: usageIntervalMs ? parseInt(usageIntervalMs, 10) : undefined, url: usageEventUrl, jwt: triggerJWT, }); usage.setGlobalUsageManager(prodUsageManager); +timeout.setGlobalManager(new UsageTimeoutManager(devUsageManager)); taskCatalog.setGlobalTaskCatalog(new StandardTaskCatalog()); const durableClock = new DurableClock(); @@ -301,19 +305,60 @@ const zodIpc = new ZodIpcConnection({ const measurement = usage.start(); - const { result } = await executor.execute(execution, metadata, traceContext, measurement); + // This lives outside of the executor because this will eventually be moved to the controller level + const signal = execution.run.maxDuration + ? timeout.abortAfterTimeout(execution.run.maxDuration) + : undefined; + + signal?.addEventListener("abort", async (e) => { + if (_isRunning) { + _isRunning = false; + _execution = undefined; + + const usageSample = usage.stop(measurement); + + await sender.send("TASK_RUN_COMPLETED", { + execution, + result: { + ok: false, + id: execution.run.id, + error: { + type: "INTERNAL_ERROR", + code: TaskRunErrorCodes.MAX_DURATION_EXCEEDED, + message: + signal.reason instanceof Error + ? signal.reason.message + : String(signal.reason), + }, + usage: { + durationMs: usageSample.cpuTime, + }, + }, + }); + } + }); + + const { result } = await executor.execute( + execution, + metadata, + traceContext, + measurement, + signal + ); const usageSample = usage.stop(measurement); - return sender.send("TASK_RUN_COMPLETED", { - execution, - result: { - ...result, - usage: { - durationMs: usageSample.cpuTime, + if (_isRunning) { + return sender.send("TASK_RUN_COMPLETED", { + execution, + result: { + ...result, + usage: { + durationMs: usageSample.cpuTime, + }, }, - }, - }); + }); + } } finally { _execution = undefined; _isRunning = false; diff --git a/packages/cli-v3/src/entryPoints/dev-index-worker.ts b/packages/cli-v3/src/entryPoints/dev-index-worker.ts index 8b3882e6c2..0420665907 100644 --- a/packages/cli-v3/src/entryPoints/dev-index-worker.ts +++ b/packages/cli-v3/src/entryPoints/dev-index-worker.ts @@ -95,9 +95,23 @@ async function bootstrap() { }; } -const { buildManifest, importErrors } = await bootstrap(); +const { buildManifest, importErrors, config } = await bootstrap(); -const tasks = taskCatalog.listTaskManifests(); +let tasks = taskCatalog.listTaskManifests(); + +// If the config has a maxDuration, we need to apply it to all tasks that don't have a maxDuration +if (typeof config.maxDuration === "number") { + tasks = tasks.map((task) => { + if (typeof task.maxDuration !== "number") { + return { + ...task, + maxDuration: config.maxDuration, + }; + } + + return task; + }); +} await sendMessageInCatalog( indexerToWorkerMessages, diff --git a/packages/cli-v3/src/entryPoints/dev-run-worker.ts b/packages/cli-v3/src/entryPoints/dev-run-worker.ts index 49123aab13..05f7ffe493 100644 --- a/packages/cli-v3/src/entryPoints/dev-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/dev-run-worker.ts @@ -14,11 +14,13 @@ import { TriggerTracer, WorkerManifest, ExecutorToWorkerMessageCatalog, + timeout, } from "@trigger.dev/core/v3"; import { DevRuntimeManager } from "@trigger.dev/core/v3/dev"; import { ConsoleInterceptor, DevUsageManager, + UsageTimeoutManager, DurableClock, getEnvVar, logLevels, @@ -72,9 +74,11 @@ process.on("uncaughtException", function (error, origin) { taskCatalog.setGlobalTaskCatalog(new StandardTaskCatalog()); const durableClock = new DurableClock(); clock.setGlobalClock(durableClock); -usage.setGlobalUsageManager(new DevUsageManager()); +const devUsageManager = new DevUsageManager(); +usage.setGlobalUsageManager(devUsageManager); const devRuntimeManager = new DevRuntimeManager(); runtime.setGlobalRuntimeManager(devRuntimeManager); +timeout.setGlobalManager(new UsageTimeoutManager(devUsageManager)); const triggerLogLevel = getEnvVar("TRIGGER_LOG_LEVEL"); @@ -271,19 +275,58 @@ const zodIpc = new ZodIpcConnection({ const measurement = usage.start(); - const { result } = await executor.execute(execution, metadata, traceContext, measurement); + // This lives outside of the executor because this will eventually be moved to the controller level + const signal = execution.run.maxDuration + ? timeout.abortAfterTimeout(execution.run.maxDuration) + : undefined; + + signal?.addEventListener("abort", async (e) => { + if (_isRunning) { + _isRunning = false; + _execution = undefined; + + const usageSample = usage.stop(measurement); + + await sender.send("TASK_RUN_COMPLETED", { + execution, + result: { + ok: false, + id: execution.run.id, + error: { + type: "INTERNAL_ERROR", + code: TaskRunErrorCodes.MAX_DURATION_EXCEEDED, + message: + signal.reason instanceof Error ? signal.reason.message : String(signal.reason), + }, + usage: { + durationMs: usageSample.cpuTime, + }, + }, + }); + } + }); + + const { result } = await executor.execute( + execution, + metadata, + traceContext, + measurement, + signal + ); const usageSample = usage.stop(measurement); - return sender.send("TASK_RUN_COMPLETED", { - execution, - result: { - ...result, - usage: { - durationMs: usageSample.cpuTime, + if (_isRunning) { + return sender.send("TASK_RUN_COMPLETED", { + execution, + result: { + ...result, + usage: { + durationMs: usageSample.cpuTime, + }, }, - }, - }); + }); + } } finally { _execution = undefined; _isRunning = false; diff --git a/packages/core/src/v3/config.ts b/packages/core/src/v3/config.ts index 0ea16759b2..42ca53c616 100644 --- a/packages/core/src/v3/config.ts +++ b/packages/core/src/v3/config.ts @@ -37,6 +37,15 @@ export type TriggerConfig = { */ logLevel?: LogLevel; + /** + * The maximum duration in compute-time seconds that a task run is allowed to run. If the task run exceeds this duration, it will be stopped. + * + * Minimum value is 5 seconds + * + * Setting this value will effect all tasks in the project. + */ + maxDuration?: number; + /** * Enable console logging while running the dev CLI. This will print out logs from console.log, console.warn, and console.error. By default all logs are sent to the trigger.dev backend, and not logged to the console. */ diff --git a/packages/core/src/v3/errors.ts b/packages/core/src/v3/errors.ts index a29157af7c..5feea6d6e7 100644 --- a/packages/core/src/v3/errors.ts +++ b/packages/core/src/v3/errors.ts @@ -58,7 +58,11 @@ export function createErrorTaskError(error: TaskRunError): any { return JSON.parse(error.raw); } case "INTERNAL_ERROR": { - return new Error(`trigger.dev internal error (${error.code})`); + const e = new Error(error.message ?? `Internal error (${error.code})`); + e.name = error.code; + e.stack = error.stackTrace; + + return e; } } } diff --git a/packages/core/src/v3/index.ts b/packages/core/src/v3/index.ts index 61d7f3bf34..3302028d8c 100644 --- a/packages/core/src/v3/index.ts +++ b/packages/core/src/v3/index.ts @@ -12,6 +12,7 @@ export * from "./task-context-api.js"; export * from "./apiClientManager-api.js"; export * from "./usage-api.js"; export * from "./run-metadata-api.js"; +export * from "./timeout-api.js"; export * from "./schemas/index.js"; export { SemanticInternalAttributes } from "./semanticInternalAttributes.js"; export * from "./task-catalog-api.js"; diff --git a/packages/core/src/v3/schemas/api.ts b/packages/core/src/v3/schemas/api.ts index 37fa187551..1958aacf83 100644 --- a/packages/core/src/v3/schemas/api.ts +++ b/packages/core/src/v3/schemas/api.ts @@ -84,6 +84,7 @@ export const TriggerTaskRequestBody = z.object({ maxAttempts: z.number().int().optional(), metadata: z.any(), metadataType: z.string().optional(), + maxDuration: z.number().optional(), }) .optional(), }); @@ -443,6 +444,8 @@ export const RunStatus = z.enum([ "DELAYED", /// Task has expired and won't be executed "EXPIRED", + /// Task has reached it's maxDuration and has been stopped + "TIMED_OUT", ]); export type RunStatus = z.infer; diff --git a/packages/core/src/v3/schemas/common.ts b/packages/core/src/v3/schemas/common.ts index e99b7dbc9e..cd4d3584ea 100644 --- a/packages/core/src/v3/schemas/common.ts +++ b/packages/core/src/v3/schemas/common.ts @@ -92,6 +92,7 @@ export const TaskRunErrorCodes = { HANDLE_ERROR_ERROR: "HANDLE_ERROR_ERROR", GRACEFUL_EXIT_TIMEOUT: "GRACEFUL_EXIT_TIMEOUT", TASK_RUN_CRASHED: "TASK_RUN_CRASHED", + MAX_DURATION_EXCEEDED: "MAX_DURATION_EXCEEDED", } as const; export const TaskRunInternalError = z.object({ @@ -112,6 +113,7 @@ export const TaskRunInternalError = z.object({ "GRACEFUL_EXIT_TIMEOUT", "TASK_RUN_HEARTBEAT_TIMEOUT", "TASK_RUN_CRASHED", + "MAX_DURATION_EXCEEDED", ]), message: z.string().optional(), stackTrace: z.string().optional(), @@ -144,6 +146,7 @@ export const TaskRun = z.object({ baseCostInCents: z.number().default(0), version: z.string().optional(), metadata: z.record(DeserializedJsonSchema).optional(), + maxDuration: z.number().optional(), }); export type TaskRun = z.infer; diff --git a/packages/core/src/v3/schemas/resources.ts b/packages/core/src/v3/schemas/resources.ts index fbdb9f27a8..01fda90ca0 100644 --- a/packages/core/src/v3/schemas/resources.ts +++ b/packages/core/src/v3/schemas/resources.ts @@ -11,6 +11,7 @@ export const TaskResource = z.object({ machine: MachineConfig.optional(), triggerSource: z.string().optional(), schedule: ScheduleMetadata.optional(), + maxDuration: z.number().optional(), }); export type TaskResource = z.infer; diff --git a/packages/core/src/v3/schemas/schemas.ts b/packages/core/src/v3/schemas/schemas.ts index 4ae7588d68..1b804ec3c4 100644 --- a/packages/core/src/v3/schemas/schemas.ts +++ b/packages/core/src/v3/schemas/schemas.ts @@ -154,6 +154,7 @@ const taskMetadata = { machine: MachineConfig.optional(), triggerSource: z.string().optional(), schedule: ScheduleMetadata.optional(), + maxDuration: z.number().optional(), }; export const TaskMetadata = z.object(taskMetadata); diff --git a/packages/core/src/v3/timeout-api.ts b/packages/core/src/v3/timeout-api.ts new file mode 100644 index 0000000000..1b30af4a55 --- /dev/null +++ b/packages/core/src/v3/timeout-api.ts @@ -0,0 +1,5 @@ +// Split module-level variable definition into separate files to allow +// tree-shaking on each api instance. +import { TimeoutAPI } from "./timeout/api.js"; +/** Entrypoint for timeout API */ +export const timeout = TimeoutAPI.getInstance(); diff --git a/packages/core/src/v3/timeout/api.ts b/packages/core/src/v3/timeout/api.ts new file mode 100644 index 0000000000..d8370118e2 --- /dev/null +++ b/packages/core/src/v3/timeout/api.ts @@ -0,0 +1,46 @@ +import { getGlobal, registerGlobal, unregisterGlobal } from "../utils/globals.js"; +import { TimeoutManager } from "./types.js"; + +const API_NAME = "timeout"; + +class NoopTimeoutManager implements TimeoutManager { + abortAfterTimeout(timeoutInSeconds: number): AbortSignal { + return new AbortController().signal; + } +} + +const NOOP_TIMEOUT_MANAGER = new NoopTimeoutManager(); + +export class TimeoutAPI implements TimeoutManager { + private static _instance?: TimeoutAPI; + + private constructor() {} + + public static getInstance(): TimeoutAPI { + if (!this._instance) { + this._instance = new TimeoutAPI(); + } + + return this._instance; + } + + public get signal(): AbortSignal | undefined { + return this.#getManagerManager().signal; + } + + public abortAfterTimeout(timeoutInSeconds: number): AbortSignal { + return this.#getManagerManager().abortAfterTimeout(timeoutInSeconds); + } + + public setGlobalManager(manager: TimeoutManager): boolean { + return registerGlobal(API_NAME, manager); + } + + public disable() { + unregisterGlobal(API_NAME); + } + + #getManagerManager(): TimeoutManager { + return getGlobal(API_NAME) ?? NOOP_TIMEOUT_MANAGER; + } +} diff --git a/packages/core/src/v3/timeout/types.ts b/packages/core/src/v3/timeout/types.ts new file mode 100644 index 0000000000..9d2be0ef52 --- /dev/null +++ b/packages/core/src/v3/timeout/types.ts @@ -0,0 +1,14 @@ +export interface TimeoutManager { + abortAfterTimeout: (timeoutInSeconds: number) => AbortSignal; + signal?: AbortSignal; +} + +export class TaskRunExceededMaxDuration extends Error { + constructor( + public readonly timeoutInSeconds: number, + public readonly usageInSeconds: number + ) { + super(`Run exceeded maxDuration of ${timeoutInSeconds} seconds`); + this.name = "TaskRunExceededMaxDuration"; + } +} diff --git a/packages/core/src/v3/timeout/usageTimeoutManager.ts b/packages/core/src/v3/timeout/usageTimeoutManager.ts new file mode 100644 index 0000000000..030e30602d --- /dev/null +++ b/packages/core/src/v3/timeout/usageTimeoutManager.ts @@ -0,0 +1,35 @@ +import { UsageManager } from "../usage/types.js"; +import { TaskRunExceededMaxDuration, TimeoutManager } from "./types.js"; + +export class UsageTimeoutManager implements TimeoutManager { + private _abortController: AbortController; + private _abortSignal: AbortSignal | undefined; + + constructor(private readonly usageManager: UsageManager) { + this._abortController = new AbortController(); + } + + get signal(): AbortSignal | undefined { + return this._abortSignal; + } + + abortAfterTimeout(timeoutInSeconds: number): AbortSignal { + this._abortSignal = this._abortController.signal; + + // Now we need to start an interval that will measure usage and abort the signal if the usage is too high + const intervalId = setInterval(() => { + const sample = this.usageManager.sample(); + if (sample) { + if (sample.cpuTime > timeoutInSeconds * 1000) { + clearInterval(intervalId); + + this._abortController.abort( + new TaskRunExceededMaxDuration(timeoutInSeconds, sample.cpuTime / 1000) + ); + } + } + }, 1000); + + return this._abortSignal; + } +} diff --git a/packages/core/src/v3/tracer.ts b/packages/core/src/v3/tracer.ts index 562780e8e9..e1325ba443 100644 --- a/packages/core/src/v3/tracer.ts +++ b/packages/core/src/v3/tracer.ts @@ -13,6 +13,7 @@ import { SemanticInternalAttributes } from "./semanticInternalAttributes.js"; import { clock } from "./clock-api.js"; import { usage } from "./usage-api.js"; import { taskContext } from "./task-context-api.js"; +import { recordSpanException } from "./otel/index.js"; export type TriggerTracerConfig = | { @@ -57,12 +58,15 @@ export class TriggerTracer { name: string, fn: (span: Span) => Promise, options?: SpanOptions, - ctx?: Context + ctx?: Context, + signal?: AbortSignal ): Promise { const parentContext = ctx ?? context.active(); const attributes = options?.attributes ?? {}; + let spanEnded = false; + return this.tracer.startActiveSpan( name, { @@ -72,6 +76,14 @@ export class TriggerTracer { }, parentContext, async (span) => { + signal?.addEventListener("abort", () => { + if (!spanEnded) { + spanEnded = true; + recordSpanException(span, signal.reason); + span.end(); + } + }); + if (taskContext.ctx) { this.tracer .startSpan( @@ -94,27 +106,33 @@ export class TriggerTracer { try { return await fn(span); } catch (e) { - if (typeof e === "string" || e instanceof Error) { - span.recordException(e); - } + if (!spanEnded) { + if (typeof e === "string" || e instanceof Error) { + span.recordException(e); + } - span.setStatus({ code: SpanStatusCode.ERROR }); + span.setStatus({ code: SpanStatusCode.ERROR }); + } throw e; } finally { - if (taskContext.ctx) { - const usageSample = usage.stop(usageMeasurement); - const machine = taskContext.ctx.machine; - - span.setAttributes({ - [SemanticInternalAttributes.USAGE_DURATION_MS]: usageSample.cpuTime, - [SemanticInternalAttributes.USAGE_COST_IN_CENTS]: machine?.centsPerMs - ? usageSample.cpuTime * machine.centsPerMs - : 0, - }); + if (!spanEnded) { + spanEnded = true; + + if (taskContext.ctx) { + const usageSample = usage.stop(usageMeasurement); + const machine = taskContext.ctx.machine; + + span.setAttributes({ + [SemanticInternalAttributes.USAGE_DURATION_MS]: usageSample.cpuTime, + [SemanticInternalAttributes.USAGE_COST_IN_CENTS]: machine?.centsPerMs + ? usageSample.cpuTime * machine.centsPerMs + : 0, + }); + } + + span.end(clock.preciseNow()); } - - span.end(clock.preciseNow()); } } ); diff --git a/packages/core/src/v3/types/index.ts b/packages/core/src/v3/types/index.ts index 68e64516b8..b90d7da0ce 100644 --- a/packages/core/src/v3/types/index.ts +++ b/packages/core/src/v3/types/index.ts @@ -10,19 +10,27 @@ export type RunFnParams = Prettify<{ ctx: Context; /** If you use the `init` function, this will be whatever you returned. */ init?: TInitOutput; + /** Abort signal that is aborted when a task run exceeds it's maxDuration. Can be used to automatically cancel downstream requests */ + signal?: AbortSignal; }>; export type MiddlewareFnParams = Prettify<{ ctx: Context; next: () => Promise; + /** Abort signal that is aborted when a task run exceeds it's maxDuration. Can be used to automatically cancel downstream requests */ + signal?: AbortSignal; }>; export type InitFnParams = Prettify<{ ctx: Context; + /** Abort signal that is aborted when a task run exceeds it's maxDuration. Can be used to automatically cancel downstream requests */ + signal?: AbortSignal; }>; export type StartFnParams = Prettify<{ ctx: Context; + /** Abort signal that is aborted when a task run exceeds it's maxDuration. Can be used to automatically cancel downstream requests */ + signal?: AbortSignal; }>; export type Context = TaskRunContext; @@ -57,6 +65,8 @@ export type HandleErrorArgs = { retry?: RetryOptions; retryAt?: Date; retryDelayInMs?: number; + /** Abort signal that is aborted when a task run exceeds it's maxDuration. Can be used to automatically cancel downstream requests */ + signal?: AbortSignal; }; export type HandleErrorFunction = ( diff --git a/packages/core/src/v3/utils/globals.ts b/packages/core/src/v3/utils/globals.ts index f9124bdc88..24938a2ff8 100644 --- a/packages/core/src/v3/utils/globals.ts +++ b/packages/core/src/v3/utils/globals.ts @@ -4,6 +4,7 @@ import { Clock } from "../clock/clock.js"; import type { RuntimeManager } from "../runtime/manager.js"; import { TaskCatalog } from "../task-catalog/catalog.js"; import { TaskContext } from "../taskContext/types.js"; +import { TimeoutManager } from "../timeout/types.js"; import { UsageManager } from "../usage/types.js"; import { _globalThis } from "./platform.js"; @@ -56,4 +57,5 @@ type TriggerDotDevGlobalAPI = { ["task-context"]?: TaskContext; ["api-client"]?: ApiClientConfiguration; ["run-metadata"]?: Record; + ["timeout"]?: TimeoutManager; }; diff --git a/packages/core/src/v3/workers/index.ts b/packages/core/src/v3/workers/index.ts index bb02a12afc..a91de70db0 100644 --- a/packages/core/src/v3/workers/index.ts +++ b/packages/core/src/v3/workers/index.ts @@ -13,3 +13,4 @@ export { export * from "../usage-api.js"; export { DevUsageManager } from "../usage/devUsageManager.js"; export { ProdUsageManager, type ProdUsageManagerOptions } from "../usage/prodUsageManager.js"; +export { UsageTimeoutManager } from "../timeout/usageTimeoutManager.js"; diff --git a/packages/core/src/v3/workers/taskExecutor.ts b/packages/core/src/v3/workers/taskExecutor.ts index 0a1ef93606..7139e35225 100644 --- a/packages/core/src/v3/workers/taskExecutor.ts +++ b/packages/core/src/v3/workers/taskExecutor.ts @@ -57,7 +57,8 @@ export class TaskExecutor { execution: TaskRunExecution, worker: ServerBackgroundWorker, traceContext: Record, - usage: UsageMeasurement + usage: UsageMeasurement, + signal?: AbortSignal ): Promise<{ result: TaskRunExecutionResult }> { const ctx = TaskRunContext.parse(execution); const attemptMessage = `Attempt ${execution.attempt.number}`; @@ -95,14 +96,14 @@ export class TaskExecutor { parsedPayload = await parsePacket(payloadPacket); if (execution.attempt.number === 1) { - await this.#callOnStartFunctions(parsedPayload, ctx); + await this.#callOnStartFunctions(parsedPayload, ctx, signal); } - initOutput = await this.#callInitFunctions(parsedPayload, ctx); + initOutput = await this.#callInitFunctions(parsedPayload, ctx, signal); - const output = await this.#callRun(parsedPayload, ctx, initOutput); + const output = await this.#callRun(parsedPayload, ctx, initOutput, signal); - await this.#callOnSuccessFunctions(parsedPayload, output, ctx, initOutput); + await this.#callOnSuccessFunctions(parsedPayload, output, ctx, initOutput, signal); try { const stringifiedOutput = await stringifyIO(output); @@ -153,7 +154,8 @@ export class TaskExecutor { execution, runError, parsedPayload, - ctx + ctx, + signal ); recordSpanException(span, handleErrorResult.error ?? runError); @@ -163,7 +165,8 @@ export class TaskExecutor { parsedPayload, handleErrorResult.error ?? runError, ctx, - initOutput + initOutput, + signal ); } @@ -197,7 +200,7 @@ export class TaskExecutor { } satisfies TaskRunExecutionResult; } } finally { - await this.#callTaskCleanup(parsedPayload, ctx, initOutput); + await this.#callTaskCleanup(parsedPayload, ctx, initOutput, signal); } }); }, @@ -207,13 +210,14 @@ export class TaskExecutor { [SemanticInternalAttributes.STYLE_ICON]: "attempt", }, }, - this._tracer.extractContext(traceContext) + this._tracer.extractContext(traceContext), + signal ); return { result }; } - async #callRun(payload: unknown, ctx: TaskRunContext, init: unknown) { + async #callRun(payload: unknown, ctx: TaskRunContext, init: unknown, signal?: AbortSignal) { const runFn = this.task.fns.run; const middlewareFn = this.task.fns.middleware; @@ -222,14 +226,18 @@ export class TaskExecutor { } if (!middlewareFn) { - return runFn(payload, { ctx, init }); + return runFn(payload, { ctx, init, signal }); } - return middlewareFn(payload, { ctx, next: async () => runFn(payload, { ctx, init }) }); + return middlewareFn(payload, { + ctx, + signal, + next: async () => runFn(payload, { ctx, init, signal }), + }); } - async #callInitFunctions(payload: unknown, ctx: TaskRunContext) { - await this.#callConfigInit(payload, ctx); + async #callInitFunctions(payload: unknown, ctx: TaskRunContext, signal?: AbortSignal) { + await this.#callConfigInit(payload, ctx, signal); const initFn = this.task.fns.init; @@ -240,7 +248,7 @@ export class TaskExecutor { return this._tracer.startActiveSpan( "init", async (span) => { - return await initFn(payload, { ctx }); + return await initFn(payload, { ctx, signal }); }, { attributes: { @@ -250,7 +258,7 @@ export class TaskExecutor { ); } - async #callConfigInit(payload: unknown, ctx: TaskRunContext) { + async #callConfigInit(payload: unknown, ctx: TaskRunContext, signal?: AbortSignal) { const initFn = this._importedConfig?.init; if (!initFn) { @@ -260,7 +268,7 @@ export class TaskExecutor { return this._tracer.startActiveSpan( "config.init", async (span) => { - return await initFn(payload, { ctx }); + return await initFn(payload, { ctx, signal }); }, { attributes: { @@ -274,7 +282,8 @@ export class TaskExecutor { payload: unknown, output: any, ctx: TaskRunContext, - initOutput: any + initOutput: any, + signal?: AbortSignal ) { await this.#callOnSuccessFunction( this.task.fns.onSuccess, @@ -282,7 +291,8 @@ export class TaskExecutor { payload, output, ctx, - initOutput + initOutput, + signal ); await this.#callOnSuccessFunction( @@ -291,7 +301,8 @@ export class TaskExecutor { payload, output, ctx, - initOutput + initOutput, + signal ); } @@ -301,7 +312,8 @@ export class TaskExecutor { payload: unknown, output: any, ctx: TaskRunContext, - initOutput: any + initOutput: any, + signal?: AbortSignal ) { if (!onSuccessFn) { return; @@ -311,7 +323,7 @@ export class TaskExecutor { await this._tracer.startActiveSpan( name, async (span) => { - return await onSuccessFn(payload, output, { ctx, init: initOutput }); + return await onSuccessFn(payload, output, { ctx, init: initOutput, signal }); }, { attributes: { @@ -328,7 +340,8 @@ export class TaskExecutor { payload: unknown, error: unknown, ctx: TaskRunContext, - initOutput: any + initOutput: any, + signal?: AbortSignal ) { await this.#callOnFailureFunction( this.task.fns.onFailure, @@ -336,7 +349,8 @@ export class TaskExecutor { payload, error, ctx, - initOutput + initOutput, + signal ); await this.#callOnFailureFunction( @@ -345,7 +359,8 @@ export class TaskExecutor { payload, error, ctx, - initOutput + initOutput, + signal ); } @@ -355,7 +370,8 @@ export class TaskExecutor { payload: unknown, error: unknown, ctx: TaskRunContext, - initOutput: any + initOutput: any, + signal?: AbortSignal ) { if (!onFailureFn) { return; @@ -365,7 +381,7 @@ export class TaskExecutor { return await this._tracer.startActiveSpan( name, async (span) => { - return await onFailureFn(payload, error, { ctx, init: initOutput }); + return await onFailureFn(payload, error, { ctx, init: initOutput, signal }); }, { attributes: { @@ -378,16 +394,24 @@ export class TaskExecutor { } } - async #callOnStartFunctions(payload: unknown, ctx: TaskRunContext) { + async #callOnStartFunctions(payload: unknown, ctx: TaskRunContext, signal?: AbortSignal) { await this.#callOnStartFunction( this._importedConfig?.onStart, "config.onStart", payload, ctx, - {} + {}, + signal ); - await this.#callOnStartFunction(this.task.fns.onStart, "task.onStart", payload, ctx, {}); + await this.#callOnStartFunction( + this.task.fns.onStart, + "task.onStart", + payload, + ctx, + {}, + signal + ); } async #callOnStartFunction( @@ -395,7 +419,8 @@ export class TaskExecutor { name: string, payload: unknown, ctx: TaskRunContext, - initOutput: any + initOutput: any, + signal?: AbortSignal ) { if (!onStartFn) { return; @@ -405,7 +430,7 @@ export class TaskExecutor { await this._tracer.startActiveSpan( name, async (span) => { - return await onStartFn(payload, { ctx }); + return await onStartFn(payload, { ctx, signal }); }, { attributes: { @@ -418,7 +443,12 @@ export class TaskExecutor { } } - async #callTaskCleanup(payload: unknown, ctx: TaskRunContext, init: unknown) { + async #callTaskCleanup( + payload: unknown, + ctx: TaskRunContext, + init: unknown, + signal?: AbortSignal + ) { const cleanupFn = this.task.fns.cleanup; if (!cleanupFn) { @@ -426,7 +456,7 @@ export class TaskExecutor { } return this._tracer.startActiveSpan("cleanup", async (span) => { - return await cleanupFn(payload, { ctx, init }); + return await cleanupFn(payload, { ctx, init, signal }); }); } @@ -434,7 +464,8 @@ export class TaskExecutor { execution: TaskRunExecution, error: unknown, payload: any, - ctx: TaskRunContext + ctx: TaskRunContext, + signal?: AbortSignal ): Promise< | { status: "retry"; retry: TaskRunExecutionRetry; error?: unknown } | { status: "skipped"; error?: unknown } // skipped is different than noop, it means that the task was skipped from retrying, instead of just not retrying @@ -486,6 +517,7 @@ export class TaskExecutor { retry, retryDelayInMs: delay, retryAt: delay ? new Date(Date.now() + delay) : undefined, + signal, }) : this._importedConfig ? await this._handleErrorFn?.(payload, error, { @@ -493,6 +525,7 @@ export class TaskExecutor { retry, retryDelayInMs: delay, retryAt: delay ? new Date(Date.now() + delay) : undefined, + signal, }) : undefined; diff --git a/packages/database/prisma/migrations/20241002155751_add_timed_out_status_to_task_run_status/migration.sql b/packages/database/prisma/migrations/20241002155751_add_timed_out_status_to_task_run_status/migration.sql new file mode 100644 index 0000000000..107e343290 --- /dev/null +++ b/packages/database/prisma/migrations/20241002155751_add_timed_out_status_to_task_run_status/migration.sql @@ -0,0 +1,2 @@ +-- AlterEnum +ALTER TYPE "TaskRunStatus" ADD VALUE 'TIMED_OUT'; diff --git a/packages/database/prisma/migrations/20241002163757_add_max_duration_to_background_worker_task/migration.sql b/packages/database/prisma/migrations/20241002163757_add_max_duration_to_background_worker_task/migration.sql new file mode 100644 index 0000000000..f04057a035 --- /dev/null +++ b/packages/database/prisma/migrations/20241002163757_add_max_duration_to_background_worker_task/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "BackgroundWorkerTask" ADD COLUMN "maxDurationInSeconds" INTEGER; diff --git a/packages/database/prisma/migrations/20241002164627_add_max_duration_in_seconds_to_task_run/migration.sql b/packages/database/prisma/migrations/20241002164627_add_max_duration_in_seconds_to_task_run/migration.sql new file mode 100644 index 0000000000..72f52481fc --- /dev/null +++ b/packages/database/prisma/migrations/20241002164627_add_max_duration_in_seconds_to_task_run/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "TaskRun" ADD COLUMN "maxDurationInSeconds" INTEGER; diff --git a/packages/database/prisma/schema.prisma b/packages/database/prisma/schema.prisma index fc66fc4b44..3537082628 100644 --- a/packages/database/prisma/schema.prisma +++ b/packages/database/prisma/schema.prisma @@ -1634,6 +1634,8 @@ model BackgroundWorkerTask { retryConfig Json? machineConfig Json? + maxDurationInSeconds Int? + triggerSource TaskTriggerSource @default(STANDARD) @@unique([workerId, slug]) @@ -1769,6 +1771,8 @@ model TaskRun { /// Run error error Json? + maxDurationInSeconds Int? + @@unique([runtimeEnvironmentId, taskIdentifier, idempotencyKey]) // Finding child runs @@index([parentTaskRunId]) @@ -1829,8 +1833,11 @@ enum TaskRunStatus { /// Task has crashed and won't be retried, most likely the worker ran out of resources, e.g. memory or storage CRASHED - // Task reached the ttl without being executed + /// Task reached the ttl without being executed EXPIRED + + /// Task has been timed out when using maxDuration + TIMED_OUT } model TaskRunTag { diff --git a/packages/trigger-sdk/src/v3/index.ts b/packages/trigger-sdk/src/v3/index.ts index 00436d1189..22b8dac1e0 100644 --- a/packages/trigger-sdk/src/v3/index.ts +++ b/packages/trigger-sdk/src/v3/index.ts @@ -8,6 +8,7 @@ export * from "./usage.js"; export * from "./idempotencyKeys.js"; export * from "./tags.js"; export * from "./metadata.js"; +export * from "./timeout.js"; export type { Context }; import type { Context } from "./shared.js"; diff --git a/packages/trigger-sdk/src/v3/shared.ts b/packages/trigger-sdk/src/v3/shared.ts index 3d5ea3059b..071e8cb268 100644 --- a/packages/trigger-sdk/src/v3/shared.ts +++ b/packages/trigger-sdk/src/v3/shared.ts @@ -157,6 +157,13 @@ export type TaskOptions< | "large-2x"; }; + /** + * The maximum duration in compute-time seconds that a task run is allowed to run. If the task run exceeds this duration, it will be stopped. + * + * Minimum value is 5 seconds + */ + maxDuration?: number; + /** This gets called when a task is triggered. It's where you put the code you want to execute. * * @param payload - The payload that is passed to your task when it's triggered. This must be JSON serializable. @@ -502,6 +509,15 @@ export type TaskRunOptions = { * Metadata to attach to the run. Metadata can be used to store additional information about the run. Limited to 4KB. */ metadata?: Record; + + /** + * The maximum duration in compute-time seconds that a task run is allowed to run. If the task run exceeds this duration, it will be stopped. + * + * This will override the task's maxDuration. + * + * Minimum value is 5 seconds + */ + maxDuration?: number; }; type TaskRunConcurrencyOptions = Queue; @@ -602,6 +618,7 @@ export function createTask< queue: params.queue, retry: params.retry ? { ...defaultRetryOptions, ...params.retry } : undefined, machine: params.machine, + maxDuration: params.maxDuration, fns: { run: params.run, init: params.init, @@ -795,6 +812,7 @@ async function trigger_internal( maxAttempts: options?.maxAttempts, parentAttempt: taskContext.ctx?.attempt.id, metadata: options?.metadata, + maxDuration: options?.maxDuration, }, }, { @@ -854,6 +872,7 @@ async function batchTrigger_internal( maxAttempts: item.options?.maxAttempts, parentAttempt: taskContext.ctx?.attempt.id, metadata: item.options?.metadata, + maxDuration: item.options?.maxDuration, }, }; }) @@ -918,6 +937,7 @@ async function triggerAndWait_internal( tags: options?.tags, maxAttempts: options?.maxAttempts, metadata: options?.metadata, + maxDuration: options?.maxDuration, }, }, {}, @@ -1011,6 +1031,7 @@ async function batchTriggerAndWait_internal( tags: item.options?.tags, maxAttempts: item.options?.maxAttempts, metadata: item.options?.metadata, + maxDuration: item.options?.maxDuration, }, }; }) diff --git a/packages/trigger-sdk/src/v3/timeout.ts b/packages/trigger-sdk/src/v3/timeout.ts new file mode 100644 index 0000000000..b7d6ab5d22 --- /dev/null +++ b/packages/trigger-sdk/src/v3/timeout.ts @@ -0,0 +1,8 @@ +import { timeout as timeoutApi } from "@trigger.dev/core/v3"; + +const MAXIMUM_MAX_DURATION = 2_147_483_647; + +export const timeout = { + None: MAXIMUM_MAX_DURATION, + signal: timeoutApi.signal, +}; diff --git a/references/hello-world/src/trigger/example.ts b/references/hello-world/src/trigger/example.ts index d9a901de12..50d76069b0 100644 --- a/references/hello-world/src/trigger/example.ts +++ b/references/hello-world/src/trigger/example.ts @@ -1,4 +1,4 @@ -import { logger, task, wait } from "@trigger.dev/sdk/v3"; +import { logger, task, timeout, usage, wait } from "@trigger.dev/sdk/v3"; import { setTimeout } from "timers/promises"; export const helloWorldTask = task({ @@ -42,3 +42,29 @@ export const childTask = task({ } }, }); + +export const maxDurationTask = task({ + id: "max-duration", + retry: { + maxAttempts: 5, + minTimeoutInMs: 1_000, + maxTimeoutInMs: 2_000, + factor: 1.4, + }, + maxDuration: 5, + run: async (payload: { sleepFor: number }, { signal, ctx }) => { + await setTimeout(payload.sleepFor * 1000, { signal }); + }, +}); + +export const maxDurationParentTask = task({ + id: "max-duration-parent", + run: async (payload: { sleepFor?: number; maxDuration?: number }, { ctx, signal }) => { + const result = await maxDurationTask.triggerAndWait( + { sleepFor: payload.sleepFor ?? 10 }, + { maxDuration: timeout.None } + ); + + return result; + }, +}); diff --git a/references/hello-world/trigger.config.ts b/references/hello-world/trigger.config.ts index 7483169a24..a3a5755074 100644 --- a/references/hello-world/trigger.config.ts +++ b/references/hello-world/trigger.config.ts @@ -3,6 +3,7 @@ import { defineConfig } from "@trigger.dev/sdk/v3"; export default defineConfig({ project: "proj_rrkpdguyagvsoktglnod", logLevel: "log", + maxDuration: 60, retries: { enabledInDev: true, default: { diff --git a/references/v3-catalog/package.json b/references/v3-catalog/package.json index 8f23f7b6f2..35fc7899ca 100644 --- a/references/v3-catalog/package.json +++ b/references/v3-catalog/package.json @@ -7,7 +7,7 @@ }, "scripts": { "dev:trigger": "trigger dev", - "deploy": "trigger deploy", + "deploy": "trigger deploy --self-hosted --load-image", "management": "ts-node -r dotenv/config -r tsconfig-paths/register ./src/management.ts", "queues": "ts-node -r dotenv/config -r tsconfig-paths/register ./src/queues.ts", "build:client": "tsup-node ./src/clientUsage.ts --format esm,cjs", diff --git a/references/v3-catalog/src/trigger/maxDuration.ts b/references/v3-catalog/src/trigger/maxDuration.ts new file mode 100644 index 0000000000..612b1916a6 --- /dev/null +++ b/references/v3-catalog/src/trigger/maxDuration.ts @@ -0,0 +1,21 @@ +import { logger, task, usage, wait } from "@trigger.dev/sdk/v3"; +import { setTimeout } from "timers/promises"; + +export const maxDurationTask = task({ + id: "max-duration", + maxDuration: 15, // 15 seconds + run: async (payload: { sleepFor: number }, { signal }) => { + await setTimeout(payload.sleepFor * 1000, { signal }); + + return usage.getCurrent(); + }, +}); + +export const maxDurationParentTask = task({ + id: "max-duration-parent", + run: async (payload: any, { ctx, signal }) => { + const result = await maxDurationTask.triggerAndWait({ sleepFor: 10 }, { maxDuration: 600 }); + + return result; + }, +}); diff --git a/references/v3-catalog/trigger.config.ts b/references/v3-catalog/trigger.config.ts index 36d5f7bab9..92350853d3 100644 --- a/references/v3-catalog/trigger.config.ts +++ b/references/v3-catalog/trigger.config.ts @@ -17,6 +17,7 @@ export default defineConfig({ machine: "medium-1x", instrumentations: [new OpenAIInstrumentation()], additionalFiles: ["wrangler/wrangler.toml"], + maxDuration: 60, retries: { enabledInDev: false, default: { @@ -33,7 +34,11 @@ export default defineConfig({ console.log(`Task ${ctx.task.id} started ${ctx.run.id}`); }, onFailure: async (payload, error, { ctx }) => { - console.log(`Task ${ctx.task.id} failed ${ctx.run.id}`); + console.log( + `Task ${ctx.task.id} failed ${ctx.run.id}: ${ + error instanceof Error ? error.message : String(error) + }` + ); }, build: { conditions: ["react-server"],