Skip to content

Commit

Permalink
getting there
Browse files Browse the repository at this point in the history
  • Loading branch information
rogeralsing committed Apr 15, 2022
1 parent e8af5c4 commit f1432ca
Showing 1 changed file with 32 additions and 28 deletions.
60 changes: 32 additions & 28 deletions cluster/gossip_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
package cluster

import (
"google.golang.org/protobuf/proto"
"time"

"github.com/asynkron/gofun/set"
Expand Down Expand Up @@ -77,7 +76,7 @@ func (ga *GossipActor) onGetGossipStateKey(r *GetGossipStateRequest, ctx actor.C
}

func (ga *GossipActor) onGossipRequest(r *GossipRequest, ctx actor.Context) {
plog.Debug("Gossip request", log.PID("sender", ctx.Sender()))
plog.Debug("OnGossipRequest", log.PID("sender", ctx.Sender()))
ga.ReceiveState(r.State, ctx)

if !GetCluster(ctx.ActorSystem()).MemberList.ContainsMemberID(r.MemberId) {
Expand All @@ -100,31 +99,36 @@ func (ga *GossipActor) onGossipRequest(r *GossipRequest, ctx actor.Context) {
return
}

msg := GossipResponse{
State: memberState.State,
}
future := ctx.RequestFuture(ctx.Sender(), &msg, GetCluster(ctx.ActorSystem()).Config.GossipRequestTimeout)

ctx.ReenterAfter(future, func(res interface{}, err error) {
if err != nil {
plog.Warn("onGossipRequest failed", log.String("MemberId", r.MemberId), log.Error(err))
return
}

if _, ok := res.(*GossipResponseAck); ok {
memberState.CommitOffsets()
return
}

m, ok := res.(proto.Message)
if !ok {
plog.Warn("onGossipRequest failed", log.String("MemberId", r.MemberId), log.Error(err))
return
}
n := string(proto.MessageName(m).Name())

plog.Error("onGossipRequest received unknown response message", log.String("type", n), log.Message(r))
})
ctx.Respond(&GossipResponse{})
return

//turn off acking for now

//msg := GossipResponse{
// State: memberState.State,
//}
//future := ctx.RequestFuture(ctx.Sender(), &msg, GetCluster(ctx.ActorSystem()).Config.GossipRequestTimeout)
//
//ctx.ReenterAfter(future, func(res interface{}, err error) {
// if err != nil {
// plog.Warn("onGossipRequest failed", log.String("MemberId", r.MemberId), log.Error(err))
// return
// }
//
// if _, ok := res.(*GossipResponseAck); ok {
// memberState.CommitOffsets()
// return
// }
//
// m, ok := res.(proto.Message)
// if !ok {
// plog.Warn("onGossipRequest failed", log.String("MemberId", r.MemberId), log.Error(err))
// return
// }
// n := string(proto.MessageName(m).Name())
//
// plog.Error("onGossipRequest received unknown response message", log.String("type", n), log.Message(r))
//})
}

func (ga *GossipActor) onSetGossipStateKey(r *SetGossipStateKey, ctx actor.Context) {
Expand Down Expand Up @@ -153,7 +157,7 @@ func (ga *GossipActor) ReceiveState(remoteState *GossipState, ctx actor.Context)

func (ga *GossipActor) sendGossipForMember(member *Member, memberStateDelta *MemberStateDelta, ctx actor.Context) {
pid := actor.NewPID(member.Address(), DefaultGossipActorName)
plog.Info("Sending GossipRequest", log.String("MemberId", member.Id))
plog.Debug("Sending GossipRequest", log.String("MemberId", member.Id))

// a short timeout is massively important, we cannot afford hanging around waiting
// for timeout, blocking other gossips from getting through
Expand Down

0 comments on commit f1432ca

Please sign in to comment.