diff --git a/cluster/cluster.go b/cluster/cluster.go index a59ebe92..fe821b79 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -1,6 +1,7 @@ package cluster import ( + "log/slog" "time" "google.golang.org/protobuf/types/known/emptypb" @@ -9,7 +10,6 @@ import ( "github.com/asynkron/protoactor-go/actor" "github.com/asynkron/protoactor-go/extensions" - "github.com/asynkron/protoactor-go/log" "github.com/asynkron/protoactor-go/remote" ) @@ -91,7 +91,7 @@ func (c *Cluster) StartMember() { c.Remote.Start() address := c.ActorSystem.Address() - plog.Info("Starting Proto.Actor cluster member", log.String("id", c.ActorSystem.ID), log.String("address", address)) + c.ActorSystem.Logger.Info("Starting Proto.Actor cluster member", slog.String("address", address)) c.IdentityLookup = cfg.IdentityLookup c.IdentityLookup.Setup(c, c.GetClusterKinds(), false) @@ -127,7 +127,7 @@ func (c *Cluster) StartClient() { c.Remote.Start() address := c.ActorSystem.Address() - plog.Info("Starting Proto.Actor cluster-client", log.String("address", address)) + c.ActorSystem.Logger.Info("Starting Proto.Actor cluster-client", slog.String("address", address)) c.IdentityLookup = cfg.IdentityLookup c.IdentityLookup.Setup(c, c.GetClusterKinds(), true) @@ -154,7 +154,7 @@ func (c *Cluster) Shutdown(graceful bool) { c.Remote.Shutdown(graceful) address := c.ActorSystem.Address() - plog.Info("Stopped Proto.Actor cluster", log.String("address", address)) + c.ActorSystem.Logger.Info("Stopped Proto.Actor cluster", slog.String("address", address)) } func (c *Cluster) Get(identity string, kind string) *actor.PID { @@ -168,7 +168,7 @@ func (c *Cluster) Request(identity string, kind string, message interface{}) (in func (c *Cluster) GetClusterKind(kind string) *ActivatedKind { k, ok := c.kinds[kind] if !ok { - plog.Error("Invalid kind", log.String("kind", kind)) + c.ActorSystem.Logger.Error("Invalid kind", slog.String("kind", kind)) return nil } @@ -232,7 +232,7 @@ func (c *Cluster) Call(name string, kind string, msg interface{}, opts ...GrainC timeout := callConfig.Timeout _resp, err := _context.RequestFuture(pid, msg, timeout).Result() if err != nil { - plog.Error("cluster.RequestFuture failed", log.Error(err), log.PID("pid", pid)) + c.ActorSystem.Logger.Error("cluster.RequestFuture failed", slog.Any("error", err), slog.Any("pid", pid)) lastError = err switch err { diff --git a/cluster/default_context.go b/cluster/default_context.go index 851c0238..2d4f876b 100644 --- a/cluster/default_context.go +++ b/cluster/default_context.go @@ -5,10 +5,10 @@ package cluster import ( "context" "fmt" + "log/slog" "time" "github.com/asynkron/protoactor-go/actor" - "github.com/asynkron/protoactor-go/log" "github.com/asynkron/protoactor-go/remote" ) @@ -44,7 +44,7 @@ func (dcc *DefaultContext) Request(identity, kind string, message interface{}, t start := time.Now() - plog.Debug(fmt.Sprintf("Requesting %s:%s Message %#v", identity, kind, message)) + dcc.cluster.ActorSystem.Logger.Debug(fmt.Sprintf("Requesting %s:%s Message %#v", identity, kind, message)) // crate a new Timeout Context ttl := cfg.ActorRequestTimeout @@ -67,7 +67,7 @@ selectloop: default: pid := dcc.getCachedPid(identity, kind) if pid == nil { - plog.Debug(fmt.Sprintf("Requesting %s:%s did not get PID from IdentityLookup", identity, kind)) + dcc.cluster.ActorSystem.Logger.Debug(fmt.Sprintf("Requesting %s:%s did not get PID from IdentityLookup", identity, kind)) counter = cfg.RetryAction(counter) continue @@ -75,7 +75,7 @@ selectloop: resp, err = _context.RequestFuture(pid, message, ttl).Result() if err != nil { - plog.Error("cluster.RequestFuture failed", log.Error(err), log.PID("pid", pid)) + dcc.cluster.ActorSystem.Logger.Error("cluster.RequestFuture failed", slog.Any("error", err), slog.Any("pid", pid)) switch err { case actor.ErrTimeout, remote.ErrTimeout, actor.ErrDeadLetter, remote.ErrDeadLetter: counter = cfg.RetryAction(counter) @@ -97,7 +97,7 @@ selectloop: if contextError := ctx.Err(); contextError != nil && cfg.requestLogThrottle() == actor.Open { // context timeout exceeded, report and return - plog.Warn(fmt.Sprintf("Request retried but failed for %s:%s, elapsed %v", identity, kind, totalTime)) + dcc.cluster.ActorSystem.Logger.Warn("Request retried but failed", slog.String("identity", identity), slog.String("kind", kind), slog.Duration("duration", totalTime)) } return resp, err diff --git a/cluster/gossip_actor.go b/cluster/gossip_actor.go index 56b1b9a8..c56fe168 100644 --- a/cluster/gossip_actor.go +++ b/cluster/gossip_actor.go @@ -3,11 +3,12 @@ package cluster import ( + "github.com/opentracing/opentracing-go/log" + "log/slog" "time" "github.com/asynkron/gofun/set" "github.com/asynkron/protoactor-go/actor" - "github.com/asynkron/protoactor-go/log" ) // convenience customary type to represent an empty value @@ -30,7 +31,9 @@ func NewGossipActor(requestTimeout time.Duration, myID string, getBlockedMembers gossipRequestTimeout: requestTimeout, gossip: informer, } - gossipActor.throttler = actor.NewThrottle(3, 60*time.Second, gossipActor.throttledLog) + gossipActor.throttler = actor.NewThrottle(3, 60*time.Second, func(counter int32) { + plog.Debug("[Gossip] Sending GossipRequest", log.Int("throttled", int(counter))) + }) return &gossipActor } @@ -55,9 +58,9 @@ func (ga *GossipActor) Receive(ctx actor.Context) { case *ClusterTopology: ga.onClusterTopology(r) case *GossipResponse: - plog.Error("GossipResponse should not be received by GossipActor") // it should be a response to a request + ctx.Logger().Error("GossipResponse should not be received by GossipActor") // it should be a response to a request default: - plog.Warn("Gossip received unknown message request", log.Message(r), log.TypeOf("msg_type", r)) + ctx.Logger().Warn("Gossip received unknown message request", slog.Any("message", r)) } } @@ -81,12 +84,12 @@ func (ga *GossipActor) onGetGossipStateKey(r *GetGossipStateRequest, ctx actor.C func (ga *GossipActor) onGossipRequest(r *GossipRequest, ctx actor.Context) { if ga.throttler() == actor.Open { - plog.Debug("OnGossipRequest", log.PID("sender", ctx.Sender())) + ctx.Logger().Debug("OnGossipRequest", slog.Any("sender", ctx.Sender())) } ga.ReceiveState(r.State, ctx) if !GetCluster(ctx.ActorSystem()).MemberList.ContainsMemberID(r.MemberId) { - plog.Warn("Got gossip request from unknown member", log.String("MemberId", r.MemberId)) + ctx.Logger().Warn("Got gossip request from unknown member", slog.String("MemberId", r.MemberId)) // nothing to send, do not provide sender or state payload // ctx.Respond(&GossipResponse{State: &GossipState{Members: make(map[string]*GossipState_GossipMemberState)}}) @@ -97,7 +100,7 @@ func (ga *GossipActor) onGossipRequest(r *GossipRequest, ctx actor.Context) { memberState := ga.gossip.GetMemberStateDelta(r.MemberId) if !memberState.HasState { - plog.Warn("Got gossip request from member, but no state was found", log.String("MemberId", r.MemberId)) + ctx.Logger().Warn("Got gossip request from member, but no state was found", slog.String("MemberId", r.MemberId)) // nothing to send, do not provide sender or state payload ctx.Respond(&GossipResponse{}) @@ -164,7 +167,7 @@ func (ga *GossipActor) ReceiveState(remoteState *GossipState, ctx actor.Context) func (ga *GossipActor) sendGossipForMember(member *Member, memberStateDelta *MemberStateDelta, ctx actor.Context) { pid := actor.NewPID(member.Address(), DefaultGossipActorName) if ga.throttler() == actor.Open { - plog.Debug("Sending GossipRequest", log.String("MemberId", member.Id)) + ctx.Logger().Debug("Sending GossipRequest", slog.String("MemberId", member.Id)) } // a short timeout is massively important, we cannot afford hanging around waiting @@ -178,13 +181,13 @@ func (ga *GossipActor) sendGossipForMember(member *Member, memberStateDelta *Mem ctx.ReenterAfter(future, func(res interface{}, err error) { if err != nil { - plog.Warn("sendGossipForMember failed", log.String("MemberId", member.Id), log.Error(err)) + ctx.Logger().Warn("sendGossipForMember failed", slog.String("MemberId", member.Id), slog.Any("error", err)) return } resp, ok := res.(*GossipResponse) if !ok { - plog.Error("sendGossipForMember received unknown response message", log.TypeOf("messageType", res), log.Message(resp)) + ctx.Logger().Error("sendGossipForMember received unknown response message", slog.Any("message", resp)) return } @@ -196,7 +199,3 @@ func (ga *GossipActor) sendGossipForMember(member *Member, memberStateDelta *Mem } }) } - -func (ga *GossipActor) throttledLog(counter int32) { - plog.Debug("[Gossip] Sending GossipRequest", log.Int("throttled", int(counter))) -} diff --git a/cluster/log.go b/cluster/log.go deleted file mode 100644 index a3cd7ad9..00000000 --- a/cluster/log.go +++ /dev/null @@ -1,14 +0,0 @@ -package cluster - -import ( - "github.com/asynkron/protoactor-go/log" -) - -var plog = log.New(log.DefaultLevel, "[CLUSTER]") - -// SetLogLevel sets the log level for the logger. -// -// SetLogLevel is safe to call concurrently -func SetLogLevel(level log.Level) { - plog.SetLevel(level) -}