Skip to content

Commit

Permalink
initial gossip map support
Browse files Browse the repository at this point in the history
  • Loading branch information
rogeralsing committed Apr 7, 2024
1 parent e69fa22 commit 59d175c
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 9 deletions.
4 changes: 2 additions & 2 deletions cluster/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ breakLoop:
case <-ticker.C:
g.blockExpiredHeartbeats()
g.blockGracefullyLeft()
g.SetState(HearthbeatKey, &MemberHeartbeat{
g.SetState(HeartbeatKey, &MemberHeartbeat{
ActorStatistics: &ActorStatistics{
ActorCount: g.GetActorCount(),
},
Expand All @@ -264,7 +264,7 @@ func (g *Gossiper) blockExpiredHeartbeats() {
if g.cluster.Config.GossipInterval == 0 {
return
}
t, err := g.GetState(HearthbeatKey)
t, err := g.GetState(HeartbeatKey)
if err != nil {
g.cluster.Logger().Error("Could not get heartbeat state", slog.Any("error", err))
return
Expand Down
43 changes: 36 additions & 7 deletions cluster/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package cluster

import (
"fmt"
"google.golang.org/protobuf/types/known/anypb"
"log/slog"
"math/rand"
"reflect"
Expand All @@ -16,7 +17,7 @@ import (

const (
TopologyKey string = "topology"
HearthbeatKey string = "heathbeat"
HeartbeatKey string = "heartbeat"
GracefullyLeftKey string = "left"
)

Expand Down Expand Up @@ -294,16 +295,44 @@ func (inf *Informer) throttledLog(counter int32) {
}

func (inf *Informer) SetMapState(stateKey string, mapKey string, value proto.Message) {
//TODO implement me
panic("implement me")
gmap := inf.getGossipMap(stateKey)
v, err := anypb.New(value)
if err != nil {
inf.logger.Error("Failed to create Any", slog.Any("error", err))
}
gmap.Items[mapKey] = v

inf.SetState(stateKey, gmap)
}

func (inf *Informer) RemoveMapState(stateKey string, mapKey string) {
//TODO implement me
panic("implement me")
gmap := inf.getGossipMap(stateKey)

delete(gmap.Items, mapKey)

inf.SetState(stateKey, gmap)
}

func (inf *Informer) GetMapKeys(stateKey string) []string {
//TODO implement me
panic("implement me")
gmap := inf.getGossipMap(stateKey)

keys := make([]string, 0, len(gmap.Items))
for k := range gmap.Items {
keys = append(keys, k)
}

return keys
}

func (inf *Informer) getGossipMap(stateKey string) *GossipMap {
s := inf.GetState(stateKey)
mys := s[inf.myID]
gmap := &GossipMap{}
if mys != nil {
err := mys.Value.UnmarshalTo(gmap)
if err != nil {
inf.logger.Error("Failed to unmarshal GossipMap", slog.Any("error", err))
}
}
return gmap
}

0 comments on commit 59d175c

Please sign in to comment.