Skip to content

Commit

Permalink
fix: skip when remote message cannot be serialized
Browse files Browse the repository at this point in the history
  • Loading branch information
qazwsxedckll committed Jan 5, 2024
1 parent 9685d4e commit ad7304a
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 32 deletions.
3 changes: 2 additions & 1 deletion actor/actor_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"log/slog"
"runtime/debug"
"sync/atomic"
"time"

Expand Down Expand Up @@ -702,10 +703,10 @@ func (ctx *actorContext) finalizeStop() {
//

func (ctx *actorContext) EscalateFailure(reason interface{}, message interface{}) {
// TODO: add callstack to log?
ctx.Logger().Info("[ACTOR] Recovering", slog.Any("self", ctx.self), slog.Any("reason", reason))
// debug setting, allows to output supervision failures in console/error level
if ctx.actorSystem.Config.DeveloperSupervisionLogging {
fmt.Printf("debug.Stack(): %s\n", debug.Stack())
fmt.Println("[Supervision] Actor:", ctx.self, " failed with message:", message, " exception:", reason)
ctx.Logger().Error("[Supervision]", slog.Any("actor", ctx.self), slog.Any("message", message), slog.Any("exception", reason))
}
Expand Down
70 changes: 50 additions & 20 deletions cluster/pubsub_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,22 @@ import (
"github.com/asynkron/protoactor-go/remote"
)

var (
_ remote.RootSerializable = (*PubSubBatch)(nil)
_ remote.RootSerializable = (*DeliverBatchRequest)(nil)
_ remote.RootSerializable = (*PubSubAutoRespondBatch)(nil)

_ remote.RootSerialized = (*PubSubBatchTransport)(nil)
_ remote.RootSerialized = (*DeliverBatchRequestTransport)(nil)
_ remote.RootSerialized = (*PubSubAutoRespondBatchTransport)(nil)
)

type PubSubBatch struct {
Envelopes []interface{}
}

// Serialize converts a PubSubBatch to a PubSubBatchTransport.
func (b *PubSubBatch) Serialize() remote.RootSerialized {
func (b *PubSubBatch) Serialize() (remote.RootSerialized, error) {
batch := &PubSubBatchTransport{
TypeNames: make([]string, 0),
Envelopes: make([]*PubSubEnvelope, 0),
Expand All @@ -20,7 +30,7 @@ func (b *PubSubBatch) Serialize() remote.RootSerialized {
var serializerId int32
messageData, typeName, err := remote.Serialize(envelope, serializerId)
if err != nil {
panic(err)
return nil, err
}
// batch.TypeNames.IndexOf(typeName)
typeIndex := -1
Expand All @@ -40,23 +50,23 @@ func (b *PubSubBatch) Serialize() remote.RootSerialized {
SerializerId: serializerId,
})
}
return batch
return batch, nil
}

// Deserialize converts a PubSubBatchTransport to a PubSubBatch.
func (t *PubSubBatchTransport) Deserialize() remote.RootSerializable {
func (t *PubSubBatchTransport) Deserialize() (remote.RootSerializable, error) {
b := &PubSubBatch{
Envelopes: make([]interface{}, 0),
}

for _, envelope := range t.Envelopes {
message, err := remote.Deserialize(envelope.MessageData, t.TypeNames[envelope.TypeId], envelope.SerializerId)
if err != nil {
panic(err)
return nil, err
}
b.Envelopes = append(b.Envelopes, message)
}
return b
return b, nil
}

type DeliverBatchRequest struct {
Expand All @@ -65,34 +75,49 @@ type DeliverBatchRequest struct {
Topic string
}

func (d *DeliverBatchRequest) Serialize() remote.RootSerialized {
func (d *DeliverBatchRequest) Serialize() (remote.RootSerialized, error) {
rs, err := d.PubSubBatch.Serialize()
if err != nil {
return nil, err
}

return &DeliverBatchRequestTransport{
Subscribers: d.Subscribers,
Batch: d.PubSubBatch.Serialize().(*PubSubBatchTransport),
Batch: rs.(*PubSubBatchTransport),
Topic: d.Topic,
}
}, nil
}

func (t *DeliverBatchRequestTransport) Deserialize() remote.RootSerializable {
func (t *DeliverBatchRequestTransport) Deserialize() (remote.RootSerializable, error) {
rs, err := t.Batch.Deserialize()
if err != nil {
return nil, err
}

return &DeliverBatchRequest{
Subscribers: t.Subscribers,
PubSubBatch: t.Batch.Deserialize().(*PubSubBatch),
PubSubBatch: rs.(*PubSubBatch),
Topic: t.Topic,
}
}, nil
}

type PubSubAutoRespondBatch struct {
Envelopes []interface{}
}

// Serialize converts a PubSubAutoRespondBatch to a PubSubAutoRespondBatchTransport.
func (b *PubSubAutoRespondBatch) Serialize() remote.RootSerialized {
func (b *PubSubAutoRespondBatch) Serialize() (remote.RootSerialized, error) {
batch := &PubSubBatch{Envelopes: b.Envelopes}
transport := batch.Serialize().(*PubSubBatchTransport)
return &PubSubAutoRespondBatchTransport{
TypeNames: transport.TypeNames,
Envelopes: transport.Envelopes,

rs, err := batch.Serialize()
if err != nil {
return nil, err
}

return &PubSubAutoRespondBatchTransport{
TypeNames: rs.(*PubSubBatchTransport).TypeNames,
Envelopes: rs.(*PubSubBatchTransport).Envelopes,
}, nil
}

// GetAutoResponse returns a PublishResponse.
Expand All @@ -108,12 +133,17 @@ func (b *PubSubAutoRespondBatch) GetMessages() []interface{} {
}

// Deserialize converts a PubSubAutoRespondBatchTransport to a PubSubAutoRespondBatch.
func (t *PubSubAutoRespondBatchTransport) Deserialize() remote.RootSerializable {
func (t *PubSubAutoRespondBatchTransport) Deserialize() (remote.RootSerializable, error) {
batch := &PubSubBatchTransport{
TypeNames: t.TypeNames,
Envelopes: t.Envelopes,
}
return &PubSubAutoRespondBatch{
Envelopes: batch.Deserialize().(*PubSubBatch).Envelopes,
rs, err := batch.Deserialize()
if err != nil {
return nil, err
}

return &PubSubAutoRespondBatch{
Envelopes: rs.(*PubSubBatch).Envelopes,
}, nil
}
6 changes: 5 additions & 1 deletion remote/endpoint_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,11 @@ func (s *endpointReader) onMessageBatch(m *MessageBatch) error {
// translate from on-the-wire representation to in-process representation
// this only applies to root level messages, and never on nested child messages
if v, ok := message.(RootSerialized); ok {
message = v.Deserialize()
message, err = v.Deserialize()
if err != nil {
s.remote.Logger().Error("EndpointReader failed to deserialize", slog.Any("error", err))
return err
}
}

switch msg := message.(type) {
Expand Down
24 changes: 17 additions & 7 deletions remote/endpoint_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (state *endpointWriter) initializeInternal() error {
}

func (state *endpointWriter) sendEnvelopes(msg []interface{}, ctx actor.Context) {
envelopes := make([]*MessageEnvelope, len(msg))
envelopes := make([]*MessageEnvelope, 0)

// type name uniqueness map name string to type index
typeNames := make(map[string]int32)
Expand All @@ -175,7 +175,7 @@ func (state *endpointWriter) sendEnvelopes(msg []interface{}, ctx actor.Context)
serializerID int32
)

for i, tmp := range msg {
for _, tmp := range msg {
switch unwrapped := tmp.(type) {
case *EndpointTerminatedEvent, EndpointTerminatedEvent:
state.remote.Logger().Debug("Handling array wrapped terminate event", slog.String("address", state.address), slog.Any("message", unwrapped))
Expand Down Expand Up @@ -205,13 +205,19 @@ func (state *endpointWriter) sendEnvelopes(msg []interface{}, ctx actor.Context)
// if the message can be translated to a serialization representation, we do this here
// this only apply to root level messages and never to nested child objects inside the message
message := rd.message
var err error
if v, ok := message.(RootSerializable); ok {
message = v.Serialize()
message, err = v.Serialize()
if err != nil {
state.remote.Logger().Error("EndpointWriter failed to serialize message", slog.String("address", state.address), slog.Any("error", err), slog.Any("message", v))
continue
}
}

bytes, typeName, err := Serialize(message, serializerID)
if err != nil {
panic(err)
state.remote.Logger().Error("EndpointWriter failed to serialize message", slog.String("address", state.address), slog.Any("error", err), slog.Any("message", message))
continue
}
typeID, typeNamesArr = addToLookup(typeNames, typeName, typeNamesArr)
targetID, targetNamesArr = addToTargetLookup(targetNames, rd.target, targetNamesArr)
Expand All @@ -223,7 +229,7 @@ func (state *endpointWriter) sendEnvelopes(msg []interface{}, ctx actor.Context)
senderRequestID = rd.sender.RequestId
}

envelopes[i] = &MessageEnvelope{
envelopes = append(envelopes, &MessageEnvelope{
MessageHeader: header,
MessageData: bytes,
Sender: senderID,
Expand All @@ -232,7 +238,11 @@ func (state *endpointWriter) sendEnvelopes(msg []interface{}, ctx actor.Context)
SerializerId: serializerID,
TargetRequestId: targetRequestID,
SenderRequestId: senderRequestID,
}
})
}

if len(envelopes) == 0 {
return
}

err := state.stream.Send(&RemoteMessage{
Expand All @@ -248,7 +258,7 @@ func (state *endpointWriter) sendEnvelopes(msg []interface{}, ctx actor.Context)
if err != nil {
ctx.Stash()
state.remote.Logger().Debug("gRPC Failed to send", slog.String("address", state.address), slog.Any("error", err))
panic("restart it")
ctx.Stop(ctx.Self())
}
}

Expand Down
12 changes: 9 additions & 3 deletions remote/serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,14 @@ type Serializer interface {

func Serialize(message interface{}, serializerID int32) ([]byte, string, error) {
res, err := serializers[serializerID].Serialize(message)
if err != nil {
return nil, "", err
}
typeName, err := serializers[serializerID].GetTypeName(message)
return res, typeName, err
if err != nil {
return nil, "", err
}
return res, typeName, nil
}

func Deserialize(message []byte, typeName string, serializerID int32) (interface{}, error) {
Expand All @@ -34,12 +40,12 @@ func Deserialize(message []byte, typeName string, serializerID int32) (interface
type RootSerializable interface {
// Serialize returns the on-the-wire representation of the message
// Message -> IRootSerialized -> ByteString
Serialize() RootSerialized
Serialize() (RootSerialized, error)
}

// RootSerialized is the root level on-the-wire representation of a message
type RootSerialized interface {
// Deserialize returns the in-process representation of a message
// ByteString -> IRootSerialized -> Message
Deserialize() RootSerializable
Deserialize() (RootSerializable, error)
}

0 comments on commit ad7304a

Please sign in to comment.