diff --git a/cluster/cluster_test_tool/pubsub_cluster_fixture.go b/cluster/cluster_test_tool/pubsub_cluster_fixture.go index fc15505d..c995dff7 100644 --- a/cluster/cluster_test_tool/pubsub_cluster_fixture.go +++ b/cluster/cluster_test_tool/pubsub_cluster_fixture.go @@ -13,6 +13,7 @@ import ( "github.com/asynkron/protoactor-go/cluster" "github.com/stretchr/testify/assert" "golang.org/x/net/context" + "google.golang.org/protobuf/proto" ) const ( @@ -134,7 +135,7 @@ func (p *PubSubClusterFixture) PublishData(topic string, data int) (*cluster.Pub // PublishDataBatch publishes the given messages to the given topic func (p *PubSubClusterFixture) PublishDataBatch(topic string, data []int) (*cluster.PublishResponse, error) { - batches := make([]interface{}, 0) + batches := make([]proto.Message, 0) for _, d := range data { batches = append(batches, &DataPublished{Data: int32(d)}) } diff --git a/cluster/pubsub_batch.go b/cluster/pubsub_batch.go index d9745730..e56912c6 100644 --- a/cluster/pubsub_batch.go +++ b/cluster/pubsub_batch.go @@ -3,10 +3,11 @@ package cluster import ( "github.com/asynkron/protoactor-go/actor" "github.com/asynkron/protoactor-go/remote" + "google.golang.org/protobuf/proto" ) type PubSubBatch struct { - Envelopes []interface{} + Envelopes []proto.Message } // Serialize converts a PubSubBatch to a PubSubBatchTransport. @@ -46,7 +47,7 @@ func (b *PubSubBatch) Serialize() remote.RootSerialized { // Deserialize converts a PubSubBatchTransport to a PubSubBatch. func (t *PubSubBatchTransport) Deserialize() remote.RootSerializable { b := &PubSubBatch{ - Envelopes: make([]interface{}, 0), + Envelopes: make([]proto.Message, 0), } for _, envelope := range t.Envelopes { @@ -54,7 +55,12 @@ func (t *PubSubBatchTransport) Deserialize() remote.RootSerializable { if err != nil { panic(err) } - b.Envelopes = append(b.Envelopes, message) + protoMessage, ok := message.(proto.Message) + if !ok { + panic("message is not proto.Message") + } + + b.Envelopes = append(b.Envelopes, protoMessage) } return b } @@ -81,8 +87,10 @@ func (t *DeliverBatchRequestTransport) Deserialize() remote.RootSerializable { } } +var _ actor.MessageBatch = (*PubSubAutoRespondBatch)(nil) + type PubSubAutoRespondBatch struct { - Envelopes []interface{} + Envelopes []proto.Message } // Serialize converts a PubSubAutoRespondBatch to a PubSubAutoRespondBatchTransport. @@ -104,7 +112,11 @@ func (b *PubSubAutoRespondBatch) GetAutoResponse(_ actor.Context) interface{} { // GetMessages returns the message. func (b *PubSubAutoRespondBatch) GetMessages() []interface{} { - return b.Envelopes + var messages []interface{} + for _, envelope := range b.Envelopes { + messages = append(messages, envelope) + } + return messages } // Deserialize converts a PubSubAutoRespondBatchTransport to a PubSubAutoRespondBatch. diff --git a/cluster/pubsub_delivery.go b/cluster/pubsub_delivery.go index 304d806a..f2d42493 100644 --- a/cluster/pubsub_delivery.go +++ b/cluster/pubsub_delivery.go @@ -46,9 +46,9 @@ func (p *PubSubMemberDeliveryActor) Receive(c actor.Context) { identityLog := func(err error) { if p.shouldThrottle() == actor.Open { if fWithIdentity.identity.GetPid() != nil { - c.Logger().Info("Pub-sub message delivered to PID", slog.String("pid", fWithIdentity.identity.GetPid().String())) + c.Logger().Error("Pub-sub message failed to deliver to PID", slog.String("pid", fWithIdentity.identity.GetPid().String()), slog.Any("error", err)) } else if fWithIdentity.identity.GetClusterIdentity() != nil { - c.Logger().Info("Pub-sub message delivered to cluster identity", slog.String("cluster identity", fWithIdentity.identity.GetClusterIdentity().String())) + c.Logger().Error("Pub-sub message failed to deliver to cluster identity", slog.String("cluster identity", fWithIdentity.identity.GetClusterIdentity().String()), slog.Any("error", err)) } } } diff --git a/cluster/pubsub_producer.go b/cluster/pubsub_producer.go index f584adb4..7f775325 100644 --- a/cluster/pubsub_producer.go +++ b/cluster/pubsub_producer.go @@ -9,6 +9,7 @@ import ( "github.com/asynkron/protoactor-go/actor" "github.com/asynkron/protoactor-go/internal/queue/mpsc" "golang.org/x/net/context" + "google.golang.org/protobuf/proto" ) // PublishingErrorHandler decides what to do with a publishing error in BatchingProducer @@ -98,13 +99,13 @@ type pubsubBatchWithReceipts struct { // newPubSubBatchWithReceipts creates a new pubsubBatchWithReceipts func newPubSubBatchWithReceipts() *pubsubBatchWithReceipts { return &pubsubBatchWithReceipts{ - batch: &PubSubBatch{Envelopes: make([]interface{}, 0, 10)}, + batch: &PubSubBatch{Envelopes: make([]proto.Message, 0, 10)}, ctxArr: make([]context.Context, 0, 10), } } type produceMessage struct { - message interface{} + message proto.Message ctx context.Context } @@ -171,7 +172,7 @@ func (p *BatchingProducer) getProduceProcessInfo(ctx context.Context) *ProducePr } // Produce a message to producer queue. The return info can be used to wait for the message to be published. -func (p *BatchingProducer) Produce(ctx context.Context, message interface{}) (*ProduceProcessInfo, error) { +func (p *BatchingProducer) Produce(ctx context.Context, message proto.Message) (*ProduceProcessInfo, error) { ctx, cancel := context.WithCancel(ctx) info := &ProduceProcessInfo{ Finished: make(chan struct{}), @@ -301,7 +302,7 @@ func (p *BatchingProducer) failBatch(batchWrapper *pubsubBatchWithReceipts, err // clearBatch clears the batch wrapper func (p *BatchingProducer) clearBatch(batchWrapper *pubsubBatchWithReceipts) { - batchWrapper.batch = &PubSubBatch{Envelopes: make([]interface{}, 0, 10)} + batchWrapper.batch = &PubSubBatch{Envelopes: make([]proto.Message, 0, 10)} batchWrapper.ctxArr = batchWrapper.ctxArr[:0] } diff --git a/cluster/pubsub_producer_test.go b/cluster/pubsub_producer_test.go index 9d32e8af..2863f80d 100644 --- a/cluster/pubsub_producer_test.go +++ b/cluster/pubsub_producer_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/stretchr/testify/suite" + "google.golang.org/protobuf/proto" ) type PubSubBatchingProducerTestSuite struct { @@ -37,7 +38,7 @@ func (suite *PubSubBatchingProducerTestSuite) iter(from, to int) []int { } func (suite *PubSubBatchingProducerTestSuite) record(batch *PubSubBatch) (*PublishResponse, error) { - b := &PubSubBatch{Envelopes: make([]interface{}, 0, len(batch.Envelopes))} + b := &PubSubBatch{Envelopes: make([]proto.Message, 0, len(batch.Envelopes))} b.Envelopes = append(b.Envelopes, batch.Envelopes...) suite.batchesSent = append(suite.batchesSent, b) @@ -74,7 +75,6 @@ func (suite *PubSubBatchingProducerTestSuite) timeout() (*PublishResponse, error } func (suite *PubSubBatchingProducerTestSuite) TestProducerSendsMessagesInBatches() { - producer := NewBatchingProducer(newMockPublisher(suite.record), "topic", WithBatchingProducerBatchSize(10)) defer producer.Dispose() @@ -313,8 +313,8 @@ func (m *mockPublisher) PublishBatch(_ context.Context, topic string, batch *Pub return m.publish(batch) } -func (m *mockPublisher) Publish(_ context.Context, topic string, message interface{}, opts ...GrainCallOption) (*PublishResponse, error) { - return m.publish(&PubSubBatch{Envelopes: []interface{}{message}}) +func (m *mockPublisher) Publish(_ context.Context, topic string, message proto.Message, opts ...GrainCallOption) (*PublishResponse, error) { + return m.publish(&PubSubBatch{Envelopes: []proto.Message{message}}) } type optionalFailureMockPublisher struct { @@ -339,15 +339,15 @@ func (o *optionalFailureMockPublisher) PublishBatch(ctx context.Context, topic s if o.shouldFail { return nil, &testException{} } - copiedBatch := &PubSubBatch{Envelopes: make([]interface{}, len(batch.Envelopes))} + copiedBatch := &PubSubBatch{Envelopes: make([]proto.Message, len(batch.Envelopes))} copy(copiedBatch.Envelopes, batch.Envelopes) o.sentBatches = append(o.sentBatches, copiedBatch) return &PublishResponse{}, nil } -func (o *optionalFailureMockPublisher) Publish(ctx context.Context, topic string, message interface{}, opts ...GrainCallOption) (*PublishResponse, error) { - return o.PublishBatch(ctx, topic, &PubSubBatch{Envelopes: []interface{}{message}}, opts...) +func (o *optionalFailureMockPublisher) Publish(ctx context.Context, topic string, message proto.Message, opts ...GrainCallOption) (*PublishResponse, error) { + return o.PublishBatch(ctx, topic, &PubSubBatch{Envelopes: []proto.Message{message}}, opts...) } type testException struct{} diff --git a/cluster/pubsub_publisher.go b/cluster/pubsub_publisher.go index 86379f4a..165465c1 100644 --- a/cluster/pubsub_publisher.go +++ b/cluster/pubsub_publisher.go @@ -5,6 +5,7 @@ import ( "log/slog" "time" + "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/durationpb" ) @@ -20,7 +21,7 @@ type Publisher interface { PublishBatch(ctx context.Context, topic string, batch *PubSubBatch, opts ...GrainCallOption) (*PublishResponse, error) // Publish publishes a single message to the topic. - Publish(ctx context.Context, topic string, message interface{}, opts ...GrainCallOption) (*PublishResponse, error) + Publish(ctx context.Context, topic string, message proto.Message, opts ...GrainCallOption) (*PublishResponse, error) Logger() *slog.Logger } @@ -67,8 +68,8 @@ func (p *defaultPublisher) PublishBatch(ctx context.Context, topic string, batch } } -func (p *defaultPublisher) Publish(ctx context.Context, topic string, message interface{}, opts ...GrainCallOption) (*PublishResponse, error) { +func (p *defaultPublisher) Publish(ctx context.Context, topic string, message proto.Message, opts ...GrainCallOption) (*PublishResponse, error) { return p.PublishBatch(ctx, topic, &PubSubBatch{ - Envelopes: []interface{}{message}, + Envelopes: []proto.Message{message}, }, opts...) }