From 3a36d1b68bdcb0be4440ad9c0980966929fd5e83 Mon Sep 17 00:00:00 2001 From: Roger Johansson Date: Fri, 15 Apr 2022 16:20:15 +0200 Subject: [PATCH 1/7] enable gossip again --- _examples/cluster-broadcast/node1/main.go | 2 +- _examples/cluster-broadcast/node2/main.go | 2 +- cluster/cluster.go | 18 +++++++++--------- .../automanaged/automanaged.go | 3 ++- .../clusterproviders/automanaged/node_model.go | 10 ++-------- 5 files changed, 15 insertions(+), 20 deletions(-) diff --git a/_examples/cluster-broadcast/node1/main.go b/_examples/cluster-broadcast/node1/main.go index 78dae54e0..94e2a2f01 100644 --- a/_examples/cluster-broadcast/node1/main.go +++ b/_examples/cluster-broadcast/node1/main.go @@ -9,7 +9,7 @@ import ( console "github.com/asynkron/goconsole" "github.com/asynkron/protoactor-go/actor" "github.com/asynkron/protoactor-go/cluster" - automanaged "github.com/asynkron/protoactor-go/cluster/clusterproviders/_automanaged" + "github.com/asynkron/protoactor-go/cluster/clusterproviders/automanaged" "github.com/asynkron/protoactor-go/cluster/identitylookup/disthash" "github.com/asynkron/protoactor-go/remote" ) diff --git a/_examples/cluster-broadcast/node2/main.go b/_examples/cluster-broadcast/node2/main.go index 3cf24d4cb..15accce30 100644 --- a/_examples/cluster-broadcast/node2/main.go +++ b/_examples/cluster-broadcast/node2/main.go @@ -6,7 +6,7 @@ import ( "cluster-broadcast/shared" - automanaged "github.com/asynkron/protoactor-go/cluster/clusterproviders/_automanaged" + "github.com/asynkron/protoactor-go/cluster/clusterproviders/automanaged" "github.com/asynkron/protoactor-go/cluster/identitylookup/disthash" console "github.com/asynkron/goconsole" diff --git a/cluster/cluster.go b/cluster/cluster.go index 50f93db55..85f1fee64 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -11,7 +11,7 @@ import ( "github.com/asynkron/protoactor-go/remote" ) -var extensionId = extensions.NextExtensionID() +var extensionID = extensions.NextExtensionID() type Cluster struct { ActorSystem *actor.ActorSystem @@ -63,12 +63,12 @@ func (c *Cluster) subscribeToTopologyEvents() { } func (c *Cluster) ExtensionID() extensions.ExtensionID { - return extensionId + return extensionID } //goland:noinspection GoUnusedExportedFunction func GetCluster(actorSystem *actor.ActorSystem) *Cluster { - c := actorSystem.Extensions.Get(extensionId) + c := actorSystem.Extensions.Get(extensionID) return c.(*Cluster) } @@ -87,17 +87,17 @@ func (c *Cluster) StartMember() { c.Remote.Start() address := c.ActorSystem.Address() - plog.Info("Starting Proto.Actor cluster member", log.String("address", address)) + plog.Info("Starting Proto.Actor cluster member", log.String("id", c.ActorSystem.ID), log.String("address", address)) c.IdentityLookup = cfg.IdentityLookup c.IdentityLookup.Setup(c, c.GetClusterKinds(), false) // TODO: Disable Gossip for now until API changes are done - // gossiper must be started whenever any topology events starts flowing - // if err := c.Gossip.StartGossiping(); err != nil { - // panic(err) - // } - // c.MemberList.InitializeTopologyConsensus() + //gossiper must be started whenever any topology events starts flowing + if err := c.Gossip.StartGossiping(); err != nil { + panic(err) + } + c.MemberList.InitializeTopologyConsensus() if err := cfg.ClusterProvider.StartMember(c); err != nil { panic(err) diff --git a/cluster/clusterproviders/automanaged/automanaged.go b/cluster/clusterproviders/automanaged/automanaged.go index c4fe2c8d6..600a33cb2 100644 --- a/cluster/clusterproviders/automanaged/automanaged.go +++ b/cluster/clusterproviders/automanaged/automanaged.go @@ -380,5 +380,6 @@ func (p *AutoManagedProvider) isActiveProviderRunning() bool { } func (p *AutoManagedProvider) getCurrentNode() *NodeModel { - return NewNode(p.clusterName, p.address, p.memberPort, p.autoManagePort, p.knownKinds) + + return NewNode(p.clusterName, p.cluster.ActorSystem.ID, p.address, p.memberPort, p.autoManagePort, p.knownKinds) } diff --git a/cluster/clusterproviders/automanaged/node_model.go b/cluster/clusterproviders/automanaged/node_model.go index f68d71644..a06c4b226 100644 --- a/cluster/clusterproviders/automanaged/node_model.go +++ b/cluster/clusterproviders/automanaged/node_model.go @@ -1,7 +1,5 @@ package automanaged -import "fmt" - // NodeModel represents a node in the cluster type NodeModel struct { ID string `json:"id"` @@ -13,9 +11,9 @@ type NodeModel struct { } // NewNode returns a new node for the cluster -func NewNode(clusterName string, address string, port int, autoManPort int, kind []string) *NodeModel { +func NewNode(clusterName string, id string, address string, port int, autoManPort int, kind []string) *NodeModel { return &NodeModel{ - ID: createNodeID(clusterName, address, port), + ID: id, ClusterName: clusterName, Address: address, Port: port, @@ -23,7 +21,3 @@ func NewNode(clusterName string, address string, port int, autoManPort int, kind Kinds: kind, } } - -func createNodeID(clusterName string, address string, port int) string { - return fmt.Sprintf("%v@%v:%v", clusterName, address, port) -} From 565605a1064398348fe43e795d4aad3c5fa921db Mon Sep 17 00:00:00 2001 From: Roger Johansson Date: Fri, 15 Apr 2022 16:29:00 +0200 Subject: [PATCH 2/7] enable gossip again --- cluster/gossip_actor.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/cluster/gossip_actor.go b/cluster/gossip_actor.go index 4abd203bb..be05f2023 100644 --- a/cluster/gossip_actor.go +++ b/cluster/gossip_actor.go @@ -151,11 +151,7 @@ func (ga *GossipActor) sendGossipForMember(member *Member, memberStateDelta *Mem // for timeout, blocking other gossips from getting through msg := GossipRequest{ - // TODO: Uncomment this line when we replace the current "address:port" as ID - // with the proper ActorSystem.ID after new API refactor changes - // Oscar Campos: 2022-04-09 - // MemberId: ctx.ActorSystem().ID, - MemberId: member.Address(), + MemberId: member.Id, State: memberStateDelta.State, } future := ctx.RequestFuture(pid, &msg, ga.gossipRequestTimeout) From 14b9eddd84ebe18f5022091c3f6456c70d1ced42 Mon Sep 17 00:00:00 2001 From: Roger Johansson Date: Fri, 15 Apr 2022 16:49:50 +0200 Subject: [PATCH 3/7] getting there --- cluster/gossip_actor.go | 56 +++++++++++++++++++---------------------- 1 file changed, 26 insertions(+), 30 deletions(-) diff --git a/cluster/gossip_actor.go b/cluster/gossip_actor.go index be05f2023..d6ed8c46f 100644 --- a/cluster/gossip_actor.go +++ b/cluster/gossip_actor.go @@ -102,21 +102,19 @@ func (ga *GossipActor) onGossipRequest(r *GossipRequest, ctx actor.Context) { } future := ctx.RequestFuture(ctx.Sender(), &msg, GetCluster(ctx.ActorSystem()).Config.GossipRequestTimeout) - // wait until we get a response or an error from the future - resp, err := future.Result() - if err != nil { - plog.Error("onSendGossipState failed", log.Error(err)) - - return - } + ctx.ReenterAfter(future, func(res interface{}, err error) { + if err != nil { + plog.Warn("onGossipRequest failed", log.String("MemberId", r.MemberId), log.Error(err)) + } - if _, ok := resp.(*GossipResponseAck); ok { - memberState.CommitOffsets() + if _, ok := res.(*GossipResponseAck); ok { + memberState.CommitOffsets() - return - } + return + } - plog.Error("onSendGossipState received unknown response message", log.Message(r)) + plog.Error("onGossipRequest received unknown response message", log.Message(r)) + }) } func (ga *GossipActor) onSetGossipStateKey(r *SetGossipStateKey, ctx actor.Context) { @@ -156,28 +154,26 @@ func (ga *GossipActor) sendGossipForMember(member *Member, memberStateDelta *Mem } future := ctx.RequestFuture(pid, &msg, ga.gossipRequestTimeout) - // wait until we get a response or an error from the future - r, err := future.Result() - if err != nil { - plog.Error("onSendGossipState failed", log.Error(err)) - - return - } + ctx.ReenterAfter(future, func(res interface{}, err error) { + if err != nil { + plog.Warn("sendGossipForMember failed", log.String("MemberId", member.Id), log.Error(err)) + } - resp, ok := r.(*GossipResponse) - if !ok { - plog.Error("onSendGossipState received unknown response message", log.Message(r)) + resp, ok := res.(*GossipResponse) + if !ok { + plog.Error("sendGossipForMember received unknown response message", log.Message(resp)) - return - } + return + } - memberStateDelta.CommitOffsets() + memberStateDelta.CommitOffsets() - if resp.State != nil { - ga.ReceiveState(resp.State, ctx) + if resp.State != nil { + ga.ReceiveState(resp.State, ctx) - if ctx.Sender() != nil { - ctx.Send(ctx.Sender(), &GossipResponseAck{}) + if ctx.Sender() != nil { + ctx.Send(ctx.Sender(), &GossipResponseAck{}) + } } - } + }) } From bc410a9961ebe6d54be1821b51d911734e7c8ff7 Mon Sep 17 00:00:00 2001 From: Roger Johansson Date: Fri, 15 Apr 2022 17:14:10 +0200 Subject: [PATCH 4/7] getting there --- cluster/gossip_actor.go | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/cluster/gossip_actor.go b/cluster/gossip_actor.go index d6ed8c46f..149fb98dd 100644 --- a/cluster/gossip_actor.go +++ b/cluster/gossip_actor.go @@ -3,6 +3,7 @@ package cluster import ( + "google.golang.org/protobuf/proto" "time" "github.com/asynkron/gofun/set" @@ -34,6 +35,8 @@ func NewGossipActor(requestTimeout time.Duration, myID string, getBlockedMembers // Receive method. func (ga *GossipActor) Receive(ctx actor.Context) { switch r := ctx.Message().(type) { + case *actor.Started: + //pass case *SetGossipStateKey: ga.onSetGossipStateKey(r, ctx) case *GetGossipStateRequest: @@ -105,15 +108,22 @@ func (ga *GossipActor) onGossipRequest(r *GossipRequest, ctx actor.Context) { ctx.ReenterAfter(future, func(res interface{}, err error) { if err != nil { plog.Warn("onGossipRequest failed", log.String("MemberId", r.MemberId), log.Error(err)) + return } if _, ok := res.(*GossipResponseAck); ok { memberState.CommitOffsets() + return + } + m, ok := res.(proto.Message) + if !ok { + plog.Warn("onGossipRequest failed", log.String("MemberId", r.MemberId), log.Error(err)) return } + n := string(proto.MessageName(m).Name()) - plog.Error("onGossipRequest received unknown response message", log.Message(r)) + plog.Error("onGossipRequest received unknown response message", log.String("type", n), log.Message(r)) }) } @@ -157,6 +167,7 @@ func (ga *GossipActor) sendGossipForMember(member *Member, memberStateDelta *Mem ctx.ReenterAfter(future, func(res interface{}, err error) { if err != nil { plog.Warn("sendGossipForMember failed", log.String("MemberId", member.Id), log.Error(err)) + return } resp, ok := res.(*GossipResponse) @@ -171,9 +182,9 @@ func (ga *GossipActor) sendGossipForMember(member *Member, memberStateDelta *Mem if resp.State != nil { ga.ReceiveState(resp.State, ctx) - if ctx.Sender() != nil { - ctx.Send(ctx.Sender(), &GossipResponseAck{}) - } + } + if ctx.Sender() != nil { + ctx.Send(ctx.Sender(), &GossipResponseAck{}) } }) } From dd8ad5a09532b258ed02873fc79a08cc03e83568 Mon Sep 17 00:00:00 2001 From: Roger Johansson Date: Fri, 15 Apr 2022 17:44:44 +0200 Subject: [PATCH 5/7] getting there --- _examples/cluster-eventstream-broadcast/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/_examples/cluster-eventstream-broadcast/main.go b/_examples/cluster-eventstream-broadcast/main.go index 9ed058f2e..048ef1929 100644 --- a/_examples/cluster-eventstream-broadcast/main.go +++ b/_examples/cluster-eventstream-broadcast/main.go @@ -6,7 +6,7 @@ import ( "strings" "time" - automanaged "github.com/asynkron/protoactor-go/cluster/clusterproviders/_automanaged" + "github.com/asynkron/protoactor-go/cluster/clusterproviders/automanaged" "github.com/asynkron/protoactor-go/cluster/identitylookup/disthash" console "github.com/asynkron/goconsole" From e8af5c4ba1f533a11fa7d2cc47a33b5396a687f2 Mon Sep 17 00:00:00 2001 From: Roger Johansson Date: Fri, 15 Apr 2022 19:42:03 +0200 Subject: [PATCH 6/7] getting there --- cluster/gossip_actor.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/cluster/gossip_actor.go b/cluster/gossip_actor.go index 149fb98dd..68180d478 100644 --- a/cluster/gossip_actor.go +++ b/cluster/gossip_actor.go @@ -52,7 +52,7 @@ func (ga *GossipActor) Receive(ctx actor.Context) { case *ClusterTopology: ga.onClusterTopology(r) case *GossipResponse: - // noop: review after roger's work is done + plog.Error("GossipResponse should not be received by GossipActor") //it should be a response to a request default: plog.Warn("Gossip received unknown message request", log.Message(r)) } @@ -165,6 +165,10 @@ func (ga *GossipActor) sendGossipForMember(member *Member, memberStateDelta *Mem future := ctx.RequestFuture(pid, &msg, ga.gossipRequestTimeout) ctx.ReenterAfter(future, func(res interface{}, err error) { + if ctx.Sender() != nil { + ctx.Send(ctx.Sender(), &GossipResponseAck{}) + } + if err != nil { plog.Warn("sendGossipForMember failed", log.String("MemberId", member.Id), log.Error(err)) return @@ -183,8 +187,5 @@ func (ga *GossipActor) sendGossipForMember(member *Member, memberStateDelta *Mem ga.ReceiveState(resp.State, ctx) } - if ctx.Sender() != nil { - ctx.Send(ctx.Sender(), &GossipResponseAck{}) - } }) } From f1432ca9b6e1dc98599f3f0a0652d3bd3774bcec Mon Sep 17 00:00:00 2001 From: Roger Johansson Date: Fri, 15 Apr 2022 19:51:20 +0200 Subject: [PATCH 7/7] getting there --- cluster/gossip_actor.go | 60 ++++++++++++++++++++++------------------- 1 file changed, 32 insertions(+), 28 deletions(-) diff --git a/cluster/gossip_actor.go b/cluster/gossip_actor.go index 68180d478..aa686d868 100644 --- a/cluster/gossip_actor.go +++ b/cluster/gossip_actor.go @@ -3,7 +3,6 @@ package cluster import ( - "google.golang.org/protobuf/proto" "time" "github.com/asynkron/gofun/set" @@ -77,7 +76,7 @@ func (ga *GossipActor) onGetGossipStateKey(r *GetGossipStateRequest, ctx actor.C } func (ga *GossipActor) onGossipRequest(r *GossipRequest, ctx actor.Context) { - plog.Debug("Gossip request", log.PID("sender", ctx.Sender())) + plog.Debug("OnGossipRequest", log.PID("sender", ctx.Sender())) ga.ReceiveState(r.State, ctx) if !GetCluster(ctx.ActorSystem()).MemberList.ContainsMemberID(r.MemberId) { @@ -100,31 +99,36 @@ func (ga *GossipActor) onGossipRequest(r *GossipRequest, ctx actor.Context) { return } - msg := GossipResponse{ - State: memberState.State, - } - future := ctx.RequestFuture(ctx.Sender(), &msg, GetCluster(ctx.ActorSystem()).Config.GossipRequestTimeout) - - ctx.ReenterAfter(future, func(res interface{}, err error) { - if err != nil { - plog.Warn("onGossipRequest failed", log.String("MemberId", r.MemberId), log.Error(err)) - return - } - - if _, ok := res.(*GossipResponseAck); ok { - memberState.CommitOffsets() - return - } - - m, ok := res.(proto.Message) - if !ok { - plog.Warn("onGossipRequest failed", log.String("MemberId", r.MemberId), log.Error(err)) - return - } - n := string(proto.MessageName(m).Name()) - - plog.Error("onGossipRequest received unknown response message", log.String("type", n), log.Message(r)) - }) + ctx.Respond(&GossipResponse{}) + return + + //turn off acking for now + + //msg := GossipResponse{ + // State: memberState.State, + //} + //future := ctx.RequestFuture(ctx.Sender(), &msg, GetCluster(ctx.ActorSystem()).Config.GossipRequestTimeout) + // + //ctx.ReenterAfter(future, func(res interface{}, err error) { + // if err != nil { + // plog.Warn("onGossipRequest failed", log.String("MemberId", r.MemberId), log.Error(err)) + // return + // } + // + // if _, ok := res.(*GossipResponseAck); ok { + // memberState.CommitOffsets() + // return + // } + // + // m, ok := res.(proto.Message) + // if !ok { + // plog.Warn("onGossipRequest failed", log.String("MemberId", r.MemberId), log.Error(err)) + // return + // } + // n := string(proto.MessageName(m).Name()) + // + // plog.Error("onGossipRequest received unknown response message", log.String("type", n), log.Message(r)) + //}) } func (ga *GossipActor) onSetGossipStateKey(r *SetGossipStateKey, ctx actor.Context) { @@ -153,7 +157,7 @@ func (ga *GossipActor) ReceiveState(remoteState *GossipState, ctx actor.Context) func (ga *GossipActor) sendGossipForMember(member *Member, memberStateDelta *MemberStateDelta, ctx actor.Context) { pid := actor.NewPID(member.Address(), DefaultGossipActorName) - plog.Info("Sending GossipRequest", log.String("MemberId", member.Id)) + plog.Debug("Sending GossipRequest", log.String("MemberId", member.Id)) // a short timeout is massively important, we cannot afford hanging around waiting // for timeout, blocking other gossips from getting through