Skip to content

Commit

Permalink
feat: refactor, restructure, polish 🎨 (#7)
Browse files Browse the repository at this point in the history
- the generic parameter has been removed. It was complicating things
quite significantly and brought very little benefit.
- the package's code has been split into multiple files to make it
easier to navigate the codebase
- a ton of stuff has been renamed, hopefully to make it more clear and
understandable
- internal task lifecycle monitoring has been refactored in order to
make room for long-running/permanent background tasks
- task count is now calculated using `sync/atomic`
  • Loading branch information
robertrossmann authored Mar 2, 2024
2 parents a48eb15 + 5030f3c commit af5f300
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 183 deletions.
197 changes: 67 additions & 130 deletions background.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package background
import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/kamilsk/retry/v5"
Expand All @@ -18,47 +19,34 @@ import (
// before they can run to completion. This package is not a replacement for a proper task queue system but it is a great
// package to schedule the queue jobs without the customer waiting for that to happen while at the same time being able
// to wait for all those goroutines to finish before allowing the process to exit.
type Manager[Meta any] struct {
type Manager struct {
wg sync.WaitGroup
len int
len atomic.Int32
stalledThreshold time.Duration
hooks Hooks[Meta]
strategies Retry
observer Observer
retry Retry
}

// Options provides a means for configuring the background manager and attaching hooks to it.
type Options[Meta any] struct {
// StalledThreshold is the amount of time within which the goroutine should return before it is considered stalled.
// Note that no effort is made to actually kill the goroutine.
type Options struct {
// StalledThreshold is the amount of time within which the task should return before it is considered stalled. Note
// that no effort is made to actually stop or kill the task.
StalledThreshold time.Duration
// Hooks allow you to register monitoring functions that are called when something happens with the goroutines that
// you schedule. These are useful for logging, monitoring, etc.
Hooks Hooks[Meta]
// DefaultRetry defines the default retry strategies that will be used for all tasks unless overridden by the task.
// Several strategies are provided by github.com/kamilsk/retry/v5/strategy package.
DefaultRetry Retry
}

// Hooks are a set of functions that are called when certain events happen with the goroutines that you schedule. All of
// them are optional; implement only those you need.
type Hooks[Meta any] struct {
// OnTaskAdded is called immediately after calling Run().
OnTaskAdded func(ctx context.Context, meta Meta)
// OnTaskSucceeded is called immediately after Task returns with no error.
OnTaskSucceeded func(ctx context.Context, meta Meta)
// OnTaskFailed is called immediately after Task returns with an error.
OnTaskFailed func(ctx context.Context, meta Meta, err error)
// OnGoroutineStalled is called when the goroutine does not return within the StalledThreshold. You can use this to
// make sure your goroutines do not take excessive amounts of time to run to completion.
OnGoroutineStalled func(ctx context.Context, meta Meta)
// Observer allow you to register monitoring functions that are called when something happens with the tasks that you
// schedule. These are useful for logging, monitoring, etc.
Observer Observer
// Retry defines the default retry strategies that will be used for all tasks unless overridden by the task. Several
// strategies are provided by github.com/kamilsk/retry/v5/strategy package.
Retry Retry
}

// TaskDefinition describes how a unit of work (a Task) should be executed.
type TaskDefinition[Meta any] struct {
// Task is the function to be executed in a goroutine.
Task Task
// Meta is whatever you wish to associate with the task.
Meta Meta
// Task describes how a unit of work (a function) should be executed.
type Task struct {
// Fn is the function to be executed in a goroutine.
Fn Fn
// Meta is whatever custom information you wish to associate with the task. This will be passed to the observer's
// functions.
Meta Metadata
// Retry defines how the task should be retried in case of failure (if at all). This overrides the default retry
// strategies you might have configured in the Manager. Several strategies are provided by
// github.com/kamilsk/retry/v5/strategy package.
Expand All @@ -69,129 +57,78 @@ type TaskDefinition[Meta any] struct {
// github.com/kamilsk/retry/v5/strategy package.
type Retry []strategy.Strategy

// Task is the function to be executed in a goroutine
type Task func(ctx context.Context) error
// Fn is the function to be executed in a goroutine.
type Fn func(ctx context.Context) error

// NilMeta is a type that can be used as Meta generic type when you do not need to associate any metadata with the task.
type NilMeta *struct{}
// Metadata is whatever custom information you wish to associate with a task. This information will be available in your
// lifecycle hooks to help you identify which task is being processed.
type Metadata map[string]string

// NewManager creates a new instance of Manager with default options and no hooks.
func NewManager() *Manager[NilMeta] {
return &Manager[NilMeta]{}
// NewManager creates a new instance of Manager with default options and no observer.
func NewManager() *Manager {
return &Manager{}
}

// NewManagerWithOptions creates a new instance of Manager with the provided options and hooks.
func NewManagerWithOptions[Meta any](options Options[Meta]) *Manager[Meta] {
return &Manager[Meta]{
// NewManagerWithOptions creates a new instance of Manager with the provided options and observer.
func NewManagerWithOptions(options Options) *Manager {
return &Manager{
stalledThreshold: options.StalledThreshold,
hooks: options.Hooks,
strategies: options.DefaultRetry,
observer: options.Observer,
retry: options.Retry,
}
}

// Run schedules the provided task to be executed in a goroutine.
func (m *Manager[Meta]) Run(ctx context.Context, task Task) {
definition := TaskDefinition[Meta]{Task: task}
m.RunTaskDefinition(ctx, definition)
// Run schedules the provided function to be executed in a goroutine.
func (m *Manager) Run(ctx context.Context, fn Fn) {
task := Task{Fn: fn}
m.RunTask(ctx, task)
}

// RunTaskDefinition schedules the provided task definition to be executed in a goroutine.
func (m *Manager[Meta]) RunTaskDefinition(ctx context.Context, definition TaskDefinition[Meta]) {
m.callOnTaskAdded(ctx, definition.Meta)
// RunTask schedules the provided task to be executed in a goroutine.
func (m *Manager) RunTask(ctx context.Context, task Task) {
m.observer.callOnTaskAdded(ctx, task)
m.wg.Add(1)
m.len++
m.len.Add(1)

ctx = context.WithoutCancel(ctx)
done := make(chan bool, 1)
done := make(chan error, 1)

go m.run(ctx, definition, done)
go m.ticktock(ctx, definition.Meta, done)
go m.monitor(ctx, task, done)
go m.run(ctx, task, done)
}

// Wait blocks until all scheduled tasks have finished.
func (m *Manager[Meta]) Wait() {
func (m *Manager) Wait() {
m.wg.Wait()
}

// Len returns the number of currently running tasks.
func (m *Manager[Meta]) Len() int {
return m.len
func (m *Manager) Len() int32 {
return m.len.Load()
}

func (m *Manager[Meta]) run(ctx context.Context, definition TaskDefinition[Meta], done chan<- bool) {
strategies := mkstrategies(m.strategies, definition.Retry)
err := retry.Do(ctx, definition.Task, strategies...)
done <- true
m.wg.Done()
m.len--

if err != nil {
m.callOnTaskFailed(ctx, definition.Meta, err)
} else {
m.callOnTaskSucceeded(ctx, definition.Meta)
}
func (m *Manager) run(ctx context.Context, task Task, done chan<- error) {
strategies := mkstrategies(m.retry, task.Retry)
done <- retry.Do(ctx, task.Fn, strategies...)
}

func (m *Manager[Meta]) ticktock(ctx context.Context, meta Meta, done <-chan bool) {
func (m *Manager) monitor(ctx context.Context, task Task, done <-chan error) {
timeout := mktimeout(m.stalledThreshold)
select {
case <-done:
return
case <-timeout:
m.callOnGoroutineStalled(ctx, meta)
return
}
}

func (m *Manager[Meta]) callOnTaskFailed(ctx context.Context, meta Meta, err error) {
if m.hooks.OnTaskFailed != nil {
m.hooks.OnTaskFailed(ctx, meta, err)
for {
select {
case <-timeout:
m.observer.callOnTaskStalled(ctx, task)
case err := <-done:
if err != nil {
m.observer.callOnTaskFailed(ctx, task, err)
} else {
m.observer.callOnTaskSucceeded(ctx, task)
}

m.wg.Done()
m.len.Add(-1)
return
}
}
}

func (m *Manager[Meta]) callOnTaskSucceeded(ctx context.Context, meta Meta) {
if m.hooks.OnTaskSucceeded != nil {
m.hooks.OnTaskSucceeded(ctx, meta)
}
}

func (m *Manager[Meta]) callOnTaskAdded(ctx context.Context, meta Meta) {
if m.hooks.OnTaskAdded != nil {
m.hooks.OnTaskAdded(ctx, meta)
}
}

func (m *Manager[Meta]) callOnGoroutineStalled(ctx context.Context, meta Meta) {
if m.hooks.OnGoroutineStalled != nil {
m.hooks.OnGoroutineStalled(ctx, meta)
}
}

// mktimeout returns a channel that will receive the current time after the specified duration. If the duration is 0,
// the channel will never receive any message.
func mktimeout(duration time.Duration) <-chan time.Time {
if duration == 0 {
return make(<-chan time.Time)
}
return time.After(duration)
}

// mkstrategies prepares the retry strategies to be used for the task. If no defaults and no overrides are provided, a
// single execution attempt retry strategy is used. This is because the retry package would retry indefinitely on
// failure if no strategy is provided.
func mkstrategies(defaults []strategy.Strategy, overrides []strategy.Strategy) []strategy.Strategy {
result := make([]strategy.Strategy, 0, max(len(defaults), len(overrides)))

if len(overrides) > 0 {
result = append(result, overrides...)
} else {
result = append(result, defaults...)
}

// If no retry strategies are provided we default to a single execution attempt
if len(result) == 0 {
result = append(result, strategy.Limit(1))
}

return result
}
Loading

0 comments on commit af5f300

Please sign in to comment.