From 8e8a051dcf5a3058f98e24ff2daf1a32043f13f5 Mon Sep 17 00:00:00 2001 From: Robert Rossmann Date: Mon, 26 Feb 2024 13:11:10 +0100 Subject: [PATCH] feat: overhaul construction patterns (`.NewManagerWithOptions()`) --- background.go | 56 ++++++++++++----- background_test.go | 149 ++++++++++++++++++++++++++------------------- 2 files changed, 127 insertions(+), 78 deletions(-) diff --git a/background.go b/background.go index b1d00ea..d831bda 100644 --- a/background.go +++ b/background.go @@ -16,15 +16,28 @@ import ( // package to schedule the queue jobs without the customer waiting for that to happen while at the same time being able // to wait for all those goroutines to finish before allowing the process to exit. type Manager[Meta any] struct { - wg sync.WaitGroup - len int + wg sync.WaitGroup + len int + stalledThreshold time.Duration + hooks Hooks[Meta] +} +// Options provides a means for configuring the background manager and attaching hooks to it. +type Options[Meta any] struct { // StalledThreshold is the amount of time within which the goroutine should return before it is considered stalled. + // Note that no effort is made to actually kill the goroutine. StalledThreshold time.Duration + // Hooks allow you to register monitoring functions that are called when something happens with the goroutines that + // you schedule. These are useful for logging, monitoring, etc. + Hooks Hooks[Meta] +} +// Hooks are a set of functions that are called when certain events happen with the goroutines that you schedule. All of +// them are optional; implement only those you need. +type Hooks[Meta any] struct { // OnTaskAdded is called immediately after calling Run(). OnTaskAdded func(ctx context.Context, meta Meta) - // OnTaskSucceeded is called immediately after Task returns. + // OnTaskSucceeded is called immediately after Task returns with no error. OnTaskSucceeded func(ctx context.Context, meta Meta) // OnTaskFailed is called immediately after Task returns with an error. OnTaskFailed func(ctx context.Context, meta Meta, err error) @@ -36,9 +49,20 @@ type Manager[Meta any] struct { // Task is the function to be executed in a goroutine type Task func(ctx context.Context) error -// NewManager creates a new instance of Manager with the provided generic type for the metadata argument. -func NewManager[Meta any]() *Manager[Meta] { - return &Manager[Meta]{} +// NilMeta is a type that can be used as Meta generic type when you do not need to associate any metadata with the task. +type NilMeta *struct{} + +// NewManager creates a new instance of Manager with default options and no hooks. +func NewManager() *Manager[NilMeta] { + return &Manager[NilMeta]{} +} + +// NewManagerWithOptions creates a new instance of Manager with the provided options and hooks. +func NewManagerWithOptions[Meta any](options Options[Meta]) *Manager[Meta] { + return &Manager[Meta]{ + stalledThreshold: options.StalledThreshold, + hooks: options.Hooks, + } } // Run schedules the provided task to be executed in a goroutine. `Meta` is whatever you wish to associate with the @@ -60,7 +84,7 @@ func (m *Manager[Meta]) Wait() { m.wg.Wait() } -// Len returns the number of currently running tasks +// Len returns the number of currently running tasks. func (m *Manager[Meta]) Len() int { return m.len } @@ -79,7 +103,7 @@ func (m *Manager[Meta]) run(ctx context.Context, meta Meta, task Task, done chan } func (m *Manager[Meta]) ticktock(ctx context.Context, meta Meta, done <-chan bool) { - timeout := mktimeout(m.StalledThreshold) + timeout := mktimeout(m.stalledThreshold) select { case <-done: return @@ -90,26 +114,26 @@ func (m *Manager[Meta]) ticktock(ctx context.Context, meta Meta, done <-chan boo } func (m *Manager[Meta]) callOnTaskFailed(ctx context.Context, meta Meta, err error) { - if m.OnTaskFailed != nil { - m.OnTaskFailed(ctx, meta, err) + if m.hooks.OnTaskFailed != nil { + m.hooks.OnTaskFailed(ctx, meta, err) } } func (m *Manager[Meta]) callOnTaskSucceeded(ctx context.Context, meta Meta) { - if m.OnTaskSucceeded != nil { - m.OnTaskSucceeded(ctx, meta) + if m.hooks.OnTaskSucceeded != nil { + m.hooks.OnTaskSucceeded(ctx, meta) } } func (m *Manager[Meta]) callOnTaskAdded(ctx context.Context, meta Meta) { - if m.OnTaskAdded != nil { - m.OnTaskAdded(ctx, meta) + if m.hooks.OnTaskAdded != nil { + m.hooks.OnTaskAdded(ctx, meta) } } func (m *Manager[Meta]) callOnGoroutineStalled(ctx context.Context, meta Meta) { - if m.OnGoroutineStalled != nil { - m.OnGoroutineStalled(ctx, meta) + if m.hooks.OnGoroutineStalled != nil { + m.hooks.OnGoroutineStalled(ctx, meta) } } diff --git a/background_test.go b/background_test.go index fd464df..914dcc5 100644 --- a/background_test.go +++ b/background_test.go @@ -11,22 +11,20 @@ import ( "go.strv.io/background" ) +type testmeta bool + func Test_New(t *testing.T) { - m := background.NewManager[bool]() + m := background.NewManager() assert.NotNil(t, m) - assert.IsType(t, &background.Manager[bool]{}, m) - assert.Equal(t, m.StalledThreshold, time.Duration(0)) - assert.Nil(t, m.OnTaskAdded) - assert.Nil(t, m.OnTaskSucceeded) - assert.Nil(t, m.OnTaskFailed) - assert.Nil(t, m.OnGoroutineStalled) + assert.IsType(t, &background.Manager[background.NilMeta]{}, m) + assert.Equal(t, 0, m.Len()) } func Test_RunExecutesInGoroutine(t *testing.T) { - m := background.NewManager[bool]() + m := background.NewManager() proceed := make(chan bool, 1) - m.Run(context.Background(), true, func(ctx context.Context) error { + m.Run(context.Background(), nil, func(ctx context.Context) error { // Let the main thread advance a bit <-proceed proceed <- true @@ -41,12 +39,12 @@ func Test_RunExecutesInGoroutine(t *testing.T) { } func Test_WaitWaitsForPendingTasks(t *testing.T) { - m := background.NewManager[bool]() + m := background.NewManager() proceed := make(chan bool, 1) done := make(chan bool, 1) var waited bool - m.Run(context.Background(), true, func(ctx context.Context) error { + m.Run(context.Background(), nil, func(ctx context.Context) error { // Let the main thread advance a bit <-proceed return nil @@ -65,11 +63,11 @@ func Test_WaitWaitsForPendingTasks(t *testing.T) { } func Test_CancelledParentContext(t *testing.T) { - m := background.NewManager[bool]() + m := background.NewManager() ctx, cancel := context.WithCancel(context.Background()) proceed := make(chan bool, 1) - m.Run(ctx, true, func(ctx context.Context) error { + m.Run(ctx, nil, func(ctx context.Context) error { <-proceed assert.Nil(t, ctx.Err()) return nil @@ -81,15 +79,16 @@ func Test_CancelledParentContext(t *testing.T) { } func Test_Len(t *testing.T) { - m := background.NewManager[bool]() proceed := make(chan bool, 1) remaining := 10 - - m.OnTaskSucceeded = func(ctx context.Context, meta bool) { - remaining-- - assert.Equal(t, remaining, m.Len()) - proceed <- true - } + m := background.NewManagerWithOptions(background.Options[testmeta]{ + Hooks: background.Hooks[testmeta]{ + OnTaskSucceeded: func(ctx context.Context, meta testmeta) { + remaining-- + proceed <- true + }, + }, + }) for range 10 { m.Run(context.Background(), true, func(ctx context.Context) error { @@ -104,54 +103,72 @@ func Test_Len(t *testing.T) { } func Test_OnTaskAdded(t *testing.T) { - m := background.NewManager[bool]() - metaval := true + var metaval testmeta = true executed := false + var wg sync.WaitGroup + m := background.NewManagerWithOptions(background.Options[testmeta]{ + Hooks: background.Hooks[testmeta]{ + OnTaskAdded: func(ctx context.Context, meta testmeta) { + assert.Equal(t, metaval, meta) + executed = true + wg.Done() + }, + }, + }) - m.OnTaskAdded = func(ctx context.Context, meta bool) { - assert.Equal(t, metaval, meta) - executed = true - } - + wg.Add(1) m.Run(context.Background(), metaval, func(ctx context.Context) error { return nil }) - m.Wait() + + wg.Wait() assert.True(t, executed) } func Test_OnTaskSucceeded(t *testing.T) { - m := background.NewManager[bool]() - metaval := true + var metaval testmeta = true executed := false + var wg sync.WaitGroup + m := background.NewManagerWithOptions(background.Options[testmeta]{ + Hooks: background.Hooks[testmeta]{ + OnTaskSucceeded: func(ctx context.Context, meta testmeta) { + assert.Equal(t, metaval, meta) + executed = true + wg.Done() + }, + }, + }) - m.OnTaskSucceeded = func(ctx context.Context, meta bool) { - assert.Equal(t, metaval, meta) - executed = true - } - + wg.Add(1) m.Run(context.Background(), metaval, func(ctx context.Context) error { return nil }) - m.Wait() + + wg.Wait() assert.True(t, executed) } func Test_OnTaskFailed(t *testing.T) { - m := background.NewManager[bool]() - metaval := true + var metaval testmeta = true executed := false + var wg sync.WaitGroup + m := background.NewManagerWithOptions(background.Options[testmeta]{ + Hooks: background.Hooks[testmeta]{ + OnTaskFailed: func(ctx context.Context, meta testmeta, err error) { + assert.Equal(t, metaval, meta) + assert.Error(t, err) + executed = true + wg.Done() + }, + }, + }) - m.OnTaskFailed = func(ctx context.Context, meta bool, err error) { - assert.Equal(t, metaval, meta) - assert.Error(t, err) - executed = true - } - + wg.Add(1) m.Run(context.Background(), metaval, func(ctx context.Context) error { return assert.AnError }) - m.Wait() + + wg.Wait() assert.True(t, executed) } @@ -167,47 +184,55 @@ func Test_OnGoroutineStalled(t *testing.T) { } for _, test := range tests { - m := background.NewManager[bool]() - m.StalledThreshold = time.Millisecond * 5 - t.Run(fmt.Sprintf("duration of %s)", test.duration.String()), func(t *testing.T) { - var wg sync.WaitGroup + var metaval testmeta = true executed := false + var wg sync.WaitGroup if test.shouldExecute == true { wg.Add(1) } - m.OnGoroutineStalled = func(ctx context.Context, meta bool) { - executed = true - wg.Done() - } + m := background.NewManagerWithOptions(background.Options[testmeta]{ + StalledThreshold: time.Millisecond * 5, + Hooks: background.Hooks[testmeta]{ + OnGoroutineStalled: func(ctx context.Context, meta testmeta) { + assert.Equal(t, metaval, meta) + executed = true + wg.Done() + }, + }, + }) - m.Run(context.Background(), true, func(ctx context.Context) error { + m.Run(context.Background(), metaval, func(ctx context.Context) error { <-time.After(test.duration) return nil }) wg.Wait() - m.Wait() assert.Equal(t, test.shouldExecute, executed) }) } } func Test_StalledGoroutineStillCallsOnTaskSucceeded(t *testing.T) { - m := background.NewManager[bool]() - m.StalledThreshold = time.Millisecond executed := false + var wg sync.WaitGroup + m := background.NewManagerWithOptions(background.Options[testmeta]{ + StalledThreshold: time.Millisecond, + Hooks: background.Hooks[testmeta]{ + OnTaskSucceeded: func(ctx context.Context, meta testmeta) { + executed = true + wg.Done() + }, + }, + }) - m.OnTaskSucceeded = func(ctx context.Context, meta bool) { - executed = true - } - + wg.Add(1) m.Run(context.Background(), true, func(ctx context.Context) error { <-time.After(time.Millisecond * 3) return nil }) - m.Wait() + wg.Wait() assert.True(t, executed) }