diff --git a/cluster/cluster.go b/cluster/cluster.go index 6c617d20..783249f1 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -161,7 +161,7 @@ func (c *Cluster) Get(identity string, kind string) *actor.PID { return c.IdentityLookup.Get(NewClusterIdentity(identity, kind)) } -func (c *Cluster) Request(identity string, kind string, message interface{}) (interface{}, error) { +func (c *Cluster) Request(identity string, kind string, message interface{}, option ...GrainCallOption) (interface{}, error) { return c.context.Request(identity, kind, message) } @@ -203,7 +203,7 @@ func (c *Cluster) ensureTopicKindRegistered() { store := &EmptyKeyValueStore[*Subscribers]{} c.kinds[TopicActorKind] = NewKind(TopicActorKind, actor.PropsFromProducer(func() actor.Actor { - return NewTopicActor(store) + return NewTopicActor(store, c.Logger()) })).Build(c) } } @@ -211,56 +211,3 @@ func (c *Cluster) ensureTopicKindRegistered() { func (c *Cluster) Logger() *slog.Logger { return c.ActorSystem.Logger } - -// Call is a wrap of context.RequestFuture with retries. -func (c *Cluster) Call(name string, kind string, msg interface{}, opts ...GrainCallOption) (interface{}, error) { - callConfig := DefaultGrainCallConfig(c) - for _, o := range opts { - o(callConfig) - } - - _context := callConfig.Context - if _context == nil { - _context = c.ActorSystem.Root - } - - var lastError error - - for i := 0; i < callConfig.RetryCount; i++ { - pid := c.Get(name, kind) - - if pid == nil { - return nil, remote.ErrUnknownError - } - - timeout := callConfig.Timeout - _resp, err := _context.RequestFuture(pid, msg, timeout).Result() - if err != nil { - c.ActorSystem.Logger.Error("cluster.RequestFuture failed", slog.Any("error", err), slog.Any("pid", pid)) - lastError = err - - switch err { - case actor.ErrTimeout, remote.ErrTimeout: - callConfig.RetryAction(i) - - id := ClusterIdentity{Kind: kind, Identity: name} - c.PidCache.Remove(id.Identity, id.Kind) - - continue - case actor.ErrDeadLetter, remote.ErrDeadLetter: - callConfig.RetryAction(i) - - id := ClusterIdentity{Kind: kind, Identity: name} - c.PidCache.Remove(id.Identity, id.Kind) - - continue - default: - return nil, err - } - } - - return _resp, nil - } - - return nil, lastError -} diff --git a/cluster/pubsub_delivery.go b/cluster/pubsub_delivery.go index 8ee37442..304d806a 100644 --- a/cluster/pubsub_delivery.go +++ b/cluster/pubsub_delivery.go @@ -75,7 +75,7 @@ func (p *PubSubMemberDeliveryActor) Receive(c actor.Context) { if len(invalidDeliveries) > 0 { cluster := GetCluster(c.ActorSystem()) // we use cluster.Call to locate the topic actor in the cluster - _, _ = cluster.Call(batch.Topic, TopicActorKind, &NotifyAboutFailingSubscribersRequest{InvalidDeliveries: invalidDeliveries}) + _, _ = cluster.Request(batch.Topic, TopicActorKind, &NotifyAboutFailingSubscribersRequest{InvalidDeliveries: invalidDeliveries}) } } } diff --git a/cluster/pubsub_extensions.go b/cluster/pubsub_extensions.go index fcc778e5..8f68d538 100644 --- a/cluster/pubsub_extensions.go +++ b/cluster/pubsub_extensions.go @@ -16,7 +16,7 @@ func (c *Cluster) BatchingProducer(topic string, opts ...BatchingProducerConfigO // SubscribeByPid subscribes to a PubSub topic by subscriber PID func (c *Cluster) SubscribeByPid(topic string, pid *actor.PID, opts ...GrainCallOption) (*SubscribeResponse, error) { - res, err := c.Call(topic, TopicActorKind, &SubscribeRequest{ + res, err := c.Request(topic, TopicActorKind, &SubscribeRequest{ Subscriber: &SubscriberIdentity{Identity: &SubscriberIdentity_Pid{Pid: pid}}, }, opts...) if err != nil { @@ -27,7 +27,7 @@ func (c *Cluster) SubscribeByPid(topic string, pid *actor.PID, opts ...GrainCall // SubscribeByClusterIdentity subscribes to a PubSub topic by cluster identity func (c *Cluster) SubscribeByClusterIdentity(topic string, identity *ClusterIdentity, opts ...GrainCallOption) (*SubscribeResponse, error) { - res, err := c.Call(topic, TopicActorKind, &SubscribeRequest{ + res, err := c.Request(topic, TopicActorKind, &SubscribeRequest{ Subscriber: &SubscriberIdentity{Identity: &SubscriberIdentity_ClusterIdentity{ClusterIdentity: identity}}, }, opts...) if err != nil { @@ -45,7 +45,7 @@ func (c *Cluster) SubscribeWithReceive(topic string, receive actor.ReceiveFunc, // UnsubscribeByPid unsubscribes from a PubSub topic by subscriber PID func (c *Cluster) UnsubscribeByPid(topic string, pid *actor.PID, opts ...GrainCallOption) (*UnsubscribeResponse, error) { - res, err := c.Call(topic, TopicActorKind, &UnsubscribeRequest{ + res, err := c.Request(topic, TopicActorKind, &UnsubscribeRequest{ Subscriber: &SubscriberIdentity{Identity: &SubscriberIdentity_Pid{Pid: pid}}, }, opts...) if err != nil { @@ -56,7 +56,7 @@ func (c *Cluster) UnsubscribeByPid(topic string, pid *actor.PID, opts ...GrainCa // UnsubscribeByClusterIdentity unsubscribes from a PubSub topic by cluster identity func (c *Cluster) UnsubscribeByClusterIdentity(topic string, identity *ClusterIdentity, opts ...GrainCallOption) (*UnsubscribeResponse, error) { - res, err := c.Call(topic, TopicActorKind, &UnsubscribeRequest{ + res, err := c.Request(topic, TopicActorKind, &UnsubscribeRequest{ Subscriber: &SubscriberIdentity{Identity: &SubscriberIdentity_ClusterIdentity{ClusterIdentity: identity}}, }, opts...) if err != nil { @@ -67,7 +67,7 @@ func (c *Cluster) UnsubscribeByClusterIdentity(topic string, identity *ClusterId // UnsubscribeByIdentityAndKind unsubscribes from a PubSub topic by cluster identity func (c *Cluster) UnsubscribeByIdentityAndKind(topic string, identity string, kind string, opts ...GrainCallOption) (*UnsubscribeResponse, error) { - res, err := c.Call(topic, TopicActorKind, &UnsubscribeRequest{ + res, err := c.Request(topic, TopicActorKind, &UnsubscribeRequest{ Subscriber: &SubscriberIdentity{Identity: &SubscriberIdentity_ClusterIdentity{ClusterIdentity: NewClusterIdentity(identity, kind)}}, }, opts...) if err != nil { diff --git a/cluster/pubsub_producer.go b/cluster/pubsub_producer.go index bfa1ce90..f4191785 100644 --- a/cluster/pubsub_producer.go +++ b/cluster/pubsub_producer.go @@ -70,7 +70,7 @@ type BatchingProducer struct { } func NewBatchingProducer(publisher Publisher, topic string, opts ...BatchingProducerConfigOption) *BatchingProducer { - config := newBatchingProducerConfig(publisher.Cluster.Logger(), opts...) + config := newBatchingProducerConfig(publisher.Cluster().Logger(), opts...) p := &BatchingProducer{ config: config, topic: topic, diff --git a/cluster/pubsub_publisher.go b/cluster/pubsub_publisher.go index c157d99d..8ef4c5db 100644 --- a/cluster/pubsub_publisher.go +++ b/cluster/pubsub_publisher.go @@ -43,7 +43,7 @@ func (p *defaultPublisher) Initialize(ctx context.Context, topic string, config case <-ctx.Done(): return nil, ctx.Err() default: - res, err := p.cluster.Call(topic, TopicActorKind, &Initialize{ + res, err := p.cluster.Request(topic, TopicActorKind, &Initialize{ IdleTimeout: durationpb.New(config.IdleTimeout), }) if err != nil { @@ -58,7 +58,7 @@ func (p *defaultPublisher) PublishBatch(ctx context.Context, topic string, batch case <-ctx.Done(): return nil, ctx.Err() default: - res, err := p.cluster.Call(topic, TopicActorKind, batch, opts...) + res, err := p.cluster.Request(topic, TopicActorKind, batch, opts...) if err != nil { return nil, err }