Skip to content

Commit

Permalink
Merge pull request #1014 from qazwsxedckll/fix-recover-from-non-proto…
Browse files Browse the repository at this point in the history
…-remote-message

fix recover from non proto remote message
  • Loading branch information
rogeralsing authored Jan 16, 2024
2 parents 2ecba75 + ad7304a commit 93e384a
Show file tree
Hide file tree
Showing 12 changed files with 127 additions and 67 deletions.
5 changes: 5 additions & 0 deletions _examples/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
It is recommanded to create your own go.work to use the local version of the main module.

Besides, go.work is not recommanded to be commited to the repository.

Not all examples are up-to-date, please help to update them if you find any outdated examples.
4 changes: 2 additions & 2 deletions _examples/cluster-grain/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ module cluster-grain

go 1.21

replace github.com/asynkron/protoactor-go => ../..
// replace github.com/asynkron/protoactor-go => ../..

require (
github.com/asynkron/goconsole v0.0.0-20160504192649-bfa12eebf716
github.com/asynkron/protoactor-go v0.0.0-00010101000000-000000000000
github.com/asynkron/protoactor-go v0.0.0-20231231215642-2ecba7517929
google.golang.org/protobuf v1.31.0
)

Expand Down
4 changes: 2 additions & 2 deletions _examples/cluster-grain/shared/build.sh
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
protoc --go_out=. --go_opt=paths=source_relative --proto_path=. protos.proto
protoc --go_out=. --go_opt=paths=source_relative --plugin=$GOPATH/bin/protoc-gen-go-grain --go-grain_out=. --go-grain_opt=paths=source_relative protos.proto
protoc --go_out=. --go_opt=paths=source_relative \
--plugin=protoc-gen-go-grain=../../../protobuf/protoc-gen-go-grain/protoc-gen-go-grain.sh --go-grain_out=. --go-grain_opt=paths=source_relative -I../../ -I. protos.proto
4 changes: 2 additions & 2 deletions _examples/cluster-grain/shared/protos.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 22 additions & 21 deletions _examples/cluster-grain/shared/protos_grain.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion actor/actor_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"log/slog"
"runtime/debug"
"sync/atomic"
"time"

Expand Down Expand Up @@ -702,10 +703,10 @@ func (ctx *actorContext) finalizeStop() {
//

func (ctx *actorContext) EscalateFailure(reason interface{}, message interface{}) {
// TODO: add callstack to log?
ctx.Logger().Info("[ACTOR] Recovering", slog.Any("self", ctx.self), slog.Any("reason", reason))
// debug setting, allows to output supervision failures in console/error level
if ctx.actorSystem.Config.DeveloperSupervisionLogging {
fmt.Printf("debug.Stack(): %s\n", debug.Stack())
fmt.Println("[Supervision] Actor:", ctx.self, " failed with message:", message, " exception:", reason)
ctx.Logger().Error("[Supervision]", slog.Any("actor", ctx.self), slog.Any("message", message), slog.Any("exception", reason))
}
Expand Down
5 changes: 1 addition & 4 deletions cluster/identitylookup/disthash/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,7 @@ func (pm *Manager) onClusterTopology(tplg *clustering.ClusterTopology) {
pm.cluster.Logger().Info("onClusterTopology", slog.Uint64("topology-hash", tplg.TopologyHash))

for _, m := range tplg.Members {
pm.cluster.Logger().Info("Got member ", slog.String("MemberId", m.Id))
for _, k := range m.Kinds {
pm.cluster.Logger().Info("" + m.Id + " - " + k)
}
pm.cluster.Logger().Info("Got member", slog.Any("member", m))
}

pm.rdv = clustering.NewRendezvous()
Expand Down
70 changes: 50 additions & 20 deletions cluster/pubsub_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,22 @@ import (
"google.golang.org/protobuf/proto"
)

var (
_ remote.RootSerializable = (*PubSubBatch)(nil)
_ remote.RootSerializable = (*DeliverBatchRequest)(nil)
_ remote.RootSerializable = (*PubSubAutoRespondBatch)(nil)

_ remote.RootSerialized = (*PubSubBatchTransport)(nil)
_ remote.RootSerialized = (*DeliverBatchRequestTransport)(nil)
_ remote.RootSerialized = (*PubSubAutoRespondBatchTransport)(nil)
)

type PubSubBatch struct {
Envelopes []proto.Message
}

// Serialize converts a PubSubBatch to a PubSubBatchTransport.
func (b *PubSubBatch) Serialize() remote.RootSerialized {
func (b *PubSubBatch) Serialize() (remote.RootSerialized, error) {
batch := &PubSubBatchTransport{
TypeNames: make([]string, 0),
Envelopes: make([]*PubSubEnvelope, 0),
Expand All @@ -21,7 +31,7 @@ func (b *PubSubBatch) Serialize() remote.RootSerialized {
var serializerId int32
messageData, typeName, err := remote.Serialize(envelope, serializerId)
if err != nil {
panic(err)
return nil, err
}
// batch.TypeNames.IndexOf(typeName)
typeIndex := -1
Expand All @@ -41,19 +51,19 @@ func (b *PubSubBatch) Serialize() remote.RootSerialized {
SerializerId: serializerId,
})
}
return batch
return batch, nil
}

// Deserialize converts a PubSubBatchTransport to a PubSubBatch.
func (t *PubSubBatchTransport) Deserialize() remote.RootSerializable {
func (t *PubSubBatchTransport) Deserialize() (remote.RootSerializable, error) {
b := &PubSubBatch{
Envelopes: make([]proto.Message, 0),
}

for _, envelope := range t.Envelopes {
message, err := remote.Deserialize(envelope.MessageData, t.TypeNames[envelope.TypeId], envelope.SerializerId)
if err != nil {
panic(err)
return nil, err
}
protoMessage, ok := message.(proto.Message)
if !ok {
Expand All @@ -62,7 +72,7 @@ func (t *PubSubBatchTransport) Deserialize() remote.RootSerializable {

b.Envelopes = append(b.Envelopes, protoMessage)
}
return b
return b, nil
}

type DeliverBatchRequest struct {
Expand All @@ -71,20 +81,30 @@ type DeliverBatchRequest struct {
Topic string
}

func (d *DeliverBatchRequest) Serialize() remote.RootSerialized {
func (d *DeliverBatchRequest) Serialize() (remote.RootSerialized, error) {
rs, err := d.PubSubBatch.Serialize()
if err != nil {
return nil, err
}

return &DeliverBatchRequestTransport{
Subscribers: d.Subscribers,
Batch: d.PubSubBatch.Serialize().(*PubSubBatchTransport),
Batch: rs.(*PubSubBatchTransport),
Topic: d.Topic,
}
}, nil
}

func (t *DeliverBatchRequestTransport) Deserialize() remote.RootSerializable {
func (t *DeliverBatchRequestTransport) Deserialize() (remote.RootSerializable, error) {
rs, err := t.Batch.Deserialize()
if err != nil {
return nil, err
}

return &DeliverBatchRequest{
Subscribers: t.Subscribers,
PubSubBatch: t.Batch.Deserialize().(*PubSubBatch),
PubSubBatch: rs.(*PubSubBatch),
Topic: t.Topic,
}
}, nil
}

var _ actor.MessageBatch = (*PubSubAutoRespondBatch)(nil)
Expand All @@ -94,13 +114,18 @@ type PubSubAutoRespondBatch struct {
}

// Serialize converts a PubSubAutoRespondBatch to a PubSubAutoRespondBatchTransport.
func (b *PubSubAutoRespondBatch) Serialize() remote.RootSerialized {
func (b *PubSubAutoRespondBatch) Serialize() (remote.RootSerialized, error) {
batch := &PubSubBatch{Envelopes: b.Envelopes}
transport := batch.Serialize().(*PubSubBatchTransport)
return &PubSubAutoRespondBatchTransport{
TypeNames: transport.TypeNames,
Envelopes: transport.Envelopes,

rs, err := batch.Serialize()
if err != nil {
return nil, err
}

return &PubSubAutoRespondBatchTransport{
TypeNames: rs.(*PubSubBatchTransport).TypeNames,
Envelopes: rs.(*PubSubBatchTransport).Envelopes,
}, nil
}

// GetAutoResponse returns a PublishResponse.
Expand All @@ -120,12 +145,17 @@ func (b *PubSubAutoRespondBatch) GetMessages() []interface{} {
}

// Deserialize converts a PubSubAutoRespondBatchTransport to a PubSubAutoRespondBatch.
func (t *PubSubAutoRespondBatchTransport) Deserialize() remote.RootSerializable {
func (t *PubSubAutoRespondBatchTransport) Deserialize() (remote.RootSerializable, error) {
batch := &PubSubBatchTransport{
TypeNames: t.TypeNames,
Envelopes: t.Envelopes,
}
return &PubSubAutoRespondBatch{
Envelopes: batch.Deserialize().(*PubSubBatch).Envelopes,
rs, err := batch.Deserialize()
if err != nil {
return nil, err
}

return &PubSubAutoRespondBatch{
Envelopes: rs.(*PubSubBatch).Envelopes,
}, nil
}
14 changes: 10 additions & 4 deletions remote/endpoint_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

type endpointLazy struct {
// valueFunc func() *endpoint
unloaded uint32
unloaded atomic.Bool
once sync.Once
endpoint atomic.Value
manager *endpointManager
Expand All @@ -27,6 +27,7 @@ func NewEndpointLazy(em *endpointManager, address string) *endpointLazy {
}

func (el *endpointLazy) connect() {
el.manager.remote.actorSystem.Logger().Debug("connecting to remote address", slog.String("address", el.address))
em := el.manager
system := em.remote.actorSystem
rst, _ := system.Root.RequestFuture(em.endpointSupervisor, el.address, -1).Result()
Expand Down Expand Up @@ -247,10 +248,10 @@ func (em *endpointManager) removeEndpoint(msg *EndpointTerminatedEvent) {
v, ok := em.connections.Load(msg.Address)
if ok {
le := v.(*endpointLazy)
if atomic.CompareAndSwapUint32(&le.unloaded, 0, 1) {
if le.unloaded.CompareAndSwap(false, true) {
em.connections.Delete(msg.Address)
ep := le.Get()
em.remote.Logger().Debug("Sending EndpointTerminatedEvent to EndpointWatcher ans EndpointWriter", slog.String("address", msg.Address))
em.remote.Logger().Debug("Sending EndpointTerminatedEvent to EndpointWatcher and EndpointWriter", slog.String("address", msg.Address))
em.remote.actorSystem.Root.Send(ep.watcher, msg)
em.remote.actorSystem.Root.Send(ep.writer, msg)
}
Expand All @@ -274,13 +275,18 @@ func (state *endpointSupervisor) Receive(ctx actor.Context) {
writer: state.spawnEndpointWriter(state.remote, address, ctx),
watcher: state.spawnEndpointWatcher(state.remote, address, ctx),
}
ctx.Logger().Debug("id", slog.String("ewr", e.writer.Id), slog.String("ewa", e.watcher.Id))
ctx.Respond(e)
}
}

func (state *endpointSupervisor) HandleFailure(actorSystem *actor.ActorSystem, supervisor actor.Supervisor, child *actor.PID, rs *actor.RestartStatistics, reason interface{}, message interface{}) {
actorSystem.Logger().Debug("EndpointSupervisor handling failure", slog.Any("reason", reason), slog.Any("message", message))
supervisor.RestartChildren(child)
// use restart will cause a start loop, just stop it for now
// supervisor.RestartChildren(child)

// TODO: an extra stop is sent to the deadletter caused by EndpointTerminatedEvent
supervisor.StopChildren(child)
}

func (state *endpointSupervisor) spawnEndpointWriter(remote *Remote, address string, ctx actor.Context) *actor.PID {
Expand Down
6 changes: 5 additions & 1 deletion remote/endpoint_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,11 @@ func (s *endpointReader) onMessageBatch(m *MessageBatch) error {
// translate from on-the-wire representation to in-process representation
// this only applies to root level messages, and never on nested child messages
if v, ok := message.(RootSerialized); ok {
message = v.Deserialize()
message, err = v.Deserialize()
if err != nil {
s.remote.Logger().Error("EndpointReader failed to deserialize", slog.Any("error", err))
return err
}
}

switch msg := message.(type) {
Expand Down
Loading

0 comments on commit 93e384a

Please sign in to comment.