From 14dce13108a9c66fe02554f4e87f2e611c241de5 Mon Sep 17 00:00:00 2001 From: CahidArda Date: Thu, 31 Oct 2024 19:09:27 +0300 Subject: [PATCH] fix: add lazyFetch for parallel --- src/client/utils.ts | 43 +++++++++++++++++++++++++++++++++++----- src/integration.test.ts | 7 ++++--- src/serve/index.ts | 1 + src/workflow-parser.ts | 26 +++++++++++++----------- src/workflow-requests.ts | 2 +- 5 files changed, 58 insertions(+), 21 deletions(-) diff --git a/src/client/utils.ts b/src/client/utils.ts index 1533661..516ecbe 100644 --- a/src/client/utils.ts +++ b/src/client/utils.ts @@ -1,6 +1,7 @@ import { Client } from "@upstash/qstash"; import { NotifyResponse, RawStep, Waiter } from "../types"; import { WorkflowLogger } from "../logger"; +import { WorkflowError } from "../error"; export const makeNotifyRequest = async ( requester: Client["http"], @@ -39,11 +40,43 @@ export const makeCancelRequest = async (requester: Client["http"], workflowRunId export const getSteps = async ( requester: Client["http"], workflowRunId: string, + messageId?: string, debug?: WorkflowLogger ): Promise => { - await debug?.log("INFO", "ENDPOINT_START", "Pulling steps from QStash."); - return (await requester.request({ - path: ["v2", "workflows", "runs", workflowRunId], - parseResponseAsJson: true, - })) as RawStep[]; + try { + const steps = (await requester.request({ + path: ["v2", "workflows", "runs", workflowRunId], + parseResponseAsJson: true, + })) as RawStep[]; + + if (!messageId) { + await debug?.log("INFO", "ENDPOINT_START", { + message: + `Pulled ${steps.length} steps from QStash` + + `and returned them without filtering with messageId.`, + }); + return steps; + } else { + const index = steps.findIndex((item) => item.messageId === messageId); + + if (index === -1) { + // targetMessageId not found, return an empty array or handle it as needed + return []; + } + + const filteredSteps = steps.slice(0, index + 1); + await debug?.log("INFO", "ENDPOINT_START", { + message: + `Pulled ${steps.length} steps from QStash` + + `and filtered them to ${filteredSteps.length} using messageId.`, + }); + return filteredSteps; + } + } catch (error) { + await debug?.log("ERROR", "ERROR", { + message: "failed while fetching steps.", + error: error, + }); + throw new WorkflowError(`Failed while pulling steps. ${error}`); + } }; diff --git a/src/integration.test.ts b/src/integration.test.ts index 3a1d395..010f5c1 100644 --- a/src/integration.test.ts +++ b/src/integration.test.ts @@ -783,7 +783,7 @@ describe.skip("live serve tests", () => { } ); - describe("lazy fetch", () => { + describe.skip("lazy fetch", () => { // create 5 mb payload. // lazy fetch will become enabled for payloads larger than 3mb const largeObject = "x".repeat(4 * 1024 * 1024); @@ -815,12 +815,12 @@ describe.skip("live serve tests", () => { timeout: 10_000, } ); - test.skip( + test( "large parallel step response", async () => { const finishState = new FinishState(); await testEndpoint({ - finalCount: 4, + finalCount: 11, waitFor: 7000, initialPayload: "my-payload", finishState, @@ -885,6 +885,7 @@ describe.skip("live serve tests", () => { timeout: 10_000, } ); + test( "large call response", async () => { diff --git a/src/serve/index.ts b/src/serve/index.ts index 3439fda..670b407 100644 --- a/src/serve/index.ts +++ b/src/serve/index.ts @@ -82,6 +82,7 @@ export const serve = < isFirstInvocation, workflowRunId, qstashClient.http, + request.headers.get("upstash-message-id")!, debug ); diff --git a/src/workflow-parser.ts b/src/workflow-parser.ts index 0bbcd8c..af77e54 100644 --- a/src/workflow-parser.ts +++ b/src/workflow-parser.ts @@ -225,6 +225,7 @@ export const parseRequest = async ( isFirstInvocation: boolean, workflowRunId: string, requester: Client["http"], + messageId?: string, debug?: WorkflowLogger ): Promise<{ rawInitialPayload: string; @@ -247,7 +248,7 @@ export const parseRequest = async ( ); } const { rawInitialPayload, steps } = await parsePayload( - requestPayload ? requestPayload : await getSteps(requester, workflowRunId), + requestPayload ? requestPayload : await getSteps(requester, workflowRunId, messageId, debug), debug ); const isLastDuplicate = await checkIfLastOneIsDuplicate(steps, debug); @@ -296,17 +297,17 @@ export const handleFailure = async ( } try { - const { status, header, body, url, sourceHeader, sourceBody, workflowRunId } = JSON.parse( - requestPayload - ) as { - status: number; - header: Record; - body: string; - url: string; - sourceHeader: Record; - sourceBody: string; - workflowRunId: string; - }; + const { status, header, body, url, sourceHeader, sourceBody, workflowRunId, sourceMessageId } = + JSON.parse(requestPayload) as { + status: number; + header: Record; + body: string; + url: string; + sourceHeader: Record; + sourceBody: string; + workflowRunId: string; + sourceMessageId: string; + }; const decodedBody = body ? decodeBase64(body) : "{}"; const errorPayload = JSON.parse(decodedBody) as FailureFunctionPayload; @@ -322,6 +323,7 @@ export const handleFailure = async ( false, workflowRunId, qstashClient.http, + sourceMessageId, debug ); diff --git a/src/workflow-requests.ts b/src/workflow-requests.ts index e1c9b24..ac3d061 100644 --- a/src/workflow-requests.ts +++ b/src/workflow-requests.ts @@ -170,7 +170,7 @@ export const handleThirdPartyCallResult = async ( throw new WorkflowError("workflow run id missing in context.call lazy fetch."); if (!messageId) throw new WorkflowError("message id missing in context.call lazy fetch."); - const steps = await getSteps(client.http, workflowRunId, debug); + const steps = await getSteps(client.http, workflowRunId, messageId, debug); const failingStep = steps.find((step) => step.messageId === messageId); if (!failingStep)