From 101a5f070ee93f85bd6008c766e87ce594b6e5bf Mon Sep 17 00:00:00 2001 From: evilolipop Date: Sat, 30 Mar 2024 13:19:36 +0800 Subject: [PATCH] feat: minor logging changes --- actor/deadletter.go | 2 +- actor/throttler.go | 4 ++-- cluster/identitylookup/disthash/placement_actor.go | 7 ++++--- cluster/pubsub_topic.go | 14 +++++++++++--- 4 files changed, 18 insertions(+), 9 deletions(-) diff --git a/actor/deadletter.go b/actor/deadletter.go index d197c530..b0cbf7fd 100644 --- a/actor/deadletter.go +++ b/actor/deadletter.go @@ -42,7 +42,7 @@ func NewDeadLetter(actorSystem *ActorSystem) *deadLetterProcess { if _, isIgnoreDeadLetter := deadLetter.Message.(IgnoreDeadLetterLogging); !isIgnoreDeadLetter { if shouldThrottle() == Open { - actorSystem.Logger().Debug("[DeadLetter]", slog.Any("pid", deadLetter.PID), slog.Any("message", deadLetter.Message), slog.Any("sender", deadLetter.Sender)) + actorSystem.Logger().Info("[DeadLetter]", slog.Any("pid", deadLetter.PID), slog.Any("message", deadLetter.Message), slog.Any("sender", deadLetter.Sender)) } } } diff --git a/actor/throttler.go b/actor/throttler.go index e925c7ab..45785b8e 100644 --- a/actor/throttler.go +++ b/actor/throttler.go @@ -23,7 +23,7 @@ const ( func NewThrottle(maxEventsInPeriod int32, period time.Duration, throttledCallBack func(int32)) ShouldThrottle { currentEvents := int32(0) - startTimer := func(duration time.Duration, back func(int32)) { + startTimer := func(duration time.Duration) { go func() { // crete ticker to mimic sleep, we do not want to put the goroutine to sleep // as it will schedule it out of the P making a syscall, we just want it to @@ -42,7 +42,7 @@ func NewThrottle(maxEventsInPeriod int32, period time.Duration, throttledCallBac return func() Valve { tries := atomic.AddInt32(¤tEvents, 1) if tries == 1 { - startTimer(period, throttledCallBack) + startTimer(period) } if tries == maxEventsInPeriod { diff --git a/cluster/identitylookup/disthash/placement_actor.go b/cluster/identitylookup/disthash/placement_actor.go index 40831f94..92c251e5 100644 --- a/cluster/identitylookup/disthash/placement_actor.go +++ b/cluster/identitylookup/disthash/placement_actor.go @@ -1,9 +1,10 @@ package disthash import ( + "log/slog" + "github.com/asynkron/protoactor-go/actor" clustering "github.com/asynkron/protoactor-go/cluster" - "log/slog" ) type GrainMeta struct { @@ -35,7 +36,7 @@ func (p *placementActor) Receive(ctx actor.Context) { case *actor.Stopped: ctx.Logger().Info("Placement actor stopped") case *actor.Terminated: - p.onTerminated(msg, ctx) + p.onTerminated(msg) case *clustering.ActivationRequest: p.onActivationRequest(msg, ctx) case *clustering.ClusterTopology: @@ -45,7 +46,7 @@ func (p *placementActor) Receive(ctx actor.Context) { } } -func (p *placementActor) onTerminated(msg *actor.Terminated, ctx actor.Context) { +func (p *placementActor) onTerminated(msg *actor.Terminated) { found, key, meta := p.pidToMeta(msg.Who) activationTerminated := &clustering.ActivationTerminated{ diff --git a/cluster/pubsub_topic.go b/cluster/pubsub_topic.go index 76feba20..d8c56106 100644 --- a/cluster/pubsub_topic.go +++ b/cluster/pubsub_topic.go @@ -3,7 +3,6 @@ package cluster import ( "context" "log/slog" - "strings" "time" "github.com/asynkron/protoactor-go/actor" @@ -165,7 +164,7 @@ func (t *TopicActor) logDeliveryErrors(reports []*SubscriberDeliveryReport, logg for i, report := range reports { subscribers[i] = report.Subscriber.String() } - logger.Error("Topic following subscribers could not process the batch", slog.String("topic", t.topic), slog.String("subscribers", strings.Join(subscribers, ","))) + logger.Error("Topic following subscribers could not process the batch", slog.String("topic", t.topic), slog.Any("subscribers", subscribers)) } } @@ -227,7 +226,16 @@ func (t *TopicActor) removeSubscribers(subscribersThatLeft []subscribeIdentitySt delete(t.subscribers, subscriber) } if t.shouldThrottle() == actor.Open { - logger.Warn("Topic removed subscribers, because they are dead or they are on members that left the clusterIdentity:", slog.String("topic", t.topic), slog.Any("subscribers", subscribersThatLeft)) + // slog json handler cannot print private fields + ids := make([]string, 0, len(subscribersThatLeft)) + for _, subscriber := range subscribersThatLeft { + if subscriber.isPID { + ids = append(ids, subscriber.pid.id) + } else { + ids = append(ids, subscriber.clusterIdentity.identity) + } + } + logger.Warn("Topic removed subscribers, because they are dead or they are on members that left the clusterIdentity:", slog.String("topic", t.topic), slog.Any("subscribers", ids)) } t.saveSubscriptionsInTopicActor(logger) }