diff --git a/.github/workflows/validate_go_quickstarts.yaml b/.github/workflows/validate_go_quickstarts.yaml index 943621aa5..b79a0d052 100644 --- a/.github/workflows/validate_go_quickstarts.yaml +++ b/.github/workflows/validate_go_quickstarts.yaml @@ -35,23 +35,23 @@ jobs: env: DAPR_DEFAULT_IMAGE_REGISTRY: GHCR DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/master/install - GOVER: 1.18 + GOVER: 1.21 KUBERNETES_VERSION: v1.21.1 KIND_VERSION: v0.11.0 KIND_IMAGE_SHA: sha256:69860bda5563ac81e3c0057d654b5253219618a22ec3a346306239bba8cfa1a6 PODMAN_VERSION: 4.4.4 strategy: - matrix: + matrix: os: [ubuntu-latest] fail-fast: false steps: - - name: Check out code - uses: actions/checkout@v2 + - name: Check out code + uses: actions/checkout@v4 - name: Load environment variables uses: artursouza/export-env-action@v2 with: - envFile: './.github/env/global.env' - expand: 'true' + envFile: "./.github/env/global.env" + expand: "true" - name: Install podman - MacOS timeout-minutes: 15 if: matrix.os == 'macos-latest' @@ -76,7 +76,7 @@ jobs: sudo ln -s $(which podman) /usr/local/bin/docker sudo ln -s $(which podman-compose) /usr/local/bin/docker-compose - name: Set up Go ${{ env.GOVER }} - uses: actions/setup-go@v2 + uses: actions/setup-go@v5 with: go-version: ${{ env.GOVER }} - name: Set up Dapr CLI - Mac/Linux @@ -84,7 +84,7 @@ jobs: run: wget -q ${{ env.DAPR_INSTALL_URL }}/install.sh -O - | /bin/bash -s ${{ env.DAPR_CLI_VERSION }} - name: Set up Dapr CLI - Windows if: matrix.os == 'windows-latest' - run: powershell -Command "\$$script=iwr -useb ${{ env.DAPR_INSTALL_URL }}/install.ps1; \$$block=[ScriptBlock]::Create(\$$script); invoke-command -ScriptBlock \$$block -ArgumentList ${{ env.DAPR_CLI_VERSION }}" + run: powershell -Command "\$$script=iwr -useb ${{ env.DAPR_INSTALL_URL }}/install.ps1; \$$block=[ScriptBlock]::Create(\$$script); invoke-command -ScriptBlock \$$block -ArgumentList ${{ env.DAPR_CLI_VERSION }}" - name: Install Dapr run: | export GITHUB_TOKEN=${{ secrets.GITHUB_TOKEN }} @@ -115,4 +115,5 @@ jobs: done - name: Linkcheck README.md run: | - make validate \ No newline at end of file + make validate + diff --git a/pub_sub/go/http/README.md b/pub_sub/go/http/README.md index 4ecc48e5b..b56b7f2d8 100644 --- a/pub_sub/go/http/README.md +++ b/pub_sub/go/http/README.md @@ -27,8 +27,8 @@ expected_stderr_lines: output_match_mode: substring match_order: none background: true -sleep: 15 -timeout_seconds: 30 +sleep: 30 +timeout_seconds: 60 --> ```bash diff --git a/pub_sub/go/sdk/README.md b/pub_sub/go/sdk/README.md index 1ebfaf9ca..5e78c5c55 100644 --- a/pub_sub/go/sdk/README.md +++ b/pub_sub/go/sdk/README.md @@ -27,8 +27,8 @@ expected_stderr_lines: output_match_mode: substring match_order: none background: true -sleep: 15 -timeout_seconds: 30 +sleep: 30 +timeout_seconds: 60 --> ```bash diff --git a/secrets_management/go/http/README.md b/secrets_management/go/http/README.md index d86f0196a..862078fe8 100644 --- a/secrets_management/go/http/README.md +++ b/secrets_management/go/http/README.md @@ -32,4 +32,4 @@ dapr run --app-id order-processor --resources-path ../../../components/ -- go ru ```bash dapr stop --app-id order-processor -``` \ No newline at end of file +``` diff --git a/service_invocation/go/http/README.md b/service_invocation/go/http/README.md index 3ab01e0ef..c36459dc0 100644 --- a/service_invocation/go/http/README.md +++ b/service_invocation/go/http/README.md @@ -24,8 +24,8 @@ expected_stderr_lines: output_match_mode: substring match_order: none background: true -sleep: 15 -timeout_seconds: 30 +sleep: 30 +timeout_seconds: 60 --> ```bash diff --git a/workflows/go/sdk/README.md b/workflows/go/sdk/README.md new file mode 100644 index 000000000..55e2e42f1 --- /dev/null +++ b/workflows/go/sdk/README.md @@ -0,0 +1,110 @@ +# Dapr workflows + +In this quickstart, you'll create a simple console application to demonstrate Dapr's workflow programming model and the workflow authoring client. The console app +starts and manages an order processing workflow. +The workflow management API provided by the Dapr client can also be used interchangeably with minor adjustments + +This quickstart includes one project: + +- Go app `order-processor` + +The quickstart contains 1 workflow (OrderProcessingWorkflow) which simulates purchasing items from a store, and 5 unique activities within the workflow. These 5 +activities are as follows: + +- NotifyActivity: This activity utilizes a logger to print out messages throughout the workflow. These messages notify the user when there is insufficient +§inventory, their payment couldn't be processed, and more. +- ProcessPaymentActivity: This activity is responsible for processing and authorizing the payment. +- VerifyInventoryActivity: This activity checks the state store to ensure that there is enough inventory present for purchase. +- UpdateInventoryActivity: This activity removes the requested items from the state store and updates the store with the new remaining inventory value. +- RequestApprovalActivity: This activity seeks approval from Manager, if payment is greater than 50000 USD. + +### Run the order processor workflow + +1. Open a new terminal window and navigate to `order-processor` directory. +2. Run the console app with Dapr: + + +```sh + +dapr run -f . +``` + +3. Expected output + +``` +== APP - order-processor == *** Welcome to the Dapr Workflow console app sample! +== APP - order-processor == *** Using this app, you can place orders that start workflows. +== APP - order-processor == dapr client initializing for: 127.0.0.1:50056 +== APP - order-processor == adding base stock item: paperclip +== APP - order-processor == 2024/02/01 12:59:52 work item listener started +== APP - order-processor == INFO: 2024/02/01 12:59:52 starting background processor +== APP - order-processor == adding base stock item: cars +== APP - order-processor == adding base stock item: computers +== APP - order-processor == ==========Begin the purchase of item:========== +== APP - order-processor == NotifyActivity: Received order 48ee83b7-5d80-48d5-97f9-6b372f5480a5 for 10 cars - $150000 +== APP - order-processor == VerifyInventoryActivity: Verifying inventory for order 48ee83b7-5d80-48d5-97f9-6b372f5480a5 of 10 cars +== APP - order-processor == VerifyInventoryActivity: There are 100 cars available for purchase +== APP - order-processor == RequestApprovalActivity: Requesting approval for payment of 150000USD for 10 cars +== APP - order-processor == NotifyActivity: Payment for order 48ee83b7-5d80-48d5-97f9-6b372f5480a5 has been approved! +== APP - order-processor == ProcessPaymentActivity: 48ee83b7-5d80-48d5-97f9-6b372f5480a5 for 10 - cars (150000USD) +== APP - order-processor == UpdateInventoryActivity: Checking Inventory for order 48ee83b7-5d80-48d5-97f9-6b372f5480a5 for 10 * cars +== APP - order-processor == UpdateInventoryActivity: There are now 90 cars left in stock +== APP - order-processor == NotifyActivity: Order 48ee83b7-5d80-48d5-97f9-6b372f5480a5 has completed! +== APP - order-processor == Workflow completed - result: COMPLETED +== APP - order-processor == Purchase of item is complete +``` + +4. Stop Dapr workflow with CTRL-C or: + + +```sh +dapr stop -f . +``` + + + +### View workflow output with Zipkin + +For a more detailed view of the workflow activities (duration, progress etc.), try using Zipkin. + +1. View Traces in Zipkin UI - In your browser go to http://localhost:9411 to view the workflow trace spans in the Zipkin web UI. The order-processor workflow +should be viewable with the following output in the Zipkin web UI. Note: the [openzipkin/zipkin](https://hub.docker.com/r/openzipkin/zipkin/) docker container is +launched on running `dapr init`. + + + +### What happened? + +When you ran the above comands: + +1. First the "user" inputs an order for 10 cars into the concole app. +2. A unique order ID for the workflow is generated (in the above example, `b903d749cd814e099f06ebf4a56a2f90`) and the workflow is scheduled. +3. The `NotifyActivity` workflow activity sends a notification saying an order for 10 cars has been received. +4. The `VerifyInventoryActivity` workflow activity checks the inventory data, determines if you can supply the ordered item, and responds with the number of cars +in stock. +5. The `RequestApprovalActivity` workflow activity is triggered due to buisness logic for orders exceeding $50k and user is prompted to manually approve the +purchase before continuing the order. +6. The workflow starts and notifies you of its status. +7. The `ProcessPaymentActivity` workflow activity begins processing payment for order `b903d749cd814e099f06ebf4a56a2f90` and confirms if successful. +8. The `UpdateInventoryActivity` workflow activity updates the inventory with the current available cars after the order has been processed. +9. The `NotifyActivity` workflow activity sends a notification saying that order `b903d749cd814e099f06ebf4a56a2f90` has completed. +10. The workflow terminates as completed. + + + + + + diff --git a/workflows/go/sdk/dapr.yaml b/workflows/go/sdk/dapr.yaml new file mode 100644 index 000000000..d19f90694 --- /dev/null +++ b/workflows/go/sdk/dapr.yaml @@ -0,0 +1,8 @@ +version: 1 +common: + resourcesPath: ../../components +apps: + - appDirPath: ./order-processor/ + appID: order-processor + command: ["go", "run", "."] + diff --git a/workflows/go/sdk/img/workflow-trace-spans-zipkin.png b/workflows/go/sdk/img/workflow-trace-spans-zipkin.png new file mode 100644 index 000000000..e439d6f92 Binary files /dev/null and b/workflows/go/sdk/img/workflow-trace-spans-zipkin.png differ diff --git a/workflows/go/sdk/makefile b/workflows/go/sdk/makefile new file mode 100644 index 000000000..f1577bf6c --- /dev/null +++ b/workflows/go/sdk/makefile @@ -0,0 +1,2 @@ +include ../../../docker.mk +include ../../../validate.mk diff --git a/workflows/go/sdk/order-processor/go.mod b/workflows/go/sdk/order-processor/go.mod new file mode 100644 index 000000000..61ca95f3f --- /dev/null +++ b/workflows/go/sdk/order-processor/go.mod @@ -0,0 +1,29 @@ +module dapr_example + +go 1.21 + +toolchain go1.21.6 + +require github.com/dapr/go-sdk v1.6.1-0.20240209153236-ac26e622c4a6 + +require ( + github.com/cenkalti/backoff/v4 v4.2.1 // indirect + github.com/dapr/dapr v1.13.0-rc.2 // indirect + github.com/go-logr/logr v1.4.1 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/golang/protobuf v1.5.3 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/kr/pretty v0.3.1 // indirect + github.com/marusama/semaphore/v2 v2.5.0 // indirect + github.com/microsoft/durabletask-go v0.4.1-0.20240122160106-fb5c4c05729d // indirect + go.opentelemetry.io/otel v1.23.1 // indirect + go.opentelemetry.io/otel/metric v1.23.1 // indirect + go.opentelemetry.io/otel/trace v1.23.1 // indirect + golang.org/x/net v0.21.0 // indirect + golang.org/x/sys v0.17.0 // indirect + golang.org/x/text v0.14.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240205150955-31a09d347014 // indirect + google.golang.org/grpc v1.61.0 // indirect + google.golang.org/protobuf v1.32.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/workflows/go/sdk/order-processor/go.sum b/workflows/go/sdk/order-processor/go.sum new file mode 100644 index 000000000..d5d4d1315 --- /dev/null +++ b/workflows/go/sdk/order-processor/go.sum @@ -0,0 +1,65 @@ +github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= +github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/dapr/dapr v1.13.0-rc.2 h1:Y5tQ07KB856aSWXxVjb/Lob4AT8Gy/hJxZtwODI21CI= +github.com/dapr/dapr v1.13.0-rc.2/go.mod h1:QvxJ5htwv17PeRfFMGkHznEVRkpnt35re7TpF4CsCc8= +github.com/dapr/go-sdk v1.6.1-0.20240209153236-ac26e622c4a6 h1:YXOqOgB6RslXup0R10ZPc/hcQBD+LmUXXRuohb9wrjs= +github.com/dapr/go-sdk v1.6.1-0.20240209153236-ac26e622c4a6/go.mod h1:DEftCAXK4mAt2OY7B5+9TcFQyo7lxI+OA85vyKNR01s= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= +github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/marusama/semaphore/v2 v2.5.0 h1:o/1QJD9DBYOWRnDhPwDVAXQn6mQYD0gZaS1Tpx6DJGM= +github.com/marusama/semaphore/v2 v2.5.0/go.mod h1:z9nMiNUekt/LTpTUQdpp+4sJeYqUGpwMHfW0Z8V8fnQ= +github.com/microsoft/durabletask-go v0.4.1-0.20240122160106-fb5c4c05729d h1:CVjystOHucBzKExLHD8E96D4KUNbehP0ozgue/6Tq/Y= +github.com/microsoft/durabletask-go v0.4.1-0.20240122160106-fb5c4c05729d/go.mod h1:OSZ4K7SgqBEsaouk3lAVdDzvanIzsdj7angZ0FTeSAU= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +go.opentelemetry.io/otel v1.23.1 h1:Za4UzOqJYS+MUczKI320AtqZHZb7EqxO00jAHE0jmQY= +go.opentelemetry.io/otel v1.23.1/go.mod h1:Td0134eafDLcTS4y+zQ26GE8u3dEuRBiBCTUIRHaikA= +go.opentelemetry.io/otel/metric v1.23.1 h1:PQJmqJ9u2QaJLBOELl1cxIdPcpbwzbkjfEyelTl2rlo= +go.opentelemetry.io/otel/metric v1.23.1/go.mod h1:mpG2QPlAfnK8yNhNJAxDZruU9Y1/HubbC+KyH8FaCWI= +go.opentelemetry.io/otel/trace v1.23.1 h1:4LrmmEd8AU2rFvU1zegmvqW7+kWarxtNOPyeL6HmYY8= +go.opentelemetry.io/otel/trace v1.23.1/go.mod h1:4IpnpJFwr1mo/6HL8XIPJaE9y0+u1KcVmuW7dwFSVrI= +golang.org/x/exp v0.0.0-20240119083558-1b970713d09a h1:Q8/wZp0KX97QFTc2ywcOE0YRjZPVIx+MXInMzdvQqcA= +golang.org/x/exp v0.0.0-20240119083558-1b970713d09a/go.mod h1:idGWGoKP1toJGkd5/ig9ZLuPcZBC3ewk7SzmH0uou08= +golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= +golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= +golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= +golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240205150955-31a09d347014 h1:FSL3lRCkhaPFxqi0s9o+V4UI2WTzAVOvkgbd4kVV4Wg= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240205150955-31a09d347014/go.mod h1:SaPjaZGWb0lPqs6Ittu0spdfrOArqji4ZdeP5IC/9N4= +google.golang.org/grpc v1.61.0 h1:TOvOcuXn30kRao+gfcvsebNEa5iZIiLkisYEkf7R7o0= +google.golang.org/grpc v1.61.0/go.mod h1:VUbo7IFqmF1QtCAstipjG0GIoq49KvMe9+h1jFLBNJs= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= +google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/workflows/go/sdk/order-processor/main.go b/workflows/go/sdk/order-processor/main.go new file mode 100644 index 000000000..26baf7819 --- /dev/null +++ b/workflows/go/sdk/order-processor/main.go @@ -0,0 +1,144 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "log" + "time" + + "github.com/dapr/go-sdk/client" + "github.com/dapr/go-sdk/workflow" +) + +var ( + stateStoreName = "statestore" + workflowComponent = "dapr" + workflowName = "OrderProcessingWorkflow" + defaultItemName = "cars" +) + +func main() { + fmt.Println("*** Welcome to the Dapr Workflow console app sample!") + fmt.Println("*** Using this app, you can place orders that start workflows.") + + w, err := workflow.NewWorker() + if err != nil { + log.Fatalf("failed to start worker: %v", err) + } + + if err := w.RegisterWorkflow(OrderProcessingWorkflow); err != nil { + log.Fatal(err) + } + if err := w.RegisterActivity(NotifyActivity); err != nil { + log.Fatal(err) + } + if err := w.RegisterActivity(RequestApprovalActivity); err != nil { + log.Fatal(err) + } + if err := w.RegisterActivity(VerifyInventoryActivity); err != nil { + log.Fatal(err) + } + if err := w.RegisterActivity(ProcessPaymentActivity); err != nil { + log.Fatal(err) + } + if err := w.RegisterActivity(UpdateInventoryActivity); err != nil { + log.Fatal(err) + } + + if err := w.Start(); err != nil { + log.Fatal(err) + } + + daprClient, err := client.NewClient() + if err != nil { + log.Fatalf("failed to initialise dapr client: %v", err) + } + wfClient, err := workflow.NewClient(workflow.WithDaprClient(daprClient)) + if err != nil { + log.Fatalf("failed to initialise workflow client: %v", err) + } + + inventory := []InventoryItem{ + {ItemName: "paperclip", PerItemCost: 5, Quantity: 100}, + {ItemName: "cars", PerItemCost: 15000, Quantity: 100}, + {ItemName: "computers", PerItemCost: 500, Quantity: 100}, + } + if err := restockInventory(daprClient, inventory); err != nil { + log.Fatalf("failed to restock: %v", err) + } + + fmt.Println("==========Begin the purchase of item:==========") + + itemName := defaultItemName + orderQuantity := 10 + + totalCost := inventory[1].PerItemCost * orderQuantity + + orderPayload := OrderPayload{ + ItemName: itemName, + Quantity: orderQuantity, + TotalCost: totalCost, + } + + id, err := wfClient.ScheduleNewWorkflow(context.Background(), workflowName, workflow.WithInput(orderPayload)) + if err != nil { + log.Fatalf("failed to start workflow: %v", err) + } + + approvalSought := false + + startTime := time.Now() + + for { + timeDelta := time.Since(startTime) + metadata, err := wfClient.FetchWorkflowMetadata(context.Background(), id) + if err != nil { + log.Fatalf("failed to fetch workflow: %v", err) + } + if (metadata.RuntimeStatus == workflow.StatusCompleted) || (metadata.RuntimeStatus == workflow.StatusFailed) || (metadata.RuntimeStatus == workflow.StatusTerminated) { + fmt.Printf("Workflow completed - result: %v\n", metadata.RuntimeStatus.String()) + break + } + if timeDelta.Seconds() >= 10 { + metadata, err := wfClient.FetchWorkflowMetadata(context.Background(), id) + if err != nil { + log.Fatalf("failed to fetch workflow: %v", err) + } + if totalCost > 50000 && !approvalSought && ((metadata.RuntimeStatus != workflow.StatusCompleted) || (metadata.RuntimeStatus != workflow.StatusFailed) || (metadata.RuntimeStatus != workflow.StatusTerminated)) { + approvalSought = true + promptForApproval(id) + } + } + // Sleep before the next iteration + time.Sleep(time.Second) + } + + fmt.Println("Purchase of item is complete") +} + +// promptForApproval is an example case. There is no user input required here due to this being for testing purposes only. +// It would be perfectly valid to add a wait here or display a prompt to continue the process. +func promptForApproval(id string) { + wfClient, err := workflow.NewClient() + if err != nil { + log.Fatalf("failed to initialise wfClient: %v", err) + } + if err := wfClient.RaiseEvent(context.Background(), id, "manager_approval"); err != nil { + log.Fatal(err) + } +} + +func restockInventory(daprClient client.Client, inventory []InventoryItem) error { + for _, item := range inventory { + itemSerialized, err := json.Marshal(item) + if err != nil { + return err + } + fmt.Printf("adding base stock item: %s\n", item.ItemName) + if err := daprClient.SaveState(context.Background(), stateStoreName, item.ItemName, itemSerialized, nil); err != nil { + return err + } + } + return nil +} diff --git a/workflows/go/sdk/order-processor/models.go b/workflows/go/sdk/order-processor/models.go new file mode 100644 index 000000000..29d340932 --- /dev/null +++ b/workflows/go/sdk/order-processor/models.go @@ -0,0 +1,43 @@ +package main + +type OrderPayload struct { + ItemName string `json:"item_name"` + TotalCost int `json:"total_cost"` + Quantity int `json:"quanity"` +} + +type OrderResult struct { + Processed bool `json:"processed"` +} + +type InventoryItem struct { + ItemName string `json:"item_name"` + PerItemCost int `json:"per_item_cost"` + Quantity int `json:"quanity"` +} + +type InventoryRequest struct { + RequestID string `json:"request_id"` + ItemName string `json:"item_name"` + Quantity int `json:"quanity"` +} + +type InventoryResult struct { + Success bool `json:"success"` + InventoryItem InventoryItem `json:"inventory_item"` +} + +type PaymentRequest struct { + RequestID string `json:"request_id"` + ItemBeingPurchased string `json:"item_being_purchased"` + Amount int `json:"amount"` + Quantity int `json:"quantity"` +} + +type ApprovalRequired struct { + Approval bool `json:"approval"` +} + +type Notification struct { + Message string `json:"message"` +} diff --git a/workflows/go/sdk/order-processor/workflow.go b/workflows/go/sdk/order-processor/workflow.go new file mode 100644 index 000000000..507539906 --- /dev/null +++ b/workflows/go/sdk/order-processor/workflow.go @@ -0,0 +1,189 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "log" + "time" + + "github.com/dapr/go-sdk/client" + "github.com/dapr/go-sdk/workflow" +) + +// OrderProcessingWorkflow is the main workflow for orchestrating activities in the order process. +func OrderProcessingWorkflow(ctx *workflow.WorkflowContext) (any, error) { + orderID := ctx.InstanceID() + var orderPayload OrderPayload + if err := ctx.GetInput(&orderPayload); err != nil { + return nil, err + } + err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{ + Message: fmt.Sprintf("Received order %s for %d %s - $%d", orderID, orderPayload.Quantity, orderPayload.ItemName, orderPayload.TotalCost), + })).Await(nil) + if err != nil { + return OrderResult{Processed: false}, err + } + + var verifyInventoryResult InventoryResult + if err := ctx.CallActivity(VerifyInventoryActivity, workflow.ActivityInput(InventoryRequest{ + RequestID: orderID, + ItemName: orderPayload.ItemName, + Quantity: orderPayload.Quantity, + })).Await(&verifyInventoryResult); err != nil { + return OrderResult{Processed: false}, err + } + + if !verifyInventoryResult.Success { + notification := Notification{Message: fmt.Sprintf("Insufficient inventory for %s", orderPayload.ItemName)} + err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(notification)).Await(nil) + return OrderResult{Processed: false}, err + } + + if orderPayload.TotalCost > 50000 { + var approvalRequired ApprovalRequired + if err := ctx.CallActivity(RequestApprovalActivity, workflow.ActivityInput(orderPayload)).Await(&approvalRequired); err != nil { + return OrderResult{Processed: false}, err + } + if err := ctx.WaitForExternalEvent("manager_approval", time.Second*200).Await(nil); err != nil { + return OrderResult{Processed: false}, err + } + // TODO: Confirm timeout flow - this will be in the form of an error. + if approvalRequired.Approval { + if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Payment for order %s has been approved!", orderID)})).Await(nil); err != nil { + log.Printf("failed to notify of a successful order: %v\n", err) + } + } else { + if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Payment for order %s has been rejected!", orderID)})).Await(nil); err != nil { + log.Printf("failed to notify of an unsuccessful order :%v\n", err) + } + return OrderResult{Processed: false}, err + } + } + err = ctx.CallActivity(ProcessPaymentActivity, workflow.ActivityInput(PaymentRequest{ + RequestID: orderID, + ItemBeingPurchased: orderPayload.ItemName, + Amount: orderPayload.TotalCost, + Quantity: orderPayload.Quantity, + })).Await(nil) + if err != nil { + if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Order %s failed!", orderID)})).Await(nil); err != nil { + log.Printf("failed to notify of a failed order: %v", err) + } + return OrderResult{Processed: false}, err + } + + err = ctx.CallActivity(UpdateInventoryActivity, workflow.ActivityInput(PaymentRequest{ + RequestID: orderID, + ItemBeingPurchased: orderPayload.ItemName, + Amount: orderPayload.TotalCost, + Quantity: orderPayload.Quantity, + })).Await(nil) + if err != nil { + if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Order %s failed!", orderID)})).Await(nil); err != nil { + log.Printf("failed to notify of a failed order: %v", err) + } + return OrderResult{Processed: false}, err + } + + if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Order %s has completed!", orderID)})).Await(nil); err != nil { + log.Printf("failed to notify of a successful order: %v", err) + } + return OrderResult{Processed: true}, err +} + +// NotifyActivity outputs a notification message +func NotifyActivity(ctx workflow.ActivityContext) (any, error) { + var input Notification + if err := ctx.GetInput(&input); err != nil { + return "", err + } + fmt.Printf("NotifyActivity: %s\n", input.Message) + return nil, nil +} + +// ProcessPaymentActivity is used to process a payment +func ProcessPaymentActivity(ctx workflow.ActivityContext) (any, error) { + var input PaymentRequest + if err := ctx.GetInput(&input); err != nil { + return "", err + } + fmt.Printf("ProcessPaymentActivity: %s for %d - %s (%dUSD)\n", input.RequestID, input.Quantity, input.ItemBeingPurchased, input.Amount) + return nil, nil +} + +// VerifyInventoryActivity is used to verify if an item is available in the inventory +func VerifyInventoryActivity(ctx workflow.ActivityContext) (any, error) { + var input InventoryRequest + if err := ctx.GetInput(&input); err != nil { + return nil, err + } + fmt.Printf("VerifyInventoryActivity: Verifying inventory for order %s of %d %s\n", input.RequestID, input.Quantity, input.ItemName) + dClient, err := client.NewClient() + if err != nil { + return nil, err + } + item, err := dClient.GetState(context.Background(), stateStoreName, input.ItemName, nil) + if err != nil { + return nil, err + } + if item == nil { + return InventoryResult{ + Success: false, + InventoryItem: InventoryItem{}, + }, nil + } + var result InventoryItem + if err := json.Unmarshal(item.Value, &result); err != nil { + log.Fatalf("failed to parse inventory result %v", err) + } + fmt.Printf("VerifyInventoryActivity: There are %d %s available for purchase\n", result.Quantity, result.ItemName) + if result.Quantity >= input.Quantity { + return InventoryResult{Success: true, InventoryItem: result}, nil + } + return InventoryResult{Success: false, InventoryItem: InventoryItem{}}, nil +} + +// UpdateInventoryActivity modifies the inventory. +func UpdateInventoryActivity(ctx workflow.ActivityContext) (any, error) { + var input PaymentRequest + if err := ctx.GetInput(&input); err != nil { + return nil, err + } + fmt.Printf("UpdateInventoryActivity: Checking Inventory for order %s for %d * %s\n", input.RequestID, input.Quantity, input.ItemBeingPurchased) + dClient, err := client.NewClient() + if err != nil { + return nil, err + } + item, err := dClient.GetState(context.Background(), stateStoreName, input.ItemBeingPurchased, nil) + if err != nil { + return nil, err + } + var result InventoryItem + err = json.Unmarshal(item.Value, &result) + if err != nil { + return nil, err + } + newQuantity := result.Quantity - input.Quantity + if newQuantity < 0 { + return nil, fmt.Errorf("insufficient inventory for: %s", input.ItemBeingPurchased) + } + result.Quantity = newQuantity + newState, err := json.Marshal(result) + if err != nil { + log.Fatalf("failed to marshal new state: %v", err) + } + dClient.SaveState(context.Background(), stateStoreName, input.ItemBeingPurchased, newState, nil) + fmt.Printf("UpdateInventoryActivity: There are now %d %s left in stock\n", result.Quantity, result.ItemName) + return InventoryResult{Success: true, InventoryItem: result}, nil +} + +// RequestApprovalActivity requests approval for the order +func RequestApprovalActivity(ctx workflow.ActivityContext) (any, error) { + var input OrderPayload + if err := ctx.GetInput(&input); err != nil { + return nil, err + } + fmt.Printf("RequestApprovalActivity: Requesting approval for payment of %dUSD for %d %s\n", input.TotalCost, input.Quantity, input.ItemName) + return ApprovalRequired{Approval: true}, nil +}