Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: cleanup & tweaks, vol.2 #9

Merged
merged 5 commits into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions .github/workflows/test.yaml → .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
---
name: Tests
name: Continuous Integration

on:
push:
Expand All @@ -8,7 +8,20 @@ on:
pull_request:

jobs:
tests:
lint:
runs-on: ubuntu-22.04
timeout-minutes: 5
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- uses: actions/setup-go@v5
with:
go-version-file: go.mod
- run: go version
- run: make lint

test:
runs-on: ubuntu-22.04
timeout-minutes: 5
steps:
Expand Down
84 changes: 30 additions & 54 deletions background.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,16 @@ package background

import (
"context"
"errors"
"sync"
"time"

"github.com/kamilsk/retry/v5"
"github.com/kamilsk/retry/v5/strategy"
)

// TaskType determines how the task will be executed by the manager.
type TaskType int

const (
// TaskTypeOneOff is the default task type. It will be executed only once.
TaskTypeOneOff TaskType = iota
// TaskTypeLoop will be executed in an infinite loop until the manager's Cancel() method is called. The task will
// restart immediately after the previous iteration returns.
TaskTypeLoop
)

var (
// ErrUnknownTaskType is returned when the task type is not a valid value of TaskType.
ErrUnknownTaskType = errors.New("unknown task type")
)

// Manager keeps track of scheduled goroutines and provides mechanisms to wait for them to finish or cancel their
// execution. `Meta` is whatever you wish to associate with this task, usually something that will help you keep track
// of the tasks in the observer.
//
// This is useful in context of HTTP servers, where a customer request may result in some kind of background processing
// activity that should not block the response and you schedule a goroutine to handle it. However, if your server
// receives a termination signal and you do not wait for these goroutines to finish, the goroutines will be killed
// before they can run to completion. This package is not a replacement for a proper task queue system but it is a great
// package to schedule the queue jobs without the customer waiting for that to happen while at the same time being able
// to wait for all those goroutines to finish before allowing the process to exit.
type Manager struct {
stalledThreshold time.Duration
observer Observer
Expand All @@ -43,7 +20,7 @@ type Manager struct {
loopmgr loopmgr
}

// Options provides a means for configuring the background manager and attaching hooks to it.
// Options provides a means for configuring the background manager and providing the observer to it.
type Options struct {
// StalledThreshold is the amount of time within which the task should return before it is considered stalled. Note
// that no effort is made to actually stop or kill the task.
Expand All @@ -56,43 +33,26 @@ type Options struct {
Retry Retry
}

// Task describes how a unit of work (a function) should be executed.
type Task struct {
// Fn is the function to be executed in a goroutine.
Fn Fn
// Type is the type of the task. It determines how the task will be executed by the manager. Default is TaskTypeOneOff.
Type TaskType
// Meta is whatever custom information you wish to associate with the task. This will be passed to the observer's
// functions.
Meta Metadata
// Retry defines how the task should be retried in case of failure (if at all). This overrides the default retry
// strategies you might have configured in the Manager. Several strategies are provided by
// github.com/kamilsk/retry/v5/strategy package.
Retry Retry
}

// Retry defines the functions that control the retry behavior of a task. Several strategies are provided by
// github.com/kamilsk/retry/v5/strategy package.
type Retry []strategy.Strategy

// Fn is the function to be executed in a goroutine.
type Fn func(ctx context.Context) error

// Metadata is whatever custom information you wish to associate with a task. This information will be available in your
// lifecycle hooks to help you identify which task is being processed.
type Metadata map[string]string

// NewManager creates a new instance of Manager with default options and no observer.
func NewManager() *Manager {
return NewManagerWithOptions(Options{})
}

// NewManagerWithOptions creates a new instance of Manager with the provided options and observer.
func NewManagerWithOptions(options Options) *Manager {
observer := options.Observer
if observer == nil {
observer = DefaultObserver{}
}

return &Manager{
stalledThreshold: options.StalledThreshold,
observer: options.Observer,
retry: options.Retry,
observer: observer,
loopmgr: mkloopmgr(),
}
}
Expand All @@ -108,7 +68,7 @@ func (m *Manager) Run(ctx context.Context, fn Fn) {
func (m *Manager) RunTask(ctx context.Context, task Task) {
ctx = context.WithoutCancel(ctx)
done := make(chan error, 1)
m.observer.callOnTaskAdded(ctx, task)
m.observer.OnTaskAdded(ctx, task)

switch task.Type {
case TaskTypeOneOff:
Expand All @@ -121,7 +81,7 @@ func (m *Manager) RunTask(ctx context.Context, task Task) {
go m.loop(ctx, task, done)

default:
m.observer.callOnTaskFailed(ctx, task, ErrUnknownTaskType)
m.observer.OnTaskFailed(ctx, task, ErrUnknownTaskType)
}
}

Expand All @@ -136,6 +96,22 @@ func (m *Manager) Cancel() {
m.loopmgr.cancel()
}

// Close is a convenience method that calls Wait() and Cancel() in parallel. It blocks until all tasks have finished.
func (m *Manager) Close() {
var wg sync.WaitGroup
wg.Add(1)
go func() {
m.Wait()
wg.Done()
}()
wg.Add(1)
go func() {
m.Cancel()
wg.Done()
}()
wg.Wait()
}

// CountOf returns the number of tasks of the specified type that are currently running. When the TaskType is invalid it
// returns 0.
func (m *Manager) CountOf(t TaskType) int {
Expand Down Expand Up @@ -165,7 +141,7 @@ func (m *Manager) loop(ctx context.Context, task Task, done chan error) {
m.run(ctx, task, done)
err := <-done
if err != nil {
m.observer.callOnTaskFailed(ctx, task, err)
m.observer.OnTaskFailed(ctx, task, err)
}
}
}
Expand All @@ -177,12 +153,12 @@ func (m *Manager) observe(ctx context.Context, task Task, done <-chan error) {
for {
select {
case <-timeout:
m.observer.callOnTaskStalled(ctx, task)
m.observer.OnTaskStalled(ctx, task)
case err := <-done:
if err != nil {
m.observer.callOnTaskFailed(ctx, task, err)
m.observer.OnTaskFailed(ctx, task, err)
} else {
m.observer.callOnTaskSucceeded(ctx, task)
m.observer.OnTaskSucceeded(ctx, task)
}

return
Expand Down
57 changes: 45 additions & 12 deletions background_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ func Test_OnTaskAdded(t *testing.T) {
executed := false
var wg sync.WaitGroup
m := background.NewManagerWithOptions(background.Options{
Observer: background.Observer{
OnTaskAdded: func(ctx context.Context, task background.Task) {
Observer: background.DefaultObserver{
HandleOnTaskAdded: func(ctx context.Context, task background.Task) {
assert.Equal(t, metadata, task.Meta)
executed = true
wg.Done()
Expand All @@ -110,8 +110,8 @@ func Test_OnTaskSucceeded(t *testing.T) {
executed := false
var wg sync.WaitGroup
m := background.NewManagerWithOptions(background.Options{
Observer: background.Observer{
OnTaskSucceeded: func(ctx context.Context, task background.Task) {
Observer: background.DefaultObserver{
HandleOnTaskSucceeded: func(ctx context.Context, task background.Task) {
assert.Equal(t, metadata, task.Meta)
executed = true
wg.Done()
Expand All @@ -137,8 +137,8 @@ func Test_OnTaskFailed(t *testing.T) {
executed := false
var wg sync.WaitGroup
m := background.NewManagerWithOptions(background.Options{
Observer: background.Observer{
OnTaskFailed: func(ctx context.Context, task background.Task, err error) {
Observer: background.DefaultObserver{
HandleOnTaskFailed: func(ctx context.Context, task background.Task, err error) {
assert.Equal(t, metadata, task.Meta)
assert.Error(t, err)
executed = true
Expand Down Expand Up @@ -182,8 +182,8 @@ func Test_OnTaskStalled(t *testing.T) {

m := background.NewManagerWithOptions(background.Options{
StalledThreshold: time.Millisecond * 5,
Observer: background.Observer{
OnTaskStalled: func(ctx context.Context, task background.Task) {
Observer: background.DefaultObserver{
HandleOnTaskStalled: func(ctx context.Context, task background.Task) {
assert.Equal(t, metadata, task.Meta)
executed = true
wg.Done()
Expand Down Expand Up @@ -214,8 +214,8 @@ func Test_StalledTaskStillCallsOnTaskSucceeded(t *testing.T) {
var wg sync.WaitGroup
m := background.NewManagerWithOptions(background.Options{
StalledThreshold: time.Millisecond,
Observer: background.Observer{
OnTaskSucceeded: func(ctx context.Context, task background.Task) {
Observer: background.DefaultObserver{
HandleOnTaskSucceeded: func(ctx context.Context, task background.Task) {
executed = true
wg.Done()
},
Expand Down Expand Up @@ -293,8 +293,8 @@ func Test_RunTaskTypeLoop_RetryStrategies(t *testing.T) {
count := 0

m := background.NewManagerWithOptions(background.Options{
Observer: background.Observer{
OnTaskFailed: func(ctx context.Context, task background.Task, err error) {
Observer: background.DefaultObserver{
HandleOnTaskFailed: func(ctx context.Context, task background.Task, err error) {
done <- err
},
},
Expand Down Expand Up @@ -389,3 +389,36 @@ func Test_CountOf(t *testing.T) {
assert.Equal(t, 0, m.CountOf(background.TaskTypeLoop))
assert.Equal(t, 0, m.CountOf(background.TaskType(3)))
}

func Test_Close(t *testing.T) {
m := background.NewManager()
proceed := make(chan bool, 1)

def := background.Task{
Type: background.TaskTypeLoop,
Fn: func(ctx context.Context) error {
<-proceed
return nil
},
}
m.RunTask(context.Background(), def)
def = background.Task{
Type: background.TaskTypeOneOff,
Fn: func(ctx context.Context) error {
<-proceed
return nil
},
}
m.RunTask(context.Background(), def)
assert.Equal(t, 1, m.CountOf(background.TaskTypeOneOff))
assert.Equal(t, 1, m.CountOf(background.TaskTypeLoop))

go func() {
proceed <- true
proceed <- true
}()

m.Close()
assert.Equal(t, 0, m.CountOf(background.TaskTypeOneOff))
assert.Equal(t, 0, m.CountOf(background.TaskTypeLoop))
}
63 changes: 42 additions & 21 deletions observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,61 @@ package background

import "context"

// Observer includes a set of functions that are called when certain events happen with the tasks that you schedule. All
// of them are optional; implement only those you need.
type Observer struct {
// OnTaskAdded is called immediately after adding the Task to the background manager.
OnTaskAdded func(ctx context.Context, task Task)
// Observer implements a set of methods that are called when certain events happen with the tasks that you schedule.
type Observer interface {
// OnTaskAdded is called immediately after scheduling the Task for execution.
OnTaskAdded(ctx context.Context, task Task)
// OnTaskSucceeded is called immediately after Task returns with no error.
OnTaskSucceeded func(ctx context.Context, task Task)
// OnTaskFailed is called immediately after Task returns with an error.
OnTaskFailed func(ctx context.Context, task Task, err error)
//
// Ignored for TaskTypeLoop.
OnTaskSucceeded(ctx context.Context, task Task)
// OnTaskFailed is called after Task returns an error and all retry policy attempts have been exhausted.
OnTaskFailed(ctx context.Context, task Task, err error)
// OnTaskStalled is called when the task does not return within the StalledThreshold. You can use this to make sure
// your tasks do not take excessive amounts of time to run to completion.
OnTaskStalled func(ctx context.Context, task Task)
//
// Ignored for TaskTypeLoop.
OnTaskStalled(ctx context.Context, task Task)
}

func (h Observer) callOnTaskFailed(ctx context.Context, task Task, err error) {
if h.OnTaskFailed != nil {
h.OnTaskFailed(ctx, task, err)
// DefaultObserver is an implementation of the Observer interface that allows you to observe only the events you are
// interested in.
type DefaultObserver struct {
// OnTaskAdded is called immediately after scheduling the Task for execution.
HandleOnTaskAdded func(ctx context.Context, task Task)
// OnTaskSucceeded is called immediately after Task returns with no error.
//
// Ignored for TaskTypeLoop.
HandleOnTaskSucceeded func(ctx context.Context, task Task)
// OnTaskFailed is called after Task returns an error and all retry policy attempts have been exhausted.
HandleOnTaskFailed func(ctx context.Context, task Task, err error)
// OnTaskStalled is called when the task does not return within the StalledThreshold. You can use this to make sure
// your tasks do not take excessive amounts of time to run to completion.
//
// Ignored for TaskTypeLoop.
HandleOnTaskStalled func(ctx context.Context, task Task)
}

func (o DefaultObserver) OnTaskFailed(ctx context.Context, task Task, err error) {
if o.HandleOnTaskFailed != nil {
o.HandleOnTaskFailed(ctx, task, err)
}
}

func (h Observer) callOnTaskSucceeded(ctx context.Context, task Task) {
if h.OnTaskSucceeded != nil {
h.OnTaskSucceeded(ctx, task)
func (o DefaultObserver) OnTaskSucceeded(ctx context.Context, task Task) {
if o.HandleOnTaskSucceeded != nil {
o.HandleOnTaskSucceeded(ctx, task)
}
}

func (h Observer) callOnTaskAdded(ctx context.Context, task Task) {
if h.OnTaskAdded != nil {
h.OnTaskAdded(ctx, task)
func (o DefaultObserver) OnTaskAdded(ctx context.Context, task Task) {
if o.HandleOnTaskAdded != nil {
o.HandleOnTaskAdded(ctx, task)
}
}

func (h Observer) callOnTaskStalled(ctx context.Context, task Task) {
if h.OnTaskStalled != nil {
h.OnTaskStalled(ctx, task)
func (o DefaultObserver) OnTaskStalled(ctx context.Context, task Task) {
if o.HandleOnTaskStalled != nil {
o.HandleOnTaskStalled(ctx, task)
}
}
Loading