From 0d1a8a84f2ce918e338681fb09e0ff19468b778b Mon Sep 17 00:00:00 2001 From: Roger Johansson Date: Sun, 19 Nov 2023 10:28:27 +0100 Subject: [PATCH] Start using slog --- actor/actor_context.go | 2 +- actor/props.go | 2 +- actor/props_opts.go | 8 ++++---- cluster/cluster_config_context.go | 8 +++++--- cluster/pubsub.go | 2 +- cluster/pubsub_delivery.go | 11 ++++++----- cluster/pubsub_producer.go | 13 +++++++------ cluster/pubsub_publisher.go | 6 ++++++ cluster/pubsub_topic.go | 6 ++++-- 9 files changed, 35 insertions(+), 23 deletions(-) diff --git a/actor/actor_context.go b/actor/actor_context.go index 59503d246..372954f07 100644 --- a/actor/actor_context.go +++ b/actor/actor_context.go @@ -533,7 +533,7 @@ func (ctx *actorContext) processMessage(m interface{}) { func (ctx *actorContext) incarnateActor() { atomic.StoreInt32(&ctx.state, stateAlive) - ctx.actor = ctx.props.producer() + ctx.actor = ctx.props.producer(ctx.actorSystem) metricsSystem, ok := ctx.actorSystem.Extensions.Get(extensionId).(*Metrics) if ok && metricsSystem.enabled { diff --git a/actor/props.go b/actor/props.go index 338420840..da15b99ed 100644 --- a/actor/props.go +++ b/actor/props.go @@ -86,7 +86,7 @@ var ErrNameExists = errors.New("spawn: name exists") // Props represents configuration to define how an actor should be created. type Props struct { spawner SpawnFunc - producer Producer + producer ProducerWithActorSystem mailboxProducer MailboxProducer guardianStrategy SupervisorStrategy supervisionStrategy SupervisorStrategy diff --git a/actor/props_opts.go b/actor/props_opts.go index 6928c93f7..dadb2a08e 100644 --- a/actor/props_opts.go +++ b/actor/props_opts.go @@ -10,7 +10,7 @@ func WithOnInit(init ...func(ctx Context)) PropsOption { func WithProducer(p Producer) PropsOption { return func(props *Props) { - props.producer = p + props.producer = func(*ActorSystem) Actor { return p() } } } @@ -78,7 +78,7 @@ func WithSpawnFunc(spawn SpawnFunc) PropsOption { func WithFunc(f ReceiveFunc) PropsOption { return func(props *Props) { - props.producer = func() Actor { return f } + props.producer = func(system *ActorSystem) Actor { return f } } } @@ -100,7 +100,7 @@ func WithSpawnMiddleware(middleware ...SpawnMiddleware) PropsOption { // PropsFromProducer creates a props with the given actor producer assigned. func PropsFromProducer(producer Producer, opts ...PropsOption) *Props { p := &Props{ - producer: producer, + producer: func(*ActorSystem) Actor { return producer() }, contextDecorator: make([]ContextDecorator, 0), } p.Configure(opts...) @@ -127,7 +127,7 @@ func PropsFromFunc(f ReceiveFunc, opts ...PropsOption) *Props { } func (props *Props) Clone(opts ...PropsOption) *Props { - cp := PropsFromProducer(props.producer, + cp := PropsFromProducerWithActorSystem(props.producer, WithDispatcher(props.dispatcher), WithMailbox(props.mailboxProducer), WithContextDecorator(props.contextDecorator...), diff --git a/cluster/cluster_config_context.go b/cluster/cluster_config_context.go index e0b0764eb..373645010 100644 --- a/cluster/cluster_config_context.go +++ b/cluster/cluster_config_context.go @@ -4,6 +4,7 @@ package cluster import ( "fmt" + "log/slog" "time" "github.com/asynkron/protoactor-go/actor" @@ -32,11 +33,12 @@ func NewDefaultClusterContextConfig() *ClusterContextConfig { RequestsLogThrottlePeriod: defaultRequestsLogThrottlePeriod, MaxNumberOfEventsInRequestLogThrottledPeriod: defaultMaxNumberOfEvetsInRequestLogThrottledPeriod, RetryAction: defaultRetryAction, - requestLogThrottle: actor.NewThrottle( + //TODO: fix this + requestLogThrottle: actor.NewThrottleWithLogger(nil, int32(defaultMaxNumberOfEvetsInRequestLogThrottledPeriod), defaultRequestsLogThrottlePeriod, - func(i int32) { - plog.Info(fmt.Sprintf("Throttled %d Request logs", i)) + func(logger *slog.Logger, i int32) { + logger.Info(fmt.Sprintf("Throttled %d Request logs", i)) }, ), } diff --git a/cluster/pubsub.go b/cluster/pubsub.go index d141496ce..8db81854a 100644 --- a/cluster/pubsub.go +++ b/cluster/pubsub.go @@ -32,7 +32,7 @@ func (p *PubSub) Start() { if err != nil { panic(err) // let it crash } - plog.Info("Started Cluster PubSub") + p.cluster.ActorSystem.Logger.Info("Started Cluster PubSub") } func (p *PubSub) ExtensionID() extensions.ExtensionID { diff --git a/cluster/pubsub_delivery.go b/cluster/pubsub_delivery.go index 671d4f899..c30d1113c 100644 --- a/cluster/pubsub_delivery.go +++ b/cluster/pubsub_delivery.go @@ -1,15 +1,16 @@ package cluster import ( + "log/slog" "time" "github.com/asynkron/protoactor-go/actor" - "github.com/asynkron/protoactor-go/log" "github.com/asynkron/protoactor-go/remote" ) -var pubsubMemberDeliveryLogThrottle = actor.NewThrottle(10, time.Second, func(i int32) { - plog.Warn("[PubSubMemberDeliveryActor] Throttled logs", log.Int("count", int(i))) +// TODO: fix this +var pubsubMemberDeliveryLogThrottle = actor.NewThrottleWithLogger(nil, 10, time.Second, func(logger *slog.Logger, i int32) { + logger.Warn("[PubSubMemberDeliveryActor] Throttled logs", slog.Int("count", int(i))) }) type PubSubMemberDeliveryActor struct { @@ -46,9 +47,9 @@ func (p *PubSubMemberDeliveryActor) Receive(c actor.Context) { identityLog := func(err error) { if pubsubMemberDeliveryLogThrottle() == actor.Open { if fWithIdentity.identity.GetPid() != nil { - plog.Info("Pub-sub message delivered to PID", log.String("pid", fWithIdentity.identity.GetPid().String())) + c.Logger().Info("Pub-sub message delivered to PID", slog.String("pid", fWithIdentity.identity.GetPid().String())) } else if fWithIdentity.identity.GetClusterIdentity() != nil { - plog.Info("Pub-sub message delivered to cluster identity", log.String("cluster identity", fWithIdentity.identity.GetClusterIdentity().String())) + c.Logger().Info("Pub-sub message delivered to cluster identity", slog.String("cluster identity", fWithIdentity.identity.GetClusterIdentity().String())) } } } diff --git a/cluster/pubsub_producer.go b/cluster/pubsub_producer.go index d29912138..6eafc0594 100644 --- a/cluster/pubsub_producer.go +++ b/cluster/pubsub_producer.go @@ -1,13 +1,13 @@ package cluster import ( + "log/slog" "sync" "sync/atomic" "time" "github.com/asynkron/protoactor-go/actor" "github.com/asynkron/protoactor-go/internal/queue/mpsc" - "github.com/asynkron/protoactor-go/log" "golang.org/x/net/context" ) @@ -40,8 +40,9 @@ type BatchingProducerConfig struct { PublisherIdleTimeout time.Duration } -var defaultBatchingProducerLogThrottle = actor.NewThrottle(10, time.Second, func(i int32) { - plog.Info("[BatchingProducer] Throttled logs", log.Int("count", int(i))) +// TODO: fix this +var defaultBatchingProducerLogThrottle = actor.NewThrottleWithLogger(nil, 10, time.Second, func(logger *slog.Logger, i int32) { + logger.Info("[BatchingProducer] Throttled logs", slog.Int("count", int(i))) }) func newBatchingProducerConfig(opts ...BatchingProducerConfigOption) *BatchingProducerConfig { @@ -197,13 +198,13 @@ func (p *BatchingProducer) Produce(ctx context.Context, message interface{}) (*P func (p *BatchingProducer) publishLoop(ctx context.Context) { defer close(p.loopDone) - plog.Debug("Producer is starting the publisher loop for topic", log.String("topic", p.topic)) + p.publisher.Cluster().ActorSystem.Logger.Debug("Producer is starting the publisher loop for topic", slog.String("topic", p.topic)) batchWrapper := newPubSubBatchWithReceipts() handleUnrecoverableError := func(err error) { p.stopAcceptingNewMessages() if p.config.LogThrottle() == actor.Open { - plog.Error("Error in the publisher loop of Producer for topic", log.String("topic", p.topic), log.Error(err)) + p.publisher.Cluster().ActorSystem.Logger.Error("Error in the publisher loop of Producer for topic", slog.String("topic", p.topic), slog.Any("error", err)) } p.failBatch(batchWrapper, err) p.failPendingMessages(err) @@ -366,7 +367,7 @@ loop: } if p.config.LogThrottle() == actor.Open { - plog.Warn("Error while publishing batch", log.Error(err)) + p.publisher.Cluster().ActorSystem.Logger.Warn("Error while publishing batch", slog.Any("error", err)) } if decision == FailBatchAndContinue { diff --git a/cluster/pubsub_publisher.go b/cluster/pubsub_publisher.go index 84dba0000..c157d99dc 100644 --- a/cluster/pubsub_publisher.go +++ b/cluster/pubsub_publisher.go @@ -20,6 +20,8 @@ type Publisher interface { // Publish publishes a single message to the topic. Publish(ctx context.Context, topic string, message interface{}, opts ...GrainCallOption) (*PublishResponse, error) + + Cluster() *Cluster } type defaultPublisher struct { @@ -32,6 +34,10 @@ func NewPublisher(cluster *Cluster) Publisher { } } +func (p *defaultPublisher) Cluster() *Cluster { + return p.cluster +} + func (p *defaultPublisher) Initialize(ctx context.Context, topic string, config PublisherConfig) (*Acknowledge, error) { select { case <-ctx.Done(): diff --git a/cluster/pubsub_topic.go b/cluster/pubsub_topic.go index cb1b5590c..e20548885 100644 --- a/cluster/pubsub_topic.go +++ b/cluster/pubsub_topic.go @@ -2,6 +2,7 @@ package cluster import ( "context" + "log/slog" "strings" "time" @@ -13,8 +14,9 @@ import ( const TopicActorKind = "prototopic" -var topicLogThrottle = actor.NewThrottle(10, time.Second, func(count int32) { - plog.Info("[TopicActor] Throttled logs", log.Int("count", int(count))) +// TODO: fix this +var topicLogThrottle = actor.NewThrottleWithLogger(nil, 10, time.Second, func(logger *slog.Logger, count int32) { + logger.Info("[TopicActor] Throttled logs", slog.Int("count", int(count))) }) type TopicActor struct {