Skip to content

Commit

Permalink
initial gossip map support
Browse files Browse the repository at this point in the history
  • Loading branch information
rogeralsing committed Apr 7, 2024
1 parent f2d271d commit e69fa22
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 11 deletions.
3 changes: 3 additions & 0 deletions cluster/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ type LocalStateSender func(memberStateDelta *MemberStateDelta, member *Member)
type GossipStateStorer interface {
GetState(key string) map[string]*GossipKeyValue
SetState(key string, value proto.Message)
SetMapState(stateKey string, mapKey string, value proto.Message)
RemoveMapState(stateKey string, mapKey string)
GetMapKeys(stateKey string) []string
}

// This interface must be implemented by any value that
Expand Down
14 changes: 6 additions & 8 deletions cluster/gossip_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (ga *GossipActor) Receive(ctx actor.Context) {
case *actor.Started, *actor.Stopping, *actor.Stopped:
// pass
case *SetGossipState:
ga.onSetGossipStateKey(r, ctx)
ga.onSetGossipState(r, ctx)
case *SetGossipMapState:
ga.onSetGossipMapState(r, ctx)
case *RemoveGossipMapState:
Expand Down Expand Up @@ -85,7 +85,7 @@ func (ga *GossipActor) onRemoveConsensusCheck(r *RemoveConsensusCheck) {
}

func (ga *GossipActor) onGetGossipStateKey(r *GetGossipStateRequest, ctx actor.Context) {
state := ga.gossip.GetState(r.Key)
state := ga.gossip.GetState(r.GossipStateKey)
res := NewGetGossipStateResponse(state)
ctx.Respond(&res)
}
Expand Down Expand Up @@ -148,7 +148,7 @@ func (ga *GossipActor) onGossipRequest(r *GossipRequest, ctx actor.Context) {
//})
}

func (ga *GossipActor) onSetGossipStateKey(r *SetGossipState, ctx actor.Context) {
func (ga *GossipActor) onSetGossipState(r *SetGossipState, ctx actor.Context) {
key, message := r.GossipStateKey, r.Value
ctx.Logger().Debug("Setting GossipState", slog.String("key", key), slog.Any("message", message))
ga.gossip.SetState(key, message)
Expand Down Expand Up @@ -210,18 +210,16 @@ func (ga *GossipActor) sendGossipForMember(member *Member, memberStateDelta *Mem
}

func (ga *GossipActor) onSetGossipMapState(r *SetGossipMapState, ctx actor.Context) {

ga.gossip.SetMapState(r.GossipStateKey, r.MapKey, r.Value)
}

func (ga *GossipActor) onRemoveGossipMapState(r *RemoveGossipMapState, ctx actor.Context) {

ga.gossip.RemoveMapState(r.GossipStateKey, r.MapKey)
}

func (ga *GossipActor) onGetGossipMapKeys(r *GetGossipMapKeysRequest, ctx actor.Context) {

res := &GetGossipMapKeysResponse{
Keys: []string{},
}
res := ga.gossip.GetMapKeys(r.GossipStateKey)

ctx.Respond(res)
}
15 changes: 15 additions & 0 deletions cluster/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,3 +292,18 @@ func (inf *Informer) commitPendingOffsets(offsets map[string]int64) {
func (inf *Informer) throttledLog(counter int32) {
inf.logger.Debug("[Gossip] Setting State", slog.Int("throttled", int(counter)))
}

func (inf *Informer) SetMapState(stateKey string, mapKey string, value proto.Message) {
//TODO implement me
panic("implement me")
}

func (inf *Informer) RemoveMapState(stateKey string, mapKey string) {
//TODO implement me
panic("implement me")
}

func (inf *Informer) GetMapKeys(stateKey string) []string {
//TODO implement me
panic("implement me")
}
6 changes: 3 additions & 3 deletions cluster/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import (

// Used to query the GossipActor about a given key status
type GetGossipStateRequest struct {
Key string
GossipStateKey string
}

// Create a new GetGossipStateRequest value and return it back
func NewGetGossipStateRequest(key string) GetGossipStateRequest {
request := GetGossipStateRequest{Key: key}
request := GetGossipStateRequest{GossipStateKey: key}
return request
}

Expand Down Expand Up @@ -53,7 +53,7 @@ type GetGossipMapKeysRequest struct {

// Used by the GossipActor to send back the keys in a GossipMap
type GetGossipMapKeysResponse struct {
Keys []string
MapKeys []string
}

// Create a new SetGossipState value with the given data and return it back
Expand Down

0 comments on commit e69fa22

Please sign in to comment.