Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async activity example #342

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ Each sample demonstrates one feature of the SDK, together with tests.
- [**Basic mTLS hello world**](./helloworldmtls): Simple example of a
Workflow Definition and an Activity Definition using mTLS like Temporal Cloud.

- [**Basic async activity execution**](./async-activity/): Simple example of a
Workflow Definition starting activities asynchronously.

### API demonstrations

- **Async activity completion**: Example of
Expand Down Expand Up @@ -230,7 +233,6 @@ resource waiting its successful completion

Mostly examples we haven't yet ported from https://github.com/temporalio/samples-java/

- Async activity calling: *Example to be completed*
- Async lambda: *Example to be completed*
- Periodic Workflow: Workflow that executes some logic periodically. *Example to be completed*
- Exception propagation and wrapping: *Example to be completed*
Expand Down
10 changes: 10 additions & 0 deletions async-activity/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
### Steps to run this sample:
1) Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use).
2) Run the following command to start the worker (assuming you are in the async-activity directory)
```
go run ./worker/main.go
```
3) Run the following command to start the example
```
go run ./starter/main.go
```
40 changes: 40 additions & 0 deletions async-activity/starter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package main

import (
"context"
"log"

"go.temporal.io/sdk/client"

"github.com/google/uuid"
asyncactivity "github.com/temporalio/samples-go/async-activity"
)

func main() {
// The client is a heavyweight object that should be created once per process.
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

workflowOptions := client.StartWorkflowOptions{
ID: "async-activity-" + uuid.NewString(),
TaskQueue: "async-activity",
}

we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, asyncactivity.AsyncActivityWorkflow, "Temporal")
if err != nil {
log.Fatalln("Unable to execute workflow", err)
}

log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID())

// Synchronously wait for the workflow completion.
var result string
err = we.Get(context.Background(), &result)
if err != nil {
log.Fatalln("Unable get workflow result", err)
}
log.Println("Workflow result:", result)
}
42 changes: 42 additions & 0 deletions async-activity/worker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package main

import (
"log"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"

asyncactivity "github.com/temporalio/samples-go/async-activity"
)

func main() {
// The client and worker are heavyweight objects that should be created once per process.
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

// Useful events to look for: timestamp of ActivityTaskScheduled,
// ActivityTaskStarted and ActivityTaskCompleted (note that they
// may not be in the correct timestamp order in the event history).
w := worker.New(c, "async-activity", worker.Options{
// Set this to 1 to make the activities run one after the other (note
// how both are scheduled at the same time, but ActivityTaskStarted
// differs).
MaxConcurrentActivityExecutionSize: 2,
// Set this to 0.5 to create some delay between when activities are
// started. Note that in this case, the started time does not differ.
// Only the completed time is different.
WorkerActivitiesPerSecond: 2,
Comment on lines +24 to +31
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think these settings are related to the sample and don't need to be set for this sample

})

w.RegisterWorkflow(asyncactivity.AsyncActivityWorkflow)
w.RegisterActivity(asyncactivity.HelloActivity)
w.RegisterActivity(asyncactivity.ByeActivity)

err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalln("Unable to start worker", err)
}
}
54 changes: 54 additions & 0 deletions async-activity/workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package asyncactivity

import (
"context"
"fmt"
"time"

"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/workflow"
)

// AsyncActivityWorkflow is a workflow definition starting two activities
// asynchronously.
func AsyncActivityWorkflow(ctx workflow.Context, name string) (string, error) {
ao := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, ao)

// Start activities asynchronously.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not what we call an "async activity" in Temporal terms. All activities are async by default. In Temporal there is an "async activity completion" concept at https://docs.temporal.io/activities#asynchronous-activity-completion this would get confused with. This sample is just showing how to do parallel or concurrent activities.

You can add a sample for "parallel activities" if you'd like (assuming we don't already have such a sample, though it's very trivial), though I would recommend using a selector to wait for first complete since that's a better demonstration for most users.

Copy link
Author

@Ozoniuss Ozoniuss Apr 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This indeed wasn't meant to be async completion (the expense example showcases that). I was going through the README and I saw:


Pending examples

Mostly examples we haven't yet ported from https://github.com/temporalio/samples-java/

Async activity calling: Example to be completed


which I thought meant essentially porting this code from the java example:

    public String getGreeting(String name) {

      /*
       * This is our workflow method. We invoke the composeGreeting method two times using
       * {@link io.temporal.workflow.Async#function(Func)}.
       * The results of each async activity method invocation returns us a
       * {@link io.temporal.workflow.Promise} which is similar to a Java {@link java.util.concurrent.Future}
       */
      Promise<String> hello = Async.function(activities::composeGreeting, "Hello", name);
      Promise<String> bye = Async.function(activities::composeGreeting, "Bye", name);

      // After calling the two activity methods async, we block until we receive their results
      return hello.get() + "\n" + bye.get();
    }

I suppose since in Java there is also a different syntax which allows calling an activity and waiting for it to complete synchronously, it helps to have both samples. But given that in Go you always get a Future when calling ExecuteActivity, not sure if having this sample as well adds as much value as in the Java repository (you could pretty much instead use the hello example and build upon that in this case). I made this PR pretty mainly because of that readme note.

though I would recommend using a selector to wait for first complete since that's a better demonstration for most users

That would end up making this example more or less like pickfirst, so I guess in that case I'd be questioning the purpose of this sample.

Thus, I'd suggest dropping the sample, but maybe keep this PR for removing that section from the README, since there is the pickfirst sample?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, removing that whole "pending" section of the README may be best. @Quinn-With-Two-Ns - thoughts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I would just remove the "pending" from the README

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since that's quite different from what I started with, I made #343 as a result to remove the "pending" and will close this PR afterwards.

var helloResult, byeResult string
helloFuture := workflow.ExecuteActivity(ctx, HelloActivity, name)
byeFuture := workflow.ExecuteActivity(ctx, ByeActivity, name)

// This can be done alternatively by creating a workflow selector. See
// "pickfirst" example.
err := helloFuture.Get(ctx, &helloResult)
if err != nil {
return "", fmt.Errorf("hello activity error: %s", err.Error())
}
err = byeFuture.Get(ctx, &byeResult)
if err != nil {
return "", fmt.Errorf("bye activity error: %s", err.Error())
}

return helloResult + "\n" + byeResult, nil
}

// Each of these activities will sleep for 5 seconds, but see in the temporal
// dashboard that they were created immediately one after the other.

func HelloActivity(ctx context.Context, name string) (string, error) {
logger := activity.GetLogger(ctx)
logger.Info("Hello activity", "name", name)
time.Sleep(5 * time.Second)
return "Hello " + name + "!", nil
}

func ByeActivity(ctx context.Context, name string) (string, error) {
logger := activity.GetLogger(ctx)
logger.Info("Bye activity", "name", name)
time.Sleep(5 * time.Second)
return "Bye " + name + "!", nil
}
29 changes: 29 additions & 0 deletions async-activity/workflow_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package asyncactivity

import (
"testing"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.temporal.io/sdk/testsuite"
)

func Test_Workflow(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()
env.RegisterActivity(HelloActivity)
env.RegisterActivity(ByeActivity)

// Mock the activities to skip the timers (and avoid test timeout).
env.OnActivity(HelloActivity, mock.Anything, "Temporal").Return("Hello Temporal!", nil)
env.OnActivity(ByeActivity, mock.Anything, "Temporal").Return("Bye Temporal!", nil)

env.ExecuteWorkflow(AsyncActivityWorkflow, "Temporal")

require.True(t, env.IsWorkflowCompleted())
require.NoError(t, env.GetWorkflowError())

var result string
require.NoError(t, env.GetWorkflowResult(&result))
require.Equal(t, "Hello Temporal!\nBye Temporal!", result)
}