From 55731031d470faa7a717497ff6ad5f0aef574bc1 Mon Sep 17 00:00:00 2001 From: Marek Cermak Date: Mon, 26 Feb 2024 15:53:12 +0100 Subject: [PATCH 1/5] chore: move background package from root to manager package Signed-off-by: Marek Cermak --- background.go => manager/background.go | 4 ++-- .../background_test.go | 21 +++++++++---------- 2 files changed, 12 insertions(+), 13 deletions(-) rename background.go => manager/background.go (97%) rename background_test.go => manager/background_test.go (87%) diff --git a/background.go b/manager/background.go similarity index 97% rename from background.go rename to manager/background.go index dcc1fde..7239c4d 100644 --- a/background.go +++ b/manager/background.go @@ -1,4 +1,4 @@ -package background +package manager import ( "context" @@ -31,7 +31,7 @@ type Manager[Meta any] struct { type Task func(ctx context.Context) error // NewManager creates a new instance of Manager with the provided generic type for the metadata argument. -func NewManager[Meta any]() *Manager[Meta] { +func New[Meta any]() *Manager[Meta] { return &Manager[Meta]{} } diff --git a/background_test.go b/manager/background_test.go similarity index 87% rename from background_test.go rename to manager/background_test.go index ded37e4..addacf5 100644 --- a/background_test.go +++ b/manager/background_test.go @@ -1,24 +1,23 @@ -package background_test +package manager import ( "context" "testing" "github.com/stretchr/testify/assert" - "go.strv.io/background" ) func Test_New(t *testing.T) { - m := background.NewManager[bool]() + m := New[bool]() assert.NotNil(t, m) - assert.IsType(t, &background.Manager[bool]{}, m) + assert.IsType(t, &Manager[bool]{}, m) assert.Nil(t, m.OnTaskAdded) assert.Nil(t, m.OnTaskSucceeded) assert.Nil(t, m.OnTaskFailed) } func Test_RunExecutesInGoroutine(t *testing.T) { - m := background.NewManager[bool]() + m := New[bool]() proceed := make(chan bool, 1) m.Run(context.Background(), true, func(ctx context.Context) error { @@ -36,7 +35,7 @@ func Test_RunExecutesInGoroutine(t *testing.T) { } func Test_WaitWaitsForPendingTasks(t *testing.T) { - m := background.NewManager[bool]() + m := New[bool]() proceed := make(chan bool, 1) done := make(chan bool, 1) var waited bool @@ -60,7 +59,7 @@ func Test_WaitWaitsForPendingTasks(t *testing.T) { } func Test_CancelledParentContext(t *testing.T) { - m := background.NewManager[bool]() + m := New[bool]() ctx, cancel := context.WithCancel(context.Background()) proceed := make(chan bool, 1) @@ -76,7 +75,7 @@ func Test_CancelledParentContext(t *testing.T) { } func Test_Len(t *testing.T) { - m := background.NewManager[bool]() + m := New[bool]() proceed := make(chan bool, 1) remaining := 10 @@ -99,7 +98,7 @@ func Test_Len(t *testing.T) { } func Test_OnTaskAdded(t *testing.T) { - m := background.NewManager[bool]() + m := New[bool]() metaval := true executed := false @@ -116,7 +115,7 @@ func Test_OnTaskAdded(t *testing.T) { } func Test_OnTaskSucceeded(t *testing.T) { - m := background.NewManager[bool]() + m := New[bool]() metaval := true executed := false @@ -133,7 +132,7 @@ func Test_OnTaskSucceeded(t *testing.T) { } func Test_OnTaskFailed(t *testing.T) { - m := background.NewManager[bool]() + m := New[bool]() metaval := true executed := false From 29b18cd8a902dbb576b9b48300cfb589d879ef27 Mon Sep 17 00:00:00 2001 From: Marek Cermak Date: Mon, 26 Feb 2024 15:55:08 +0100 Subject: [PATCH 2/5] chore: update .gitignore Signed-off-by: Marek Cermak --- .gitignore | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/.gitignore b/.gitignore index 437aa12..aec4c6c 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,23 @@ coverage.txt **/*.bkp **/*.log **/*.tmp + +# Environment +.env +.env.test +.secret + +# Go modules +go.work +go.work.sum + +# IDE +.idea/ +.vscode/ + +# Debug +__debug* +*.log + +# Playground +./playground.go From 73af88cc1474751d0c15058fbe198baac392c848 Mon Sep 17 00:00:00 2001 From: Marek Cermak Date: Mon, 26 Feb 2024 18:35:51 +0100 Subject: [PATCH 3/5] feat: implement background pooler with retry This commit shifts the initial implementation towards a backend pooler with retry functionality. The assumptions and implementation decisions have been made when implementing the pooler: - the background manager's primary purpose is to run (ideally) permananent background goroutines, as such, there is no need for lifecycle hooks such as OnTaskSucceeded or onTaskFailed, those were removed in favor of the retrier. - the manager is context-based and all go routines spawned by the manager share a single cancellable context (cancellable independantly of their parent context) - when stopped or cancelled, the conc pooler gracefully shuts down dangling goroutines - by default, the pooler returns joined errors, this behaviour can be modifier with WithFirstError Option. Signed-off-by: Marek Cermak --- .gitignore | 2 +- go.mod | 2 + go.sum | 5 + makefile | 2 +- manager/background.go | 130 ++++++++-------- manager/background_test.go | 303 ++++++++++++++++++++++--------------- manager/option.go | 79 ++++++++++ 7 files changed, 335 insertions(+), 188 deletions(-) create mode 100644 manager/option.go diff --git a/.gitignore b/.gitignore index aec4c6c..86d2dcd 100644 --- a/.gitignore +++ b/.gitignore @@ -35,4 +35,4 @@ __debug* *.log # Playground -./playground.go +playground.go diff --git a/go.mod b/go.mod index 40f09ff..df86aea 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,8 @@ require github.com/stretchr/testify v1.8.4 require ( github.com/davecgh/go-spew v1.1.1 // indirect + github.com/eapache/go-resiliency v1.6.0 github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/sourcegraph/conc v0.3.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index fa4b6e6..bbdb192 100644 --- a/go.sum +++ b/go.sum @@ -1,10 +1,15 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eapache/go-resiliency v1.6.0 h1:CqGDTLtpwuWKn6Nj3uNUdflaq+/kIPsg0gfNzHton30= +github.com/eapache/go-resiliency v1.6.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= +github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/makefile b/makefile index 38b0c3a..f151ba9 100644 --- a/makefile +++ b/makefile @@ -4,7 +4,7 @@ lint: force ./tools/golangci-lint run -v test: force - go test -v -timeout 5s -covermode=atomic -coverprofile=coverage.txt ./... + go test -v -timeout 30s -covermode=atomic -coverprofile=coverage.txt ./... clean: rm -rf .cache diff --git a/manager/background.go b/manager/background.go index 7239c4d..1efd0b5 100644 --- a/manager/background.go +++ b/manager/background.go @@ -2,87 +2,95 @@ package manager import ( "context" - "sync" + + "github.com/eapache/go-resiliency/retrier" + "github.com/sourcegraph/conc/pool" ) -// Manager keeps track of scheduled goroutines and provides mechanisms to wait for them to finish. `Meta` is whatever -// you wish to associate with this task, usually something that will help you keep track of the tasks. -// -// This is useful in context of HTTP servers, where a customer request may result in some kind of background processing -// activity that should not block the response and you schedule a goroutine to handle it. However, if your server -// receives a termination signal and you do not wait for these goroutines to finish, the goroutines will be killed -// before they can run to completion. This package is not a replacement for a proper task queue system but it is a great -// package to schedule the queue jobs without the customer waiting for that to happen while at the same time being able -// to wait for all those goroutines to finish before allowing the process to exit. -type Manager[Meta any] struct { - wg sync.WaitGroup - len int - - // OnTaskAdded is called immediately after calling Run(). You can use this for logging, metrics or other purposes. - OnTaskAdded func(ctx context.Context, meta Meta) - // OnTaskSucceeded is called immediately after Task returns. You can use this for logging, metrics or other purposes. - OnTaskSucceeded func(ctx context.Context, meta Meta) - // OnTaskFailed is called immediately after Task returns with an error. You can use this for logging, metrics or other - // purposes. - OnTaskFailed func(ctx context.Context, meta Meta, err error) -} +// Manager keeps track of scheduled goroutines and provides mechanisms to wait for them to finish. +type Manager struct { + pool *pool.ContextPool + stat ManagerStats -// Task is the function to be executed in a goroutine -type Task func(ctx context.Context) error + // retrier is a retrier instance that will be used to run tasks if retry is enabled. + retrier *retrier.Retrier -// NewManager creates a new instance of Manager with the provided generic type for the metadata argument. -func New[Meta any]() *Manager[Meta] { - return &Manager[Meta]{} + // cancelFunc is the cancel function of the manager's context. + // It is called when the manager's Stop method is called. + cancelFunc context.CancelFunc } -// Run schedules the provided task to be executed in a goroutine. `Meta` is whatever you wish to associate with the -// task. -func (m *Manager[Meta]) Run(ctx context.Context, meta Meta, task Task) { - m.callOnTaskAdded(ctx, meta) - m.wg.Add(1) - m.len++ - go m.run(context.WithoutCancel(ctx), meta, task) +// ManagerStats contains statistics about the tasks operated by the manager. +type ManagerStats struct { + RunningTasks int `json:"running_tasks"` } -// Wait blocks until all scheduled tasks have finished. -func (m *Manager[Meta]) Wait() { - m.wg.Wait() +// New creates a new instance of Manager with the provided generic type for the metadata argument. +func New(ctx context.Context, opts ...Option) *Manager { + ctx, cancelFunc := context.WithCancel(ctx) + + p := pool.New().WithContext(ctx) + m := &Manager{ + pool: p, + stat: ManagerStats{}, + cancelFunc: cancelFunc, + } + for _, opt := range opts { + opt.Apply(m) + } + return m } -// Len returns the number of currently running tasks -func (m *Manager[Meta]) Len() int { - return m.len +// TaskFunc is the function to be executed in a goroutine. +type TaskFunc func(ctx context.Context) error + +// Run submits a task to the pool. +// If all workers are busy, Run will block until a worker is available. +func (m *Manager) Run(task TaskFunc) { + taskFunc := m.withStats(m.withRetry(task, m.retrier)) + m.pool.Go(taskFunc) } -func (m *Manager[Meta]) run(ctx context.Context, meta Meta, task Task) { - defer func() { - m.wg.Done() - m.len-- - }() +// RunWithRetry runs a task with a dedicated retrier. +// See [Run] for more details. +func (m *Manager) RunWithRetry(task TaskFunc, retry *retrier.Retrier) { + taskFunc := m.withStats(m.withRetry(task, retry)) + m.pool.Go(taskFunc) +} - err := task(ctx) +// Stop cancels the context of the manager and all its tasks. +func (m *Manager) Stop() error { + m.cancelFunc() + return m.pool.Wait() +} - if err != nil { - m.callOnTaskFailed(ctx, meta, err) - } else { - m.callOnTaskSucceeded(ctx, meta) - } +// Wait blocks until all scheduled tasks have finished and propagate any panics spawned by a child to the caller. +// Wait returns an error if any of the tasks failed. +func (m *Manager) Wait() error { + defer m.cancelFunc() + return m.pool.Wait() } -func (m *Manager[Meta]) callOnTaskFailed(ctx context.Context, meta Meta, err error) { - if m.OnTaskFailed != nil { - m.OnTaskFailed(ctx, meta, err) - } +// Stat returns manager statistics. +func (m *Manager) Stat() ManagerStats { + return m.stat } -func (m *Manager[Meta]) callOnTaskSucceeded(ctx context.Context, meta Meta) { - if m.OnTaskSucceeded != nil { - m.OnTaskSucceeded(ctx, meta) +func (m *Manager) withRetry(task TaskFunc, retry *retrier.Retrier) TaskFunc { + if retry == nil { + return task + } + return func(ctx context.Context) error { + return retry.RunCtx(ctx, task) } } -func (m *Manager[Meta]) callOnTaskAdded(ctx context.Context, meta Meta) { - if m.OnTaskAdded != nil { - m.OnTaskAdded(ctx, meta) +func (m *Manager) withStats(task TaskFunc) TaskFunc { + return func(ctx context.Context) error { + m.stat.RunningTasks++ + defer func() { + m.stat.RunningTasks-- + }() + return task(ctx) } } diff --git a/manager/background_test.go b/manager/background_test.go index addacf5..25aea66 100644 --- a/manager/background_test.go +++ b/manager/background_test.go @@ -2,149 +2,202 @@ package manager import ( "context" + "errors" "testing" + "time" "github.com/stretchr/testify/assert" ) -func Test_New(t *testing.T) { - m := New[bool]() - assert.NotNil(t, m) - assert.IsType(t, &Manager[bool]{}, m) - assert.Nil(t, m.OnTaskAdded) - assert.Nil(t, m.OnTaskSucceeded) - assert.Nil(t, m.OnTaskFailed) +func TestNew(t *testing.T) { + ctx := context.Background() + type args struct { + ctx context.Context + opts []Option + } + tests := []struct { + name string + args args + }{ + { + name: "pass", + args: args{ + ctx: ctx, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := New(tt.args.ctx, tt.args.opts...) + assert.NotNil(t, m) + assert.Equal(t, 0, m.stat.RunningTasks) + assert.Nil(t, m.retrier) + }) + } } -func Test_RunExecutesInGoroutine(t *testing.T) { - m := New[bool]() - proceed := make(chan bool, 1) - - m.Run(context.Background(), true, func(ctx context.Context) error { - // Let the main thread advance a bit - <-proceed - proceed <- true - return nil - }) - - // If the func is not executed in a goroutine the main thread will not be able to advance and the test will time out - assert.Empty(t, proceed) - proceed <- true - m.Wait() - assert.True(t, <-proceed) +func TestNewWithRetry(t *testing.T) { + ctx := context.Background() + type args struct { + ctx context.Context + opts []Option + } + tests := []struct { + name string + args args + }{ + { + name: "pass:with-constant-retry", + args: args{ + ctx: ctx, + opts: []Option{ + WithConstantRetry(5, 1*time.Second), + }, + }, + }, + { + name: "pass:with-expotential-retry", + args: args{ + ctx: ctx, + opts: []Option{ + WithExpotentialRetry(5, 250*time.Second, 1*time.Second), + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := New(tt.args.ctx, tt.args.opts...) + assert.NotNil(t, m) + assert.NotNil(t, m.retrier) + }) + } } -func Test_WaitWaitsForPendingTasks(t *testing.T) { - m := New[bool]() - proceed := make(chan bool, 1) - done := make(chan bool, 1) - var waited bool - - m.Run(context.Background(), true, func(ctx context.Context) error { - // Let the main thread advance a bit - <-proceed - return nil - }) - - go func() { - m.Wait() - waited = true - done <- true - }() - - assert.False(t, waited) - proceed <- true - <-done - assert.True(t, waited) +func TestRunWithRetry(t *testing.T) { + expectedAttempts := 5 + ctx := context.Background() + type args struct { + ctx context.Context + opts []Option + } + tests := []struct { + name string + args args + }{ + { + name: "pass:with-constant-retry", + args: args{ + ctx: ctx, + opts: []Option{ + WithConstantRetry(5, 1*time.Second), + }, + }, + }, + { + name: "pass:with-expotential-retry", + args: args{ + ctx: ctx, + opts: []Option{ + WithExpotentialRetry(5, 250*time.Second, 1*time.Second), + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := New(tt.args.ctx, tt.args.opts...) + assert.NotNil(t, m) + assert.NotNil(t, m.retrier) + nRuns := -1 + m.Run(func(ctx context.Context) error { + nRuns++ + <-time.After(1 * time.Second) + return errors.New("test") + }) + assert.Equal(t, 1, m.stat.RunningTasks) + err := m.Wait() + assert.Error(t, err) + assert.ErrorContains(t, err, "test") + assert.Equal(t, expectedAttempts, nRuns) + }) + } } -func Test_CancelledParentContext(t *testing.T) { - m := New[bool]() +func TestContextCancelled(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - proceed := make(chan bool, 1) - - m.Run(ctx, true, func(ctx context.Context) error { - <-proceed - assert.Nil(t, ctx.Err()) - return nil - }) - - cancel() - proceed <- true - m.Wait() -} - -func Test_Len(t *testing.T) { - m := New[bool]() - proceed := make(chan bool, 1) - remaining := 10 - - m.OnTaskSucceeded = func(ctx context.Context, meta bool) { - assert.Equal(t, remaining, m.Len()) - remaining-- - proceed <- true + defer cancel() + type args struct { + ctx context.Context + opts []Option } - - for range 10 { - m.Run(context.Background(), true, func(ctx context.Context) error { - <-proceed - return nil + tests := []struct { + name string + args args + }{ + { + name: "pass", + args: args{ + ctx: ctx, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := New(tt.args.ctx, tt.args.opts...) + assert.NotNil(t, m) + m.Run(func(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(5 * time.Second): + return errors.New("timeout") + } + }) + assert.Equal(t, 1, m.stat.RunningTasks) + go time.AfterFunc(1*time.Second, func() { + cancel() + }) + err := m.Wait() + assert.NotNil(t, err) + assert.ErrorContains(t, err, "context canceled") + assert.Equal(t, 0, m.stat.RunningTasks) }) } - - proceed <- true - m.Wait() - assert.Equal(t, 0, m.Len()) } -func Test_OnTaskAdded(t *testing.T) { - m := New[bool]() - metaval := true - executed := false - - m.OnTaskAdded = func(ctx context.Context, meta bool) { - assert.Equal(t, metaval, meta) - executed = true +func TestStop(t *testing.T) { + type args struct { + ctx context.Context + opts []Option } - - m.Run(context.Background(), metaval, func(ctx context.Context) error { - return nil - }) - m.Wait() - assert.True(t, executed) -} - -func Test_OnTaskSucceeded(t *testing.T) { - m := New[bool]() - metaval := true - executed := false - - m.OnTaskSucceeded = func(ctx context.Context, meta bool) { - assert.Equal(t, metaval, meta) - executed = true + tests := []struct { + name string + args args + }{ + { + name: "pass", + args: args{ + ctx: context.Background(), + }, + }, } - - m.Run(context.Background(), metaval, func(ctx context.Context) error { - return nil - }) - m.Wait() - assert.True(t, executed) -} - -func Test_OnTaskFailed(t *testing.T) { - m := New[bool]() - metaval := true - executed := false - - m.OnTaskFailed = func(ctx context.Context, meta bool, err error) { - assert.Equal(t, metaval, meta) - assert.Error(t, err) - executed = true + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := New(tt.args.ctx, tt.args.opts...) + assert.NotNil(t, m) + m.Run(func(ctx context.Context) error { + select { + case <-ctx.Done(): + case <-time.After(1 * time.Second): + return errors.New("timeout") + } + return nil + }) + assert.Equal(t, 1, m.stat.RunningTasks) + err := m.Stop() + assert.Nil(t, err) + assert.Equal(t, 0, m.stat.RunningTasks) + }) } - - m.Run(context.Background(), metaval, func(ctx context.Context) error { - return assert.AnError - }) - m.Wait() - assert.True(t, executed) } diff --git a/manager/option.go b/manager/option.go new file mode 100644 index 0000000..afa163b --- /dev/null +++ b/manager/option.go @@ -0,0 +1,79 @@ +package manager + +import ( + "time" + + "github.com/eapache/go-resiliency/retrier" +) + +type Option interface { + Apply(*Manager) +} + +func WithCancelOnError() Option { + return &OptionWithCancelOnError{} +} + +type OptionWithCancelOnError struct{} + +func (o *OptionWithCancelOnError) Apply(m *Manager) { + *m.pool = *m.pool.WithCancelOnError() +} + +func WithFirstError() Option { + return &OptionWithFirstError{} +} + +type OptionWithFirstError struct{} + +func (o *OptionWithFirstError) Apply(m *Manager) { + *m.pool = *m.pool.WithFirstError() +} + +func WithMaxTasks(n int) Option { + return &OptionWithMaxTasks{n} +} + +type OptionWithMaxTasks struct { + n int +} + +func (o *OptionWithMaxTasks) Apply(m *Manager) { + *m.pool = *m.pool.WithMaxGoroutines(o.n) +} + +func WithConstantRetry(attempts int, backoffDuration time.Duration) Option { + return &OptionWithConstantRetry{ + attempts: attempts, + backoffDuration: backoffDuration, + } +} + +type OptionWithConstantRetry struct { + attempts int + backoffDuration time.Duration +} + +func (o *OptionWithConstantRetry) Apply(m *Manager) { + backoff := retrier.ConstantBackoff(o.attempts, o.backoffDuration) + m.retrier = retrier.New(backoff, retrier.DefaultClassifier{}) +} + +func WithExpotentialRetry(attempts int, minBackoffDuration time.Duration, maxBackoffDuration time.Duration) Option { + return &OptionWithExpotentialRetry{ + attempts: attempts, + minBackoffDuration: minBackoffDuration, + maxBackoffDuration: maxBackoffDuration, + } +} + +type OptionWithExpotentialRetry struct { + attempts int + minBackoffDuration time.Duration + maxBackoffDuration time.Duration +} + +func (o *OptionWithExpotentialRetry) Apply(m *Manager) { + backoff := retrier.LimitedExponentialBackoff(o.attempts, o.minBackoffDuration, o.maxBackoffDuration) + m.retrier = retrier.New(backoff, retrier.DefaultClassifier{}) +} From dc19d4995a4a91477cacf3f2fe9eec0d7cc975e0 Mon Sep 17 00:00:00 2001 From: Marek Cermak Date: Mon, 26 Feb 2024 18:56:11 +0100 Subject: [PATCH 4/5] chore: rename to preserve consistency Signed-off-by: Marek Cermak --- manager/{background.go => manager.go} | 0 manager/{background_test.go => manager_test.go} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename manager/{background.go => manager.go} (100%) rename manager/{background_test.go => manager_test.go} (100%) diff --git a/manager/background.go b/manager/manager.go similarity index 100% rename from manager/background.go rename to manager/manager.go diff --git a/manager/background_test.go b/manager/manager_test.go similarity index 100% rename from manager/background_test.go rename to manager/manager_test.go From 6c38ab3a44482e6b51f0e1e9ff72165da4b18ce8 Mon Sep 17 00:00:00 2001 From: Marek Cermak Date: Mon, 26 Feb 2024 19:07:57 +0100 Subject: [PATCH 5/5] doc: update README Signed-off-by: Marek Cermak --- readme.md | 68 ++++++++++++++++++++++++++++--------------------------- 1 file changed, 35 insertions(+), 33 deletions(-) diff --git a/readme.md b/readme.md index f8820ac..60ad214 100644 --- a/readme.md +++ b/readme.md @@ -6,11 +6,11 @@ ## Purpose -In Go, when the `main` function returns, any pending goroutines are terminated. This means that we need to keep track of them somehow so that `main` can wait for them to finish before returning. This is also useful in the context of servers - when the server receives a terminating signal from the host OS (ie. due to a new release being deployed) the application needs a way to delay the shutdown long enough for the goroutines to finish before allowing itself to be terminated. +In a Go application, there's often the need for background goroutines. These goroutines can be used for various purposes, but this library is focused on managing goroutines that are used for background tasks with nearly infinite lifetimes. These tasks are often used for things like periodic cleanup, background processing, or long-running connections. -This library makes that management process easier and adds some extra functionality on top, for good measure. +This library makes that management process easier by combining the retrier resiliency pattern implemented by the [eapache/go-resiliency](https://github.com/eapache/go-resiliency) package with a pooler from the wonderful [sourcegraph/conc](https://github.com/sourcegraph/conc) library. -> ⚠️ By no means is this a replacement for proper job queue system! The intended use case is for small, relatively fast functions that either do the actual work or schedule a job in some kind of a queue to do that work. Since even putting a job into a queue takes some time, you can remove that time from the client's request/response cycle and make your backend respond faster. +A typical example in production code might be a background task that periodically checks for new data in a queue and processes it. When the application is shutting down, it's important to wait for all these background tasks to finish before the process exits. This library provides a way to do that and a bit more on top of it. ## Installation @@ -27,40 +27,42 @@ import ( "context" "fmt" - "go.strv.io/background" -) - -// Define a type for the metadata that you want to associate with your tasks. -// The metadata is provided by the caller when a task is scheduled and is passed -// to the monitoring functions. -type TaskMetadata string + "go.strv.io/background/manager" func main() { - // Create a new background manager - manager := background.NewManager[TaskMetadata]() - // Define some monitoring functions for logging or error reporting - manager.OnTaskAdded = func(ctx context.Context, meta TaskMetadata) { - fmt.Println("Task added:", meta) - } - manager.OnTaskSucceeded = func(ctx context.Context, meta TaskMetadata) { - fmt.Println("Task succeeded:", meta) - } - manager.OnTaskFailed = func(ctx context.Context, meta TaskMetadata, err error) { - fmt.Println("Task failed:", meta, err) + ctx := context.Background() + // Create a new manager. + // The manager will cancel its context and all its tasks if any of the tasks returns an error. + // The manager will return the first error encountered by any of its tasks. + backgroundManager := manager.New(ctx, manager.WithCancelOnError(), manager.WithFirstError()) + //nolint + backgroundManager.Run( + func(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(2 * time.Second): + // Fail after 2 seconds. + return context.DeadlineExceeded + } + }, + ) + backgroundManager.Run( + func(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(5 * time.Second): + fmt.Printf("This won't be executed\n") + } + }, + ) + if err := backgroundManager.Wait(); err != nil { + //nolint + fmt.Printf("Error: %v\n", err) // Output: Error: context deadline exceeded } +) - // ... elsewhere in your codebase - manager.Run(context.Background(), "goroutine-1", func(ctx context.Context) error { - // Do some work here - return nil - }) - - - // Wait for all goroutines to finish - // Make sure you stop your components from adding more tasks - manager.Wait() - // Now it's safe to terminate the process -} ``` ## License