Skip to content

Commit

Permalink
fix call vs request
Browse files Browse the repository at this point in the history
  • Loading branch information
rogeralsing committed Nov 21, 2023
1 parent efdd7ae commit c7bf963
Show file tree
Hide file tree
Showing 9 changed files with 24 additions and 33 deletions.
10 changes: 0 additions & 10 deletions cluster/cluster_test_tool/log.go
Original file line number Diff line number Diff line change
@@ -1,11 +1 @@
package cluster_test_tool

import "github.com/asynkron/protoactor-go/log"

var plog = log.New(log.DebugLevel, "[CLUSTER TEST]")

// SetLogLevel sets the log level for the logger
// SetLogLevel is safe to be called concurrently
func SetLogLevel(level log.Level) {
plog.SetLevel(level)
}
3 changes: 2 additions & 1 deletion cluster/clusterproviders/zk/misc_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package zk

import (
"log/slog"
"strings"
"testing"

Expand Down Expand Up @@ -48,7 +49,7 @@ func (suite *MiscTestSuite) TestMapString() {
}

func (suite *MiscTestSuite) TestSafeRun() {
suite.NotPanics(func() { safeRun(func() { panic("don't worry, should panic here") }) })
suite.NotPanics(func() { safeRun(slog.Default(), func() { panic("don't worry, should panic here") }) })
}

func (suite *MiscTestSuite) TestNode() {
Expand Down
5 changes: 3 additions & 2 deletions cluster/clusterproviders/zk/singleton.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,20 @@ func (s *SingletonScheduler) FromProducer(f actor.Producer) *SingletonScheduler
}

func (s *SingletonScheduler) OnRoleChanged(rt RoleType) {

s.Lock()
defer s.Unlock()
if rt == Follower {
if len(s.pids) > 0 {
plog.Info("I am follower, poison singleton actors")
s.root.Logger().Info("I am follower, poison singleton actors")
for _, pid := range s.pids {
s.root.Poison(pid)
}
s.pids = nil
}
} else if rt == Leader {
if len(s.props) > 0 {
plog.Info("I am leader now, start singleton actors")
s.root.Logger().Info("I am leader now, start singleton actors")
s.pids = make([]*actor.PID, len(s.props))
for i, p := range s.props {
s.pids[i] = s.root.Spawn(p)
Expand Down
7 changes: 3 additions & 4 deletions cluster/clusterproviders/zk/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ package zk

import (
"fmt"
"log/slog"
"runtime"
"strconv"
"strings"

"github.com/asynkron/protoactor-go/log"
)

func intToStr(i int) string {
Expand Down Expand Up @@ -55,10 +54,10 @@ func mapString(list []string, fn func(string) string) []string {
return l
}

func safeRun(fn func()) {
func safeRun(logger *slog.Logger, fn func()) {
defer func() {
if r := recover(); r != nil {
plog.Warn("OnRoleChanged.", log.Error(fmt.Errorf("%v\n%s", r, string(getRunTimeStack()))))
logger.Warn("OnRoleChanged.", slog.Any("error", fmt.Errorf("%v\n%s", r, string(getRunTimeStack()))))
}
}()
fn()
Expand Down
2 changes: 1 addition & 1 deletion cluster/clusterproviders/zk/zk_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func (p *Provider) startRoleChangedNotifyLoop() {
for !p.shutdown {
role := <-p.roleChangedChan
if lis := p.roleChangedListener; lis != nil {
safeRun(func() { lis.OnRoleChanged(role) })
safeRun(p.cluster.Logger(), func() { lis.OnRoleChanged(role) })
}
}
}()
Expand Down
3 changes: 1 addition & 2 deletions cluster/clusterproviders/zk/zk_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/asynkron/protoactor-go/actor"
"github.com/asynkron/protoactor-go/cluster"
"github.com/asynkron/protoactor-go/cluster/identitylookup/disthash"
"github.com/asynkron/protoactor-go/log"
"github.com/asynkron/protoactor-go/remote"
"github.com/stretchr/testify/suite"
)
Expand All @@ -18,7 +17,7 @@ type ZookeeperTestSuite struct {
}

func (suite *ZookeeperTestSuite) SetupTest() {
plog.SetLevel(log.ErrorLevel)

}

func (suite *ZookeeperTestSuite) TearDownTest() {
Expand Down
8 changes: 4 additions & 4 deletions cluster/pubsub_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type BatchingProducer struct {
}

func NewBatchingProducer(publisher Publisher, topic string, opts ...BatchingProducerConfigOption) *BatchingProducer {
config := newBatchingProducerConfig(publisher.Cluster().Logger(), opts...)
config := newBatchingProducerConfig(publisher.Logger(), opts...)
p := &BatchingProducer{
config: config,
topic: topic,
Expand Down Expand Up @@ -195,13 +195,13 @@ func (p *BatchingProducer) Produce(ctx context.Context, message interface{}) (*P
func (p *BatchingProducer) publishLoop(ctx context.Context) {
defer close(p.loopDone)

p.publisher.Cluster().ActorSystem.Logger.Debug("Producer is starting the publisher loop for topic", slog.String("topic", p.topic))
p.publisher.Logger().Debug("Producer is starting the publisher loop for topic", slog.String("topic", p.topic))
batchWrapper := newPubSubBatchWithReceipts()

handleUnrecoverableError := func(err error) {
p.stopAcceptingNewMessages()
if p.config.LogThrottle() == actor.Open {
p.publisher.Cluster().ActorSystem.Logger.Error("Error in the publisher loop of Producer for topic", slog.String("topic", p.topic), slog.Any("error", err))
p.publisher.Logger().Error("Error in the publisher loop of Producer for topic", slog.String("topic", p.topic), slog.Any("error", err))
}
p.failBatch(batchWrapper, err)
p.failPendingMessages(err)
Expand Down Expand Up @@ -364,7 +364,7 @@ loop:
}

if p.config.LogThrottle() == actor.Open {
p.publisher.Cluster().ActorSystem.Logger.Warn("Error while publishing batch", slog.Any("error", err))
p.publisher.Logger().Warn("Error while publishing batch", slog.Any("error", err))
}

if decision == FailBatchAndContinue {
Expand Down
12 changes: 6 additions & 6 deletions cluster/pubsub_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cluster

import (
"context"
"log/slog"
"testing"
"time"

Expand Down Expand Up @@ -73,6 +74,7 @@ func (suite *PubSubBatchingProducerTestSuite) timeout() (*PublishResponse, error
}

func (suite *PubSubBatchingProducerTestSuite) TestProducerSendsMessagesInBatches() {

producer := NewBatchingProducer(newMockPublisher(suite.record), "topic", WithBatchingProducerBatchSize(10))
defer producer.Dispose()

Expand Down Expand Up @@ -295,9 +297,8 @@ type mockPublisher struct {
publish func(*PubSubBatch) (*PublishResponse, error)
}

func (m *mockPublisher) Cluster() *Cluster {
//TODO implement me
panic("implement me")
func (m *mockPublisher) Logger() *slog.Logger {
return slog.Default()
}

func newMockPublisher(publish func(*PubSubBatch) (*PublishResponse, error)) *mockPublisher {
Expand All @@ -321,9 +322,8 @@ type optionalFailureMockPublisher struct {
shouldFail bool
}

func (o *optionalFailureMockPublisher) Cluster() *Cluster {
//TODO implement me
panic("implement me")
func (o *optionalFailureMockPublisher) Logger() *slog.Logger {
return slog.Default()
}

// newOptionalFailureMockPublisher creates a mock publisher that can be configured to fail or not
Expand Down
7 changes: 4 additions & 3 deletions cluster/pubsub_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cluster

import (
"context"
"log/slog"
"time"

"google.golang.org/protobuf/types/known/durationpb"
Expand All @@ -21,7 +22,7 @@ type Publisher interface {
// Publish publishes a single message to the topic.
Publish(ctx context.Context, topic string, message interface{}, opts ...GrainCallOption) (*PublishResponse, error)

Cluster() *Cluster
Logger() *slog.Logger
}

type defaultPublisher struct {
Expand All @@ -34,8 +35,8 @@ func NewPublisher(cluster *Cluster) Publisher {
}
}

func (p *defaultPublisher) Cluster() *Cluster {
return p.cluster
func (p *defaultPublisher) Logger() *slog.Logger {
return p.cluster.Logger()
}

func (p *defaultPublisher) Initialize(ctx context.Context, topic string, config PublisherConfig) (*Acknowledge, error) {
Expand Down

0 comments on commit c7bf963

Please sign in to comment.