Spike: remove generics #320

Draft · wants to merge 9 commits into base: main

Changes from all commits
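In short, the spike drops the type parameter from the worker-pool package and hard-codes the one instantiation the codebase apparently uses. A before/after excerpt from the worker.go diff below (shown side by side for comparison, so not compilable as one file):

```go
// Before: generic over the result type.
type WorkItemResult[T any] struct {
	Worker string
	Data   T
	Err    error
	Time   time.Duration
}

// After: specialized to []ocr2keepers.UpkeepResult, the only T used in practice.
type WorkItemResult struct {
	Worker string
	Data   []ocr2keepers.UpkeepResult
	Err    error
	Time   time.Duration
}
```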
1 change: 1 addition & 0 deletions go.mod
@@ -11,6 +11,7 @@ require (
github.com/Maldris/mathparse v0.0.0-20170508133428-f0d009a7a773
github.com/ethereum/go-ethereum v1.13.8
github.com/go-echarts/go-echarts/v2 v2.2.6
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.3.1
github.com/jedib0t/go-pretty/v6 v6.4.7
github.com/pkg/errors v0.9.1
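The new go-cmp dependency's call sites aren't visible in this diff; presumably it backs test assertions elsewhere in the spike. For readers unfamiliar with it, a minimal, illustrative test helper (not code from this PR):

```go
package util

import (
	"testing"

	"github.com/google/go-cmp/cmp"
)

// assertDeepEqual fails the test with a readable diff when want and got differ.
func assertDeepEqual(t *testing.T, want, got [][]int) {
	t.Helper()
	// cmp.Diff returns "" when the two values are deeply equal.
	if diff := cmp.Diff(want, got); diff != "" {
		t.Errorf("mismatch (-want +got):\n%s", diff)
	}
}
```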
23 changes: 12 additions & 11 deletions internal/util/array_test.go
@@ -60,14 +60,15 @@ func TestNewSyncedArray(t *testing.T) {
})
}

func TestUnflatten(t *testing.T) {
groups := Unflatten[int]([]int{0, 1, 1, 2, 2, 3, 4, 4, 5, 5, 6, 7, 8, 9, 111, 999, 999}, 3)

assert.Equal(t, 6, len(groups))
assert.Equal(t, []int{0, 1, 1}, groups[0])
assert.Equal(t, []int{2, 2, 3}, groups[1])
assert.Equal(t, []int{4, 4, 5}, groups[2])
assert.Equal(t, []int{5, 6, 7}, groups[3])
assert.Equal(t, []int{8, 9, 111}, groups[4])
assert.Equal(t, []int{999, 999}, groups[5])
}
//
//func TestUnflatten(t *testing.T) {
// groups := Unflatten[int]([]ocr2keepers.UpkeepKey{0, 1, 1, 2, 2, 3, 4, 4, 5, 5, 6, 7, 8, 9, 111, 999, 999}, 3)
//
// assert.Equal(t, 6, len(groups))
// assert.Equal(t, []int{0, 1, 1}, groups[0])
// assert.Equal(t, []int{2, 2, 3}, groups[1])
// assert.Equal(t, []int{4, 4, 5}, groups[2])
// assert.Equal(t, []int{5, 6, 7}, groups[3])
// assert.Equal(t, []int{8, 9, 111}, groups[4])
// assert.Equal(t, []int{999, 999}, groups[5])
//}
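Note the disabled test still calls the generic `Unflatten[int]` and passes an `[]ocr2keepers.UpkeepKey` literal where the original had `[]int`, so it would not compile if simply uncommented. For reference, a minimal non-generic sketch that satisfies the assertions above, assuming the spike keeps an `[]int`-specialized helper (illustrative, not from the PR):

```go
// Unflatten splits values into consecutive groups of at most size elements;
// any remainder lands in a final, shorter group (e.g. 17 values with size 3
// yield five groups of 3 and one group of 2, matching the test above).
func Unflatten(values []int, size int) [][]int {
	groups := make([][]int, 0, (len(values)+size-1)/size)
	for len(values) > size {
		groups = append(groups, values[:size])
		values = values[size:]
	}
	return append(groups, values)
}
```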
109 changes: 55 additions & 54 deletions pkg/util/worker.go
@@ -3,6 +3,7 @@ package util
import (
"context"
"fmt"
ocr2keepers "github.com/smartcontractkit/chainlink-automation/pkg/v2"
"math/rand"
"sync"
"sync/atomic"
@@ -14,24 +15,24 @@ var (
ErrContextCancelled = fmt.Errorf("worker context cancelled")
)

type WorkItemResult[T any] struct {
type WorkItemResult struct {
Worker string
Data T
Data []ocr2keepers.UpkeepResult
Err error
Time time.Duration
}

type WorkItem[T any] func(context.Context) (T, error)
type WorkItem func(context.Context) ([]ocr2keepers.UpkeepResult, error)

type worker[T any] struct {
type worker struct {
Name string
Queue chan *worker[T]
Queue chan *worker
}

func (w *worker[T]) Do(ctx context.Context, r func(WorkItemResult[T]), wrk WorkItem[T]) {
func (w *worker) Do(ctx context.Context, r func(WorkItemResult), wrk WorkItem) {
start := time.Now()

var data T
var data []ocr2keepers.UpkeepResult
var err error

if ctx.Err() != nil {
@@ -40,7 +41,7 @@ func (w *worker[T]) Do(ctx context.Context, r func(WorkItemResult[T]), wrk WorkI
data, err = wrk(ctx)
}

r(WorkItemResult[T]{
r(WorkItemResult{
Worker: w.Name,
Data: data,
Err: err,
@@ -54,21 +55,21 @@ func (w *worker[T]) Do(ctx context.Context, r func(WorkItemResult[T]), wrk WorkI
}
}

type GroupedItem[T any] struct {
type GroupedItem struct {
Group int
Item WorkItem[T]
Item WorkItem
}

type WorkerGroup[T any] struct {
type WorkerGroup struct {
maxWorkers int
activeWorkers int
workers chan *worker[T]
workers chan *worker

queue *Queue[GroupedItem[T]]
input chan GroupedItem[T]
queue *Queue
input chan GroupedItem
chInputNotify chan struct{}
mu sync.Mutex
resultData map[int][]WorkItemResult[T]
resultData map[int][]WorkItemResult
resultNotify map[int]chan struct{}

// channels used to stop processing
@@ -82,15 +83,15 @@ type WorkerGroup[T any] struct {
once sync.Once
}

func NewWorkerGroup[T any](workers int, queue int) *WorkerGroup[T] {
func NewWorkerGroup(workers int, queue int) *WorkerGroup {
svcCtx, svcCancel := context.WithCancel(context.Background())
wg := &WorkerGroup[T]{
wg := &WorkerGroup{
maxWorkers: workers,
workers: make(chan *worker[T], workers),
queue: &Queue[GroupedItem[T]]{},
input: make(chan GroupedItem[T], 1),
workers: make(chan *worker, workers),
queue: &Queue{},
input: make(chan GroupedItem, 1),
chInputNotify: make(chan struct{}, 1),
resultData: map[int][]WorkItemResult[T]{},
resultData: map[int][]WorkItemResult{},
resultNotify: map[int]chan struct{}{},
chStopInputs: make(chan struct{}),
chStopProcessing: make(chan struct{}),
@@ -105,7 +106,7 @@ func NewWorkerGroup[T any](workers int, queue int) *WorkerGroup[T] {

// Do adds a new work item onto the work queue. This function blocks until
// the work queue clears up or the context is cancelled.
func (wg *WorkerGroup[T]) Do(ctx context.Context, w WorkItem[T], group int) error {
func (wg *WorkerGroup) Do(ctx context.Context, w WorkItem, group int) error {

if ctx.Err() != nil {
return fmt.Errorf("%w; work not added to queue", ErrContextCancelled)
@@ -115,14 +116,14 @@ func (wg *WorkerGroup[T]) Do(ctx context.Context, w WorkItem[T], group int) erro
return fmt.Errorf("%w; work not added to queue", ErrProcessStopped)
}

gi := GroupedItem[T]{
gi := GroupedItem{
Group: group,
Item: w,
}

wg.mu.Lock()
if _, ok := wg.resultData[group]; !ok {
wg.resultData[group] = make([]WorkItemResult[T], 0)
wg.resultData[group] = make([]WorkItemResult, 0)
}

if _, ok := wg.resultNotify[group]; !ok {
@@ -140,7 +141,7 @@ func (wg *WorkerGroup[T]) Do(ctx context.Context, w WorkItem[T], group int) erro
}
}

func (wg *WorkerGroup[T]) NotifyResult(group int) <-chan struct{} {
func (wg *WorkerGroup) NotifyResult(group int) <-chan struct{} {
wg.mu.Lock()
defer wg.mu.Unlock()

@@ -155,18 +156,18 @@ func (wg *WorkerGroup[T]) NotifyResult(group int) <-chan struct{} {
return ch
}

func (wg *WorkerGroup[T]) Results(group int) []WorkItemResult[T] {
func (wg *WorkerGroup) Results(group int) []WorkItemResult {
wg.mu.Lock()
defer wg.mu.Unlock()

resultData, ok := wg.resultData[group]
if !ok {
wg.resultData[group] = []WorkItemResult[T]{}
wg.resultData[group] = []WorkItemResult{}

return wg.resultData[group]
}

wg.resultData[group] = []WorkItemResult[T]{}
wg.resultData[group] = []WorkItemResult{}

// results are stored as latest first
// switch the order to provide oldest first
@@ -179,23 +180,23 @@ func (wg *WorkerGroup[T]) Results(group int) []WorkItemResult[T] {
return resultData
}

func (wg *WorkerGroup[T]) RemoveGroup(group int) {
func (wg *WorkerGroup) RemoveGroup(group int) {
wg.mu.Lock()
defer wg.mu.Unlock()

delete(wg.resultData, group)
delete(wg.resultNotify, group)
}

func (wg *WorkerGroup[T]) Stop() {
func (wg *WorkerGroup) Stop() {
wg.once.Do(func() {
wg.svcCancel()
wg.queueClosed.Store(true)
wg.chStopInputs <- struct{}{}
})
}

func (wg *WorkerGroup[T]) processQueue() {
func (wg *WorkerGroup) processQueue() {
for {
if wg.queue.Len() == 0 {
break
@@ -214,7 +215,7 @@ func (wg *WorkerGroup[T]) processQueue() {
}
}

func (wg *WorkerGroup[T]) runQueuing() {
func (wg *WorkerGroup) runQueuing() {
for {
select {
case item := <-wg.input:
@@ -233,7 +234,7 @@ func (wg *WorkerGroup[T]) runQueuing() {
}
}

func (wg *WorkerGroup[T]) runProcessing() {
func (wg *WorkerGroup) runProcessing() {
for {
select {
// watch notification channel and begin processing queue
@@ -246,7 +247,7 @@ func (wg *WorkerGroup[T]) runProcessing() {
}
}

func (wg *WorkerGroup[T]) run() {
func (wg *WorkerGroup) run() {
// start listening on the input channel for new jobs
go wg.runQueuing()

@@ -258,14 +259,14 @@ func (wg *WorkerGroup[T]) run() {
wg.processQueue()
}

func (wg *WorkerGroup[T]) doJob(item GroupedItem[T]) {
var wkr *worker[T]
func (wg *WorkerGroup) doJob(item GroupedItem) {
var wkr *worker

// no read or write locks on activeWorkers or maxWorkers because it's
// assumed the job loop is a single process reading from the job queue
if wg.activeWorkers < wg.maxWorkers {
// create a new worker
wkr = &worker[T]{
wkr = &worker{
Name: fmt.Sprintf("worker-%d", wg.activeWorkers+1),
Queue: wg.workers,
}
@@ -279,22 +280,22 @@ func (wg *WorkerGroup[T]) doJob(item GroupedItem[T]) {
go wkr.Do(wg.svcCtx, wg.storeResult(item.Group), item.Item)
}

func (wg *WorkerGroup[T]) storeResult(group int) func(result WorkItemResult[T]) {
return func(result WorkItemResult[T]) {
func (wg *WorkerGroup) storeResult(group int) func(result WorkItemResult) {
return func(result WorkItemResult) {
wg.mu.Lock()
defer wg.mu.Unlock()

_, ok := wg.resultData[group]
if !ok {
wg.resultData[group] = make([]WorkItemResult[T], 0)
wg.resultData[group] = make([]WorkItemResult, 0)
}

_, ok = wg.resultNotify[group]
if !ok {
wg.resultNotify[group] = make(chan struct{}, 1)
}

wg.resultData[group] = append([]WorkItemResult[T]{result}, wg.resultData[group]...)
wg.resultData[group] = append([]WorkItemResult{result}, wg.resultData[group]...)

select {
case wg.resultNotify[group] <- struct{}{}:
@@ -303,16 +304,16 @@ func (wg *WorkerGroup[T]) storeResult(group int) func(result WorkItemResult[T])
}
}

type JobFunc[T, K any] func(context.Context, T) (K, error)
type JobResultFunc[T any] func(T, error)
type JobFunc func(context.Context, []ocr2keepers.UpkeepKey) ([]ocr2keepers.UpkeepResult, error)
type JobResultFunc func([]ocr2keepers.UpkeepResult, error)

func RunJobs[T, K any](ctx context.Context, wg *WorkerGroup[T], jobs []K, jobFunc JobFunc[K, T], resFunc JobResultFunc[T]) {
func RunJobs(ctx context.Context, wg *WorkerGroup, jobs [][]ocr2keepers.UpkeepKey, jobFunc JobFunc, resFunc JobResultFunc) {
var wait sync.WaitGroup
end := make(chan struct{}, 1)

group := rand.Intn(1_000_000_000)

go func(g *WorkerGroup[T], w *sync.WaitGroup, ch chan struct{}) {
go func(g *WorkerGroup, w *sync.WaitGroup, ch chan struct{}) {
for {
select {
case <-g.NotifyResult(group):
@@ -348,8 +349,8 @@ func RunJobs[T, K any](ctx context.Context, wg *WorkerGroup[T], jobs []K, jobFun
close(end)
}

func makeJobFunc[T, K any](jobCtx context.Context, value T, jobFunc JobFunc[T, K]) WorkItem[K] {
return func(svcCtx context.Context) (K, error) {
func makeJobFunc(jobCtx context.Context, value []ocr2keepers.UpkeepKey, jobFunc JobFunc) WorkItem {
return func(svcCtx context.Context) ([]ocr2keepers.UpkeepResult, error) {
// the jobFunc should exit in the case that either the job context
// cancels or the worker service context cancels. To ensure we don't end
// up with memory leaks, cancel the merged context to release resources.
@@ -359,38 +360,38 @@ func makeJobFunc[T, K any](jobCtx context.Context, value T, jobFunc JobFunc[T, K
}
}

type Queue[T any] struct {
type Queue struct {
mu sync.RWMutex
values []T
values []GroupedItem
}

func (q *Queue[T]) Add(values ...T) {
func (q *Queue) Add(values ...GroupedItem) {
q.mu.Lock()
defer q.mu.Unlock()

q.values = append(q.values, values...)
}

func (q *Queue[T]) Pop() (T, error) {
func (q *Queue) Pop() (GroupedItem, error) {
q.mu.Lock()
defer q.mu.Unlock()

if len(q.values) == 0 {
return getZero[T](), fmt.Errorf("no values to return")
return GroupedItem{}, fmt.Errorf("no values to return")
}

val := q.values[0]

if len(q.values) > 1 {
q.values = q.values[1:]
} else {
q.values = []T{}
q.values = []GroupedItem{}
}

return val, nil
}

func (q *Queue[T]) Len() int {
func (q *Queue) Len() int {
q.mu.RLock()
defer q.mu.RUnlock()

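For orientation, a hedged usage sketch of the specialized API: only `NewWorkerGroup`, `Stop`, `RunJobs`, and the function types come from the diff above; the batching, worker counts, and `checkFn` are assumptions for illustration.

```go
package util

import (
	"context"
	"sync"

	ocr2keepers "github.com/smartcontractkit/chainlink-automation/pkg/v2"
)

// checkAll fans batches of upkeep keys out to the worker group and gathers
// every successful result. RunJobs blocks until all batches are processed
// or ctx is cancelled.
func checkAll(
	ctx context.Context,
	batches [][]ocr2keepers.UpkeepKey,
	checkFn JobFunc, // func(context.Context, []ocr2keepers.UpkeepKey) ([]ocr2keepers.UpkeepResult, error)
) []ocr2keepers.UpkeepResult {
	wg := NewWorkerGroup(8, 1000) // worker and queue sizes chosen arbitrarily here
	defer wg.Stop()

	var (
		mu  sync.Mutex
		all []ocr2keepers.UpkeepResult
	)

	RunJobs(ctx, wg, batches, checkFn, func(results []ocr2keepers.UpkeepResult, err error) {
		if err != nil {
			return // a real caller would log or aggregate the error
		}
		mu.Lock()
		all = append(all, results...)
		mu.Unlock()
	})

	return all
}
```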