Skip to content

Commit

Permalink
Add client.trigger (#18)
Browse files Browse the repository at this point in the history
* feat: add client.trigger

* fix: prefix user wf id with wfr_

* fix: fmt

* fix: test

* fix: add usage example

* fix: auth route
  • Loading branch information
CahidArda authored Nov 11, 2024
1 parent c43230d commit b7e6116
Show file tree
Hide file tree
Showing 10 changed files with 151 additions and 45 deletions.
11 changes: 7 additions & 4 deletions examples/nextjs/app/-call-qstash/route.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Client } from '@upstash/qstash'
import { Client as WorkflowClient } from '@upstash/workflow'
import { NextRequest } from 'next/server'

const client = new Client({
const client = new WorkflowClient({
baseUrl: process.env.QSTASH_URL!,
token: process.env.QSTASH_TOKEN!,
})
Expand All @@ -20,12 +20,15 @@ export const POST = async (request: NextRequest) => {
process.env.UPSTASH_WORKFLOW_URL ??
request.url.replace('/-call-qstash', '')

const { messageId } = await client.publishJSON({
const { workflowRunId } = await client.trigger({
url: `${baseUrl}/${route}`,
body: payload,
headers: {
"test": "value"
}
})

return new Response(JSON.stringify({ messageId }), { status: 200 })
return new Response(JSON.stringify({ workflowRunId }), { status: 200 })
} catch (error) {
return new Response(
JSON.stringify({ error: `Error when publishing to QStash: ${error}` }),
Expand Down
2 changes: 1 addition & 1 deletion examples/nextjs/app/auth/route.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { serve } from '@upstash/workflow/nextjs'

export const POST = serve<string>(async (context) => {
export const { POST } = serve<string>(async (context) => {
if (context.headers.get('authentication') !== 'Bearer secretPassword') {
console.error('Authentication failed.')
return
Expand Down
39 changes: 37 additions & 2 deletions src/client/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { describe, test } from "bun:test";
import { MOCK_QSTASH_SERVER_URL, mockQStashServer } from "../test-utils";
import { MOCK_QSTASH_SERVER_URL, mockQStashServer, WORKFLOW_ENDPOINT } from "../test-utils";
import { Client } from ".";
import { nanoid } from "../utils";
import { getWorkflowRunId, nanoid } from "../utils";

describe("workflow client", () => {
const token = nanoid();
Expand Down Expand Up @@ -44,4 +44,39 @@ describe("workflow client", () => {
},
});
});

test("should trigger workflow run", async () => {
const myWorkflowRunId = `mock-${getWorkflowRunId()}`;
const body = "request-body";
await mockQStashServer({
execute: async () => {
await client.trigger({
url: WORKFLOW_ENDPOINT,
body,
headers: { "user-header": "user-header-value" },
workflowRunId: myWorkflowRunId,
retries: 15,
});
},
responseFields: {
status: 200,
body: "msgId",
},
receivesRequest: {
method: "POST",
url: `${MOCK_QSTASH_SERVER_URL}/v2/publish/${WORKFLOW_ENDPOINT}`,
token,
body,
headers: {
"upstash-forward-upstash-workflow-sdk-version": "1",
"upstash-forward-user-header": "user-header-value",
"upstash-method": "POST",
"upstash-retries": "15",
"upstash-workflow-init": "true",
"upstash-workflow-runid": `wfr_${myWorkflowRunId}`,
"upstash-workflow-url": "https://www.my-website.com/api",
},
},
});
});
});
63 changes: 63 additions & 0 deletions src/client/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import { NotifyResponse, Waiter } from "../types";
import { Client as QStashClient } from "@upstash/qstash";
import { makeGetWaitersRequest, makeNotifyRequest } from "./utils";
import { getWorkflowRunId } from "../utils";
import { triggerFirstInvocation } from "../workflow-requests";
import { WorkflowContext } from "../context";
import { DEFAULT_RETRIES } from "../constants";

type ClientConfig = ConstructorParameters<typeof QStashClient>[0];

Expand Down Expand Up @@ -88,4 +92,63 @@ export class Client {
public async getWaiters({ eventId }: { eventId: string }): Promise<Required<Waiter>[]> {
return await makeGetWaitersRequest(this.client.http, eventId);
}

/**
* Trigger new workflow run and returns the workflow run id
*
* ```ts
* const { workflowRunId } await client.trigger({
* url: "https://workflow-endpoint.com",
* body: "hello there!", // optional body
* headers: { ... }, // optional headers
* workflowRunId: "my-workflow", // optional workflow run id
* retries: 3 // optional retries in the initial request
* })
*
* console.log(workflowRunId)
* // wfr_my-workflow
* ```
*
* @param url URL of the workflow
* @param body body to start the workflow with
* @param headers headers to use in the request
* @param workflowRunId optional workflow run id to use. mind that
* you should pass different workflow run ids for different runs.
* The final workflowRunId will be `wfr_${workflowRunId}`, in
* other words: the workflow run id you pass will be prefixed
* with `wfr_`.
* @param retries retry to use in the initial request. in the rest of
* the workflow, `retries` option of the `serve` will be used.
* @returns workflow run id
*/
public async trigger({
url,
body,
headers,
workflowRunId,
retries,
}: {
url: string;
body?: unknown;
headers?: Record<string, string>;
workflowRunId?: string;
retries?: number;
}): Promise<{ workflowRunId: string }> {
const finalWorkflowRunId = getWorkflowRunId(workflowRunId);
const context = new WorkflowContext({
qstashClient: this.client,
// @ts-expect-error headers type mismatch
headers: new Headers(headers ?? {}),
initialPayload: body,
steps: [],
url,
workflowRunId: finalWorkflowRunId,
});
const result = await triggerFirstInvocation(context, retries ?? DEFAULT_RETRIES);
if (result.isOk()) {
return { workflowRunId: finalWorkflowRunId };
} else {
throw result.error;
}
}
}
2 changes: 1 addition & 1 deletion src/context/steps.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ describe("test steps", () => {
concurrent,
targetStep,
});
})
});

test("should create result step", async () => {
expect(await stepWithDuration.getResultStep(6, stepId)).toEqual({
Expand Down
52 changes: 19 additions & 33 deletions src/serve/serve.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -592,12 +592,15 @@ describe("serve", () => {

test("should send waitForEvent", async () => {
const request = getRequest(WORKFLOW_ENDPOINT, "wfr-bar", "my-payload", []);
const { handler: endpoint } = serve(async (context) => {
await context.waitForEvent("waiting step", "wait-event-id", "10d")
}, {
qstashClient,
receiver: undefined,
});
const { handler: endpoint } = serve(
async (context) => {
await context.waitForEvent("waiting step", "wait-event-id", "10d");
},
{
qstashClient,
receiver: undefined,
}
);
let called = false;
await mockQStashServer({
execute: async () => {
Expand All @@ -619,37 +622,20 @@ describe("serve", () => {
},
timeout: "10d",
timeoutHeaders: {
"Content-Type": [
"application/json"
],
"Upstash-Forward-Upstash-Workflow-Sdk-Version": [
"1"
],
"Upstash-Retries": [
"3"
],
"Upstash-Workflow-CallType": [
"step"
],
"Upstash-Workflow-Init": [
"false"
],
"Upstash-Workflow-RunId": [
"wfr-bar"
],
"Upstash-Workflow-Runid": [
"wfr-bar"
],
"Upstash-Workflow-Url": [
WORKFLOW_ENDPOINT
],
"Content-Type": ["application/json"],
"Upstash-Forward-Upstash-Workflow-Sdk-Version": ["1"],
"Upstash-Retries": ["3"],
"Upstash-Workflow-CallType": ["step"],
"Upstash-Workflow-Init": ["false"],
"Upstash-Workflow-RunId": ["wfr-bar"],
"Upstash-Workflow-Runid": ["wfr-bar"],
"Upstash-Workflow-Url": [WORKFLOW_ENDPOINT],
},
timeoutUrl: WORKFLOW_ENDPOINT,
url: WORKFLOW_ENDPOINT,
}
},
},
});
expect(called).toBeTrue();

})
});
});
3 changes: 1 addition & 2 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -292,5 +292,4 @@ export type CallResponse = {
header: Record<string, string[]>;
};


export type Duration = `${bigint}s` | `${bigint}m` | `${bigint}h` | `${bigint}d`
export type Duration = `${bigint}s` | `${bigint}m` | `${bigint}h` | `${bigint}d`;
16 changes: 16 additions & 0 deletions src/utils.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { describe, test, expect } from "bun:test";
import { getWorkflowRunId } from "./utils";

describe("getWorkflowRunId", () => {
test("should return random with no id", () => {
const workflowRunId = getWorkflowRunId();
expect(workflowRunId.length).toBe(25);
expect(workflowRunId.slice(0, 4)).toBe("wfr_");
});

test("should return with given id", () => {
const workflowRunId = getWorkflowRunId("my-id");
expect(workflowRunId.length).toBe(9);
expect(workflowRunId).toBe("wfr_my-id");
});
});
4 changes: 4 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ export function nanoid() {
.join("");
}

export function getWorkflowRunId(id?: string): string {
return `wfr_${id ?? nanoid()}`;
}

/**
* When the base64 string has unicode characters, atob doesn't decode
* them correctly since it only outputs ASCII characters. Therefore,
Expand Down
4 changes: 2 additions & 2 deletions src/workflow-parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import type {
import type { WorkflowLogger } from "./logger";
import { WorkflowContext } from "./context";
import { recreateUserHeaders } from "./workflow-requests";
import { decodeBase64, nanoid } from "./utils";
import { decodeBase64, getWorkflowRunId } from "./utils";

/**
* Gets the request body. If that fails, returns undefined
Expand Down Expand Up @@ -196,7 +196,7 @@ export const validateRequest = (

// get workflow id
const workflowRunId = isFirstInvocation
? `wfr_${nanoid()}`
? getWorkflowRunId()
: (request.headers.get(WORKFLOW_ID_HEADER) ?? "");
if (workflowRunId.length === 0) {
throw new QStashWorkflowError("Couldn't get workflow id from header");
Expand Down

0 comments on commit b7e6116

Please sign in to comment.