From ad7304af476492cccf61a841cb9af20d56cf2b66 Mon Sep 17 00:00:00 2001 From: evilolipop Date: Fri, 5 Jan 2024 16:08:55 +0800 Subject: [PATCH] fix: skip when remote message cannot be serialized --- actor/actor_context.go | 3 +- cluster/pubsub_batch.go | 70 ++++++++++++++++++++++++++++----------- remote/endpoint_reader.go | 6 +++- remote/endpoint_writer.go | 24 ++++++++++---- remote/serializer.go | 12 +++++-- 5 files changed, 83 insertions(+), 32 deletions(-) diff --git a/actor/actor_context.go b/actor/actor_context.go index 55de5dc0..e9e62e3d 100644 --- a/actor/actor_context.go +++ b/actor/actor_context.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "log/slog" + "runtime/debug" "sync/atomic" "time" @@ -702,10 +703,10 @@ func (ctx *actorContext) finalizeStop() { // func (ctx *actorContext) EscalateFailure(reason interface{}, message interface{}) { - // TODO: add callstack to log? ctx.Logger().Info("[ACTOR] Recovering", slog.Any("self", ctx.self), slog.Any("reason", reason)) // debug setting, allows to output supervision failures in console/error level if ctx.actorSystem.Config.DeveloperSupervisionLogging { + fmt.Printf("debug.Stack(): %s\n", debug.Stack()) fmt.Println("[Supervision] Actor:", ctx.self, " failed with message:", message, " exception:", reason) ctx.Logger().Error("[Supervision]", slog.Any("actor", ctx.self), slog.Any("message", message), slog.Any("exception", reason)) } diff --git a/cluster/pubsub_batch.go b/cluster/pubsub_batch.go index d9745730..2f13441b 100644 --- a/cluster/pubsub_batch.go +++ b/cluster/pubsub_batch.go @@ -5,12 +5,22 @@ import ( "github.com/asynkron/protoactor-go/remote" ) +var ( + _ remote.RootSerializable = (*PubSubBatch)(nil) + _ remote.RootSerializable = (*DeliverBatchRequest)(nil) + _ remote.RootSerializable = (*PubSubAutoRespondBatch)(nil) + + _ remote.RootSerialized = (*PubSubBatchTransport)(nil) + _ remote.RootSerialized = (*DeliverBatchRequestTransport)(nil) + _ remote.RootSerialized = (*PubSubAutoRespondBatchTransport)(nil) +) + type PubSubBatch struct { Envelopes []interface{} } // Serialize converts a PubSubBatch to a PubSubBatchTransport. -func (b *PubSubBatch) Serialize() remote.RootSerialized { +func (b *PubSubBatch) Serialize() (remote.RootSerialized, error) { batch := &PubSubBatchTransport{ TypeNames: make([]string, 0), Envelopes: make([]*PubSubEnvelope, 0), @@ -20,7 +30,7 @@ func (b *PubSubBatch) Serialize() remote.RootSerialized { var serializerId int32 messageData, typeName, err := remote.Serialize(envelope, serializerId) if err != nil { - panic(err) + return nil, err } // batch.TypeNames.IndexOf(typeName) typeIndex := -1 @@ -40,11 +50,11 @@ func (b *PubSubBatch) Serialize() remote.RootSerialized { SerializerId: serializerId, }) } - return batch + return batch, nil } // Deserialize converts a PubSubBatchTransport to a PubSubBatch. -func (t *PubSubBatchTransport) Deserialize() remote.RootSerializable { +func (t *PubSubBatchTransport) Deserialize() (remote.RootSerializable, error) { b := &PubSubBatch{ Envelopes: make([]interface{}, 0), } @@ -52,11 +62,11 @@ func (t *PubSubBatchTransport) Deserialize() remote.RootSerializable { for _, envelope := range t.Envelopes { message, err := remote.Deserialize(envelope.MessageData, t.TypeNames[envelope.TypeId], envelope.SerializerId) if err != nil { - panic(err) + return nil, err } b.Envelopes = append(b.Envelopes, message) } - return b + return b, nil } type DeliverBatchRequest struct { @@ -65,20 +75,30 @@ type DeliverBatchRequest struct { Topic string } -func (d *DeliverBatchRequest) Serialize() remote.RootSerialized { +func (d *DeliverBatchRequest) Serialize() (remote.RootSerialized, error) { + rs, err := d.PubSubBatch.Serialize() + if err != nil { + return nil, err + } + return &DeliverBatchRequestTransport{ Subscribers: d.Subscribers, - Batch: d.PubSubBatch.Serialize().(*PubSubBatchTransport), + Batch: rs.(*PubSubBatchTransport), Topic: d.Topic, - } + }, nil } -func (t *DeliverBatchRequestTransport) Deserialize() remote.RootSerializable { +func (t *DeliverBatchRequestTransport) Deserialize() (remote.RootSerializable, error) { + rs, err := t.Batch.Deserialize() + if err != nil { + return nil, err + } + return &DeliverBatchRequest{ Subscribers: t.Subscribers, - PubSubBatch: t.Batch.Deserialize().(*PubSubBatch), + PubSubBatch: rs.(*PubSubBatch), Topic: t.Topic, - } + }, nil } type PubSubAutoRespondBatch struct { @@ -86,13 +106,18 @@ type PubSubAutoRespondBatch struct { } // Serialize converts a PubSubAutoRespondBatch to a PubSubAutoRespondBatchTransport. -func (b *PubSubAutoRespondBatch) Serialize() remote.RootSerialized { +func (b *PubSubAutoRespondBatch) Serialize() (remote.RootSerialized, error) { batch := &PubSubBatch{Envelopes: b.Envelopes} - transport := batch.Serialize().(*PubSubBatchTransport) - return &PubSubAutoRespondBatchTransport{ - TypeNames: transport.TypeNames, - Envelopes: transport.Envelopes, + + rs, err := batch.Serialize() + if err != nil { + return nil, err } + + return &PubSubAutoRespondBatchTransport{ + TypeNames: rs.(*PubSubBatchTransport).TypeNames, + Envelopes: rs.(*PubSubBatchTransport).Envelopes, + }, nil } // GetAutoResponse returns a PublishResponse. @@ -108,12 +133,17 @@ func (b *PubSubAutoRespondBatch) GetMessages() []interface{} { } // Deserialize converts a PubSubAutoRespondBatchTransport to a PubSubAutoRespondBatch. -func (t *PubSubAutoRespondBatchTransport) Deserialize() remote.RootSerializable { +func (t *PubSubAutoRespondBatchTransport) Deserialize() (remote.RootSerializable, error) { batch := &PubSubBatchTransport{ TypeNames: t.TypeNames, Envelopes: t.Envelopes, } - return &PubSubAutoRespondBatch{ - Envelopes: batch.Deserialize().(*PubSubBatch).Envelopes, + rs, err := batch.Deserialize() + if err != nil { + return nil, err } + + return &PubSubAutoRespondBatch{ + Envelopes: rs.(*PubSubBatch).Envelopes, + }, nil } diff --git a/remote/endpoint_reader.go b/remote/endpoint_reader.go index 4310153e..ec3e1d97 100644 --- a/remote/endpoint_reader.go +++ b/remote/endpoint_reader.go @@ -142,7 +142,11 @@ func (s *endpointReader) onMessageBatch(m *MessageBatch) error { // translate from on-the-wire representation to in-process representation // this only applies to root level messages, and never on nested child messages if v, ok := message.(RootSerialized); ok { - message = v.Deserialize() + message, err = v.Deserialize() + if err != nil { + s.remote.Logger().Error("EndpointReader failed to deserialize", slog.Any("error", err)) + return err + } } switch msg := message.(type) { diff --git a/remote/endpoint_writer.go b/remote/endpoint_writer.go index da96a3ed..88911608 100644 --- a/remote/endpoint_writer.go +++ b/remote/endpoint_writer.go @@ -155,7 +155,7 @@ func (state *endpointWriter) initializeInternal() error { } func (state *endpointWriter) sendEnvelopes(msg []interface{}, ctx actor.Context) { - envelopes := make([]*MessageEnvelope, len(msg)) + envelopes := make([]*MessageEnvelope, 0) // type name uniqueness map name string to type index typeNames := make(map[string]int32) @@ -175,7 +175,7 @@ func (state *endpointWriter) sendEnvelopes(msg []interface{}, ctx actor.Context) serializerID int32 ) - for i, tmp := range msg { + for _, tmp := range msg { switch unwrapped := tmp.(type) { case *EndpointTerminatedEvent, EndpointTerminatedEvent: state.remote.Logger().Debug("Handling array wrapped terminate event", slog.String("address", state.address), slog.Any("message", unwrapped)) @@ -205,13 +205,19 @@ func (state *endpointWriter) sendEnvelopes(msg []interface{}, ctx actor.Context) // if the message can be translated to a serialization representation, we do this here // this only apply to root level messages and never to nested child objects inside the message message := rd.message + var err error if v, ok := message.(RootSerializable); ok { - message = v.Serialize() + message, err = v.Serialize() + if err != nil { + state.remote.Logger().Error("EndpointWriter failed to serialize message", slog.String("address", state.address), slog.Any("error", err), slog.Any("message", v)) + continue + } } bytes, typeName, err := Serialize(message, serializerID) if err != nil { - panic(err) + state.remote.Logger().Error("EndpointWriter failed to serialize message", slog.String("address", state.address), slog.Any("error", err), slog.Any("message", message)) + continue } typeID, typeNamesArr = addToLookup(typeNames, typeName, typeNamesArr) targetID, targetNamesArr = addToTargetLookup(targetNames, rd.target, targetNamesArr) @@ -223,7 +229,7 @@ func (state *endpointWriter) sendEnvelopes(msg []interface{}, ctx actor.Context) senderRequestID = rd.sender.RequestId } - envelopes[i] = &MessageEnvelope{ + envelopes = append(envelopes, &MessageEnvelope{ MessageHeader: header, MessageData: bytes, Sender: senderID, @@ -232,7 +238,11 @@ func (state *endpointWriter) sendEnvelopes(msg []interface{}, ctx actor.Context) SerializerId: serializerID, TargetRequestId: targetRequestID, SenderRequestId: senderRequestID, - } + }) + } + + if len(envelopes) == 0 { + return } err := state.stream.Send(&RemoteMessage{ @@ -248,7 +258,7 @@ func (state *endpointWriter) sendEnvelopes(msg []interface{}, ctx actor.Context) if err != nil { ctx.Stash() state.remote.Logger().Debug("gRPC Failed to send", slog.String("address", state.address), slog.Any("error", err)) - panic("restart it") + ctx.Stop(ctx.Self()) } } diff --git a/remote/serializer.go b/remote/serializer.go index 670a06e0..d603636b 100644 --- a/remote/serializer.go +++ b/remote/serializer.go @@ -22,8 +22,14 @@ type Serializer interface { func Serialize(message interface{}, serializerID int32) ([]byte, string, error) { res, err := serializers[serializerID].Serialize(message) + if err != nil { + return nil, "", err + } typeName, err := serializers[serializerID].GetTypeName(message) - return res, typeName, err + if err != nil { + return nil, "", err + } + return res, typeName, nil } func Deserialize(message []byte, typeName string, serializerID int32) (interface{}, error) { @@ -34,12 +40,12 @@ func Deserialize(message []byte, typeName string, serializerID int32) (interface type RootSerializable interface { // Serialize returns the on-the-wire representation of the message // Message -> IRootSerialized -> ByteString - Serialize() RootSerialized + Serialize() (RootSerialized, error) } // RootSerialized is the root level on-the-wire representation of a message type RootSerialized interface { // Deserialize returns the in-process representation of a message // ByteString -> IRootSerialized -> Message - Deserialize() RootSerializable + Deserialize() (RootSerializable, error) }