Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: publish should only accept proto.Message #1010

Merged
merged 3 commits into from
Dec 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cluster/cluster_test_tool/pubsub_cluster_fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/asynkron/protoactor-go/cluster"
"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
"google.golang.org/protobuf/proto"
)

const (
Expand Down Expand Up @@ -134,7 +135,7 @@ func (p *PubSubClusterFixture) PublishData(topic string, data int) (*cluster.Pub

// PublishDataBatch publishes the given messages to the given topic
func (p *PubSubClusterFixture) PublishDataBatch(topic string, data []int) (*cluster.PublishResponse, error) {
batches := make([]interface{}, 0)
batches := make([]proto.Message, 0)
for _, d := range data {
batches = append(batches, &DataPublished{Data: int32(d)})
}
Expand Down
22 changes: 17 additions & 5 deletions cluster/pubsub_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package cluster
import (
"github.com/asynkron/protoactor-go/actor"
"github.com/asynkron/protoactor-go/remote"
"google.golang.org/protobuf/proto"
)

type PubSubBatch struct {
Envelopes []interface{}
Envelopes []proto.Message
}

// Serialize converts a PubSubBatch to a PubSubBatchTransport.
Expand Down Expand Up @@ -46,15 +47,20 @@ func (b *PubSubBatch) Serialize() remote.RootSerialized {
// Deserialize converts a PubSubBatchTransport to a PubSubBatch.
func (t *PubSubBatchTransport) Deserialize() remote.RootSerializable {
b := &PubSubBatch{
Envelopes: make([]interface{}, 0),
Envelopes: make([]proto.Message, 0),
}

for _, envelope := range t.Envelopes {
message, err := remote.Deserialize(envelope.MessageData, t.TypeNames[envelope.TypeId], envelope.SerializerId)
if err != nil {
panic(err)
}
b.Envelopes = append(b.Envelopes, message)
protoMessage, ok := message.(proto.Message)
if !ok {
panic("message is not proto.Message")
}

b.Envelopes = append(b.Envelopes, protoMessage)
}
return b
}
Expand All @@ -81,8 +87,10 @@ func (t *DeliverBatchRequestTransport) Deserialize() remote.RootSerializable {
}
}

var _ actor.MessageBatch = (*PubSubAutoRespondBatch)(nil)

type PubSubAutoRespondBatch struct {
Envelopes []interface{}
Envelopes []proto.Message
}

// Serialize converts a PubSubAutoRespondBatch to a PubSubAutoRespondBatchTransport.
Expand All @@ -104,7 +112,11 @@ func (b *PubSubAutoRespondBatch) GetAutoResponse(_ actor.Context) interface{} {

// GetMessages returns the message.
func (b *PubSubAutoRespondBatch) GetMessages() []interface{} {
return b.Envelopes
var messages []interface{}
for _, envelope := range b.Envelopes {
messages = append(messages, envelope)
}
return messages
}

// Deserialize converts a PubSubAutoRespondBatchTransport to a PubSubAutoRespondBatch.
Expand Down
4 changes: 2 additions & 2 deletions cluster/pubsub_delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ func (p *PubSubMemberDeliveryActor) Receive(c actor.Context) {
identityLog := func(err error) {
if p.shouldThrottle() == actor.Open {
if fWithIdentity.identity.GetPid() != nil {
c.Logger().Info("Pub-sub message delivered to PID", slog.String("pid", fWithIdentity.identity.GetPid().String()))
c.Logger().Error("Pub-sub message failed to deliver to PID", slog.String("pid", fWithIdentity.identity.GetPid().String()), slog.Any("error", err))
} else if fWithIdentity.identity.GetClusterIdentity() != nil {
c.Logger().Info("Pub-sub message delivered to cluster identity", slog.String("cluster identity", fWithIdentity.identity.GetClusterIdentity().String()))
c.Logger().Error("Pub-sub message failed to deliver to cluster identity", slog.String("cluster identity", fWithIdentity.identity.GetClusterIdentity().String()), slog.Any("error", err))
}
}
}
Expand Down
9 changes: 5 additions & 4 deletions cluster/pubsub_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/asynkron/protoactor-go/actor"
"github.com/asynkron/protoactor-go/internal/queue/mpsc"
"golang.org/x/net/context"
"google.golang.org/protobuf/proto"
)

// PublishingErrorHandler decides what to do with a publishing error in BatchingProducer
Expand Down Expand Up @@ -98,13 +99,13 @@ type pubsubBatchWithReceipts struct {
// newPubSubBatchWithReceipts creates a new pubsubBatchWithReceipts
func newPubSubBatchWithReceipts() *pubsubBatchWithReceipts {
return &pubsubBatchWithReceipts{
batch: &PubSubBatch{Envelopes: make([]interface{}, 0, 10)},
batch: &PubSubBatch{Envelopes: make([]proto.Message, 0, 10)},
ctxArr: make([]context.Context, 0, 10),
}
}

type produceMessage struct {
message interface{}
message proto.Message
ctx context.Context
}

Expand Down Expand Up @@ -171,7 +172,7 @@ func (p *BatchingProducer) getProduceProcessInfo(ctx context.Context) *ProducePr
}

// Produce a message to producer queue. The return info can be used to wait for the message to be published.
func (p *BatchingProducer) Produce(ctx context.Context, message interface{}) (*ProduceProcessInfo, error) {
func (p *BatchingProducer) Produce(ctx context.Context, message proto.Message) (*ProduceProcessInfo, error) {
ctx, cancel := context.WithCancel(ctx)
info := &ProduceProcessInfo{
Finished: make(chan struct{}),
Expand Down Expand Up @@ -301,7 +302,7 @@ func (p *BatchingProducer) failBatch(batchWrapper *pubsubBatchWithReceipts, err

// clearBatch clears the batch wrapper
func (p *BatchingProducer) clearBatch(batchWrapper *pubsubBatchWithReceipts) {
batchWrapper.batch = &PubSubBatch{Envelopes: make([]interface{}, 0, 10)}
batchWrapper.batch = &PubSubBatch{Envelopes: make([]proto.Message, 0, 10)}
batchWrapper.ctxArr = batchWrapper.ctxArr[:0]
}

Expand Down
14 changes: 7 additions & 7 deletions cluster/pubsub_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/stretchr/testify/suite"
"google.golang.org/protobuf/proto"
)

type PubSubBatchingProducerTestSuite struct {
Expand Down Expand Up @@ -37,7 +38,7 @@ func (suite *PubSubBatchingProducerTestSuite) iter(from, to int) []int {
}

func (suite *PubSubBatchingProducerTestSuite) record(batch *PubSubBatch) (*PublishResponse, error) {
b := &PubSubBatch{Envelopes: make([]interface{}, 0, len(batch.Envelopes))}
b := &PubSubBatch{Envelopes: make([]proto.Message, 0, len(batch.Envelopes))}
b.Envelopes = append(b.Envelopes, batch.Envelopes...)

suite.batchesSent = append(suite.batchesSent, b)
Expand Down Expand Up @@ -74,7 +75,6 @@ func (suite *PubSubBatchingProducerTestSuite) timeout() (*PublishResponse, error
}

func (suite *PubSubBatchingProducerTestSuite) TestProducerSendsMessagesInBatches() {

producer := NewBatchingProducer(newMockPublisher(suite.record), "topic", WithBatchingProducerBatchSize(10))
defer producer.Dispose()

Expand Down Expand Up @@ -313,8 +313,8 @@ func (m *mockPublisher) PublishBatch(_ context.Context, topic string, batch *Pub
return m.publish(batch)
}

func (m *mockPublisher) Publish(_ context.Context, topic string, message interface{}, opts ...GrainCallOption) (*PublishResponse, error) {
return m.publish(&PubSubBatch{Envelopes: []interface{}{message}})
func (m *mockPublisher) Publish(_ context.Context, topic string, message proto.Message, opts ...GrainCallOption) (*PublishResponse, error) {
return m.publish(&PubSubBatch{Envelopes: []proto.Message{message}})
}

type optionalFailureMockPublisher struct {
Expand All @@ -339,15 +339,15 @@ func (o *optionalFailureMockPublisher) PublishBatch(ctx context.Context, topic s
if o.shouldFail {
return nil, &testException{}
}
copiedBatch := &PubSubBatch{Envelopes: make([]interface{}, len(batch.Envelopes))}
copiedBatch := &PubSubBatch{Envelopes: make([]proto.Message, len(batch.Envelopes))}
copy(copiedBatch.Envelopes, batch.Envelopes)

o.sentBatches = append(o.sentBatches, copiedBatch)
return &PublishResponse{}, nil
}

func (o *optionalFailureMockPublisher) Publish(ctx context.Context, topic string, message interface{}, opts ...GrainCallOption) (*PublishResponse, error) {
return o.PublishBatch(ctx, topic, &PubSubBatch{Envelopes: []interface{}{message}}, opts...)
func (o *optionalFailureMockPublisher) Publish(ctx context.Context, topic string, message proto.Message, opts ...GrainCallOption) (*PublishResponse, error) {
return o.PublishBatch(ctx, topic, &PubSubBatch{Envelopes: []proto.Message{message}}, opts...)
}

type testException struct{}
Expand Down
7 changes: 4 additions & 3 deletions cluster/pubsub_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"log/slog"
"time"

"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"
)

Expand All @@ -20,7 +21,7 @@ type Publisher interface {
PublishBatch(ctx context.Context, topic string, batch *PubSubBatch, opts ...GrainCallOption) (*PublishResponse, error)

// Publish publishes a single message to the topic.
Publish(ctx context.Context, topic string, message interface{}, opts ...GrainCallOption) (*PublishResponse, error)
Publish(ctx context.Context, topic string, message proto.Message, opts ...GrainCallOption) (*PublishResponse, error)

Logger() *slog.Logger
}
Expand Down Expand Up @@ -67,8 +68,8 @@ func (p *defaultPublisher) PublishBatch(ctx context.Context, topic string, batch
}
}

func (p *defaultPublisher) Publish(ctx context.Context, topic string, message interface{}, opts ...GrainCallOption) (*PublishResponse, error) {
func (p *defaultPublisher) Publish(ctx context.Context, topic string, message proto.Message, opts ...GrainCallOption) (*PublishResponse, error) {
return p.PublishBatch(ctx, topic, &PubSubBatch{
Envelopes: []interface{}{message},
Envelopes: []proto.Message{message},
}, opts...)
}