Skip to content

Commit

Permalink
Merge pull request #947 from asynkron/more-logging
Browse files Browse the repository at this point in the history
More logging
  • Loading branch information
rogeralsing authored Nov 23, 2023
2 parents 050314e + 0c7aecb commit 65710ad
Show file tree
Hide file tree
Showing 23 changed files with 234 additions and 73 deletions.
2 changes: 1 addition & 1 deletion _examples/cluster-broadcast/shared/protos.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 24 additions & 30 deletions _examples/cluster-broadcast/shared/protos_protoactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,21 @@ package shared
import (
"errors"
"fmt"
"log/slog"
"math"
"time"

"github.com/asynkron/protoactor-go/actor"
"github.com/asynkron/protoactor-go/cluster"
logmod "github.com/asynkron/protoactor-go/log"
"google.golang.org/protobuf/proto"
)

var (
plog = logmod.New(logmod.InfoLevel, "[GRAIN][shared]")
_ = proto.Marshal
_ = fmt.Errorf
_ = math.Inf
_ = proto.Marshal
_ = fmt.Errorf
_ = math.Inf
)

// SetLogLevel sets the log level.
func SetLogLevel(level logmod.Level) {
plog.SetLevel(level)
}

var xCalculatorFactory func() Calculator

// CalculatorFactory produces a Calculator
Expand Down Expand Up @@ -89,7 +83,7 @@ func (g *CalculatorGrainClient) Add(r *NumberRequest, opts ...cluster.GrainCallO
return nil, err
}
reqMsg := &cluster.GrainRequest{MethodIndex: 0, MessageData: bytes}
resp, err := g.cluster.Call(g.Identity, "Calculator", reqMsg, opts...)
resp, err := g.cluster.Request(g.Identity, "Calculator", reqMsg, opts...)
if err != nil {
return nil, err
}
Expand All @@ -115,7 +109,7 @@ func (g *CalculatorGrainClient) Subtract(r *NumberRequest, opts ...cluster.Grain
return nil, err
}
reqMsg := &cluster.GrainRequest{MethodIndex: 1, MessageData: bytes}
resp, err := g.cluster.Call(g.Identity, "Calculator", reqMsg, opts...)
resp, err := g.cluster.Request(g.Identity, "Calculator", reqMsg, opts...)
if err != nil {
return nil, err
}
Expand All @@ -141,7 +135,7 @@ func (g *CalculatorGrainClient) GetCurrent(r *Noop, opts ...cluster.GrainCallOpt
return nil, err
}
reqMsg := &cluster.GrainRequest{MethodIndex: 2, MessageData: bytes}
resp, err := g.cluster.Call(g.Identity, "Calculator", reqMsg, opts...)
resp, err := g.cluster.Request(g.Identity, "Calculator", reqMsg, opts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -170,7 +164,7 @@ type CalculatorActor struct {
// Receive ensures the lifecycle of the actor for the received message
func (a *CalculatorActor) Receive(ctx actor.Context) {
switch msg := ctx.Message().(type) {
case *actor.Started: // pass
case *actor.Started: //pass
case *cluster.ClusterInit:
a.ctx = cluster.NewGrainContext(ctx, msg.Identity, msg.Cluster)
a.inner = xCalculatorFactory()
Expand All @@ -192,7 +186,7 @@ func (a *CalculatorActor) Receive(ctx actor.Context) {
req := &NumberRequest{}
err := proto.Unmarshal(msg.MessageData, req)
if err != nil {
plog.Error("Add(NumberRequest) proto.Unmarshal failed.", logmod.Error(err))
ctx.Logger().Error("[Grain] Add(NumberRequest) proto.Unmarshal failed.", slog.Any("error", err))
resp := &cluster.GrainErrorResponse{Err: err.Error()}
ctx.Respond(resp)
return
Expand All @@ -205,7 +199,7 @@ func (a *CalculatorActor) Receive(ctx actor.Context) {
}
bytes, err := proto.Marshal(r0)
if err != nil {
plog.Error("Add(NumberRequest) proto.Marshal failed", logmod.Error(err))
ctx.Logger().Error("[Grain] Add(NumberRequest) proto.Marshal failed", slog.Any("error", err))
resp := &cluster.GrainErrorResponse{Err: err.Error()}
ctx.Respond(resp)
return
Expand All @@ -216,7 +210,7 @@ func (a *CalculatorActor) Receive(ctx actor.Context) {
req := &NumberRequest{}
err := proto.Unmarshal(msg.MessageData, req)
if err != nil {
plog.Error("Subtract(NumberRequest) proto.Unmarshal failed.", logmod.Error(err))
ctx.Logger().Error("[Grain] Subtract(NumberRequest) proto.Unmarshal failed.", slog.Any("error", err))
resp := &cluster.GrainErrorResponse{Err: err.Error()}
ctx.Respond(resp)
return
Expand All @@ -229,7 +223,7 @@ func (a *CalculatorActor) Receive(ctx actor.Context) {
}
bytes, err := proto.Marshal(r0)
if err != nil {
plog.Error("Subtract(NumberRequest) proto.Marshal failed", logmod.Error(err))
ctx.Logger().Error("[Grain] Subtract(NumberRequest) proto.Marshal failed", slog.Any("error", err))
resp := &cluster.GrainErrorResponse{Err: err.Error()}
ctx.Respond(resp)
return
Expand All @@ -240,7 +234,7 @@ func (a *CalculatorActor) Receive(ctx actor.Context) {
req := &Noop{}
err := proto.Unmarshal(msg.MessageData, req)
if err != nil {
plog.Error("GetCurrent(Noop) proto.Unmarshal failed.", logmod.Error(err))
ctx.Logger().Error("[Grain] GetCurrent(Noop) proto.Unmarshal failed.", slog.Any("error", err))
resp := &cluster.GrainErrorResponse{Err: err.Error()}
ctx.Respond(resp)
return
Expand All @@ -253,7 +247,7 @@ func (a *CalculatorActor) Receive(ctx actor.Context) {
}
bytes, err := proto.Marshal(r0)
if err != nil {
plog.Error("GetCurrent(Noop) proto.Marshal failed", logmod.Error(err))
ctx.Logger().Error("[Grain] GetCurrent(Noop) proto.Marshal failed", slog.Any("error", err))
resp := &cluster.GrainErrorResponse{Err: err.Error()}
ctx.Respond(resp)
return
Expand Down Expand Up @@ -331,7 +325,7 @@ func (g *TrackerGrainClient) RegisterGrain(r *RegisterMessage, opts ...cluster.G
return nil, err
}
reqMsg := &cluster.GrainRequest{MethodIndex: 0, MessageData: bytes}
resp, err := g.cluster.Call(g.Identity, "Tracker", reqMsg, opts...)
resp, err := g.cluster.Request(g.Identity, "Tracker", reqMsg, opts...)
if err != nil {
return nil, err
}
Expand All @@ -357,7 +351,7 @@ func (g *TrackerGrainClient) DeregisterGrain(r *RegisterMessage, opts ...cluster
return nil, err
}
reqMsg := &cluster.GrainRequest{MethodIndex: 1, MessageData: bytes}
resp, err := g.cluster.Call(g.Identity, "Tracker", reqMsg, opts...)
resp, err := g.cluster.Request(g.Identity, "Tracker", reqMsg, opts...)
if err != nil {
return nil, err
}
Expand All @@ -383,7 +377,7 @@ func (g *TrackerGrainClient) BroadcastGetCounts(r *Noop, opts ...cluster.GrainCa
return nil, err
}
reqMsg := &cluster.GrainRequest{MethodIndex: 2, MessageData: bytes}
resp, err := g.cluster.Call(g.Identity, "Tracker", reqMsg, opts...)
resp, err := g.cluster.Request(g.Identity, "Tracker", reqMsg, opts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -412,7 +406,7 @@ type TrackerActor struct {
// Receive ensures the lifecycle of the actor for the received message
func (a *TrackerActor) Receive(ctx actor.Context) {
switch msg := ctx.Message().(type) {
case *actor.Started: // pass
case *actor.Started: //pass
case *cluster.ClusterInit:
a.ctx = cluster.NewGrainContext(ctx, msg.Identity, msg.Cluster)
a.inner = xTrackerFactory()
Expand All @@ -434,7 +428,7 @@ func (a *TrackerActor) Receive(ctx actor.Context) {
req := &RegisterMessage{}
err := proto.Unmarshal(msg.MessageData, req)
if err != nil {
plog.Error("RegisterGrain(RegisterMessage) proto.Unmarshal failed.", logmod.Error(err))
ctx.Logger().Error("[Grain] RegisterGrain(RegisterMessage) proto.Unmarshal failed.", slog.Any("error", err))
resp := &cluster.GrainErrorResponse{Err: err.Error()}
ctx.Respond(resp)
return
Expand All @@ -447,7 +441,7 @@ func (a *TrackerActor) Receive(ctx actor.Context) {
}
bytes, err := proto.Marshal(r0)
if err != nil {
plog.Error("RegisterGrain(RegisterMessage) proto.Marshal failed", logmod.Error(err))
ctx.Logger().Error("[Grain] RegisterGrain(RegisterMessage) proto.Marshal failed", slog.Any("error", err))
resp := &cluster.GrainErrorResponse{Err: err.Error()}
ctx.Respond(resp)
return
Expand All @@ -458,7 +452,7 @@ func (a *TrackerActor) Receive(ctx actor.Context) {
req := &RegisterMessage{}
err := proto.Unmarshal(msg.MessageData, req)
if err != nil {
plog.Error("DeregisterGrain(RegisterMessage) proto.Unmarshal failed.", logmod.Error(err))
ctx.Logger().Error("[Grain] DeregisterGrain(RegisterMessage) proto.Unmarshal failed.", slog.Any("error", err))
resp := &cluster.GrainErrorResponse{Err: err.Error()}
ctx.Respond(resp)
return
Expand All @@ -471,7 +465,7 @@ func (a *TrackerActor) Receive(ctx actor.Context) {
}
bytes, err := proto.Marshal(r0)
if err != nil {
plog.Error("DeregisterGrain(RegisterMessage) proto.Marshal failed", logmod.Error(err))
ctx.Logger().Error("[Grain] DeregisterGrain(RegisterMessage) proto.Marshal failed", slog.Any("error", err))
resp := &cluster.GrainErrorResponse{Err: err.Error()}
ctx.Respond(resp)
return
Expand All @@ -482,7 +476,7 @@ func (a *TrackerActor) Receive(ctx actor.Context) {
req := &Noop{}
err := proto.Unmarshal(msg.MessageData, req)
if err != nil {
plog.Error("BroadcastGetCounts(Noop) proto.Unmarshal failed.", logmod.Error(err))
ctx.Logger().Error("[Grain] BroadcastGetCounts(Noop) proto.Unmarshal failed.", slog.Any("error", err))
resp := &cluster.GrainErrorResponse{Err: err.Error()}
ctx.Respond(resp)
return
Expand All @@ -495,7 +489,7 @@ func (a *TrackerActor) Receive(ctx actor.Context) {
}
bytes, err := proto.Marshal(r0)
if err != nil {
plog.Error("BroadcastGetCounts(Noop) proto.Marshal failed", logmod.Error(err))
ctx.Logger().Error("[Grain] BroadcastGetCounts(Noop) proto.Marshal failed", slog.Any("error", err))
resp := &cluster.GrainErrorResponse{Err: err.Error()}
ctx.Respond(resp)
return
Expand Down
2 changes: 2 additions & 0 deletions _examples/cluster-grain/node2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"fmt"
"log/slog"

"cluster-grain/shared"

Expand All @@ -20,6 +21,7 @@ func (h HelloGrain) Terminate(ctx cluster.GrainContext) {}
func (h HelloGrain) ReceiveDefault(ctx cluster.GrainContext) {}

func (h HelloGrain) SayHello(request *shared.HelloRequest, ctx cluster.GrainContext) (*shared.HelloResponse, error) {
ctx.Logger().Info("SayHello", slog.String("name", request.Name))
return &shared.HelloResponse{Message: "Hello " + request.Name}, nil
}

Expand Down
9 changes: 4 additions & 5 deletions _examples/remote-activate/node1/main.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package main

import (
"fmt"
"log"
"log/slog"
"time"

"remoteactivate/messages"
Expand All @@ -26,9 +25,9 @@ func main() {
PropsFromFunc(func(context actor.Context) {
switch context.Message().(type) {
case *actor.Started:
log.Printf("actor started " + context.Self().String())
context.Logger().Info("actor started ", slog.Any("self", context.Self()))
case *messages.HelloRequest:
log.Println("Received pong from sender")
context.Logger().Info("Received pong from sender")
message := &messages.HelloResponse{Message: "hello from remote"}
context.Request(context.Sender(), message)
}
Expand All @@ -40,7 +39,7 @@ func main() {

response := res.(*messages.HelloResponse)

fmt.Printf("Response from remote %v", response.Message)
system.Logger().Info("Response from remote", slog.Any("message", response.Message))

console.ReadLine()
}
10 changes: 5 additions & 5 deletions actor/actor_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (ctx *actorContext) ActorSystem() *ActorSystem {
}

func (ctx *actorContext) Logger() *slog.Logger {
return ctx.actorSystem.Logger
return ctx.actorSystem.Logger()
}

func (ctx *actorContext) Parent() *PID {
Expand Down Expand Up @@ -260,7 +260,7 @@ func (ctx *actorContext) receiveTimeoutHandler() {
func (ctx *actorContext) Forward(pid *PID) {
if msg, ok := ctx.messageOrEnvelope.(SystemMessage); ok {
// SystemMessage cannot be forwarded
ctx.actorSystem.Logger.Error("SystemMessage cannot be forwarded", slog.Any("message", msg))
ctx.Logger().Error("SystemMessage cannot be forwarded", slog.Any("message", msg))

return
}
Expand Down Expand Up @@ -567,7 +567,7 @@ func (ctx *actorContext) InvokeSystemMessage(message interface{}) {
case *Restart:
ctx.handleRestart()
default:
ctx.actorSystem.Logger.Error("unknown system message", slog.Any("message", msg))
ctx.Logger().Error("unknown system message", slog.Any("message", msg))
}
}

Expand Down Expand Up @@ -707,11 +707,11 @@ func (ctx *actorContext) finalizeStop() {

func (ctx *actorContext) EscalateFailure(reason interface{}, message interface{}) {
//TODO: add callstack to log?
ctx.actorSystem.Logger.Info("[ACTOR] Recovering", slog.Any("self", ctx.self), slog.Any("reason", reason))
ctx.Logger().Info("[ACTOR] Recovering", slog.Any("self", ctx.self), slog.Any("reason", reason))
// debug setting, allows to output supervision failures in console/error level
if ctx.actorSystem.Config.DeveloperSupervisionLogging {
fmt.Println("[Supervision] Actor:", ctx.self, " failed with message:", message, " exception:", reason)
ctx.actorSystem.Logger.Error("[Supervision]", slog.Any("actor", ctx.self), slog.Any("message", message), slog.Any("exception", reason))
ctx.Logger().Error("[Supervision]", slog.Any("actor", ctx.self), slog.Any("message", message), slog.Any("exception", reason))
}

metricsSystem, ok := ctx.actorSystem.Extensions.Get(extensionId).(*Metrics)
Expand Down
10 changes: 7 additions & 3 deletions actor/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ type ActorSystem struct {
Config *Config
ID string
stopper chan struct{}
Logger *slog.Logger
logger *slog.Logger
}

func (as *ActorSystem) Logger() *slog.Logger {
return as.logger
}

func (as *ActorSystem) NewLocalPID(id string) *PID {
Expand Down Expand Up @@ -72,7 +76,7 @@ func NewActorSystemWithConfig(config *Config) *ActorSystem {
system := &ActorSystem{}
system.ID = shortuuid.New()
system.Config = config
system.Logger = config.LoggerFactory(system)
system.logger = config.LoggerFactory(system)
system.ProcessRegistry = NewProcessRegistry(system)
system.Root = NewRootContext(system, EmptyMessageHeader)
system.Guardians = NewGuardians(system)
Expand All @@ -85,7 +89,7 @@ func NewActorSystemWithConfig(config *Config) *ActorSystem {
system.ProcessRegistry.Add(NewEventStreamProcess(system), "eventstream")
system.stopper = make(chan struct{})

system.Logger.Info("actor system started", slog.String("id", system.ID))
system.Logger().Info("actor system started", slog.String("id", system.ID))

return system
}
2 changes: 1 addition & 1 deletion actor/behavior.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (b *Behavior) Receive(context Context) {
if ok {
behavior(context)
} else {
context.ActorSystem().Logger.Error("empty behavior called", slog.Any("pid", context.Self()))
context.Logger().Error("empty behavior called", slog.Any("pid", context.Self()))
}
}

Expand Down
4 changes: 2 additions & 2 deletions actor/deadletter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func NewDeadLetter(actorSystem *ActorSystem) *deadLetterProcess {
}

shouldThrottle := NewThrottle(actorSystem.Config.DeadLetterThrottleCount, actorSystem.Config.DeadLetterThrottleInterval, func(i int32) {
actorSystem.Logger.Info("[DeadLetter]", slog.Int64("throttled", int64(i)))
actorSystem.Logger().Info("[DeadLetter]", slog.Int64("throttled", int64(i)))
})

actorSystem.ProcessRegistry.Add(dp, "deadletter")
Expand All @@ -42,7 +42,7 @@ func NewDeadLetter(actorSystem *ActorSystem) *deadLetterProcess {

if _, isIgnoreDeadLetter := deadLetter.Message.(IgnoreDeadLetterLogging); !isIgnoreDeadLetter {
if shouldThrottle() == Open {
actorSystem.Logger.Debug("[DeadLetter]", slog.Any("pid", deadLetter.PID), slog.Any("message", deadLetter.Message), slog.Any("sender", deadLetter.Sender))
actorSystem.Logger().Debug("[DeadLetter]", slog.Any("pid", deadLetter.PID), slog.Any("message", deadLetter.Message), slog.Any("sender", deadLetter.Sender))
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion actor/future.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func NewFuture(actorSystem *ActorSystem, d time.Duration) *Future {

pid, ok := actorSystem.ProcessRegistry.Add(ref, "future"+id)
if !ok {
actorSystem.Logger.Error("failed to register future process", slog.Any("pid", pid))
actorSystem.Logger().Error("failed to register future process", slog.Any("pid", pid))
}

sysMetrics, ok := actorSystem.Extensions.Get(extensionId).(*Metrics)
Expand Down
2 changes: 1 addition & 1 deletion actor/guardian.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (gs *guardiansValue) newGuardian(s SupervisorStrategy) *guardianProcess {

pid, ok := gs.actorSystem.ProcessRegistry.Add(ref, "guardian"+id)
if !ok {
gs.actorSystem.Logger.Error("failed to register guardian process", slog.Any("pid", pid))
gs.actorSystem.Logger().Error("failed to register guardian process", slog.Any("pid", pid))
}

ref.pid = pid
Expand Down
Loading

0 comments on commit 65710ad

Please sign in to comment.