Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: minor logging changes #1044

Merged
merged 1 commit into from
Mar 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion actor/deadletter.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func NewDeadLetter(actorSystem *ActorSystem) *deadLetterProcess {

if _, isIgnoreDeadLetter := deadLetter.Message.(IgnoreDeadLetterLogging); !isIgnoreDeadLetter {
if shouldThrottle() == Open {
actorSystem.Logger().Debug("[DeadLetter]", slog.Any("pid", deadLetter.PID), slog.Any("message", deadLetter.Message), slog.Any("sender", deadLetter.Sender))
actorSystem.Logger().Info("[DeadLetter]", slog.Any("pid", deadLetter.PID), slog.Any("message", deadLetter.Message), slog.Any("sender", deadLetter.Sender))
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions actor/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const (
func NewThrottle(maxEventsInPeriod int32, period time.Duration, throttledCallBack func(int32)) ShouldThrottle {
currentEvents := int32(0)

startTimer := func(duration time.Duration, back func(int32)) {
startTimer := func(duration time.Duration) {
go func() {
// crete ticker to mimic sleep, we do not want to put the goroutine to sleep
// as it will schedule it out of the P making a syscall, we just want it to
Expand All @@ -42,7 +42,7 @@ func NewThrottle(maxEventsInPeriod int32, period time.Duration, throttledCallBac
return func() Valve {
tries := atomic.AddInt32(&currentEvents, 1)
if tries == 1 {
startTimer(period, throttledCallBack)
startTimer(period)
}

if tries == maxEventsInPeriod {
Expand Down
7 changes: 4 additions & 3 deletions cluster/identitylookup/disthash/placement_actor.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package disthash

import (
"log/slog"

"github.com/asynkron/protoactor-go/actor"
clustering "github.com/asynkron/protoactor-go/cluster"
"log/slog"
)

type GrainMeta struct {
Expand Down Expand Up @@ -35,7 +36,7 @@ func (p *placementActor) Receive(ctx actor.Context) {
case *actor.Stopped:
ctx.Logger().Info("Placement actor stopped")
case *actor.Terminated:
p.onTerminated(msg, ctx)
p.onTerminated(msg)
case *clustering.ActivationRequest:
p.onActivationRequest(msg, ctx)
case *clustering.ClusterTopology:
Expand All @@ -45,7 +46,7 @@ func (p *placementActor) Receive(ctx actor.Context) {
}
}

func (p *placementActor) onTerminated(msg *actor.Terminated, ctx actor.Context) {
func (p *placementActor) onTerminated(msg *actor.Terminated) {
found, key, meta := p.pidToMeta(msg.Who)

activationTerminated := &clustering.ActivationTerminated{
Expand Down
14 changes: 11 additions & 3 deletions cluster/pubsub_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package cluster
import (
"context"
"log/slog"
"strings"
"time"

"github.com/asynkron/protoactor-go/actor"
Expand Down Expand Up @@ -165,7 +164,7 @@ func (t *TopicActor) logDeliveryErrors(reports []*SubscriberDeliveryReport, logg
for i, report := range reports {
subscribers[i] = report.Subscriber.String()
}
logger.Error("Topic following subscribers could not process the batch", slog.String("topic", t.topic), slog.String("subscribers", strings.Join(subscribers, ",")))
logger.Error("Topic following subscribers could not process the batch", slog.String("topic", t.topic), slog.Any("subscribers", subscribers))
}
}

Expand Down Expand Up @@ -227,7 +226,16 @@ func (t *TopicActor) removeSubscribers(subscribersThatLeft []subscribeIdentitySt
delete(t.subscribers, subscriber)
}
if t.shouldThrottle() == actor.Open {
logger.Warn("Topic removed subscribers, because they are dead or they are on members that left the clusterIdentity:", slog.String("topic", t.topic), slog.Any("subscribers", subscribersThatLeft))
// slog json handler cannot print private fields
ids := make([]string, 0, len(subscribersThatLeft))
for _, subscriber := range subscribersThatLeft {
if subscriber.isPID {
ids = append(ids, subscriber.pid.id)
} else {
ids = append(ids, subscriber.clusterIdentity.identity)
}
}
logger.Warn("Topic removed subscribers, because they are dead or they are on members that left the clusterIdentity:", slog.String("topic", t.topic), slog.Any("subscribers", ids))
}
t.saveSubscriptionsInTopicActor(logger)
}
Expand Down
Loading