Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More logging #947

Merged
merged 5 commits into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion _examples/cluster-broadcast/shared/protos.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 24 additions & 30 deletions _examples/cluster-broadcast/shared/protos_protoactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,21 @@ package shared
import (
"errors"
"fmt"
"log/slog"
"math"
"time"

"github.com/asynkron/protoactor-go/actor"
"github.com/asynkron/protoactor-go/cluster"
logmod "github.com/asynkron/protoactor-go/log"
"google.golang.org/protobuf/proto"
)

var (
plog = logmod.New(logmod.InfoLevel, "[GRAIN][shared]")
_ = proto.Marshal
_ = fmt.Errorf
_ = math.Inf
_ = proto.Marshal
_ = fmt.Errorf
_ = math.Inf
)

// SetLogLevel sets the log level.
func SetLogLevel(level logmod.Level) {
plog.SetLevel(level)
}

var xCalculatorFactory func() Calculator

// CalculatorFactory produces a Calculator
Expand Down Expand Up @@ -89,7 +83,7 @@ func (g *CalculatorGrainClient) Add(r *NumberRequest, opts ...cluster.GrainCallO
return nil, err
}
reqMsg := &cluster.GrainRequest{MethodIndex: 0, MessageData: bytes}
resp, err := g.cluster.Call(g.Identity, "Calculator", reqMsg, opts...)
resp, err := g.cluster.Request(g.Identity, "Calculator", reqMsg, opts...)
if err != nil {
return nil, err
}
Expand All @@ -115,7 +109,7 @@ func (g *CalculatorGrainClient) Subtract(r *NumberRequest, opts ...cluster.Grain
return nil, err
}
reqMsg := &cluster.GrainRequest{MethodIndex: 1, MessageData: bytes}
resp, err := g.cluster.Call(g.Identity, "Calculator", reqMsg, opts...)
resp, err := g.cluster.Request(g.Identity, "Calculator", reqMsg, opts...)
if err != nil {
return nil, err
}
Expand All @@ -141,7 +135,7 @@ func (g *CalculatorGrainClient) GetCurrent(r *Noop, opts ...cluster.GrainCallOpt
return nil, err
}
reqMsg := &cluster.GrainRequest{MethodIndex: 2, MessageData: bytes}
resp, err := g.cluster.Call(g.Identity, "Calculator", reqMsg, opts...)
resp, err := g.cluster.Request(g.Identity, "Calculator", reqMsg, opts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -170,7 +164,7 @@ type CalculatorActor struct {
// Receive ensures the lifecycle of the actor for the received message
func (a *CalculatorActor) Receive(ctx actor.Context) {
switch msg := ctx.Message().(type) {
case *actor.Started: // pass
case *actor.Started: //pass
case *cluster.ClusterInit:
a.ctx = cluster.NewGrainContext(ctx, msg.Identity, msg.Cluster)
a.inner = xCalculatorFactory()
Expand All @@ -192,7 +186,7 @@ func (a *CalculatorActor) Receive(ctx actor.Context) {
req := &NumberRequest{}
err := proto.Unmarshal(msg.MessageData, req)
if err != nil {
plog.Error("Add(NumberRequest) proto.Unmarshal failed.", logmod.Error(err))
ctx.Logger().Error("[Grain] Add(NumberRequest) proto.Unmarshal failed.", slog.Any("error", err))
resp := &cluster.GrainErrorResponse{Err: err.Error()}
ctx.Respond(resp)
return
Expand All @@ -205,7 +199,7 @@ func (a *CalculatorActor) Receive(ctx actor.Context) {
}
bytes, err := proto.Marshal(r0)
if err != nil {
plog.Error("Add(NumberRequest) proto.Marshal failed", logmod.Error(err))
ctx.Logger().Error("[Grain] Add(NumberRequest) proto.Marshal failed", slog.Any("error", err))
resp := &cluster.GrainErrorResponse{Err: err.Error()}
ctx.Respond(resp)
return
Expand All @@ -216,7 +210,7 @@ func (a *CalculatorActor) Receive(ctx actor.Context) {
req := &NumberRequest{}
err := proto.Unmarshal(msg.MessageData, req)
if err != nil {
plog.Error("Subtract(NumberRequest) proto.Unmarshal failed.", logmod.Error(err))
ctx.Logger().Error("[Grain] Subtract(NumberRequest) proto.Unmarshal failed.", slog.Any("error", err))
resp := &cluster.GrainErrorResponse{Err: err.Error()}
ctx.Respond(resp)
return
Expand All @@ -229,7 +223,7 @@ func (a *CalculatorActor) Receive(ctx actor.Context) {
}
bytes, err := proto.Marshal(r0)
if err != nil {
plog.Error("Subtract(NumberRequest) proto.Marshal failed", logmod.Error(err))
ctx.Logger().Error("[Grain] Subtract(NumberRequest) proto.Marshal failed", slog.Any("error", err))
resp := &cluster.GrainErrorResponse{Err: err.Error()}
ctx.Respond(resp)
return
Expand All @@ -240,7 +234,7 @@ func (a *CalculatorActor) Receive(ctx actor.Context) {
req := &Noop{}
err := proto.Unmarshal(msg.MessageData, req)
if err != nil {
plog.Error("GetCurrent(Noop) proto.Unmarshal failed.", logmod.Error(err))
ctx.Logger().Error("[Grain] GetCurrent(Noop) proto.Unmarshal failed.", slog.Any("error", err))
resp := &cluster.GrainErrorResponse{Err: err.Error()}
ctx.Respond(resp)
return
Expand All @@ -253,7 +247,7 @@ func (a *CalculatorActor) Receive(ctx actor.Context) {
}
bytes, err := proto.Marshal(r0)
if err != nil {
plog.Error("GetCurrent(Noop) proto.Marshal failed", logmod.Error(err))
ctx.Logger().Error("[Grain] GetCurrent(Noop) proto.Marshal failed", slog.Any("error", err))
resp := &cluster.GrainErrorResponse{Err: err.Error()}
ctx.Respond(resp)
return
Expand Down Expand Up @@ -331,7 +325,7 @@ func (g *TrackerGrainClient) RegisterGrain(r *RegisterMessage, opts ...cluster.G
return nil, err
}
reqMsg := &cluster.GrainRequest{MethodIndex: 0, MessageData: bytes}
resp, err := g.cluster.Call(g.Identity, "Tracker", reqMsg, opts...)
resp, err := g.cluster.Request(g.Identity, "Tracker", reqMsg, opts...)
if err != nil {
return nil, err
}
Expand All @@ -357,7 +351,7 @@ func (g *TrackerGrainClient) DeregisterGrain(r *RegisterMessage, opts ...cluster
return nil, err
}
reqMsg := &cluster.GrainRequest{MethodIndex: 1, MessageData: bytes}
resp, err := g.cluster.Call(g.Identity, "Tracker", reqMsg, opts...)
resp, err := g.cluster.Request(g.Identity, "Tracker", reqMsg, opts...)
if err != nil {
return nil, err
}
Expand All @@ -383,7 +377,7 @@ func (g *TrackerGrainClient) BroadcastGetCounts(r *Noop, opts ...cluster.GrainCa
return nil, err
}
reqMsg := &cluster.GrainRequest{MethodIndex: 2, MessageData: bytes}
resp, err := g.cluster.Call(g.Identity, "Tracker", reqMsg, opts...)
resp, err := g.cluster.Request(g.Identity, "Tracker", reqMsg, opts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -412,7 +406,7 @@ type TrackerActor struct {
// Receive ensures the lifecycle of the actor for the received message
func (a *TrackerActor) Receive(ctx actor.Context) {
switch msg := ctx.Message().(type) {
case *actor.Started: // pass
case *actor.Started: //pass
case *cluster.ClusterInit:
a.ctx = cluster.NewGrainContext(ctx, msg.Identity, msg.Cluster)
a.inner = xTrackerFactory()
Expand All @@ -434,7 +428,7 @@ func (a *TrackerActor) Receive(ctx actor.Context) {
req := &RegisterMessage{}
err := proto.Unmarshal(msg.MessageData, req)
if err != nil {
plog.Error("RegisterGrain(RegisterMessage) proto.Unmarshal failed.", logmod.Error(err))
ctx.Logger().Error("[Grain] RegisterGrain(RegisterMessage) proto.Unmarshal failed.", slog.Any("error", err))
resp := &cluster.GrainErrorResponse{Err: err.Error()}
ctx.Respond(resp)
return
Expand All @@ -447,7 +441,7 @@ func (a *TrackerActor) Receive(ctx actor.Context) {
}
bytes, err := proto.Marshal(r0)
if err != nil {
plog.Error("RegisterGrain(RegisterMessage) proto.Marshal failed", logmod.Error(err))
ctx.Logger().Error("[Grain] RegisterGrain(RegisterMessage) proto.Marshal failed", slog.Any("error", err))
resp := &cluster.GrainErrorResponse{Err: err.Error()}
ctx.Respond(resp)
return
Expand All @@ -458,7 +452,7 @@ func (a *TrackerActor) Receive(ctx actor.Context) {
req := &RegisterMessage{}
err := proto.Unmarshal(msg.MessageData, req)
if err != nil {
plog.Error("DeregisterGrain(RegisterMessage) proto.Unmarshal failed.", logmod.Error(err))
ctx.Logger().Error("[Grain] DeregisterGrain(RegisterMessage) proto.Unmarshal failed.", slog.Any("error", err))
resp := &cluster.GrainErrorResponse{Err: err.Error()}
ctx.Respond(resp)
return
Expand All @@ -471,7 +465,7 @@ func (a *TrackerActor) Receive(ctx actor.Context) {
}
bytes, err := proto.Marshal(r0)
if err != nil {
plog.Error("DeregisterGrain(RegisterMessage) proto.Marshal failed", logmod.Error(err))
ctx.Logger().Error("[Grain] DeregisterGrain(RegisterMessage) proto.Marshal failed", slog.Any("error", err))
resp := &cluster.GrainErrorResponse{Err: err.Error()}
ctx.Respond(resp)
return
Expand All @@ -482,7 +476,7 @@ func (a *TrackerActor) Receive(ctx actor.Context) {
req := &Noop{}
err := proto.Unmarshal(msg.MessageData, req)
if err != nil {
plog.Error("BroadcastGetCounts(Noop) proto.Unmarshal failed.", logmod.Error(err))
ctx.Logger().Error("[Grain] BroadcastGetCounts(Noop) proto.Unmarshal failed.", slog.Any("error", err))
resp := &cluster.GrainErrorResponse{Err: err.Error()}
ctx.Respond(resp)
return
Expand All @@ -495,7 +489,7 @@ func (a *TrackerActor) Receive(ctx actor.Context) {
}
bytes, err := proto.Marshal(r0)
if err != nil {
plog.Error("BroadcastGetCounts(Noop) proto.Marshal failed", logmod.Error(err))
ctx.Logger().Error("[Grain] BroadcastGetCounts(Noop) proto.Marshal failed", slog.Any("error", err))
resp := &cluster.GrainErrorResponse{Err: err.Error()}
ctx.Respond(resp)
return
Expand Down
2 changes: 2 additions & 0 deletions _examples/cluster-grain/node2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"fmt"
"log/slog"

"cluster-grain/shared"

Expand All @@ -20,6 +21,7 @@ func (h HelloGrain) Terminate(ctx cluster.GrainContext) {}
func (h HelloGrain) ReceiveDefault(ctx cluster.GrainContext) {}

func (h HelloGrain) SayHello(request *shared.HelloRequest, ctx cluster.GrainContext) (*shared.HelloResponse, error) {
ctx.Logger().Info("SayHello", slog.String("name", request.Name))
return &shared.HelloResponse{Message: "Hello " + request.Name}, nil
}

Expand Down
9 changes: 4 additions & 5 deletions _examples/remote-activate/node1/main.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package main

import (
"fmt"
"log"
"log/slog"
"time"

"remoteactivate/messages"
Expand All @@ -26,9 +25,9 @@ func main() {
PropsFromFunc(func(context actor.Context) {
switch context.Message().(type) {
case *actor.Started:
log.Printf("actor started " + context.Self().String())
context.Logger().Info("actor started ", slog.Any("self", context.Self()))
case *messages.HelloRequest:
log.Println("Received pong from sender")
context.Logger().Info("Received pong from sender")
message := &messages.HelloResponse{Message: "hello from remote"}
context.Request(context.Sender(), message)
}
Expand All @@ -40,7 +39,7 @@ func main() {

response := res.(*messages.HelloResponse)

fmt.Printf("Response from remote %v", response.Message)
system.Logger().Info("Response from remote", slog.Any("message", response.Message))

console.ReadLine()
}
10 changes: 5 additions & 5 deletions actor/actor_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (ctx *actorContext) ActorSystem() *ActorSystem {
}

func (ctx *actorContext) Logger() *slog.Logger {
return ctx.actorSystem.Logger
return ctx.actorSystem.Logger()
}

func (ctx *actorContext) Parent() *PID {
Expand Down Expand Up @@ -260,7 +260,7 @@ func (ctx *actorContext) receiveTimeoutHandler() {
func (ctx *actorContext) Forward(pid *PID) {
if msg, ok := ctx.messageOrEnvelope.(SystemMessage); ok {
// SystemMessage cannot be forwarded
ctx.actorSystem.Logger.Error("SystemMessage cannot be forwarded", slog.Any("message", msg))
ctx.Logger().Error("SystemMessage cannot be forwarded", slog.Any("message", msg))

return
}
Expand Down Expand Up @@ -567,7 +567,7 @@ func (ctx *actorContext) InvokeSystemMessage(message interface{}) {
case *Restart:
ctx.handleRestart()
default:
ctx.actorSystem.Logger.Error("unknown system message", slog.Any("message", msg))
ctx.Logger().Error("unknown system message", slog.Any("message", msg))
}
}

Expand Down Expand Up @@ -707,11 +707,11 @@ func (ctx *actorContext) finalizeStop() {

func (ctx *actorContext) EscalateFailure(reason interface{}, message interface{}) {
//TODO: add callstack to log?
ctx.actorSystem.Logger.Info("[ACTOR] Recovering", slog.Any("self", ctx.self), slog.Any("reason", reason))
ctx.Logger().Info("[ACTOR] Recovering", slog.Any("self", ctx.self), slog.Any("reason", reason))
// debug setting, allows to output supervision failures in console/error level
if ctx.actorSystem.Config.DeveloperSupervisionLogging {
fmt.Println("[Supervision] Actor:", ctx.self, " failed with message:", message, " exception:", reason)
ctx.actorSystem.Logger.Error("[Supervision]", slog.Any("actor", ctx.self), slog.Any("message", message), slog.Any("exception", reason))
ctx.Logger().Error("[Supervision]", slog.Any("actor", ctx.self), slog.Any("message", message), slog.Any("exception", reason))
}

metricsSystem, ok := ctx.actorSystem.Extensions.Get(extensionId).(*Metrics)
Expand Down
10 changes: 7 additions & 3 deletions actor/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ type ActorSystem struct {
Config *Config
ID string
stopper chan struct{}
Logger *slog.Logger
logger *slog.Logger
}

func (as *ActorSystem) Logger() *slog.Logger {
return as.logger
}

func (as *ActorSystem) NewLocalPID(id string) *PID {
Expand Down Expand Up @@ -72,7 +76,7 @@ func NewActorSystemWithConfig(config *Config) *ActorSystem {
system := &ActorSystem{}
system.ID = shortuuid.New()
system.Config = config
system.Logger = config.LoggerFactory(system)
system.logger = config.LoggerFactory(system)
system.ProcessRegistry = NewProcessRegistry(system)
system.Root = NewRootContext(system, EmptyMessageHeader)
system.Guardians = NewGuardians(system)
Expand All @@ -85,7 +89,7 @@ func NewActorSystemWithConfig(config *Config) *ActorSystem {
system.ProcessRegistry.Add(NewEventStreamProcess(system), "eventstream")
system.stopper = make(chan struct{})

system.Logger.Info("actor system started", slog.String("id", system.ID))
system.Logger().Info("actor system started", slog.String("id", system.ID))

return system
}
2 changes: 1 addition & 1 deletion actor/behavior.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (b *Behavior) Receive(context Context) {
if ok {
behavior(context)
} else {
context.ActorSystem().Logger.Error("empty behavior called", slog.Any("pid", context.Self()))
context.Logger().Error("empty behavior called", slog.Any("pid", context.Self()))
}
}

Expand Down
4 changes: 2 additions & 2 deletions actor/deadletter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func NewDeadLetter(actorSystem *ActorSystem) *deadLetterProcess {
}

shouldThrottle := NewThrottle(actorSystem.Config.DeadLetterThrottleCount, actorSystem.Config.DeadLetterThrottleInterval, func(i int32) {
actorSystem.Logger.Info("[DeadLetter]", slog.Int64("throttled", int64(i)))
actorSystem.Logger().Info("[DeadLetter]", slog.Int64("throttled", int64(i)))
})

actorSystem.ProcessRegistry.Add(dp, "deadletter")
Expand All @@ -42,7 +42,7 @@ func NewDeadLetter(actorSystem *ActorSystem) *deadLetterProcess {

if _, isIgnoreDeadLetter := deadLetter.Message.(IgnoreDeadLetterLogging); !isIgnoreDeadLetter {
if shouldThrottle() == Open {
actorSystem.Logger.Debug("[DeadLetter]", slog.Any("pid", deadLetter.PID), slog.Any("message", deadLetter.Message), slog.Any("sender", deadLetter.Sender))
actorSystem.Logger().Debug("[DeadLetter]", slog.Any("pid", deadLetter.PID), slog.Any("message", deadLetter.Message), slog.Any("sender", deadLetter.Sender))
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion actor/future.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func NewFuture(actorSystem *ActorSystem, d time.Duration) *Future {

pid, ok := actorSystem.ProcessRegistry.Add(ref, "future"+id)
if !ok {
actorSystem.Logger.Error("failed to register future process", slog.Any("pid", pid))
actorSystem.Logger().Error("failed to register future process", slog.Any("pid", pid))
}

sysMetrics, ok := actorSystem.Extensions.Get(extensionId).(*Metrics)
Expand Down
2 changes: 1 addition & 1 deletion actor/guardian.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (gs *guardiansValue) newGuardian(s SupervisorStrategy) *guardianProcess {

pid, ok := gs.actorSystem.ProcessRegistry.Add(ref, "guardian"+id)
if !ok {
gs.actorSystem.Logger.Error("failed to register guardian process", slog.Any("pid", pid))
gs.actorSystem.Logger().Error("failed to register guardian process", slog.Any("pid", pid))
}

ref.pid = pid
Expand Down
Loading