diff --git a/background.go b/background.go index d831bda..1457476 100644 --- a/background.go +++ b/background.go @@ -4,6 +4,9 @@ import ( "context" "sync" "time" + + "github.com/kamilsk/retry/v5" + "github.com/kamilsk/retry/v5/strategy" ) // Manager keeps track of scheduled goroutines and provides mechanisms to wait for them to finish. `Meta` is whatever @@ -20,6 +23,7 @@ type Manager[Meta any] struct { len int stalledThreshold time.Duration hooks Hooks[Meta] + strategies Retry } // Options provides a means for configuring the background manager and attaching hooks to it. @@ -30,6 +34,9 @@ type Options[Meta any] struct { // Hooks allow you to register monitoring functions that are called when something happens with the goroutines that // you schedule. These are useful for logging, monitoring, etc. Hooks Hooks[Meta] + // DefaultRetry defines the default retry strategies that will be used for all tasks unless overridden by the task. + // Several strategies are provided by github.com/kamilsk/retry/v5/strategy package. + DefaultRetry Retry } // Hooks are a set of functions that are called when certain events happen with the goroutines that you schedule. All of @@ -46,6 +53,22 @@ type Hooks[Meta any] struct { OnGoroutineStalled func(ctx context.Context, meta Meta) } +// TaskDefinition describes how a unit of work (a Task) should be executed. +type TaskDefinition[Meta any] struct { + // Task is the function to be executed in a goroutine. + Task Task + // Meta is whatever you wish to associate with the task. + Meta Meta + // Retry defines how the task should be retried in case of failure (if at all). This overrides the default retry + // strategies you might have configured in the Manager. Several strategies are provided by + // github.com/kamilsk/retry/v5/strategy package. + Retry Retry +} + +// Retry defines the functions that control the retry behavior of a task. Several strategies are provided by +// github.com/kamilsk/retry/v5/strategy package. +type Retry []strategy.Strategy + // Task is the function to be executed in a goroutine type Task func(ctx context.Context) error @@ -62,21 +85,27 @@ func NewManagerWithOptions[Meta any](options Options[Meta]) *Manager[Meta] { return &Manager[Meta]{ stalledThreshold: options.StalledThreshold, hooks: options.Hooks, + strategies: options.DefaultRetry, } } -// Run schedules the provided task to be executed in a goroutine. `Meta` is whatever you wish to associate with the -// task. -func (m *Manager[Meta]) Run(ctx context.Context, meta Meta, task Task) { - m.callOnTaskAdded(ctx, meta) +// Run schedules the provided task to be executed in a goroutine. +func (m *Manager[Meta]) Run(ctx context.Context, task Task) { + definition := TaskDefinition[Meta]{Task: task} + m.RunTaskDefinition(ctx, definition) +} + +// RunTaskDefinition schedules the provided task definition to be executed in a goroutine. +func (m *Manager[Meta]) RunTaskDefinition(ctx context.Context, definition TaskDefinition[Meta]) { + m.callOnTaskAdded(ctx, definition.Meta) m.wg.Add(1) m.len++ ctx = context.WithoutCancel(ctx) done := make(chan bool, 1) - go m.run(ctx, meta, task, done) - go m.ticktock(ctx, meta, done) + go m.run(ctx, definition, done) + go m.ticktock(ctx, definition.Meta, done) } // Wait blocks until all scheduled tasks have finished. @@ -89,16 +118,17 @@ func (m *Manager[Meta]) Len() int { return m.len } -func (m *Manager[Meta]) run(ctx context.Context, meta Meta, task Task, done chan<- bool) { - err := task(ctx) +func (m *Manager[Meta]) run(ctx context.Context, definition TaskDefinition[Meta], done chan<- bool) { + strategies := mkstrategies(m.strategies, definition.Retry) + err := retry.Do(ctx, definition.Task, strategies...) done <- true m.wg.Done() m.len-- if err != nil { - m.callOnTaskFailed(ctx, meta, err) + m.callOnTaskFailed(ctx, definition.Meta, err) } else { - m.callOnTaskSucceeded(ctx, meta) + m.callOnTaskSucceeded(ctx, definition.Meta) } } @@ -145,3 +175,23 @@ func mktimeout(duration time.Duration) <-chan time.Time { } return time.After(duration) } + +// mkstrategies prepares the retry strategies to be used for the task. If no defaults and no overrides are provided, a +// single execution attempt retry strategy is used. This is because the retry package would retry indefinitely on +// failure if no strategy is provided. +func mkstrategies(defaults []strategy.Strategy, overrides []strategy.Strategy) []strategy.Strategy { + result := make([]strategy.Strategy, 0, max(len(defaults), len(overrides))) + + if len(overrides) > 0 { + result = append(result, overrides...) + } else { + result = append(result, defaults...) + } + + // If no retry strategies are provided we default to a single execution attempt + if len(result) == 0 { + result = append(result, strategy.Limit(1)) + } + + return result +} diff --git a/background_test.go b/background_test.go index 914dcc5..542e693 100644 --- a/background_test.go +++ b/background_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/kamilsk/retry/v5/strategy" "github.com/stretchr/testify/assert" "go.strv.io/background" ) @@ -24,7 +25,7 @@ func Test_RunExecutesInGoroutine(t *testing.T) { m := background.NewManager() proceed := make(chan bool, 1) - m.Run(context.Background(), nil, func(ctx context.Context) error { + m.Run(context.Background(), func(ctx context.Context) error { // Let the main thread advance a bit <-proceed proceed <- true @@ -44,7 +45,7 @@ func Test_WaitWaitsForPendingTasks(t *testing.T) { done := make(chan bool, 1) var waited bool - m.Run(context.Background(), nil, func(ctx context.Context) error { + m.Run(context.Background(), func(ctx context.Context) error { // Let the main thread advance a bit <-proceed return nil @@ -67,7 +68,7 @@ func Test_CancelledParentContext(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) proceed := make(chan bool, 1) - m.Run(ctx, nil, func(ctx context.Context) error { + m.Run(ctx, func(ctx context.Context) error { <-proceed assert.Nil(t, ctx.Err()) return nil @@ -81,9 +82,9 @@ func Test_CancelledParentContext(t *testing.T) { func Test_Len(t *testing.T) { proceed := make(chan bool, 1) remaining := 10 - m := background.NewManagerWithOptions(background.Options[testmeta]{ - Hooks: background.Hooks[testmeta]{ - OnTaskSucceeded: func(ctx context.Context, meta testmeta) { + m := background.NewManagerWithOptions(background.Options[background.NilMeta]{ + Hooks: background.Hooks[background.NilMeta]{ + OnTaskSucceeded: func(ctx context.Context, meta background.NilMeta) { remaining-- proceed <- true }, @@ -91,7 +92,7 @@ func Test_Len(t *testing.T) { }) for range 10 { - m.Run(context.Background(), true, func(ctx context.Context) error { + m.Run(context.Background(), func(ctx context.Context) error { <-proceed return nil }) @@ -117,9 +118,13 @@ func Test_OnTaskAdded(t *testing.T) { }) wg.Add(1) - m.Run(context.Background(), metaval, func(ctx context.Context) error { - return nil - }) + def := background.TaskDefinition[testmeta]{ + Task: func(ctx context.Context) error { + return nil + }, + Meta: metaval, + } + m.RunTaskDefinition(context.Background(), def) wg.Wait() assert.True(t, executed) @@ -140,9 +145,13 @@ func Test_OnTaskSucceeded(t *testing.T) { }) wg.Add(1) - m.Run(context.Background(), metaval, func(ctx context.Context) error { - return nil - }) + def := background.TaskDefinition[testmeta]{ + Task: func(ctx context.Context) error { + return nil + }, + Meta: metaval, + } + m.RunTaskDefinition(context.Background(), def) wg.Wait() assert.True(t, executed) @@ -164,9 +173,13 @@ func Test_OnTaskFailed(t *testing.T) { }) wg.Add(1) - m.Run(context.Background(), metaval, func(ctx context.Context) error { - return assert.AnError - }) + def := background.TaskDefinition[testmeta]{ + Task: func(ctx context.Context) error { + return assert.AnError + }, + Meta: metaval, + } + m.RunTaskDefinition(context.Background(), def) wg.Wait() assert.True(t, executed) @@ -203,8 +216,15 @@ func Test_OnGoroutineStalled(t *testing.T) { }, }) - m.Run(context.Background(), metaval, func(ctx context.Context) error { - <-time.After(test.duration) + def := background.TaskDefinition[testmeta]{ + Task: func(ctx context.Context) error { + <-time.After(test.duration) + return nil + }, + Meta: metaval, + } + m.RunTaskDefinition(context.Background(), def) + m.Run(context.Background(), func(ctx context.Context) error { return nil }) @@ -228,7 +248,7 @@ func Test_StalledGoroutineStillCallsOnTaskSucceeded(t *testing.T) { }) wg.Add(1) - m.Run(context.Background(), true, func(ctx context.Context) error { + m.Run(context.Background(), func(ctx context.Context) error { <-time.After(time.Millisecond * 3) return nil }) @@ -236,3 +256,41 @@ func Test_StalledGoroutineStillCallsOnTaskSucceeded(t *testing.T) { wg.Wait() assert.True(t, executed) } + +func Test_TaskDefinitionRetryStrategies(t *testing.T) { + var limit uint = 5 + var count uint = 0 + m := background.NewManager() + def := background.TaskDefinition[background.NilMeta]{ + Task: func(ctx context.Context) error { + count++ + return assert.AnError + }, + Retry: background.Retry{ + strategy.Limit(limit), + }, + } + + m.RunTaskDefinition(context.Background(), def) + m.Wait() + + assert.Equal(t, limit, count) +} + +func Test_ManagerDefaultRetryStrategies(t *testing.T) { + var limit uint = 5 + var count uint = 0 + m := background.NewManagerWithOptions(background.Options[background.NilMeta]{ + DefaultRetry: background.Retry{ + strategy.Limit(limit), + }, + }) + + m.Run(context.Background(), func(ctx context.Context) error { + count++ + return assert.AnError + }) + m.Wait() + + assert.Equal(t, limit, count) +} diff --git a/go.mod b/go.mod index 40f09ff..a252888 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,10 @@ module go.strv.io/background go 1.22.0 -require github.com/stretchr/testify v1.8.4 +require ( + github.com/kamilsk/retry/v5 v5.0.0-rc8 + github.com/stretchr/testify v1.8.4 +) require ( github.com/davecgh/go-spew v1.1.1 // indirect diff --git a/go.sum b/go.sum index fa4b6e6..f4e438c 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/kamilsk/retry/v5 v5.0.0-rc8 h1:7gPn+mf/wYpiBdovfFtE9jJ2O4eFny8Y/p6vrXON8ZI= +github.com/kamilsk/retry/v5 v5.0.0-rc8/go.mod h1:pY2mWDkk4Ld6B4XFBk4GiPIUSIjIAHuvRZczhbcWKQs= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=