From bec3842e94b470c142f0991071b0561598e8b0a4 Mon Sep 17 00:00:00 2001 From: Hannah Hunter Date: Fri, 8 Dec 2023 14:44:30 -0500 Subject: [PATCH 01/13] prep for go workflow Signed-off-by: Hannah Hunter --- .../workflow/howto-author-workflow.md | 53 ++++++++- .../workflow/howto-manage-workflow.md | 24 +++- .../workflow/workflow-features-concepts.md | 60 +++++++++- .../workflow/workflow-overview.md | 12 -- .../workflow/workflow-patterns.md | 54 ++++++++- .../quickstarts/workflow-quickstart.md | 108 +++++++++++++++++- 6 files changed, 282 insertions(+), 29 deletions(-) diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md index 98f0df760e9..7fb31d19ff1 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md @@ -34,7 +34,7 @@ The Dapr sidecar doesn’t load any workflow definitions. Rather, the sidecar si [Workflow activities]({{< ref "workflow-features-concepts.md#workflow-activites" >}}) are the basic unit of work in a workflow and are the tasks that get orchestrated in the business process. -{{< tabs Python ".NET" Java >}} +{{< tabs Python ".NET" Java Go >}} {{% codetab %}} @@ -165,6 +165,19 @@ public class DemoWorkflowActivity implements WorkflowActivity { {{% /codetab %}} +{{% codetab %}} + + + +Define the workflow activities you'd like your workflow to perform. Activities are wrapped in the public `DemoWorkflowActivity` class, which implements the workflow activities. + +```go + +``` + +[See the Go SDK workflow activity example in context.](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowActivity.java) + +{{% /codetab %}} {{< /tabs >}} @@ -172,7 +185,7 @@ public class DemoWorkflowActivity implements WorkflowActivity { Next, register and call the activites in a workflow. -{{< tabs Python ".NET" Java >}} +{{< tabs Python ".NET" Java Go >}} {{% codetab %}} @@ -267,6 +280,20 @@ public class DemoWorkflowWorker { [See the Java SDK workflow in context.](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowWorker.java) +{{% /codetab %}} + +{{% codetab %}} + + + +Next, register the workflow with the `WorkflowRuntimeBuilder` and start the workflow runtime. + +```go + +``` + +[See the Go SDK workflow in context.](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowWorker.java) + {{% /codetab %}} {{< /tabs >}} @@ -275,7 +302,7 @@ public class DemoWorkflowWorker { Finally, compose the application using the workflow. -{{< tabs Python ".NET" Java >}} +{{< tabs Python ".NET" Java Go >}} {{% codetab %}} @@ -484,6 +511,25 @@ public class DemoWorkflow extends Workflow { {{% /codetab %}} +{{% codetab %}} + + + +[As in the following example](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflow.java), a hello-world application using the Go SDK and Dapr Workflow would include: + +- A Go package called `todo` to receive the Go SDK client capabilities. +- An import of `todo` +- The `DemoWorkflow` class which extends `Workflow` +- Creating the workflow with input and output. +- API calls. In the example below, these calls start and call the workflow activities. + +```go + +``` + +[See the full Go SDK workflow example in context.](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflow.java) + +{{% /codetab %}} {{< /tabs >}} @@ -506,3 +552,4 @@ Now that you've authored a workflow, learn how to manage it. - [Python example](https://github.com/dapr/python-sdk/tree/master/examples/demo_workflow) - [.NET example](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow) - [Java example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows) + - [Go example](todo) diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md index fb7ad9d57c5..50205f23a52 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md @@ -12,7 +12,7 @@ Dapr Workflow is currently in beta. [See known limitations for {{% dapr-latest-v Now that you've [authored the workflow and its activities in your application]({{< ref howto-author-workflow.md >}}), you can start, terminate, and get information about the workflow using HTTP API calls. For more information, read the [workflow API reference]({{< ref workflow_api.md >}}). -{{< tabs Python ".NET" Java HTTP >}} +{{< tabs Python ".NET" Java Go HTTP >}} {{% codetab %}} @@ -99,7 +99,7 @@ await daprClient.PurgeWorkflowAsync(orderId, workflowComponent); {{% /codetab %}} - + {{% codetab %}} Manage your workflow within your code. [In the workflow example from the Java SDK](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowClient.java), the workflow is registered in the code using the following APIs: @@ -164,6 +164,25 @@ public class DemoWorkflowClient { {{% /codetab %}} + +{{% codetab %}} + +Manage your workflow within your code. [In the workflow example from the Go SDK](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowClient.java), the workflow is registered in the code using the following APIs: + +- **scheduleNewWorkflow**: Starts a new workflow instance +- **getInstanceState**: Get information on the status of the workflow +- **waitForInstanceStart**: Pauses or suspends a workflow instance that can later be resumed +- **raiseEvent**: Raises events/tasks for the running workflow instance +- **waitForInstanceCompletion**: Waits for the workflow to complete its tasks +- **purgeInstance**: Removes all metadata related to a specific workflow instance +- **terminateWorkflow**: Terminates the workflow +- **purgeInstance**: Removes all metadata related to a specific workflow + +```go + +``` + +{{% /codetab %}} {{% codetab %}} @@ -244,5 +263,6 @@ Learn more about these HTTP calls in the [workflow API reference guide]({{< ref - [Python example](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py) - [.NET example](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow) - [Java example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows) + - [Go example](todo) - [Workflow API reference]({{< ref workflow_api.md >}}) diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-features-concepts.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-features-concepts.md index ce39d4bac96..05e250bdd53 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-features-concepts.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-features-concepts.md @@ -162,7 +162,7 @@ APIs that generate random numbers, random UUIDs, or the current date are _non-de For example, instead of this: -{{< tabs ".NET" Java >}} +{{< tabs ".NET" Java Go >}} {{% codetab %}} @@ -186,11 +186,19 @@ string randomString = GetRandomString(); {{% /codetab %}} +{{% codetab %}} + +```go +// DON'T DO THIS! +``` + +{{% /codetab %}} + {{< /tabs >}} Do this: -{{< tabs ".NET" Java >}} +{{< tabs ".NET" Java Go >}} {{% codetab %}} @@ -214,6 +222,14 @@ String randomString = context.callActivity(GetRandomString.class.getName(), Stri {{% /codetab %}} +{{% codetab %}} + +```go +// Do this!! +``` + +{{% /codetab %}} + {{< /tabs >}} @@ -224,7 +240,7 @@ Instead, workflows should interact with external state _indirectly_ using workfl For example, instead of this: -{{< tabs ".NET" Java >}} +{{< tabs ".NET" Java Go >}} {{% codetab %}} @@ -247,11 +263,19 @@ HttpResponse response = HttpClient.newBuilder().build().send(request, Ht {{% /codetab %}} +{{% codetab %}} + +```go +// DON'T DO THIS! +``` + +{{% /codetab %}} + {{< /tabs >}} Do this: -{{< tabs ".NET" Java >}} +{{< tabs ".NET" Java Go >}} {{% codetab %}} @@ -273,6 +297,14 @@ String data = ctx.callActivity(MakeHttpCall.class, "https://example.com/api/data {{% /codetab %}} +{{% codetab %}} + +```go +// Do this!! +``` + +{{% /codetab %}} + {{< /tabs >}} @@ -285,7 +317,7 @@ Failure to follow this rule could result in undefined behavior. Any background p For example, instead of this: -{{< tabs ".NET" Java >}} +{{< tabs ".NET" Java Go >}} {{% codetab %}} @@ -308,11 +340,19 @@ ctx.createTimer(Duration.ofSeconds(5)).await(); {{% /codetab %}} +{{% codetab %}} + +```go +// DON'T DO THIS! +``` + +{{% /codetab %}} + {{< /tabs >}} Do this: -{{< tabs ".NET" Java >}} +{{< tabs ".NET" Java Go >}} {{% codetab %}} @@ -334,6 +374,14 @@ ctx.createTimer(Duration.ofSeconds(5)).await(); {{% /codetab %}} +{{% codetab %}} + +```go +// Do this!! +``` + +{{% /codetab %}} + {{< /tabs >}} diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-overview.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-overview.md index f5b6dae8b6d..78391522979 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-overview.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-overview.md @@ -73,16 +73,6 @@ Learn more about [different types of workflow patterns]({{< ref workflow-pattern The Dapr Workflow _authoring SDKs_ are language-specific SDKs that contain types and functions to implement workflow logic. The workflow logic lives in your application and is orchestrated by the Dapr Workflow engine running in the Dapr sidecar via a gRPC stream. -### Supported SDKs - -You can use the following SDKs to author a workflow. - -| Language stack | Package | -| - | - | -| Python | [dapr-ext-workflow](https://github.com/dapr/python-sdk/tree/master/ext/dapr-ext-workflow) | -| .NET | [Dapr.Workflow](https://www.nuget.org/profiles/dapr.io) | -| Java | [io.dapr.workflows](https://dapr.github.io/java-sdk/io/dapr/workflows/package-summary.html) | - ## Try out workflows ### Quickstarts and tutorials @@ -103,8 +93,6 @@ Want to skip the quickstarts? Not a problem. You can try out the workflow buildi ## Limitations -With Dapr Workflow in beta stage comes the following limitation(s): - - **State stores:** For the {{% dapr-latest-version cli="true" %}} beta release of Dapr Workflow, using the NoSQL databases as a state store results in limitations around storing internal states. For example, CosmosDB has a maximum single operation item limit of only 100 states in a single request. - **Horizontal scaling:** For the {{% dapr-latest-version cli="true" %}} beta release of Dapr Workflow, if you scale out Dapr sidecars or your application pods to more than 2, then the concurrency of the workflow execution drops. It is recommended to test with 1 or 2 instances, and no more than 2. diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md index c7aebca4e9e..c7474322b6e 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md @@ -25,7 +25,7 @@ While the pattern is simple, there are many complexities hidden in the implement Dapr Workflow solves these complexities by allowing you to implement the task chaining pattern concisely as a simple function in the programming language of your choice, as shown in the following example. -{{< tabs Python ".NET" Java >}} +{{< tabs Python ".NET" Java Go >}} {{% codetab %}} @@ -160,6 +160,15 @@ public class ChainWorkflow extends Workflow { {{% /codetab %}} +{{% codetab %}} + + +```go + +``` + +{{% /codetab %}} + {{< /tabs >}} As you can see, the workflow is expressed as a simple series of statements in the programming language of your choice. This allows any engineer in the organization to quickly understand the end-to-end flow without necessarily needing to understand the end-to-end system architecture. @@ -186,7 +195,7 @@ In addition to the challenges mentioned in [the previous pattern]({{< ref "workf Dapr Workflows provides a way to express the fan-out/fan-in pattern as a simple function, as shown in the following example: -{{< tabs Python ".NET" Java >}} +{{< tabs Python ".NET" Java Go >}} {{% codetab %}} @@ -279,6 +288,15 @@ public class FaninoutWorkflow extends Workflow { {{% /codetab %}} +{{% codetab %}} + + +```go + +``` + +{{% /codetab %}} + {{< /tabs >}} The key takeaways from this example are: @@ -379,7 +397,7 @@ Depending on the business needs, there may be a single monitor or there may be m Dapr Workflow supports this pattern natively by allowing you to implement _eternal workflows_. Rather than writing infinite while-loops ([which is an anti-pattern]({{< ref "workflow-features-concepts.md#infinite-loops-and-eternal-workflows" >}})), Dapr Workflow exposes a _continue-as-new_ API that workflow authors can use to restart a workflow function from the beginning with a new input. -{{< tabs Python ".NET" Java >}} +{{< tabs Python ".NET" Java Go >}} {{% codetab %}} @@ -512,6 +530,15 @@ public class MonitorWorkflow extends Workflow { {{% /codetab %}} +{{% codetab %}} + + +```go + +``` + +{{% /codetab %}} + {{< /tabs >}} A workflow implementing the monitor pattern can loop forever or it can terminate itself gracefully by not calling _continue-as-new_. @@ -540,7 +567,7 @@ The following diagram illustrates this flow. The following example code shows how this pattern can be implemented using Dapr Workflow. -{{< tabs Python ".NET" Java >}} +{{< tabs Python ".NET" Java Go >}} {{% codetab %}} @@ -682,11 +709,20 @@ public class ExternalSystemInteractionWorkflow extends Workflow { {{% /codetab %}} +{{% codetab %}} + + +```go + +``` + +{{% /codetab %}} + {{< /tabs >}} The code that delivers the event to resume the workflow execution is external to the workflow. Workflow events can be delivered to a waiting workflow instance using the [raise event]({{< ref "howto-manage-workflow.md#raise-an-event" >}}) workflow management API, as shown in the following example: -{{< tabs Python ".NET" Java >}} +{{< tabs Python ".NET" Java Go >}} {{% codetab %}} @@ -729,6 +765,14 @@ client.raiseEvent(restartingInstanceId, "RestartEvent", "RestartEventPayload"); {{% /codetab %}} +{{% codetab %}} + + +```go +``` + +{{% /codetab %}} + {{< /tabs >}} External events don't have to be directly triggered by humans. They can also be triggered by other systems. For example, a workflow may need to pause and wait for a payment to be received. In this case, a payment system might publish an event to a pub/sub topic on receipt of a payment, and a listener on that topic can raise an event to the workflow using the raise event workflow API. diff --git a/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md b/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md index 0a1f2e77900..7b1bdc65600 100644 --- a/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md +++ b/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md @@ -21,7 +21,7 @@ In this guide, you'll: -{{< tabs "Python" ".NET" "Java" >}} +{{< tabs "Python" ".NET" "Java" "Go" >}} {{% codetab %}} @@ -852,6 +852,112 @@ The `Activities` directory holds the four workflow activities used by the workfl {{% /codetab %}} + +{{% codetab %}} + +The `order-processor` console app starts and manages the `order_processing_workflow`, which simulates purchasing items from a store. The workflow consists of five unique workflow activities, or tasks: + +- `notify_activity`: Utilizes a logger to print out messages throughout the workflow. These messages notify you when: + - You have insufficient inventory + - Your payment couldn't be processed, etc. +- `process_payment_activity`: Processes and authorizes the payment. +- `verify_inventory_activity`: Checks the state store to ensure there is enough inventory present for purchase. +- `update_inventory_activity`: Removes the requested items from the state store and updates the store with the new remaining inventory value. +- `request_approval_activity`: Seeks approval from the manager if payment is greater than 50,000 USD. + +### Step 1: Pre-requisites + +For this example, you will need: + +- [Dapr CLI and initialized environment](https://docs.dapr.io/getting-started). +- [Latest version of Go](https://go.dev/dl/). + +- [Docker Desktop](https://www.docker.com/products/docker-desktop) + + +### Step 2: Set up the environment + +Clone the [sample provided in the Quickstarts repo](https://github.com/dapr/quickstarts/tree/master/workflows). + +```bash +git clone https://github.com/dapr/quickstarts.git +``` + +In a new terminal window, navigate to the `order-processor` directory: + +```bash +cd workflows/go/sdk/order-processor +``` + +Install the Dapr Go SDK dependencies: + +```bash +go build . +``` + +### Step 3: Run the order processor app + +In the terminal, start the order processor app alongside a Dapr sidecar: + +```bash +dapr run +``` + +This starts the `order-processor` app with unique workflow ID and runs the workflow activities. + +Expected output: + +```bash +need +``` + +### (Optional) Step 4: View in Zipkin + +Running `dapr init` launches the [openzipkin/zipkin](https://hub.docker.com/r/openzipkin/zipkin/) Docker container. If the container has stopped running, launch the Zipkin Docker container with the following command: + +``` +docker run -d -p 9411:9411 openzipkin/zipkin +``` + +View the workflow trace spans in the Zipkin web UI (typically at `http://localhost:9411/zipkin/`). + + + +### What happened? + +When you ran `dapr run`: + +1. A unique order ID for the workflow is generated (in the above example, `f4e1926e-3721-478d-be8a-f5bebd1995da`) and the workflow is scheduled. +1. The `NotifyActivity` workflow activity sends a notification saying an order for 10 cars has been received. +1. The `ReserveInventoryActivity` workflow activity checks the inventory data, determines if you can supply the ordered item, and responds with the number of cars in stock. +1. Your workflow starts and notifies you of its status. +1. The `ProcessPaymentActivity` workflow activity begins processing payment for order `f4e1926e-3721-478d-be8a-f5bebd1995da` and confirms if successful. +1. The `UpdateInventoryActivity` workflow activity updates the inventory with the current available cars after the order has been processed. +1. The `NotifyActivity` workflow activity sends a notification saying that order `f4e1926e-3721-478d-be8a-f5bebd1995da` has completed. +1. The workflow terminates as completed. + +#### `order-processor/app.go` + +In the application's program file: +- The unique workflow order ID is generated +- The workflow is scheduled +- The workflow status is retrieved +- The workflow and the workflow activities it invokes are registered + +```go +need +``` + +#### `order-processor/workflow.go` + +In `workflow.go`, the workflow is defined as a class with all of its associated tasks (determined by workflow activities). + +```go +need +``` +{{% /codetab %}} + + {{< /tabs >}} ## Tell us what you think! From a7e270e02f6aabbfe45690ad3c03a1c6fccb8a73 Mon Sep 17 00:00:00 2001 From: Hannah Hunter Date: Tue, 2 Jan 2024 16:11:13 -0500 Subject: [PATCH 02/13] add early code examples and cross links Signed-off-by: Hannah Hunter --- .../workflow/howto-author-workflow.md | 88 +++++++++++++++++-- .../workflow/howto-manage-workflow.md | 57 ++++++++++-- .../workflow/workflow-architecture.md | 3 +- .../workflow/workflow-features-concepts.md | 6 +- .../workflow/workflow-overview.md | 1 + .../workflow/workflow-patterns.md | 3 +- 6 files changed, 141 insertions(+), 17 deletions(-) diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md index 7fb31d19ff1..648d769a2fa 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md @@ -169,13 +169,40 @@ public class DemoWorkflowActivity implements WorkflowActivity { -Define the workflow activities you'd like your workflow to perform. Activities are wrapped in the public `DemoWorkflowActivity` class, which implements the workflow activities. +Define the workflow activities you'd like your workflow to perform. Activities are wrapped in the public `callActivityOptions` class, which implements the workflow activities. ```go +type ActivityContext struct { + ctx task.ActivityContext +} + +func (wfac *ActivityContext) GetInput(v interface{}) error { + return wfac.ctx.GetInput(&v) +} + +func (wfac *ActivityContext) Context() context.Context { + return wfac.ctx.Context() +} +type callActivityOption func(*callActivityOptions) error + +type callActivityOptions struct { + rawInput *wrapperspb.StringValue +} + +func WithActivityInput(input any) callActivityOption { + return func(opt *callActivityOptions) error { + data, err := marshalData(input) + if err != nil { + return err + } + opt.rawInput = wrapperspb.String(string(data)) + return nil + } +} ``` -[See the Go SDK workflow activity example in context.](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowActivity.java) +[See the Go SDK workflow activity example in context.](todo) {{% /codetab %}} @@ -289,10 +316,61 @@ public class DemoWorkflowWorker { Next, register the workflow with the `WorkflowRuntimeBuilder` and start the workflow runtime. ```go +package workflow + +// Register workflow +func (wr *WorkflowRuntime) RegisterWorkflow(w Workflow) error { + wrappedOrchestration := wrapWorkflow(w) + // get decorator for workflow + name, err := getDecorator(w) + if err != nil { + return fmt.Errorf("failed to get workflow decorator: %v", err) + } + + err = wr.tasks.AddOrchestratorN(name, wrappedOrchestration) + return err +} + +// Activity wrapper +func wrapActivity(a Activity) task.Activity { + return func(ctx task.ActivityContext) (any, error) { + aCtx := ActivityContext{ctx: ctx} + + return a(aCtx) + } +} + +// Register wrapped activity +func (wr *WorkflowRuntime) RegisterActivity(a Activity) error { + wrappedActivity := wrapActivity(a) + + // get decorator for activity + name, err := getDecorator(a) + if err != nil { + return fmt.Errorf("failed to get activity decorator: %v", err) + } + + err = wr.tasks.AddActivityN(name, wrappedActivity) + return err +} + +// Start workflow runtime +func (wr *WorkflowRuntime) Start() error { + // go func start + go func() { + err := wr.client.StartWorkItemListener(context.Background(), wr.tasks) + if err != nil { + log.Fatalf("failed to start work stream: %v", err) + } + }() + <-wr.quit + + return nil +} ``` -[See the Go SDK workflow in context.](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowWorker.java) +[See the Go SDK workflow in context.](todo) {{% /codetab %}} @@ -515,7 +593,7 @@ public class DemoWorkflow extends Workflow { -[As in the following example](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflow.java), a hello-world application using the Go SDK and Dapr Workflow would include: +[As in the following example](todo), a hello-world application using the Go SDK and Dapr Workflow would include: - A Go package called `todo` to receive the Go SDK client capabilities. - An import of `todo` @@ -527,7 +605,7 @@ public class DemoWorkflow extends Workflow { ``` -[See the full Go SDK workflow example in context.](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflow.java) +[See the full Go SDK workflow example in context.](todo) {{% /codetab %}} diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md index 50205f23a52..9f6021114f6 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md @@ -167,19 +167,58 @@ public class DemoWorkflowClient { {{% codetab %}} -Manage your workflow within your code. [In the workflow example from the Go SDK](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowClient.java), the workflow is registered in the code using the following APIs: +Manage your workflow within your code. [In the workflow example from the Go SDK](todo), the workflow is registered in the code using the following APIs: -- **scheduleNewWorkflow**: Starts a new workflow instance -- **getInstanceState**: Get information on the status of the workflow -- **waitForInstanceStart**: Pauses or suspends a workflow instance that can later be resumed -- **raiseEvent**: Raises events/tasks for the running workflow instance -- **waitForInstanceCompletion**: Waits for the workflow to complete its tasks -- **purgeInstance**: Removes all metadata related to a specific workflow instance -- **terminateWorkflow**: Terminates the workflow -- **purgeInstance**: Removes all metadata related to a specific workflow +- **StartWorkflow**: Starts a new workflow instance +- **GetWorkflow**: Get information on the status of the workflow +- **PauseWorkflow**: Pauses or suspends a workflow instance that can later be resumed +- **RaiseEventWorkflow**: Raises events/tasks for the running workflow instance +- **ResumeWorkflow**: Waits for the workflow to complete its tasks +- **PurgeWorkflow**: Removes all metadata related to a specific workflow instance +- **TerminateWorkflow**: Terminates the workflow ```go + // StartWorkflowAlpha1 starts a workflow. + StartWorkflowAlpha1(ctx context.Context, req *StartWorkflowRequest) (*StartWorkflowResponse, error) + + // GetWorkflowAlpha1 gets a workflow. + GetWorkflowAlpha1(ctx context.Context, req *GetWorkflowRequest) (*GetWorkflowResponse, error) + + // PurgeWorkflowAlpha1 purges a workflow. + PurgeWorkflowAlpha1(ctx context.Context, req *PurgeWorkflowRequest) error + + // TerminateWorkflowAlpha1 terminates a workflow. + TerminateWorkflowAlpha1(ctx context.Context, req *TerminateWorkflowRequest) error + + // PauseWorkflowAlpha1 pauses a workflow. + PauseWorkflowAlpha1(ctx context.Context, req *PauseWorkflowRequest) error + + // ResumeWorkflowAlpha1 resumes a workflow. + ResumeWorkflowAlpha1(ctx context.Context, req *ResumeWorkflowRequest) error + + // RaiseEventWorkflowAlpha1 raises an event for a workflow. + RaiseEventWorkflowAlpha1(ctx context.Context, req *RaiseEventWorkflowRequest) error + + // StartWorkflowBeta1 starts a workflow. + StartWorkflowBeta1(ctx context.Context, req *StartWorkflowRequest) (*StartWorkflowResponse, error) + + // GetWorkflowBeta1 gets a workflow. + GetWorkflowBeta1(ctx context.Context, req *GetWorkflowRequest) (*GetWorkflowResponse, error) + + // PurgeWorkflowBeta1 purges a workflow. + PurgeWorkflowBeta1(ctx context.Context, req *PurgeWorkflowRequest) error + + // TerminateWorkflowBeta1 terminates a workflow. + TerminateWorkflowBeta1(ctx context.Context, req *TerminateWorkflowRequest) error + + // PauseWorkflowBeta1 pauses a workflow. + PauseWorkflowBeta1(ctx context.Context, req *PauseWorkflowRequest) error + + // ResumeWorkflowBeta1 resumes a workflow. + ResumeWorkflowBeta1(ctx context.Context, req *ResumeWorkflowRequest) error + // RaiseEventWorkflowBeta1 raises an event for a workflow. + RaiseEventWorkflowBeta1(ctx context.Context, req *RaiseEventWorkflowRequest) error ``` {{% /codetab %}} diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-architecture.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-architecture.md index 18ec9110b30..9887f2826aa 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-architecture.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-architecture.md @@ -196,4 +196,5 @@ See the [Reminder usage and execution guarantees section]({{< ref "workflow-arch - Try out the following examples: - [Python](https://github.com/dapr/python-sdk/tree/master/examples/demo_workflow) - [.NET](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow) - - [Java](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows) \ No newline at end of file + - [Java](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows) + - [Go example](todo) diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-features-concepts.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-features-concepts.md index 05e250bdd53..518317af492 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-features-concepts.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-features-concepts.md @@ -411,4 +411,8 @@ To work around these constraints: - [Try out Dapr Workflow using the quickstart]({{< ref workflow-quickstart.md >}}) - [Workflow overview]({{< ref workflow-overview.md >}}) - [Workflow API reference]({{< ref workflow_api.md >}}) -- [Try out the .NET example](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow) +- Try out the following examples: + - [Python](https://github.com/dapr/python-sdk/tree/master/examples/demo_workflow) + - [.NET](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow) + - [Java](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows) + - [Go example](todo) diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-overview.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-overview.md index 78391522979..c19f417d044 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-overview.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-overview.md @@ -114,3 +114,4 @@ Watch [this video for an overview on Dapr Workflow](https://youtu.be/s1p9MNl4VGo - [.NET example](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow) - [Python example](https://github.com/dapr/python-sdk/tree/master/examples/demo_workflow) - [Java example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows) + - [Go example](todo) diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md index c7474322b6e..1ac4f70f2d5 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md @@ -789,4 +789,5 @@ External events don't have to be directly triggered by humans. They can also be - Try out the following examples: - [Python](https://github.com/dapr/python-sdk/tree/master/examples/demo_workflow) - [.NET](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow) - - [Java](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows) \ No newline at end of file + - [Java](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows) + - [Go example](todo) From 1934dcac52041dd673a3c552be3e966f27cb2ea4 Mon Sep 17 00:00:00 2001 From: Hannah Hunter Date: Thu, 1 Feb 2024 13:22:26 -0500 Subject: [PATCH 03/13] update go snippets Signed-off-by: Hannah Hunter --- .../workflow/howto-author-workflow.md | 59 +++++++------ .../workflow/howto-manage-workflow.md | 86 ++++++++++++------- .../workflow/workflow-overview.md | 1 + 3 files changed, 87 insertions(+), 59 deletions(-) diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md index 648d769a2fa..207cc9c009a 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md @@ -169,7 +169,7 @@ public class DemoWorkflowActivity implements WorkflowActivity { -Define the workflow activities you'd like your workflow to perform. Activities are wrapped in the public `callActivityOptions` class, which implements the workflow activities. +Define the workflow activities you'd like your workflow to perform. Activities are wrapped in the public `callActivityOptions` method, which implements the workflow activities. ```go type ActivityContext struct { @@ -190,13 +190,22 @@ type callActivityOptions struct { rawInput *wrapperspb.StringValue } -func WithActivityInput(input any) callActivityOption { - return func(opt *callActivityOptions) error { +// ActivityInput is an option to pass a JSON-serializable input +func ActivityInput(input any) callActivityOption { + return func(opts *callActivityOptions) error { data, err := marshalData(input) if err != nil { return err } - opt.rawInput = wrapperspb.String(string(data)) + opts.rawInput = wrapperspb.String(string(data)) + return nil + } +} + +// ActivityRawInput is an option to pass a byte slice as an input +func ActivityRawInput(input string) callActivityOption { + return func(opts *callActivityOptions) error { + opts.rawInput = wrapperspb.String(input) return nil } } @@ -313,22 +322,22 @@ public class DemoWorkflowWorker { -Next, register the workflow with the `WorkflowRuntimeBuilder` and start the workflow runtime. +Next, register the workflow and workflow activities and start the workflow runtime. ```go package workflow -// Register workflow -func (wr *WorkflowRuntime) RegisterWorkflow(w Workflow) error { +// RegisterWorkflow adds a workflow function to the registry +func (ww *WorkflowWorker) RegisterWorkflow(w Workflow) error { wrappedOrchestration := wrapWorkflow(w) - // get decorator for workflow - name, err := getDecorator(w) + // get the function name for the passed workflow + name, err := getFunctionName(w) if err != nil { return fmt.Errorf("failed to get workflow decorator: %v", err) } - err = wr.tasks.AddOrchestratorN(name, wrappedOrchestration) + err = ww.tasks.AddOrchestratorN(name, wrappedOrchestration) return err } @@ -341,31 +350,29 @@ func wrapActivity(a Activity) task.Activity { } } -// Register wrapped activity -func (wr *WorkflowRuntime) RegisterActivity(a Activity) error { +// RegisterActivity adds an activity function to the registry +func (ww *WorkflowWorker) RegisterActivity(a Activity) error { wrappedActivity := wrapActivity(a) - // get decorator for activity - name, err := getDecorator(a) + // get the function name for the passed activity + name, err := getFunctionName(a) if err != nil { return fmt.Errorf("failed to get activity decorator: %v", err) } - err = wr.tasks.AddActivityN(name, wrappedActivity) + err = ww.tasks.AddActivityN(name, wrappedActivity) return err } -// Start workflow runtime -func (wr *WorkflowRuntime) Start() error { - // go func start - go func() { - err := wr.client.StartWorkItemListener(context.Background(), wr.tasks) - if err != nil { - log.Fatalf("failed to start work stream: %v", err) - } - }() - <-wr.quit - +// Start initialises a non-blocking worker to handle workflows and activities registered +// prior to this being called. +func (ww *WorkflowWorker) Start() error { + ctx, cancel := context.WithCancel(context.Background()) + ww.cancel = cancel + if err := ww.client.StartWorkItemListener(ctx, ww.tasks); err != nil { + return fmt.Errorf("failed to start work stream: %v", err) + } + log.Println("work item listener started") return nil } ``` diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md index 9f6021114f6..6f853611abe 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md @@ -178,47 +178,67 @@ Manage your workflow within your code. [In the workflow example from the Go SDK] - **TerminateWorkflow**: Terminates the workflow ```go - // StartWorkflowAlpha1 starts a workflow. - StartWorkflowAlpha1(ctx context.Context, req *StartWorkflowRequest) (*StartWorkflowResponse, error) - - // GetWorkflowAlpha1 gets a workflow. - GetWorkflowAlpha1(ctx context.Context, req *GetWorkflowRequest) (*GetWorkflowResponse, error) - - // PurgeWorkflowAlpha1 purges a workflow. - PurgeWorkflowAlpha1(ctx context.Context, req *PurgeWorkflowRequest) error - - // TerminateWorkflowAlpha1 terminates a workflow. - TerminateWorkflowAlpha1(ctx context.Context, req *TerminateWorkflowRequest) error - - // PauseWorkflowAlpha1 pauses a workflow. - PauseWorkflowAlpha1(ctx context.Context, req *PauseWorkflowRequest) error - - // ResumeWorkflowAlpha1 resumes a workflow. - ResumeWorkflowAlpha1(ctx context.Context, req *ResumeWorkflowRequest) error +// Start workflow +type StartWorkflowRequest struct { + InstanceID string // Optional instance identifier + WorkflowComponent string + WorkflowName string + Options map[string]string // Optional metadata + Input any // Optional input + SendRawInput bool // Set to True in order to disable serialization on the input +} - // RaiseEventWorkflowAlpha1 raises an event for a workflow. - RaiseEventWorkflowAlpha1(ctx context.Context, req *RaiseEventWorkflowRequest) error +type StartWorkflowResponse struct { + InstanceID string +} - // StartWorkflowBeta1 starts a workflow. - StartWorkflowBeta1(ctx context.Context, req *StartWorkflowRequest) (*StartWorkflowResponse, error) +// Get the workflow status +type GetWorkflowRequest struct { + InstanceID string + WorkflowComponent string +} - // GetWorkflowBeta1 gets a workflow. - GetWorkflowBeta1(ctx context.Context, req *GetWorkflowRequest) (*GetWorkflowResponse, error) +type GetWorkflowResponse struct { + InstanceID string + WorkflowName string + CreatedAt time.Time + LastUpdatedAt time.Time + RuntimeStatus string + Properties map[string]string +} - // PurgeWorkflowBeta1 purges a workflow. - PurgeWorkflowBeta1(ctx context.Context, req *PurgeWorkflowRequest) error +// Purge workflow +type PurgeWorkflowRequest struct { + InstanceID string + WorkflowComponent string +} - // TerminateWorkflowBeta1 terminates a workflow. - TerminateWorkflowBeta1(ctx context.Context, req *TerminateWorkflowRequest) error +// Terminate workflow +type TerminateWorkflowRequest struct { + InstanceID string + WorkflowComponent string +} - // PauseWorkflowBeta1 pauses a workflow. - PauseWorkflowBeta1(ctx context.Context, req *PauseWorkflowRequest) error +// Pause workflow +type PauseWorkflowRequest struct { + InstanceID string + WorkflowComponent string +} - // ResumeWorkflowBeta1 resumes a workflow. - ResumeWorkflowBeta1(ctx context.Context, req *ResumeWorkflowRequest) error +// Resume workflow +type ResumeWorkflowRequest struct { + InstanceID string + WorkflowComponent string +} - // RaiseEventWorkflowBeta1 raises an event for a workflow. - RaiseEventWorkflowBeta1(ctx context.Context, req *RaiseEventWorkflowRequest) error +// Raise an event for the running workflow +type RaiseEventWorkflowRequest struct { + InstanceID string + WorkflowComponent string + EventName string + EventData any + SendRawData bool // Set to True in order to disable serialization on the data +} ``` {{% /codetab %}} diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-overview.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-overview.md index c19f417d044..bf8f1d55a35 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-overview.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-overview.md @@ -85,6 +85,7 @@ Want to put workflows to the test? Walk through the following quickstart and tut | [Workflow Python SDK example](https://github.com/dapr/python-sdk/tree/master/examples/demo_workflow) | Learn how to create a Dapr Workflow and invoke it using the Python `DaprClient` package. | | [Workflow .NET SDK example](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow) | Learn how to create a Dapr Workflow and invoke it using ASP.NET Core web APIs. | | [Workflow Java SDK example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows) | Learn how to create a Dapr Workflow and invoke it using the Java `io.dapr.workflows` package. | +| [Workflow Go SDK example](todo) | Learn how to create a Dapr Workflow and invoke it using the Go `workflow` package. | ### Start using workflows directly in your app From e49c4360e9bdb73a81c6d716141f3095e3cce28b Mon Sep 17 00:00:00 2001 From: Hannah Hunter Date: Thu, 1 Feb 2024 14:43:13 -0500 Subject: [PATCH 04/13] add quickstart Signed-off-by: Hannah Hunter --- .../quickstarts/workflow-quickstart.md | 347 +++++++++++++++++- 1 file changed, 330 insertions(+), 17 deletions(-) diff --git a/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md b/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md index 7b1bdc65600..03fc13433f3 100644 --- a/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md +++ b/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md @@ -855,15 +855,16 @@ The `Activities` directory holds the four workflow activities used by the workfl {{% codetab %}} -The `order-processor` console app starts and manages the `order_processing_workflow`, which simulates purchasing items from a store. The workflow consists of five unique workflow activities, or tasks: -- `notify_activity`: Utilizes a logger to print out messages throughout the workflow. These messages notify you when: - - You have insufficient inventory - - Your payment couldn't be processed, etc. -- `process_payment_activity`: Processes and authorizes the payment. -- `verify_inventory_activity`: Checks the state store to ensure there is enough inventory present for purchase. -- `update_inventory_activity`: Removes the requested items from the state store and updates the store with the new remaining inventory value. -- `request_approval_activity`: Seeks approval from the manager if payment is greater than 50,000 USD. +The `order-processor` console app starts and manages the `OrderProcessingWorkflow`, which simulates purchasing items from a store. The workflow consists of five unique workflow activities, or tasks: + +- `NotifyActivity`: Utilizes a logger to print out messages throughout the workflow. These messages notify you when: + - You have insufficient inventory + - Your payment couldn't be processed, etc. +- `ProcessPaymentActivity`: Processes and authorizes the payment. +- `VerifyInventoryActivity`: Checks the state store to ensure there is enough inventory present for purchase. +- `UpdateInventoryActivity`: Removes the requested items from the state store and updates the store with the new remaining inventory value. +- `RequestApprovalActivity`: Seeks approval from the manager if payment is greater than 50,000 USD. ### Step 1: Pre-requisites @@ -900,7 +901,7 @@ go build . In the terminal, start the order processor app alongside a Dapr sidecar: ```bash -dapr run +dapr run -f . ``` This starts the `order-processor` app with unique workflow ID and runs the workflow activities. @@ -908,7 +909,32 @@ This starts the `order-processor` app with unique workflow ID and runs the workf Expected output: ```bash -need +== APP - order-processor == *** Welcome to the Dapr Workflow console app sample! +== APP - order-processor == *** Using this app, you can place orders that start workflows. +== APP - order-processor == dapr client initializing for: 127.0.0.1:50056 +== APP - order-processor == adding base stock item: paperclip +== APP - order-processor == 2024/02/01 12:59:52 work item listener started +== APP - order-processor == INFO: 2024/02/01 12:59:52 starting background processor +== APP - order-processor == adding base stock item: cars +== APP - order-processor == adding base stock item: computers +== APP - order-processor == ==========Begin the purchase of item:========== +== APP - order-processor == NotifyActivity: Received order 48ee83b7-5d80-48d5-97f9-6b372f5480a5 for 10 cars - $150000 +== APP - order-processor == VerifyInventoryActivity: Verifying inventory for order 48ee83b7-5d80-48d5-97f9-6b372f5480a5 of 10 cars +== APP - order-processor == VerifyInventoryActivity: There are 100 cars available for purchase +== APP - order-processor == RequestApprovalActivity: Requesting approval for payment of 150000USD for 10 cars +== APP - order-processor == NotifyActivity: Payment for order 48ee83b7-5d80-48d5-97f9-6b372f5480a5 has been approved! +== APP - order-processor == ProcessPaymentActivity: 48ee83b7-5d80-48d5-97f9-6b372f5480a5 for 10 - cars (150000USD) +== APP - order-processor == UpdateInventoryActivity: Checking Inventory for order 48ee83b7-5d80-48d5-97f9-6b372f5480a5 for 10 * cars +== APP - order-processor == UpdateInventoryActivity: There are now 90 cars left in stock +== APP - order-processor == NotifyActivity: Order 48ee83b7-5d80-48d5-97f9-6b372f5480a5 has completed! +== APP - order-processor == Workflow completed - result: COMPLETED +== APP - order-processor == Purchase of item is complete +``` + +Stop the Dapr workflow with `CTRL+C` or: + +```bash +dapr stop -f . ``` ### (Optional) Step 4: View in Zipkin @@ -927,16 +953,16 @@ View the workflow trace spans in the Zipkin web UI (typically at `http://localho When you ran `dapr run`: -1. A unique order ID for the workflow is generated (in the above example, `f4e1926e-3721-478d-be8a-f5bebd1995da`) and the workflow is scheduled. +1. A unique order ID for the workflow is generated (in the above example, `48ee83b7-5d80-48d5-97f9-6b372f5480a5`) and the workflow is scheduled. 1. The `NotifyActivity` workflow activity sends a notification saying an order for 10 cars has been received. 1. The `ReserveInventoryActivity` workflow activity checks the inventory data, determines if you can supply the ordered item, and responds with the number of cars in stock. 1. Your workflow starts and notifies you of its status. -1. The `ProcessPaymentActivity` workflow activity begins processing payment for order `f4e1926e-3721-478d-be8a-f5bebd1995da` and confirms if successful. +1. The `ProcessPaymentActivity` workflow activity begins processing payment for order `48ee83b7-5d80-48d5-97f9-6b372f5480a5` and confirms if successful. 1. The `UpdateInventoryActivity` workflow activity updates the inventory with the current available cars after the order has been processed. -1. The `NotifyActivity` workflow activity sends a notification saying that order `f4e1926e-3721-478d-be8a-f5bebd1995da` has completed. +1. The `NotifyActivity` workflow activity sends a notification saying that order `48ee83b7-5d80-48d5-97f9-6b372f5480a5` has completed. 1. The workflow terminates as completed. -#### `order-processor/app.go` +#### `order-processor/main.go` In the application's program file: - The unique workflow order ID is generated @@ -945,15 +971,302 @@ In the application's program file: - The workflow and the workflow activities it invokes are registered ```go -need +func main() { + fmt.Println("*** Welcome to the Dapr Workflow console app sample!") + fmt.Println("*** Using this app, you can place orders that start workflows.") + + // ... + + // Register workflow and activities + if err := w.RegisterWorkflow(OrderProcessingWorkflow); err != nil { + log.Fatal(err) + } + if err := w.RegisterActivity(NotifyActivity); err != nil { + log.Fatal(err) + } + if err := w.RegisterActivity(RequestApprovalActivity); err != nil { + log.Fatal(err) + } + if err := w.RegisterActivity(VerifyInventoryActivity); err != nil { + log.Fatal(err) + } + if err := w.RegisterActivity(ProcessPaymentActivity); err != nil { + log.Fatal(err) + } + if err := w.RegisterActivity(UpdateInventoryActivity); err != nil { + log.Fatal(err) + } + + // Build and start workflow runtime, pulling and executing tasks + if err := w.Start(); err != nil { + log.Fatal(err) + } + + daprClient, err := client.NewClient() + if err != nil { + log.Fatalf("failed to initialise dapr client: %v", err) + } + wfClient, err := workflow.NewClient(workflow.WithDaprClient(daprClient)) + if err != nil { + log.Fatalf("failed to initialise workflow client: %v", err) + } + + // Check inventory + inventory := []InventoryItem{ + {ItemName: "paperclip", PerItemCost: 5, Quantity: 100}, + {ItemName: "cars", PerItemCost: 15000, Quantity: 100}, + {ItemName: "computers", PerItemCost: 500, Quantity: 100}, + } + if err := restockInventory(daprClient, inventory); err != nil { + log.Fatalf("failed to restock: %v", err) + } + + fmt.Println("==========Begin the purchase of item:==========") + + itemName := defaultItemName + orderQuantity := 10 + + totalCost := inventory[1].PerItemCost * orderQuantity + + orderPayload := OrderPayload{ + ItemName: itemName, + Quantity: orderQuantity, + TotalCost: totalCost, + } + + // Start workflow events, like receiving order, verifying inventory, and processing payment + id, err := wfClient.ScheduleNewWorkflow(context.Background(), workflowName, workflow.WithInput(orderPayload)) + if err != nil { + log.Fatalf("failed to start workflow: %v", err) + } + + // ... + + // Notification that workflow has completed or failed + for { + timeDelta := time.Since(startTime) + metadata, err := wfClient.FetchWorkflowMetadata(context.Background(), id) + if err != nil { + log.Fatalf("failed to fetch workflow: %v", err) + } + if (metadata.RuntimeStatus == workflow.StatusCompleted) || (metadata.RuntimeStatus == workflow.StatusFailed) || (metadata.RuntimeStatus == workflow.StatusTerminated) { + fmt.Printf("Workflow completed - result: %v\n", metadata.RuntimeStatus.String()) + break + } + if timeDelta.Seconds() >= 10 { + metadata, err := wfClient.FetchWorkflowMetadata(context.Background(), id) + if err != nil { + log.Fatalf("failed to fetch workflow: %v", err) + } + if totalCost > 50000 && !approvalSought && ((metadata.RuntimeStatus != workflow.StatusCompleted) || (metadata.RuntimeStatus != workflow.StatusFailed) || (metadata.RuntimeStatus != workflow.StatusTerminated)) { + approvalSought = true + promptForApproval(id) + } + } + // Sleep to not DoS the dapr dev instance + time.Sleep(time.Second) + } + + fmt.Println("Purchase of item is complete") +} + +// Request approval (RequestApprovalActivity) +func promptForApproval(id string) { + wfClient, err := workflow.NewClient() + if err != nil { + log.Fatalf("failed to initialise wfClient: %v", err) + } + if err := wfClient.RaiseEvent(context.Background(), id, "manager_approval"); err != nil { + log.Fatal(err) + } +} + +// Update inventory for remaining stock (UpdateInventoryActivity) +func restockInventory(daprClient client.Client, inventory []InventoryItem) error { + for _, item := range inventory { + itemSerialized, err := json.Marshal(item) + if err != nil { + return err + } + fmt.Printf("adding base stock item: %s\n", item.ItemName) + if err := daprClient.SaveState(context.Background(), stateStoreName, item.ItemName, itemSerialized, nil); err != nil { + return err + } + } + return nil +} ``` #### `order-processor/workflow.go` -In `workflow.go`, the workflow is defined as a class with all of its associated tasks (determined by workflow activities). +In `main.go`, the workflow is defined as a method with all of its associated tasks (determined by workflow activities). ```go -need +// OrderProcessingWorkflow is the main workflow for orchestrating activities in the order process. +func OrderProcessingWorkflow(ctx *workflow.WorkflowContext) (any, error) { + orderID := ctx.InstanceID() + var orderPayload OrderPayload + if err := ctx.GetInput(&orderPayload); err != nil { + return nil, err + } + if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Received order %s for %d %s - $%d", orderID, orderPayload.Quantity, orderPayload.ItemName, orderPayload.TotalCost)})).Await(nil); err != nil { + return OrderResult{Processed: false}, err + } + + var verifyInventoryResult InventoryResult + if err := ctx.CallActivity(VerifyInventoryActivity, workflow.ActivityInput(InventoryRequest{ + RequestID: orderID, + ItemName: orderPayload.ItemName, + Quantity: orderPayload.Quantity, + })).Await(&verifyInventoryResult); err != nil { + return OrderResult{Processed: false}, err + } + + if !verifyInventoryResult.Success { + if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Insufficient inventory for %s", orderPayload.ItemName)})).Await(nil); err != nil { + return OrderResult{Processed: false}, err + } + } + + if orderPayload.TotalCost > 50000 { + var approvalRequired ApprovalRequired + if err := ctx.CallActivity(RequestApprovalActivity, workflow.ActivityInput(orderPayload)).Await(&approvalRequired); err != nil { + return OrderResult{Processed: false}, err + } + if err := ctx.WaitForExternalEvent("manager_approval", time.Second*200).Await(nil); err != nil { + return OrderResult{Processed: false}, err + } + // TODO: Confirm timeout flow - this will be in the form of an error. + if approvalRequired.Approval { + if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Payment for order %s has been approved!", orderID)})).Await(nil); err != nil { + log.Printf("failed to notify of a successful order: %v\n", err) + } + } else { + if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Payment for order %s has been rejected!", orderID)})).Await(nil); err != nil { + log.Printf("failed to notify of an unsuccessful order :%v\n", err) + } + return OrderResult{Processed: false}, nil + } + } + if err := ctx.CallActivity(ProcessPaymentActivity, workflow.ActivityInput(PaymentRequest{ + RequestID: orderID, + ItemBeingPurchased: orderPayload.ItemName, + Amount: orderPayload.TotalCost, + Quantity: orderPayload.Quantity, + })).Await(nil); err != nil { + if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Order %s failed!", orderID)})).Await(nil); err != nil { + log.Printf("failed to notify of a failed order: %v", err) + } + return OrderResult{Processed: false}, err + } + if err := ctx.CallActivity(UpdateInventoryActivity, workflow.ActivityInput(PaymentRequest{RequestID: orderID, ItemBeingPurchased: orderPayload.ItemName, Amount: orderPayload.TotalCost, Quantity: orderPayload.Quantity})).Await(nil); err != nil { + if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Order %s failed!", orderID)})).Await(nil); err != nil { + log.Printf("failed to notify of a failed order: %v", err) + } + return OrderResult{Processed: false}, err + } + + if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Order %s has completed!", orderID)})).Await(nil); err != nil { + log.Printf("failed to notify of a successful order: %v", err) + } + return OrderResult{Processed: true}, nil +} + +// NotifyActivity outputs a notification message +func NotifyActivity(ctx workflow.ActivityContext) (any, error) { + var input Notification + if err := ctx.GetInput(&input); err != nil { + return "", err + } + fmt.Printf("NotifyActivity: %s\n", input.Message) + return nil, nil +} + +// ProcessPaymentActivity is used to process a payment +func ProcessPaymentActivity(ctx workflow.ActivityContext) (any, error) { + var input PaymentRequest + if err := ctx.GetInput(&input); err != nil { + return "", err + } + fmt.Printf("ProcessPaymentActivity: %s for %d - %s (%dUSD)\n", input.RequestID, input.Quantity, input.ItemBeingPurchased, input.Amount) + return nil, nil +} + +// VerifyInventoryActivity is used to verify if an item is available in the inventory +func VerifyInventoryActivity(ctx workflow.ActivityContext) (any, error) { + var input InventoryRequest + if err := ctx.GetInput(&input); err != nil { + return nil, err + } + fmt.Printf("VerifyInventoryActivity: Verifying inventory for order %s of %d %s\n", input.RequestID, input.Quantity, input.ItemName) + dClient, err := client.NewClient() + if err != nil { + return nil, err + } + item, err := dClient.GetState(context.Background(), stateStoreName, input.ItemName, nil) + if err != nil { + return nil, err + } + if item == nil { + return InventoryResult{ + Success: false, + InventoryItem: InventoryItem{}, + }, nil + } + var result InventoryItem + if err := json.Unmarshal(item.Value, &result); err != nil { + log.Fatalf("failed to parse inventory result %v", err) + } + fmt.Printf("VerifyInventoryActivity: There are %d %s available for purchase\n", result.Quantity, result.ItemName) + if result.Quantity >= input.Quantity { + return InventoryResult{Success: true, InventoryItem: result}, nil + } + return InventoryResult{Success: false, InventoryItem: InventoryItem{}}, nil +} + +// UpdateInventoryActivity modifies the inventory. +func UpdateInventoryActivity(ctx workflow.ActivityContext) (any, error) { + var input PaymentRequest + if err := ctx.GetInput(&input); err != nil { + return nil, err + } + fmt.Printf("UpdateInventoryActivity: Checking Inventory for order %s for %d * %s\n", input.RequestID, input.Quantity, input.ItemBeingPurchased) + dClient, err := client.NewClient() + if err != nil { + return nil, err + } + item, err := dClient.GetState(context.Background(), stateStoreName, input.ItemBeingPurchased, nil) + if err != nil { + return nil, err + } + var result InventoryItem + err = json.Unmarshal(item.Value, &result) + if err != nil { + return nil, err + } + newQuantity := result.Quantity - input.Quantity + if newQuantity < 0 { + return nil, fmt.Errorf("insufficient inventory for: %s", input.ItemBeingPurchased) + } + result.Quantity = newQuantity + newState, err := json.Marshal(result) + if err != nil { + log.Fatalf("failed to marshal new state: %v", err) + } + dClient.SaveState(context.Background(), stateStoreName, input.ItemBeingPurchased, newState, nil) + fmt.Printf("UpdateInventoryActivity: There are now %d %s left in stock\n", result.Quantity, result.ItemName) + return InventoryResult{Success: true, InventoryItem: result}, nil +} + +// RequestApprovalActivity requests approval for the order +func RequestApprovalActivity(ctx workflow.ActivityContext) (any, error) { + var input OrderPayload + if err := ctx.GetInput(&input); err != nil { + return nil, err + } + fmt.Printf("RequestApprovalActivity: Requesting approval for payment of %dUSD for %d %s\n", input.TotalCost, input.Quantity, input.ItemName) + return ApprovalRequired{Approval: true}, nil +} ``` {{% /codetab %}} From a45e25f25c714b060cc457b83fdafe6520c99260 Mon Sep 17 00:00:00 2001 From: Hannah Hunter Date: Fri, 2 Feb 2024 15:18:16 -0500 Subject: [PATCH 05/13] add supported SDKs Signed-off-by: Hannah Hunter --- .../building-blocks/workflow/workflow-overview.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-overview.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-overview.md index bf8f1d55a35..874fde37a5a 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-overview.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-overview.md @@ -73,6 +73,17 @@ Learn more about [different types of workflow patterns]({{< ref workflow-pattern The Dapr Workflow _authoring SDKs_ are language-specific SDKs that contain types and functions to implement workflow logic. The workflow logic lives in your application and is orchestrated by the Dapr Workflow engine running in the Dapr sidecar via a gRPC stream. +### Supported SDKs + +You can use the following SDKs to author a workflow. + +| Language stack | Package | +| - | - | +| Python | [dapr-ext-workflow](https://github.com/dapr/python-sdk/tree/master/ext/dapr-ext-workflow) | +| .NET | [Dapr.Workflow](https://www.nuget.org/profiles/dapr.io) | +| Java | [io.dapr.workflows](https://dapr.github.io/java-sdk/io/dapr/workflows/package-summary.html) | +| Go | [workflow](todo) | + ## Try out workflows ### Quickstarts and tutorials From 0f50934b63bc196cc15b1c9d187137bc8795b9f2 Mon Sep 17 00:00:00 2001 From: Hannah Hunter Date: Mon, 5 Feb 2024 12:43:29 -0500 Subject: [PATCH 06/13] mike initial review Signed-off-by: Hannah Hunter --- .../workflow/howto-author-workflow.md | 438 ++++++++++++++---- .../workflow/workflow-architecture.md | 2 +- .../workflow/workflow-features-concepts.md | 2 +- .../workflow/workflow-overview.md | 6 +- .../workflow/workflow-patterns.md | 2 +- 5 files changed, 354 insertions(+), 96 deletions(-) diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md index 207cc9c009a..276539446f6 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md @@ -169,49 +169,21 @@ public class DemoWorkflowActivity implements WorkflowActivity { -Define the workflow activities you'd like your workflow to perform. Activities are wrapped in the public `callActivityOptions` method, which implements the workflow activities. - +Define each workflow activity you'd like your workflow to perform. The Activity input can be unmarshalled from the context with `ctx.GetInput`. Activities should be defined as taking a `ctx workflow.ActivityContext` parameter and returning an interface and error. + ```go -type ActivityContext struct { - ctx task.ActivityContext -} - -func (wfac *ActivityContext) GetInput(v interface{}) error { - return wfac.ctx.GetInput(&v) -} - -func (wfac *ActivityContext) Context() context.Context { - return wfac.ctx.Context() -} - -type callActivityOption func(*callActivityOptions) error - -type callActivityOptions struct { - rawInput *wrapperspb.StringValue -} - -// ActivityInput is an option to pass a JSON-serializable input -func ActivityInput(input any) callActivityOption { - return func(opts *callActivityOptions) error { - data, err := marshalData(input) - if err != nil { - return err - } - opts.rawInput = wrapperspb.String(string(data)) - return nil - } -} - -// ActivityRawInput is an option to pass a byte slice as an input -func ActivityRawInput(input string) callActivityOption { - return func(opts *callActivityOptions) error { - opts.rawInput = wrapperspb.String(input) - return nil +func TestActivity(ctx workflow.ActivityContext) (any, error) { + var input int + if err := ctx.GetInput(&input); err != nil { + return "", err } + + // Do something here + return "result", nil } ``` -[See the Go SDK workflow activity example in context.](todo) +[See the Go SDK workflow activity example in context.](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md) {{% /codetab %}} @@ -322,62 +294,30 @@ public class DemoWorkflowWorker { -Next, register the workflow and workflow activities and start the workflow runtime. +Define your workflow function with the parameter `ctx *workflow.WorkflowContext` and return any and error. Invoke your defined activities from within your workflow. ```go -package workflow - -// RegisterWorkflow adds a workflow function to the registry -func (ww *WorkflowWorker) RegisterWorkflow(w Workflow) error { - wrappedOrchestration := wrapWorkflow(w) - - // get the function name for the passed workflow - name, err := getFunctionName(w) - if err != nil { - return fmt.Errorf("failed to get workflow decorator: %v", err) +func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) { + var input int + if err := ctx.GetInput(&input); err != nil { + return nil, err } - - err = ww.tasks.AddOrchestratorN(name, wrappedOrchestration) - return err -} - -// Activity wrapper -func wrapActivity(a Activity) task.Activity { - return func(ctx task.ActivityContext) (any, error) { - aCtx := ActivityContext{ctx: ctx} - - return a(aCtx) + var output string + if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input)).Await(&output); err != nil { + return nil, err } -} - -// RegisterActivity adds an activity function to the registry -func (ww *WorkflowWorker) RegisterActivity(a Activity) error { - wrappedActivity := wrapActivity(a) - - // get the function name for the passed activity - name, err := getFunctionName(a) - if err != nil { - return fmt.Errorf("failed to get activity decorator: %v", err) + if err := ctx.WaitForExternalEvent("testEvent", time.Second*60).Await(&output); err != nil { + return nil, err } - - err = ww.tasks.AddActivityN(name, wrappedActivity) - return err -} - -// Start initialises a non-blocking worker to handle workflows and activities registered -// prior to this being called. -func (ww *WorkflowWorker) Start() error { - ctx, cancel := context.WithCancel(context.Background()) - ww.cancel = cancel - if err := ww.client.StartWorkItemListener(ctx, ww.tasks); err != nil { - return fmt.Errorf("failed to start work stream: %v", err) + + if err := ctx.CreateTimer(time.Second).Await(nil); err != nil { + return nil, nil } - log.Println("work item listener started") - return nil + return output, nil } ``` -[See the Go SDK workflow in context.](todo) +[See the Go SDK workflow in context.](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md) {{% /codetab %}} @@ -600,19 +540,337 @@ public class DemoWorkflow extends Workflow { -[As in the following example](todo), a hello-world application using the Go SDK and Dapr Workflow would include: +[As in the following example](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md), a hello-world application using the Go SDK and Dapr Workflow would include: -- A Go package called `todo` to receive the Go SDK client capabilities. -- An import of `todo` +- A Go package called `client` to receive the Go SDK client capabilities. +- An import of `workflow` - The `DemoWorkflow` class which extends `Workflow` - Creating the workflow with input and output. - API calls. In the example below, these calls start and call the workflow activities. ```go +package main + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/dapr/go-sdk/client" + "github.com/dapr/go-sdk/workflow" +) + +var stage = 0 + +const ( + workflowComponent = "dapr" +) + +func main() { + w, err := workflow.NewWorker() + if err != nil { + log.Fatal(err) + } + + fmt.Println("Worker initialized") + + if err := w.RegisterWorkflow(TestWorkflow); err != nil { + log.Fatal(err) + } + fmt.Println("TestWorkflow registered") + + if err := w.RegisterActivity(TestActivity); err != nil { + log.Fatal(err) + } + fmt.Println("TestActivity registered") + + // Start workflow runner + if err := w.Start(); err != nil { + log.Fatal(err) + } + fmt.Println("runner started") + + daprClient, err := client.NewClient() + if err != nil { + log.Fatalf("failed to intialise client: %v", err) + } + defer daprClient.Close() + ctx := context.Background() + + // Start workflow test + respStart, err := daprClient.StartWorkflowBeta1(ctx, &client.StartWorkflowRequest{ + InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", + WorkflowComponent: workflowComponent, + WorkflowName: "TestWorkflow", + Options: nil, + Input: 1, + SendRawInput: false, + }) + if err != nil { + log.Fatalf("failed to start workflow: %v", err) + } + fmt.Printf("workflow started with id: %v\n", respStart.InstanceID) + + // Pause workflow test + err = daprClient.PauseWorkflowBeta1(ctx, &client.PauseWorkflowRequest{ + InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", + WorkflowComponent: workflowComponent, + }) + + if err != nil { + log.Fatalf("failed to pause workflow: %v", err) + } + + respGet, err := daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{ + InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", + WorkflowComponent: workflowComponent, + }) + if err != nil { + log.Fatalf("failed to get workflow: %v", err) + } + + if respGet.RuntimeStatus != workflow.StatusSuspended.String() { + log.Fatalf("workflow not paused: %v", respGet.RuntimeStatus) + } + + fmt.Printf("workflow paused\n") + + // Resume workflow test + err = daprClient.ResumeWorkflowBeta1(ctx, &client.ResumeWorkflowRequest{ + InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", + WorkflowComponent: workflowComponent, + }) + + if err != nil { + log.Fatalf("failed to resume workflow: %v", err) + } + + respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{ + InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", + WorkflowComponent: workflowComponent, + }) + if err != nil { + log.Fatalf("failed to get workflow: %v", err) + } + + if respGet.RuntimeStatus != workflow.StatusRunning.String() { + log.Fatalf("workflow not running") + } + + fmt.Println("workflow resumed") + + fmt.Printf("stage: %d\n", stage) + + // Raise Event Test + + err = daprClient.RaiseEventWorkflowBeta1(ctx, &client.RaiseEventWorkflowRequest{ + InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", + WorkflowComponent: workflowComponent, + EventName: "testEvent", + EventData: "testData", + SendRawData: false, + }) + + if err != nil { + fmt.Printf("failed to raise event: %v", err) + } + + fmt.Println("workflow event raised") + + time.Sleep(time.Second) // allow workflow to advance + + fmt.Printf("stage: %d\n", stage) + respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{ + InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", + WorkflowComponent: workflowComponent, + }) + if err != nil { + log.Fatalf("failed to get workflow: %v", err) + } + + fmt.Printf("workflow status: %v\n", respGet.RuntimeStatus) + + // Purge workflow test + err = daprClient.PurgeWorkflowBeta1(ctx, &client.PurgeWorkflowRequest{ + InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", + WorkflowComponent: workflowComponent, + }) + if err != nil { + log.Fatalf("failed to purge workflow: %v", err) + } + + respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{ + InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", + WorkflowComponent: workflowComponent, + }) + if err != nil && respGet != nil { + log.Fatal("failed to purge workflow") + } + + fmt.Println("workflow purged") + + fmt.Printf("stage: %d\n", stage) + + // Terminate workflow test + respStart, err = daprClient.StartWorkflowBeta1(ctx, &client.StartWorkflowRequest{ + InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", + WorkflowComponent: workflowComponent, + WorkflowName: "TestWorkflow", + Options: nil, + Input: 1, + SendRawInput: false, + }) + if err != nil { + log.Fatalf("failed to start workflow: %v", err) + } + + fmt.Printf("workflow started with id: %s\n", respStart.InstanceID) + + err = daprClient.TerminateWorkflowBeta1(ctx, &client.TerminateWorkflowRequest{ + InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", + WorkflowComponent: workflowComponent, + }) + if err != nil { + log.Fatalf("failed to terminate workflow: %v", err) + } + + respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{ + InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", + WorkflowComponent: workflowComponent, + }) + if err != nil { + log.Fatalf("failed to get workflow: %v", err) + } + if respGet.RuntimeStatus != workflow.StatusTerminated.String() { + log.Fatal("failed to terminate workflow") + } + + fmt.Println("workflow terminated") + + err = daprClient.PurgeWorkflowBeta1(ctx, &client.PurgeWorkflowRequest{ + InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", + WorkflowComponent: workflowComponent, + }) + + respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{ + InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", + WorkflowComponent: workflowComponent, + }) + if err == nil || respGet != nil { + log.Fatalf("failed to purge workflow: %v", err) + } + + fmt.Println("workflow purged") + + // WFClient + // TODO: Expand client validation + + stage = 0 + fmt.Println("workflow client test") + + wfClient, err := workflow.NewClient() + if err != nil { + log.Fatalf("[wfclient] faield to initialize: %v", err) + } + + id, err := wfClient.ScheduleNewWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1)) + if err != nil { + log.Fatalf("[wfclient] failed to start workflow: %v", err) + } + + fmt.Printf("[wfclient] started workflow with id: %s\n", id) + + metadata, err := wfClient.FetchWorkflowMetadata(ctx, id) + if err != nil { + log.Fatalf("[wfclient] failed to get worfklow: %v", err) + } + + fmt.Printf("[wfclient] workflow status: %v\n", metadata.RuntimeStatus.String()) + + if stage != 1 { + log.Fatalf("Workflow assertion failed while validating the wfclient. Stage 1 expected, current: %d", stage) + } + + fmt.Printf("[wfclient] stage: %d\n", stage) + + // TODO: WaitForWorkflowStart + // TODO: WaitForWorkflowCompletion + + // raise event + + if err := wfClient.RaiseEvent(ctx, id, "testEvent", workflow.WithEventPayload("testData")); err != nil { + log.Fatalf("[wfclient] failed to raise event: %v", err) + } + + fmt.Println("[wfclient] event raised") + + // Sleep to allow the workflow to advance + time.Sleep(time.Second) + + if stage != 2 { + log.Fatalf("Workflow assertion failed while validating the wfclient. Stage 2 expected, current: %d", stage) + } + + fmt.Printf("[wfclient] stage: %d\n", stage) + + // stop workflow + if err := wfClient.TerminateWorkflow(ctx, id); err != nil { + log.Fatalf("[wfclient] failed to terminate workflow: %v", err) + } + + fmt.Println("[wfclient] workflow terminated") + + if err := wfClient.PurgeWorkflow(ctx, id); err != nil { + log.Fatalf("[wfclient] failed to purge workflow: %v", err) + } + + fmt.Println("[wfclient] workflow purged") + + // stop workflow runtime + if err := w.Shutdown(); err != nil { + log.Fatalf("failed to shutdown runtime: %v", err) + } + + fmt.Println("workflow worker successfully shutdown") +} + +func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) { + var input int + if err := ctx.GetInput(&input); err != nil { + return nil, err + } + var output string + if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input)).Await(&output); err != nil { + return nil, err + } + + err := ctx.WaitForExternalEvent("testEvent", time.Second*60).Await(&output) + if err != nil { + return nil, err + } + + if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input)).Await(&output); err != nil { + return nil, err + } + + return output, nil +} + +func TestActivity(ctx workflow.ActivityContext) (any, error) { + var input int + if err := ctx.GetInput(&input); err != nil { + return "", err + } + + stage += input + + return fmt.Sprintf("Stage: %d", stage), nil +} ``` -[See the full Go SDK workflow example in context.](todo) +[See the full Go SDK workflow example in context.](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md) {{% /codetab %}} @@ -637,4 +895,4 @@ Now that you've authored a workflow, learn how to manage it. - [Python example](https://github.com/dapr/python-sdk/tree/master/examples/demo_workflow) - [.NET example](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow) - [Java example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows) - - [Go example](todo) + - [Go example](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md) diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-architecture.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-architecture.md index 9887f2826aa..f787579dc9d 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-architecture.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-architecture.md @@ -197,4 +197,4 @@ See the [Reminder usage and execution guarantees section]({{< ref "workflow-arch - [Python](https://github.com/dapr/python-sdk/tree/master/examples/demo_workflow) - [.NET](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow) - [Java](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows) - - [Go example](todo) + - [Go example](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md) diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-features-concepts.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-features-concepts.md index fc8631e5118..6c7ed149341 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-features-concepts.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-features-concepts.md @@ -467,4 +467,4 @@ To work around these constraints: - [Python](https://github.com/dapr/python-sdk/tree/master/examples/demo_workflow) - [.NET](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow) - [Java](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows) - - [Go example](todo) + - [Go example](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md) diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-overview.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-overview.md index 874fde37a5a..9003717a4ad 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-overview.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-overview.md @@ -82,7 +82,7 @@ You can use the following SDKs to author a workflow. | Python | [dapr-ext-workflow](https://github.com/dapr/python-sdk/tree/master/ext/dapr-ext-workflow) | | .NET | [Dapr.Workflow](https://www.nuget.org/profiles/dapr.io) | | Java | [io.dapr.workflows](https://dapr.github.io/java-sdk/io/dapr/workflows/package-summary.html) | -| Go | [workflow](todo) | +| Go | [workflow](https://github.com/dapr/go-sdk/tree/main/client/workflow.go) | ## Try out workflows @@ -96,7 +96,7 @@ Want to put workflows to the test? Walk through the following quickstart and tut | [Workflow Python SDK example](https://github.com/dapr/python-sdk/tree/master/examples/demo_workflow) | Learn how to create a Dapr Workflow and invoke it using the Python `DaprClient` package. | | [Workflow .NET SDK example](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow) | Learn how to create a Dapr Workflow and invoke it using ASP.NET Core web APIs. | | [Workflow Java SDK example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows) | Learn how to create a Dapr Workflow and invoke it using the Java `io.dapr.workflows` package. | -| [Workflow Go SDK example](todo) | Learn how to create a Dapr Workflow and invoke it using the Go `workflow` package. | +| [Workflow Go SDK example](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md) | Learn how to create a Dapr Workflow and invoke it using the Go `workflow` package. | ### Start using workflows directly in your app @@ -126,4 +126,4 @@ Watch [this video for an overview on Dapr Workflow](https://youtu.be/s1p9MNl4VGo - [.NET example](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow) - [Python example](https://github.com/dapr/python-sdk/tree/master/examples/demo_workflow) - [Java example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows) - - [Go example](todo) + - [Go example](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md) diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md index 1ac4f70f2d5..7657273f137 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md @@ -790,4 +790,4 @@ External events don't have to be directly triggered by humans. They can also be - [Python](https://github.com/dapr/python-sdk/tree/master/examples/demo_workflow) - [.NET](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow) - [Java](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows) - - [Go example](todo) + - [Go example](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md) From c5ee2810a3bd48be1d3e765dbb3e5294b9d13d00 Mon Sep 17 00:00:00 2001 From: Hannah Hunter Date: Wed, 14 Feb 2024 14:02:09 -0500 Subject: [PATCH 07/13] update quickstart, start updating patterns Signed-off-by: Hannah Hunter --- .../workflow/workflow-patterns.md | 44 +++- .../quickstarts/workflow-quickstart.md | 196 +----------------- 2 files changed, 53 insertions(+), 187 deletions(-) diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md index 7657273f137..34679a88a36 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md @@ -164,7 +164,49 @@ public class ChainWorkflow extends Workflow { ```go - +func TaskChainWorkflow(ctx *workflow.WorkflowContext) (any, error) { + var input int + if err := ctx.GetInput(&input); err != nil { + return "", err + } + var result1 int + if err := ctx.CallActivity(Step1, workflow.ActivityInput(input)).Await(&result1); err != nil { + return nil, err + } + var result2 int + if err := ctx.CallActivity(Step1, workflow.ActivityInput(input)).Await(&result2); err != nil { + return nil, err + } + var result3 int + if err := ctx.CallActivity(Step1, workflow.ActivityInput(input)).Await(&result3); err != nil { + return nil, err + } + return []int{result1, result2, result3}, nil +} +func Step1(ctx workflow.ActivityContext) (any, error) { + var input int + if err := ctx.GetInput(&input); err != nil { + return "", err + } + fmt.Printf("Step 1: Received input: %s", input) + return input + 1, nil +} +func Step2(ctx workflow.ActivityContext) (any, error) { + var input int + if err := ctx.GetInput(&input); err != nil { + return "", err + } + fmt.Printf("Step 2: Received input: %s", input) + return input * 2, nil +} +func Step3(ctx workflow.ActivityContext) (any, error) { + var input int + if err := ctx.GetInput(&input); err != nil { + return "", err + } + fmt.Printf("Step 3: Received input: %s", input) + return int(math.Pow(float64(input), 2)), nil +} ``` {{% /codetab %}} diff --git a/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md b/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md index 03fc13433f3..d20bc49cf3a 100644 --- a/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md +++ b/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md @@ -20,6 +20,7 @@ In this guide, you'll: +Select your preferred language-specific Dapr SDK before proceeding with the Quickstart. {{< tabs "Python" ".NET" "Java" "Go" >}} @@ -68,14 +69,12 @@ pip3 install -r requirements.txt ### Step 3: Run the order processor app -In the terminal, start the order processor app alongside a Dapr sidecar: +In the terminal, start the order processor app alongside a Dapr sidecar using [Multi-App Run]({{< ref multi-app-dapr-run >}}): ```bash -dapr run --app-id order-processor --resources-path ../../../components/ -- python3 app.py +dapr run -f . ``` -> **Note:** Since Python3.exe is not defined in Windows, you may need to use `python app.py` instead of `python3 app.py`. - This starts the `order-processor` app with unique workflow ID and runs the workflow activities. Expected output: @@ -303,10 +302,10 @@ cd workflows/csharp/sdk/order-processor ### Step 3: Run the order processor app -In the terminal, start the order processor app alongside a Dapr sidecar: +In the terminal, start the order processor app alongside a Dapr sidecar using [Multi-App Run]({{< ref multi-app-dapr-run >}}): ```bash -dapr run --app-id order-processor dotnet run +dapr run -f . ``` This starts the `order-processor` app with unique workflow ID and runs the workflow activities. @@ -559,10 +558,10 @@ mvn clean install ### Step 3: Run the order processor app -In the terminal, start the order processor app alongside a Dapr sidecar: +In the terminal, start the order processor app alongside a Dapr sidecar using [Multi-App Run]({{< ref multi-app-dapr-run >}}): ```bash -dapr run --app-id WorkflowConsoleApp --resources-path ../../../components/ --dapr-grpc-port 50001 -- java -jar target/OrderProcessingService-0.0.1-SNAPSHOT.jar io.dapr.quickstarts.workflows.WorkflowConsoleApp +dapr run -f . ``` This starts the `order-processor` app with unique workflow ID and runs the workflow activities. @@ -856,7 +855,7 @@ The `Activities` directory holds the four workflow activities used by the workfl {{% codetab %}} -The `order-processor` console app starts and manages the `OrderProcessingWorkflow`, which simulates purchasing items from a store. The workflow consists of five unique workflow activities, or tasks: +The `order-processor` console app starts and manages the `OrderProcessingWorkflow` workflow, which simulates purchasing items from a store. The workflow consists of five unique workflow activities, or tasks: - `NotifyActivity`: Utilizes a logger to print out messages throughout the workflow. These messages notify you when: - You have insufficient inventory @@ -890,15 +889,9 @@ In a new terminal window, navigate to the `order-processor` directory: cd workflows/go/sdk/order-processor ``` -Install the Dapr Go SDK dependencies: - -```bash -go build . -``` - ### Step 3: Run the order processor app -In the terminal, start the order processor app alongside a Dapr sidecar: +In the terminal, start the order processor app alongside a Dapr sidecar using [Multi-App Run]({{< ref multi-app-dapr-run >}}): ```bash dapr run -f . @@ -1097,177 +1090,8 @@ func restockInventory(daprClient client.Client, inventory []InventoryItem) error } ``` -#### `order-processor/workflow.go` - -In `main.go`, the workflow is defined as a method with all of its associated tasks (determined by workflow activities). - -```go -// OrderProcessingWorkflow is the main workflow for orchestrating activities in the order process. -func OrderProcessingWorkflow(ctx *workflow.WorkflowContext) (any, error) { - orderID := ctx.InstanceID() - var orderPayload OrderPayload - if err := ctx.GetInput(&orderPayload); err != nil { - return nil, err - } - if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Received order %s for %d %s - $%d", orderID, orderPayload.Quantity, orderPayload.ItemName, orderPayload.TotalCost)})).Await(nil); err != nil { - return OrderResult{Processed: false}, err - } - - var verifyInventoryResult InventoryResult - if err := ctx.CallActivity(VerifyInventoryActivity, workflow.ActivityInput(InventoryRequest{ - RequestID: orderID, - ItemName: orderPayload.ItemName, - Quantity: orderPayload.Quantity, - })).Await(&verifyInventoryResult); err != nil { - return OrderResult{Processed: false}, err - } - - if !verifyInventoryResult.Success { - if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Insufficient inventory for %s", orderPayload.ItemName)})).Await(nil); err != nil { - return OrderResult{Processed: false}, err - } - } - - if orderPayload.TotalCost > 50000 { - var approvalRequired ApprovalRequired - if err := ctx.CallActivity(RequestApprovalActivity, workflow.ActivityInput(orderPayload)).Await(&approvalRequired); err != nil { - return OrderResult{Processed: false}, err - } - if err := ctx.WaitForExternalEvent("manager_approval", time.Second*200).Await(nil); err != nil { - return OrderResult{Processed: false}, err - } - // TODO: Confirm timeout flow - this will be in the form of an error. - if approvalRequired.Approval { - if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Payment for order %s has been approved!", orderID)})).Await(nil); err != nil { - log.Printf("failed to notify of a successful order: %v\n", err) - } - } else { - if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Payment for order %s has been rejected!", orderID)})).Await(nil); err != nil { - log.Printf("failed to notify of an unsuccessful order :%v\n", err) - } - return OrderResult{Processed: false}, nil - } - } - if err := ctx.CallActivity(ProcessPaymentActivity, workflow.ActivityInput(PaymentRequest{ - RequestID: orderID, - ItemBeingPurchased: orderPayload.ItemName, - Amount: orderPayload.TotalCost, - Quantity: orderPayload.Quantity, - })).Await(nil); err != nil { - if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Order %s failed!", orderID)})).Await(nil); err != nil { - log.Printf("failed to notify of a failed order: %v", err) - } - return OrderResult{Processed: false}, err - } - if err := ctx.CallActivity(UpdateInventoryActivity, workflow.ActivityInput(PaymentRequest{RequestID: orderID, ItemBeingPurchased: orderPayload.ItemName, Amount: orderPayload.TotalCost, Quantity: orderPayload.Quantity})).Await(nil); err != nil { - if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Order %s failed!", orderID)})).Await(nil); err != nil { - log.Printf("failed to notify of a failed order: %v", err) - } - return OrderResult{Processed: false}, err - } - - if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Order %s has completed!", orderID)})).Await(nil); err != nil { - log.Printf("failed to notify of a successful order: %v", err) - } - return OrderResult{Processed: true}, nil -} - -// NotifyActivity outputs a notification message -func NotifyActivity(ctx workflow.ActivityContext) (any, error) { - var input Notification - if err := ctx.GetInput(&input); err != nil { - return "", err - } - fmt.Printf("NotifyActivity: %s\n", input.Message) - return nil, nil -} - -// ProcessPaymentActivity is used to process a payment -func ProcessPaymentActivity(ctx workflow.ActivityContext) (any, error) { - var input PaymentRequest - if err := ctx.GetInput(&input); err != nil { - return "", err - } - fmt.Printf("ProcessPaymentActivity: %s for %d - %s (%dUSD)\n", input.RequestID, input.Quantity, input.ItemBeingPurchased, input.Amount) - return nil, nil -} +Meanwhile, the `OrderProcessingWorkflow` and its activities are defined as methods in [`workflow.go`](https://github.com/dapr/quickstarts/workflows/go/sdk/order-processor/workflow.go) -// VerifyInventoryActivity is used to verify if an item is available in the inventory -func VerifyInventoryActivity(ctx workflow.ActivityContext) (any, error) { - var input InventoryRequest - if err := ctx.GetInput(&input); err != nil { - return nil, err - } - fmt.Printf("VerifyInventoryActivity: Verifying inventory for order %s of %d %s\n", input.RequestID, input.Quantity, input.ItemName) - dClient, err := client.NewClient() - if err != nil { - return nil, err - } - item, err := dClient.GetState(context.Background(), stateStoreName, input.ItemName, nil) - if err != nil { - return nil, err - } - if item == nil { - return InventoryResult{ - Success: false, - InventoryItem: InventoryItem{}, - }, nil - } - var result InventoryItem - if err := json.Unmarshal(item.Value, &result); err != nil { - log.Fatalf("failed to parse inventory result %v", err) - } - fmt.Printf("VerifyInventoryActivity: There are %d %s available for purchase\n", result.Quantity, result.ItemName) - if result.Quantity >= input.Quantity { - return InventoryResult{Success: true, InventoryItem: result}, nil - } - return InventoryResult{Success: false, InventoryItem: InventoryItem{}}, nil -} - -// UpdateInventoryActivity modifies the inventory. -func UpdateInventoryActivity(ctx workflow.ActivityContext) (any, error) { - var input PaymentRequest - if err := ctx.GetInput(&input); err != nil { - return nil, err - } - fmt.Printf("UpdateInventoryActivity: Checking Inventory for order %s for %d * %s\n", input.RequestID, input.Quantity, input.ItemBeingPurchased) - dClient, err := client.NewClient() - if err != nil { - return nil, err - } - item, err := dClient.GetState(context.Background(), stateStoreName, input.ItemBeingPurchased, nil) - if err != nil { - return nil, err - } - var result InventoryItem - err = json.Unmarshal(item.Value, &result) - if err != nil { - return nil, err - } - newQuantity := result.Quantity - input.Quantity - if newQuantity < 0 { - return nil, fmt.Errorf("insufficient inventory for: %s", input.ItemBeingPurchased) - } - result.Quantity = newQuantity - newState, err := json.Marshal(result) - if err != nil { - log.Fatalf("failed to marshal new state: %v", err) - } - dClient.SaveState(context.Background(), stateStoreName, input.ItemBeingPurchased, newState, nil) - fmt.Printf("UpdateInventoryActivity: There are now %d %s left in stock\n", result.Quantity, result.ItemName) - return InventoryResult{Success: true, InventoryItem: result}, nil -} - -// RequestApprovalActivity requests approval for the order -func RequestApprovalActivity(ctx workflow.ActivityContext) (any, error) { - var input OrderPayload - if err := ctx.GetInput(&input); err != nil { - return nil, err - } - fmt.Printf("RequestApprovalActivity: Requesting approval for payment of %dUSD for %d %s\n", input.TotalCost, input.Quantity, input.ItemName) - return ApprovalRequired{Approval: true}, nil -} -``` {{% /codetab %}} From 6d9a344d363c415cd4e38b357f37e65d0731d00c Mon Sep 17 00:00:00 2001 From: Hannah Hunter Date: Wed, 14 Feb 2024 14:15:47 -0500 Subject: [PATCH 08/13] add more pattern examples Signed-off-by: Hannah Hunter --- .../workflow/workflow-patterns.md | 120 +++++++++++++++++- 1 file changed, 117 insertions(+), 3 deletions(-) diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md index e52d7fb0a8f..fde2b514846 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md @@ -786,7 +786,51 @@ public class MonitorWorkflow extends Workflow { ```go - +type JobStatus struct { + JobID string `json:"job_id"` + IsHealthy bool `json:"is_healthy"` +} +func StatusMonitorWorkflow(ctx *workflow.WorkflowContext) (any, error) { + var sleepInterval time.Duration + var job JobStatus + if err := ctx.GetInput(&job); err != nil { + return "", err + } + var status string + if err := ctx.CallActivity(CheckStatus, workflow.ActivityInput(job)).Await(&status); err != nil { + return "", err + } + if status == "healthy" { + job.IsHealthy = true + sleepInterval = time.Second * 60 + } else { + if job.IsHealthy { + job.IsHealthy = false + err := ctx.CallActivity(SendAlert, workflow.ActivityInput(fmt.Sprintf("Job '%s' is unhealthy!", job.JobID))).Await(nil) + if err != nil { + return "", err + } + } + sleepInterval = time.Second * 5 + } + if err := ctx.CreateTimer(sleepInterval).Await(nil); err != nil { + return "", err + } + ctx.ContinueAsNew(job, false) + return "", nil +} +func CheckStatus(ctx workflow.ActivityContext) (any, error) { + statuses := []string{"healthy", "unhealthy"} + return statuses[rand.Intn(1)], nil +} +func SendAlert(ctx workflow.ActivityContext) (any, error) { + var message string + if err := ctx.GetInput(&message); err != nil { + return "", err + } + fmt.Printf("*** Alert: %s", message) + return "", nil +} ``` {{% /codetab %}} @@ -1105,7 +1149,55 @@ public class ExternalSystemInteractionWorkflow extends Workflow { ```go - +type Order struct { + Cost float64 `json:"cost"` + Product string `json:"product"` + Quantity int `json:"quantity"` +} +type Approval struct { + Approver string `json:"approver"` +} +func PurchaseOrderWorkflow(ctx *workflow.WorkflowContext) (any, error) { + var order Order + if err := ctx.GetInput(&order); err != nil { + return "", err + } + // Orders under $1000 are auto-approved + if order.Cost < 1000 { + return "Auto-approved", nil + } + // Orders of $1000 or more require manager approval + if err := ctx.CallActivity(SendApprovalRequest, workflow.ActivityInput(order)).Await(nil); err != nil { + return "", err + } + // Approvals must be received within 24 hours or they will be cancelled + var approval Approval + if err := ctx.WaitForExternalEvent("approval_received", time.Hour*24).Await(&approval); err != nil { + // Assuming that a timeout has taken place - in any case; an error. + return "error/cancelled", err + } + // The order was approved + if err := ctx.CallActivity(PlaceOrder, workflow.ActivityInput(order)).Await(nil); err != nil { + return "", err + } + return fmt.Sprintf("Approved by %s", approval.Approver), nil +} +func SendApprovalRequest(ctx workflow.ActivityContext) (any, error) { + var order Order + if err := ctx.GetInput(&order); err != nil { + return "", err + } + fmt.Printf("*** Sending approval request for order: %v\n", order) + return "", nil +} +func PlaceOrder(ctx workflow.ActivityContext) (any, error) { + var order Order + if err := ctx.GetInput(&order); err != nil { + return "", err + } + fmt.Printf("*** Placing order: %v", order) + return "", nil +} ``` {{% /codetab %}} @@ -1137,7 +1229,11 @@ with DaprClient() as d: ```javascript -// Raise the workflow event to the waiting workflow +import { DaprClient } from "@dapr/dapr"; + + public async raiseEvent(workflowInstanceId: string, eventName: string, eventPayload?: any) { + this._innerClient.raiseOrchestrationEvent(workflowInstanceId, eventName, eventPayload); + } ``` {{% /codetab %}} @@ -1170,6 +1266,24 @@ client.raiseEvent(restartingInstanceId, "RestartEvent", "RestartEventPayload"); ```go +func raiseEvent() { + daprClient, err := client.NewClient() + if err != nil { + log.Fatalf("failed to initialize the client") + } + err = daprClient.RaiseEventWorkflowBeta1(context.Background(), &client.RaiseEventWorkflowRequest{ + InstanceID: "instance_id", + WorkflowComponent: "dapr", + EventName: "approval_received", + EventData: Approval{ + Approver: "Jane Doe", + }, + }) + if err != nil { + log.Fatalf("failed to raise event on workflow") + } + log.Println("raised an event on specified workflow") +} ``` {{% /codetab %}} From 8204495bf0282523934207e04976fa89b90e623d Mon Sep 17 00:00:00 2001 From: Hannah Hunter Date: Fri, 16 Feb 2024 12:33:22 -0500 Subject: [PATCH 09/13] add fan in/fan out Signed-off-by: Hannah Hunter --- .../workflow/workflow-patterns.md | 59 ++++++++++++++++++- 1 file changed, 58 insertions(+), 1 deletion(-) diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md index fde2b514846..08016845076 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md @@ -516,7 +516,64 @@ public class FaninoutWorkflow extends Workflow { ```go - +func BatchProcessingWorkflow(ctx *workflow.WorkflowContext) (any, error) { + var input int + if err := ctx.GetInput(&input); err != nil { + return 0, err + } + var workBatch []int + if err := ctx.CallActivity(GetWorkBatch, workflow.ActivityInput(input)).Await(&workBatch); err != nil { + return 0, err + } + parallelTasks := workflow.NewTaskSlice(len(workBatch)) + for i, workItem := range workBatch { + parallelTasks[i] = ctx.CallActivity(ProcessWorkItem, workflow.ActivityInput(workItem)) + } + var outputs int + for _, task := range parallelTasks { + var output int + err := task.Await(&output) + if err == nil { + outputs += output + } else { + return 0, err + } + } + if err := ctx.CallActivity(ProcessResults, workflow.ActivityInput(outputs)).Await(nil); err != nil { + return 0, err + } + return 0, nil +} +func GetWorkBatch(ctx workflow.ActivityContext) (any, error) { + var batchSize int + if err := ctx.GetInput(&batchSize); err != nil { + return 0, err + } + batch := make([]int, batchSize) + for i := 0; i < batchSize; i++ { + batch[i] = i + } + return batch, nil +} +func ProcessWorkItem(ctx workflow.ActivityContext) (any, error) { + var workItem int + if err := ctx.GetInput(&workItem); err != nil { + return 0, err + } + fmt.Printf("Processing work item: %d\n", workItem) + time.Sleep(time.Second * 5) + result := workItem * 2 + fmt.Printf("Work item %d processed. Result: %d\n", workItem, result) + return result, nil +} +func ProcessResults(ctx workflow.ActivityContext) (any, error) { + var finalResult int + if err := ctx.GetInput(&finalResult); err != nil { + return 0, err + } + fmt.Printf("Final result: %d\n", finalResult) + return finalResult, nil +} ``` {{% /codetab %}} From d9cbcbad3b7b4b5237d8e893bc24e46f6dd1288b Mon Sep 17 00:00:00 2001 From: Hannah Hunter <94493363+hhunter-ms@users.noreply.github.com> Date: Tue, 20 Feb 2024 08:55:21 -0500 Subject: [PATCH 10/13] Update daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md Co-authored-by: Marc Duiker Signed-off-by: Hannah Hunter <94493363+hhunter-ms@users.noreply.github.com> --- .../building-blocks/workflow/howto-manage-workflow.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md index ae779347ec8..07198da0975 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md @@ -394,6 +394,6 @@ Learn more about these HTTP calls in the [workflow API reference guide]({{< ref - [JavaScript example](https://github.com/dapr/js-sdk/tree/main/examples/workflow) - [.NET example](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow) - [Java example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows) - - [Go example](todo) + - [Go example](https://github.com/dapr/go-sdk/tree/main/examples/workflow) - [Workflow API reference]({{< ref workflow_api.md >}}) From e36ecc1715285f79a63dc0d68d2669a0a4d04743 Mon Sep 17 00:00:00 2001 From: Hannah Hunter <94493363+hhunter-ms@users.noreply.github.com> Date: Tue, 20 Feb 2024 09:49:41 -0500 Subject: [PATCH 11/13] Update daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md Co-authored-by: Mark Fussell Signed-off-by: Hannah Hunter <94493363+hhunter-ms@users.noreply.github.com> --- .../building-blocks/workflow/howto-manage-workflow.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md index 07198da0975..0b0bad38fc2 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md @@ -173,7 +173,7 @@ await daprClient.PurgeWorkflowAsync(orderId, workflowComponent); {{% codetab %}} -Manage your workflow within your code. [In the workflow example from the Java SDK](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowClient.java), the workflow is registered in the code using the following APIs: +Manage your workflow within your code. [In the workflow example from the Java SDK](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/), the workflow is registered in the code using the following APIs: - **scheduleNewWorkflow**: Starts a new workflow instance - **getInstanceState**: Get information on the status of the workflow From b9a803827358bcfe77c5544d9ef65518cf2d6a09 Mon Sep 17 00:00:00 2001 From: Hannah Hunter Date: Tue, 20 Feb 2024 10:07:11 -0500 Subject: [PATCH 12/13] updates per mark and marc Signed-off-by: Hannah Hunter --- .../building-blocks/workflow/howto-author-workflow.md | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md index 525836e7698..2b37739d15a 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md @@ -766,8 +766,7 @@ public class DemoWorkflow extends Workflow { [As in the following example](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md), a hello-world application using the Go SDK and Dapr Workflow would include: - A Go package called `client` to receive the Go SDK client capabilities. -- An import of `workflow` -- The `DemoWorkflow` class which extends `Workflow` +- The `TestWorkflow` method - Creating the workflow with input and output. - API calls. In the example below, these calls start and call the workflow activities. @@ -987,9 +986,6 @@ func main() { fmt.Println("workflow purged") - // WFClient - // TODO: Expand client validation - stage = 0 fmt.Println("workflow client test") @@ -1018,9 +1014,6 @@ func main() { fmt.Printf("[wfclient] stage: %d\n", stage) - // TODO: WaitForWorkflowStart - // TODO: WaitForWorkflowCompletion - // raise event if err := wfClient.RaiseEvent(ctx, id, "testEvent", workflow.WithEventPayload("testData")); err != nil { From 3c6c9616d577f6bdce00a1a090c208612cd4279c Mon Sep 17 00:00:00 2001 From: Hannah Hunter <94493363+hhunter-ms@users.noreply.github.com> Date: Tue, 20 Feb 2024 13:57:07 -0500 Subject: [PATCH 13/13] Update daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md Co-authored-by: Marc Duiker Signed-off-by: Hannah Hunter <94493363+hhunter-ms@users.noreply.github.com> --- .../building-blocks/workflow/howto-manage-workflow.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md index 0b0bad38fc2..162ec4a4102 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md @@ -238,7 +238,7 @@ public class DemoWorkflowClient { {{% codetab %}} -Manage your workflow within your code. [In the workflow example from the Go SDK](todo), the workflow is registered in the code using the following APIs: +Manage your workflow within your code. [In the workflow example from the Go SDK](https://github.com/dapr/go-sdk/tree/main/examples/workflow), the workflow is registered in the code using the following APIs: - **StartWorkflow**: Starts a new workflow instance - **GetWorkflow**: Get information on the status of the workflow