Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: background pooler with retry #5

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions .gitignore
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see a reason to add all these, apart from .idea and .vscode.
For example, .env.test is something that we would want to have in repo, right? (if it did exist at all)

Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,23 @@ coverage.txt
**/*.bkp
**/*.log
**/*.tmp

# Environment
.env
.env.test
.secret

# Go modules
go.work
go.work.sum

# IDE
.idea/
.vscode/

# Debug
__debug*
*.log

# Playground
playground.go
88 changes: 0 additions & 88 deletions background.go

This file was deleted.

151 changes: 0 additions & 151 deletions background_test.go

This file was deleted.

2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ require github.com/stretchr/testify v1.8.4

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eapache/go-resiliency v1.6.0
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eapache/go-resiliency v1.6.0 h1:CqGDTLtpwuWKn6Nj3uNUdflaq+/kIPsg0gfNzHton30=
github.com/eapache/go-resiliency v1.6.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
2 changes: 1 addition & 1 deletion makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ lint: force
./tools/golangci-lint run -v

test: force
go test -v -timeout 5s -covermode=atomic -coverprofile=coverage.txt ./...
go test -v -timeout 30s -covermode=atomic -coverprofile=coverage.txt ./...

clean:
rm -rf .cache
Expand Down
96 changes: 96 additions & 0 deletions manager/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package manager

import (
"context"

"github.com/eapache/go-resiliency/retrier"
"github.com/sourcegraph/conc/pool"
)

// Manager keeps track of scheduled goroutines and provides mechanisms to wait for them to finish.
type Manager struct {
pool *pool.ContextPool
stat ManagerStats

// retrier is a retrier instance that will be used to run tasks if retry is enabled.
retrier *retrier.Retrier

// cancelFunc is the cancel function of the manager's context.
// It is called when the manager's Stop method is called.
cancelFunc context.CancelFunc
}

// ManagerStats contains statistics about the tasks operated by the manager.
type ManagerStats struct {
RunningTasks int `json:"running_tasks"`
}

// New creates a new instance of Manager with the provided generic type for the metadata argument.
func New(ctx context.Context, opts ...Option) *Manager {
ctx, cancelFunc := context.WithCancel(ctx)

p := pool.New().WithContext(ctx)
m := &Manager{
pool: p,
stat: ManagerStats{},
cancelFunc: cancelFunc,
}
for _, opt := range opts {
opt.Apply(m)
}
return m
}

// TaskFunc is the function to be executed in a goroutine.
type TaskFunc func(ctx context.Context) error

// Run submits a task to the pool.
// If all workers are busy, Run will block until a worker is available.
func (m *Manager) Run(task TaskFunc) {
taskFunc := m.withStats(m.withRetry(task, m.retrier))
m.pool.Go(taskFunc)
}

// RunWithRetry runs a task with a dedicated retrier.
// See [Run] for more details.
func (m *Manager) RunWithRetry(task TaskFunc, retry *retrier.Retrier) {
taskFunc := m.withStats(m.withRetry(task, retry))
m.pool.Go(taskFunc)
}

// Stop cancels the context of the manager and all its tasks.
func (m *Manager) Stop() error {
m.cancelFunc()
return m.pool.Wait()
}

// Wait blocks until all scheduled tasks have finished and propagate any panics spawned by a child to the caller.
// Wait returns an error if any of the tasks failed.
func (m *Manager) Wait() error {
defer m.cancelFunc()
return m.pool.Wait()
}

// Stat returns manager statistics.
func (m *Manager) Stat() ManagerStats {
return m.stat
}

func (m *Manager) withRetry(task TaskFunc, retry *retrier.Retrier) TaskFunc {
if retry == nil {
return task
}
return func(ctx context.Context) error {
return retry.RunCtx(ctx, task)
}
}

func (m *Manager) withStats(task TaskFunc) TaskFunc {
return func(ctx context.Context) error {
m.stat.RunningTasks++
Copy link

@Fazt01 Fazt01 Feb 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see a race condition here (not atomic operation).
simple: don't have stat (no adding and subtracting 1)
less-but-still-simple: add mutex or use atomic operation

defer func() {
m.stat.RunningTasks--
}()
return task(ctx)
}
}
Loading