Skip to content

Commit

Permalink
Use non generic result
Browse files Browse the repository at this point in the history
  • Loading branch information
ferglor committed Apr 5, 2024
1 parent e081699 commit 768c2d1
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 31 deletions.
35 changes: 18 additions & 17 deletions pkg/v3/runner/result.go
Original file line number Diff line number Diff line change
@@ -1,80 +1,81 @@
package runner

import (
ocr2keepers "github.com/smartcontractkit/chainlink-common/pkg/types/automation"
"sync"
)

type result[T any] struct {
type result struct {
// this struct type isn't expressly defined to run in a single thread or
// multiple threads so internally a mutex provides the thread safety
// guarantees in the case it is used in a multi-threaded way
mu sync.RWMutex
successes int
failures int
err error
values []T
values []ocr2keepers.CheckResult
}

func newResult[T any]() *result[T] {
return &result[T]{
values: make([]T, 0),
func newResult() *result {
return &result{
values: make([]ocr2keepers.CheckResult, 0),
}
}

func (r *result[T]) Successes() int {
func (r *result) Successes() int {
r.mu.RLock()
defer r.mu.RUnlock()

return r.successes
}

func (r *result[T]) AddSuccesses(v int) {
func (r *result) AddSuccesses(v int) {
r.mu.Lock()
defer r.mu.Unlock()

r.successes += v
}

func (r *result[T]) Failures() int {
func (r *result) Failures() int {
r.mu.RLock()
defer r.mu.RUnlock()

return r.failures
}

func (r *result[T]) AddFailures(v int) {
func (r *result) AddFailures(v int) {
r.mu.Lock()
defer r.mu.Unlock()

r.failures += v
}

func (r *result[T]) Err() error {
func (r *result) Err() error {
r.mu.RLock()
defer r.mu.RUnlock()

return r.err
}

func (r *result[T]) SetErr(err error) {
func (r *result) SetErr(err error) {
r.mu.Lock()
defer r.mu.Unlock()

r.err = err
}

func (r *result[T]) Total() int {
func (r *result) Total() int {
r.mu.RLock()
defer r.mu.RUnlock()

return r.successes + r.failures
}

func (r *result[T]) unsafeTotal() int {
func (r *result) unsafeTotal() int {
return r.successes + r.failures
}

func (r *result[T]) SuccessRate() float64 {
func (r *result) SuccessRate() float64 {
r.mu.RLock()
defer r.mu.RUnlock()

Expand All @@ -85,7 +86,7 @@ func (r *result[T]) SuccessRate() float64 {
return float64(r.successes) / float64(r.unsafeTotal())
}

func (r *result[T]) FailureRate() float64 {
func (r *result) FailureRate() float64 {
r.mu.RLock()
defer r.mu.RUnlock()

Expand All @@ -96,14 +97,14 @@ func (r *result[T]) FailureRate() float64 {
return float64(r.failures) / float64(r.unsafeTotal())
}

func (r *result[T]) Add(res T) {
func (r *result) Add(res ocr2keepers.CheckResult) {
r.mu.Lock()
defer r.mu.Unlock()

r.values = append(r.values, res)
}

func (r *result[T]) Values() []T {
func (r *result) Values() []ocr2keepers.CheckResult {
r.mu.RLock()
defer r.mu.RUnlock()

Expand Down
28 changes: 17 additions & 11 deletions pkg/v3/runner/result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package runner

import (
"fmt"
ocr2keepers "github.com/smartcontractkit/chainlink-common/pkg/types/automation"
"sync"
"testing"
"time"
Expand All @@ -11,7 +12,7 @@ import (

func TestResultAdder(t *testing.T) {
t.Run("Successes", func(t *testing.T) {
resultStruct := newResult[int]()
resultStruct := newResult()
expected := 100

for x := 0; x < expected; x++ {
Expand All @@ -22,7 +23,7 @@ func TestResultAdder(t *testing.T) {
})

t.Run("Failures", func(t *testing.T) {
resultStruct := newResult[int]()
resultStruct := newResult()
expected := 100

for x := 0; x < expected; x++ {
Expand All @@ -33,7 +34,7 @@ func TestResultAdder(t *testing.T) {
})

t.Run("Errors", func(t *testing.T) {
resultStruct := newResult[int]()
resultStruct := newResult()
expected := fmt.Errorf("expected error")

resultStruct.SetErr(fmt.Errorf("initial error"))
Expand All @@ -43,7 +44,7 @@ func TestResultAdder(t *testing.T) {
})

t.Run("Rates", func(t *testing.T) {
resultStruct := newResult[int]()
resultStruct := newResult()

for x := 1; x <= 100; x++ {
resultStruct.AddSuccesses(1)
Expand All @@ -63,10 +64,13 @@ func TestResultAdder(t *testing.T) {
})

t.Run("AddResults", func(t *testing.T) {
resultStruct := newResult[int]()
resultStruct := newResult()

expected := []int{}
expected := []ocr2keepers.CheckResult{}
for x := 0; x <= 100; x++ {
x := ocr2keepers.CheckResult{
WorkID: fmt.Sprintf("%d", x),
}
resultStruct.Add(x)
expected = append(expected, x)
}
Expand All @@ -78,11 +82,11 @@ func TestResultAdder(t *testing.T) {
func TestConcurrentResult(t *testing.T) {
var wg sync.WaitGroup

resultStruct := newResult[int]()
resultStruct := newResult()

// add successes and failures in one thread
wg.Add(1)
go func(r *result[int]) {
go func(r *result) {
<-time.After(time.Second)
for x := 1; x <= 10000; x++ {
resultStruct.AddSuccesses(1)
Expand All @@ -96,17 +100,19 @@ func TestConcurrentResult(t *testing.T) {

// add values in another
wg.Add(1)
go func(r *result[int]) {
go func(r *result) {
<-time.After(time.Second)
for x := 1; x <= 12000; x++ {
resultStruct.Add(x)
resultStruct.Add(ocr2keepers.CheckResult{
WorkID: fmt.Sprintf("%d", x),
})
}
wg.Done()
}(resultStruct)

// repeatedly ask for the stats in another
wg.Add(1)
go func(r *result[int]) {
go func(r *result) {
<-time.After(time.Second)
for x := 0; x <= 12000; x++ {
_ = resultStruct.Failures()
Expand Down
6 changes: 3 additions & 3 deletions pkg/v3/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ func (o *Runner) Close() error {
}

// parallelCheck should be satisfied by the Runner
func (o *Runner) parallelCheck(ctx context.Context, payloads []ocr2keepers.UpkeepPayload) (*result[ocr2keepers.CheckResult], error) {
result := newResult[ocr2keepers.CheckResult]()
func (o *Runner) parallelCheck(ctx context.Context, payloads []ocr2keepers.UpkeepPayload) (*result, error) {
result := newResult()

if len(payloads) == 0 {
return result, nil
Expand Down Expand Up @@ -192,7 +192,7 @@ func (o *Runner) wrapWorkerFunc() func(context.Context, []ocr2keepers.UpkeepPayl
}
}

func (o *Runner) wrapAggregate(r *result[ocr2keepers.CheckResult]) func([]ocr2keepers.CheckResult, error) {
func (o *Runner) wrapAggregate(r *result) func([]ocr2keepers.CheckResult, error) {
return func(results []ocr2keepers.CheckResult, err error) {
if err == nil {
r.AddSuccesses(1)
Expand Down

0 comments on commit 768c2d1

Please sign in to comment.