From 9a656d468a4468b64b10fbcd295b8492eb9fdb8a Mon Sep 17 00:00:00 2001 From: Fabian Martinez <46371672+famarting@users.noreply.github.com> Date: Fri, 25 Oct 2024 13:17:26 +0200 Subject: [PATCH 1/5] worflows: activity retry policy Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> --- examples/go.mod | 2 +- examples/go.sum | 4 ++-- examples/workflow/main.go | 7 ++++++- go.mod | 2 +- go.sum | 4 ++-- workflow/activity_context.go | 32 +++++++++++++++++++++++++++++++- workflow/context.go | 2 +- 7 files changed, 44 insertions(+), 9 deletions(-) diff --git a/examples/go.mod b/examples/go.mod index d504837f..5c6c1e49 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -26,7 +26,7 @@ require ( github.com/golang/protobuf v1.5.4 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/marusama/semaphore/v2 v2.5.0 // indirect - github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d // indirect + github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428 // indirect github.com/xhit/go-str2duration/v2 v2.1.0 // indirect go.opentelemetry.io/otel v1.27.0 // indirect go.opentelemetry.io/otel/metric v1.27.0 // indirect diff --git a/examples/go.sum b/examples/go.sum index b3b2f562..41985448 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -39,8 +39,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/marusama/semaphore/v2 v2.5.0 h1:o/1QJD9DBYOWRnDhPwDVAXQn6mQYD0gZaS1Tpx6DJGM= github.com/marusama/semaphore/v2 v2.5.0/go.mod h1:z9nMiNUekt/LTpTUQdpp+4sJeYqUGpwMHfW0Z8V8fnQ= -github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d h1:Phnx8/wPd9BM6RPIjlqNl8kAaFjtU+Sdw9CzmZd8Wsw= -github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE= +github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428 h1:I1yeX4tWqOdBzpRzSbY1TnHU2NI25Pdu6OXUm39emm0= +github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= diff --git a/examples/workflow/main.go b/examples/workflow/main.go index 0e5677dd..e24aecf2 100644 --- a/examples/workflow/main.go +++ b/examples/workflow/main.go @@ -182,7 +182,12 @@ func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) { return nil, err } - if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input)).Await(&output); err != nil { + if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input), workflow.RetryPolicy(workflow.ActivityRetryPolicy{ + MaxAttempts: 3, + InitialRetryInterval: 1 * time.Second, + BackoffCoefficient: 2, + MaxRetryInterval: 3 * time.Second, + })).Await(&output); err != nil { return nil, err } diff --git a/go.mod b/go.mod index 139a636c..2ed772f1 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/go-chi/chi/v5 v5.1.0 github.com/golang/mock v1.6.0 github.com/google/uuid v1.6.0 - github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d + github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428 github.com/stretchr/testify v1.9.0 google.golang.org/grpc v1.65.0 google.golang.org/protobuf v1.34.2 diff --git a/go.sum b/go.sum index 0db5a317..9270d70d 100644 --- a/go.sum +++ b/go.sum @@ -28,8 +28,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/marusama/semaphore/v2 v2.5.0 h1:o/1QJD9DBYOWRnDhPwDVAXQn6mQYD0gZaS1Tpx6DJGM= github.com/marusama/semaphore/v2 v2.5.0/go.mod h1:z9nMiNUekt/LTpTUQdpp+4sJeYqUGpwMHfW0Z8V8fnQ= -github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d h1:Phnx8/wPd9BM6RPIjlqNl8kAaFjtU+Sdw9CzmZd8Wsw= -github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE= +github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428 h1:I1yeX4tWqOdBzpRzSbY1TnHU2NI25Pdu6OXUm39emm0= +github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= diff --git a/workflow/activity_context.go b/workflow/activity_context.go index 81c60b6a..811ba620 100644 --- a/workflow/activity_context.go +++ b/workflow/activity_context.go @@ -17,6 +17,7 @@ package workflow import ( "context" "encoding/json" + "time" "google.golang.org/protobuf/types/known/wrapperspb" @@ -38,7 +39,16 @@ func (wfac *ActivityContext) Context() context.Context { type callActivityOption func(*callActivityOptions) error type callActivityOptions struct { - rawInput *wrapperspb.StringValue + rawInput *wrapperspb.StringValue + retryPolicy *ActivityRetryPolicy +} + +type ActivityRetryPolicy struct { + MaxAttempts int + InitialRetryInterval time.Duration + BackoffCoefficient float64 + MaxRetryInterval time.Duration + RetryTimeout time.Duration } // ActivityInput is an option to pass a JSON-serializable input @@ -61,6 +71,26 @@ func ActivityRawInput(input string) callActivityOption { } } +func RetryPolicy(policy ActivityRetryPolicy) callActivityOption { + return func(opts *callActivityOptions) error { + opts.retryPolicy = &policy + return nil + } +} + +func (opts *callActivityOptions) getRetryPolicy() *task.ActivityRetryPolicy { + if opts.retryPolicy == nil { + return nil + } + return &task.ActivityRetryPolicy{ + MaxAttempts: opts.retryPolicy.MaxAttempts, + InitialRetryInterval: opts.retryPolicy.InitialRetryInterval, + BackoffCoefficient: opts.retryPolicy.BackoffCoefficient, + MaxRetryInterval: opts.retryPolicy.MaxRetryInterval, + RetryTimeout: opts.retryPolicy.RetryTimeout, + } +} + func marshalData(input any) ([]byte, error) { if input == nil { return nil, nil diff --git a/workflow/context.go b/workflow/context.go index 5bf77cbe..82200b10 100644 --- a/workflow/context.go +++ b/workflow/context.go @@ -68,7 +68,7 @@ func (wfc *WorkflowContext) CallActivity(activity interface{}, opts ...callActiv } } - return wfc.orchestrationContext.CallActivity(activity, task.WithRawActivityInput(options.rawInput.GetValue())) + return wfc.orchestrationContext.CallActivity(activity, task.WithRawActivityInput(options.rawInput.GetValue()), task.WithRetryPolicy(options.getRetryPolicy())) } // CallChildWorkflow returns a completable task for a given workflow. From 006d58730fb7e976a3989affb04f1b9f950ad077 Mon Sep 17 00:00:00 2001 From: Fabian Martinez <46371672+famarting@users.noreply.github.com> Date: Mon, 28 Oct 2024 10:22:20 +0100 Subject: [PATCH 2/5] adjust name Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> --- workflow/activity_context.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/workflow/activity_context.go b/workflow/activity_context.go index 811ba620..8da9a4c8 100644 --- a/workflow/activity_context.go +++ b/workflow/activity_context.go @@ -40,10 +40,10 @@ type callActivityOption func(*callActivityOptions) error type callActivityOptions struct { rawInput *wrapperspb.StringValue - retryPolicy *ActivityRetryPolicy + retryPolicy *RetryPolicy } -type ActivityRetryPolicy struct { +type RetryPolicy struct { MaxAttempts int InitialRetryInterval time.Duration BackoffCoefficient float64 @@ -71,7 +71,7 @@ func ActivityRawInput(input string) callActivityOption { } } -func RetryPolicy(policy ActivityRetryPolicy) callActivityOption { +func ActivityRetryPolicy(policy RetryPolicy) callActivityOption { return func(opts *callActivityOptions) error { opts.retryPolicy = &policy return nil From 7040699d5e94d0574d0420f6b70005cd1a346c33 Mon Sep 17 00:00:00 2001 From: Fabian Martinez <46371672+famarting@users.noreply.github.com> Date: Thu, 31 Oct 2024 12:04:50 +0100 Subject: [PATCH 3/5] fix build Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> --- examples/workflow/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/workflow/main.go b/examples/workflow/main.go index e24aecf2..f8a001ae 100644 --- a/examples/workflow/main.go +++ b/examples/workflow/main.go @@ -182,7 +182,7 @@ func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) { return nil, err } - if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input), workflow.RetryPolicy(workflow.ActivityRetryPolicy{ + if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input), workflow.ActivityRetryPolicy(workflow.RetryPolicy{ MaxAttempts: 3, InitialRetryInterval: 1 * time.Second, BackoffCoefficient: 2, From 1d8d984c5f2f8d4488a03fd7c533bd1248eaeb1b Mon Sep 17 00:00:00 2001 From: Fabian Martinez <46371672+famarting@users.noreply.github.com> Date: Wed, 6 Nov 2024 13:49:37 +0100 Subject: [PATCH 4/5] add tests Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> --- examples/workflow/README.md | 1 + examples/workflow/main.go | 30 +++++++++++++++++++++++++----- workflow/activity_context_test.go | 22 ++++++++++++++++++++++ 3 files changed, 48 insertions(+), 5 deletions(-) diff --git a/examples/workflow/README.md b/examples/workflow/README.md index 54b9e5fe..6146d57a 100644 --- a/examples/workflow/README.md +++ b/examples/workflow/README.md @@ -22,6 +22,7 @@ expected_stdout_lines: - '== APP == stage: 1' - '== APP == workflow event raised' - '== APP == stage: 2' + - '== APP == fail activity executions: 3' - '== APP == workflow status: COMPLETED' - '== APP == workflow purged' - '== APP == stage: 2' diff --git a/examples/workflow/main.go b/examples/workflow/main.go index f8a001ae..456fa4a7 100644 --- a/examples/workflow/main.go +++ b/examples/workflow/main.go @@ -16,6 +16,7 @@ package main import ( "context" + "errors" "fmt" "log" "time" @@ -24,6 +25,7 @@ import ( ) var stage = 0 +var failActivityTries = 0 func main() { w, err := workflow.NewWorker() @@ -112,6 +114,15 @@ func main() { fmt.Printf("stage: %d\n", stage) + waitCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + _, err = wfClient.WaitForWorkflowCompletion(waitCtx, instanceID) + cancel() + if err != nil { + log.Fatalf("failed to wait for workflow: %v", err) + } + + fmt.Printf("fail activity executions: %d\n", failActivityTries) + respFetch, err = wfClient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true)) if err != nil { log.Fatalf("failed to get workflow: %v", err) @@ -182,13 +193,17 @@ func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) { return nil, err } - if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input), workflow.ActivityRetryPolicy(workflow.RetryPolicy{ + if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input)).Await(&output); err != nil { + return nil, err + } + + if err := ctx.CallActivity(FailActivity, workflow.ActivityRetryPolicy(workflow.RetryPolicy{ MaxAttempts: 3, - InitialRetryInterval: 1 * time.Second, + InitialRetryInterval: 100 * time.Millisecond, BackoffCoefficient: 2, - MaxRetryInterval: 3 * time.Second, - })).Await(&output); err != nil { - return nil, err + MaxRetryInterval: 1 * time.Second, + })).Await(nil); err == nil { + return nil, fmt.Errorf("unexpected no error executing fail activity") } return output, nil @@ -204,3 +219,8 @@ func TestActivity(ctx workflow.ActivityContext) (any, error) { return fmt.Sprintf("Stage: %d", stage), nil } + +func FailActivity(ctx workflow.ActivityContext) (any, error) { + failActivityTries += 1 + return nil, errors.New("dummy activity error") +} diff --git a/workflow/activity_context_test.go b/workflow/activity_context_test.go index 0e73e5e7..da9942a5 100644 --- a/workflow/activity_context_test.go +++ b/workflow/activity_context_test.go @@ -19,7 +19,9 @@ import ( "encoding/json" "fmt" "testing" + "time" + "github.com/microsoft/durabletask-go/task" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -69,6 +71,26 @@ func TestCallActivityOptions(t *testing.T) { opts := returnCallActivityOptions(ActivityRawInput("test")) assert.Equal(t, "test", opts.rawInput.GetValue()) }) + + t.Run("activity retry policy - set", func(t *testing.T) { + opts := returnCallActivityOptions(ActivityRetryPolicy(RetryPolicy{ + MaxAttempts: 3, + InitialRetryInterval: 100 * time.Millisecond, + BackoffCoefficient: 2, + MaxRetryInterval: 2 * time.Second, + })) + assert.Equal(t, &task.ActivityRetryPolicy{ + MaxAttempts: 3, + InitialRetryInterval: 100 * time.Millisecond, + BackoffCoefficient: 2, + MaxRetryInterval: 2 * time.Second, + }, opts.getRetryPolicy()) + }) + + t.Run("activity retry policy - empty", func(t *testing.T) { + opts := returnCallActivityOptions() + assert.Empty(t, opts.getRetryPolicy()) + }) } func returnCallActivityOptions(opts ...callActivityOption) callActivityOptions { From a30f3a362c7e255f51a1fb82fef547206aacfe4f Mon Sep 17 00:00:00 2001 From: Fabian Martinez <46371672+famarting@users.noreply.github.com> Date: Fri, 8 Nov 2024 16:30:48 +0100 Subject: [PATCH 5/5] register activity Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> --- examples/workflow/README.md | 1 + examples/workflow/main.go | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/examples/workflow/README.md b/examples/workflow/README.md index 6146d57a..8c592757 100644 --- a/examples/workflow/README.md +++ b/examples/workflow/README.md @@ -15,6 +15,7 @@ expected_stdout_lines: - '== APP == Worker initialized' - '== APP == TestWorkflow registered' - '== APP == TestActivity registered' + - '== APP == FailActivity registered' - '== APP == runner started' - '== APP == workflow started with id: a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9' - '== APP == workflow paused' diff --git a/examples/workflow/main.go b/examples/workflow/main.go index 456fa4a7..2e78bc32 100644 --- a/examples/workflow/main.go +++ b/examples/workflow/main.go @@ -45,6 +45,11 @@ func main() { } fmt.Println("TestActivity registered") + if err := w.RegisterActivity(FailActivity); err != nil { + log.Fatal(err) + } + fmt.Println("FailActivity registered") + // Start workflow runner if err := w.Start(); err != nil { log.Fatal(err)