Skip to content

Commit

Permalink
fix: correct algorithms for exponential and linear backoff and channe…
Browse files Browse the repository at this point in the history
…l drain (#77)
  • Loading branch information
cgorenflo authored Jul 6, 2023
1 parent c74bd35 commit b7aacc1
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 7 deletions.
16 changes: 11 additions & 5 deletions backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@ type BackOff func(currentRetryCount int) time.Duration
func ExponentialBackOff(minTimeout time.Duration) BackOff {
return func(currentRetryCount int) time.Duration {
jitter := rand.Float64()
strategy := 1 << currentRetryCount
backoff := (1 + float64(strategy)*jitter) * minTimeout.Seconds() * float64(time.Second)
jitterMax := 200 * time.Millisecond
if currentRetryCount < 1 {
currentRetryCount = 1
}
strategy := 1 << (currentRetryCount - 1)
backoff := (float64(strategy) * float64(minTimeout.Nanoseconds())) + (jitter * float64(jitterMax.Nanoseconds()))

return time.Duration(backoff)
}
Expand All @@ -23,9 +27,11 @@ func ExponentialBackOff(minTimeout time.Duration) BackOff {
func LinearBackOff(minTimeout time.Duration) BackOff {
return func(currentRetryCount int) time.Duration {
jitter := rand.Float64()
strategy := float64(currentRetryCount)

backoff := (1 + strategy*jitter) * minTimeout.Seconds() * float64(time.Second)
jitterMax := 200 * time.Millisecond
if currentRetryCount < 1 {
currentRetryCount = 1
}
backoff := (float64(currentRetryCount) * float64(minTimeout.Nanoseconds())) + (jitter * float64(jitterMax.Nanoseconds()))
return time.Duration(backoff)
}
}
21 changes: 20 additions & 1 deletion chans/chans.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,27 @@ func RangeBig(from, to *big.Int) <-chan *big.Int {
return newCh
}

// DrainOpen enumerates all items from the channel and discards them.
// Returns number of items drained as soon as channel is empty.
func DrainOpen[T any](channel <-chan T) int {
drained := 0

for {
select {
case _, ok := <-channel:
if ok {
drained++
} else {
return drained
}
default:
return drained
}
}
}

// Drain enumerates all items from the channel and discards them.
// Returns number of items drained.
// Blocks until the channel is closed and returns number of items drained.
func Drain[T any](channel <-chan T) int {
drained := 0
for range channel {
Expand Down
13 changes: 12 additions & 1 deletion chans/chans_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func TestPushPop(t *testing.T) {

func TestDrain(t *testing.T) {
c := make(chan string, 10)

c <- "a"
c <- "b"
c <- "c"
Expand All @@ -214,3 +214,14 @@ func TestDrain(t *testing.T) {
_, ok := <-c
assert.False(t, ok)
}

func TestDrainOpen(t *testing.T) {
o := make(chan string, 10)

o <- "a"
o <- "b"
o <- "c"
o <- "d"

assert.Equal(t, 4, chans.DrainOpen(o))
}

0 comments on commit b7aacc1

Please sign in to comment.