Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Title: Implement Distributed Circuit Breaker #70

Merged
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion go.mod
call-stack marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,8 @@ module github.com/sony/gobreaker

go 1.12

require github.com/stretchr/testify v1.3.0
require (
github.com/alicebob/miniredis/v2 v2.33.0
github.com/redis/go-redis/v9 v9.6.2
github.com/stretchr/testify v1.3.0
)
19 changes: 19 additions & 0 deletions go.sum
call-stack marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -1,7 +1,26 @@
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk=
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis/v2 v2.33.0 h1:uvTF0EDeu9RLnUEG27Db5I68ESoIxTiXbNUiji6lZrA=
github.com/alicebob/miniredis/v2 v2.33.0/go.mod h1:MhP4a3EU7aENRi9aO+tHfTBZicLqQevyi/DJpoj6mi0=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.6.2 h1:w0uvkRbc9KpgD98zcvo5IrVUsn0lXpRMuhNgiHDJzdk=
github.com/redis/go-redis/v9 v9.6.2/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M=
github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
9 changes: 9 additions & 0 deletions v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,16 @@ go 1.21
require github.com/stretchr/testify v1.8.4

require (
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/yuin/gopher-lua v1.1.1 // indirect
)

require (
github.com/alicebob/miniredis/v2 v2.33.0
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/redis/go-redis/v9 v9.7.0
gopkg.in/yaml.v3 v3.0.1 // indirect
)
12 changes: 12 additions & 0 deletions v2/go.sum
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk=
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis/v2 v2.33.0 h1:uvTF0EDeu9RLnUEG27Db5I68ESoIxTiXbNUiji6lZrA=
github.com/alicebob/miniredis/v2 v2.33.0/go.mod h1:MhP4a3EU7aENRi9aO+tHfTBZicLqQevyi/DJpoj6mi0=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E=
github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M=
github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Expand Down
250 changes: 250 additions & 0 deletions v2/redis_circuit_breaker.go
call-stack marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
package gobreaker

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/redis/go-redis/v9"
)

type CacheClient interface {
Get(ctx context.Context, key string) *redis.StringCmd
Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.StatusCmd
}

// RedisCircuitBreaker extends CircuitBreaker with Redis-based state storage
type RedisCircuitBreaker[T any] struct {
*CircuitBreaker[T]
redisClient CacheClient
}

// RedisSettings extends Settings with Redis configuration
type RedisSettings struct {
Settings
RedisKey string
}

// NewRedisCircuitBreaker returns a new RedisCircuitBreaker configured with the given RedisSettings
func NewRedisCircuitBreaker[T any](redisClient CacheClient, settings RedisSettings) *RedisCircuitBreaker[T] {
cb := NewCircuitBreaker[T](settings.Settings)
return &RedisCircuitBreaker[T]{
CircuitBreaker: cb,
redisClient: redisClient,
}
}

// RedisState represents the CircuitBreaker state stored in Redis
type RedisState struct {
State State `json:"state"`
Generation uint64 `json:"generation"`
Counts Counts `json:"counts"`
Expiry time.Time `json:"expiry"`
}

func (rcb *RedisCircuitBreaker[T]) State(ctx context.Context) State {
if rcb.redisClient == nil {
return rcb.CircuitBreaker.State()
}

state, err := rcb.getRedisState(ctx)
if err != nil {
// Fallback to in-memory state if Redis fails
return rcb.CircuitBreaker.State()
}

now := time.Now()
currentState, _ := rcb.currentState(state, now)

// Update the state in Redis if it has changed
if currentState != state.State {
state.State = currentState
if err := rcb.setRedisState(ctx, state); err != nil {
// Log the error, but continue with the current state
fmt.Printf("Failed to update state in Redis: %v\n", err)
}
}

return state.State
}

// Execute runs the given request if the RedisCircuitBreaker accepts it
func (rcb *RedisCircuitBreaker[T]) Execute(ctx context.Context, req func() (T, error)) (T, error) {
if rcb.redisClient == nil {
return rcb.CircuitBreaker.Execute(req)
}
generation, err := rcb.beforeRequest(ctx)
if err != nil {
var zero T
return zero, err
}

defer func() {
e := recover()
if e != nil {
rcb.afterRequest(ctx, generation, false)
panic(e)
}
}()

result, err := req()
rcb.afterRequest(ctx, generation, rcb.isSuccessful(err))

return result, err
}

func (rcb *RedisCircuitBreaker[T]) beforeRequest(ctx context.Context) (uint64, error) {
state, err := rcb.getRedisState(ctx)
if err != nil {
return 0, err
}
now := time.Now()
currentState, generation := rcb.currentState(state, now)

if currentState != state.State {
rcb.setState(&state, currentState, now)
err = rcb.setRedisState(ctx, state)
if err != nil {
return 0, err
}
}

if currentState == StateOpen {
return generation, ErrOpenState
} else if currentState == StateHalfOpen && state.Counts.Requests >= rcb.maxRequests {
return generation, ErrTooManyRequests
}

state.Counts.onRequest()
err = rcb.setRedisState(ctx, state)
if err != nil {
return 0, err
}

return generation, nil
}

func (rcb *RedisCircuitBreaker[T]) afterRequest(ctx context.Context, before uint64, success bool) {
state, err := rcb.getRedisState(ctx)
if err != nil {
return
}
now := time.Now()
currentState, generation := rcb.currentState(state, now)
if generation != before {
return
}

if success {
rcb.onSuccess(&state, currentState, now)
} else {
rcb.onFailure(&state, currentState, now)
}

rcb.setRedisState(ctx, state)
}

func (rcb *RedisCircuitBreaker[T]) onSuccess(state *RedisState, currentState State, now time.Time) {
if state.State == StateOpen {
state.State = currentState
}

switch currentState {
case StateClosed:
state.Counts.onSuccess()
case StateHalfOpen:
state.Counts.onSuccess()
if state.Counts.ConsecutiveSuccesses >= rcb.maxRequests {
rcb.setState(state, StateClosed, now)
}
}
}

func (rcb *RedisCircuitBreaker[T]) onFailure(state *RedisState, currentState State, now time.Time) {
switch currentState {
case StateClosed:
state.Counts.onFailure()
if rcb.readyToTrip(state.Counts) {
rcb.setState(state, StateOpen, now)
}
case StateHalfOpen:
rcb.setState(state, StateOpen, now)
}
}

func (rcb *RedisCircuitBreaker[T]) currentState(state RedisState, now time.Time) (State, uint64) {
switch state.State {
case StateClosed:
if !state.Expiry.IsZero() && state.Expiry.Before(now) {
rcb.toNewGeneration(&state, now)
}
case StateOpen:
if state.Expiry.Before(now) {
rcb.setState(&state, StateHalfOpen, now)
}
}
return state.State, state.Generation
}

func (rcb *RedisCircuitBreaker[T]) setState(state *RedisState, newState State, now time.Time) {
if state.State == newState {
return
}

prev := state.State
state.State = newState

rcb.toNewGeneration(state, now)

if rcb.onStateChange != nil {
rcb.onStateChange(rcb.name, prev, newState)
}
}

func (rcb *RedisCircuitBreaker[T]) toNewGeneration(state *RedisState, now time.Time) {

state.Generation++
state.Counts.clear()

var zero time.Time
switch state.State {
case StateClosed:
if rcb.interval == 0 {
state.Expiry = zero
} else {
state.Expiry = now.Add(rcb.interval)
}
case StateOpen:
state.Expiry = now.Add(rcb.timeout)
default: // StateHalfOpen
state.Expiry = zero
}
}

func (rcb *RedisCircuitBreaker[T]) getRedisKey() string {
return "cb:" + rcb.name
}

func (rcb *RedisCircuitBreaker[T]) getRedisState(ctx context.Context) (RedisState, error) {
var state RedisState
data, err := rcb.redisClient.Get(ctx, rcb.getRedisKey()).Bytes()
if err == redis.Nil {
// Key doesn't exist, return default state
return RedisState{State: StateClosed}, nil
} else if err != nil {
return state, err
}

err = json.Unmarshal(data, &state)
return state, err
}

func (rcb *RedisCircuitBreaker[T]) setRedisState(ctx context.Context, state RedisState) error {
data, err := json.Marshal(state)
if err != nil {
return err
}

return rcb.redisClient.Set(ctx, rcb.getRedisKey(), data, 0).Err()
}
Loading
Loading