diff --git a/.github/workflows/test.yaml b/.github/workflows/ci.yaml similarity index 61% rename from .github/workflows/test.yaml rename to .github/workflows/ci.yaml index 7dee960..b5c2ebc 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/ci.yaml @@ -1,5 +1,5 @@ --- -name: Tests +name: Continuous Integration on: push: @@ -8,7 +8,20 @@ on: pull_request: jobs: - tests: + lint: + runs-on: ubuntu-22.04 + timeout-minutes: 5 + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + - uses: actions/setup-go@v5 + with: + go-version-file: go.mod + - run: go version + - run: make lint + + test: runs-on: ubuntu-22.04 timeout-minutes: 5 steps: diff --git a/background.go b/background.go index 6452949..f18944f 100644 --- a/background.go +++ b/background.go @@ -2,39 +2,16 @@ package background import ( "context" - "errors" + "sync" "time" "github.com/kamilsk/retry/v5" "github.com/kamilsk/retry/v5/strategy" ) -// TaskType determines how the task will be executed by the manager. -type TaskType int - -const ( - // TaskTypeOneOff is the default task type. It will be executed only once. - TaskTypeOneOff TaskType = iota - // TaskTypeLoop will be executed in an infinite loop until the manager's Cancel() method is called. The task will - // restart immediately after the previous iteration returns. - TaskTypeLoop -) - -var ( - // ErrUnknownTaskType is returned when the task type is not a valid value of TaskType. - ErrUnknownTaskType = errors.New("unknown task type") -) - // Manager keeps track of scheduled goroutines and provides mechanisms to wait for them to finish or cancel their // execution. `Meta` is whatever you wish to associate with this task, usually something that will help you keep track // of the tasks in the observer. -// -// This is useful in context of HTTP servers, where a customer request may result in some kind of background processing -// activity that should not block the response and you schedule a goroutine to handle it. However, if your server -// receives a termination signal and you do not wait for these goroutines to finish, the goroutines will be killed -// before they can run to completion. This package is not a replacement for a proper task queue system but it is a great -// package to schedule the queue jobs without the customer waiting for that to happen while at the same time being able -// to wait for all those goroutines to finish before allowing the process to exit. type Manager struct { stalledThreshold time.Duration observer Observer @@ -43,7 +20,7 @@ type Manager struct { loopmgr loopmgr } -// Options provides a means for configuring the background manager and attaching hooks to it. +// Options provides a means for configuring the background manager and providing the observer to it. type Options struct { // StalledThreshold is the amount of time within which the task should return before it is considered stalled. Note // that no effort is made to actually stop or kill the task. @@ -56,32 +33,10 @@ type Options struct { Retry Retry } -// Task describes how a unit of work (a function) should be executed. -type Task struct { - // Fn is the function to be executed in a goroutine. - Fn Fn - // Type is the type of the task. It determines how the task will be executed by the manager. Default is TaskTypeOneOff. - Type TaskType - // Meta is whatever custom information you wish to associate with the task. This will be passed to the observer's - // functions. - Meta Metadata - // Retry defines how the task should be retried in case of failure (if at all). This overrides the default retry - // strategies you might have configured in the Manager. Several strategies are provided by - // github.com/kamilsk/retry/v5/strategy package. - Retry Retry -} - // Retry defines the functions that control the retry behavior of a task. Several strategies are provided by // github.com/kamilsk/retry/v5/strategy package. type Retry []strategy.Strategy -// Fn is the function to be executed in a goroutine. -type Fn func(ctx context.Context) error - -// Metadata is whatever custom information you wish to associate with a task. This information will be available in your -// lifecycle hooks to help you identify which task is being processed. -type Metadata map[string]string - // NewManager creates a new instance of Manager with default options and no observer. func NewManager() *Manager { return NewManagerWithOptions(Options{}) @@ -89,10 +44,15 @@ func NewManager() *Manager { // NewManagerWithOptions creates a new instance of Manager with the provided options and observer. func NewManagerWithOptions(options Options) *Manager { + observer := options.Observer + if observer == nil { + observer = DefaultObserver{} + } + return &Manager{ stalledThreshold: options.StalledThreshold, - observer: options.Observer, retry: options.Retry, + observer: observer, loopmgr: mkloopmgr(), } } @@ -108,7 +68,7 @@ func (m *Manager) Run(ctx context.Context, fn Fn) { func (m *Manager) RunTask(ctx context.Context, task Task) { ctx = context.WithoutCancel(ctx) done := make(chan error, 1) - m.observer.callOnTaskAdded(ctx, task) + m.observer.OnTaskAdded(ctx, task) switch task.Type { case TaskTypeOneOff: @@ -121,7 +81,7 @@ func (m *Manager) RunTask(ctx context.Context, task Task) { go m.loop(ctx, task, done) default: - m.observer.callOnTaskFailed(ctx, task, ErrUnknownTaskType) + m.observer.OnTaskFailed(ctx, task, ErrUnknownTaskType) } } @@ -136,6 +96,22 @@ func (m *Manager) Cancel() { m.loopmgr.cancel() } +// Close is a convenience method that calls Wait() and Cancel() in parallel. It blocks until all tasks have finished. +func (m *Manager) Close() { + var wg sync.WaitGroup + wg.Add(1) + go func() { + m.Wait() + wg.Done() + }() + wg.Add(1) + go func() { + m.Cancel() + wg.Done() + }() + wg.Wait() +} + // CountOf returns the number of tasks of the specified type that are currently running. When the TaskType is invalid it // returns 0. func (m *Manager) CountOf(t TaskType) int { @@ -165,7 +141,7 @@ func (m *Manager) loop(ctx context.Context, task Task, done chan error) { m.run(ctx, task, done) err := <-done if err != nil { - m.observer.callOnTaskFailed(ctx, task, err) + m.observer.OnTaskFailed(ctx, task, err) } } } @@ -177,12 +153,12 @@ func (m *Manager) observe(ctx context.Context, task Task, done <-chan error) { for { select { case <-timeout: - m.observer.callOnTaskStalled(ctx, task) + m.observer.OnTaskStalled(ctx, task) case err := <-done: if err != nil { - m.observer.callOnTaskFailed(ctx, task, err) + m.observer.OnTaskFailed(ctx, task, err) } else { - m.observer.callOnTaskSucceeded(ctx, task) + m.observer.OnTaskSucceeded(ctx, task) } return diff --git a/background_test.go b/background_test.go index 6b9c256..71d6705 100644 --- a/background_test.go +++ b/background_test.go @@ -83,8 +83,8 @@ func Test_OnTaskAdded(t *testing.T) { executed := false var wg sync.WaitGroup m := background.NewManagerWithOptions(background.Options{ - Observer: background.Observer{ - OnTaskAdded: func(ctx context.Context, task background.Task) { + Observer: background.DefaultObserver{ + HandleOnTaskAdded: func(ctx context.Context, task background.Task) { assert.Equal(t, metadata, task.Meta) executed = true wg.Done() @@ -110,8 +110,8 @@ func Test_OnTaskSucceeded(t *testing.T) { executed := false var wg sync.WaitGroup m := background.NewManagerWithOptions(background.Options{ - Observer: background.Observer{ - OnTaskSucceeded: func(ctx context.Context, task background.Task) { + Observer: background.DefaultObserver{ + HandleOnTaskSucceeded: func(ctx context.Context, task background.Task) { assert.Equal(t, metadata, task.Meta) executed = true wg.Done() @@ -137,8 +137,8 @@ func Test_OnTaskFailed(t *testing.T) { executed := false var wg sync.WaitGroup m := background.NewManagerWithOptions(background.Options{ - Observer: background.Observer{ - OnTaskFailed: func(ctx context.Context, task background.Task, err error) { + Observer: background.DefaultObserver{ + HandleOnTaskFailed: func(ctx context.Context, task background.Task, err error) { assert.Equal(t, metadata, task.Meta) assert.Error(t, err) executed = true @@ -182,8 +182,8 @@ func Test_OnTaskStalled(t *testing.T) { m := background.NewManagerWithOptions(background.Options{ StalledThreshold: time.Millisecond * 5, - Observer: background.Observer{ - OnTaskStalled: func(ctx context.Context, task background.Task) { + Observer: background.DefaultObserver{ + HandleOnTaskStalled: func(ctx context.Context, task background.Task) { assert.Equal(t, metadata, task.Meta) executed = true wg.Done() @@ -214,8 +214,8 @@ func Test_StalledTaskStillCallsOnTaskSucceeded(t *testing.T) { var wg sync.WaitGroup m := background.NewManagerWithOptions(background.Options{ StalledThreshold: time.Millisecond, - Observer: background.Observer{ - OnTaskSucceeded: func(ctx context.Context, task background.Task) { + Observer: background.DefaultObserver{ + HandleOnTaskSucceeded: func(ctx context.Context, task background.Task) { executed = true wg.Done() }, @@ -293,8 +293,8 @@ func Test_RunTaskTypeLoop_RetryStrategies(t *testing.T) { count := 0 m := background.NewManagerWithOptions(background.Options{ - Observer: background.Observer{ - OnTaskFailed: func(ctx context.Context, task background.Task, err error) { + Observer: background.DefaultObserver{ + HandleOnTaskFailed: func(ctx context.Context, task background.Task, err error) { done <- err }, }, @@ -389,3 +389,36 @@ func Test_CountOf(t *testing.T) { assert.Equal(t, 0, m.CountOf(background.TaskTypeLoop)) assert.Equal(t, 0, m.CountOf(background.TaskType(3))) } + +func Test_Close(t *testing.T) { + m := background.NewManager() + proceed := make(chan bool, 1) + + def := background.Task{ + Type: background.TaskTypeLoop, + Fn: func(ctx context.Context) error { + <-proceed + return nil + }, + } + m.RunTask(context.Background(), def) + def = background.Task{ + Type: background.TaskTypeOneOff, + Fn: func(ctx context.Context) error { + <-proceed + return nil + }, + } + m.RunTask(context.Background(), def) + assert.Equal(t, 1, m.CountOf(background.TaskTypeOneOff)) + assert.Equal(t, 1, m.CountOf(background.TaskTypeLoop)) + + go func() { + proceed <- true + proceed <- true + }() + + m.Close() + assert.Equal(t, 0, m.CountOf(background.TaskTypeOneOff)) + assert.Equal(t, 0, m.CountOf(background.TaskTypeLoop)) +} diff --git a/observer.go b/observer.go index a92bdd4..fd94a0d 100644 --- a/observer.go +++ b/observer.go @@ -2,40 +2,61 @@ package background import "context" -// Observer includes a set of functions that are called when certain events happen with the tasks that you schedule. All -// of them are optional; implement only those you need. -type Observer struct { - // OnTaskAdded is called immediately after adding the Task to the background manager. - OnTaskAdded func(ctx context.Context, task Task) +// Observer implements a set of methods that are called when certain events happen with the tasks that you schedule. +type Observer interface { + // OnTaskAdded is called immediately after scheduling the Task for execution. + OnTaskAdded(ctx context.Context, task Task) // OnTaskSucceeded is called immediately after Task returns with no error. - OnTaskSucceeded func(ctx context.Context, task Task) - // OnTaskFailed is called immediately after Task returns with an error. - OnTaskFailed func(ctx context.Context, task Task, err error) + // + // Ignored for TaskTypeLoop. + OnTaskSucceeded(ctx context.Context, task Task) + // OnTaskFailed is called after Task returns an error and all retry policy attempts have been exhausted. + OnTaskFailed(ctx context.Context, task Task, err error) // OnTaskStalled is called when the task does not return within the StalledThreshold. You can use this to make sure // your tasks do not take excessive amounts of time to run to completion. - OnTaskStalled func(ctx context.Context, task Task) + // + // Ignored for TaskTypeLoop. + OnTaskStalled(ctx context.Context, task Task) } -func (h Observer) callOnTaskFailed(ctx context.Context, task Task, err error) { - if h.OnTaskFailed != nil { - h.OnTaskFailed(ctx, task, err) +// DefaultObserver is an implementation of the Observer interface that allows you to observe only the events you are +// interested in. +type DefaultObserver struct { + // OnTaskAdded is called immediately after scheduling the Task for execution. + HandleOnTaskAdded func(ctx context.Context, task Task) + // OnTaskSucceeded is called immediately after Task returns with no error. + // + // Ignored for TaskTypeLoop. + HandleOnTaskSucceeded func(ctx context.Context, task Task) + // OnTaskFailed is called after Task returns an error and all retry policy attempts have been exhausted. + HandleOnTaskFailed func(ctx context.Context, task Task, err error) + // OnTaskStalled is called when the task does not return within the StalledThreshold. You can use this to make sure + // your tasks do not take excessive amounts of time to run to completion. + // + // Ignored for TaskTypeLoop. + HandleOnTaskStalled func(ctx context.Context, task Task) +} + +func (o DefaultObserver) OnTaskFailed(ctx context.Context, task Task, err error) { + if o.HandleOnTaskFailed != nil { + o.HandleOnTaskFailed(ctx, task, err) } } -func (h Observer) callOnTaskSucceeded(ctx context.Context, task Task) { - if h.OnTaskSucceeded != nil { - h.OnTaskSucceeded(ctx, task) +func (o DefaultObserver) OnTaskSucceeded(ctx context.Context, task Task) { + if o.HandleOnTaskSucceeded != nil { + o.HandleOnTaskSucceeded(ctx, task) } } -func (h Observer) callOnTaskAdded(ctx context.Context, task Task) { - if h.OnTaskAdded != nil { - h.OnTaskAdded(ctx, task) +func (o DefaultObserver) OnTaskAdded(ctx context.Context, task Task) { + if o.HandleOnTaskAdded != nil { + o.HandleOnTaskAdded(ctx, task) } } -func (h Observer) callOnTaskStalled(ctx context.Context, task Task) { - if h.OnTaskStalled != nil { - h.OnTaskStalled(ctx, task) +func (o DefaultObserver) OnTaskStalled(ctx context.Context, task Task) { + if o.HandleOnTaskStalled != nil { + o.HandleOnTaskStalled(ctx, task) } } diff --git a/task.go b/task.go new file mode 100644 index 0000000..eeafc6f --- /dev/null +++ b/task.go @@ -0,0 +1,44 @@ +package background + +import ( + "context" + "errors" +) + +// TaskType determines how the task will be executed by the manager. +type TaskType int + +const ( + // TaskTypeOneOff is the default task type. It will be executed only once. + TaskTypeOneOff TaskType = iota + // TaskTypeLoop will be executed in an infinite loop until the manager's Cancel() method is called. The task will + // restart immediately after the previous iteration returns. + TaskTypeLoop +) + +var ( + // ErrUnknownTaskType is returned when the task type is not a valid value of TaskType. + ErrUnknownTaskType = errors.New("unknown task type") +) + +// Task describes how a unit of work (a function) should be executed. +type Task struct { + // Fn is the function to be executed in a goroutine. + Fn Fn + // Type is the type of the task. It determines how the task will be executed by the manager. Default is TaskTypeOneOff. + Type TaskType + // Meta is whatever custom information you wish to associate with the task. This will be passed to the observer's + // functions. + Meta Metadata + // Retry defines how the task should be retried in case of failure (if at all). This overrides the default retry + // strategies you might have configured in the Manager. Several strategies are provided by + // github.com/kamilsk/retry/v5/strategy package. + Retry Retry +} + +// Fn is the function to be executed in a goroutine. +type Fn func(ctx context.Context) error + +// Metadata is whatever custom information you wish to associate with a task. You can access this data in the observer's +// methods to help you identify the task or get more context about it. +type Metadata map[string]string