Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement retry strategies ♻️ #6

Merged
merged 1 commit into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 60 additions & 10 deletions background.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"context"
"sync"
"time"

"github.com/kamilsk/retry/v5"
"github.com/kamilsk/retry/v5/strategy"
)

// Manager keeps track of scheduled goroutines and provides mechanisms to wait for them to finish. `Meta` is whatever
Expand All @@ -20,6 +23,7 @@ type Manager[Meta any] struct {
len int
stalledThreshold time.Duration
hooks Hooks[Meta]
strategies Retry
}

// Options provides a means for configuring the background manager and attaching hooks to it.
Expand All @@ -30,6 +34,9 @@ type Options[Meta any] struct {
// Hooks allow you to register monitoring functions that are called when something happens with the goroutines that
// you schedule. These are useful for logging, monitoring, etc.
Hooks Hooks[Meta]
// DefaultRetry defines the default retry strategies that will be used for all tasks unless overridden by the task.
// Several strategies are provided by github.com/kamilsk/retry/v5/strategy package.
DefaultRetry Retry
}

// Hooks are a set of functions that are called when certain events happen with the goroutines that you schedule. All of
Expand All @@ -46,6 +53,22 @@ type Hooks[Meta any] struct {
OnGoroutineStalled func(ctx context.Context, meta Meta)
}

// TaskDefinition describes how a unit of work (a Task) should be executed.
type TaskDefinition[Meta any] struct {
// Task is the function to be executed in a goroutine.
Task Task
// Meta is whatever you wish to associate with the task.
Meta Meta
// Retry defines how the task should be retried in case of failure (if at all). This overrides the default retry
// strategies you might have configured in the Manager. Several strategies are provided by
// github.com/kamilsk/retry/v5/strategy package.
Retry Retry
}

// Retry defines the functions that control the retry behavior of a task. Several strategies are provided by
// github.com/kamilsk/retry/v5/strategy package.
type Retry []strategy.Strategy

// Task is the function to be executed in a goroutine
type Task func(ctx context.Context) error

Expand All @@ -62,21 +85,27 @@ func NewManagerWithOptions[Meta any](options Options[Meta]) *Manager[Meta] {
return &Manager[Meta]{
stalledThreshold: options.StalledThreshold,
hooks: options.Hooks,
strategies: options.DefaultRetry,
}
}

// Run schedules the provided task to be executed in a goroutine. `Meta` is whatever you wish to associate with the
// task.
func (m *Manager[Meta]) Run(ctx context.Context, meta Meta, task Task) {
m.callOnTaskAdded(ctx, meta)
// Run schedules the provided task to be executed in a goroutine.
func (m *Manager[Meta]) Run(ctx context.Context, task Task) {
definition := TaskDefinition[Meta]{Task: task}
m.RunTaskDefinition(ctx, definition)
}

// RunTaskDefinition schedules the provided task definition to be executed in a goroutine.
func (m *Manager[Meta]) RunTaskDefinition(ctx context.Context, definition TaskDefinition[Meta]) {
m.callOnTaskAdded(ctx, definition.Meta)
m.wg.Add(1)
m.len++

ctx = context.WithoutCancel(ctx)
done := make(chan bool, 1)

go m.run(ctx, meta, task, done)
go m.ticktock(ctx, meta, done)
go m.run(ctx, definition, done)
go m.ticktock(ctx, definition.Meta, done)
}

// Wait blocks until all scheduled tasks have finished.
Expand All @@ -89,16 +118,17 @@ func (m *Manager[Meta]) Len() int {
return m.len
}

func (m *Manager[Meta]) run(ctx context.Context, meta Meta, task Task, done chan<- bool) {
err := task(ctx)
func (m *Manager[Meta]) run(ctx context.Context, definition TaskDefinition[Meta], done chan<- bool) {
strategies := mkstrategies(m.strategies, definition.Retry)
err := retry.Do(ctx, definition.Task, strategies...)
done <- true
m.wg.Done()
m.len--

if err != nil {
m.callOnTaskFailed(ctx, meta, err)
m.callOnTaskFailed(ctx, definition.Meta, err)
} else {
m.callOnTaskSucceeded(ctx, meta)
m.callOnTaskSucceeded(ctx, definition.Meta)
}
}

Expand Down Expand Up @@ -145,3 +175,23 @@ func mktimeout(duration time.Duration) <-chan time.Time {
}
return time.After(duration)
}

// mkstrategies prepares the retry strategies to be used for the task. If no defaults and no overrides are provided, a
// single execution attempt retry strategy is used. This is because the retry package would retry indefinitely on
// failure if no strategy is provided.
func mkstrategies(defaults []strategy.Strategy, overrides []strategy.Strategy) []strategy.Strategy {
result := make([]strategy.Strategy, 0, max(len(defaults), len(overrides)))

if len(overrides) > 0 {
result = append(result, overrides...)
} else {
result = append(result, defaults...)
}

// If no retry strategies are provided we default to a single execution attempt
if len(result) == 0 {
result = append(result, strategy.Limit(1))
}

return result
}
96 changes: 77 additions & 19 deletions background_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/kamilsk/retry/v5/strategy"
"github.com/stretchr/testify/assert"
"go.strv.io/background"
)
Expand All @@ -24,7 +25,7 @@ func Test_RunExecutesInGoroutine(t *testing.T) {
m := background.NewManager()
proceed := make(chan bool, 1)

m.Run(context.Background(), nil, func(ctx context.Context) error {
m.Run(context.Background(), func(ctx context.Context) error {
// Let the main thread advance a bit
<-proceed
proceed <- true
Expand All @@ -44,7 +45,7 @@ func Test_WaitWaitsForPendingTasks(t *testing.T) {
done := make(chan bool, 1)
var waited bool

m.Run(context.Background(), nil, func(ctx context.Context) error {
m.Run(context.Background(), func(ctx context.Context) error {
// Let the main thread advance a bit
<-proceed
return nil
Expand All @@ -67,7 +68,7 @@ func Test_CancelledParentContext(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
proceed := make(chan bool, 1)

m.Run(ctx, nil, func(ctx context.Context) error {
m.Run(ctx, func(ctx context.Context) error {
<-proceed
assert.Nil(t, ctx.Err())
return nil
Expand All @@ -81,17 +82,17 @@ func Test_CancelledParentContext(t *testing.T) {
func Test_Len(t *testing.T) {
proceed := make(chan bool, 1)
remaining := 10
m := background.NewManagerWithOptions(background.Options[testmeta]{
Hooks: background.Hooks[testmeta]{
OnTaskSucceeded: func(ctx context.Context, meta testmeta) {
m := background.NewManagerWithOptions(background.Options[background.NilMeta]{
Hooks: background.Hooks[background.NilMeta]{
OnTaskSucceeded: func(ctx context.Context, meta background.NilMeta) {
remaining--
proceed <- true
},
},
})

for range 10 {
m.Run(context.Background(), true, func(ctx context.Context) error {
m.Run(context.Background(), func(ctx context.Context) error {
<-proceed
return nil
})
Expand All @@ -117,9 +118,13 @@ func Test_OnTaskAdded(t *testing.T) {
})

wg.Add(1)
m.Run(context.Background(), metaval, func(ctx context.Context) error {
return nil
})
def := background.TaskDefinition[testmeta]{
Task: func(ctx context.Context) error {
return nil
},
Meta: metaval,
}
m.RunTaskDefinition(context.Background(), def)

wg.Wait()
assert.True(t, executed)
Expand All @@ -140,9 +145,13 @@ func Test_OnTaskSucceeded(t *testing.T) {
})

wg.Add(1)
m.Run(context.Background(), metaval, func(ctx context.Context) error {
return nil
})
def := background.TaskDefinition[testmeta]{
Task: func(ctx context.Context) error {
return nil
},
Meta: metaval,
}
m.RunTaskDefinition(context.Background(), def)

wg.Wait()
assert.True(t, executed)
Expand All @@ -164,9 +173,13 @@ func Test_OnTaskFailed(t *testing.T) {
})

wg.Add(1)
m.Run(context.Background(), metaval, func(ctx context.Context) error {
return assert.AnError
})
def := background.TaskDefinition[testmeta]{
Task: func(ctx context.Context) error {
return assert.AnError
},
Meta: metaval,
}
m.RunTaskDefinition(context.Background(), def)

wg.Wait()
assert.True(t, executed)
Expand Down Expand Up @@ -203,8 +216,15 @@ func Test_OnGoroutineStalled(t *testing.T) {
},
})

m.Run(context.Background(), metaval, func(ctx context.Context) error {
<-time.After(test.duration)
def := background.TaskDefinition[testmeta]{
Task: func(ctx context.Context) error {
<-time.After(test.duration)
return nil
},
Meta: metaval,
}
m.RunTaskDefinition(context.Background(), def)
m.Run(context.Background(), func(ctx context.Context) error {
return nil
})

Expand All @@ -228,11 +248,49 @@ func Test_StalledGoroutineStillCallsOnTaskSucceeded(t *testing.T) {
})

wg.Add(1)
m.Run(context.Background(), true, func(ctx context.Context) error {
m.Run(context.Background(), func(ctx context.Context) error {
<-time.After(time.Millisecond * 3)
return nil
})

wg.Wait()
assert.True(t, executed)
}

func Test_TaskDefinitionRetryStrategies(t *testing.T) {
var limit uint = 5
var count uint = 0
m := background.NewManager()
def := background.TaskDefinition[background.NilMeta]{
Task: func(ctx context.Context) error {
count++
return assert.AnError
},
Retry: background.Retry{
strategy.Limit(limit),
},
}

m.RunTaskDefinition(context.Background(), def)
m.Wait()

assert.Equal(t, limit, count)
}

func Test_ManagerDefaultRetryStrategies(t *testing.T) {
var limit uint = 5
var count uint = 0
m := background.NewManagerWithOptions(background.Options[background.NilMeta]{
DefaultRetry: background.Retry{
strategy.Limit(limit),
},
})

m.Run(context.Background(), func(ctx context.Context) error {
count++
return assert.AnError
})
m.Wait()

assert.Equal(t, limit, count)
}
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ module go.strv.io/background

go 1.22.0

require github.com/stretchr/testify v1.8.4
require (
github.com/kamilsk/retry/v5 v5.0.0-rc8
github.com/stretchr/testify v1.8.4
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/kamilsk/retry/v5 v5.0.0-rc8 h1:7gPn+mf/wYpiBdovfFtE9jJ2O4eFny8Y/p6vrXON8ZI=
github.com/kamilsk/retry/v5 v5.0.0-rc8/go.mod h1:pY2mWDkk4Ld6B4XFBk4GiPIUSIjIAHuvRZczhbcWKQs=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
Expand Down