diff --git a/README.md b/README.md index 6899607b..72956410 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,9 @@ Each sample demonstrates one feature of the SDK, together with tests. - [**Basic mTLS hello world**](./helloworldmtls): Simple example of a Workflow Definition and an Activity Definition using mTLS like Temporal Cloud. +- [**Basic async activity execution**](./async-activity/): Simple example of a + Workflow Definition starting activities asynchronously. + ### API demonstrations - **Async activity completion**: Example of @@ -230,7 +233,6 @@ resource waiting its successful completion Mostly examples we haven't yet ported from https://github.com/temporalio/samples-java/ -- Async activity calling: *Example to be completed* - Async lambda: *Example to be completed* - Periodic Workflow: Workflow that executes some logic periodically. *Example to be completed* - Exception propagation and wrapping: *Example to be completed* diff --git a/async-activity/README.md b/async-activity/README.md new file mode 100644 index 00000000..f1cc555a --- /dev/null +++ b/async-activity/README.md @@ -0,0 +1,10 @@ +### Steps to run this sample: +1) Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use). +2) Run the following command to start the worker (assuming you are in the async-activity directory) +``` +go run ./worker/main.go +``` +3) Run the following command to start the example +``` +go run ./starter/main.go +``` diff --git a/async-activity/starter/main.go b/async-activity/starter/main.go new file mode 100644 index 00000000..ebb8fc32 --- /dev/null +++ b/async-activity/starter/main.go @@ -0,0 +1,40 @@ +package main + +import ( + "context" + "log" + + "go.temporal.io/sdk/client" + + "github.com/google/uuid" + asyncactivity "github.com/temporalio/samples-go/async-activity" +) + +func main() { + // The client is a heavyweight object that should be created once per process. + c, err := client.Dial(client.Options{}) + if err != nil { + log.Fatalln("Unable to create client", err) + } + defer c.Close() + + workflowOptions := client.StartWorkflowOptions{ + ID: "async-activity-" + uuid.NewString(), + TaskQueue: "async-activity", + } + + we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, asyncactivity.AsyncActivityWorkflow, "Temporal") + if err != nil { + log.Fatalln("Unable to execute workflow", err) + } + + log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID()) + + // Synchronously wait for the workflow completion. + var result string + err = we.Get(context.Background(), &result) + if err != nil { + log.Fatalln("Unable get workflow result", err) + } + log.Println("Workflow result:", result) +} diff --git a/async-activity/worker/main.go b/async-activity/worker/main.go new file mode 100644 index 00000000..07cfdccb --- /dev/null +++ b/async-activity/worker/main.go @@ -0,0 +1,42 @@ +package main + +import ( + "log" + + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" + + asyncactivity "github.com/temporalio/samples-go/async-activity" +) + +func main() { + // The client and worker are heavyweight objects that should be created once per process. + c, err := client.Dial(client.Options{}) + if err != nil { + log.Fatalln("Unable to create client", err) + } + defer c.Close() + + // Useful events to look for: timestamp of ActivityTaskScheduled, + // ActivityTaskStarted and ActivityTaskCompleted (note that they + // may not be in the correct timestamp order in the event history). + w := worker.New(c, "async-activity", worker.Options{ + // Set this to 1 to make the activities run one after the other (note + // how both are scheduled at the same time, but ActivityTaskStarted + // differs). + MaxConcurrentActivityExecutionSize: 2, + // Set this to 0.5 to create some delay between when activities are + // started. Note that in this case, the started time does not differ. + // Only the completed time is different. + WorkerActivitiesPerSecond: 2, + }) + + w.RegisterWorkflow(asyncactivity.AsyncActivityWorkflow) + w.RegisterActivity(asyncactivity.HelloActivity) + w.RegisterActivity(asyncactivity.ByeActivity) + + err = w.Run(worker.InterruptCh()) + if err != nil { + log.Fatalln("Unable to start worker", err) + } +} diff --git a/async-activity/workflow.go b/async-activity/workflow.go new file mode 100644 index 00000000..ed1b1dae --- /dev/null +++ b/async-activity/workflow.go @@ -0,0 +1,54 @@ +package asyncactivity + +import ( + "context" + "fmt" + "time" + + "go.temporal.io/sdk/activity" + "go.temporal.io/sdk/workflow" +) + +// AsyncActivityWorkflow is a workflow definition starting two activities +// asynchronously. +func AsyncActivityWorkflow(ctx workflow.Context, name string) (string, error) { + ao := workflow.ActivityOptions{ + StartToCloseTimeout: 10 * time.Second, + } + ctx = workflow.WithActivityOptions(ctx, ao) + + // Start activities asynchronously. + var helloResult, byeResult string + helloFuture := workflow.ExecuteActivity(ctx, HelloActivity, name) + byeFuture := workflow.ExecuteActivity(ctx, ByeActivity, name) + + // This can be done alternatively by creating a workflow selector. See + // "pickfirst" example. + err := helloFuture.Get(ctx, &helloResult) + if err != nil { + return "", fmt.Errorf("hello activity error: %s", err.Error()) + } + err = byeFuture.Get(ctx, &byeResult) + if err != nil { + return "", fmt.Errorf("bye activity error: %s", err.Error()) + } + + return helloResult + "\n" + byeResult, nil +} + +// Each of these activities will sleep for 5 seconds, but see in the temporal +// dashboard that they were created immediately one after the other. + +func HelloActivity(ctx context.Context, name string) (string, error) { + logger := activity.GetLogger(ctx) + logger.Info("Hello activity", "name", name) + time.Sleep(5 * time.Second) + return "Hello " + name + "!", nil +} + +func ByeActivity(ctx context.Context, name string) (string, error) { + logger := activity.GetLogger(ctx) + logger.Info("Bye activity", "name", name) + time.Sleep(5 * time.Second) + return "Bye " + name + "!", nil +} diff --git a/async-activity/workflow_test.go b/async-activity/workflow_test.go new file mode 100644 index 00000000..4c41733a --- /dev/null +++ b/async-activity/workflow_test.go @@ -0,0 +1,29 @@ +package asyncactivity + +import ( + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.temporal.io/sdk/testsuite" +) + +func Test_Workflow(t *testing.T) { + testSuite := &testsuite.WorkflowTestSuite{} + env := testSuite.NewTestWorkflowEnvironment() + env.RegisterActivity(HelloActivity) + env.RegisterActivity(ByeActivity) + + // Mock the activities to skip the timers (and avoid test timeout). + env.OnActivity(HelloActivity, mock.Anything, "Temporal").Return("Hello Temporal!", nil) + env.OnActivity(ByeActivity, mock.Anything, "Temporal").Return("Bye Temporal!", nil) + + env.ExecuteWorkflow(AsyncActivityWorkflow, "Temporal") + + require.True(t, env.IsWorkflowCompleted()) + require.NoError(t, env.GetWorkflowError()) + + var result string + require.NoError(t, env.GetWorkflowResult(&result)) + require.Equal(t, "Hello Temporal!\nBye Temporal!", result) +}