diff --git a/_examples/README.md b/_examples/README.md new file mode 100644 index 00000000..4443ccb4 --- /dev/null +++ b/_examples/README.md @@ -0,0 +1,5 @@ +It is recommanded to create your own go.work to use the local version of the main module. + +Besides, go.work is not recommanded to be commited to the repository. + +Not all examples are up-to-date, please help to update them if you find any outdated examples. \ No newline at end of file diff --git a/_examples/cluster-grain/go.mod b/_examples/cluster-grain/go.mod index 9e6ace9e..35b38d54 100644 --- a/_examples/cluster-grain/go.mod +++ b/_examples/cluster-grain/go.mod @@ -2,11 +2,11 @@ module cluster-grain go 1.21 -replace github.com/asynkron/protoactor-go => ../.. +// replace github.com/asynkron/protoactor-go => ../.. require ( github.com/asynkron/goconsole v0.0.0-20160504192649-bfa12eebf716 - github.com/asynkron/protoactor-go v0.0.0-00010101000000-000000000000 + github.com/asynkron/protoactor-go v0.0.0-20231231215642-2ecba7517929 google.golang.org/protobuf v1.31.0 ) diff --git a/_examples/cluster-grain/shared/build.sh b/_examples/cluster-grain/shared/build.sh index 653ecce2..234e6c11 100755 --- a/_examples/cluster-grain/shared/build.sh +++ b/_examples/cluster-grain/shared/build.sh @@ -1,2 +1,2 @@ -protoc --go_out=. --go_opt=paths=source_relative --proto_path=. protos.proto -protoc --go_out=. --go_opt=paths=source_relative --plugin=$GOPATH/bin/protoc-gen-go-grain --go-grain_out=. --go-grain_opt=paths=source_relative protos.proto \ No newline at end of file +protoc --go_out=. --go_opt=paths=source_relative \ + --plugin=protoc-gen-go-grain=../../../protobuf/protoc-gen-go-grain/protoc-gen-go-grain.sh --go-grain_out=. --go-grain_opt=paths=source_relative -I../../ -I. protos.proto diff --git a/_examples/cluster-grain/shared/protos.pb.go b/_examples/cluster-grain/shared/protos.pb.go index b7cd6cce..97cb47b1 100644 --- a/_examples/cluster-grain/shared/protos.pb.go +++ b/_examples/cluster-grain/shared/protos.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.27.1 -// protoc v4.24.3 +// protoc-gen-go v1.31.0 +// protoc v4.25.0 // source: protos.proto package shared diff --git a/_examples/cluster-grain/shared/protos_grain.pb.go b/_examples/cluster-grain/shared/protos_grain.pb.go index ab35695d..a028ac26 100644 --- a/_examples/cluster-grain/shared/protos_grain.pb.go +++ b/_examples/cluster-grain/shared/protos_grain.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-grain. DO NOT EDIT. // versions: -// protoc-gen-grain v0.1.0 -// protoc v4.24.3 +// protoc-gen-grain v0.4.0 +// protoc v4.25.0 // source: protos.proto package shared @@ -62,7 +62,7 @@ type Hello interface { Init(ctx cluster.GrainContext) Terminate(ctx cluster.GrainContext) ReceiveDefault(ctx cluster.GrainContext) - SayHello(*HelloRequest, cluster.GrainContext) (*HelloResponse, error) + SayHello(req *HelloRequest, ctx cluster.GrainContext) (*HelloResponse, error) } // HelloGrainClient holds the base data for the HelloGrain @@ -80,20 +80,15 @@ func (g *HelloGrainClient) SayHello(r *HelloRequest, opts ...cluster.GrainCallOp reqMsg := &cluster.GrainRequest{MethodIndex: 0, MessageData: bytes} resp, err := g.cluster.Request(g.Identity, "Hello", reqMsg, opts...) if err != nil { - return nil, err + return nil, fmt.Errorf("error request: %w", err) } switch msg := resp.(type) { - case *cluster.GrainResponse: - result := &HelloResponse{} - err = proto.Unmarshal(msg.MessageData, result) - if err != nil { - return nil, err - } - return result, nil + case *HelloResponse: + return msg, nil case *cluster.GrainErrorResponse: return nil, errors.New(msg.Err) default: - return nil, errors.New("unknown response") + return nil, fmt.Errorf("unknown response type %T", resp) } } @@ -134,23 +129,29 @@ func (a *HelloActor) Receive(ctx actor.Context) { ctx.Respond(resp) return } + r0, err := a.inner.SayHello(req, a.ctx) if err != nil { resp := &cluster.GrainErrorResponse{Err: err.Error()} ctx.Respond(resp) return } - bytes, err := proto.Marshal(r0) - if err != nil { - ctx.Logger().Error("[Grain] SayHello(HelloRequest) proto.Marshal failed", slog.Any("error", err)) - resp := &cluster.GrainErrorResponse{Err: err.Error()} - ctx.Respond(resp) - return - } - resp := &cluster.GrainResponse{MessageData: bytes} - ctx.Respond(resp) + ctx.Respond(r0) } default: a.inner.ReceiveDefault(a.ctx) } } + +// onError should be used in ctx.ReenterAfter +// you can just return error in reenterable method for other errors +func (a *HelloActor) onError(err error) { + resp := &cluster.GrainErrorResponse{Err: err.Error()} + a.ctx.Respond(resp) +} + +func respond[T proto.Message](ctx cluster.GrainContext) func(T) { + return func(resp T) { + ctx.Respond(resp) + } +} diff --git a/actor/actor_context.go b/actor/actor_context.go index 55de5dc0..e9e62e3d 100644 --- a/actor/actor_context.go +++ b/actor/actor_context.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "log/slog" + "runtime/debug" "sync/atomic" "time" @@ -702,10 +703,10 @@ func (ctx *actorContext) finalizeStop() { // func (ctx *actorContext) EscalateFailure(reason interface{}, message interface{}) { - // TODO: add callstack to log? ctx.Logger().Info("[ACTOR] Recovering", slog.Any("self", ctx.self), slog.Any("reason", reason)) // debug setting, allows to output supervision failures in console/error level if ctx.actorSystem.Config.DeveloperSupervisionLogging { + fmt.Printf("debug.Stack(): %s\n", debug.Stack()) fmt.Println("[Supervision] Actor:", ctx.self, " failed with message:", message, " exception:", reason) ctx.Logger().Error("[Supervision]", slog.Any("actor", ctx.self), slog.Any("message", message), slog.Any("exception", reason)) } diff --git a/cluster/identitylookup/disthash/manager.go b/cluster/identitylookup/disthash/manager.go index 3d85501b..1369a71a 100644 --- a/cluster/identitylookup/disthash/manager.go +++ b/cluster/identitylookup/disthash/manager.go @@ -63,10 +63,7 @@ func (pm *Manager) onClusterTopology(tplg *clustering.ClusterTopology) { pm.cluster.Logger().Info("onClusterTopology", slog.Uint64("topology-hash", tplg.TopologyHash)) for _, m := range tplg.Members { - pm.cluster.Logger().Info("Got member ", slog.String("MemberId", m.Id)) - for _, k := range m.Kinds { - pm.cluster.Logger().Info("" + m.Id + " - " + k) - } + pm.cluster.Logger().Info("Got member", slog.Any("member", m)) } pm.rdv = clustering.NewRendezvous() diff --git a/cluster/pubsub_batch.go b/cluster/pubsub_batch.go index e56912c6..7bd9f8af 100644 --- a/cluster/pubsub_batch.go +++ b/cluster/pubsub_batch.go @@ -6,12 +6,22 @@ import ( "google.golang.org/protobuf/proto" ) +var ( + _ remote.RootSerializable = (*PubSubBatch)(nil) + _ remote.RootSerializable = (*DeliverBatchRequest)(nil) + _ remote.RootSerializable = (*PubSubAutoRespondBatch)(nil) + + _ remote.RootSerialized = (*PubSubBatchTransport)(nil) + _ remote.RootSerialized = (*DeliverBatchRequestTransport)(nil) + _ remote.RootSerialized = (*PubSubAutoRespondBatchTransport)(nil) +) + type PubSubBatch struct { Envelopes []proto.Message } // Serialize converts a PubSubBatch to a PubSubBatchTransport. -func (b *PubSubBatch) Serialize() remote.RootSerialized { +func (b *PubSubBatch) Serialize() (remote.RootSerialized, error) { batch := &PubSubBatchTransport{ TypeNames: make([]string, 0), Envelopes: make([]*PubSubEnvelope, 0), @@ -21,7 +31,7 @@ func (b *PubSubBatch) Serialize() remote.RootSerialized { var serializerId int32 messageData, typeName, err := remote.Serialize(envelope, serializerId) if err != nil { - panic(err) + return nil, err } // batch.TypeNames.IndexOf(typeName) typeIndex := -1 @@ -41,11 +51,11 @@ func (b *PubSubBatch) Serialize() remote.RootSerialized { SerializerId: serializerId, }) } - return batch + return batch, nil } // Deserialize converts a PubSubBatchTransport to a PubSubBatch. -func (t *PubSubBatchTransport) Deserialize() remote.RootSerializable { +func (t *PubSubBatchTransport) Deserialize() (remote.RootSerializable, error) { b := &PubSubBatch{ Envelopes: make([]proto.Message, 0), } @@ -53,7 +63,7 @@ func (t *PubSubBatchTransport) Deserialize() remote.RootSerializable { for _, envelope := range t.Envelopes { message, err := remote.Deserialize(envelope.MessageData, t.TypeNames[envelope.TypeId], envelope.SerializerId) if err != nil { - panic(err) + return nil, err } protoMessage, ok := message.(proto.Message) if !ok { @@ -62,7 +72,7 @@ func (t *PubSubBatchTransport) Deserialize() remote.RootSerializable { b.Envelopes = append(b.Envelopes, protoMessage) } - return b + return b, nil } type DeliverBatchRequest struct { @@ -71,20 +81,30 @@ type DeliverBatchRequest struct { Topic string } -func (d *DeliverBatchRequest) Serialize() remote.RootSerialized { +func (d *DeliverBatchRequest) Serialize() (remote.RootSerialized, error) { + rs, err := d.PubSubBatch.Serialize() + if err != nil { + return nil, err + } + return &DeliverBatchRequestTransport{ Subscribers: d.Subscribers, - Batch: d.PubSubBatch.Serialize().(*PubSubBatchTransport), + Batch: rs.(*PubSubBatchTransport), Topic: d.Topic, - } + }, nil } -func (t *DeliverBatchRequestTransport) Deserialize() remote.RootSerializable { +func (t *DeliverBatchRequestTransport) Deserialize() (remote.RootSerializable, error) { + rs, err := t.Batch.Deserialize() + if err != nil { + return nil, err + } + return &DeliverBatchRequest{ Subscribers: t.Subscribers, - PubSubBatch: t.Batch.Deserialize().(*PubSubBatch), + PubSubBatch: rs.(*PubSubBatch), Topic: t.Topic, - } + }, nil } var _ actor.MessageBatch = (*PubSubAutoRespondBatch)(nil) @@ -94,13 +114,18 @@ type PubSubAutoRespondBatch struct { } // Serialize converts a PubSubAutoRespondBatch to a PubSubAutoRespondBatchTransport. -func (b *PubSubAutoRespondBatch) Serialize() remote.RootSerialized { +func (b *PubSubAutoRespondBatch) Serialize() (remote.RootSerialized, error) { batch := &PubSubBatch{Envelopes: b.Envelopes} - transport := batch.Serialize().(*PubSubBatchTransport) - return &PubSubAutoRespondBatchTransport{ - TypeNames: transport.TypeNames, - Envelopes: transport.Envelopes, + + rs, err := batch.Serialize() + if err != nil { + return nil, err } + + return &PubSubAutoRespondBatchTransport{ + TypeNames: rs.(*PubSubBatchTransport).TypeNames, + Envelopes: rs.(*PubSubBatchTransport).Envelopes, + }, nil } // GetAutoResponse returns a PublishResponse. @@ -120,12 +145,17 @@ func (b *PubSubAutoRespondBatch) GetMessages() []interface{} { } // Deserialize converts a PubSubAutoRespondBatchTransport to a PubSubAutoRespondBatch. -func (t *PubSubAutoRespondBatchTransport) Deserialize() remote.RootSerializable { +func (t *PubSubAutoRespondBatchTransport) Deserialize() (remote.RootSerializable, error) { batch := &PubSubBatchTransport{ TypeNames: t.TypeNames, Envelopes: t.Envelopes, } - return &PubSubAutoRespondBatch{ - Envelopes: batch.Deserialize().(*PubSubBatch).Envelopes, + rs, err := batch.Deserialize() + if err != nil { + return nil, err } + + return &PubSubAutoRespondBatch{ + Envelopes: rs.(*PubSubBatch).Envelopes, + }, nil } diff --git a/remote/endpoint_manager.go b/remote/endpoint_manager.go index d0dd9a2c..85f209a1 100644 --- a/remote/endpoint_manager.go +++ b/remote/endpoint_manager.go @@ -12,7 +12,7 @@ import ( type endpointLazy struct { // valueFunc func() *endpoint - unloaded uint32 + unloaded atomic.Bool once sync.Once endpoint atomic.Value manager *endpointManager @@ -27,6 +27,7 @@ func NewEndpointLazy(em *endpointManager, address string) *endpointLazy { } func (el *endpointLazy) connect() { + el.manager.remote.actorSystem.Logger().Debug("connecting to remote address", slog.String("address", el.address)) em := el.manager system := em.remote.actorSystem rst, _ := system.Root.RequestFuture(em.endpointSupervisor, el.address, -1).Result() @@ -247,10 +248,10 @@ func (em *endpointManager) removeEndpoint(msg *EndpointTerminatedEvent) { v, ok := em.connections.Load(msg.Address) if ok { le := v.(*endpointLazy) - if atomic.CompareAndSwapUint32(&le.unloaded, 0, 1) { + if le.unloaded.CompareAndSwap(false, true) { em.connections.Delete(msg.Address) ep := le.Get() - em.remote.Logger().Debug("Sending EndpointTerminatedEvent to EndpointWatcher ans EndpointWriter", slog.String("address", msg.Address)) + em.remote.Logger().Debug("Sending EndpointTerminatedEvent to EndpointWatcher and EndpointWriter", slog.String("address", msg.Address)) em.remote.actorSystem.Root.Send(ep.watcher, msg) em.remote.actorSystem.Root.Send(ep.writer, msg) } @@ -274,13 +275,18 @@ func (state *endpointSupervisor) Receive(ctx actor.Context) { writer: state.spawnEndpointWriter(state.remote, address, ctx), watcher: state.spawnEndpointWatcher(state.remote, address, ctx), } + ctx.Logger().Debug("id", slog.String("ewr", e.writer.Id), slog.String("ewa", e.watcher.Id)) ctx.Respond(e) } } func (state *endpointSupervisor) HandleFailure(actorSystem *actor.ActorSystem, supervisor actor.Supervisor, child *actor.PID, rs *actor.RestartStatistics, reason interface{}, message interface{}) { actorSystem.Logger().Debug("EndpointSupervisor handling failure", slog.Any("reason", reason), slog.Any("message", message)) - supervisor.RestartChildren(child) + // use restart will cause a start loop, just stop it for now + // supervisor.RestartChildren(child) + + // TODO: an extra stop is sent to the deadletter caused by EndpointTerminatedEvent + supervisor.StopChildren(child) } func (state *endpointSupervisor) spawnEndpointWriter(remote *Remote, address string, ctx actor.Context) *actor.PID { diff --git a/remote/endpoint_reader.go b/remote/endpoint_reader.go index 4310153e..ec3e1d97 100644 --- a/remote/endpoint_reader.go +++ b/remote/endpoint_reader.go @@ -142,7 +142,11 @@ func (s *endpointReader) onMessageBatch(m *MessageBatch) error { // translate from on-the-wire representation to in-process representation // this only applies to root level messages, and never on nested child messages if v, ok := message.(RootSerialized); ok { - message = v.Deserialize() + message, err = v.Deserialize() + if err != nil { + s.remote.Logger().Error("EndpointReader failed to deserialize", slog.Any("error", err)) + return err + } } switch msg := message.(type) { diff --git a/remote/endpoint_writer.go b/remote/endpoint_writer.go index da96a3ed..88911608 100644 --- a/remote/endpoint_writer.go +++ b/remote/endpoint_writer.go @@ -155,7 +155,7 @@ func (state *endpointWriter) initializeInternal() error { } func (state *endpointWriter) sendEnvelopes(msg []interface{}, ctx actor.Context) { - envelopes := make([]*MessageEnvelope, len(msg)) + envelopes := make([]*MessageEnvelope, 0) // type name uniqueness map name string to type index typeNames := make(map[string]int32) @@ -175,7 +175,7 @@ func (state *endpointWriter) sendEnvelopes(msg []interface{}, ctx actor.Context) serializerID int32 ) - for i, tmp := range msg { + for _, tmp := range msg { switch unwrapped := tmp.(type) { case *EndpointTerminatedEvent, EndpointTerminatedEvent: state.remote.Logger().Debug("Handling array wrapped terminate event", slog.String("address", state.address), slog.Any("message", unwrapped)) @@ -205,13 +205,19 @@ func (state *endpointWriter) sendEnvelopes(msg []interface{}, ctx actor.Context) // if the message can be translated to a serialization representation, we do this here // this only apply to root level messages and never to nested child objects inside the message message := rd.message + var err error if v, ok := message.(RootSerializable); ok { - message = v.Serialize() + message, err = v.Serialize() + if err != nil { + state.remote.Logger().Error("EndpointWriter failed to serialize message", slog.String("address", state.address), slog.Any("error", err), slog.Any("message", v)) + continue + } } bytes, typeName, err := Serialize(message, serializerID) if err != nil { - panic(err) + state.remote.Logger().Error("EndpointWriter failed to serialize message", slog.String("address", state.address), slog.Any("error", err), slog.Any("message", message)) + continue } typeID, typeNamesArr = addToLookup(typeNames, typeName, typeNamesArr) targetID, targetNamesArr = addToTargetLookup(targetNames, rd.target, targetNamesArr) @@ -223,7 +229,7 @@ func (state *endpointWriter) sendEnvelopes(msg []interface{}, ctx actor.Context) senderRequestID = rd.sender.RequestId } - envelopes[i] = &MessageEnvelope{ + envelopes = append(envelopes, &MessageEnvelope{ MessageHeader: header, MessageData: bytes, Sender: senderID, @@ -232,7 +238,11 @@ func (state *endpointWriter) sendEnvelopes(msg []interface{}, ctx actor.Context) SerializerId: serializerID, TargetRequestId: targetRequestID, SenderRequestId: senderRequestID, - } + }) + } + + if len(envelopes) == 0 { + return } err := state.stream.Send(&RemoteMessage{ @@ -248,7 +258,7 @@ func (state *endpointWriter) sendEnvelopes(msg []interface{}, ctx actor.Context) if err != nil { ctx.Stash() state.remote.Logger().Debug("gRPC Failed to send", slog.String("address", state.address), slog.Any("error", err)) - panic("restart it") + ctx.Stop(ctx.Self()) } } diff --git a/remote/serializer.go b/remote/serializer.go index 670a06e0..d603636b 100644 --- a/remote/serializer.go +++ b/remote/serializer.go @@ -22,8 +22,14 @@ type Serializer interface { func Serialize(message interface{}, serializerID int32) ([]byte, string, error) { res, err := serializers[serializerID].Serialize(message) + if err != nil { + return nil, "", err + } typeName, err := serializers[serializerID].GetTypeName(message) - return res, typeName, err + if err != nil { + return nil, "", err + } + return res, typeName, nil } func Deserialize(message []byte, typeName string, serializerID int32) (interface{}, error) { @@ -34,12 +40,12 @@ func Deserialize(message []byte, typeName string, serializerID int32) (interface type RootSerializable interface { // Serialize returns the on-the-wire representation of the message // Message -> IRootSerialized -> ByteString - Serialize() RootSerialized + Serialize() (RootSerialized, error) } // RootSerialized is the root level on-the-wire representation of a message type RootSerialized interface { // Deserialize returns the in-process representation of a message // ByteString -> IRootSerialized -> Message - Deserialize() RootSerializable + Deserialize() (RootSerializable, error) }