Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

event notification dx improvements #23

Merged
merged 9 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified bun.lockb
Binary file not shown.
20 changes: 16 additions & 4 deletions examples/ci/app/test-routes/wait-for-event/workflow/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ export const { POST, GET } = testServe(
const { eventData, timeout } = await context.waitForEvent(
"wait for event which should timeout",
`random-event-${nanoid()}`,
1
{
timeout: 1
}
);
expect(eventData as undefined, undefined);
expect(timeout, true);
Expand Down Expand Up @@ -51,7 +53,9 @@ export const { POST, GET } = testServe(
headers: { authorization: `Bearer ${NOTIFIER_SECRET}` }
}
),
context.waitForEvent("wait sdk", config.sdkEventId, 5)
context.waitForEvent("wait sdk", config.sdkEventId, {
timeout: "5s"
})
])

expect(sdkResults[0].status, 200)
Expand All @@ -71,7 +75,9 @@ export const { POST, GET } = testServe(
body: config,
}
),
context.waitForEvent("wait text", config.textEventId, 5)
context.waitForEvent("wait text", config.textEventId, {
timeout: 5
})
])

expect(textResults[0].status, 200)
Expand All @@ -84,7 +90,13 @@ export const { POST, GET } = testServe(
const {
eventData: objectEventData,
timeout: objectTimeout
} = await context.waitForEvent("wait object", config.objectEventId, 5)
} = await context.waitForEvent(
"wait object",
config.objectEventId,
{
timeout: "5s"
}
)

expect(objectTimeout, false)
expect(typeof objectEventData, "object")
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@
"prettier": "3.3.3",
"tsc": "^2.0.4",
"tsup": "^8.3.0",
"typescript": "5.4.5",
"typescript": "^5.6.3",
"typescript-eslint": "^8.8.0"
},
"dependencies": {
Expand Down
10 changes: 9 additions & 1 deletion src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,15 @@ export class Client {

constructor(clientConfig: ClientConfig) {
if (!clientConfig.token) {
console.warn("[Upstash Workflow] url or the token is not set. client will not work.");
console.error(
"QStash token is required for Upstash Workflow!\n\n" +
"To fix this:\n" +
"1. Get your token from the Upstash Console (https://console.upstash.com/qstash)\n" +
"2. Initialize the workflow client with:\n\n" +
" const client = new Client({\n" +
" token: '<YOUR_QSTASH_TOKEN>'\n" +
" });"
);
}
this.client = new QStashClient(clientConfig);
}
Expand Down
2 changes: 1 addition & 1 deletion src/context/auto-executor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ describe("auto-executor", () => {
context.sleep("sleep for 123s", 123),
context.sleep("sleep for 10m", "10m"),
context.sleepUntil("sleep until next day", 123_123),
context.waitForEvent("waitEvent", "my-event", "5m"),
context.waitForEvent("waitEvent", "my-event", { timeout: "5m" }),
]);
expect(throws).rejects.toThrowError(QStashWorkflowAbort);
},
Expand Down
6 changes: 3 additions & 3 deletions src/context/context.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ describe("context tests", () => {
const eventId = "my-event-id";
await mockQStashServer({
execute: () => {
const throws = () => context.waitForEvent("my-step", eventId, 20);
const throws = () => context.waitForEvent("my-step", eventId);
expect(throws).toThrowError("Aborting workflow after executing step 'my-step'.");
},
responseFields: {
Expand All @@ -184,7 +184,7 @@ describe("context tests", () => {
stepName: "my-step",
stepType: "Wait",
},
timeout: "20s",
timeout: "7d", // default timeout
timeoutHeaders: {
"Content-Type": ["application/json"],
[`Upstash-Forward-${WORKFLOW_PROTOCOL_VERSION_HEADER}`]: ["1"],
Expand Down Expand Up @@ -218,7 +218,7 @@ describe("context tests", () => {
execute: () => {
const throws = () =>
Promise.all([
context.waitForEvent("my-wait-step", eventId, 20),
context.waitForEvent("my-wait-step", eventId, { timeout: 20 }),
context.run("my-run-step", () => "foo"),
]);
expect(throws).toThrowError("Aborting workflow after executing step 'my-wait-step'.");
Expand Down
58 changes: 31 additions & 27 deletions src/context/context.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
import type { CallResponse, NotifyStepResponse, WaitStepResponse, WorkflowClient } from "../types";
import type {
CallResponse,
NotifyStepResponse,
WaitEventOptions,
WaitStepResponse,
WorkflowClient,
} from "../types";
import { type StepFunction, type Step } from "../types";
import { AutoExecutor } from "./auto-executor";
import type { BaseLazyStep } from "./steps";
Expand Down Expand Up @@ -334,49 +340,47 @@ export class WorkflowContext<TInitialPayload = unknown> {
}

/**
* Makes the workflow run wait until a notify request is sent or until the
* timeout ends
* Pauses workflow execution until a specific event occurs or a timeout is reached.
*
* ```ts
* const { eventData, timeout } = await context.waitForEvent(
* "wait for event step",
* "my-event-id",
* 100 // timeout after 100 seconds
* );
* ```
*```ts
* const result = await workflow.waitForEvent("payment-confirmed", {
* timeout: "5m"
* });
*```
*
* To notify a waiting workflow run, you can use the notify method:
* To notify a waiting workflow:
*
* ```ts
* import { Client } from "@upstash/workflow";
*
* const client = new Client({ token: });
* const client = new Client({ token: "<QSTASH_TOKEN>" });
*
* await client.notify({
* eventId: "my-event-id",
* eventData: "eventData"
* eventId: "payment.confirmed",
* data: {
* amount: 99.99,
* currency: "USD"
* }
* })
* ```
*
* @param stepName
* @param eventId event id to wake up the waiting workflow run
* @param timeout timeout duration in seconds
* @returns wait response as `{ timeout: boolean, eventData: unknown }`.
* timeout is true if the wait times out, if notified it is false. eventData
* is the value passed to `client.notify`.
* @param eventId - Unique identifier for the event to wait for
* @param options - Configuration options.
* @returns `{ timeout: boolean, eventData: unknown }`.
* The `timeout` property specifies if the workflow has timed out. The `eventData`
* is the data passed when notifying this workflow of an event.
*/
public async waitForEvent(
stepName: string,
eventId: string,
timeout: number | Duration
options: WaitEventOptions = {}
): Promise<WaitStepResponse> {
const result = await this.addStep(
new LazyWaitForEventStep(
stepName,
eventId,
typeof timeout === "string" ? timeout : `${timeout}s`
)
);
const { timeout = "7d" } = options;

const timeoutStr = typeof timeout === "string" ? timeout : `${timeout}s`;

const result = await this.addStep(new LazyWaitForEventStep(stepName, eventId, timeoutStr));

try {
return {
Expand Down
4 changes: 2 additions & 2 deletions src/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -620,14 +620,14 @@ describe.skip("live serve tests", () => {
const { eventData, timeout } = await context.waitForEvent(
"single wait for event",
eventId,
1
{ timeout: 1 }
);
expect(eventData).toBeUndefined();
expect(timeout).toBeTrue();

const [runResponse, waitResponse] = await Promise.all([
context.run("run-step", () => runResult),
context.waitForEvent("wait-event-step", eventId, 3),
context.waitForEvent("wait-event-step", eventId, { timeout: 3 }),
]);
expect(runResponse).toBe(runResult);
expect(waitResponse.timeout).toBe(expectedWaitResponse.timeout);
Expand Down
2 changes: 1 addition & 1 deletion src/serve/serve.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ describe("serve", () => {
const request = getRequest(WORKFLOW_ENDPOINT, "wfr-bar", "my-payload", []);
const { handler: endpoint } = serve(
async (context) => {
await context.waitForEvent("waiting step", "wait-event-id", "10d");
await context.waitForEvent("waiting step", "wait-event-id", { timeout: "10d" });
},
{
qstashClient,
Expand Down
19 changes: 18 additions & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -293,4 +293,21 @@ export type CallResponse<TResult = unknown> = {
header: Record<string, string[]>;
};

export type Duration = `${bigint}s` | `${bigint}m` | `${bigint}h` | `${bigint}d`;
/**
* Valid duration string formats
* @example "30s" // 30 seconds
* @example "5m" // 5 minutes
* @example "2h" // 2 hours
* @example "1d" // 1 day
*/
export type Duration = `${bigint}${"s" | "m" | "h" | "d"}`;

export interface WaitEventOptions {
/**
* Duration in seconds to wait for an event before timing out the workflow.
* @example 300 // 5 minutes in seconds
* @example "5m" // 5 minutes as duration string
* @default "7d"
*/
timeout?: number | Duration;
}
Loading