Skip to content

Commit

Permalink
Merge pull request #635 from asynkron/enable_gossip
Browse files Browse the repository at this point in the history
Enable gossip
  • Loading branch information
rogeralsing authored Apr 15, 2022
2 parents 77b540e + f1432ca commit e9a39cd
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 66 deletions.
2 changes: 1 addition & 1 deletion _examples/cluster-broadcast/node1/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
console "github.com/asynkron/goconsole"
"github.com/asynkron/protoactor-go/actor"
"github.com/asynkron/protoactor-go/cluster"
automanaged "github.com/asynkron/protoactor-go/cluster/clusterproviders/_automanaged"
"github.com/asynkron/protoactor-go/cluster/clusterproviders/automanaged"
"github.com/asynkron/protoactor-go/cluster/identitylookup/disthash"
"github.com/asynkron/protoactor-go/remote"
)
Expand Down
2 changes: 1 addition & 1 deletion _examples/cluster-broadcast/node2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

"cluster-broadcast/shared"

automanaged "github.com/asynkron/protoactor-go/cluster/clusterproviders/_automanaged"
"github.com/asynkron/protoactor-go/cluster/clusterproviders/automanaged"
"github.com/asynkron/protoactor-go/cluster/identitylookup/disthash"

console "github.com/asynkron/goconsole"
Expand Down
2 changes: 1 addition & 1 deletion _examples/cluster-eventstream-broadcast/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"strings"
"time"

automanaged "github.com/asynkron/protoactor-go/cluster/clusterproviders/_automanaged"
"github.com/asynkron/protoactor-go/cluster/clusterproviders/automanaged"
"github.com/asynkron/protoactor-go/cluster/identitylookup/disthash"

console "github.com/asynkron/goconsole"
Expand Down
18 changes: 9 additions & 9 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/asynkron/protoactor-go/remote"
)

var extensionId = extensions.NextExtensionID()
var extensionID = extensions.NextExtensionID()

type Cluster struct {
ActorSystem *actor.ActorSystem
Expand Down Expand Up @@ -63,12 +63,12 @@ func (c *Cluster) subscribeToTopologyEvents() {
}

func (c *Cluster) ExtensionID() extensions.ExtensionID {
return extensionId
return extensionID
}

//goland:noinspection GoUnusedExportedFunction
func GetCluster(actorSystem *actor.ActorSystem) *Cluster {
c := actorSystem.Extensions.Get(extensionId)
c := actorSystem.Extensions.Get(extensionID)

return c.(*Cluster)
}
Expand All @@ -87,17 +87,17 @@ func (c *Cluster) StartMember() {
c.Remote.Start()

address := c.ActorSystem.Address()
plog.Info("Starting Proto.Actor cluster member", log.String("address", address))
plog.Info("Starting Proto.Actor cluster member", log.String("id", c.ActorSystem.ID), log.String("address", address))

c.IdentityLookup = cfg.IdentityLookup
c.IdentityLookup.Setup(c, c.GetClusterKinds(), false)

// TODO: Disable Gossip for now until API changes are done
// gossiper must be started whenever any topology events starts flowing
// if err := c.Gossip.StartGossiping(); err != nil {
// panic(err)
// }
// c.MemberList.InitializeTopologyConsensus()
//gossiper must be started whenever any topology events starts flowing
if err := c.Gossip.StartGossiping(); err != nil {
panic(err)
}
c.MemberList.InitializeTopologyConsensus()

if err := cfg.ClusterProvider.StartMember(c); err != nil {
panic(err)
Expand Down
3 changes: 2 additions & 1 deletion cluster/clusterproviders/automanaged/automanaged.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,5 +380,6 @@ func (p *AutoManagedProvider) isActiveProviderRunning() bool {
}

func (p *AutoManagedProvider) getCurrentNode() *NodeModel {
return NewNode(p.clusterName, p.address, p.memberPort, p.autoManagePort, p.knownKinds)

return NewNode(p.clusterName, p.cluster.ActorSystem.ID, p.address, p.memberPort, p.autoManagePort, p.knownKinds)
}
10 changes: 2 additions & 8 deletions cluster/clusterproviders/automanaged/node_model.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package automanaged

import "fmt"

// NodeModel represents a node in the cluster
type NodeModel struct {
ID string `json:"id"`
Expand All @@ -13,17 +11,13 @@ type NodeModel struct {
}

// NewNode returns a new node for the cluster
func NewNode(clusterName string, address string, port int, autoManPort int, kind []string) *NodeModel {
func NewNode(clusterName string, id string, address string, port int, autoManPort int, kind []string) *NodeModel {
return &NodeModel{
ID: createNodeID(clusterName, address, port),
ID: id,
ClusterName: clusterName,
Address: address,
Port: port,
AutoManagePort: autoManPort,
Kinds: kind,
}
}

func createNodeID(clusterName string, address string, port int) string {
return fmt.Sprintf("%v@%v:%v", clusterName, address, port)
}
98 changes: 53 additions & 45 deletions cluster/gossip_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ func NewGossipActor(requestTimeout time.Duration, myID string, getBlockedMembers
// Receive method.
func (ga *GossipActor) Receive(ctx actor.Context) {
switch r := ctx.Message().(type) {
case *actor.Started:
//pass
case *SetGossipStateKey:
ga.onSetGossipStateKey(r, ctx)
case *GetGossipStateRequest:
Expand All @@ -49,7 +51,7 @@ func (ga *GossipActor) Receive(ctx actor.Context) {
case *ClusterTopology:
ga.onClusterTopology(r)
case *GossipResponse:
// noop: review after roger's work is done
plog.Error("GossipResponse should not be received by GossipActor") //it should be a response to a request
default:
plog.Warn("Gossip received unknown message request", log.Message(r))
}
Expand All @@ -74,7 +76,7 @@ func (ga *GossipActor) onGetGossipStateKey(r *GetGossipStateRequest, ctx actor.C
}

func (ga *GossipActor) onGossipRequest(r *GossipRequest, ctx actor.Context) {
plog.Debug("Gossip request", log.PID("sender", ctx.Sender()))
plog.Debug("OnGossipRequest", log.PID("sender", ctx.Sender()))
ga.ReceiveState(r.State, ctx)

if !GetCluster(ctx.ActorSystem()).MemberList.ContainsMemberID(r.MemberId) {
Expand All @@ -97,26 +99,36 @@ func (ga *GossipActor) onGossipRequest(r *GossipRequest, ctx actor.Context) {
return
}

msg := GossipResponse{
State: memberState.State,
}
future := ctx.RequestFuture(ctx.Sender(), &msg, GetCluster(ctx.ActorSystem()).Config.GossipRequestTimeout)

// wait until we get a response or an error from the future
resp, err := future.Result()
if err != nil {
plog.Error("onSendGossipState failed", log.Error(err))

return
}

if _, ok := resp.(*GossipResponseAck); ok {
memberState.CommitOffsets()

return
}

plog.Error("onSendGossipState received unknown response message", log.Message(r))
ctx.Respond(&GossipResponse{})
return

//turn off acking for now

//msg := GossipResponse{
// State: memberState.State,
//}
//future := ctx.RequestFuture(ctx.Sender(), &msg, GetCluster(ctx.ActorSystem()).Config.GossipRequestTimeout)
//
//ctx.ReenterAfter(future, func(res interface{}, err error) {
// if err != nil {
// plog.Warn("onGossipRequest failed", log.String("MemberId", r.MemberId), log.Error(err))
// return
// }
//
// if _, ok := res.(*GossipResponseAck); ok {
// memberState.CommitOffsets()
// return
// }
//
// m, ok := res.(proto.Message)
// if !ok {
// plog.Warn("onGossipRequest failed", log.String("MemberId", r.MemberId), log.Error(err))
// return
// }
// n := string(proto.MessageName(m).Name())
//
// plog.Error("onGossipRequest received unknown response message", log.String("type", n), log.Message(r))
//})
}

func (ga *GossipActor) onSetGossipStateKey(r *SetGossipStateKey, ctx actor.Context) {
Expand Down Expand Up @@ -145,43 +157,39 @@ func (ga *GossipActor) ReceiveState(remoteState *GossipState, ctx actor.Context)

func (ga *GossipActor) sendGossipForMember(member *Member, memberStateDelta *MemberStateDelta, ctx actor.Context) {
pid := actor.NewPID(member.Address(), DefaultGossipActorName)
plog.Info("Sending GossipRequest", log.String("MemberId", member.Id))
plog.Debug("Sending GossipRequest", log.String("MemberId", member.Id))

// a short timeout is massively important, we cannot afford hanging around waiting
// for timeout, blocking other gossips from getting through

msg := GossipRequest{
// TODO: Uncomment this line when we replace the current "address:port" as ID
// with the proper ActorSystem.ID after new API refactor changes
// Oscar Campos: 2022-04-09
// MemberId: ctx.ActorSystem().ID,
MemberId: member.Address(),
MemberId: member.Id,
State: memberStateDelta.State,
}
future := ctx.RequestFuture(pid, &msg, ga.gossipRequestTimeout)

// wait until we get a response or an error from the future
r, err := future.Result()
if err != nil {
plog.Error("onSendGossipState failed", log.Error(err))
ctx.ReenterAfter(future, func(res interface{}, err error) {
if ctx.Sender() != nil {
ctx.Send(ctx.Sender(), &GossipResponseAck{})
}

return
}
if err != nil {
plog.Warn("sendGossipForMember failed", log.String("MemberId", member.Id), log.Error(err))
return
}

resp, ok := r.(*GossipResponse)
if !ok {
plog.Error("onSendGossipState received unknown response message", log.Message(r))
resp, ok := res.(*GossipResponse)
if !ok {
plog.Error("sendGossipForMember received unknown response message", log.Message(resp))

return
}
return
}

memberStateDelta.CommitOffsets()
memberStateDelta.CommitOffsets()

if resp.State != nil {
ga.ReceiveState(resp.State, ctx)
if resp.State != nil {
ga.ReceiveState(resp.State, ctx)

if ctx.Sender() != nil {
ctx.Send(ctx.Sender(), &GossipResponseAck{})
}
}
})
}

0 comments on commit e9a39cd

Please sign in to comment.