Skip to content

Commit

Permalink
fix: add lazyFetch for parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
CahidArda committed Oct 31, 2024
1 parent 30e5315 commit 14dce13
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 21 deletions.
43 changes: 38 additions & 5 deletions src/client/utils.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Client } from "@upstash/qstash";
import { NotifyResponse, RawStep, Waiter } from "../types";
import { WorkflowLogger } from "../logger";
import { WorkflowError } from "../error";

export const makeNotifyRequest = async (
requester: Client["http"],
Expand Down Expand Up @@ -39,11 +40,43 @@ export const makeCancelRequest = async (requester: Client["http"], workflowRunId
export const getSteps = async (
requester: Client["http"],
workflowRunId: string,
messageId?: string,
debug?: WorkflowLogger
): Promise<RawStep[]> => {
await debug?.log("INFO", "ENDPOINT_START", "Pulling steps from QStash.");
return (await requester.request({
path: ["v2", "workflows", "runs", workflowRunId],
parseResponseAsJson: true,
})) as RawStep[];
try {
const steps = (await requester.request({
path: ["v2", "workflows", "runs", workflowRunId],
parseResponseAsJson: true,
})) as RawStep[];

if (!messageId) {
await debug?.log("INFO", "ENDPOINT_START", {
message:
`Pulled ${steps.length} steps from QStash` +
`and returned them without filtering with messageId.`,
});
return steps;
} else {
const index = steps.findIndex((item) => item.messageId === messageId);

if (index === -1) {
// targetMessageId not found, return an empty array or handle it as needed
return [];
}

const filteredSteps = steps.slice(0, index + 1);
await debug?.log("INFO", "ENDPOINT_START", {
message:
`Pulled ${steps.length} steps from QStash` +
`and filtered them to ${filteredSteps.length} using messageId.`,
});
return filteredSteps;
}
} catch (error) {
await debug?.log("ERROR", "ERROR", {
message: "failed while fetching steps.",
error: error,
});
throw new WorkflowError(`Failed while pulling steps. ${error}`);
}
};
7 changes: 4 additions & 3 deletions src/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,7 @@ describe.skip("live serve tests", () => {
}
);

describe("lazy fetch", () => {
describe.skip("lazy fetch", () => {
// create 5 mb payload.
// lazy fetch will become enabled for payloads larger than 3mb
const largeObject = "x".repeat(4 * 1024 * 1024);
Expand Down Expand Up @@ -815,12 +815,12 @@ describe.skip("live serve tests", () => {
timeout: 10_000,
}
);
test.skip(
test(
"large parallel step response",
async () => {
const finishState = new FinishState();
await testEndpoint({
finalCount: 4,
finalCount: 11,
waitFor: 7000,
initialPayload: "my-payload",
finishState,
Expand Down Expand Up @@ -885,6 +885,7 @@ describe.skip("live serve tests", () => {
timeout: 10_000,
}
);

test(
"large call response",
async () => {
Expand Down
1 change: 1 addition & 0 deletions src/serve/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ export const serve = <
isFirstInvocation,
workflowRunId,
qstashClient.http,
request.headers.get("upstash-message-id")!,
debug
);

Expand Down
26 changes: 14 additions & 12 deletions src/workflow-parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ export const parseRequest = async (
isFirstInvocation: boolean,
workflowRunId: string,
requester: Client["http"],
messageId?: string,
debug?: WorkflowLogger
): Promise<{
rawInitialPayload: string;
Expand All @@ -247,7 +248,7 @@ export const parseRequest = async (
);
}
const { rawInitialPayload, steps } = await parsePayload(
requestPayload ? requestPayload : await getSteps(requester, workflowRunId),
requestPayload ? requestPayload : await getSteps(requester, workflowRunId, messageId, debug),
debug
);
const isLastDuplicate = await checkIfLastOneIsDuplicate(steps, debug);
Expand Down Expand Up @@ -296,17 +297,17 @@ export const handleFailure = async <TInitialPayload>(
}

try {
const { status, header, body, url, sourceHeader, sourceBody, workflowRunId } = JSON.parse(
requestPayload
) as {
status: number;
header: Record<string, string[]>;
body: string;
url: string;
sourceHeader: Record<string, string[]>;
sourceBody: string;
workflowRunId: string;
};
const { status, header, body, url, sourceHeader, sourceBody, workflowRunId, sourceMessageId } =
JSON.parse(requestPayload) as {
status: number;
header: Record<string, string[]>;
body: string;
url: string;
sourceHeader: Record<string, string[]>;
sourceBody: string;
workflowRunId: string;
sourceMessageId: string;
};

const decodedBody = body ? decodeBase64(body) : "{}";
const errorPayload = JSON.parse(decodedBody) as FailureFunctionPayload;
Expand All @@ -322,6 +323,7 @@ export const handleFailure = async <TInitialPayload>(
false,
workflowRunId,
qstashClient.http,
sourceMessageId,
debug
);

Expand Down
2 changes: 1 addition & 1 deletion src/workflow-requests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ export const handleThirdPartyCallResult = async (
throw new WorkflowError("workflow run id missing in context.call lazy fetch.");
if (!messageId) throw new WorkflowError("message id missing in context.call lazy fetch.");

const steps = await getSteps(client.http, workflowRunId, debug);
const steps = await getSteps(client.http, workflowRunId, messageId, debug);
const failingStep = steps.find((step) => step.messageId === messageId);

if (!failingStep)
Expand Down

0 comments on commit 14dce13

Please sign in to comment.