diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md index d894c314c24..688ffecb9c2 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md @@ -58,13 +58,29 @@ def hello_act(ctx: WorkflowActivityContext, input): -Define the workflow activities you'd like your workflow to perform. Activities are a function definition and can take inputs and outputs. The following example creates a counter (activity) called `hello_act` that notifies users of the current counter value. `hello_act` is a function derived from a class called `WorkflowActivityContext`. +Define the workflow activities you'd like your workflow to perform. Activities are wrapped in the `WorkflowActivityContext` class, which implements the workflow activities. ```javascript +export default class WorkflowActivityContext { + private readonly _innerContext: ActivityContext; + constructor(innerContext: ActivityContext) { + if (!innerContext) { + throw new Error("ActivityContext cannot be undefined"); + } + this._innerContext = innerContext; + } + public getWorkflowInstanceId(): string { + return this._innerContext.orchestrationId; + } + + public getWorkflowActivityId(): number { + return this._innerContext.taskId; + } +} ``` -[See the workflow activity in context.](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py#LL40C1-L43C59) +[See the workflow activity in context.](todo) {{% /codetab %}} @@ -214,13 +230,43 @@ def hello_world_wf(ctx: DaprWorkflowContext, input): -The `hello_world_wf` function is derived from a class called `DaprWorkflowContext` with input and output parameter types. It also includes a `yield` statement that does the heavy lifting of the workflow and calls the workflow activities. +Next, register the workflow with the `WorkflowRuntime` class and start the workflow runtime. ```javascript +export default class WorkflowRuntime { + + //.. + // Register workflow + public registerWorkflow(workflow: TWorkflow): WorkflowRuntime { + const name = getFunctionName(workflow); + const workflowWrapper = (ctx: OrchestrationContext, input: any): any => { + const workflowContext = new WorkflowContext(ctx); + return workflow(workflowContext, input); + }; + this.worker.addNamedOrchestrator(name, workflowWrapper); + return this; + } + + // Register workflow activities + public registerActivity(fn: TWorkflowActivity): WorkflowRuntime { + const name = getFunctionName(fn); + const activityWrapper = (ctx: ActivityContext, intput: TInput): TOutput => { + const wfActivityContext = new WorkflowActivityContext(ctx); + return fn(wfActivityContext, intput); + }; + this.worker.addNamedActivity(name, activityWrapper); + return this; + } + + // Start the workflow runtime processing items and block. + public async start() { + await this.worker.start(); + } +} ``` -[See the `hello_world_wf` workflow in context.](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py#LL32C1-L38C51) +[See the `hello_world_wf` workflow in context.](todo) {{% /codetab %}} @@ -400,7 +446,7 @@ if __name__ == '__main__': -[In the following example](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py), for a basic JavaScript hello world application using the Go SDK, your project code would include: +[In the following example](todo), for a basic JavaScript hello world application using the Go SDK, your project code would include: - A JavaScript package called `todo` to receive the Go SDK capabilities. - A builder with extensions called: @@ -409,8 +455,174 @@ if __name__ == '__main__': - `WorkflowActivityContext`: Allows you to [create workflow activities]({{< ref "#write-the-workflow-activities" >}}) - API calls. In the example below, these calls start, pause, resume, purge, and terminate the workflow. -```go +```javascript +import { TaskHubGrpcClient } from "kaibocai-durabletask-js"; +import * as grpc from "@grpc/grpc-js"; +import { WorkflowState } from "./WorkflowState"; +import { generateInterceptors } from "../internal/ApiTokenClientInterceptor"; +import { TWorkflow } from "../types/Workflow.type"; +import { getFunctionName } from "../internal"; + +export default class WorkflowClient { + private readonly _innerClient: TaskHubGrpcClient; + + /** + * Initializes a new instance of the DaprWorkflowClient. + * @param {string | undefined} hostAddress - The address of the Dapr runtime hosting the workflow services. + * @param {grpc.ChannelOptions | undefined} options - Additional options for configuring the gRPC channel. + */ + constructor(hostAddress?: string, options?: grpc.ChannelOptions) { + this._innerClient = this._buildInnerClient(hostAddress, options); + } + + _buildInnerClient(hostAddress = "127.0.0.1:50001", options: grpc.ChannelOptions = {}): TaskHubGrpcClient { + const innerOptions = { + ...options, + interceptors: [generateInterceptors(), ...(options?.interceptors ?? [])], + }; + return new TaskHubGrpcClient(hostAddress, innerOptions); + } + /** + * Schedules a new workflow using the DurableTask client. + * + * @param {TWorkflow | string} workflow - The Workflow or the name of the workflow to be scheduled. + * @return {Promise} A Promise resolving to the unique ID of the scheduled workflow instance. + */ + public async scheduleNewWorkflow( + workflow: TWorkflow | string, + input?: any, + instanceId?: string, + startAt?: Date, + ): Promise { + if (typeof workflow === "string") { + return await this._innerClient.scheduleNewOrchestration(workflow, input, instanceId, startAt); + } + return await this._innerClient.scheduleNewOrchestration(getFunctionName(workflow), input, instanceId, startAt); + } + + /** + * Terminates the workflow associated with the provided instance id. + * + * @param {string} workflowInstanceId - Workflow instance id to terminate. + * @param {any} output - The optional output to set for the terminated workflow instance. + */ + public async terminateWorkflow(workflowInstanceId: string, output: any) { + await this._innerClient.terminateOrchestration(workflowInstanceId, output); + } + + /** + * Fetches workflow instance metadata from the configured durable store. + * + * @param {string} workflowInstanceId - The unique identifier of the workflow instance to fetch. + * @param {boolean} getInputsAndOutputs - Indicates whether to fetch the workflow instance's + * inputs, outputs, and custom status (true) or omit them (false). + * @returns {Promise} A Promise that resolves to a metadata record describing + * the workflow instance and its execution status, or undefined + * if the instance is not found. + */ + public async getWorkflowState( + workflowInstanceId: string, + getInputsAndOutputs: boolean, + ): Promise { + const state = await this._innerClient.getOrchestrationState(workflowInstanceId, getInputsAndOutputs); + if (state !== undefined) { + return new WorkflowState(state); + } + } + + /** + * Waits for a workflow to start running and returns a {@link WorkflowState} object + * containing metadata about the started instance, and optionally, its input, output, + * and custom status payloads. + * + * A "started" workflow instance refers to any instance not in the Pending state. + * + * If a workflow instance is already running when this method is called, it returns immediately. + * + * @param {string} workflowInstanceId - The unique identifier of the workflow instance to wait for. + * @param {boolean} fetchPayloads - Indicates whether to fetch the workflow instance's + * inputs, outputs (true) or omit them (false). + * @param {number} timeout - The amount of time, in seconds, to wait for the workflow instance to start. + * @returns {Promise} A Promise that resolves to the workflow instance metadata + * or undefined if no such instance is found. + */ + public async waitForWorkflowStart( + workflowInstanceId: string, + fetchPayloads?: boolean, + timeout?: number, + ): Promise { + const state = await this._innerClient.waitForOrchestrationStart(workflowInstanceId, fetchPayloads, timeout); + if (state !== undefined) { + return new WorkflowState(state); + } + } + + /** + * Waits for a workflow to complete running and returns a {@link WorkflowState} object + * containing metadata about the completed instance, and optionally, its input, output, + * and custom status payloads. + * + * A "completed" workflow instance refers to any instance in one of the terminal states. + * For example, the Completed, Failed, or Terminated states. + * + * If a workflow instance is already running when this method is called, it returns immediately. + * + * @param {string} workflowInstanceId - The unique identifier of the workflow instance to wait for. + * @param {boolean} fetchPayloads - Indicates whether to fetch the workflow instance's + * inputs, outputs (true) or omit them (false). + * @param {number} timeout - The amount of time, in seconds, to wait for the workflow instance to start. + * @returns {Promise} A Promise that resolves to the workflow instance metadata + * or undefined if no such instance is found. + */ + public async waitForWorkflowCompletion( + workflowInstanceId: string, + fetchPayloads = true, + timeout: number, + ): Promise { + const state = await this._innerClient.waitForOrchestrationCompletion(workflowInstanceId, fetchPayloads, timeout); + if (state != undefined) { + return new WorkflowState(state); + } + } + + /** + * Sends an event notification message to an awaiting workflow instance. + * + * This method triggers the specified event in a running workflow instance, + * allowing the workflow to respond to the event if it has defined event handlers. + * + * @param {string} workflowInstanceId - The unique identifier of the workflow instance that will handle the event. + * @param {string} eventName - The name of the event. Event names are case-insensitive. + * @param {any} [eventPayload] - An optional serializable data payload to include with the event. + */ + public async raiseEvent(workflowInstanceId: string, eventName: string, eventPayload?: any) { + this._innerClient.raiseOrchestrationEvent(workflowInstanceId, eventName, eventPayload); + } + + /** + * Purges the workflow instance state from the workflow state store. + * + * This method removes the persisted state associated with a workflow instance from the state store. + * + * @param {string} workflowInstanceId - The unique identifier of the workflow instance to purge. + * @return {Promise} A Promise that resolves to true if the workflow state was found and purged successfully, otherwise false. + */ + public async purgeWorkflow(workflowInstanceId: string): Promise { + const purgeResult = await this._innerClient.purgeOrchestration(workflowInstanceId); + if (purgeResult !== undefined) { + return purgeResult.deletedInstanceCount > 0; + } + return false; + } + + /** + * Closes the inner DurableTask client and shutdown the GRPC channel. + */ + public async stop() { + await this._innerClient.stop(); + } +} ``` {{% /codetab %}} diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md index 51356b35478..74dd1aa48f1 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md @@ -76,7 +76,60 @@ def error_handler(ctx, error): ```javascript +import WorkflowClient from "../client/WorkflowClient"; +import WorkflowActivityContext from "../runtime/WorkflowActivityContext"; +import WorkflowContext from "../runtime/WorkflowContext"; +import WorkflowRuntime from "../runtime/WorkflowRuntime"; +import { TWorkflow } from "../types/Workflow.type"; + +(async () => { + const grpcEndpoint = "localhost:4001"; + const workflowClient = new WorkflowClient(grpcEndpoint); + const workflowRuntime = new WorkflowRuntime(grpcEndpoint); + + const hello = async (_: WorkflowActivityContext, name: string) => { + return `Hello ${name}!`; + }; + + const sequence: TWorkflow = async function* (ctx: WorkflowContext): any { + const cities: string[] = []; + + const result1 = yield ctx.callActivity(hello, "Tokyo"); + cities.push(result1); + const result2 = yield ctx.callActivity(hello, "Seattle"); // Correct the spelling of "Seattle" + cities.push(result2); + const result3 = yield ctx.callActivity(hello, "London"); + cities.push(result3); + + return cities; + }; + + workflowRuntime.registerWorkflow(sequence).registerActivity(hello); + + // Wrap the worker startup in a try-catch block to handle any errors during startup + try { + await workflowRuntime.start(); + console.log("Workflow runtime started successfully"); + } catch (error) { + console.error("Error starting workflow runtime:", error); + } + + // Schedule a new orchestration + try { + const id = await workflowClient.scheduleNewWorkflow(sequence); + console.log(`Orchestration scheduled with ID: ${id}`); + + // Wait for orchestration completion + const state = await workflowClient.waitForWorkflowCompletion(id, undefined, 30); + + console.log(`Orchestration completed! Result: ${state?.serializedOutput}`); + } catch (error) { + console.error("Error scheduling or waiting for orchestration:", error); + } + await workflowRuntime.stop(); + await workflowClient.stop(); +})(); ``` {{% /codetab %}} @@ -241,7 +294,87 @@ def process_results(ctx, final_result: int): ```javascript +import { Task } from "kaibocai-durabletask-js/task/task"; +import WorkflowClient from "../client/WorkflowClient"; +import WorkflowActivityContext from "../runtime/WorkflowActivityContext"; +import WorkflowContext from "../runtime/WorkflowContext"; +import WorkflowRuntime from "../runtime/WorkflowRuntime"; +import { TWorkflow } from "../types/Workflow.type"; + +// Wrap the entire code in an immediately-invoked async function +(async () => { + // Update the gRPC client and worker to use a local address and port + const grpcServerAddress = "localhost:4001"; + const workflowClient: WorkflowClient = new WorkflowClient(grpcServerAddress); + const workflowRuntime: WorkflowRuntime = new WorkflowRuntime(grpcServerAddress); + + function getRandomInt(min: number, max: number): number { + return Math.floor(Math.random() * (max - min + 1)) + min; + } + + async function getWorkItemsActivity(_: WorkflowActivityContext): Promise { + const count: number = getRandomInt(2, 10); + console.log(`generating ${count} work items...`); + + const workItems: string[] = Array.from({ length: count }, (_, i) => `work item ${i}`); + return workItems; + } + + function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); + } + + async function processWorkItemActivity(context: WorkflowActivityContext, item: string): Promise { + console.log(`processing work item: ${item}`); + + // Simulate some work that takes a variable amount of time + const sleepTime = Math.random() * 5000; + await sleep(sleepTime); + + // Return a result for the given work item, which is also a random number in this case + return Math.floor(Math.random() * 11); + } + + const workflow: TWorkflow = async function* (ctx: WorkflowContext): any { + const tasks: Task[] = []; + const workItems = yield ctx.callActivity(getWorkItemsActivity); + for (const workItem of workItems) { + tasks.push(ctx.callActivity(processWorkItemActivity, workItem)); + } + const results: number[] = yield ctx.whenAll(tasks); + const sum: number = results.reduce((accumulator, currentValue) => accumulator + currentValue, 0); + return sum; + }; + + workflowRuntime.registerWorkflow(workflow); + workflowRuntime.registerActivity(getWorkItemsActivity); + workflowRuntime.registerActivity(processWorkItemActivity); + + // Wrap the worker startup in a try-catch block to handle any errors during startup + try { + await workflowRuntime.start(); + console.log("Worker started successfully"); + } catch (error) { + console.error("Error starting worker:", error); + } + + // Schedule a new orchestration + try { + const id = await workflowClient.scheduleNewWorkflow(workflow); + console.log(`Orchestration scheduled with ID: ${id}`); + + // Wait for orchestration completion + const state = await workflowClient.waitForWorkflowCompletion(id, undefined, 30); + console.log(`Orchestration completed! Result: ${state?.serializedOutput}`); + } catch (error) { + console.error("Error scheduling or waiting for orchestration:", error); + } + + // stop worker and client + await workflowRuntime.stop(); + await workflowClient.stop(); +})(); ``` {{% /codetab %}} @@ -634,7 +767,116 @@ def place_order(_, order: Order) -> None: ```javascript +import { Task } from "kaibocai-durabletask-js/task/task"; +import WorkflowClient from "../client/WorkflowClient"; +import WorkflowActivityContext from "../runtime/WorkflowActivityContext"; +import WorkflowContext from "../runtime/WorkflowContext"; +import WorkflowRuntime from "../runtime/WorkflowRuntime"; +import { TWorkflow } from "../types/Workflow.type"; +import * as readlineSync from "readline-sync"; + +// Wrap the entire code in an immediately-invoked async function +(async () => { + class Order { + cost: number; + product: string; + quantity: number; + constructor(cost: number, product: string, quantity: number) { + this.cost = cost; + this.product = product; + this.quantity = quantity; + } + } + + function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); + } + + // Update the gRPC client and worker to use a local address and port + const grpcServerAddress = "localhost:4001"; + let workflowClient: WorkflowClient = new WorkflowClient(grpcServerAddress); + let workflowRuntime: WorkflowRuntime = new WorkflowRuntime(grpcServerAddress); + + //Activity function that sends an approval request to the manager + const sendApprovalRequest = async (_: WorkflowActivityContext, order: Order) => { + // Simulate some work that takes an amount of time + await sleep(3000); + console.log(`Sending approval request for order: ${order.product}`); + }; + + // Activity function that places an order + const placeOrder = async (_: WorkflowActivityContext, order: Order) => { + console.log(`Placing order: ${order.product}`); + }; + + // Orchestrator function that represents a purchase order workflow + const purchaseOrderWorkflow: TWorkflow = async function* (ctx: WorkflowContext, order: Order): any { + // Orders under $1000 are auto-approved + if (order.cost < 1000) { + return "Auto-approved"; + } + + // Orders of $1000 or more require manager approval + yield ctx.callActivity(sendApprovalRequest, order); + + // Approvals must be received within 24 hours or they will be cancled. + const tasks: Task[] = []; + const approvalEvent = ctx.waitForExternalEvent("approval_received"); + const timeoutEvent = ctx.createTimer(24 * 60 * 60); + tasks.push(approvalEvent); + tasks.push(timeoutEvent); + const winner = ctx.whenAny(tasks); + + if (winner == timeoutEvent) { + return "Cancelled"; + } + + yield ctx.callActivity(placeOrder, order); + const approvalDetails = approvalEvent.getResult(); + return `Approved by ${approvalDetails.approver}`; + }; + + workflowRuntime + .registerWorkflow(purchaseOrderWorkflow) + .registerActivity(sendApprovalRequest) + .registerActivity(placeOrder); + + // Wrap the worker startup in a try-catch block to handle any errors during startup + try { + await workflowRuntime.start(); + console.log("Worker started successfully"); + } catch (error) { + console.error("Error starting worker:", error); + } + + // Schedule a new orchestration + try { + const cost = readlineSync.questionInt("Cost of your order:"); + const approver = readlineSync.question("Approver of your order:"); + const timeout = readlineSync.questionInt("Timeout for your order in seconds:"); + const order = new Order(cost, "MyProduct", 1); + const id = await workflowClient.scheduleNewWorkflow(purchaseOrderWorkflow, order); + console.log(`Orchestration scheduled with ID: ${id}`); + + if (readlineSync.keyInYN("Press [Y] to approve the order... Y/yes, N/no")) { + const approvalEvent = { approver: approver }; + await workflowClient.raiseEvent(id, "approval_received", approvalEvent); + } else { + return "Order rejected"; + } + + // Wait for orchestration completion + const state = await workflowClient.waitForWorkflowCompletion(id, undefined, timeout + 2); + + console.log(`Orchestration completed! Result: ${state?.serializedOutput}`); + } catch (error) { + console.error("Error scheduling or waiting for orchestration:", error); + } + // stop worker and client + await workflowRuntime.stop(); + await workflowClient.stop(); +})(); ``` {{% /codetab %}}