Skip to content

Commit

Permalink
fix: restart a endpoint writer will cause a starting loop
Browse files Browse the repository at this point in the history
  • Loading branch information
qazwsxedckll committed Jan 5, 2024
1 parent 697f729 commit 9685d4e
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 8 deletions.
5 changes: 1 addition & 4 deletions cluster/identitylookup/disthash/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,7 @@ func (pm *Manager) onClusterTopology(tplg *clustering.ClusterTopology) {
pm.cluster.Logger().Info("onClusterTopology", slog.Uint64("topology-hash", tplg.TopologyHash))

for _, m := range tplg.Members {
pm.cluster.Logger().Info("Got member ", slog.String("MemberId", m.Id))
for _, k := range m.Kinds {
pm.cluster.Logger().Info("" + m.Id + " - " + k)
}
pm.cluster.Logger().Info("Got member", slog.Any("member", m))
}

pm.rdv = clustering.NewRendezvous()
Expand Down
14 changes: 10 additions & 4 deletions remote/endpoint_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

type endpointLazy struct {
// valueFunc func() *endpoint
unloaded uint32
unloaded atomic.Bool
once sync.Once
endpoint atomic.Value
manager *endpointManager
Expand All @@ -27,6 +27,7 @@ func NewEndpointLazy(em *endpointManager, address string) *endpointLazy {
}

func (el *endpointLazy) connect() {
el.manager.remote.actorSystem.Logger().Debug("connecting to remote address", slog.String("address", el.address))
em := el.manager
system := em.remote.actorSystem
rst, _ := system.Root.RequestFuture(em.endpointSupervisor, el.address, -1).Result()
Expand Down Expand Up @@ -247,10 +248,10 @@ func (em *endpointManager) removeEndpoint(msg *EndpointTerminatedEvent) {
v, ok := em.connections.Load(msg.Address)
if ok {
le := v.(*endpointLazy)
if atomic.CompareAndSwapUint32(&le.unloaded, 0, 1) {
if le.unloaded.CompareAndSwap(false, true) {
em.connections.Delete(msg.Address)
ep := le.Get()
em.remote.Logger().Debug("Sending EndpointTerminatedEvent to EndpointWatcher ans EndpointWriter", slog.String("address", msg.Address))
em.remote.Logger().Debug("Sending EndpointTerminatedEvent to EndpointWatcher and EndpointWriter", slog.String("address", msg.Address))
em.remote.actorSystem.Root.Send(ep.watcher, msg)
em.remote.actorSystem.Root.Send(ep.writer, msg)
}
Expand All @@ -274,13 +275,18 @@ func (state *endpointSupervisor) Receive(ctx actor.Context) {
writer: state.spawnEndpointWriter(state.remote, address, ctx),
watcher: state.spawnEndpointWatcher(state.remote, address, ctx),
}
ctx.Logger().Debug("id", slog.String("ewr", e.writer.Id), slog.String("ewa", e.watcher.Id))
ctx.Respond(e)
}
}

func (state *endpointSupervisor) HandleFailure(actorSystem *actor.ActorSystem, supervisor actor.Supervisor, child *actor.PID, rs *actor.RestartStatistics, reason interface{}, message interface{}) {
actorSystem.Logger().Debug("EndpointSupervisor handling failure", slog.Any("reason", reason), slog.Any("message", message))
supervisor.RestartChildren(child)
// use restart will cause a start loop, just stop it for now
// supervisor.RestartChildren(child)

// TODO: an extra stop is sent to the deadletter caused by EndpointTerminatedEvent
supervisor.StopChildren(child)
}

func (state *endpointSupervisor) spawnEndpointWriter(remote *Remote, address string, ctx actor.Context) *actor.PID {
Expand Down

0 comments on commit 9685d4e

Please sign in to comment.