Skip to content

Commit

Permalink
fix: add lazyFetch for initial payload and context.call
Browse files Browse the repository at this point in the history
  • Loading branch information
CahidArda committed Oct 31, 2024
1 parent 0ef8478 commit 30e5315
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 2 deletions.
5 changes: 5 additions & 0 deletions src/context/auto-executor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ describe("auto-executor", () => {
destination: WORKFLOW_ENDPOINT,
headers: {
"content-type": "application/json",
"upstash-feature-set": "LazyFetch",
"upstash-forward-upstash-workflow-sdk-version": "1",
"upstash-method": "POST",
"upstash-retries": "3",
Expand Down Expand Up @@ -211,6 +212,7 @@ describe("auto-executor", () => {
destination: WORKFLOW_ENDPOINT,
headers: {
"content-type": "application/json",
"upstash-feature-set": "LazyFetch",
"upstash-delay": "123s",
"upstash-forward-upstash-workflow-sdk-version": "1",
"upstash-method": "POST",
Expand All @@ -225,6 +227,7 @@ describe("auto-executor", () => {
destination: WORKFLOW_ENDPOINT,
headers: {
"content-type": "application/json",
"upstash-feature-set": "LazyFetch",
"upstash-forward-upstash-workflow-sdk-version": "1",
"upstash-method": "POST",
"upstash-retries": "3",
Expand Down Expand Up @@ -277,6 +280,7 @@ describe("auto-executor", () => {
destination: WORKFLOW_ENDPOINT,
headers: {
"content-type": "application/json",
"upstash-feature-set": "LazyFetch",
"upstash-forward-upstash-workflow-sdk-version": "1",
"upstash-method": "POST",
"upstash-retries": "3",
Expand Down Expand Up @@ -329,6 +333,7 @@ describe("auto-executor", () => {
destination: WORKFLOW_ENDPOINT,
headers: {
"content-type": "application/json",
"upstash-feature-set": "LazyFetch",
"upstash-forward-upstash-workflow-sdk-version": "1",
"upstash-method": "POST",
"upstash-retries": "3",
Expand Down
4 changes: 4 additions & 0 deletions src/context/context.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ describe("context tests", () => {
destination: WORKFLOW_ENDPOINT,
headers: {
"content-type": "application/json",
"upstash-feature-set": "LazyFetch",
"upstash-forward-upstash-workflow-sdk-version": "1",
"upstash-method": "POST",
"upstash-retries": "2",
Expand Down Expand Up @@ -186,6 +187,7 @@ describe("context tests", () => {
timeout: "20s",
timeoutHeaders: {
"Content-Type": ["application/json"],
"Upstash-Feature-Set": ["LazyFetch"],
[`Upstash-Forward-${WORKFLOW_PROTOCOL_VERSION_HEADER}`]: ["1"],
"Upstash-Retries": ["3"],
"Upstash-Workflow-CallType": ["step"],
Expand Down Expand Up @@ -235,6 +237,7 @@ describe("context tests", () => {
destination: WORKFLOW_ENDPOINT,
headers: {
"content-type": "application/json",
"upstash-feature-set": "LazyFetch",
"upstash-forward-upstash-workflow-sdk-version": "1",
"upstash-method": "POST",
"upstash-retries": "3",
Expand All @@ -249,6 +252,7 @@ describe("context tests", () => {
destination: WORKFLOW_ENDPOINT,
headers: {
"content-type": "application/json",
"upstash-feature-set": "LazyFetch",
"upstash-forward-upstash-workflow-sdk-version": "1",
"upstash-method": "POST",
"upstash-retries": "3",
Expand Down
148 changes: 148 additions & 0 deletions src/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -782,4 +782,152 @@ describe.skip("live serve tests", () => {
timeout: 10_000,
}
);

describe("lazy fetch", () => {
// create 5 mb payload.
// lazy fetch will become enabled for payloads larger than 3mb
const largeObject = "x".repeat(4 * 1024 * 1024);

test(
"large payload",
async () => {
const finishState = new FinishState();
await testEndpoint({
finalCount: 3,
waitFor: 7000,
initialPayload: largeObject,
finishState,
routeFunction: async (context) => {
const input = context.requestPayload;

expect(input).toBe(largeObject);

const result = await context.run("step1", async () => {
return "step-1-result";
});
expect(result).toBe("step-1-result");

finishState.finish();
},
});
},
{
timeout: 10_000,
}
);
test.skip(
"large parallel step response",
async () => {
const finishState = new FinishState();
await testEndpoint({
finalCount: 4,
waitFor: 7000,
initialPayload: "my-payload",
finishState,
routeFunction: async (context) => {
const input = context.requestPayload;

expect(input).toBe("my-payload");

const results = await Promise.all([
context.run("step1", () => {
return largeObject;
}),
context.sleep("sleep1", 1),
context.run("step2", () => {
return largeObject;
}),
context.sleep("sleep2", 1),
]);

expect(results[0]).toBe(largeObject);
expect(results[1]).toBe(undefined);
expect(results[2]).toBe(largeObject);
expect(results[3]).toBe(undefined);

await context.sleep("check", 1);

finishState.finish();
},
});
},
{
timeout: 10_000,
}
);

test.skip(
"large error",
async () => {
const finishState = new FinishState();
await testEndpoint({
finalCount: 3,
waitFor: 7000,
initialPayload: "my-payload",
finishState,
retries: 0,
routeFunction: async (context) => {
const input = context.requestPayload;

expect(input).toBe("my-payload");

await context.run("step1", async () => {
throw new Error(largeObject);
});
},
failureFunction(context, failStatus, failResponse) {
expect(failResponse).toBe(largeObject);
finishState.finish();
},
});
},
{
timeout: 10_000,
}
);
test(
"large call response",
async () => {
const thirdPartyServer = serve({
async fetch() {
return new Response(largeObject, { status: 200 });
},
port: THIRD_PARTY_PORT,
});

const finishState = new FinishState();
await testEndpoint({
finalCount: 6,
waitFor: 9000,
initialPayload: "my-payload",
finishState,
routeFunction: async (context) => {
// sleeping to avoid checking input before the first step
await context.sleep("sleeping", 1);

const input = context.requestPayload;
expect(input).toBe("my-payload");

const { status, body } = await context.call("call to large object", {
url: LOCAL_THIRD_PARTY_URL,
body: input,
method: "POST",
});

expect(status).toBe(200);
expect(body).toBe(largeObject);

await context.sleep("sleep", 1);

finishState.finish();
},
});

thirdPartyServer.stop();
},
{
timeout: 10_000,
}
);
});
});
3 changes: 3 additions & 0 deletions src/serve/authorization.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ describe("disabled workflow context", () => {
destination: WORKFLOW_ENDPOINT,
headers: {
"content-type": "application/json",
"upstash-feature-set": "LazyFetch",
"upstash-forward-upstash-workflow-sdk-version": "1",
"upstash-method": "POST",
"upstash-retries": "0",
Expand Down Expand Up @@ -265,6 +266,7 @@ describe("disabled workflow context", () => {
destination: WORKFLOW_ENDPOINT,
headers: {
"content-type": "application/json",
"upstash-feature-set": "LazyFetch",
"upstash-forward-upstash-workflow-sdk-version": "1",
"upstash-method": "POST",
"upstash-retries": "3",
Expand Down Expand Up @@ -319,6 +321,7 @@ describe("disabled workflow context", () => {
destination: WORKFLOW_ENDPOINT,
headers: {
"content-type": "application/json",
"upstash-feature-set": "LazyFetch",
"upstash-forward-upstash-workflow-sdk-version": "1",
"upstash-method": "POST",
"upstash-retries": "3",
Expand Down
8 changes: 8 additions & 0 deletions src/serve/serve.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ describe("serve", () => {
destination: WORKFLOW_ENDPOINT,
headers: {
"content-type": "application/json",
"upstash-feature-set": "LazyFetch",
"upstash-forward-upstash-workflow-sdk-version": "1",
"upstash-retries": "3",
"upstash-method": "POST",
Expand All @@ -156,6 +157,7 @@ describe("serve", () => {
destination: WORKFLOW_ENDPOINT,
headers: {
"content-type": "application/json",
"upstash-feature-set": "LazyFetch",
"upstash-forward-upstash-workflow-sdk-version": "1",
"upstash-method": "POST",
"upstash-retries": "3",
Expand Down Expand Up @@ -329,6 +331,7 @@ describe("serve", () => {
destination: WORKFLOW_ENDPOINT,
headers: {
"content-type": "application/json",
"upstash-feature-set": "LazyFetch",
"upstash-forward-upstash-workflow-sdk-version": "1",
"upstash-method": "POST",
"upstash-retries": "3",
Expand Down Expand Up @@ -373,6 +376,7 @@ describe("serve", () => {
destination: WORKFLOW_ENDPOINT,
headers: {
"content-type": "application/json",
"upstash-feature-set": "LazyFetch",
"upstash-delay": "1s",
"upstash-forward-upstash-workflow-sdk-version": "1",
"upstash-method": "POST",
Expand Down Expand Up @@ -414,6 +418,8 @@ describe("serve", () => {
destination: WORKFLOW_ENDPOINT,
headers: {
"content-type": "application/json",
"upstash-feature-set": "LazyFetch",
"upstash-failure-callback-upstash-workflow-runid": "wfr-bar",
"upstash-delay": "1s",
"upstash-forward-upstash-workflow-sdk-version": "1",
"upstash-method": "POST",
Expand Down Expand Up @@ -463,12 +469,14 @@ describe("serve", () => {
destination: WORKFLOW_ENDPOINT,
headers: {
"content-type": "application/json",
"upstash-feature-set": "LazyFetch",
"upstash-delay": "1s",
"upstash-forward-upstash-workflow-sdk-version": "1",
"upstash-method": "POST",
"upstash-retries": "3",
"upstash-workflow-init": "false",
"upstash-workflow-runid": "wfr-bar",
"upstash-failure-callback-upstash-workflow-runid": "wfr-bar",
"upstash-workflow-url": WORKFLOW_ENDPOINT,
"upstash-failure-callback": WORKFLOW_ENDPOINT,
"upstash-failure-callback-forward-upstash-workflow-is-failure": "true",
Expand Down
9 changes: 8 additions & 1 deletion src/workflow-requests.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ describe("Workflow Requests", () => {
[WORKFLOW_INIT_HEADER]: "true",
[WORKFLOW_ID_HEADER]: workflowRunId,
[WORKFLOW_URL_HEADER]: WORKFLOW_ENDPOINT,
[WORKFLOW_FEATURE_HEADER]: "LazyFetch",
[`Upstash-Forward-${WORKFLOW_PROTOCOL_VERSION_HEADER}`]: WORKFLOW_PROTOCOL_VERSION,
});
expect(timeoutHeaders).toBeUndefined();
Expand All @@ -418,6 +419,7 @@ describe("Workflow Requests", () => {
[WORKFLOW_INIT_HEADER]: "false",
[WORKFLOW_ID_HEADER]: workflowRunId,
[WORKFLOW_URL_HEADER]: WORKFLOW_ENDPOINT,
[WORKFLOW_FEATURE_HEADER]: "LazyFetch",
[`Upstash-Forward-${WORKFLOW_PROTOCOL_VERSION_HEADER}`]: WORKFLOW_PROTOCOL_VERSION,
});
expect(timeoutHeaders).toBeUndefined();
Expand Down Expand Up @@ -451,10 +453,11 @@ describe("Workflow Requests", () => {
}
);
expect(headers).toEqual({
[WORKFLOW_FEATURE_HEADER]: "WF_NoDelete",
[WORKFLOW_INIT_HEADER]: "false",
[WORKFLOW_ID_HEADER]: workflowRunId,
[WORKFLOW_URL_HEADER]: WORKFLOW_ENDPOINT,
[WORKFLOW_FEATURE_HEADER]: "WF_NoDelete",
"Upstash-Callback-Feature-Set": "LazyFetch",
"Upstash-Retries": "0",
"Upstash-Callback": WORKFLOW_ENDPOINT,
"Upstash-Callback-Forward-Upstash-Workflow-Callback": "true",
Expand Down Expand Up @@ -487,6 +490,8 @@ describe("Workflow Requests", () => {
[WORKFLOW_INIT_HEADER]: "true",
[WORKFLOW_ID_HEADER]: workflowRunId,
[WORKFLOW_URL_HEADER]: WORKFLOW_ENDPOINT,
[`Upstash-Failure-Callback-${WORKFLOW_ID_HEADER}`]: workflowRunId,
[WORKFLOW_FEATURE_HEADER]: "LazyFetch",
[`Upstash-Forward-${WORKFLOW_PROTOCOL_VERSION_HEADER}`]: WORKFLOW_PROTOCOL_VERSION,
[`Upstash-Failure-Callback-Forward-${WORKFLOW_FAILURE_HEADER}`]: "true",
"Upstash-Failure-Callback": failureUrl,
Expand All @@ -513,13 +518,15 @@ describe("Workflow Requests", () => {
"Upstash-Workflow-Init": "false",
"Upstash-Workflow-RunId": workflowRunId,
"Upstash-Workflow-Url": WORKFLOW_ENDPOINT,
[WORKFLOW_FEATURE_HEADER]: "LazyFetch",
"Upstash-Forward-Upstash-Workflow-Sdk-Version": "1",
"Upstash-Workflow-CallType": "step",
});
expect(timeoutHeaders).toEqual({
"Upstash-Workflow-Init": ["false"],
"Upstash-Workflow-RunId": [workflowRunId],
"Upstash-Workflow-Url": [WORKFLOW_ENDPOINT],
[WORKFLOW_FEATURE_HEADER]: ["LazyFetch"],
"Upstash-Forward-Upstash-Workflow-Sdk-Version": ["1"],
"Upstash-Workflow-Runid": [workflowRunId],
"Upstash-Workflow-CallType": ["step"],
Expand Down
Loading

0 comments on commit 30e5315

Please sign in to comment.