Skip to content

Commit

Permalink
feat: overhaul construction patterns (.NewManagerWithOptions()) (#4)
Browse files Browse the repository at this point in the history
The way the `Manager` was constructed was a bit too rough, with the
consumer being forced to update fields on the initialised struct to
attach the hooks.

With this PR, we now have a new function
`background.NewManagerWithOptions()` that should make the initialisation
process a bit more clear as to what can go in. 💪
  • Loading branch information
robertrossmann authored Feb 28, 2024
2 parents 8026c21 + 8e8a051 commit 026eb05
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 78 deletions.
56 changes: 40 additions & 16 deletions background.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,28 @@ import (
// package to schedule the queue jobs without the customer waiting for that to happen while at the same time being able
// to wait for all those goroutines to finish before allowing the process to exit.
type Manager[Meta any] struct {
wg sync.WaitGroup
len int
wg sync.WaitGroup
len int
stalledThreshold time.Duration
hooks Hooks[Meta]
}

// Options provides a means for configuring the background manager and attaching hooks to it.
type Options[Meta any] struct {
// StalledThreshold is the amount of time within which the goroutine should return before it is considered stalled.
// Note that no effort is made to actually kill the goroutine.
StalledThreshold time.Duration
// Hooks allow you to register monitoring functions that are called when something happens with the goroutines that
// you schedule. These are useful for logging, monitoring, etc.
Hooks Hooks[Meta]
}

// Hooks are a set of functions that are called when certain events happen with the goroutines that you schedule. All of
// them are optional; implement only those you need.
type Hooks[Meta any] struct {
// OnTaskAdded is called immediately after calling Run().
OnTaskAdded func(ctx context.Context, meta Meta)
// OnTaskSucceeded is called immediately after Task returns.
// OnTaskSucceeded is called immediately after Task returns with no error.
OnTaskSucceeded func(ctx context.Context, meta Meta)
// OnTaskFailed is called immediately after Task returns with an error.
OnTaskFailed func(ctx context.Context, meta Meta, err error)
Expand All @@ -36,9 +49,20 @@ type Manager[Meta any] struct {
// Task is the function to be executed in a goroutine
type Task func(ctx context.Context) error

// NewManager creates a new instance of Manager with the provided generic type for the metadata argument.
func NewManager[Meta any]() *Manager[Meta] {
return &Manager[Meta]{}
// NilMeta is a type that can be used as Meta generic type when you do not need to associate any metadata with the task.
type NilMeta *struct{}

// NewManager creates a new instance of Manager with default options and no hooks.
func NewManager() *Manager[NilMeta] {
return &Manager[NilMeta]{}
}

// NewManagerWithOptions creates a new instance of Manager with the provided options and hooks.
func NewManagerWithOptions[Meta any](options Options[Meta]) *Manager[Meta] {
return &Manager[Meta]{
stalledThreshold: options.StalledThreshold,
hooks: options.Hooks,
}
}

// Run schedules the provided task to be executed in a goroutine. `Meta` is whatever you wish to associate with the
Expand All @@ -60,7 +84,7 @@ func (m *Manager[Meta]) Wait() {
m.wg.Wait()
}

// Len returns the number of currently running tasks
// Len returns the number of currently running tasks.
func (m *Manager[Meta]) Len() int {
return m.len
}
Expand All @@ -79,7 +103,7 @@ func (m *Manager[Meta]) run(ctx context.Context, meta Meta, task Task, done chan
}

func (m *Manager[Meta]) ticktock(ctx context.Context, meta Meta, done <-chan bool) {
timeout := mktimeout(m.StalledThreshold)
timeout := mktimeout(m.stalledThreshold)
select {
case <-done:
return
Expand All @@ -90,26 +114,26 @@ func (m *Manager[Meta]) ticktock(ctx context.Context, meta Meta, done <-chan boo
}

func (m *Manager[Meta]) callOnTaskFailed(ctx context.Context, meta Meta, err error) {
if m.OnTaskFailed != nil {
m.OnTaskFailed(ctx, meta, err)
if m.hooks.OnTaskFailed != nil {
m.hooks.OnTaskFailed(ctx, meta, err)
}
}

func (m *Manager[Meta]) callOnTaskSucceeded(ctx context.Context, meta Meta) {
if m.OnTaskSucceeded != nil {
m.OnTaskSucceeded(ctx, meta)
if m.hooks.OnTaskSucceeded != nil {
m.hooks.OnTaskSucceeded(ctx, meta)
}
}

func (m *Manager[Meta]) callOnTaskAdded(ctx context.Context, meta Meta) {
if m.OnTaskAdded != nil {
m.OnTaskAdded(ctx, meta)
if m.hooks.OnTaskAdded != nil {
m.hooks.OnTaskAdded(ctx, meta)
}
}

func (m *Manager[Meta]) callOnGoroutineStalled(ctx context.Context, meta Meta) {
if m.OnGoroutineStalled != nil {
m.OnGoroutineStalled(ctx, meta)
if m.hooks.OnGoroutineStalled != nil {
m.hooks.OnGoroutineStalled(ctx, meta)
}
}

Expand Down
149 changes: 87 additions & 62 deletions background_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,20 @@ import (
"go.strv.io/background"
)

type testmeta bool

func Test_New(t *testing.T) {
m := background.NewManager[bool]()
m := background.NewManager()
assert.NotNil(t, m)
assert.IsType(t, &background.Manager[bool]{}, m)
assert.Equal(t, m.StalledThreshold, time.Duration(0))
assert.Nil(t, m.OnTaskAdded)
assert.Nil(t, m.OnTaskSucceeded)
assert.Nil(t, m.OnTaskFailed)
assert.Nil(t, m.OnGoroutineStalled)
assert.IsType(t, &background.Manager[background.NilMeta]{}, m)
assert.Equal(t, 0, m.Len())
}

func Test_RunExecutesInGoroutine(t *testing.T) {
m := background.NewManager[bool]()
m := background.NewManager()
proceed := make(chan bool, 1)

m.Run(context.Background(), true, func(ctx context.Context) error {
m.Run(context.Background(), nil, func(ctx context.Context) error {
// Let the main thread advance a bit
<-proceed
proceed <- true
Expand All @@ -41,12 +39,12 @@ func Test_RunExecutesInGoroutine(t *testing.T) {
}

func Test_WaitWaitsForPendingTasks(t *testing.T) {
m := background.NewManager[bool]()
m := background.NewManager()
proceed := make(chan bool, 1)
done := make(chan bool, 1)
var waited bool

m.Run(context.Background(), true, func(ctx context.Context) error {
m.Run(context.Background(), nil, func(ctx context.Context) error {
// Let the main thread advance a bit
<-proceed
return nil
Expand All @@ -65,11 +63,11 @@ func Test_WaitWaitsForPendingTasks(t *testing.T) {
}

func Test_CancelledParentContext(t *testing.T) {
m := background.NewManager[bool]()
m := background.NewManager()
ctx, cancel := context.WithCancel(context.Background())
proceed := make(chan bool, 1)

m.Run(ctx, true, func(ctx context.Context) error {
m.Run(ctx, nil, func(ctx context.Context) error {
<-proceed
assert.Nil(t, ctx.Err())
return nil
Expand All @@ -81,15 +79,16 @@ func Test_CancelledParentContext(t *testing.T) {
}

func Test_Len(t *testing.T) {
m := background.NewManager[bool]()
proceed := make(chan bool, 1)
remaining := 10

m.OnTaskSucceeded = func(ctx context.Context, meta bool) {
remaining--
assert.Equal(t, remaining, m.Len())
proceed <- true
}
m := background.NewManagerWithOptions(background.Options[testmeta]{
Hooks: background.Hooks[testmeta]{
OnTaskSucceeded: func(ctx context.Context, meta testmeta) {
remaining--
proceed <- true
},
},
})

for range 10 {
m.Run(context.Background(), true, func(ctx context.Context) error {
Expand All @@ -104,54 +103,72 @@ func Test_Len(t *testing.T) {
}

func Test_OnTaskAdded(t *testing.T) {
m := background.NewManager[bool]()
metaval := true
var metaval testmeta = true
executed := false
var wg sync.WaitGroup
m := background.NewManagerWithOptions(background.Options[testmeta]{
Hooks: background.Hooks[testmeta]{
OnTaskAdded: func(ctx context.Context, meta testmeta) {
assert.Equal(t, metaval, meta)
executed = true
wg.Done()
},
},
})

m.OnTaskAdded = func(ctx context.Context, meta bool) {
assert.Equal(t, metaval, meta)
executed = true
}

wg.Add(1)
m.Run(context.Background(), metaval, func(ctx context.Context) error {
return nil
})
m.Wait()

wg.Wait()
assert.True(t, executed)
}

func Test_OnTaskSucceeded(t *testing.T) {
m := background.NewManager[bool]()
metaval := true
var metaval testmeta = true
executed := false
var wg sync.WaitGroup
m := background.NewManagerWithOptions(background.Options[testmeta]{
Hooks: background.Hooks[testmeta]{
OnTaskSucceeded: func(ctx context.Context, meta testmeta) {
assert.Equal(t, metaval, meta)
executed = true
wg.Done()
},
},
})

m.OnTaskSucceeded = func(ctx context.Context, meta bool) {
assert.Equal(t, metaval, meta)
executed = true
}

wg.Add(1)
m.Run(context.Background(), metaval, func(ctx context.Context) error {
return nil
})
m.Wait()

wg.Wait()
assert.True(t, executed)
}

func Test_OnTaskFailed(t *testing.T) {
m := background.NewManager[bool]()
metaval := true
var metaval testmeta = true
executed := false
var wg sync.WaitGroup
m := background.NewManagerWithOptions(background.Options[testmeta]{
Hooks: background.Hooks[testmeta]{
OnTaskFailed: func(ctx context.Context, meta testmeta, err error) {
assert.Equal(t, metaval, meta)
assert.Error(t, err)
executed = true
wg.Done()
},
},
})

m.OnTaskFailed = func(ctx context.Context, meta bool, err error) {
assert.Equal(t, metaval, meta)
assert.Error(t, err)
executed = true
}

wg.Add(1)
m.Run(context.Background(), metaval, func(ctx context.Context) error {
return assert.AnError
})
m.Wait()

wg.Wait()
assert.True(t, executed)
}

Expand All @@ -167,47 +184,55 @@ func Test_OnGoroutineStalled(t *testing.T) {
}

for _, test := range tests {
m := background.NewManager[bool]()
m.StalledThreshold = time.Millisecond * 5

t.Run(fmt.Sprintf("duration of %s)", test.duration.String()), func(t *testing.T) {
var wg sync.WaitGroup
var metaval testmeta = true
executed := false
var wg sync.WaitGroup
if test.shouldExecute == true {
wg.Add(1)
}

m.OnGoroutineStalled = func(ctx context.Context, meta bool) {
executed = true
wg.Done()
}
m := background.NewManagerWithOptions(background.Options[testmeta]{
StalledThreshold: time.Millisecond * 5,
Hooks: background.Hooks[testmeta]{
OnGoroutineStalled: func(ctx context.Context, meta testmeta) {
assert.Equal(t, metaval, meta)
executed = true
wg.Done()
},
},
})

m.Run(context.Background(), true, func(ctx context.Context) error {
m.Run(context.Background(), metaval, func(ctx context.Context) error {
<-time.After(test.duration)
return nil
})

wg.Wait()
m.Wait()
assert.Equal(t, test.shouldExecute, executed)
})
}
}

func Test_StalledGoroutineStillCallsOnTaskSucceeded(t *testing.T) {
m := background.NewManager[bool]()
m.StalledThreshold = time.Millisecond
executed := false
var wg sync.WaitGroup
m := background.NewManagerWithOptions(background.Options[testmeta]{
StalledThreshold: time.Millisecond,
Hooks: background.Hooks[testmeta]{
OnTaskSucceeded: func(ctx context.Context, meta testmeta) {
executed = true
wg.Done()
},
},
})

m.OnTaskSucceeded = func(ctx context.Context, meta bool) {
executed = true
}

wg.Add(1)
m.Run(context.Background(), true, func(ctx context.Context) error {
<-time.After(time.Millisecond * 3)
return nil
})

m.Wait()
wg.Wait()
assert.True(t, executed)
}

0 comments on commit 026eb05

Please sign in to comment.