Skip to content

Commit

Permalink
Start using slog
Browse files Browse the repository at this point in the history
  • Loading branch information
rogeralsing committed Nov 19, 2023
1 parent 2a61330 commit 0d1a8a8
Show file tree
Hide file tree
Showing 9 changed files with 35 additions and 23 deletions.
2 changes: 1 addition & 1 deletion actor/actor_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ func (ctx *actorContext) processMessage(m interface{}) {

func (ctx *actorContext) incarnateActor() {
atomic.StoreInt32(&ctx.state, stateAlive)
ctx.actor = ctx.props.producer()
ctx.actor = ctx.props.producer(ctx.actorSystem)

metricsSystem, ok := ctx.actorSystem.Extensions.Get(extensionId).(*Metrics)
if ok && metricsSystem.enabled {
Expand Down
2 changes: 1 addition & 1 deletion actor/props.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ var ErrNameExists = errors.New("spawn: name exists")
// Props represents configuration to define how an actor should be created.
type Props struct {
spawner SpawnFunc
producer Producer
producer ProducerWithActorSystem
mailboxProducer MailboxProducer
guardianStrategy SupervisorStrategy
supervisionStrategy SupervisorStrategy
Expand Down
8 changes: 4 additions & 4 deletions actor/props_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ func WithOnInit(init ...func(ctx Context)) PropsOption {

func WithProducer(p Producer) PropsOption {
return func(props *Props) {
props.producer = p
props.producer = func(*ActorSystem) Actor { return p() }
}
}

Expand Down Expand Up @@ -78,7 +78,7 @@ func WithSpawnFunc(spawn SpawnFunc) PropsOption {

func WithFunc(f ReceiveFunc) PropsOption {
return func(props *Props) {
props.producer = func() Actor { return f }
props.producer = func(system *ActorSystem) Actor { return f }
}
}

Expand All @@ -100,7 +100,7 @@ func WithSpawnMiddleware(middleware ...SpawnMiddleware) PropsOption {
// PropsFromProducer creates a props with the given actor producer assigned.
func PropsFromProducer(producer Producer, opts ...PropsOption) *Props {
p := &Props{
producer: producer,
producer: func(*ActorSystem) Actor { return producer() },
contextDecorator: make([]ContextDecorator, 0),
}
p.Configure(opts...)
Expand All @@ -127,7 +127,7 @@ func PropsFromFunc(f ReceiveFunc, opts ...PropsOption) *Props {
}

func (props *Props) Clone(opts ...PropsOption) *Props {
cp := PropsFromProducer(props.producer,
cp := PropsFromProducerWithActorSystem(props.producer,
WithDispatcher(props.dispatcher),
WithMailbox(props.mailboxProducer),
WithContextDecorator(props.contextDecorator...),
Expand Down
8 changes: 5 additions & 3 deletions cluster/cluster_config_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package cluster

import (
"fmt"
"log/slog"
"time"

"github.com/asynkron/protoactor-go/actor"
Expand Down Expand Up @@ -32,11 +33,12 @@ func NewDefaultClusterContextConfig() *ClusterContextConfig {
RequestsLogThrottlePeriod: defaultRequestsLogThrottlePeriod,
MaxNumberOfEventsInRequestLogThrottledPeriod: defaultMaxNumberOfEvetsInRequestLogThrottledPeriod,
RetryAction: defaultRetryAction,
requestLogThrottle: actor.NewThrottle(
//TODO: fix this
requestLogThrottle: actor.NewThrottleWithLogger(nil,
int32(defaultMaxNumberOfEvetsInRequestLogThrottledPeriod),
defaultRequestsLogThrottlePeriod,
func(i int32) {
plog.Info(fmt.Sprintf("Throttled %d Request logs", i))
func(logger *slog.Logger, i int32) {
logger.Info(fmt.Sprintf("Throttled %d Request logs", i))
},
),
}
Expand Down
2 changes: 1 addition & 1 deletion cluster/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (p *PubSub) Start() {
if err != nil {
panic(err) // let it crash
}
plog.Info("Started Cluster PubSub")
p.cluster.ActorSystem.Logger.Info("Started Cluster PubSub")
}

func (p *PubSub) ExtensionID() extensions.ExtensionID {
Expand Down
11 changes: 6 additions & 5 deletions cluster/pubsub_delivery.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package cluster

import (
"log/slog"
"time"

"github.com/asynkron/protoactor-go/actor"
"github.com/asynkron/protoactor-go/log"
"github.com/asynkron/protoactor-go/remote"
)

var pubsubMemberDeliveryLogThrottle = actor.NewThrottle(10, time.Second, func(i int32) {
plog.Warn("[PubSubMemberDeliveryActor] Throttled logs", log.Int("count", int(i)))
// TODO: fix this
var pubsubMemberDeliveryLogThrottle = actor.NewThrottleWithLogger(nil, 10, time.Second, func(logger *slog.Logger, i int32) {
logger.Warn("[PubSubMemberDeliveryActor] Throttled logs", slog.Int("count", int(i)))
})

type PubSubMemberDeliveryActor struct {
Expand Down Expand Up @@ -46,9 +47,9 @@ func (p *PubSubMemberDeliveryActor) Receive(c actor.Context) {
identityLog := func(err error) {
if pubsubMemberDeliveryLogThrottle() == actor.Open {
if fWithIdentity.identity.GetPid() != nil {
plog.Info("Pub-sub message delivered to PID", log.String("pid", fWithIdentity.identity.GetPid().String()))
c.Logger().Info("Pub-sub message delivered to PID", slog.String("pid", fWithIdentity.identity.GetPid().String()))
} else if fWithIdentity.identity.GetClusterIdentity() != nil {
plog.Info("Pub-sub message delivered to cluster identity", log.String("cluster identity", fWithIdentity.identity.GetClusterIdentity().String()))
c.Logger().Info("Pub-sub message delivered to cluster identity", slog.String("cluster identity", fWithIdentity.identity.GetClusterIdentity().String()))
}
}
}
Expand Down
13 changes: 7 additions & 6 deletions cluster/pubsub_producer.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package cluster

import (
"log/slog"
"sync"
"sync/atomic"
"time"

"github.com/asynkron/protoactor-go/actor"
"github.com/asynkron/protoactor-go/internal/queue/mpsc"
"github.com/asynkron/protoactor-go/log"
"golang.org/x/net/context"
)

Expand Down Expand Up @@ -40,8 +40,9 @@ type BatchingProducerConfig struct {
PublisherIdleTimeout time.Duration
}

var defaultBatchingProducerLogThrottle = actor.NewThrottle(10, time.Second, func(i int32) {
plog.Info("[BatchingProducer] Throttled logs", log.Int("count", int(i)))
// TODO: fix this
var defaultBatchingProducerLogThrottle = actor.NewThrottleWithLogger(nil, 10, time.Second, func(logger *slog.Logger, i int32) {
logger.Info("[BatchingProducer] Throttled logs", slog.Int("count", int(i)))
})

func newBatchingProducerConfig(opts ...BatchingProducerConfigOption) *BatchingProducerConfig {
Expand Down Expand Up @@ -197,13 +198,13 @@ func (p *BatchingProducer) Produce(ctx context.Context, message interface{}) (*P
func (p *BatchingProducer) publishLoop(ctx context.Context) {
defer close(p.loopDone)

plog.Debug("Producer is starting the publisher loop for topic", log.String("topic", p.topic))
p.publisher.Cluster().ActorSystem.Logger.Debug("Producer is starting the publisher loop for topic", slog.String("topic", p.topic))
batchWrapper := newPubSubBatchWithReceipts()

handleUnrecoverableError := func(err error) {
p.stopAcceptingNewMessages()
if p.config.LogThrottle() == actor.Open {
plog.Error("Error in the publisher loop of Producer for topic", log.String("topic", p.topic), log.Error(err))
p.publisher.Cluster().ActorSystem.Logger.Error("Error in the publisher loop of Producer for topic", slog.String("topic", p.topic), slog.Any("error", err))
}
p.failBatch(batchWrapper, err)
p.failPendingMessages(err)
Expand Down Expand Up @@ -366,7 +367,7 @@ loop:
}

if p.config.LogThrottle() == actor.Open {
plog.Warn("Error while publishing batch", log.Error(err))
p.publisher.Cluster().ActorSystem.Logger.Warn("Error while publishing batch", slog.Any("error", err))
}

if decision == FailBatchAndContinue {
Expand Down
6 changes: 6 additions & 0 deletions cluster/pubsub_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type Publisher interface {

// Publish publishes a single message to the topic.
Publish(ctx context.Context, topic string, message interface{}, opts ...GrainCallOption) (*PublishResponse, error)

Cluster() *Cluster
}

type defaultPublisher struct {
Expand All @@ -32,6 +34,10 @@ func NewPublisher(cluster *Cluster) Publisher {
}
}

func (p *defaultPublisher) Cluster() *Cluster {
return p.cluster
}

func (p *defaultPublisher) Initialize(ctx context.Context, topic string, config PublisherConfig) (*Acknowledge, error) {
select {
case <-ctx.Done():
Expand Down
6 changes: 4 additions & 2 deletions cluster/pubsub_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cluster

import (
"context"
"log/slog"
"strings"
"time"

Expand All @@ -13,8 +14,9 @@ import (

const TopicActorKind = "prototopic"

var topicLogThrottle = actor.NewThrottle(10, time.Second, func(count int32) {
plog.Info("[TopicActor] Throttled logs", log.Int("count", int(count)))
// TODO: fix this
var topicLogThrottle = actor.NewThrottleWithLogger(nil, 10, time.Second, func(logger *slog.Logger, count int32) {
logger.Info("[TopicActor] Throttled logs", slog.Int("count", int(count)))
})

type TopicActor struct {
Expand Down

0 comments on commit 0d1a8a8

Please sign in to comment.