Skip to content

Commit

Permalink
Add async activity example
Browse files Browse the repository at this point in the history
Include a very basic async activity example. This is the same example
from the samples-java repository, ported to Go.
  • Loading branch information
Ozoniuss committed Apr 13, 2024
1 parent ea23602 commit 30ce5b4
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 1 deletion.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ Each sample demonstrates one feature of the SDK, together with tests.
- [**Basic mTLS hello world**](./helloworldmtls): Simple example of a
Workflow Definition and an Activity Definition using mTLS like Temporal Cloud.

- [**Basic async activity execution**](./async-activity/): Simple example of a
Workflow Definition starting activities asynchronously.

### API demonstrations

- **Async activity completion**: Example of
Expand Down Expand Up @@ -230,7 +233,6 @@ resource waiting its successful completion

Mostly examples we haven't yet ported from https://github.com/temporalio/samples-java/

- Async activity calling: *Example to be completed*
- Async lambda: *Example to be completed*
- Periodic Workflow: Workflow that executes some logic periodically. *Example to be completed*
- Exception propagation and wrapping: *Example to be completed*
Expand Down
10 changes: 10 additions & 0 deletions async-activity/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
### Steps to run this sample:
1) Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use).
2) Run the following command to start the worker (assuming you are in the async-activity directory)
```
go run ./worker/main.go
```
3) Run the following command to start the example
```
go run ./starter/main.go
```
40 changes: 40 additions & 0 deletions async-activity/starter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package main

import (
"context"
"log"

"go.temporal.io/sdk/client"

"github.com/google/uuid"
asyncactivity "github.com/temporalio/samples-go/async-activity"
)

func main() {
// The client is a heavyweight object that should be created once per process.
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

workflowOptions := client.StartWorkflowOptions{
ID: "async-activity-" + uuid.NewString(),
TaskQueue: "async-activity",
}

we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, asyncactivity.AsyncActivityWorkflow, "Temporal")
if err != nil {
log.Fatalln("Unable to execute workflow", err)
}

log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID())

// Synchronously wait for the workflow completion.
var result string
err = we.Get(context.Background(), &result)
if err != nil {
log.Fatalln("Unable get workflow result", err)
}
log.Println("Workflow result:", result)
}
42 changes: 42 additions & 0 deletions async-activity/worker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package main

import (
"log"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"

asyncactivity "github.com/temporalio/samples-go/async-activity"
)

func main() {
// The client and worker are heavyweight objects that should be created once per process.
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

// Useful events to look for: timestamp of ActivityTaskScheduled,
// ActivityTaskStarted and ActivityTaskCompleted (note that they
// may not be in the correct timestamp order in the event history).
w := worker.New(c, "async-activity", worker.Options{
// Set this to 1 to make the activities run one after the other (note
// how both are scheduled at the same time, but ActivityTaskStarted
// differs).
MaxConcurrentActivityExecutionSize: 2,
// Set this to 0.5 to create some delay between when activities are
// started. Note that in this case, the started time does not differ.
// Only the completed time is different.
WorkerActivitiesPerSecond: 2,
})

w.RegisterWorkflow(asyncactivity.AsyncActivityWorkflow)
w.RegisterActivity(asyncactivity.HelloActivity)
w.RegisterActivity(asyncactivity.ByeActivity)

err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalln("Unable to start worker", err)
}
}
54 changes: 54 additions & 0 deletions async-activity/workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package asyncactivity

import (
"context"
"fmt"
"time"

"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/workflow"
)

// AsyncActivityWorkflow is a workflow definition starting two activities
// asynchronously.
func AsyncActivityWorkflow(ctx workflow.Context, name string) (string, error) {
ao := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, ao)

// Start activities asynchronously.
var helloResult, byeResult string
helloFuture := workflow.ExecuteActivity(ctx, HelloActivity, name)
byeFuture := workflow.ExecuteActivity(ctx, ByeActivity, name)

// This can be done alternatively by creating a workflow selector. See
// "pickfirst" example.
err := helloFuture.Get(ctx, &helloResult)
if err != nil {
return "", fmt.Errorf("hello activity error: %s", err.Error())
}
err = byeFuture.Get(ctx, &byeResult)
if err != nil {
return "", fmt.Errorf("bye activity error: %s", err.Error())
}

return helloResult + "\n" + byeResult, nil
}

// Each of these activities will sleep for 5 seconds, but see in the temporal
// dashboard that they were created immediately one after the other.

func HelloActivity(ctx context.Context, name string) (string, error) {
logger := activity.GetLogger(ctx)
logger.Info("Hello activity", "name", name)
time.Sleep(5 * time.Second)
return "Hello " + name + "!", nil
}

func ByeActivity(ctx context.Context, name string) (string, error) {
logger := activity.GetLogger(ctx)
logger.Info("Bye activity", "name", name)
time.Sleep(5 * time.Second)
return "Bye " + name + "!", nil
}
29 changes: 29 additions & 0 deletions async-activity/workflow_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package asyncactivity

import (
"testing"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.temporal.io/sdk/testsuite"
)

func Test_Workflow(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()
env.RegisterActivity(HelloActivity)
env.RegisterActivity(ByeActivity)

// Mock the activities to skip the timers (and avoid test timeout).
env.OnActivity(HelloActivity, mock.Anything, "Temporal").Return("Hello Temporal!", nil)
env.OnActivity(ByeActivity, mock.Anything, "Temporal").Return("Bye Temporal!", nil)

env.ExecuteWorkflow(AsyncActivityWorkflow, "Temporal")

require.True(t, env.IsWorkflowCompleted())
require.NoError(t, env.GetWorkflowError())

var result string
require.NoError(t, env.GetWorkflowResult(&result))
require.Equal(t, "Hello Temporal!\nBye Temporal!", result)
}

0 comments on commit 30ce5b4

Please sign in to comment.