Skip to content

Commit

Permalink
update durabletask to use fork and child workflow retries (#656)
Browse files Browse the repository at this point in the history
* update durabletask to use fork and child workflow retries

Signed-off-by: Fabian Martinez <[email protected]>

* lint

Signed-off-by: Fabian Martinez <[email protected]>

---------

Signed-off-by: Fabian Martinez <[email protected]>
  • Loading branch information
famarting authored Dec 4, 2024
1 parent 282a58b commit 921a6a7
Show file tree
Hide file tree
Showing 15 changed files with 82 additions and 33 deletions.
4 changes: 2 additions & 2 deletions examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ require (
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/dapr/dapr v1.14.5-0.20241120233620-c86a77f6db5f // indirect
github.com/dapr/dapr v1.15.0-rc.1 // indirect
github.com/dapr/durabletask-go v0.5.1-0.20241127212625-4232880fd198 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/go-chi/chi/v5 v5.1.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/marusama/semaphore/v2 v2.5.0 // indirect
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428 // indirect
github.com/xhit/go-str2duration/v2 v2.1.0 // indirect
go.opentelemetry.io/otel v1.30.0 // indirect
go.opentelemetry.io/otel/metric v1.30.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions examples/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyY
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/dapr/dapr v1.14.5-0.20241120233620-c86a77f6db5f h1:wXPHK2o5FIABU5BvKk/21MN6GKaoUvWc7fESH/hwVls=
github.com/dapr/dapr v1.14.5-0.20241120233620-c86a77f6db5f/go.mod h1:WlsLcudco11+BhaIvg2XyGxD+2GcZf8OTOawd94dAQs=
github.com/dapr/dapr v1.15.0-rc.1 h1:7JP3zSannxQwV27A9pPR2b/DSNmgcSjJOhRDwM4eFpQ=
github.com/dapr/dapr v1.15.0-rc.1/go.mod h1:SycZrBWgfmog+C5T4p0X6VIpnREQ3xajrYxdih+gn9w=
github.com/dapr/durabletask-go v0.5.1-0.20241127212625-4232880fd198 h1:vTq9HmTXow4iVb/1dO4qVZhK7XC5KlwbLl2iNIE23Qc=
github.com/dapr/durabletask-go v0.5.1-0.20241127212625-4232880fd198/go.mod h1:C4ykYCSNv1k2C4wvZv11h2ClkE/qsXw0tv6idOWVmDc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -39,8 +41,6 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/marusama/semaphore/v2 v2.5.0 h1:o/1QJD9DBYOWRnDhPwDVAXQn6mQYD0gZaS1Tpx6DJGM=
github.com/marusama/semaphore/v2 v2.5.0/go.mod h1:z9nMiNUekt/LTpTUQdpp+4sJeYqUGpwMHfW0Z8V8fnQ=
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428 h1:I1yeX4tWqOdBzpRzSbY1TnHU2NI25Pdu6OXUm39emm0=
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ go 1.23.3

require (
github.com/dapr/dapr v1.15.0-rc.1
github.com/dapr/durabletask-go v0.5.1-0.20241127212625-4232880fd198
github.com/go-chi/chi/v5 v5.1.0
github.com/golang/mock v1.6.0
github.com/google/uuid v1.6.0
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428
github.com/stretchr/testify v1.9.0
google.golang.org/grpc v1.67.0
google.golang.org/protobuf v1.34.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyY
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/dapr/dapr v1.15.0-rc.1 h1:7JP3zSannxQwV27A9pPR2b/DSNmgcSjJOhRDwM4eFpQ=
github.com/dapr/dapr v1.15.0-rc.1/go.mod h1:SycZrBWgfmog+C5T4p0X6VIpnREQ3xajrYxdih+gn9w=
github.com/dapr/durabletask-go v0.5.1-0.20241127212625-4232880fd198 h1:vTq9HmTXow4iVb/1dO4qVZhK7XC5KlwbLl2iNIE23Qc=
github.com/dapr/durabletask-go v0.5.1-0.20241127212625-4232880fd198/go.mod h1:C4ykYCSNv1k2C4wvZv11h2ClkE/qsXw0tv6idOWVmDc=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw=
Expand All @@ -28,8 +30,6 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/marusama/semaphore/v2 v2.5.0 h1:o/1QJD9DBYOWRnDhPwDVAXQn6mQYD0gZaS1Tpx6DJGM=
github.com/marusama/semaphore/v2 v2.5.0/go.mod h1:z9nMiNUekt/LTpTUQdpp+4sJeYqUGpwMHfW0Z8V8fnQ=
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428 h1:I1yeX4tWqOdBzpRzSbY1TnHU2NI25Pdu6OXUm39emm0=
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
Expand Down
6 changes: 3 additions & 3 deletions workflow/activity_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

"google.golang.org/protobuf/types/known/wrapperspb"

"github.com/microsoft/durabletask-go/task"
"github.com/dapr/durabletask-go/task"
)

type ActivityContext struct {
Expand Down Expand Up @@ -78,11 +78,11 @@ func ActivityRetryPolicy(policy RetryPolicy) callActivityOption {
}
}

func (opts *callActivityOptions) getRetryPolicy() *task.ActivityRetryPolicy {
func (opts *callActivityOptions) getRetryPolicy() *task.RetryPolicy {
if opts.retryPolicy == nil {
return nil
}
return &task.ActivityRetryPolicy{
return &task.RetryPolicy{
MaxAttempts: opts.retryPolicy.MaxAttempts,
InitialRetryInterval: opts.retryPolicy.InitialRetryInterval,
BackoffCoefficient: opts.retryPolicy.BackoffCoefficient,
Expand Down
5 changes: 3 additions & 2 deletions workflow/activity_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ import (
"testing"
"time"

"github.com/microsoft/durabletask-go/task"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/dapr/durabletask-go/task"
)

type testingTaskActivityContext struct {
Expand Down Expand Up @@ -79,7 +80,7 @@ func TestCallActivityOptions(t *testing.T) {
BackoffCoefficient: 2,
MaxRetryInterval: 2 * time.Second,
}))
assert.Equal(t, &task.ActivityRetryPolicy{
assert.Equal(t, &task.RetryPolicy{
MaxAttempts: 3,
InitialRetryInterval: 100 * time.Millisecond,
BackoffCoefficient: 2,
Expand Down
7 changes: 4 additions & 3 deletions workflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ import (
"fmt"
"time"

"github.com/microsoft/durabletask-go/api"
"github.com/microsoft/durabletask-go/backend"
durabletaskclient "github.com/microsoft/durabletask-go/client"
"google.golang.org/grpc"

"github.com/dapr/durabletask-go/api"
"github.com/dapr/durabletask-go/backend"
durabletaskclient "github.com/dapr/durabletask-go/client"

dapr "github.com/dapr/go-sdk/client"
)

Expand Down
8 changes: 4 additions & 4 deletions workflow/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"fmt"
"time"

"github.com/microsoft/durabletask-go/task"
"github.com/dapr/durabletask-go/task"
)

type WorkflowContext struct {
Expand Down Expand Up @@ -68,7 +68,7 @@ func (wfc *WorkflowContext) CallActivity(activity interface{}, opts ...callActiv
}
}

return wfc.orchestrationContext.CallActivity(activity, task.WithRawActivityInput(options.rawInput.GetValue()), task.WithRetryPolicy(options.getRetryPolicy()))
return wfc.orchestrationContext.CallActivity(activity, task.WithRawActivityInput(options.rawInput.GetValue()), task.WithActivityRetryPolicy(options.getRetryPolicy()))
}

// CallChildWorkflow returns a completable task for a given workflow.
Expand All @@ -84,9 +84,9 @@ func (wfc *WorkflowContext) CallChildWorkflow(workflow interface{}, opts ...call
}
}
if options.instanceID != "" {
return wfc.orchestrationContext.CallSubOrchestrator(workflow, task.WithRawSubOrchestratorInput(options.rawInput.GetValue()), task.WithSubOrchestrationInstanceID(options.instanceID))
return wfc.orchestrationContext.CallSubOrchestrator(workflow, task.WithRawSubOrchestratorInput(options.rawInput.GetValue()), task.WithSubOrchestrationInstanceID(options.instanceID), task.WithSubOrchestrationRetryPolicy(options.getRetryPolicy()))
}
return wfc.orchestrationContext.CallSubOrchestrator(workflow, task.WithRawSubOrchestratorInput(options.rawInput.GetValue()))
return wfc.orchestrationContext.CallSubOrchestrator(workflow, task.WithRawSubOrchestratorInput(options.rawInput.GetValue()), task.WithSubOrchestrationRetryPolicy(options.getRetryPolicy()))
}

// CreateTimer returns a completable task that blocks for a given duration.
Expand Down
3 changes: 2 additions & 1 deletion workflow/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ import (
"testing"
"time"

"github.com/microsoft/durabletask-go/task"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/dapr/durabletask-go/task"
)

func TestContext(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion workflow/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ limitations under the License.
*/
package workflow

import "github.com/microsoft/durabletask-go/api"
import "github.com/dapr/durabletask-go/api"

type Status int

Expand Down
3 changes: 2 additions & 1 deletion workflow/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ package workflow
import (
"testing"

"github.com/microsoft/durabletask-go/api"
"github.com/stretchr/testify/assert"

"github.com/dapr/durabletask-go/api"
)

func TestString(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions workflow/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import (

dapr "github.com/dapr/go-sdk/client"

"github.com/microsoft/durabletask-go/backend"
durabletaskclient "github.com/microsoft/durabletask-go/client"
"github.com/microsoft/durabletask-go/task"
"github.com/dapr/durabletask-go/backend"
durabletaskclient "github.com/dapr/durabletask-go/client"
"github.com/dapr/durabletask-go/task"
)

type WorkflowWorker struct {
Expand Down
2 changes: 1 addition & 1 deletion workflow/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (

daprClient "github.com/dapr/go-sdk/client"

"github.com/microsoft/durabletask-go/task"
"github.com/dapr/durabletask-go/task"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down
30 changes: 26 additions & 4 deletions workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ import (
"fmt"
"time"

"github.com/microsoft/durabletask-go/api"
"github.com/microsoft/durabletask-go/task"
"google.golang.org/protobuf/types/known/wrapperspb"

"github.com/dapr/durabletask-go/api"
"github.com/dapr/durabletask-go/task"
)

type Metadata struct {
Expand Down Expand Up @@ -87,8 +88,9 @@ func convertMetadata(orchestrationMetadata *api.OrchestrationMetadata) *Metadata
}

type callChildWorkflowOptions struct {
instanceID string
rawInput *wrapperspb.StringValue
instanceID string
rawInput *wrapperspb.StringValue
retryPolicy *RetryPolicy
}

type callChildWorkflowOption func(*callChildWorkflowOptions) error
Expand Down Expand Up @@ -121,6 +123,26 @@ func ChildWorkflowInstanceID(instanceID string) callChildWorkflowOption {
}
}

func ChildWorkflowRetryPolicy(policy RetryPolicy) callChildWorkflowOption {
return func(opts *callChildWorkflowOptions) error {
opts.retryPolicy = &policy
return nil
}
}

func (opts *callChildWorkflowOptions) getRetryPolicy() *task.RetryPolicy {
if opts.retryPolicy == nil {
return nil
}
return &task.RetryPolicy{
MaxAttempts: opts.retryPolicy.MaxAttempts,
InitialRetryInterval: opts.retryPolicy.InitialRetryInterval,
BackoffCoefficient: opts.retryPolicy.BackoffCoefficient,
MaxRetryInterval: opts.retryPolicy.MaxRetryInterval,
RetryTimeout: opts.retryPolicy.RetryTimeout,
}
}

// NewTaskSlice returns a slice of tasks which can be executed in parallel
func NewTaskSlice(length int) []task.Task {
taskSlice := make([]task.Task, length)
Expand Down
25 changes: 24 additions & 1 deletion workflow/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package workflow

import (
"testing"
"time"

"github.com/microsoft/durabletask-go/api"
"github.com/stretchr/testify/assert"

"github.com/dapr/durabletask-go/api"
"github.com/dapr/durabletask-go/task"
)

func TestConvertMetadata(t *testing.T) {
Expand Down Expand Up @@ -37,6 +40,26 @@ func TestCallChildWorkflowOptions(t *testing.T) {
opts := returnCallChildWorkflowOptions(ChildWorkflowInput(make(chan int)))
assert.Empty(t, opts.rawInput.GetValue())
})

t.Run("child workflow retry policy - set", func(t *testing.T) {
opts := returnCallChildWorkflowOptions(ChildWorkflowRetryPolicy(RetryPolicy{
MaxAttempts: 3,
InitialRetryInterval: 100 * time.Millisecond,
BackoffCoefficient: 2,
MaxRetryInterval: 2 * time.Second,
}))
assert.Equal(t, &task.RetryPolicy{
MaxAttempts: 3,
InitialRetryInterval: 100 * time.Millisecond,
BackoffCoefficient: 2,
MaxRetryInterval: 2 * time.Second,
}, opts.getRetryPolicy())
})

t.Run("child workflow retry policy - empty", func(t *testing.T) {
opts := returnCallChildWorkflowOptions()
assert.Empty(t, opts.getRetryPolicy())
})
}

func returnCallChildWorkflowOptions(opts ...callChildWorkflowOption) callChildWorkflowOptions {
Expand Down

0 comments on commit 921a6a7

Please sign in to comment.