diff --git a/cluster/cluster_test_tool/log.go b/cluster/cluster_test_tool/log.go index d4ff9ccd..7c3f5501 100644 --- a/cluster/cluster_test_tool/log.go +++ b/cluster/cluster_test_tool/log.go @@ -1,11 +1 @@ package cluster_test_tool - -import "github.com/asynkron/protoactor-go/log" - -var plog = log.New(log.DebugLevel, "[CLUSTER TEST]") - -// SetLogLevel sets the log level for the logger -// SetLogLevel is safe to be called concurrently -func SetLogLevel(level log.Level) { - plog.SetLevel(level) -} diff --git a/cluster/clusterproviders/zk/misc_test.go b/cluster/clusterproviders/zk/misc_test.go index 9b8c2a18..d1acb533 100644 --- a/cluster/clusterproviders/zk/misc_test.go +++ b/cluster/clusterproviders/zk/misc_test.go @@ -1,6 +1,7 @@ package zk import ( + "log/slog" "strings" "testing" @@ -48,7 +49,7 @@ func (suite *MiscTestSuite) TestMapString() { } func (suite *MiscTestSuite) TestSafeRun() { - suite.NotPanics(func() { safeRun(func() { panic("don't worry, should panic here") }) }) + suite.NotPanics(func() { safeRun(slog.Default(), func() { panic("don't worry, should panic here") }) }) } func (suite *MiscTestSuite) TestNode() { diff --git a/cluster/clusterproviders/zk/singleton.go b/cluster/clusterproviders/zk/singleton.go index 2ee932bc..bf27ecda 100644 --- a/cluster/clusterproviders/zk/singleton.go +++ b/cluster/clusterproviders/zk/singleton.go @@ -32,11 +32,12 @@ func (s *SingletonScheduler) FromProducer(f actor.Producer) *SingletonScheduler } func (s *SingletonScheduler) OnRoleChanged(rt RoleType) { + s.Lock() defer s.Unlock() if rt == Follower { if len(s.pids) > 0 { - plog.Info("I am follower, poison singleton actors") + s.root.Logger().Info("I am follower, poison singleton actors") for _, pid := range s.pids { s.root.Poison(pid) } @@ -44,7 +45,7 @@ func (s *SingletonScheduler) OnRoleChanged(rt RoleType) { } } else if rt == Leader { if len(s.props) > 0 { - plog.Info("I am leader now, start singleton actors") + s.root.Logger().Info("I am leader now, start singleton actors") s.pids = make([]*actor.PID, len(s.props)) for i, p := range s.props { s.pids[i] = s.root.Spawn(p) diff --git a/cluster/clusterproviders/zk/utils.go b/cluster/clusterproviders/zk/utils.go index ea476bdf..fd3c049f 100644 --- a/cluster/clusterproviders/zk/utils.go +++ b/cluster/clusterproviders/zk/utils.go @@ -2,11 +2,10 @@ package zk import ( "fmt" + "log/slog" "runtime" "strconv" "strings" - - "github.com/asynkron/protoactor-go/log" ) func intToStr(i int) string { @@ -55,10 +54,10 @@ func mapString(list []string, fn func(string) string) []string { return l } -func safeRun(fn func()) { +func safeRun(logger *slog.Logger, fn func()) { defer func() { if r := recover(); r != nil { - plog.Warn("OnRoleChanged.", log.Error(fmt.Errorf("%v\n%s", r, string(getRunTimeStack())))) + logger.Warn("OnRoleChanged.", slog.Any("error", fmt.Errorf("%v\n%s", r, string(getRunTimeStack())))) } }() fn() diff --git a/cluster/clusterproviders/zk/zk_provider.go b/cluster/clusterproviders/zk/zk_provider.go index 787cec53..ae0c707a 100644 --- a/cluster/clusterproviders/zk/zk_provider.go +++ b/cluster/clusterproviders/zk/zk_provider.go @@ -320,7 +320,7 @@ func (p *Provider) startRoleChangedNotifyLoop() { for !p.shutdown { role := <-p.roleChangedChan if lis := p.roleChangedListener; lis != nil { - safeRun(func() { lis.OnRoleChanged(role) }) + safeRun(p.cluster.Logger(), func() { lis.OnRoleChanged(role) }) } } }() diff --git a/cluster/clusterproviders/zk/zk_provider_test.go b/cluster/clusterproviders/zk/zk_provider_test.go index c0849dd2..00f8a4c4 100644 --- a/cluster/clusterproviders/zk/zk_provider_test.go +++ b/cluster/clusterproviders/zk/zk_provider_test.go @@ -8,7 +8,6 @@ import ( "github.com/asynkron/protoactor-go/actor" "github.com/asynkron/protoactor-go/cluster" "github.com/asynkron/protoactor-go/cluster/identitylookup/disthash" - "github.com/asynkron/protoactor-go/log" "github.com/asynkron/protoactor-go/remote" "github.com/stretchr/testify/suite" ) @@ -18,7 +17,7 @@ type ZookeeperTestSuite struct { } func (suite *ZookeeperTestSuite) SetupTest() { - plog.SetLevel(log.ErrorLevel) + } func (suite *ZookeeperTestSuite) TearDownTest() { diff --git a/cluster/pubsub_producer.go b/cluster/pubsub_producer.go index f4191785..f584adb4 100644 --- a/cluster/pubsub_producer.go +++ b/cluster/pubsub_producer.go @@ -70,7 +70,7 @@ type BatchingProducer struct { } func NewBatchingProducer(publisher Publisher, topic string, opts ...BatchingProducerConfigOption) *BatchingProducer { - config := newBatchingProducerConfig(publisher.Cluster().Logger(), opts...) + config := newBatchingProducerConfig(publisher.Logger(), opts...) p := &BatchingProducer{ config: config, topic: topic, @@ -195,13 +195,13 @@ func (p *BatchingProducer) Produce(ctx context.Context, message interface{}) (*P func (p *BatchingProducer) publishLoop(ctx context.Context) { defer close(p.loopDone) - p.publisher.Cluster().ActorSystem.Logger.Debug("Producer is starting the publisher loop for topic", slog.String("topic", p.topic)) + p.publisher.Logger().Debug("Producer is starting the publisher loop for topic", slog.String("topic", p.topic)) batchWrapper := newPubSubBatchWithReceipts() handleUnrecoverableError := func(err error) { p.stopAcceptingNewMessages() if p.config.LogThrottle() == actor.Open { - p.publisher.Cluster().ActorSystem.Logger.Error("Error in the publisher loop of Producer for topic", slog.String("topic", p.topic), slog.Any("error", err)) + p.publisher.Logger().Error("Error in the publisher loop of Producer for topic", slog.String("topic", p.topic), slog.Any("error", err)) } p.failBatch(batchWrapper, err) p.failPendingMessages(err) @@ -364,7 +364,7 @@ loop: } if p.config.LogThrottle() == actor.Open { - p.publisher.Cluster().ActorSystem.Logger.Warn("Error while publishing batch", slog.Any("error", err)) + p.publisher.Logger().Warn("Error while publishing batch", slog.Any("error", err)) } if decision == FailBatchAndContinue { diff --git a/cluster/pubsub_producer_test.go b/cluster/pubsub_producer_test.go index ef439bd2..9d32e8af 100644 --- a/cluster/pubsub_producer_test.go +++ b/cluster/pubsub_producer_test.go @@ -2,6 +2,7 @@ package cluster import ( "context" + "log/slog" "testing" "time" @@ -73,6 +74,7 @@ func (suite *PubSubBatchingProducerTestSuite) timeout() (*PublishResponse, error } func (suite *PubSubBatchingProducerTestSuite) TestProducerSendsMessagesInBatches() { + producer := NewBatchingProducer(newMockPublisher(suite.record), "topic", WithBatchingProducerBatchSize(10)) defer producer.Dispose() @@ -295,9 +297,8 @@ type mockPublisher struct { publish func(*PubSubBatch) (*PublishResponse, error) } -func (m *mockPublisher) Cluster() *Cluster { - //TODO implement me - panic("implement me") +func (m *mockPublisher) Logger() *slog.Logger { + return slog.Default() } func newMockPublisher(publish func(*PubSubBatch) (*PublishResponse, error)) *mockPublisher { @@ -321,9 +322,8 @@ type optionalFailureMockPublisher struct { shouldFail bool } -func (o *optionalFailureMockPublisher) Cluster() *Cluster { - //TODO implement me - panic("implement me") +func (o *optionalFailureMockPublisher) Logger() *slog.Logger { + return slog.Default() } // newOptionalFailureMockPublisher creates a mock publisher that can be configured to fail or not diff --git a/cluster/pubsub_publisher.go b/cluster/pubsub_publisher.go index 8ef4c5db..86379f4a 100644 --- a/cluster/pubsub_publisher.go +++ b/cluster/pubsub_publisher.go @@ -2,6 +2,7 @@ package cluster import ( "context" + "log/slog" "time" "google.golang.org/protobuf/types/known/durationpb" @@ -21,7 +22,7 @@ type Publisher interface { // Publish publishes a single message to the topic. Publish(ctx context.Context, topic string, message interface{}, opts ...GrainCallOption) (*PublishResponse, error) - Cluster() *Cluster + Logger() *slog.Logger } type defaultPublisher struct { @@ -34,8 +35,8 @@ func NewPublisher(cluster *Cluster) Publisher { } } -func (p *defaultPublisher) Cluster() *Cluster { - return p.cluster +func (p *defaultPublisher) Logger() *slog.Logger { + return p.cluster.Logger() } func (p *defaultPublisher) Initialize(ctx context.Context, topic string, config PublisherConfig) (*Acknowledge, error) {