Skip to content

Commit

Permalink
Merge pull request #1010 from qazwsxedckll/pubsub-protoMessage
Browse files Browse the repository at this point in the history
feat: publish should only accept proto.Message
  • Loading branch information
rogeralsing authored Dec 31, 2023
2 parents 1fcdf9e + 4c65054 commit 2ecba75
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 22 deletions.
3 changes: 2 additions & 1 deletion cluster/cluster_test_tool/pubsub_cluster_fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/asynkron/protoactor-go/cluster"
"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
"google.golang.org/protobuf/proto"
)

const (
Expand Down Expand Up @@ -134,7 +135,7 @@ func (p *PubSubClusterFixture) PublishData(topic string, data int) (*cluster.Pub

// PublishDataBatch publishes the given messages to the given topic
func (p *PubSubClusterFixture) PublishDataBatch(topic string, data []int) (*cluster.PublishResponse, error) {
batches := make([]interface{}, 0)
batches := make([]proto.Message, 0)
for _, d := range data {
batches = append(batches, &DataPublished{Data: int32(d)})
}
Expand Down
22 changes: 17 additions & 5 deletions cluster/pubsub_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package cluster
import (
"github.com/asynkron/protoactor-go/actor"
"github.com/asynkron/protoactor-go/remote"
"google.golang.org/protobuf/proto"
)

type PubSubBatch struct {
Envelopes []interface{}
Envelopes []proto.Message
}

// Serialize converts a PubSubBatch to a PubSubBatchTransport.
Expand Down Expand Up @@ -46,15 +47,20 @@ func (b *PubSubBatch) Serialize() remote.RootSerialized {
// Deserialize converts a PubSubBatchTransport to a PubSubBatch.
func (t *PubSubBatchTransport) Deserialize() remote.RootSerializable {
b := &PubSubBatch{
Envelopes: make([]interface{}, 0),
Envelopes: make([]proto.Message, 0),
}

for _, envelope := range t.Envelopes {
message, err := remote.Deserialize(envelope.MessageData, t.TypeNames[envelope.TypeId], envelope.SerializerId)
if err != nil {
panic(err)
}
b.Envelopes = append(b.Envelopes, message)
protoMessage, ok := message.(proto.Message)
if !ok {
panic("message is not proto.Message")
}

b.Envelopes = append(b.Envelopes, protoMessage)
}
return b
}
Expand All @@ -81,8 +87,10 @@ func (t *DeliverBatchRequestTransport) Deserialize() remote.RootSerializable {
}
}

var _ actor.MessageBatch = (*PubSubAutoRespondBatch)(nil)

type PubSubAutoRespondBatch struct {
Envelopes []interface{}
Envelopes []proto.Message
}

// Serialize converts a PubSubAutoRespondBatch to a PubSubAutoRespondBatchTransport.
Expand All @@ -104,7 +112,11 @@ func (b *PubSubAutoRespondBatch) GetAutoResponse(_ actor.Context) interface{} {

// GetMessages returns the message.
func (b *PubSubAutoRespondBatch) GetMessages() []interface{} {
return b.Envelopes
var messages []interface{}
for _, envelope := range b.Envelopes {
messages = append(messages, envelope)
}
return messages
}

// Deserialize converts a PubSubAutoRespondBatchTransport to a PubSubAutoRespondBatch.
Expand Down
4 changes: 2 additions & 2 deletions cluster/pubsub_delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ func (p *PubSubMemberDeliveryActor) Receive(c actor.Context) {
identityLog := func(err error) {
if p.shouldThrottle() == actor.Open {
if fWithIdentity.identity.GetPid() != nil {
c.Logger().Info("Pub-sub message delivered to PID", slog.String("pid", fWithIdentity.identity.GetPid().String()))
c.Logger().Error("Pub-sub message failed to deliver to PID", slog.String("pid", fWithIdentity.identity.GetPid().String()), slog.Any("error", err))
} else if fWithIdentity.identity.GetClusterIdentity() != nil {
c.Logger().Info("Pub-sub message delivered to cluster identity", slog.String("cluster identity", fWithIdentity.identity.GetClusterIdentity().String()))
c.Logger().Error("Pub-sub message failed to deliver to cluster identity", slog.String("cluster identity", fWithIdentity.identity.GetClusterIdentity().String()), slog.Any("error", err))
}
}
}
Expand Down
9 changes: 5 additions & 4 deletions cluster/pubsub_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/asynkron/protoactor-go/actor"
"github.com/asynkron/protoactor-go/internal/queue/mpsc"
"golang.org/x/net/context"
"google.golang.org/protobuf/proto"
)

// PublishingErrorHandler decides what to do with a publishing error in BatchingProducer
Expand Down Expand Up @@ -98,13 +99,13 @@ type pubsubBatchWithReceipts struct {
// newPubSubBatchWithReceipts creates a new pubsubBatchWithReceipts
func newPubSubBatchWithReceipts() *pubsubBatchWithReceipts {
return &pubsubBatchWithReceipts{
batch: &PubSubBatch{Envelopes: make([]interface{}, 0, 10)},
batch: &PubSubBatch{Envelopes: make([]proto.Message, 0, 10)},
ctxArr: make([]context.Context, 0, 10),
}
}

type produceMessage struct {
message interface{}
message proto.Message
ctx context.Context
}

Expand Down Expand Up @@ -171,7 +172,7 @@ func (p *BatchingProducer) getProduceProcessInfo(ctx context.Context) *ProducePr
}

// Produce a message to producer queue. The return info can be used to wait for the message to be published.
func (p *BatchingProducer) Produce(ctx context.Context, message interface{}) (*ProduceProcessInfo, error) {
func (p *BatchingProducer) Produce(ctx context.Context, message proto.Message) (*ProduceProcessInfo, error) {
ctx, cancel := context.WithCancel(ctx)
info := &ProduceProcessInfo{
Finished: make(chan struct{}),
Expand Down Expand Up @@ -301,7 +302,7 @@ func (p *BatchingProducer) failBatch(batchWrapper *pubsubBatchWithReceipts, err

// clearBatch clears the batch wrapper
func (p *BatchingProducer) clearBatch(batchWrapper *pubsubBatchWithReceipts) {
batchWrapper.batch = &PubSubBatch{Envelopes: make([]interface{}, 0, 10)}
batchWrapper.batch = &PubSubBatch{Envelopes: make([]proto.Message, 0, 10)}
batchWrapper.ctxArr = batchWrapper.ctxArr[:0]
}

Expand Down
14 changes: 7 additions & 7 deletions cluster/pubsub_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/stretchr/testify/suite"
"google.golang.org/protobuf/proto"
)

type PubSubBatchingProducerTestSuite struct {
Expand Down Expand Up @@ -37,7 +38,7 @@ func (suite *PubSubBatchingProducerTestSuite) iter(from, to int) []int {
}

func (suite *PubSubBatchingProducerTestSuite) record(batch *PubSubBatch) (*PublishResponse, error) {
b := &PubSubBatch{Envelopes: make([]interface{}, 0, len(batch.Envelopes))}
b := &PubSubBatch{Envelopes: make([]proto.Message, 0, len(batch.Envelopes))}
b.Envelopes = append(b.Envelopes, batch.Envelopes...)

suite.batchesSent = append(suite.batchesSent, b)
Expand Down Expand Up @@ -74,7 +75,6 @@ func (suite *PubSubBatchingProducerTestSuite) timeout() (*PublishResponse, error
}

func (suite *PubSubBatchingProducerTestSuite) TestProducerSendsMessagesInBatches() {

producer := NewBatchingProducer(newMockPublisher(suite.record), "topic", WithBatchingProducerBatchSize(10))
defer producer.Dispose()

Expand Down Expand Up @@ -313,8 +313,8 @@ func (m *mockPublisher) PublishBatch(_ context.Context, topic string, batch *Pub
return m.publish(batch)
}

func (m *mockPublisher) Publish(_ context.Context, topic string, message interface{}, opts ...GrainCallOption) (*PublishResponse, error) {
return m.publish(&PubSubBatch{Envelopes: []interface{}{message}})
func (m *mockPublisher) Publish(_ context.Context, topic string, message proto.Message, opts ...GrainCallOption) (*PublishResponse, error) {
return m.publish(&PubSubBatch{Envelopes: []proto.Message{message}})
}

type optionalFailureMockPublisher struct {
Expand All @@ -339,15 +339,15 @@ func (o *optionalFailureMockPublisher) PublishBatch(ctx context.Context, topic s
if o.shouldFail {
return nil, &testException{}
}
copiedBatch := &PubSubBatch{Envelopes: make([]interface{}, len(batch.Envelopes))}
copiedBatch := &PubSubBatch{Envelopes: make([]proto.Message, len(batch.Envelopes))}
copy(copiedBatch.Envelopes, batch.Envelopes)

o.sentBatches = append(o.sentBatches, copiedBatch)
return &PublishResponse{}, nil
}

func (o *optionalFailureMockPublisher) Publish(ctx context.Context, topic string, message interface{}, opts ...GrainCallOption) (*PublishResponse, error) {
return o.PublishBatch(ctx, topic, &PubSubBatch{Envelopes: []interface{}{message}}, opts...)
func (o *optionalFailureMockPublisher) Publish(ctx context.Context, topic string, message proto.Message, opts ...GrainCallOption) (*PublishResponse, error) {
return o.PublishBatch(ctx, topic, &PubSubBatch{Envelopes: []proto.Message{message}}, opts...)
}

type testException struct{}
Expand Down
7 changes: 4 additions & 3 deletions cluster/pubsub_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"log/slog"
"time"

"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"
)

Expand All @@ -20,7 +21,7 @@ type Publisher interface {
PublishBatch(ctx context.Context, topic string, batch *PubSubBatch, opts ...GrainCallOption) (*PublishResponse, error)

// Publish publishes a single message to the topic.
Publish(ctx context.Context, topic string, message interface{}, opts ...GrainCallOption) (*PublishResponse, error)
Publish(ctx context.Context, topic string, message proto.Message, opts ...GrainCallOption) (*PublishResponse, error)

Logger() *slog.Logger
}
Expand Down Expand Up @@ -67,8 +68,8 @@ func (p *defaultPublisher) PublishBatch(ctx context.Context, topic string, batch
}
}

func (p *defaultPublisher) Publish(ctx context.Context, topic string, message interface{}, opts ...GrainCallOption) (*PublishResponse, error) {
func (p *defaultPublisher) Publish(ctx context.Context, topic string, message proto.Message, opts ...GrainCallOption) (*PublishResponse, error) {
return p.PublishBatch(ctx, topic, &PubSubBatch{
Envelopes: []interface{}{message},
Envelopes: []proto.Message{message},
}, opts...)
}

0 comments on commit 2ecba75

Please sign in to comment.