Skip to content

Add flap detection to circuits #31

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions circuitbreaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ type Breaker struct {

consecFailures int64
counts *window
flapper *Flapper
lastFailure int64
halfOpens int64
nextBackOff time.Duration
Expand All @@ -122,6 +123,7 @@ type Options struct {
ShouldTrip TripFunc
WindowTime time.Duration
WindowBuckets int
FlapRate float64
}

// NewBreakerWithOptions creates a base breaker with a specified backoff, clock and TripFunc
Expand Down Expand Up @@ -157,6 +159,7 @@ func NewBreakerWithOptions(options *Options) *Breaker {
ShouldTrip: options.ShouldTrip,
nextBackOff: options.BackOff.NextBackOff(),
counts: newWindow(options.WindowTime, options.WindowBuckets),
flapper: NewFlapper(options.FlapRate),
}
}

Expand Down Expand Up @@ -228,6 +231,7 @@ func (cb *Breaker) RemoveListener(listener chan ListenerEvent) bool {
// Trip will trip the circuit breaker. After Trip() is called, Tripped() will
// return true.
func (cb *Breaker) Trip() {
cb.flapper.Record(1)
atomic.StoreInt32(&cb.tripped, 1)
now := cb.Clock.Now()
atomic.StoreInt64(&cb.lastFailure, now.Unix())
Expand All @@ -237,6 +241,15 @@ func (cb *Breaker) Trip() {
// Reset will reset the circuit breaker. After Reset() is called, Tripped() will
// return false.
func (cb *Breaker) Reset() {
cb.flapper.Record(0)

if cb.flapper.Flapping() {
// Remain in half open and increase the backoff
atomic.StoreInt64(&cb.halfOpens, 0)
cb.nextBackOff = cb.BackOff.NextBackOff()
return
}

atomic.StoreInt32(&cb.broken, 0)
atomic.StoreInt32(&cb.tripped, 0)
atomic.StoreInt64(&cb.halfOpens, 0)
Expand Down
100 changes: 100 additions & 0 deletions flapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package circuit

import (
"sync"
"sync/atomic"
)

type Flapper struct {
lock sync.Mutex
changes int32
prevState int32
flapRate float64
}

func NewFlapper(flapRate float64) *Flapper {
return &Flapper{
flapRate: flapRate,
}
}

func (f *Flapper) Record(state int32) {
if f.flapRate == 0.0 {
return
}

f.lock.Lock()
f.changes <<= 1

if f.prevState != state {
f.changes |= 1
}

f.prevState = state
f.lock.Unlock()
}

func (f *Flapper) Rate() float64 {
if f.flapRate == 0.0 {
return 0.0
}

changes := atomic.LoadInt32(&f.changes)
changes &= changeMask

var firstBit, lastBit int32
var firstIdx, lastIdx int
var count int

for i := startBit; i > 0; i >>= 1 {
if i&changes > 0 {
if firstBit == 0 {
firstBit = i
firstIdx = count
}

lastBit = i
lastIdx = count
}

count++
}

if firstIdx == 0 && lastIdx == 0 {
return 0.0
}

if firstIdx == lastIdx {
return 1.0 / samples
}

spread := lastIdx - firstIdx
add := valueBase / float64(spread)

value := valueStart
multiplier := 1
for i := firstBit >> 1; i >= lastBit; i >>= 1 {
if i&changes > 0 {
value += (0.8 + (add * float64(multiplier)))
}
multiplier++
}

return value / samples
}

func (f *Flapper) Flapping() bool {
if f.flapRate == 0.0 {
return false
}

return f.Rate() >= f.flapRate
}

const (
changeMask = 0xFFFFF
startBit = int32(0x80000)
valueBase = 0.4
valueStart = 0.8
samples = 20.0
)
37 changes: 37 additions & 0 deletions flapper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package circuit

import "testing"

func TestFlapRate(t *testing.T) {
f := NewFlapper(0.2)

f.Record(0) // 0
f.Record(0) // 1
f.Record(1) // 2
f.Record(0) // 3
f.Record(1) // 4
f.Record(1) // 5
f.Record(1) // 6
f.Record(1) // 7
f.Record(0) // 8
f.Record(0) // 9
f.Record(0) // 10
f.Record(1) // 11
f.Record(1) // 12
f.Record(1) // 13
f.Record(1) // 14
f.Record(0) // 15
f.Record(0) // 16
f.Record(0) // 17
f.Record(1) // 18
f.Record(1) // 19

rate := f.Rate()
if rate != 0.33875 {
t.Fatalf("expected 0.33875, got %f", rate)
}

if !f.Flapping() {
t.Fatal("expected flapper to be flapping but it wasn't")
}
}