From 767cfae21ac16d5d369d27706c5ef88003694656 Mon Sep 17 00:00:00 2001 From: Robert Rossmann Date: Wed, 28 Feb 2024 16:54:52 +0100 Subject: [PATCH 1/3] =?UTF-8?q?feat:=20remove=20generic=20parameter=20?= =?UTF-8?q?=F0=9F=A5=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Having the Meta field generic was nice but Go cannot yet infer the generic type on many places yet and it was just very cumbersome to work with while it brought minimal benefits. It's sad to see it go away but I belive it's for the best. --- background.go | 71 +++++++++++++++++++------------------- background_test.go | 86 ++++++++++++++++++++++------------------------ 2 files changed, 78 insertions(+), 79 deletions(-) diff --git a/background.go b/background.go index 1457476..b64f40f 100644 --- a/background.go +++ b/background.go @@ -18,22 +18,22 @@ import ( // before they can run to completion. This package is not a replacement for a proper task queue system but it is a great // package to schedule the queue jobs without the customer waiting for that to happen while at the same time being able // to wait for all those goroutines to finish before allowing the process to exit. -type Manager[Meta any] struct { +type Manager struct { wg sync.WaitGroup len int stalledThreshold time.Duration - hooks Hooks[Meta] + hooks Hooks strategies Retry } // Options provides a means for configuring the background manager and attaching hooks to it. -type Options[Meta any] struct { +type Options struct { // StalledThreshold is the amount of time within which the goroutine should return before it is considered stalled. // Note that no effort is made to actually kill the goroutine. StalledThreshold time.Duration // Hooks allow you to register monitoring functions that are called when something happens with the goroutines that // you schedule. These are useful for logging, monitoring, etc. - Hooks Hooks[Meta] + Hooks Hooks // DefaultRetry defines the default retry strategies that will be used for all tasks unless overridden by the task. // Several strategies are provided by github.com/kamilsk/retry/v5/strategy package. DefaultRetry Retry @@ -41,24 +41,24 @@ type Options[Meta any] struct { // Hooks are a set of functions that are called when certain events happen with the goroutines that you schedule. All of // them are optional; implement only those you need. -type Hooks[Meta any] struct { +type Hooks struct { // OnTaskAdded is called immediately after calling Run(). - OnTaskAdded func(ctx context.Context, meta Meta) + OnTaskAdded func(ctx context.Context, meta Metadata) // OnTaskSucceeded is called immediately after Task returns with no error. - OnTaskSucceeded func(ctx context.Context, meta Meta) + OnTaskSucceeded func(ctx context.Context, meta Metadata) // OnTaskFailed is called immediately after Task returns with an error. - OnTaskFailed func(ctx context.Context, meta Meta, err error) + OnTaskFailed func(ctx context.Context, meta Metadata, err error) // OnGoroutineStalled is called when the goroutine does not return within the StalledThreshold. You can use this to // make sure your goroutines do not take excessive amounts of time to run to completion. - OnGoroutineStalled func(ctx context.Context, meta Meta) + OnGoroutineStalled func(ctx context.Context, meta Metadata) } -// TaskDefinition describes how a unit of work (a Task) should be executed. -type TaskDefinition[Meta any] struct { - // Task is the function to be executed in a goroutine. - Task Task - // Meta is whatever you wish to associate with the task. - Meta Meta +// Task describes how a unit of work (a function) should be executed. +type Task struct { + // Fn is the function to be executed in a goroutine. + Fn Fn + // Meta is whatever custom information you wish to associate with the task. + Meta Metadata // Retry defines how the task should be retried in case of failure (if at all). This overrides the default retry // strategies you might have configured in the Manager. Several strategies are provided by // github.com/kamilsk/retry/v5/strategy package. @@ -69,20 +69,21 @@ type TaskDefinition[Meta any] struct { // github.com/kamilsk/retry/v5/strategy package. type Retry []strategy.Strategy -// Task is the function to be executed in a goroutine -type Task func(ctx context.Context) error +// Fn is the function to be executed in a goroutine +type Fn func(ctx context.Context) error -// NilMeta is a type that can be used as Meta generic type when you do not need to associate any metadata with the task. -type NilMeta *struct{} +// Metadata is whatever custom information you wish to associate with a task. This information will be available in your +// lifecycle hooks to help you identify which task is being processed. +type Metadata map[string]string // NewManager creates a new instance of Manager with default options and no hooks. -func NewManager() *Manager[NilMeta] { - return &Manager[NilMeta]{} +func NewManager() *Manager { + return &Manager{} } // NewManagerWithOptions creates a new instance of Manager with the provided options and hooks. -func NewManagerWithOptions[Meta any](options Options[Meta]) *Manager[Meta] { - return &Manager[Meta]{ +func NewManagerWithOptions(options Options) *Manager { + return &Manager{ stalledThreshold: options.StalledThreshold, hooks: options.Hooks, strategies: options.DefaultRetry, @@ -90,13 +91,13 @@ func NewManagerWithOptions[Meta any](options Options[Meta]) *Manager[Meta] { } // Run schedules the provided task to be executed in a goroutine. -func (m *Manager[Meta]) Run(ctx context.Context, task Task) { - definition := TaskDefinition[Meta]{Task: task} +func (m *Manager) Run(ctx context.Context, task Fn) { + definition := Task{Fn: task} m.RunTaskDefinition(ctx, definition) } // RunTaskDefinition schedules the provided task definition to be executed in a goroutine. -func (m *Manager[Meta]) RunTaskDefinition(ctx context.Context, definition TaskDefinition[Meta]) { +func (m *Manager) RunTaskDefinition(ctx context.Context, definition Task) { m.callOnTaskAdded(ctx, definition.Meta) m.wg.Add(1) m.len++ @@ -109,18 +110,18 @@ func (m *Manager[Meta]) RunTaskDefinition(ctx context.Context, definition TaskDe } // Wait blocks until all scheduled tasks have finished. -func (m *Manager[Meta]) Wait() { +func (m *Manager) Wait() { m.wg.Wait() } // Len returns the number of currently running tasks. -func (m *Manager[Meta]) Len() int { +func (m *Manager) Len() int { return m.len } -func (m *Manager[Meta]) run(ctx context.Context, definition TaskDefinition[Meta], done chan<- bool) { +func (m *Manager) run(ctx context.Context, definition Task, done chan<- bool) { strategies := mkstrategies(m.strategies, definition.Retry) - err := retry.Do(ctx, definition.Task, strategies...) + err := retry.Do(ctx, definition.Fn, strategies...) done <- true m.wg.Done() m.len-- @@ -132,7 +133,7 @@ func (m *Manager[Meta]) run(ctx context.Context, definition TaskDefinition[Meta] } } -func (m *Manager[Meta]) ticktock(ctx context.Context, meta Meta, done <-chan bool) { +func (m *Manager) ticktock(ctx context.Context, meta Metadata, done <-chan bool) { timeout := mktimeout(m.stalledThreshold) select { case <-done: @@ -143,25 +144,25 @@ func (m *Manager[Meta]) ticktock(ctx context.Context, meta Meta, done <-chan boo } } -func (m *Manager[Meta]) callOnTaskFailed(ctx context.Context, meta Meta, err error) { +func (m *Manager) callOnTaskFailed(ctx context.Context, meta Metadata, err error) { if m.hooks.OnTaskFailed != nil { m.hooks.OnTaskFailed(ctx, meta, err) } } -func (m *Manager[Meta]) callOnTaskSucceeded(ctx context.Context, meta Meta) { +func (m *Manager) callOnTaskSucceeded(ctx context.Context, meta Metadata) { if m.hooks.OnTaskSucceeded != nil { m.hooks.OnTaskSucceeded(ctx, meta) } } -func (m *Manager[Meta]) callOnTaskAdded(ctx context.Context, meta Meta) { +func (m *Manager) callOnTaskAdded(ctx context.Context, meta Metadata) { if m.hooks.OnTaskAdded != nil { m.hooks.OnTaskAdded(ctx, meta) } } -func (m *Manager[Meta]) callOnGoroutineStalled(ctx context.Context, meta Meta) { +func (m *Manager) callOnGoroutineStalled(ctx context.Context, meta Metadata) { if m.hooks.OnGoroutineStalled != nil { m.hooks.OnGoroutineStalled(ctx, meta) } diff --git a/background_test.go b/background_test.go index 542e693..22e1e87 100644 --- a/background_test.go +++ b/background_test.go @@ -12,12 +12,10 @@ import ( "go.strv.io/background" ) -type testmeta bool - func Test_New(t *testing.T) { m := background.NewManager() assert.NotNil(t, m) - assert.IsType(t, &background.Manager[background.NilMeta]{}, m) + assert.IsType(t, &background.Manager{}, m) assert.Equal(t, 0, m.Len()) } @@ -82,9 +80,9 @@ func Test_CancelledParentContext(t *testing.T) { func Test_Len(t *testing.T) { proceed := make(chan bool, 1) remaining := 10 - m := background.NewManagerWithOptions(background.Options[background.NilMeta]{ - Hooks: background.Hooks[background.NilMeta]{ - OnTaskSucceeded: func(ctx context.Context, meta background.NilMeta) { + m := background.NewManagerWithOptions(background.Options{ + Hooks: background.Hooks{ + OnTaskSucceeded: func(ctx context.Context, meta background.Metadata) { remaining-- proceed <- true }, @@ -104,13 +102,13 @@ func Test_Len(t *testing.T) { } func Test_OnTaskAdded(t *testing.T) { - var metaval testmeta = true + metadata := background.Metadata{"test": "value"} executed := false var wg sync.WaitGroup - m := background.NewManagerWithOptions(background.Options[testmeta]{ - Hooks: background.Hooks[testmeta]{ - OnTaskAdded: func(ctx context.Context, meta testmeta) { - assert.Equal(t, metaval, meta) + m := background.NewManagerWithOptions(background.Options{ + Hooks: background.Hooks{ + OnTaskAdded: func(ctx context.Context, meta background.Metadata) { + assert.Equal(t, metadata, meta) executed = true wg.Done() }, @@ -118,11 +116,11 @@ func Test_OnTaskAdded(t *testing.T) { }) wg.Add(1) - def := background.TaskDefinition[testmeta]{ - Task: func(ctx context.Context) error { + def := background.Task{ + Fn: func(ctx context.Context) error { return nil }, - Meta: metaval, + Meta: metadata, } m.RunTaskDefinition(context.Background(), def) @@ -131,13 +129,13 @@ func Test_OnTaskAdded(t *testing.T) { } func Test_OnTaskSucceeded(t *testing.T) { - var metaval testmeta = true + metadata := background.Metadata{"test": "value"} executed := false var wg sync.WaitGroup - m := background.NewManagerWithOptions(background.Options[testmeta]{ - Hooks: background.Hooks[testmeta]{ - OnTaskSucceeded: func(ctx context.Context, meta testmeta) { - assert.Equal(t, metaval, meta) + m := background.NewManagerWithOptions(background.Options{ + Hooks: background.Hooks{ + OnTaskSucceeded: func(ctx context.Context, meta background.Metadata) { + assert.Equal(t, metadata, meta) executed = true wg.Done() }, @@ -145,11 +143,11 @@ func Test_OnTaskSucceeded(t *testing.T) { }) wg.Add(1) - def := background.TaskDefinition[testmeta]{ - Task: func(ctx context.Context) error { + def := background.Task{ + Fn: func(ctx context.Context) error { return nil }, - Meta: metaval, + Meta: metadata, } m.RunTaskDefinition(context.Background(), def) @@ -158,13 +156,13 @@ func Test_OnTaskSucceeded(t *testing.T) { } func Test_OnTaskFailed(t *testing.T) { - var metaval testmeta = true + metadata := background.Metadata{"test": "value"} executed := false var wg sync.WaitGroup - m := background.NewManagerWithOptions(background.Options[testmeta]{ - Hooks: background.Hooks[testmeta]{ - OnTaskFailed: func(ctx context.Context, meta testmeta, err error) { - assert.Equal(t, metaval, meta) + m := background.NewManagerWithOptions(background.Options{ + Hooks: background.Hooks{ + OnTaskFailed: func(ctx context.Context, meta background.Metadata, err error) { + assert.Equal(t, metadata, meta) assert.Error(t, err) executed = true wg.Done() @@ -173,11 +171,11 @@ func Test_OnTaskFailed(t *testing.T) { }) wg.Add(1) - def := background.TaskDefinition[testmeta]{ - Task: func(ctx context.Context) error { + def := background.Task{ + Fn: func(ctx context.Context) error { return assert.AnError }, - Meta: metaval, + Meta: metadata, } m.RunTaskDefinition(context.Background(), def) @@ -198,30 +196,30 @@ func Test_OnGoroutineStalled(t *testing.T) { for _, test := range tests { t.Run(fmt.Sprintf("duration of %s)", test.duration.String()), func(t *testing.T) { - var metaval testmeta = true + metadata := background.Metadata{"test": "value"} executed := false var wg sync.WaitGroup if test.shouldExecute == true { wg.Add(1) } - m := background.NewManagerWithOptions(background.Options[testmeta]{ + m := background.NewManagerWithOptions(background.Options{ StalledThreshold: time.Millisecond * 5, - Hooks: background.Hooks[testmeta]{ - OnGoroutineStalled: func(ctx context.Context, meta testmeta) { - assert.Equal(t, metaval, meta) + Hooks: background.Hooks{ + OnGoroutineStalled: func(ctx context.Context, meta background.Metadata) { + assert.Equal(t, metadata, meta) executed = true wg.Done() }, }, }) - def := background.TaskDefinition[testmeta]{ - Task: func(ctx context.Context) error { + def := background.Task{ + Fn: func(ctx context.Context) error { <-time.After(test.duration) return nil }, - Meta: metaval, + Meta: metadata, } m.RunTaskDefinition(context.Background(), def) m.Run(context.Background(), func(ctx context.Context) error { @@ -237,10 +235,10 @@ func Test_OnGoroutineStalled(t *testing.T) { func Test_StalledGoroutineStillCallsOnTaskSucceeded(t *testing.T) { executed := false var wg sync.WaitGroup - m := background.NewManagerWithOptions(background.Options[testmeta]{ + m := background.NewManagerWithOptions(background.Options{ StalledThreshold: time.Millisecond, - Hooks: background.Hooks[testmeta]{ - OnTaskSucceeded: func(ctx context.Context, meta testmeta) { + Hooks: background.Hooks{ + OnTaskSucceeded: func(ctx context.Context, meta background.Metadata) { executed = true wg.Done() }, @@ -261,8 +259,8 @@ func Test_TaskDefinitionRetryStrategies(t *testing.T) { var limit uint = 5 var count uint = 0 m := background.NewManager() - def := background.TaskDefinition[background.NilMeta]{ - Task: func(ctx context.Context) error { + def := background.Task{ + Fn: func(ctx context.Context) error { count++ return assert.AnError }, @@ -280,7 +278,7 @@ func Test_TaskDefinitionRetryStrategies(t *testing.T) { func Test_ManagerDefaultRetryStrategies(t *testing.T) { var limit uint = 5 var count uint = 0 - m := background.NewManagerWithOptions(background.Options[background.NilMeta]{ + m := background.NewManagerWithOptions(background.Options{ DefaultRetry: background.Retry{ strategy.Limit(limit), }, From 5786c12504651a297336812832608d21d4f102c5 Mon Sep 17 00:00:00 2001 From: Robert Rossmann Date: Wed, 28 Feb 2024 22:42:25 +0100 Subject: [PATCH 2/3] =?UTF-8?q?feat:=20rename,=20restructure,=20reword=20t?= =?UTF-8?q?he=20shit=20out=20of=20it=20=F0=9F=94=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- background.go | 132 ++++++++++++--------------------------------- background_test.go | 46 ++++++++-------- internal.go | 36 +++++++++++++ observer.go | 41 ++++++++++++++ 4 files changed, 133 insertions(+), 122 deletions(-) create mode 100644 internal.go create mode 100644 observer.go diff --git a/background.go b/background.go index b64f40f..1664617 100644 --- a/background.go +++ b/background.go @@ -22,42 +22,29 @@ type Manager struct { wg sync.WaitGroup len int stalledThreshold time.Duration - hooks Hooks - strategies Retry + observer Observer + retry Retry } // Options provides a means for configuring the background manager and attaching hooks to it. type Options struct { - // StalledThreshold is the amount of time within which the goroutine should return before it is considered stalled. - // Note that no effort is made to actually kill the goroutine. + // StalledThreshold is the amount of time within which the task should return before it is considered stalled. Note + // that no effort is made to actually stop or kill the task. StalledThreshold time.Duration - // Hooks allow you to register monitoring functions that are called when something happens with the goroutines that - // you schedule. These are useful for logging, monitoring, etc. - Hooks Hooks - // DefaultRetry defines the default retry strategies that will be used for all tasks unless overridden by the task. - // Several strategies are provided by github.com/kamilsk/retry/v5/strategy package. - DefaultRetry Retry -} - -// Hooks are a set of functions that are called when certain events happen with the goroutines that you schedule. All of -// them are optional; implement only those you need. -type Hooks struct { - // OnTaskAdded is called immediately after calling Run(). - OnTaskAdded func(ctx context.Context, meta Metadata) - // OnTaskSucceeded is called immediately after Task returns with no error. - OnTaskSucceeded func(ctx context.Context, meta Metadata) - // OnTaskFailed is called immediately after Task returns with an error. - OnTaskFailed func(ctx context.Context, meta Metadata, err error) - // OnGoroutineStalled is called when the goroutine does not return within the StalledThreshold. You can use this to - // make sure your goroutines do not take excessive amounts of time to run to completion. - OnGoroutineStalled func(ctx context.Context, meta Metadata) + // Observer allow you to register monitoring functions that are called when something happens with the tasks that you + // schedule. These are useful for logging, monitoring, etc. + Observer Observer + // Retry defines the default retry strategies that will be used for all tasks unless overridden by the task. Several + // strategies are provided by github.com/kamilsk/retry/v5/strategy package. + Retry Retry } // Task describes how a unit of work (a function) should be executed. type Task struct { // Fn is the function to be executed in a goroutine. Fn Fn - // Meta is whatever custom information you wish to associate with the task. + // Meta is whatever custom information you wish to associate with the task. This will be passed to the observer's + // functions. Meta Metadata // Retry defines how the task should be retried in case of failure (if at all). This overrides the default retry // strategies you might have configured in the Manager. Several strategies are provided by @@ -69,44 +56,44 @@ type Task struct { // github.com/kamilsk/retry/v5/strategy package. type Retry []strategy.Strategy -// Fn is the function to be executed in a goroutine +// Fn is the function to be executed in a goroutine. type Fn func(ctx context.Context) error // Metadata is whatever custom information you wish to associate with a task. This information will be available in your // lifecycle hooks to help you identify which task is being processed. type Metadata map[string]string -// NewManager creates a new instance of Manager with default options and no hooks. +// NewManager creates a new instance of Manager with default options and no observer. func NewManager() *Manager { return &Manager{} } -// NewManagerWithOptions creates a new instance of Manager with the provided options and hooks. +// NewManagerWithOptions creates a new instance of Manager with the provided options and observer. func NewManagerWithOptions(options Options) *Manager { return &Manager{ stalledThreshold: options.StalledThreshold, - hooks: options.Hooks, - strategies: options.DefaultRetry, + observer: options.Observer, + retry: options.Retry, } } -// Run schedules the provided task to be executed in a goroutine. -func (m *Manager) Run(ctx context.Context, task Fn) { - definition := Task{Fn: task} - m.RunTaskDefinition(ctx, definition) +// Run schedules the provided function to be executed in a goroutine. +func (m *Manager) Run(ctx context.Context, fn Fn) { + task := Task{Fn: fn} + m.RunTask(ctx, task) } -// RunTaskDefinition schedules the provided task definition to be executed in a goroutine. -func (m *Manager) RunTaskDefinition(ctx context.Context, definition Task) { - m.callOnTaskAdded(ctx, definition.Meta) +// RunTask schedules the provided task to be executed in a goroutine. +func (m *Manager) RunTask(ctx context.Context, task Task) { + m.observer.callOnTaskAdded(ctx, task) m.wg.Add(1) m.len++ ctx = context.WithoutCancel(ctx) done := make(chan bool, 1) - go m.run(ctx, definition, done) - go m.ticktock(ctx, definition.Meta, done) + go m.ticktock(ctx, task, done) + go m.run(ctx, task, done) } // Wait blocks until all scheduled tasks have finished. @@ -119,80 +106,27 @@ func (m *Manager) Len() int { return m.len } -func (m *Manager) run(ctx context.Context, definition Task, done chan<- bool) { - strategies := mkstrategies(m.strategies, definition.Retry) - err := retry.Do(ctx, definition.Fn, strategies...) +func (m *Manager) run(ctx context.Context, task Task, done chan<- bool) { + strategies := mkstrategies(m.retry, task.Retry) + err := retry.Do(ctx, task.Fn, strategies...) done <- true m.wg.Done() m.len-- if err != nil { - m.callOnTaskFailed(ctx, definition.Meta, err) + m.observer.callOnTaskFailed(ctx, task, err) } else { - m.callOnTaskSucceeded(ctx, definition.Meta) + m.observer.callOnTaskSucceeded(ctx, task) } } -func (m *Manager) ticktock(ctx context.Context, meta Metadata, done <-chan bool) { +func (m *Manager) ticktock(ctx context.Context, task Task, done <-chan bool) { timeout := mktimeout(m.stalledThreshold) select { case <-done: return case <-timeout: - m.callOnGoroutineStalled(ctx, meta) + m.observer.callOnTaskStalled(ctx, task) return } } - -func (m *Manager) callOnTaskFailed(ctx context.Context, meta Metadata, err error) { - if m.hooks.OnTaskFailed != nil { - m.hooks.OnTaskFailed(ctx, meta, err) - } -} - -func (m *Manager) callOnTaskSucceeded(ctx context.Context, meta Metadata) { - if m.hooks.OnTaskSucceeded != nil { - m.hooks.OnTaskSucceeded(ctx, meta) - } -} - -func (m *Manager) callOnTaskAdded(ctx context.Context, meta Metadata) { - if m.hooks.OnTaskAdded != nil { - m.hooks.OnTaskAdded(ctx, meta) - } -} - -func (m *Manager) callOnGoroutineStalled(ctx context.Context, meta Metadata) { - if m.hooks.OnGoroutineStalled != nil { - m.hooks.OnGoroutineStalled(ctx, meta) - } -} - -// mktimeout returns a channel that will receive the current time after the specified duration. If the duration is 0, -// the channel will never receive any message. -func mktimeout(duration time.Duration) <-chan time.Time { - if duration == 0 { - return make(<-chan time.Time) - } - return time.After(duration) -} - -// mkstrategies prepares the retry strategies to be used for the task. If no defaults and no overrides are provided, a -// single execution attempt retry strategy is used. This is because the retry package would retry indefinitely on -// failure if no strategy is provided. -func mkstrategies(defaults []strategy.Strategy, overrides []strategy.Strategy) []strategy.Strategy { - result := make([]strategy.Strategy, 0, max(len(defaults), len(overrides))) - - if len(overrides) > 0 { - result = append(result, overrides...) - } else { - result = append(result, defaults...) - } - - // If no retry strategies are provided we default to a single execution attempt - if len(result) == 0 { - result = append(result, strategy.Limit(1)) - } - - return result -} diff --git a/background_test.go b/background_test.go index 22e1e87..249b452 100644 --- a/background_test.go +++ b/background_test.go @@ -12,7 +12,7 @@ import ( "go.strv.io/background" ) -func Test_New(t *testing.T) { +func Test_NewManager(t *testing.T) { m := background.NewManager() assert.NotNil(t, m) assert.IsType(t, &background.Manager{}, m) @@ -81,8 +81,8 @@ func Test_Len(t *testing.T) { proceed := make(chan bool, 1) remaining := 10 m := background.NewManagerWithOptions(background.Options{ - Hooks: background.Hooks{ - OnTaskSucceeded: func(ctx context.Context, meta background.Metadata) { + Observer: background.Observer{ + OnTaskSucceeded: func(ctx context.Context, task background.Task) { remaining-- proceed <- true }, @@ -106,9 +106,9 @@ func Test_OnTaskAdded(t *testing.T) { executed := false var wg sync.WaitGroup m := background.NewManagerWithOptions(background.Options{ - Hooks: background.Hooks{ - OnTaskAdded: func(ctx context.Context, meta background.Metadata) { - assert.Equal(t, metadata, meta) + Observer: background.Observer{ + OnTaskAdded: func(ctx context.Context, task background.Task) { + assert.Equal(t, metadata, task.Meta) executed = true wg.Done() }, @@ -122,7 +122,7 @@ func Test_OnTaskAdded(t *testing.T) { }, Meta: metadata, } - m.RunTaskDefinition(context.Background(), def) + m.RunTask(context.Background(), def) wg.Wait() assert.True(t, executed) @@ -133,9 +133,9 @@ func Test_OnTaskSucceeded(t *testing.T) { executed := false var wg sync.WaitGroup m := background.NewManagerWithOptions(background.Options{ - Hooks: background.Hooks{ - OnTaskSucceeded: func(ctx context.Context, meta background.Metadata) { - assert.Equal(t, metadata, meta) + Observer: background.Observer{ + OnTaskSucceeded: func(ctx context.Context, task background.Task) { + assert.Equal(t, metadata, task.Meta) executed = true wg.Done() }, @@ -149,7 +149,7 @@ func Test_OnTaskSucceeded(t *testing.T) { }, Meta: metadata, } - m.RunTaskDefinition(context.Background(), def) + m.RunTask(context.Background(), def) wg.Wait() assert.True(t, executed) @@ -160,9 +160,9 @@ func Test_OnTaskFailed(t *testing.T) { executed := false var wg sync.WaitGroup m := background.NewManagerWithOptions(background.Options{ - Hooks: background.Hooks{ - OnTaskFailed: func(ctx context.Context, meta background.Metadata, err error) { - assert.Equal(t, metadata, meta) + Observer: background.Observer{ + OnTaskFailed: func(ctx context.Context, task background.Task, err error) { + assert.Equal(t, metadata, task.Meta) assert.Error(t, err) executed = true wg.Done() @@ -177,7 +177,7 @@ func Test_OnTaskFailed(t *testing.T) { }, Meta: metadata, } - m.RunTaskDefinition(context.Background(), def) + m.RunTask(context.Background(), def) wg.Wait() assert.True(t, executed) @@ -205,9 +205,9 @@ func Test_OnGoroutineStalled(t *testing.T) { m := background.NewManagerWithOptions(background.Options{ StalledThreshold: time.Millisecond * 5, - Hooks: background.Hooks{ - OnGoroutineStalled: func(ctx context.Context, meta background.Metadata) { - assert.Equal(t, metadata, meta) + Observer: background.Observer{ + OnTaskStalled: func(ctx context.Context, task background.Task) { + assert.Equal(t, metadata, task.Meta) executed = true wg.Done() }, @@ -221,7 +221,7 @@ func Test_OnGoroutineStalled(t *testing.T) { }, Meta: metadata, } - m.RunTaskDefinition(context.Background(), def) + m.RunTask(context.Background(), def) m.Run(context.Background(), func(ctx context.Context) error { return nil }) @@ -237,8 +237,8 @@ func Test_StalledGoroutineStillCallsOnTaskSucceeded(t *testing.T) { var wg sync.WaitGroup m := background.NewManagerWithOptions(background.Options{ StalledThreshold: time.Millisecond, - Hooks: background.Hooks{ - OnTaskSucceeded: func(ctx context.Context, meta background.Metadata) { + Observer: background.Observer{ + OnTaskSucceeded: func(ctx context.Context, task background.Task) { executed = true wg.Done() }, @@ -269,7 +269,7 @@ func Test_TaskDefinitionRetryStrategies(t *testing.T) { }, } - m.RunTaskDefinition(context.Background(), def) + m.RunTask(context.Background(), def) m.Wait() assert.Equal(t, limit, count) @@ -279,7 +279,7 @@ func Test_ManagerDefaultRetryStrategies(t *testing.T) { var limit uint = 5 var count uint = 0 m := background.NewManagerWithOptions(background.Options{ - DefaultRetry: background.Retry{ + Retry: background.Retry{ strategy.Limit(limit), }, }) diff --git a/internal.go b/internal.go new file mode 100644 index 0000000..0149162 --- /dev/null +++ b/internal.go @@ -0,0 +1,36 @@ +package background + +import ( + "time" + + "github.com/kamilsk/retry/v5/strategy" +) + +// mktimeout returns a channel that will receive the current time after the specified duration. If the duration is 0, +// the channel will never receive any message. +func mktimeout(duration time.Duration) <-chan time.Time { + if duration == 0 { + return make(<-chan time.Time) + } + return time.After(duration) +} + +// mkstrategies prepares the retry strategies to be used for the task. If no defaults and no overrides are provided, a +// single execution attempt retry strategy is used. This is because the retry package would retry indefinitely on +// failure if no strategy is provided. +func mkstrategies(defaults Retry, overrides Retry) Retry { + result := make(Retry, 0, max(len(defaults), len(overrides), 1)) + + if len(overrides) > 0 { + result = append(result, overrides...) + } else { + result = append(result, defaults...) + } + + // If no retry strategies are provided we default to a single execution attempt + if len(result) == 0 { + result = append(result, strategy.Limit(1)) + } + + return result +} diff --git a/observer.go b/observer.go new file mode 100644 index 0000000..a92bdd4 --- /dev/null +++ b/observer.go @@ -0,0 +1,41 @@ +package background + +import "context" + +// Observer includes a set of functions that are called when certain events happen with the tasks that you schedule. All +// of them are optional; implement only those you need. +type Observer struct { + // OnTaskAdded is called immediately after adding the Task to the background manager. + OnTaskAdded func(ctx context.Context, task Task) + // OnTaskSucceeded is called immediately after Task returns with no error. + OnTaskSucceeded func(ctx context.Context, task Task) + // OnTaskFailed is called immediately after Task returns with an error. + OnTaskFailed func(ctx context.Context, task Task, err error) + // OnTaskStalled is called when the task does not return within the StalledThreshold. You can use this to make sure + // your tasks do not take excessive amounts of time to run to completion. + OnTaskStalled func(ctx context.Context, task Task) +} + +func (h Observer) callOnTaskFailed(ctx context.Context, task Task, err error) { + if h.OnTaskFailed != nil { + h.OnTaskFailed(ctx, task, err) + } +} + +func (h Observer) callOnTaskSucceeded(ctx context.Context, task Task) { + if h.OnTaskSucceeded != nil { + h.OnTaskSucceeded(ctx, task) + } +} + +func (h Observer) callOnTaskAdded(ctx context.Context, task Task) { + if h.OnTaskAdded != nil { + h.OnTaskAdded(ctx, task) + } +} + +func (h Observer) callOnTaskStalled(ctx context.Context, task Task) { + if h.OnTaskStalled != nil { + h.OnTaskStalled(ctx, task) + } +} From 5030f3cb6ed3aa07d6bc3667713d2a1809a8e0c6 Mon Sep 17 00:00:00 2001 From: Robert Rossmann Date: Wed, 28 Feb 2024 23:52:38 +0100 Subject: [PATCH 3/3] chore: refactor task lifecycle, use sync/atomic for counting --- background.go | 50 ++++++++++++++++++++++++---------------------- background_test.go | 4 ++-- 2 files changed, 28 insertions(+), 26 deletions(-) diff --git a/background.go b/background.go index 1664617..1af3617 100644 --- a/background.go +++ b/background.go @@ -3,6 +3,7 @@ package background import ( "context" "sync" + "sync/atomic" "time" "github.com/kamilsk/retry/v5" @@ -20,7 +21,7 @@ import ( // to wait for all those goroutines to finish before allowing the process to exit. type Manager struct { wg sync.WaitGroup - len int + len atomic.Int32 stalledThreshold time.Duration observer Observer retry Retry @@ -87,12 +88,12 @@ func (m *Manager) Run(ctx context.Context, fn Fn) { func (m *Manager) RunTask(ctx context.Context, task Task) { m.observer.callOnTaskAdded(ctx, task) m.wg.Add(1) - m.len++ + m.len.Add(1) ctx = context.WithoutCancel(ctx) - done := make(chan bool, 1) + done := make(chan error, 1) - go m.ticktock(ctx, task, done) + go m.monitor(ctx, task, done) go m.run(ctx, task, done) } @@ -102,31 +103,32 @@ func (m *Manager) Wait() { } // Len returns the number of currently running tasks. -func (m *Manager) Len() int { - return m.len +func (m *Manager) Len() int32 { + return m.len.Load() } -func (m *Manager) run(ctx context.Context, task Task, done chan<- bool) { +func (m *Manager) run(ctx context.Context, task Task, done chan<- error) { strategies := mkstrategies(m.retry, task.Retry) - err := retry.Do(ctx, task.Fn, strategies...) - done <- true - m.wg.Done() - m.len-- - - if err != nil { - m.observer.callOnTaskFailed(ctx, task, err) - } else { - m.observer.callOnTaskSucceeded(ctx, task) - } + done <- retry.Do(ctx, task.Fn, strategies...) } -func (m *Manager) ticktock(ctx context.Context, task Task, done <-chan bool) { +func (m *Manager) monitor(ctx context.Context, task Task, done <-chan error) { timeout := mktimeout(m.stalledThreshold) - select { - case <-done: - return - case <-timeout: - m.observer.callOnTaskStalled(ctx, task) - return + + for { + select { + case <-timeout: + m.observer.callOnTaskStalled(ctx, task) + case err := <-done: + if err != nil { + m.observer.callOnTaskFailed(ctx, task, err) + } else { + m.observer.callOnTaskSucceeded(ctx, task) + } + + m.wg.Done() + m.len.Add(-1) + return + } } } diff --git a/background_test.go b/background_test.go index 249b452..236e9ff 100644 --- a/background_test.go +++ b/background_test.go @@ -16,7 +16,7 @@ func Test_NewManager(t *testing.T) { m := background.NewManager() assert.NotNil(t, m) assert.IsType(t, &background.Manager{}, m) - assert.Equal(t, 0, m.Len()) + assert.EqualValues(t, 0, m.Len()) } func Test_RunExecutesInGoroutine(t *testing.T) { @@ -98,7 +98,7 @@ func Test_Len(t *testing.T) { proceed <- true m.Wait() - assert.Equal(t, 0, m.Len()) + assert.EqualValues(t, 0, m.Len()) } func Test_OnTaskAdded(t *testing.T) {