Skip to content

Commit

Permalink
feat: implement retry strategies ♻️
Browse files Browse the repository at this point in the history
Now the task can be set up to retry on failure. By default, we only make a single execution attempt.

The retry mechanism is implemented in github.com/kamilsk/retry/v5 . I chose this package over https://github.com/eapache/go-resiliency because `retry` has a much more flexible API, provides a ton of strategies and backoff algorithms and the strategies are nicely defined via an interface, making extensions and customisations very easy.

Unfortunately, the last update to `retry` was in February 2021 so I might yet regret this decision but for the time being I feel like this is the superior choice over `go-resiliency`.
  • Loading branch information
robertrossmann committed Feb 28, 2024
1 parent 026eb05 commit 14a4f2b
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 30 deletions.
70 changes: 60 additions & 10 deletions background.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"context"
"sync"
"time"

"github.com/kamilsk/retry/v5"
"github.com/kamilsk/retry/v5/strategy"
)

// Manager keeps track of scheduled goroutines and provides mechanisms to wait for them to finish. `Meta` is whatever
Expand All @@ -20,6 +23,7 @@ type Manager[Meta any] struct {
len int
stalledThreshold time.Duration
hooks Hooks[Meta]
strategies Retry
}

// Options provides a means for configuring the background manager and attaching hooks to it.
Expand All @@ -30,6 +34,9 @@ type Options[Meta any] struct {
// Hooks allow you to register monitoring functions that are called when something happens with the goroutines that
// you schedule. These are useful for logging, monitoring, etc.
Hooks Hooks[Meta]
// DefaultRetry defines the default retry strategies that will be used for all tasks unless overridden by the task.
// Several strategies are provided by github.com/kamilsk/retry/v5/strategy package.
DefaultRetry Retry
}

// Hooks are a set of functions that are called when certain events happen with the goroutines that you schedule. All of
Expand All @@ -46,6 +53,22 @@ type Hooks[Meta any] struct {
OnGoroutineStalled func(ctx context.Context, meta Meta)
}

// TaskDefinition describes how a unit of work (a Task) should be executed.
type TaskDefinition[Meta any] struct {
// Task is the function to be executed in a goroutine.
Task Task
// Meta is whatever you wish to associate with the task.
Meta Meta
// Retry defines how the task should be retried in case of failure (if at all). This overrides the default retry
// strategies you might have configured in the Manager. Several strategies are provided by
// github.com/kamilsk/retry/v5/strategy package.
Retry Retry
}

// Retry defines the functions that control the retry behavior of a task. Several strategies are provided by
// github.com/kamilsk/retry/v5/strategy package.
type Retry []strategy.Strategy

// Task is the function to be executed in a goroutine
type Task func(ctx context.Context) error

Expand All @@ -62,21 +85,27 @@ func NewManagerWithOptions[Meta any](options Options[Meta]) *Manager[Meta] {
return &Manager[Meta]{
stalledThreshold: options.StalledThreshold,
hooks: options.Hooks,
strategies: options.DefaultRetry,
}
}

// Run schedules the provided task to be executed in a goroutine. `Meta` is whatever you wish to associate with the
// task.
func (m *Manager[Meta]) Run(ctx context.Context, meta Meta, task Task) {
m.callOnTaskAdded(ctx, meta)
// Run schedules the provided task to be executed in a goroutine.
func (m *Manager[Meta]) Run(ctx context.Context, task Task) {
definition := TaskDefinition[Meta]{Task: task}
m.RunTaskDefinition(ctx, definition)
}

// RunTaskDefinition schedules the provided task definition to be executed in a goroutine.
func (m *Manager[Meta]) RunTaskDefinition(ctx context.Context, definition TaskDefinition[Meta]) {
m.callOnTaskAdded(ctx, definition.Meta)
m.wg.Add(1)
m.len++

ctx = context.WithoutCancel(ctx)
done := make(chan bool, 1)

go m.run(ctx, meta, task, done)
go m.ticktock(ctx, meta, done)
go m.run(ctx, definition, done)
go m.ticktock(ctx, definition.Meta, done)
}

// Wait blocks until all scheduled tasks have finished.
Expand All @@ -89,16 +118,17 @@ func (m *Manager[Meta]) Len() int {
return m.len
}

func (m *Manager[Meta]) run(ctx context.Context, meta Meta, task Task, done chan<- bool) {
err := task(ctx)
func (m *Manager[Meta]) run(ctx context.Context, definition TaskDefinition[Meta], done chan<- bool) {
strategies := mkstrategies(m.strategies, definition.Retry)
err := retry.Do(ctx, definition.Task, strategies...)
done <- true
m.wg.Done()
m.len--

if err != nil {
m.callOnTaskFailed(ctx, meta, err)
m.callOnTaskFailed(ctx, definition.Meta, err)
} else {
m.callOnTaskSucceeded(ctx, meta)
m.callOnTaskSucceeded(ctx, definition.Meta)
}
}

Expand Down Expand Up @@ -145,3 +175,23 @@ func mktimeout(duration time.Duration) <-chan time.Time {
}
return time.After(duration)
}

// mkstrategies prepares the retry strategies to be used for the task. If no defaults and no overrides are provided, a
// single execution attempt retry strategy is used. This is because the retry package would retry indefinitely on
// failure if no strategy is provided.
func mkstrategies(defaults []strategy.Strategy, overrides []strategy.Strategy) []strategy.Strategy {
result := make([]strategy.Strategy, 0, max(len(defaults), len(overrides)))

if len(overrides) > 0 {
result = append(result, overrides...)
} else {
result = append(result, defaults...)
}

// If no retry strategies are provided we default to a single execution attempt
if len(result) == 0 {
result = append(result, strategy.Limit(1))
}

return result
}
96 changes: 77 additions & 19 deletions background_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/kamilsk/retry/v5/strategy"
"github.com/stretchr/testify/assert"
"go.strv.io/background"
)
Expand All @@ -24,7 +25,7 @@ func Test_RunExecutesInGoroutine(t *testing.T) {
m := background.NewManager()
proceed := make(chan bool, 1)

m.Run(context.Background(), nil, func(ctx context.Context) error {
m.Run(context.Background(), func(ctx context.Context) error {
// Let the main thread advance a bit
<-proceed
proceed <- true
Expand All @@ -44,7 +45,7 @@ func Test_WaitWaitsForPendingTasks(t *testing.T) {
done := make(chan bool, 1)
var waited bool

m.Run(context.Background(), nil, func(ctx context.Context) error {
m.Run(context.Background(), func(ctx context.Context) error {
// Let the main thread advance a bit
<-proceed
return nil
Expand All @@ -67,7 +68,7 @@ func Test_CancelledParentContext(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
proceed := make(chan bool, 1)

m.Run(ctx, nil, func(ctx context.Context) error {
m.Run(ctx, func(ctx context.Context) error {
<-proceed
assert.Nil(t, ctx.Err())
return nil
Expand All @@ -81,17 +82,17 @@ func Test_CancelledParentContext(t *testing.T) {
func Test_Len(t *testing.T) {
proceed := make(chan bool, 1)
remaining := 10
m := background.NewManagerWithOptions(background.Options[testmeta]{
Hooks: background.Hooks[testmeta]{
OnTaskSucceeded: func(ctx context.Context, meta testmeta) {
m := background.NewManagerWithOptions(background.Options[background.NilMeta]{
Hooks: background.Hooks[background.NilMeta]{
OnTaskSucceeded: func(ctx context.Context, meta background.NilMeta) {
remaining--
proceed <- true
},
},
})

for range 10 {
m.Run(context.Background(), true, func(ctx context.Context) error {
m.Run(context.Background(), func(ctx context.Context) error {
<-proceed
return nil
})
Expand All @@ -117,9 +118,13 @@ func Test_OnTaskAdded(t *testing.T) {
})

wg.Add(1)
m.Run(context.Background(), metaval, func(ctx context.Context) error {
return nil
})
def := background.TaskDefinition[testmeta]{
Task: func(ctx context.Context) error {
return nil
},
Meta: metaval,
}
m.RunTaskDefinition(context.Background(), def)

wg.Wait()
assert.True(t, executed)
Expand All @@ -140,9 +145,13 @@ func Test_OnTaskSucceeded(t *testing.T) {
})

wg.Add(1)
m.Run(context.Background(), metaval, func(ctx context.Context) error {
return nil
})
def := background.TaskDefinition[testmeta]{
Task: func(ctx context.Context) error {
return nil
},
Meta: metaval,
}
m.RunTaskDefinition(context.Background(), def)

wg.Wait()
assert.True(t, executed)
Expand All @@ -164,9 +173,13 @@ func Test_OnTaskFailed(t *testing.T) {
})

wg.Add(1)
m.Run(context.Background(), metaval, func(ctx context.Context) error {
return assert.AnError
})
def := background.TaskDefinition[testmeta]{
Task: func(ctx context.Context) error {
return assert.AnError
},
Meta: metaval,
}
m.RunTaskDefinition(context.Background(), def)

wg.Wait()
assert.True(t, executed)
Expand Down Expand Up @@ -203,8 +216,15 @@ func Test_OnGoroutineStalled(t *testing.T) {
},
})

m.Run(context.Background(), metaval, func(ctx context.Context) error {
<-time.After(test.duration)
def := background.TaskDefinition[testmeta]{
Task: func(ctx context.Context) error {
<-time.After(test.duration)
return nil
},
Meta: metaval,
}
m.RunTaskDefinition(context.Background(), def)
m.Run(context.Background(), func(ctx context.Context) error {
return nil
})

Expand All @@ -228,11 +248,49 @@ func Test_StalledGoroutineStillCallsOnTaskSucceeded(t *testing.T) {
})

wg.Add(1)
m.Run(context.Background(), true, func(ctx context.Context) error {
m.Run(context.Background(), func(ctx context.Context) error {
<-time.After(time.Millisecond * 3)
return nil
})

wg.Wait()
assert.True(t, executed)
}

func Test_TaskDefinitionRetryStrategies(t *testing.T) {
var limit uint = 5
var count uint = 0
m := background.NewManager()
def := background.TaskDefinition[background.NilMeta]{
Task: func(ctx context.Context) error {
count++
return assert.AnError
},
Retry: background.Retry{
strategy.Limit(limit),
},
}

m.RunTaskDefinition(context.Background(), def)
m.Wait()

assert.Equal(t, limit, count)
}

func Test_ManagerDefaultRetryStrategies(t *testing.T) {
var limit uint = 5
var count uint = 0
m := background.NewManagerWithOptions(background.Options[background.NilMeta]{
DefaultRetry: background.Retry{
strategy.Limit(limit),
},
})

m.Run(context.Background(), func(ctx context.Context) error {
count++
return assert.AnError
})
m.Wait()

assert.Equal(t, limit, count)
}
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ module go.strv.io/background

go 1.22.0

require github.com/stretchr/testify v1.8.4
require (
github.com/kamilsk/retry/v5 v5.0.0-rc8
github.com/stretchr/testify v1.8.4
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/kamilsk/retry/v5 v5.0.0-rc8 h1:7gPn+mf/wYpiBdovfFtE9jJ2O4eFny8Y/p6vrXON8ZI=
github.com/kamilsk/retry/v5 v5.0.0-rc8/go.mod h1:pY2mWDkk4Ld6B4XFBk4GiPIUSIjIAHuvRZczhbcWKQs=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
Expand Down

0 comments on commit 14a4f2b

Please sign in to comment.