Skip to content

Commit

Permalink
Start using slog
Browse files Browse the repository at this point in the history
  • Loading branch information
rogeralsing committed Nov 19, 2023
1 parent e6a8933 commit 81a6830
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 39 deletions.
12 changes: 6 additions & 6 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cluster

import (
"log/slog"
"time"

"google.golang.org/protobuf/types/known/emptypb"
Expand All @@ -9,7 +10,6 @@ import (

"github.com/asynkron/protoactor-go/actor"
"github.com/asynkron/protoactor-go/extensions"
"github.com/asynkron/protoactor-go/log"
"github.com/asynkron/protoactor-go/remote"
)

Expand Down Expand Up @@ -91,7 +91,7 @@ func (c *Cluster) StartMember() {
c.Remote.Start()

address := c.ActorSystem.Address()
plog.Info("Starting Proto.Actor cluster member", log.String("id", c.ActorSystem.ID), log.String("address", address))
c.ActorSystem.Logger.Info("Starting Proto.Actor cluster member", slog.String("address", address))

c.IdentityLookup = cfg.IdentityLookup
c.IdentityLookup.Setup(c, c.GetClusterKinds(), false)
Expand Down Expand Up @@ -127,7 +127,7 @@ func (c *Cluster) StartClient() {
c.Remote.Start()

address := c.ActorSystem.Address()
plog.Info("Starting Proto.Actor cluster-client", log.String("address", address))
c.ActorSystem.Logger.Info("Starting Proto.Actor cluster-client", slog.String("address", address))

c.IdentityLookup = cfg.IdentityLookup
c.IdentityLookup.Setup(c, c.GetClusterKinds(), true)
Expand All @@ -154,7 +154,7 @@ func (c *Cluster) Shutdown(graceful bool) {
c.Remote.Shutdown(graceful)

address := c.ActorSystem.Address()
plog.Info("Stopped Proto.Actor cluster", log.String("address", address))
c.ActorSystem.Logger.Info("Stopped Proto.Actor cluster", slog.String("address", address))
}

func (c *Cluster) Get(identity string, kind string) *actor.PID {
Expand All @@ -168,7 +168,7 @@ func (c *Cluster) Request(identity string, kind string, message interface{}) (in
func (c *Cluster) GetClusterKind(kind string) *ActivatedKind {
k, ok := c.kinds[kind]
if !ok {
plog.Error("Invalid kind", log.String("kind", kind))
c.ActorSystem.Logger.Error("Invalid kind", slog.String("kind", kind))

return nil
}
Expand Down Expand Up @@ -232,7 +232,7 @@ func (c *Cluster) Call(name string, kind string, msg interface{}, opts ...GrainC
timeout := callConfig.Timeout
_resp, err := _context.RequestFuture(pid, msg, timeout).Result()
if err != nil {
plog.Error("cluster.RequestFuture failed", log.Error(err), log.PID("pid", pid))
c.ActorSystem.Logger.Error("cluster.RequestFuture failed", slog.Any("error", err), slog.Any("pid", pid))
lastError = err

switch err {
Expand Down
10 changes: 5 additions & 5 deletions cluster/default_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ package cluster
import (
"context"
"fmt"
"log/slog"
"time"

"github.com/asynkron/protoactor-go/actor"
"github.com/asynkron/protoactor-go/log"
"github.com/asynkron/protoactor-go/remote"
)

Expand Down Expand Up @@ -44,7 +44,7 @@ func (dcc *DefaultContext) Request(identity, kind string, message interface{}, t

start := time.Now()

plog.Debug(fmt.Sprintf("Requesting %s:%s Message %#v", identity, kind, message))
dcc.cluster.ActorSystem.Logger.Debug(fmt.Sprintf("Requesting %s:%s Message %#v", identity, kind, message))

// crate a new Timeout Context
ttl := cfg.ActorRequestTimeout
Expand All @@ -67,15 +67,15 @@ selectloop:
default:
pid := dcc.getCachedPid(identity, kind)
if pid == nil {
plog.Debug(fmt.Sprintf("Requesting %s:%s did not get PID from IdentityLookup", identity, kind))
dcc.cluster.ActorSystem.Logger.Debug(fmt.Sprintf("Requesting %s:%s did not get PID from IdentityLookup", identity, kind))
counter = cfg.RetryAction(counter)

continue
}

resp, err = _context.RequestFuture(pid, message, ttl).Result()
if err != nil {
plog.Error("cluster.RequestFuture failed", log.Error(err), log.PID("pid", pid))
dcc.cluster.ActorSystem.Logger.Error("cluster.RequestFuture failed", slog.Any("error", err), slog.Any("pid", pid))
switch err {
case actor.ErrTimeout, remote.ErrTimeout, actor.ErrDeadLetter, remote.ErrDeadLetter:
counter = cfg.RetryAction(counter)
Expand All @@ -97,7 +97,7 @@ selectloop:

if contextError := ctx.Err(); contextError != nil && cfg.requestLogThrottle() == actor.Open {
// context timeout exceeded, report and return
plog.Warn(fmt.Sprintf("Request retried but failed for %s:%s, elapsed %v", identity, kind, totalTime))
dcc.cluster.ActorSystem.Logger.Warn("Request retried but failed", slog.String("identity", identity), slog.String("kind", kind), slog.Duration("duration", totalTime))
}

return resp, err
Expand Down
27 changes: 13 additions & 14 deletions cluster/gossip_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
package cluster

import (
"github.com/opentracing/opentracing-go/log"
"log/slog"
"time"

"github.com/asynkron/gofun/set"
"github.com/asynkron/protoactor-go/actor"
"github.com/asynkron/protoactor-go/log"
)

// convenience customary type to represent an empty value
Expand All @@ -30,7 +31,9 @@ func NewGossipActor(requestTimeout time.Duration, myID string, getBlockedMembers
gossipRequestTimeout: requestTimeout,
gossip: informer,
}
gossipActor.throttler = actor.NewThrottle(3, 60*time.Second, gossipActor.throttledLog)
gossipActor.throttler = actor.NewThrottle(3, 60*time.Second, func(counter int32) {
plog.Debug("[Gossip] Sending GossipRequest", log.Int("throttled", int(counter)))
})

return &gossipActor
}
Expand All @@ -55,9 +58,9 @@ func (ga *GossipActor) Receive(ctx actor.Context) {
case *ClusterTopology:
ga.onClusterTopology(r)
case *GossipResponse:
plog.Error("GossipResponse should not be received by GossipActor") // it should be a response to a request
ctx.Logger().Error("GossipResponse should not be received by GossipActor") // it should be a response to a request
default:
plog.Warn("Gossip received unknown message request", log.Message(r), log.TypeOf("msg_type", r))
ctx.Logger().Warn("Gossip received unknown message request", slog.Any("message", r))
}
}

Expand All @@ -81,12 +84,12 @@ func (ga *GossipActor) onGetGossipStateKey(r *GetGossipStateRequest, ctx actor.C

func (ga *GossipActor) onGossipRequest(r *GossipRequest, ctx actor.Context) {
if ga.throttler() == actor.Open {
plog.Debug("OnGossipRequest", log.PID("sender", ctx.Sender()))
ctx.Logger().Debug("OnGossipRequest", slog.Any("sender", ctx.Sender()))
}
ga.ReceiveState(r.State, ctx)

if !GetCluster(ctx.ActorSystem()).MemberList.ContainsMemberID(r.MemberId) {
plog.Warn("Got gossip request from unknown member", log.String("MemberId", r.MemberId))
ctx.Logger().Warn("Got gossip request from unknown member", slog.String("MemberId", r.MemberId))

// nothing to send, do not provide sender or state payload
// ctx.Respond(&GossipResponse{State: &GossipState{Members: make(map[string]*GossipState_GossipMemberState)}})
Expand All @@ -97,7 +100,7 @@ func (ga *GossipActor) onGossipRequest(r *GossipRequest, ctx actor.Context) {

memberState := ga.gossip.GetMemberStateDelta(r.MemberId)
if !memberState.HasState {
plog.Warn("Got gossip request from member, but no state was found", log.String("MemberId", r.MemberId))
ctx.Logger().Warn("Got gossip request from member, but no state was found", slog.String("MemberId", r.MemberId))

// nothing to send, do not provide sender or state payload
ctx.Respond(&GossipResponse{})
Expand Down Expand Up @@ -164,7 +167,7 @@ func (ga *GossipActor) ReceiveState(remoteState *GossipState, ctx actor.Context)
func (ga *GossipActor) sendGossipForMember(member *Member, memberStateDelta *MemberStateDelta, ctx actor.Context) {
pid := actor.NewPID(member.Address(), DefaultGossipActorName)
if ga.throttler() == actor.Open {
plog.Debug("Sending GossipRequest", log.String("MemberId", member.Id))
ctx.Logger().Debug("Sending GossipRequest", slog.String("MemberId", member.Id))
}

// a short timeout is massively important, we cannot afford hanging around waiting
Expand All @@ -178,13 +181,13 @@ func (ga *GossipActor) sendGossipForMember(member *Member, memberStateDelta *Mem

ctx.ReenterAfter(future, func(res interface{}, err error) {
if err != nil {
plog.Warn("sendGossipForMember failed", log.String("MemberId", member.Id), log.Error(err))
ctx.Logger().Warn("sendGossipForMember failed", slog.String("MemberId", member.Id), slog.Any("error", err))
return
}

resp, ok := res.(*GossipResponse)
if !ok {
plog.Error("sendGossipForMember received unknown response message", log.TypeOf("messageType", res), log.Message(resp))
ctx.Logger().Error("sendGossipForMember received unknown response message", slog.Any("message", resp))

return
}
Expand All @@ -196,7 +199,3 @@ func (ga *GossipActor) sendGossipForMember(member *Member, memberStateDelta *Mem
}
})
}

func (ga *GossipActor) throttledLog(counter int32) {
plog.Debug("[Gossip] Sending GossipRequest", log.Int("throttled", int(counter)))
}
14 changes: 0 additions & 14 deletions cluster/log.go

This file was deleted.

0 comments on commit 81a6830

Please sign in to comment.