Skip to content

Commit

Permalink
feat: minor logging changes
Browse files Browse the repository at this point in the history
  • Loading branch information
qazwsxedckll committed Mar 30, 2024
1 parent c318a5a commit 101a5f0
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 9 deletions.
2 changes: 1 addition & 1 deletion actor/deadletter.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func NewDeadLetter(actorSystem *ActorSystem) *deadLetterProcess {

if _, isIgnoreDeadLetter := deadLetter.Message.(IgnoreDeadLetterLogging); !isIgnoreDeadLetter {
if shouldThrottle() == Open {
actorSystem.Logger().Debug("[DeadLetter]", slog.Any("pid", deadLetter.PID), slog.Any("message", deadLetter.Message), slog.Any("sender", deadLetter.Sender))
actorSystem.Logger().Info("[DeadLetter]", slog.Any("pid", deadLetter.PID), slog.Any("message", deadLetter.Message), slog.Any("sender", deadLetter.Sender))
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions actor/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const (
func NewThrottle(maxEventsInPeriod int32, period time.Duration, throttledCallBack func(int32)) ShouldThrottle {
currentEvents := int32(0)

startTimer := func(duration time.Duration, back func(int32)) {
startTimer := func(duration time.Duration) {
go func() {
// crete ticker to mimic sleep, we do not want to put the goroutine to sleep
// as it will schedule it out of the P making a syscall, we just want it to
Expand All @@ -42,7 +42,7 @@ func NewThrottle(maxEventsInPeriod int32, period time.Duration, throttledCallBac
return func() Valve {
tries := atomic.AddInt32(&currentEvents, 1)
if tries == 1 {
startTimer(period, throttledCallBack)
startTimer(period)
}

if tries == maxEventsInPeriod {
Expand Down
7 changes: 4 additions & 3 deletions cluster/identitylookup/disthash/placement_actor.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package disthash

import (
"log/slog"

"github.com/asynkron/protoactor-go/actor"
clustering "github.com/asynkron/protoactor-go/cluster"
"log/slog"
)

type GrainMeta struct {
Expand Down Expand Up @@ -35,7 +36,7 @@ func (p *placementActor) Receive(ctx actor.Context) {
case *actor.Stopped:
ctx.Logger().Info("Placement actor stopped")
case *actor.Terminated:
p.onTerminated(msg, ctx)
p.onTerminated(msg)
case *clustering.ActivationRequest:
p.onActivationRequest(msg, ctx)
case *clustering.ClusterTopology:
Expand All @@ -45,7 +46,7 @@ func (p *placementActor) Receive(ctx actor.Context) {
}
}

func (p *placementActor) onTerminated(msg *actor.Terminated, ctx actor.Context) {
func (p *placementActor) onTerminated(msg *actor.Terminated) {
found, key, meta := p.pidToMeta(msg.Who)

activationTerminated := &clustering.ActivationTerminated{
Expand Down
14 changes: 11 additions & 3 deletions cluster/pubsub_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package cluster
import (
"context"
"log/slog"
"strings"
"time"

"github.com/asynkron/protoactor-go/actor"
Expand Down Expand Up @@ -165,7 +164,7 @@ func (t *TopicActor) logDeliveryErrors(reports []*SubscriberDeliveryReport, logg
for i, report := range reports {
subscribers[i] = report.Subscriber.String()
}
logger.Error("Topic following subscribers could not process the batch", slog.String("topic", t.topic), slog.String("subscribers", strings.Join(subscribers, ",")))
logger.Error("Topic following subscribers could not process the batch", slog.String("topic", t.topic), slog.Any("subscribers", subscribers))
}
}

Expand Down Expand Up @@ -227,7 +226,16 @@ func (t *TopicActor) removeSubscribers(subscribersThatLeft []subscribeIdentitySt
delete(t.subscribers, subscriber)
}
if t.shouldThrottle() == actor.Open {
logger.Warn("Topic removed subscribers, because they are dead or they are on members that left the clusterIdentity:", slog.String("topic", t.topic), slog.Any("subscribers", subscribersThatLeft))
// slog json handler cannot print private fields
ids := make([]string, 0, len(subscribersThatLeft))
for _, subscriber := range subscribersThatLeft {
if subscriber.isPID {
ids = append(ids, subscriber.pid.id)
} else {
ids = append(ids, subscriber.clusterIdentity.identity)
}
}
logger.Warn("Topic removed subscribers, because they are dead or they are on members that left the clusterIdentity:", slog.String("topic", t.topic), slog.Any("subscribers", ids))
}
t.saveSubscriptionsInTopicActor(logger)
}
Expand Down

0 comments on commit 101a5f0

Please sign in to comment.