From efdd7ae5c8d590ca65797ee8cd71cd15bda5db92 Mon Sep 17 00:00:00 2001 From: Roger Johansson Date: Tue, 21 Nov 2023 06:23:45 +0100 Subject: [PATCH] fix call vs request --- cluster/clusterproviders/zk/log.go | 10 - cluster/clusterproviders/zk/zk_provider.go | 60 +++-- .../partition/identity_actor.go | 211 ------------------ .../partition/identity_actor_test.go | 11 - .../partition/identity_lookup.go | 35 --- cluster/identitylookup/partition/log.go | 14 -- cluster/identitylookup/partition/manager.go | 110 --------- .../identitylookup/partition/partition.puml | 40 ---- .../partition/placement_actor.go | 144 ------------ 9 files changed, 29 insertions(+), 606 deletions(-) delete mode 100644 cluster/identitylookup/partition/identity_actor.go delete mode 100644 cluster/identitylookup/partition/identity_actor_test.go delete mode 100644 cluster/identitylookup/partition/identity_lookup.go delete mode 100644 cluster/identitylookup/partition/log.go delete mode 100644 cluster/identitylookup/partition/manager.go delete mode 100644 cluster/identitylookup/partition/partition.puml delete mode 100644 cluster/identitylookup/partition/placement_actor.go diff --git a/cluster/clusterproviders/zk/log.go b/cluster/clusterproviders/zk/log.go index 302a86ab0..b44c6ce42 100644 --- a/cluster/clusterproviders/zk/log.go +++ b/cluster/clusterproviders/zk/log.go @@ -1,11 +1 @@ package zk - -import "github.com/asynkron/protoactor-go/log" - -var plog = log.New(log.InfoLevel, "[CLU/ZK]") - -// SetLogLevel sets the log level for the logger -// SetLogLevel is safe to be called concurrently -func SetLogLevel(level log.Level) { - plog.SetLevel(level) -} diff --git a/cluster/clusterproviders/zk/zk_provider.go b/cluster/clusterproviders/zk/zk_provider.go index 6cb6eae93..787cec533 100644 --- a/cluster/clusterproviders/zk/zk_provider.go +++ b/cluster/clusterproviders/zk/zk_provider.go @@ -3,13 +3,13 @@ package zk import ( "context" "fmt" + "log/slog" "net" "strconv" "strings" "time" "github.com/asynkron/protoactor-go/cluster" - "github.com/asynkron/protoactor-go/log" "github.com/go-zookeeper/zk" ) @@ -71,12 +71,10 @@ func New(endpoints []string, opts ...Option) (*Provider, error) { } conn, err := connectZk(endpoints, zkCfg.SessionTimeout, WithEventCallback(p.onEvent)) if err != nil { - plog.Error("connect zk fail", log.Error(err)) return nil, err } if auth := zkCfg.Auth; !auth.isEmpty() { if err = conn.AddAuth(auth.Scheme, []byte(auth.Credential)); err != nil { - plog.Error("auth failure.", log.String("scheme", auth.Scheme), log.String("cred", auth.Credential), log.Error(err)) return nil, err } } @@ -113,7 +111,7 @@ func (p *Provider) init(c *cluster.Cluster) error { func (p *Provider) StartMember(c *cluster.Cluster) error { if err := p.init(c); err != nil { - plog.Error("init fail " + err.Error()) + p.cluster.Logger().Error("init fail " + err.Error()) return err } @@ -121,15 +119,15 @@ func (p *Provider) StartMember(c *cluster.Cluster) error { // register self if err := p.registerService(); err != nil { - plog.Error("register service fail " + err.Error()) + p.cluster.Logger().Error("register service fail " + err.Error()) return err } - plog.Info("StartMember register service.", log.String("node", p.self.ID), log.String("seq", p.self.Meta[metaKeySeq])) + p.cluster.Logger().Info("StartMember register service.", slog.String("node", p.self.ID), slog.String("seq", p.self.Meta[metaKeySeq])) // fetch member list nodes, version, err := p.fetchNodes() if err != nil { - plog.Error("fetch nodes fail " + err.Error()) + 
p.cluster.Logger().Error("fetch nodes fail " + err.Error()) return err } // initialize members @@ -163,7 +161,7 @@ func (p *Provider) Shutdown(graceful bool) error { p.updateLeadership(nil) err := p.deregisterService() if err != nil { - plog.Error("deregisterMember", log.Error(err)) + p.cluster.Logger().Error("deregisterMember", slog.Any("error", err)) return err } p.deregistered = true @@ -178,19 +176,19 @@ func (p *Provider) getID() string { func (p *Provider) registerService() error { data, err := p.self.Serialize() if err != nil { - plog.Error("registerService Serialize fail.", log.Error(err)) + p.cluster.Logger().Error("registerService Serialize fail.", slog.Any("error", err)) return err } path, err := p.createEphemeralChildNode(data) if err != nil { - plog.Error("createEphemeralChildNode fail.", log.String("node", p.clusterKey), log.Error(err)) + p.cluster.Logger().Error("createEphemeralChildNode fail.", slog.String("node", p.clusterKey), slog.Any("error", err)) return err } p.fullpath = path seq, _ := parseSeq(path) p.self.SetMeta(metaKeySeq, intToStr(seq)) - plog.Info("RegisterService.", log.String("id", p.self.ID), log.Int("seq", seq)) + p.cluster.Logger().Info("RegisterService.", slog.String("id", p.self.ID), slog.Int("seq", seq)) return nil } @@ -201,7 +199,7 @@ func (p *Provider) createClusterNode(dir string) error { } exist, _, err := p.conn.Exists(dir) if err != nil { - plog.Error("check exist of node fail", log.String("dir", dir), log.Error(err)) + p.cluster.Logger().Error("check exist of node fail", slog.String("dir", dir), slog.Any("error", err)) return err } if exist { @@ -211,7 +209,7 @@ func (p *Provider) createClusterNode(dir string) error { return err } if _, err = p.conn.Create(dir, []byte{}, 0, zk.WorldACL(zk.PermAll)); err != nil { - plog.Error("create dir node fail", log.String("dir", dir), log.Error(err)) + p.cluster.Logger().Error("create dir node fail", slog.String("dir", dir), slog.Any("error", err)) return err } return nil @@ -229,7 +227,7 @@ func (p *Provider) deregisterService() error { func (p *Provider) keepWatching(ctx context.Context, registerSelf bool) error { evtChan, err := p.addWatcher(ctx, p.clusterKey) if err != nil { - plog.Error("list children fail", log.String("node", p.clusterKey), log.Error(err)) + p.cluster.Logger().Error("list children fail", slog.String("node", p.clusterKey), slog.Any("error", err)) return err } @@ -239,16 +237,16 @@ func (p *Provider) keepWatching(ctx context.Context, registerSelf bool) error { func (p *Provider) addWatcher(ctx context.Context, clusterKey string) (<-chan zk.Event, error) { _, stat, evtChan, err := p.conn.ChildrenW(clusterKey) if err != nil { - plog.Error("list children fail", log.String("node", clusterKey), log.Error(err)) + p.cluster.Logger().Error("list children fail", slog.String("node", clusterKey), slog.Any("error", err)) return nil, err } - plog.Info("KeepWatching cluster.", log.String("cluster", clusterKey), log.Int("children", int(stat.NumChildren))) + p.cluster.Logger().Info("KeepWatching cluster.", slog.String("cluster", clusterKey), slog.Int("children", int(stat.NumChildren))) if !p.isChildrenChanged(ctx, stat) { return evtChan, nil } - plog.Info("Chilren changed, wait 1 sec and watch again", log.Int("old_cversion", int(p.revision)), log.Int("new_revison", int(stat.Cversion))) + p.cluster.Logger().Info("Chilren changed, wait 1 sec and watch again", slog.Int("old_cversion", int(p.revision)), slog.Int("new_revison", int(stat.Cversion))) time.Sleep(1 * time.Second) nodes, version, err := 
p.fetchNodes() if err != nil { @@ -268,16 +266,16 @@ func (p *Provider) isChildrenChanged(ctx context.Context, stat *zk.Stat) bool { func (p *Provider) _keepWatching(registerSelf bool, stream <-chan zk.Event) error { event := <-stream if err := event.Err; err != nil { - plog.Error("Failure watching service.", log.Error(err)) + p.cluster.Logger().Error("Failure watching service.", slog.Any("error", err)) if registerSelf && p.clusterNotContainsSelfPath() { - plog.Info("Register info lost, register self again") + p.cluster.Logger().Info("Register info lost, register self again") p.registerService() } return err } nodes, version, err := p.fetchNodes() if err != nil { - plog.Error("Failure fetch nodes when watching service.", log.Error(err)) + p.cluster.Logger().Error("Failure fetch nodes when watching service.", slog.Any("error", err)) return err } if !p.containSelf(nodes) && registerSelf { @@ -288,7 +286,7 @@ func (p *Provider) _keepWatching(registerSelf bool, stream <-chan zk.Event) erro // reload nodes nodes, version, err = p.fetchNodes() if err != nil { - plog.Error("Failure fetch nodes when watching service.", log.Error(err)) + p.cluster.Logger().Error("Failure fetch nodes when watching service.", slog.Any("error", err)) return err } } @@ -334,21 +332,21 @@ func (p *Provider) updateLeadership(ns []*Node) { role = Leader } if role != p.role { - plog.Info("Role changed.", log.String("from", p.role.String()), log.String("to", role.String())) + p.cluster.Logger().Info("Role changed.", slog.String("from", p.role.String()), slog.String("to", role.String())) p.role = role p.roleChangedChan <- role } } func (p *Provider) onEvent(evt zk.Event) { - plog.Debug("Zookeeper event.", log.String("type", evt.Type.String()), log.String("state", evt.State.String()), log.String("path", evt.Path)) + p.cluster.Logger().Debug("Zookeeper event.", slog.String("type", evt.Type.String()), slog.String("state", evt.State.String()), slog.String("path", evt.Path)) if evt.Type != zk.EventSession { return } switch evt.State { case zk.StateConnecting, zk.StateDisconnected, zk.StateExpired: if p.role == Leader { - plog.Info("Role changed.", log.String("from", Leader.String()), log.String("to", Follower.String())) + p.cluster.Logger().Info("Role changed.", slog.String("from", Leader.String()), slog.String("to", Follower.String())) p.role = Follower p.roleChangedChan <- Follower } @@ -379,7 +377,7 @@ func (p *Provider) startWatching(registerSelf bool) { go func() { for !p.shutdown { if err := p.keepWatching(ctx, registerSelf); err != nil { - plog.Error("Failed to keepWatching.", log.Error(err)) + p.cluster.Logger().Error("Failed to keepWatching.", slog.Any("error", err)) p.clusterError = err } } @@ -394,7 +392,7 @@ func (p *Provider) GetHealthStatus() error { func (p *Provider) fetchNodes() ([]*Node, int32, error) { children, stat, err := p.conn.Children(p.clusterKey) if err != nil { - plog.Error("FetchNodes fail.", log.String("node", p.clusterKey), log.Error(err)) + p.cluster.Logger().Error("FetchNodes fail.", slog.String("node", p.clusterKey), slog.Any("error", err)) return nil, 0, err } @@ -403,21 +401,21 @@ func (p *Provider) fetchNodes() ([]*Node, int32, error) { long := joinPath(p.clusterKey, short) value, _, err := p.conn.Get(long) if err != nil { - plog.Error("FetchNodes fail.", log.String("node", long), log.Error(err)) + p.cluster.Logger().Error("FetchNodes fail.", slog.String("node", long), slog.Any("error", err)) return nil, stat.Cversion, err } n := Node{Meta: make(map[string]string)} if err := 
n.Deserialize(value); err != nil { - plog.Error("FetchNodes Deserialize fail.", log.String("node", long), log.String("val", string(value)), log.Error(err)) + p.cluster.Logger().Error("FetchNodes Deserialize fail.", slog.String("node", long), slog.String("val", string(value)), slog.Any("error", err)) return nil, stat.Cversion, err } seq, err := parseSeq(long) if err != nil { - plog.Error("FetchNodes parse seq fail.", log.String("node", long), log.String("val", string(value)), log.Error(err)) + p.cluster.Logger().Error("FetchNodes parse seq fail.", slog.String("node", long), slog.String("val", string(value)), slog.Any("error", err)) } else { n.SetMeta(metaKeySeq, intToStr(seq)) } - plog.Info("FetchNodes new node.", log.String("id", n.ID), log.String("path", long), log.Int("seq", seq)) + p.cluster.Logger().Info("FetchNodes new node.", slog.String("id", n.ID), slog.String("path", long), slog.Int("seq", seq)) nodes = append(nodes, &n) } return p.uniqNodes(nodes), stat.Cversion, nil @@ -469,7 +467,7 @@ func (p *Provider) createClusterTopologyEvent() []*cluster.Member { func (p *Provider) publishClusterTopologyEvent() { res := p.createClusterTopologyEvent() - plog.Info("Update cluster.", log.Int("members", len(res))) + p.cluster.Logger().Info("Update cluster.", slog.Int("members", len(res))) p.cluster.MemberList.UpdateClusterTopology(res) } diff --git a/cluster/identitylookup/partition/identity_actor.go b/cluster/identitylookup/partition/identity_actor.go deleted file mode 100644 index 0eabae956..000000000 --- a/cluster/identitylookup/partition/identity_actor.go +++ /dev/null @@ -1,211 +0,0 @@ -package partition - -import ( - "time" - - "github.com/asynkron/protoactor-go/actor" - clustering "github.com/asynkron/protoactor-go/cluster" - "github.com/asynkron/protoactor-go/log" -) - -// This actor is responsible to keep track of identities owned by this member -// it does not manage the cluster spawned actors itself, only identity->remote PID management -// TLDR; this is a partition/bucket in the distributed hash table which makes up the identity lookup -// -// for spawning/activating cluster actors see PartitionActivator.cs - -type identityActor struct { - cluster *clustering.Cluster - partitionManager *Manager - lookup map[string]*actor.PID - spawns map[string]*actor.Future - topologyHash uint64 - handoverTimeout time.Duration - rdv *clustering.Rendezvous -} - -func newIdentityActor(c *clustering.Cluster, p *Manager) *identityActor { - return &identityActor{ - cluster: c, - partitionManager: p, - handoverTimeout: 10 * time.Second, - lookup: map[string]*actor.PID{}, - spawns: map[string]*actor.Future{}, - } -} - -func (p *identityActor) Receive(ctx actor.Context) { - switch msg := ctx.Message().(type) { - case *actor.Started: - p.onStart(ctx) - case *actor.Stopped: - p.onStopped() - case *clustering.ActivationRequest: - p.onActivationRequest(msg, ctx) - case *clustering.ActivationTerminated: - p.onActivationTerminated(msg) - case *clustering.ClusterTopology: - p.onClusterTopology(msg, ctx) - default: - plog.Error("Invalid message", log.TypeOf("type", msg), log.PID("sender", ctx.Sender())) - } -} - -func (p *identityActor) onStart(ctx actor.Context) { - plog.Debug("Started PartitionIdentity") - self := ctx.Self() - ctx.ActorSystem().EventStream.Subscribe(func(evt interface{}) { - if at, ok := evt.(*clustering.ActivationTerminated); ok { - p.cluster.ActorSystem.Root.Send(self, at) - } - }) -} - -func (p *identityActor) onStopped() { - plog.Info("Stopped PartitionIdentity") -} - -func (p 
*identityActor) onActivationRequest(msg *clustering.ActivationRequest, ctx actor.Context) { - ownerAddress := p.rdv.GetByClusterIdentity(msg.ClusterIdentity) - - // should I own it? - if ownerAddress != ctx.Self().Address { - ownerPid := p.partitionManager.PidOfIdentityActor(ownerAddress) - ctx.Forward(ownerPid) - return - } - - // do I already own it? - if pid, ok := p.lookup[msg.ClusterIdentity.AsKey()]; ok { - respondActivation(pid, ctx) - return - } - - // Get activator - activatorAddress := p.cluster.MemberList.GetActivatorMember(msg.ClusterIdentity.Kind, ctx.Sender().Address) - activator := p.partitionManager.PidOfActivatorActor(activatorAddress) - - // No activator found, bail out and respond empty - if activator == nil { - respondEmptyActivation(ctx) - return - } - - // What is this? - // in case the actor of msg.Name is not yet spawned. there could be multiple re-entrant - // messages requesting it, we just reuse the same task for all those - // once spawned, the key is removed from this dict - res, ok := p.spawns[msg.ClusterIdentity.AsKey()] - if !ok { - res = p.spawnRemoteActor(msg, activatorAddress) - p.spawns[msg.ClusterIdentity.AsKey()] = res - } - - // execution ends here. context.ReenterAfter is invoked once the task completes - // but still within the actors sequential execution - // but other messages could have been processed in between - // Await SpawningProcess - ctx.ReenterAfter(res, func(res interface{}, err error) { - delete(p.spawns, msg.ClusterIdentity.AsKey()) - - ar, ok := res.(*clustering.ActivationResponse) - if !ok { - // spawn failed, respond empty - respondEmptyActivation(ctx) - return - } - - // do I already own it? - if pid, ok := p.lookup[msg.ClusterIdentity.AsKey()]; ok { - respondActivation(pid, ctx) - return - } - - p.lookup[msg.ClusterIdentity.AsKey()] = ar.Pid - - respondActivation(ar.Pid, ctx) - }) -} - -func respondActivation(pid *actor.PID, ctx actor.Context) { - response := &clustering.ActivationResponse{ - Pid: pid, - } - - ctx.Respond(response) -} - -func respondEmptyActivation(ctx actor.Context) { - response := &clustering.ActivationResponse{ - Pid: nil, - } - ctx.Respond(response) -} - -func (p *identityActor) onActivationTerminated(msg *clustering.ActivationTerminated) { - // //we get this via broadcast to all nodes, remove if we have it, or ignore - key := msg.ClusterIdentity.AsKey() - _, ok := p.spawns[key] - if ok { - return - } - - // Logger.LogDebug("[PartitionIdentityActor] Terminated {Pid}", msg.Pid); - p.cluster.PidCache.RemoveByValue(msg.ClusterIdentity.Identity, msg.ClusterIdentity.Kind, msg.Pid) - delete(p.lookup, key) -} - -func (p *identityActor) onClusterTopology(msg *clustering.ClusterTopology, ctx actor.Context) { - // await _cluster.MemberList.TopologyConsensus(); - if p.topologyHash == msg.TopologyHash { - return - } - - members := msg.Members - p.rdv = clustering.NewRendezvous() - p.rdv.UpdateMembers(members) - p.lookup = map[string]*actor.PID{} - futures := make([]*actor.Future, 0) - - requestMsg := &clustering.IdentityHandoverRequest{ - CurrentTopology: &clustering.IdentityHandoverRequest_Topology{ - Members: msg.Members, - TopologyHash: msg.TopologyHash, - }, - - Address: ctx.Self().Address, - } - - for _, m := range members { - placementPid := p.partitionManager.PidOfActivatorActor(m.Address()) - future := ctx.RequestFuture(placementPid, requestMsg, 5*time.Second) - - futures = append(futures, future) - } - - for _, f := range futures { - res, _ := f.Result() - if response, ok := res.(*clustering.IdentityHandover); ok 
{ - for _, activation := range response.Actors { - p.takeOwnership(activation) - } - } - } -} - -func (p *identityActor) takeOwnership(activation *clustering.Activation) { - key := activation.ClusterIdentity.AsKey() - if existing, ok := p.lookup[key]; ok { - if existing.Address == activation.Pid.Address { - return - } - } - - p.lookup[key] = activation.Pid -} - -func (p *identityActor) spawnRemoteActor(msg *clustering.ActivationRequest, address string) *actor.Future { - activator := p.partitionManager.PidOfActivatorActor(address) - future := p.cluster.ActorSystem.Root.RequestFuture(activator, msg, 5*time.Second) - return future -} diff --git a/cluster/identitylookup/partition/identity_actor_test.go b/cluster/identitylookup/partition/identity_actor_test.go deleted file mode 100644 index a6f0960d6..000000000 --- a/cluster/identitylookup/partition/identity_actor_test.go +++ /dev/null @@ -1,11 +0,0 @@ -package partition - -// func TestPartitionIdentityActor_handleClusterTopology(t *testing.T) { -// assert := assert.New(t) -// members := _newTopologyEventForTest(1) -// cluster := _newClusterForTest("test-partition-identityactor") -// partitionManager := newPartitionManager(cluster) -// partitionManager.StartMember() -// tplg := ClusterTopology{Members: members, EventId: 1} -// cluster.ActorSystem.EventStream.Publish(&tplg) -// } diff --git a/cluster/identitylookup/partition/identity_lookup.go b/cluster/identitylookup/partition/identity_lookup.go deleted file mode 100644 index aa0f561af..000000000 --- a/cluster/identitylookup/partition/identity_lookup.go +++ /dev/null @@ -1,35 +0,0 @@ -package partition - -import ( - "github.com/asynkron/protoactor-go/actor" - "github.com/asynkron/protoactor-go/cluster" -) - -type IdentityLookup struct { - partitionManager *Manager -} - -func (p *IdentityLookup) Get(clusterIdentity *cluster.ClusterIdentity) *actor.PID { - return p.partitionManager.Get(clusterIdentity) -} - -func (p *IdentityLookup) RemovePid(clusterIdentity *cluster.ClusterIdentity, pid *actor.PID) { - activationTerminated := &cluster.ActivationTerminated{ - Pid: pid, - ClusterIdentity: clusterIdentity, - } - p.partitionManager.cluster.MemberList.BroadcastEvent(activationTerminated, true) -} - -func (p *IdentityLookup) Setup(cluster *cluster.Cluster, kinds []string, isClient bool) { - p.partitionManager = newPartitionManager(cluster) - p.partitionManager.Start() -} - -func (p *IdentityLookup) Shutdown() { - p.partitionManager.Stop() -} - -func New() cluster.IdentityLookup { - return &IdentityLookup{} -} diff --git a/cluster/identitylookup/partition/log.go b/cluster/identitylookup/partition/log.go deleted file mode 100644 index 357c89019..000000000 --- a/cluster/identitylookup/partition/log.go +++ /dev/null @@ -1,14 +0,0 @@ -package partition - -import ( - "github.com/asynkron/protoactor-go/log" -) - -var plog = log.New(log.DefaultLevel, "[PARTITION]") - -// SetLogLevel sets the log level for the logger. 
-// -// SetLogLevel is safe to call concurrently -func SetLogLevel(level log.Level) { - plog.SetLevel(level) -} diff --git a/cluster/identitylookup/partition/manager.go b/cluster/identitylookup/partition/manager.go deleted file mode 100644 index 10617801f..000000000 --- a/cluster/identitylookup/partition/manager.go +++ /dev/null @@ -1,110 +0,0 @@ -package partition - -import ( - "time" - - "github.com/asynkron/protoactor-go/actor" - clustering "github.com/asynkron/protoactor-go/cluster" - "github.com/asynkron/protoactor-go/eventstream" - "github.com/asynkron/protoactor-go/log" -) - -const ( - ActorNameIdentity = "partition" - ActorNamePlacement = "partition-activator" -) - -type Manager struct { - cluster *clustering.Cluster - topologySub *eventstream.Subscription - identityActor *actor.PID - placementActor *actor.PID - rdv *clustering.Rendezvous -} - -func newPartitionManager(c *clustering.Cluster) *Manager { - return &Manager{ - cluster: c, - } -} - -func (pm *Manager) Start() { - plog.Info("Started partition manager") - system := pm.cluster.ActorSystem - - identityProps := actor.PropsFromProducer(func() actor.Actor { return newIdentityActor(pm.cluster, pm) }) - pm.identityActor, _ = system.Root.SpawnNamed(identityProps, ActorNameIdentity) - plog.Info("Started partition identity actor") - - activatorProps := actor.PropsFromProducer(func() actor.Actor { return newPlacementActor(pm.cluster, pm) }) - pm.placementActor, _ = system.Root.SpawnNamed(activatorProps, ActorNamePlacement) - plog.Info("Started partition placement actor") - - pm.topologySub = system.EventStream. - Subscribe(func(ev interface{}) { - // fmt.Printf("PM got event.... %v", ev) - if topology, ok := ev.(*clustering.ClusterTopology); ok { - pm.onClusterTopology(topology) - } - }) -} - -func (pm *Manager) Stop() { - system := pm.cluster.ActorSystem - system.EventStream.Unsubscribe(pm.topologySub) - - err := system.Root.PoisonFuture(pm.placementActor).Wait() - if err != nil { - plog.Error("Failed to shutdown partition placement actor", log.Error(err)) - } - - plog.Info("Stopped PartitionManager") -} - -func (pm *Manager) PidOfIdentityActor(addr string) *actor.PID { - return actor.NewPID(addr, ActorNameIdentity) -} - -func (pm *Manager) PidOfActivatorActor(addr string) *actor.PID { - return actor.NewPID(addr, ActorNamePlacement) -} - -func (pm *Manager) onClusterTopology(tplg *clustering.ClusterTopology) { - plog.Info("onClusterTopology", log.Uint64("eventId", tplg.TopologyHash)) - - for _, m := range tplg.Members { - plog.Info("Got member " + m.Id) - - for _, k := range m.Kinds { - plog.Info("" + m.Id + " - " + k) - } - } - - pm.rdv = clustering.NewRendezvous() - pm.rdv.UpdateMembers(tplg.Members) - pm.cluster.ActorSystem.Root.Send(pm.identityActor, tplg) -} - -func (pm *Manager) Get(identity *clustering.ClusterIdentity) *actor.PID { - ownerAddress := pm.rdv.GetByClusterIdentity(identity) - - if ownerAddress == "" { - return nil - } - - identityOwnerPid := pm.PidOfIdentityActor(ownerAddress) - request := &clustering.ActivationRequest{ - ClusterIdentity: identity, - RequestId: "aaaa", - } - future := pm.cluster.ActorSystem.Root.RequestFuture(identityOwnerPid, request, 5*time.Second) - res, err := future.Result() - if err != nil { - return nil - } - typed, ok := res.(*clustering.ActivationResponse) - if !ok { - return nil - } - return typed.Pid -} diff --git a/cluster/identitylookup/partition/partition.puml b/cluster/identitylookup/partition/partition.puml deleted file mode 100644 index add6e2e23..000000000 --- 
a/cluster/identitylookup/partition/partition.puml +++ /dev/null @@ -1,40 +0,0 @@ -@startuml - -title "Partition Sequence" - -participant "IdentityLookup@node1" as lookup #LightGreen -participant "IdentityActor@node1" as id1 #LightGreen -participant "PlacementActor@node1" as place1 #LightGreen - -participant "IdentityActor@node2" as id2 #Pink -participant "PlacementActor@node2" as place2 #Pink - - -lookup -> id1: send ActivationRequest -activate id1 - id1 -> id1: owner.address = chash(ClusterIdentity) - activate id1 - -alt owner.address == self.address (owner = chash(id)) - id1 -> place1: forward ActivationRequest - activate place1 - id1 <- place1: respond ActivationResponse - deactivate place1 - deactivate id1 - lookup <- id1: respond ActivationResponse -else - id1 -> id2: send ActivationRequest - activate id2 - id2 -> id2: owner.address = chash(ClusterIdentity) - activate id2 - id2 -> place2: forward ActivationRequest - deactivate id2 - deactivate id2 - activate place2 - id1 <- place2: respond ActivationResponse - deactivate place2 - lookup <- id1: respond ActivationResponse - deactivate id1 -end - -@enduml \ No newline at end of file diff --git a/cluster/identitylookup/partition/placement_actor.go b/cluster/identitylookup/partition/placement_actor.go deleted file mode 100644 index 20962147e..000000000 --- a/cluster/identitylookup/partition/placement_actor.go +++ /dev/null @@ -1,144 +0,0 @@ -package partition - -import ( - "github.com/asynkron/protoactor-go/actor" - clustering "github.com/asynkron/protoactor-go/cluster" - "github.com/asynkron/protoactor-go/log" -) - -type GrainMeta struct { - ID *clustering.ClusterIdentity - PID *actor.PID -} - -type placementActor struct { - cluster *clustering.Cluster - partitionManager *Manager - actors map[string]GrainMeta -} - -func newPlacementActor(c *clustering.Cluster, pm *Manager) *placementActor { - return &placementActor{ - cluster: c, - partitionManager: pm, - actors: map[string]GrainMeta{}, - } -} - -func (p *placementActor) Receive(ctx actor.Context) { - switch msg := ctx.Message().(type) { - case *actor.Stopping: - plog.Info("Placement actor stopping") - p.onStopping(ctx) - case *actor.Stopped: - plog.Info("Placement actor stopped") - case *actor.Terminated: - p.onTerminated(msg, ctx) - case *clustering.IdentityHandoverRequest: - p.onIdentityHandoverRequest(msg, ctx) - case *clustering.ActivationRequest: - p.onActivationRequest(msg, ctx) - default: - plog.Error("Invalid message", log.TypeOf("type", msg), log.PID("sender", ctx.Sender())) - } -} - -func (p *placementActor) onTerminated(msg *actor.Terminated, ctx actor.Context) { - found, key, meta := p.pidToMeta(msg.Who) - - activationTerminated := &clustering.ActivationTerminated{ - Pid: msg.Who, - ClusterIdentity: meta.ID, - } - p.partitionManager.cluster.MemberList.BroadcastEvent(activationTerminated, true) - - if found { - delete(p.actors, *key) - } -} - -func (p *placementActor) onStopping(ctx actor.Context) { - futures := make(map[string]*actor.Future, len(p.actors)) - - for key, meta := range p.actors { - futures[key] = ctx.PoisonFuture(meta.PID) - } - - for key, future := range futures { - err := future.Wait() - if err != nil { - plog.Error("Failed to poison actor", log.String("identity", key), log.Error(err)) - } - } -} - -// this is pure, we do not change any state or actually move anything -// the requester also provide its own view of the world in terms of members -// TLDR; we are not using any topology state from this actor itself -func (p *placementActor) 
onIdentityHandoverRequest(msg *clustering.IdentityHandoverRequest, ctx actor.Context) { - count := 0 - response := &clustering.IdentityHandover{} - requestAddress := ctx.Sender().Address - rdv := clustering.NewRendezvous() - rdv.UpdateMembers(msg.CurrentTopology.Members) - for identity, meta := range p.actors { - // who owns this identity according to the requesters memberlist? - ownerAddress := rdv.GetByIdentity(identity) - // this identity is not owned by the requester - if ownerAddress != requestAddress { - continue - } - // _logger.LogDebug("Transfer {Identity} to {newOwnerAddress} -- {TopologyHash}", clusterIdentity, ownerAddress, - // msg.TopologyHash - // ); - - actorToHandOver := &clustering.Activation{ - ClusterIdentity: meta.ID, - Pid: meta.PID, - } - - response.Actors = append(response.Actors, actorToHandOver) - count++ - } - - plog.Debug("Transferred ownership to other members", log.Int("count", count)) - ctx.Respond(response) -} - -func (p *placementActor) onActivationRequest(msg *clustering.ActivationRequest, ctx actor.Context) { - key := msg.ClusterIdentity.AsKey() - meta, found := p.actors[key] - if found { - response := &clustering.ActivationResponse{ - Pid: meta.PID, - } - ctx.Respond(response) - return - } - - clusterKind := p.cluster.GetClusterKind(msg.ClusterIdentity.Kind) - - props := clustering.WithClusterIdentity(clusterKind.Props, msg.ClusterIdentity) - - pid := ctx.SpawnPrefix(props, msg.ClusterIdentity.Identity) - - p.actors[key] = GrainMeta{ - ID: msg.ClusterIdentity, - PID: pid, - } - - response := &clustering.ActivationResponse{ - Pid: pid, - } - - ctx.Respond(response) -} - -func (p *placementActor) pidToMeta(pid *actor.PID) (bool, *string, *GrainMeta) { - for k, v := range p.actors { - if v.PID == pid { - return true, &k, &v - } - } - return false, nil, nil -}
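
The recurring change across zk_provider.go above is a switch from the package-level plog logger (whose definition in cluster/clusterproviders/zk/log.go is removed) to the cluster-scoped, slog-based logger obtained via p.cluster.Logger(). A minimal Go sketch of the new call shape, assuming only what the diff itself shows (cluster.Cluster exposes Logger() returning a *slog.Logger); the helper name reportFetchError is hypothetical and not part of the patch:

package zk

import (
	"log/slog"

	"github.com/asynkron/protoactor-go/cluster"
)

// reportFetchError is a hypothetical helper illustrating the logging style this patch adopts:
// structured slog attributes on the cluster's logger instead of the removed plog package logger.
func reportFetchError(c *cluster.Cluster, clusterKey string, err error) {
	// Before (removed): plog.Error("FetchNodes fail.", log.String("node", clusterKey), log.Error(err))
	// After (as used throughout zk_provider.go in this patch):
	c.Logger().Error("FetchNodes fail.", slog.String("node", clusterKey), slog.Any("error", err))
}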