Skip to content

Commit

Permalink
gossip map fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
rogeralsing committed Apr 8, 2024
1 parent f4cee9b commit 70c8559
Show file tree
Hide file tree
Showing 9 changed files with 273 additions and 51 deletions.
2 changes: 2 additions & 0 deletions cluster/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package cluster

import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
)

// customary type that defines a states sender callback.
Expand All @@ -17,6 +18,7 @@ type GossipStateStorer interface {
SetMapState(stateKey string, mapKey string, value proto.Message)
RemoveMapState(stateKey string, mapKey string)
GetMapKeys(stateKey string) []string
GetMapState(stateKey string, mapKey string) *anypb.Any
}

// This interface must be implemented by any value that
Expand Down
27 changes: 25 additions & 2 deletions cluster/gossip_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ func (ga *GossipActor) Receive(ctx actor.Context) {
ga.onSetGossipState(r, ctx)
case *SetGossipMapState:
ga.onSetGossipMapState(r, ctx)
case *GetGossipMapStateRequest:
ga.onGetGossipMapStateRequest(r, ctx)
case *RemoveGossipMapState:
ga.onRemoveGossipMapState(r, ctx)
case *GetGossipMapKeysRequest:
Expand Down Expand Up @@ -210,16 +212,37 @@ func (ga *GossipActor) sendGossipForMember(member *Member, memberStateDelta *Mem
}

func (ga *GossipActor) onSetGossipMapState(r *SetGossipMapState, ctx actor.Context) {
if ga.throttler() == actor.Open {
ctx.Logger().Debug("Setting GossipMapState", slog.String("key", r.MapKey), slog.Any("message", r.Value))
}
ga.gossip.SetMapState(r.GossipStateKey, r.MapKey, r.Value)
}

func (ga *GossipActor) onRemoveGossipMapState(r *RemoveGossipMapState, ctx actor.Context) {
if ga.throttler() == actor.Open {
ctx.Logger().Debug("Removing GossipMapState", slog.String("key", r.MapKey))
}
ga.gossip.RemoveMapState(r.GossipStateKey, r.MapKey)
}

func (ga *GossipActor) onGetGossipMapKeys(r *GetGossipMapKeysRequest, ctx actor.Context) {
if ga.throttler() == actor.Open {
ctx.Logger().Debug("Getting GossipMapKeys", slog.String("key", r.GossipStateKey))
}
keys := ga.gossip.GetMapKeys(r.GossipStateKey)
res := &GetGossipMapKeysResponse{
MapKeys: keys,
}
ctx.Respond(res)
}

res := ga.gossip.GetMapKeys(r.GossipStateKey)

func (ga *GossipActor) onGetGossipMapStateRequest(r *GetGossipMapStateRequest, ctx actor.Context) {
if ga.throttler() == actor.Open {
ctx.Logger().Debug("Setting GossipMapState", slog.String("key", r.MapKey))
}
a := ga.gossip.GetMapState(r.GossipStateKey, r.MapKey)
res := &GetGossipMapStateResponse{
Value: a,
}
ctx.Respond(res)
}
115 changes: 102 additions & 13 deletions cluster/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func newGossiper(cl *Cluster, opts ...Option) (*Gossiper, error) {

func (g *Gossiper) GetState(key string) (map[string]*GossipKeyValue, error) {
if g.throttler() == actor.Open {
g.cluster.Logger().Debug("Gossiper getting state", slog.String("key", key), slog.String("remote", g.pid.String()))
g.cluster.Logger().Debug("Gossiper getting state", slog.String("key", key), slog.String("gossipPid", g.pid.String()))
}

msg := NewGetGossipStateRequest(key)
Expand All @@ -79,13 +79,13 @@ func (g *Gossiper) GetState(key string) (map[string]*GossipKeyValue, error) {
if err != nil {
switch err {
case actor.ErrTimeout:
g.cluster.Logger().Error("Could not get a response from GossipActor: request timeout", slog.Any("error", err), slog.String("remote", g.pid.String()))
g.cluster.Logger().Error("Could not get a response from GossipActor: request timeout", slog.Any("error", err), slog.String("gossipPid", g.pid.String()))
return nil, err
case actor.ErrDeadLetter:
g.cluster.Logger().Error("remote no longer exists", slog.Any("error", err), slog.String("remote", g.pid.String()))
g.cluster.Logger().Error("remote no longer exists", slog.Any("error", err), slog.String("gossipPid", g.pid.String()))
return nil, err
default:
g.cluster.Logger().Error("Could not get a response from GossipActor", slog.Any("error", err), slog.String("remote", g.pid.String()))
g.cluster.Logger().Error("Could not get a response from GossipActor", slog.Any("error", err), slog.String("gossipPid", g.pid.String()))
return nil, err
}
}
Expand All @@ -94,31 +94,120 @@ func (g *Gossiper) GetState(key string) (map[string]*GossipKeyValue, error) {
response, ok := r.(*GetGossipStateResponse)
if !ok {
err := fmt.Errorf("could not promote %T interface to GetGossipStateResponse", r)
g.cluster.Logger().Error("Could not get a response from GossipActor", slog.Any("error", err), slog.String("remote", g.pid.String()))
g.cluster.Logger().Error("Could not get a response from GossipActor", slog.Any("error", err), slog.String("gossipPid", g.pid.String()))
return nil, err
}

return response.State, nil
}

// SetState Sends fire and forget message to update member state
func (g *Gossiper) SetState(key string, value proto.Message) {
func (g *Gossiper) SetState(gossipStateKey string, value proto.Message) {
if g.throttler() == actor.Open {
g.cluster.Logger().Debug("Gossiper setting state", slog.String("key", key), slog.String("remote", g.pid.String()))
g.cluster.Logger().Info("Gossiper setting state", slog.String("gossipStateKey", gossipStateKey), slog.String("gossipPid", g.pid.String()))
}

if g.pid == nil {
return
}

msg := NewGossipStateKey(key, value)
msg := NewGossipStateKey(gossipStateKey, value)
g.cluster.ActorSystem.Root.Send(g.pid, &msg)
}

func (g *Gossiper) SetMapState(gossipStateKey string, mapKey string, value proto.Message) {
if g.throttler() == actor.Open {
g.cluster.Logger().Info("Gossiper setting map state", slog.String("gossipStateKey", gossipStateKey), slog.String("gossipPid", g.pid.String()))
}

if g.pid == nil {
return
}

msg := SetGossipMapState{
GossipStateKey: gossipStateKey,
MapKey: mapKey,
Value: value,
}

g.cluster.ActorSystem.Root.Send(g.pid, &msg)
}

func (g *Gossiper) GetMapState(gossipStateKey string, mapKey string) *anypb.Any {
if g.throttler() == actor.Open {
g.cluster.Logger().Info("Gossiper setting map state", slog.String("gossipStateKey", gossipStateKey), slog.String("gossipPid", g.pid.String()))
}

msg := GetGossipMapStateRequest{
GossipStateKey: gossipStateKey,
MapKey: mapKey,
}

x, err := g.cluster.ActorSystem.Root.RequestFuture(g.pid, &msg, g.cluster.Config.TimeoutTime).Result()
if err != nil {
g.cluster.Logger().Error("Could not get a response from GossipActor", slog.Any("error", err), slog.String("gossipPid", g.pid.String()))
return nil
}
//cast x to GetGossipMapStateResponse
response, ok := x.(*GetGossipMapStateResponse)
if !ok {
err := fmt.Errorf("could not promote %T interface to GetGossipMapStateResponse", x)
g.cluster.Logger().Error("Could not get a response from GossipActor", slog.Any("error", err), slog.String("gossipPid", g.pid.String()))
return nil
}
return response.Value
}

func (g *Gossiper) RemoveMapState(gossipStateKey string, mapKey string) {
if g.throttler() == actor.Open {
g.cluster.Logger().Info("Gossiper setting map state", slog.String("gossipStateKey", gossipStateKey), slog.String("gossipPid", g.pid.String()))
}

if g.pid == nil {
return
}

msg := RemoveGossipMapState{
GossipStateKey: gossipStateKey,
MapKey: mapKey,
}

g.cluster.ActorSystem.Root.Send(g.pid, &msg)
}

func (g *Gossiper) GetMapKeys(gossipStateKey string) []string {
if g.throttler() == actor.Open {
g.cluster.Logger().Info("Gossiper setting map state", slog.String("gossipStateKey", gossipStateKey), slog.String("gossipPid", g.pid.String()))
}

if g.pid == nil {
return []string{}
}

msg := GetGossipMapKeysRequest{
GossipStateKey: gossipStateKey,
}

res, err := g.cluster.ActorSystem.Root.RequestFuture(g.pid, &msg, g.cluster.Config.TimeoutTime).Result()

if err != nil {
g.cluster.Logger().Error("Could not get a response from GossipActor", slog.Any("error", err), slog.String("gossipPid", g.pid.String()))
return []string{}
}
//cast res to GetGossipMapKeysResponse
response, ok := res.(*GetGossipMapKeysResponse)
if !ok {
err := fmt.Errorf("could not promote %T interface to GetGossipMapKeysResponse", res)
g.cluster.Logger().Error("Could not get a response from GossipActor", slog.Any("error", err), slog.String("gossipPid", g.pid.String()))
return []string{}
}
return response.MapKeys
}

// SetStateRequest Sends a Request (that blocks) to update member state
func (g *Gossiper) SetStateRequest(key string, value proto.Message) error {
if g.throttler() == actor.Open {
g.cluster.Logger().Debug("Gossiper setting state", slog.String("key", key), slog.String("remote", g.pid.String()))
g.cluster.Logger().Debug("Gossiper setting state", slog.String("key", key), slog.String("gossipPid", g.pid.String()))
}

if g.pid == nil {
Expand All @@ -129,18 +218,18 @@ func (g *Gossiper) SetStateRequest(key string, value proto.Message) error {
r, err := g.cluster.ActorSystem.Root.RequestFuture(g.pid, &msg, g.cluster.Config.TimeoutTime).Result()
if err != nil {
if err == actor.ErrTimeout {
g.cluster.Logger().Error("Could not get a response from Gossiper Actor: request timeout", slog.String("remote", g.pid.String()))
g.cluster.Logger().Error("Could not get a response from Gossiper Actor: request timeout", slog.String("gossipPid", g.pid.String()))
return err
}
g.cluster.Logger().Error("Could not get a response from Gossiper Actor", slog.Any("error", err), slog.String("remote", g.pid.String()))
g.cluster.Logger().Error("Could not get a response from Gossiper Actor", slog.Any("error", err), slog.String("gossipPid", g.pid.String()))
return err
}

// try to cast the response to SetGossipStateResponse concrete value
_, ok := r.(*SetGossipStateResponse)
if !ok {
err := fmt.Errorf("could not promote %T interface to SetGossipStateResponse", r)
g.cluster.Logger().Error("Could not get a response from Gossip Actor", slog.Any("error", err), slog.String("remote", g.pid.String()))
g.cluster.Logger().Error("Could not get a response from Gossip Actor", slog.Any("error", err), slog.String("gossipPid", g.pid.String()))
return err
}
return nil
Expand Down Expand Up @@ -311,5 +400,5 @@ func (g *Gossiper) blockGracefullyLeft() {
}

func (g *Gossiper) throttledLog(counter int32) {
g.cluster.Logger().Debug("Gossiper Setting State", slog.String("remote", g.pid.String()), slog.Int("throttled", int(counter)))
g.cluster.Logger().Debug("Gossiper Setting State", slog.String("gossipPid", g.pid.String()), slog.Int("throttled", int(counter)))
}
9 changes: 9 additions & 0 deletions cluster/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,11 +300,19 @@ func (inf *Informer) SetMapState(stateKey string, mapKey string, value proto.Mes
if err != nil {
inf.logger.Error("Failed to create Any", slog.Any("error", err))
}

gmap.Items[mapKey] = v

inf.SetState(stateKey, gmap)
}

func (inf *Informer) GetMapState(stateKey string, mapKey string) *anypb.Any {
gmap := inf.getGossipMap(stateKey)

a := gmap.Items[mapKey]
return a
}

func (inf *Informer) RemoveMapState(stateKey string, mapKey string) {
gmap := inf.getGossipMap(stateKey)

Expand All @@ -328,6 +336,7 @@ func (inf *Informer) getGossipMap(stateKey string) *GossipMap {
s := inf.GetState(stateKey)
mys := s[inf.myID]
gmap := &GossipMap{}
gmap.Items = make(map[string]*anypb.Any)
if mys != nil {
err := mys.Value.UnmarshalTo(gmap)
if err != nil {
Expand Down
12 changes: 12 additions & 0 deletions cluster/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cluster

import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
)

// Used to query the GossipActor about a given key status
Expand Down Expand Up @@ -40,6 +41,17 @@ type SetGossipMapState struct {
Value proto.Message
}

// Used to query the Gossip State containing GossipMap data type in the GossipActor
type GetGossipMapStateRequest struct {
GossipStateKey string
MapKey string
}

// Used by the GossipActor to send back the GossipMap value of a given key
type GetGossipMapStateResponse struct {
Value *anypb.Any
}

// Used to remove Gossip State containing GossipMap data type in the GossipActor
type RemoveGossipMapState struct {
GossipStateKey string
Expand Down
7 changes: 3 additions & 4 deletions examples/cluster-gossip/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ require (

replace github.com/asynkron/protoactor-go => ../..

require (
github.com/asynkron/protoactor-go v0.0.0-20240406090656-8c90bda12e81
github.com/lmittmann/tint v1.0.3
)
require github.com/asynkron/protoactor-go v0.0.0-20240408071539-f4cee9b2a813

require github.com/lmittmann/tint v1.0.3

require (
github.com/Workiva/go-datastructures v1.1.1 // indirect
Expand Down
Loading

0 comments on commit 70c8559

Please sign in to comment.