-
Notifications
You must be signed in to change notification settings - Fork 201
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add async activity example #342
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
### Steps to run this sample: | ||
1) Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use). | ||
2) Run the following command to start the worker (assuming you are in the async-activity directory) | ||
``` | ||
go run ./worker/main.go | ||
``` | ||
3) Run the following command to start the example | ||
``` | ||
go run ./starter/main.go | ||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
package main | ||
|
||
import ( | ||
"context" | ||
"log" | ||
|
||
"go.temporal.io/sdk/client" | ||
|
||
"github.com/google/uuid" | ||
asyncactivity "github.com/temporalio/samples-go/async-activity" | ||
) | ||
|
||
func main() { | ||
// The client is a heavyweight object that should be created once per process. | ||
c, err := client.Dial(client.Options{}) | ||
if err != nil { | ||
log.Fatalln("Unable to create client", err) | ||
} | ||
defer c.Close() | ||
|
||
workflowOptions := client.StartWorkflowOptions{ | ||
ID: "async-activity-" + uuid.NewString(), | ||
TaskQueue: "async-activity", | ||
} | ||
|
||
we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, asyncactivity.AsyncActivityWorkflow, "Temporal") | ||
if err != nil { | ||
log.Fatalln("Unable to execute workflow", err) | ||
} | ||
|
||
log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID()) | ||
|
||
// Synchronously wait for the workflow completion. | ||
var result string | ||
err = we.Get(context.Background(), &result) | ||
if err != nil { | ||
log.Fatalln("Unable get workflow result", err) | ||
} | ||
log.Println("Workflow result:", result) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
package main | ||
|
||
import ( | ||
"log" | ||
|
||
"go.temporal.io/sdk/client" | ||
"go.temporal.io/sdk/worker" | ||
|
||
asyncactivity "github.com/temporalio/samples-go/async-activity" | ||
) | ||
|
||
func main() { | ||
// The client and worker are heavyweight objects that should be created once per process. | ||
c, err := client.Dial(client.Options{}) | ||
if err != nil { | ||
log.Fatalln("Unable to create client", err) | ||
} | ||
defer c.Close() | ||
|
||
// Useful events to look for: timestamp of ActivityTaskScheduled, | ||
// ActivityTaskStarted and ActivityTaskCompleted (note that they | ||
// may not be in the correct timestamp order in the event history). | ||
w := worker.New(c, "async-activity", worker.Options{ | ||
// Set this to 1 to make the activities run one after the other (note | ||
// how both are scheduled at the same time, but ActivityTaskStarted | ||
// differs). | ||
MaxConcurrentActivityExecutionSize: 2, | ||
// Set this to 0.5 to create some delay between when activities are | ||
// started. Note that in this case, the started time does not differ. | ||
// Only the completed time is different. | ||
WorkerActivitiesPerSecond: 2, | ||
}) | ||
|
||
w.RegisterWorkflow(asyncactivity.AsyncActivityWorkflow) | ||
w.RegisterActivity(asyncactivity.HelloActivity) | ||
w.RegisterActivity(asyncactivity.ByeActivity) | ||
|
||
err = w.Run(worker.InterruptCh()) | ||
if err != nil { | ||
log.Fatalln("Unable to start worker", err) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
package asyncactivity | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"time" | ||
|
||
"go.temporal.io/sdk/activity" | ||
"go.temporal.io/sdk/workflow" | ||
) | ||
|
||
// AsyncActivityWorkflow is a workflow definition starting two activities | ||
// asynchronously. | ||
func AsyncActivityWorkflow(ctx workflow.Context, name string) (string, error) { | ||
ao := workflow.ActivityOptions{ | ||
StartToCloseTimeout: 10 * time.Second, | ||
} | ||
ctx = workflow.WithActivityOptions(ctx, ao) | ||
|
||
// Start activities asynchronously. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not what we call an "async activity" in Temporal terms. All activities are async by default. In Temporal there is an "async activity completion" concept at https://docs.temporal.io/activities#asynchronous-activity-completion this would get confused with. This sample is just showing how to do parallel or concurrent activities. You can add a sample for "parallel activities" if you'd like (assuming we don't already have such a sample, though it's very trivial), though I would recommend using a selector to wait for first complete since that's a better demonstration for most users. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This indeed wasn't meant to be async completion (the expense example showcases that). I was going through the README and I saw: Pending examplesMostly examples we haven't yet ported from https://github.com/temporalio/samples-java/ Async activity calling: Example to be completed which I thought meant essentially porting this code from the java example: public String getGreeting(String name) {
/*
* This is our workflow method. We invoke the composeGreeting method two times using
* {@link io.temporal.workflow.Async#function(Func)}.
* The results of each async activity method invocation returns us a
* {@link io.temporal.workflow.Promise} which is similar to a Java {@link java.util.concurrent.Future}
*/
Promise<String> hello = Async.function(activities::composeGreeting, "Hello", name);
Promise<String> bye = Async.function(activities::composeGreeting, "Bye", name);
// After calling the two activity methods async, we block until we receive their results
return hello.get() + "\n" + bye.get();
} I suppose since in Java there is also a different syntax which allows calling an activity and waiting for it to complete synchronously, it helps to have both samples. But given that in Go you always get a Future when calling
That would end up making this example more or less like Thus, I'd suggest dropping the sample, but maybe keep this PR for removing that section from the README, since there is the pickfirst sample? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, removing that whole "pending" section of the README may be best. @Quinn-With-Two-Ns - thoughts? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah I would just remove the "pending" from the README There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since that's quite different from what I started with, I made #343 as a result to remove the "pending" and will close this PR afterwards. |
||
var helloResult, byeResult string | ||
helloFuture := workflow.ExecuteActivity(ctx, HelloActivity, name) | ||
byeFuture := workflow.ExecuteActivity(ctx, ByeActivity, name) | ||
|
||
// This can be done alternatively by creating a workflow selector. See | ||
// "pickfirst" example. | ||
err := helloFuture.Get(ctx, &helloResult) | ||
if err != nil { | ||
return "", fmt.Errorf("hello activity error: %s", err.Error()) | ||
} | ||
err = byeFuture.Get(ctx, &byeResult) | ||
if err != nil { | ||
return "", fmt.Errorf("bye activity error: %s", err.Error()) | ||
} | ||
|
||
return helloResult + "\n" + byeResult, nil | ||
} | ||
|
||
// Each of these activities will sleep for 5 seconds, but see in the temporal | ||
// dashboard that they were created immediately one after the other. | ||
|
||
func HelloActivity(ctx context.Context, name string) (string, error) { | ||
logger := activity.GetLogger(ctx) | ||
logger.Info("Hello activity", "name", name) | ||
time.Sleep(5 * time.Second) | ||
return "Hello " + name + "!", nil | ||
} | ||
|
||
func ByeActivity(ctx context.Context, name string) (string, error) { | ||
logger := activity.GetLogger(ctx) | ||
logger.Info("Bye activity", "name", name) | ||
time.Sleep(5 * time.Second) | ||
return "Bye " + name + "!", nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
package asyncactivity | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/stretchr/testify/mock" | ||
"github.com/stretchr/testify/require" | ||
"go.temporal.io/sdk/testsuite" | ||
) | ||
|
||
func Test_Workflow(t *testing.T) { | ||
testSuite := &testsuite.WorkflowTestSuite{} | ||
env := testSuite.NewTestWorkflowEnvironment() | ||
env.RegisterActivity(HelloActivity) | ||
env.RegisterActivity(ByeActivity) | ||
|
||
// Mock the activities to skip the timers (and avoid test timeout). | ||
env.OnActivity(HelloActivity, mock.Anything, "Temporal").Return("Hello Temporal!", nil) | ||
env.OnActivity(ByeActivity, mock.Anything, "Temporal").Return("Bye Temporal!", nil) | ||
|
||
env.ExecuteWorkflow(AsyncActivityWorkflow, "Temporal") | ||
|
||
require.True(t, env.IsWorkflowCompleted()) | ||
require.NoError(t, env.GetWorkflowError()) | ||
|
||
var result string | ||
require.NoError(t, env.GetWorkflowResult(&result)) | ||
require.Equal(t, "Hello Temporal!\nBye Temporal!", result) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think these settings are related to the sample and don't need to be set for this sample