Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

worflows: activity retry policy #644

Merged
merged 6 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/golang/protobuf v1.5.4 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/marusama/semaphore/v2 v2.5.0 // indirect
github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d // indirect
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428 // indirect
github.com/xhit/go-str2duration/v2 v2.1.0 // indirect
go.opentelemetry.io/otel v1.27.0 // indirect
go.opentelemetry.io/otel/metric v1.27.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions examples/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/marusama/semaphore/v2 v2.5.0 h1:o/1QJD9DBYOWRnDhPwDVAXQn6mQYD0gZaS1Tpx6DJGM=
github.com/marusama/semaphore/v2 v2.5.0/go.mod h1:z9nMiNUekt/LTpTUQdpp+4sJeYqUGpwMHfW0Z8V8fnQ=
github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d h1:Phnx8/wPd9BM6RPIjlqNl8kAaFjtU+Sdw9CzmZd8Wsw=
github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE=
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428 h1:I1yeX4tWqOdBzpRzSbY1TnHU2NI25Pdu6OXUm39emm0=
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
Expand Down
1 change: 1 addition & 0 deletions examples/workflow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ expected_stdout_lines:
- '== APP == stage: 1'
- '== APP == workflow event raised'
- '== APP == stage: 2'
- '== APP == fail activity executions: 3'
- '== APP == workflow status: COMPLETED'
- '== APP == workflow purged'
- '== APP == stage: 2'
Expand Down
25 changes: 25 additions & 0 deletions examples/workflow/main.go
mikeee marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package main

import (
"context"
"errors"
"fmt"
"log"
"time"
Expand All @@ -24,6 +25,7 @@ import (
)

var stage = 0
var failActivityTries = 0

func main() {
w, err := workflow.NewWorker()
Expand Down Expand Up @@ -112,6 +114,15 @@ func main() {

fmt.Printf("stage: %d\n", stage)

waitCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
_, err = wfClient.WaitForWorkflowCompletion(waitCtx, instanceID)
cancel()
if err != nil {
log.Fatalf("failed to wait for workflow: %v", err)
}

fmt.Printf("fail activity executions: %d\n", failActivityTries)

respFetch, err = wfClient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
if err != nil {
log.Fatalf("failed to get workflow: %v", err)
Expand Down Expand Up @@ -186,6 +197,15 @@ func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
return nil, err
}

if err := ctx.CallActivity(FailActivity, workflow.ActivityRetryPolicy(workflow.RetryPolicy{
MaxAttempts: 3,
InitialRetryInterval: 100 * time.Millisecond,
BackoffCoefficient: 2,
MaxRetryInterval: 1 * time.Second,
})).Await(nil); err == nil {
return nil, fmt.Errorf("unexpected no error executing fail activity")
}

return output, nil
}

Expand All @@ -199,3 +219,8 @@ func TestActivity(ctx workflow.ActivityContext) (any, error) {

return fmt.Sprintf("Stage: %d", stage), nil
}

func FailActivity(ctx workflow.ActivityContext) (any, error) {
failActivityTries += 1
return nil, errors.New("dummy activity error")
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/go-chi/chi/v5 v5.1.0
github.com/golang/mock v1.6.0
github.com/google/uuid v1.6.0
github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428
github.com/stretchr/testify v1.9.0
google.golang.org/grpc v1.65.0
google.golang.org/protobuf v1.34.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/marusama/semaphore/v2 v2.5.0 h1:o/1QJD9DBYOWRnDhPwDVAXQn6mQYD0gZaS1Tpx6DJGM=
github.com/marusama/semaphore/v2 v2.5.0/go.mod h1:z9nMiNUekt/LTpTUQdpp+4sJeYqUGpwMHfW0Z8V8fnQ=
github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d h1:Phnx8/wPd9BM6RPIjlqNl8kAaFjtU+Sdw9CzmZd8Wsw=
github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE=
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428 h1:I1yeX4tWqOdBzpRzSbY1TnHU2NI25Pdu6OXUm39emm0=
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
Expand Down
32 changes: 31 additions & 1 deletion workflow/activity_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package workflow
import (
"context"
"encoding/json"
"time"

"google.golang.org/protobuf/types/known/wrapperspb"

Expand All @@ -38,7 +39,16 @@ func (wfac *ActivityContext) Context() context.Context {
type callActivityOption func(*callActivityOptions) error

type callActivityOptions struct {
rawInput *wrapperspb.StringValue
rawInput *wrapperspb.StringValue
retryPolicy *RetryPolicy
}

type RetryPolicy struct {
MaxAttempts int
InitialRetryInterval time.Duration
BackoffCoefficient float64
MaxRetryInterval time.Duration
RetryTimeout time.Duration
}

// ActivityInput is an option to pass a JSON-serializable input
Expand All @@ -61,6 +71,26 @@ func ActivityRawInput(input string) callActivityOption {
}
}

func ActivityRetryPolicy(policy RetryPolicy) callActivityOption {
return func(opts *callActivityOptions) error {
opts.retryPolicy = &policy
return nil
}
}

func (opts *callActivityOptions) getRetryPolicy() *task.ActivityRetryPolicy {
famarting marked this conversation as resolved.
Show resolved Hide resolved
if opts.retryPolicy == nil {
return nil
}
return &task.ActivityRetryPolicy{
MaxAttempts: opts.retryPolicy.MaxAttempts,
InitialRetryInterval: opts.retryPolicy.InitialRetryInterval,
BackoffCoefficient: opts.retryPolicy.BackoffCoefficient,
MaxRetryInterval: opts.retryPolicy.MaxRetryInterval,
RetryTimeout: opts.retryPolicy.RetryTimeout,
}
}

func marshalData(input any) ([]byte, error) {
if input == nil {
return nil, nil
Expand Down
22 changes: 22 additions & 0 deletions workflow/activity_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"encoding/json"
"fmt"
"testing"
"time"

"github.com/microsoft/durabletask-go/task"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -69,6 +71,26 @@ func TestCallActivityOptions(t *testing.T) {
opts := returnCallActivityOptions(ActivityRawInput("test"))
assert.Equal(t, "test", opts.rawInput.GetValue())
})

t.Run("activity retry policy - set", func(t *testing.T) {
opts := returnCallActivityOptions(ActivityRetryPolicy(RetryPolicy{
MaxAttempts: 3,
InitialRetryInterval: 100 * time.Millisecond,
BackoffCoefficient: 2,
MaxRetryInterval: 2 * time.Second,
}))
assert.Equal(t, &task.ActivityRetryPolicy{
MaxAttempts: 3,
InitialRetryInterval: 100 * time.Millisecond,
BackoffCoefficient: 2,
MaxRetryInterval: 2 * time.Second,
}, opts.getRetryPolicy())
})

t.Run("activity retry policy - empty", func(t *testing.T) {
opts := returnCallActivityOptions()
assert.Empty(t, opts.getRetryPolicy())
})
}

func returnCallActivityOptions(opts ...callActivityOption) callActivityOptions {
Expand Down
2 changes: 1 addition & 1 deletion workflow/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
}
}

return wfc.orchestrationContext.CallActivity(activity, task.WithRawActivityInput(options.rawInput.GetValue()))
return wfc.orchestrationContext.CallActivity(activity, task.WithRawActivityInput(options.rawInput.GetValue()), task.WithRetryPolicy(options.getRetryPolicy()))

Check warning on line 71 in workflow/context.go

View check run for this annotation

Codecov / codecov/patch

workflow/context.go#L71

Added line #L71 was not covered by tests
}

// CallChildWorkflow returns a completable task for a given workflow.
Expand Down
Loading