From c12c9594c4a16270e9b285c210fb34baa3c6c845 Mon Sep 17 00:00:00 2001 From: Fabian Martinez <46371672+famarting@users.noreply.github.com> Date: Thu, 14 Nov 2024 06:02:27 +0100 Subject: [PATCH] worflows: activity retry policy (#644) * worflows: activity retry policy Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> * adjust name Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> * fix build Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> * add tests Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> * register activity Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> --------- Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> Co-authored-by: Mike Nguyen --- examples/go.mod | 2 +- examples/go.sum | 4 ++-- examples/workflow/README.md | 2 ++ examples/workflow/main.go | 30 +++++++++++++++++++++++++++++ go.mod | 2 +- go.sum | 4 ++-- workflow/activity_context.go | 32 ++++++++++++++++++++++++++++++- workflow/activity_context_test.go | 22 +++++++++++++++++++++ workflow/context.go | 2 +- 9 files changed, 92 insertions(+), 8 deletions(-) diff --git a/examples/go.mod b/examples/go.mod index a70622bf..37c75c6e 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -26,7 +26,7 @@ require ( github.com/golang/protobuf v1.5.4 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/marusama/semaphore/v2 v2.5.0 // indirect - github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d // indirect + github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428 // indirect github.com/xhit/go-str2duration/v2 v2.1.0 // indirect go.opentelemetry.io/otel v1.27.0 // indirect go.opentelemetry.io/otel/metric v1.27.0 // indirect diff --git a/examples/go.sum b/examples/go.sum index b3b2f562..41985448 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -39,8 +39,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/marusama/semaphore/v2 v2.5.0 h1:o/1QJD9DBYOWRnDhPwDVAXQn6mQYD0gZaS1Tpx6DJGM= github.com/marusama/semaphore/v2 v2.5.0/go.mod h1:z9nMiNUekt/LTpTUQdpp+4sJeYqUGpwMHfW0Z8V8fnQ= -github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d h1:Phnx8/wPd9BM6RPIjlqNl8kAaFjtU+Sdw9CzmZd8Wsw= -github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE= +github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428 h1:I1yeX4tWqOdBzpRzSbY1TnHU2NI25Pdu6OXUm39emm0= +github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= diff --git a/examples/workflow/README.md b/examples/workflow/README.md index 54b9e5fe..8c592757 100644 --- a/examples/workflow/README.md +++ b/examples/workflow/README.md @@ -15,6 +15,7 @@ expected_stdout_lines: - '== APP == Worker initialized' - '== APP == TestWorkflow registered' - '== APP == TestActivity registered' + - '== APP == FailActivity registered' - '== APP == runner started' - '== APP == workflow started with id: a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9' - '== APP == workflow paused' @@ -22,6 +23,7 @@ expected_stdout_lines: - '== APP == stage: 1' - '== APP == workflow event raised' - '== APP == stage: 2' + - '== APP == fail activity executions: 3' - '== APP == workflow status: COMPLETED' - '== APP == workflow purged' - '== APP == stage: 2' diff --git a/examples/workflow/main.go b/examples/workflow/main.go index 0e5677dd..2e78bc32 100644 --- a/examples/workflow/main.go +++ b/examples/workflow/main.go @@ -16,6 +16,7 @@ package main import ( "context" + "errors" "fmt" "log" "time" @@ -24,6 +25,7 @@ import ( ) var stage = 0 +var failActivityTries = 0 func main() { w, err := workflow.NewWorker() @@ -43,6 +45,11 @@ func main() { } fmt.Println("TestActivity registered") + if err := w.RegisterActivity(FailActivity); err != nil { + log.Fatal(err) + } + fmt.Println("FailActivity registered") + // Start workflow runner if err := w.Start(); err != nil { log.Fatal(err) @@ -112,6 +119,15 @@ func main() { fmt.Printf("stage: %d\n", stage) + waitCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + _, err = wfClient.WaitForWorkflowCompletion(waitCtx, instanceID) + cancel() + if err != nil { + log.Fatalf("failed to wait for workflow: %v", err) + } + + fmt.Printf("fail activity executions: %d\n", failActivityTries) + respFetch, err = wfClient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true)) if err != nil { log.Fatalf("failed to get workflow: %v", err) @@ -186,6 +202,15 @@ func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) { return nil, err } + if err := ctx.CallActivity(FailActivity, workflow.ActivityRetryPolicy(workflow.RetryPolicy{ + MaxAttempts: 3, + InitialRetryInterval: 100 * time.Millisecond, + BackoffCoefficient: 2, + MaxRetryInterval: 1 * time.Second, + })).Await(nil); err == nil { + return nil, fmt.Errorf("unexpected no error executing fail activity") + } + return output, nil } @@ -199,3 +224,8 @@ func TestActivity(ctx workflow.ActivityContext) (any, error) { return fmt.Sprintf("Stage: %d", stage), nil } + +func FailActivity(ctx workflow.ActivityContext) (any, error) { + failActivityTries += 1 + return nil, errors.New("dummy activity error") +} diff --git a/go.mod b/go.mod index 5c2549f2..17d84f18 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/go-chi/chi/v5 v5.1.0 github.com/golang/mock v1.6.0 github.com/google/uuid v1.6.0 - github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d + github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428 github.com/stretchr/testify v1.9.0 google.golang.org/grpc v1.65.0 google.golang.org/protobuf v1.34.2 diff --git a/go.sum b/go.sum index 0db5a317..9270d70d 100644 --- a/go.sum +++ b/go.sum @@ -28,8 +28,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/marusama/semaphore/v2 v2.5.0 h1:o/1QJD9DBYOWRnDhPwDVAXQn6mQYD0gZaS1Tpx6DJGM= github.com/marusama/semaphore/v2 v2.5.0/go.mod h1:z9nMiNUekt/LTpTUQdpp+4sJeYqUGpwMHfW0Z8V8fnQ= -github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d h1:Phnx8/wPd9BM6RPIjlqNl8kAaFjtU+Sdw9CzmZd8Wsw= -github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE= +github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428 h1:I1yeX4tWqOdBzpRzSbY1TnHU2NI25Pdu6OXUm39emm0= +github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= diff --git a/workflow/activity_context.go b/workflow/activity_context.go index 81c60b6a..8da9a4c8 100644 --- a/workflow/activity_context.go +++ b/workflow/activity_context.go @@ -17,6 +17,7 @@ package workflow import ( "context" "encoding/json" + "time" "google.golang.org/protobuf/types/known/wrapperspb" @@ -38,7 +39,16 @@ func (wfac *ActivityContext) Context() context.Context { type callActivityOption func(*callActivityOptions) error type callActivityOptions struct { - rawInput *wrapperspb.StringValue + rawInput *wrapperspb.StringValue + retryPolicy *RetryPolicy +} + +type RetryPolicy struct { + MaxAttempts int + InitialRetryInterval time.Duration + BackoffCoefficient float64 + MaxRetryInterval time.Duration + RetryTimeout time.Duration } // ActivityInput is an option to pass a JSON-serializable input @@ -61,6 +71,26 @@ func ActivityRawInput(input string) callActivityOption { } } +func ActivityRetryPolicy(policy RetryPolicy) callActivityOption { + return func(opts *callActivityOptions) error { + opts.retryPolicy = &policy + return nil + } +} + +func (opts *callActivityOptions) getRetryPolicy() *task.ActivityRetryPolicy { + if opts.retryPolicy == nil { + return nil + } + return &task.ActivityRetryPolicy{ + MaxAttempts: opts.retryPolicy.MaxAttempts, + InitialRetryInterval: opts.retryPolicy.InitialRetryInterval, + BackoffCoefficient: opts.retryPolicy.BackoffCoefficient, + MaxRetryInterval: opts.retryPolicy.MaxRetryInterval, + RetryTimeout: opts.retryPolicy.RetryTimeout, + } +} + func marshalData(input any) ([]byte, error) { if input == nil { return nil, nil diff --git a/workflow/activity_context_test.go b/workflow/activity_context_test.go index 0e73e5e7..da9942a5 100644 --- a/workflow/activity_context_test.go +++ b/workflow/activity_context_test.go @@ -19,7 +19,9 @@ import ( "encoding/json" "fmt" "testing" + "time" + "github.com/microsoft/durabletask-go/task" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -69,6 +71,26 @@ func TestCallActivityOptions(t *testing.T) { opts := returnCallActivityOptions(ActivityRawInput("test")) assert.Equal(t, "test", opts.rawInput.GetValue()) }) + + t.Run("activity retry policy - set", func(t *testing.T) { + opts := returnCallActivityOptions(ActivityRetryPolicy(RetryPolicy{ + MaxAttempts: 3, + InitialRetryInterval: 100 * time.Millisecond, + BackoffCoefficient: 2, + MaxRetryInterval: 2 * time.Second, + })) + assert.Equal(t, &task.ActivityRetryPolicy{ + MaxAttempts: 3, + InitialRetryInterval: 100 * time.Millisecond, + BackoffCoefficient: 2, + MaxRetryInterval: 2 * time.Second, + }, opts.getRetryPolicy()) + }) + + t.Run("activity retry policy - empty", func(t *testing.T) { + opts := returnCallActivityOptions() + assert.Empty(t, opts.getRetryPolicy()) + }) } func returnCallActivityOptions(opts ...callActivityOption) callActivityOptions { diff --git a/workflow/context.go b/workflow/context.go index 5bf77cbe..82200b10 100644 --- a/workflow/context.go +++ b/workflow/context.go @@ -68,7 +68,7 @@ func (wfc *WorkflowContext) CallActivity(activity interface{}, opts ...callActiv } } - return wfc.orchestrationContext.CallActivity(activity, task.WithRawActivityInput(options.rawInput.GetValue())) + return wfc.orchestrationContext.CallActivity(activity, task.WithRawActivityInput(options.rawInput.GetValue()), task.WithRetryPolicy(options.getRetryPolicy())) } // CallChildWorkflow returns a completable task for a given workflow.