Skip to content

Commit

Permalink
Start using slog
Browse files Browse the repository at this point in the history
  • Loading branch information
rogeralsing committed Nov 20, 2023
1 parent 5d0ab45 commit 747328c
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 64 deletions.
57 changes: 2 additions & 55 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (c *Cluster) Get(identity string, kind string) *actor.PID {
return c.IdentityLookup.Get(NewClusterIdentity(identity, kind))
}

func (c *Cluster) Request(identity string, kind string, message interface{}) (interface{}, error) {
func (c *Cluster) Request(identity string, kind string, message interface{}, option ...GrainCallOption) (interface{}, error) {
return c.context.Request(identity, kind, message)
}

Expand Down Expand Up @@ -203,64 +203,11 @@ func (c *Cluster) ensureTopicKindRegistered() {
store := &EmptyKeyValueStore[*Subscribers]{}

c.kinds[TopicActorKind] = NewKind(TopicActorKind, actor.PropsFromProducer(func() actor.Actor {
return NewTopicActor(store)
return NewTopicActor(store, c.Logger())
})).Build(c)
}
}

func (c *Cluster) Logger() *slog.Logger {
return c.ActorSystem.Logger
}

// Call is a wrap of context.RequestFuture with retries.
func (c *Cluster) Call(name string, kind string, msg interface{}, opts ...GrainCallOption) (interface{}, error) {
callConfig := DefaultGrainCallConfig(c)
for _, o := range opts {
o(callConfig)
}

_context := callConfig.Context
if _context == nil {
_context = c.ActorSystem.Root
}

var lastError error

for i := 0; i < callConfig.RetryCount; i++ {
pid := c.Get(name, kind)

if pid == nil {
return nil, remote.ErrUnknownError
}

timeout := callConfig.Timeout
_resp, err := _context.RequestFuture(pid, msg, timeout).Result()
if err != nil {
c.ActorSystem.Logger.Error("cluster.RequestFuture failed", slog.Any("error", err), slog.Any("pid", pid))
lastError = err

switch err {
case actor.ErrTimeout, remote.ErrTimeout:
callConfig.RetryAction(i)

id := ClusterIdentity{Kind: kind, Identity: name}
c.PidCache.Remove(id.Identity, id.Kind)

continue
case actor.ErrDeadLetter, remote.ErrDeadLetter:
callConfig.RetryAction(i)

id := ClusterIdentity{Kind: kind, Identity: name}
c.PidCache.Remove(id.Identity, id.Kind)

continue
default:
return nil, err
}
}

return _resp, nil
}

return nil, lastError
}
2 changes: 1 addition & 1 deletion cluster/pubsub_delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (p *PubSubMemberDeliveryActor) Receive(c actor.Context) {
if len(invalidDeliveries) > 0 {
cluster := GetCluster(c.ActorSystem())
// we use cluster.Call to locate the topic actor in the cluster
_, _ = cluster.Call(batch.Topic, TopicActorKind, &NotifyAboutFailingSubscribersRequest{InvalidDeliveries: invalidDeliveries})
_, _ = cluster.Request(batch.Topic, TopicActorKind, &NotifyAboutFailingSubscribersRequest{InvalidDeliveries: invalidDeliveries})
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions cluster/pubsub_extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func (c *Cluster) BatchingProducer(topic string, opts ...BatchingProducerConfigO

// SubscribeByPid subscribes to a PubSub topic by subscriber PID
func (c *Cluster) SubscribeByPid(topic string, pid *actor.PID, opts ...GrainCallOption) (*SubscribeResponse, error) {
res, err := c.Call(topic, TopicActorKind, &SubscribeRequest{
res, err := c.Request(topic, TopicActorKind, &SubscribeRequest{
Subscriber: &SubscriberIdentity{Identity: &SubscriberIdentity_Pid{Pid: pid}},
}, opts...)
if err != nil {
Expand All @@ -27,7 +27,7 @@ func (c *Cluster) SubscribeByPid(topic string, pid *actor.PID, opts ...GrainCall

// SubscribeByClusterIdentity subscribes to a PubSub topic by cluster identity
func (c *Cluster) SubscribeByClusterIdentity(topic string, identity *ClusterIdentity, opts ...GrainCallOption) (*SubscribeResponse, error) {
res, err := c.Call(topic, TopicActorKind, &SubscribeRequest{
res, err := c.Request(topic, TopicActorKind, &SubscribeRequest{
Subscriber: &SubscriberIdentity{Identity: &SubscriberIdentity_ClusterIdentity{ClusterIdentity: identity}},
}, opts...)
if err != nil {
Expand All @@ -45,7 +45,7 @@ func (c *Cluster) SubscribeWithReceive(topic string, receive actor.ReceiveFunc,

// UnsubscribeByPid unsubscribes from a PubSub topic by subscriber PID
func (c *Cluster) UnsubscribeByPid(topic string, pid *actor.PID, opts ...GrainCallOption) (*UnsubscribeResponse, error) {
res, err := c.Call(topic, TopicActorKind, &UnsubscribeRequest{
res, err := c.Request(topic, TopicActorKind, &UnsubscribeRequest{
Subscriber: &SubscriberIdentity{Identity: &SubscriberIdentity_Pid{Pid: pid}},
}, opts...)
if err != nil {
Expand All @@ -56,7 +56,7 @@ func (c *Cluster) UnsubscribeByPid(topic string, pid *actor.PID, opts ...GrainCa

// UnsubscribeByClusterIdentity unsubscribes from a PubSub topic by cluster identity
func (c *Cluster) UnsubscribeByClusterIdentity(topic string, identity *ClusterIdentity, opts ...GrainCallOption) (*UnsubscribeResponse, error) {
res, err := c.Call(topic, TopicActorKind, &UnsubscribeRequest{
res, err := c.Request(topic, TopicActorKind, &UnsubscribeRequest{
Subscriber: &SubscriberIdentity{Identity: &SubscriberIdentity_ClusterIdentity{ClusterIdentity: identity}},
}, opts...)
if err != nil {
Expand All @@ -67,7 +67,7 @@ func (c *Cluster) UnsubscribeByClusterIdentity(topic string, identity *ClusterId

// UnsubscribeByIdentityAndKind unsubscribes from a PubSub topic by cluster identity
func (c *Cluster) UnsubscribeByIdentityAndKind(topic string, identity string, kind string, opts ...GrainCallOption) (*UnsubscribeResponse, error) {
res, err := c.Call(topic, TopicActorKind, &UnsubscribeRequest{
res, err := c.Request(topic, TopicActorKind, &UnsubscribeRequest{
Subscriber: &SubscriberIdentity{Identity: &SubscriberIdentity_ClusterIdentity{ClusterIdentity: NewClusterIdentity(identity, kind)}},
}, opts...)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cluster/pubsub_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type BatchingProducer struct {
}

func NewBatchingProducer(publisher Publisher, topic string, opts ...BatchingProducerConfigOption) *BatchingProducer {
config := newBatchingProducerConfig(publisher.Cluster.Logger(), opts...)
config := newBatchingProducerConfig(publisher.Cluster().Logger(), opts...)
p := &BatchingProducer{
config: config,
topic: topic,
Expand Down
4 changes: 2 additions & 2 deletions cluster/pubsub_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (p *defaultPublisher) Initialize(ctx context.Context, topic string, config
case <-ctx.Done():
return nil, ctx.Err()
default:
res, err := p.cluster.Call(topic, TopicActorKind, &Initialize{
res, err := p.cluster.Request(topic, TopicActorKind, &Initialize{
IdleTimeout: durationpb.New(config.IdleTimeout),
})
if err != nil {
Expand All @@ -58,7 +58,7 @@ func (p *defaultPublisher) PublishBatch(ctx context.Context, topic string, batch
case <-ctx.Done():
return nil, ctx.Err()
default:
res, err := p.cluster.Call(topic, TopicActorKind, batch, opts...)
res, err := p.cluster.Request(topic, TopicActorKind, batch, opts...)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 747328c

Please sign in to comment.