From 607ffc8d05e30b1d8df9e30328d31b0fd8df8b93 Mon Sep 17 00:00:00 2001 From: evilolipop Date: Thu, 14 Mar 2024 16:32:50 +0800 Subject: [PATCH 1/3] style: remove sprintf --- cluster/clusterproviders/k8s/k8s_provider.go | 11 +++++------ cluster/default_context.go | 5 +++-- cluster/gossiper.go | 9 ++++----- 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/cluster/clusterproviders/k8s/k8s_provider.go b/cluster/clusterproviders/k8s/k8s_provider.go index 4a534bd0..6757993b 100644 --- a/cluster/clusterproviders/k8s/k8s_provider.go +++ b/cluster/clusterproviders/k8s/k8s_provider.go @@ -159,7 +159,6 @@ func (p *Provider) startClusterMonitor(c *cluster.Cluster) error { p.clusterMonitor, err = c.ActorSystem.Root.SpawnNamed(actor.PropsFromProducer(func() actor.Actor { return newClusterMonitor(p) }), "k8s-cluster-monitor") - if err != nil { p.cluster.Logger().Error("Failed to start k8s-cluster-monitor actor", slog.Any("error", err)) return err @@ -177,7 +176,7 @@ func (p *Provider) registerMemberAsync(c *cluster.Cluster) { // registers itself as a member in k8s cluster func (p *Provider) registerMember(timeout time.Duration) error { - p.cluster.Logger().Info(fmt.Sprintf("Registering service %s on %s", p.podName, p.address)) + p.cluster.Logger().Info("Registering service in Kubernetes", slog.String("podName", p.podName), slog.String("address", p.address)) ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() @@ -187,7 +186,7 @@ func (p *Provider) registerMember(timeout time.Duration) error { return fmt.Errorf("unable to get own pod information for %s: %w", p.podName, err) } - p.cluster.Logger().Info(fmt.Sprintf("Using Kubernetes namespace: %s\nUsing Kubernetes port: %d", pod.Namespace, p.port)) + p.cluster.Logger().Info("Using Kubernetes namespace", slog.String("namespace", pod.Namespace), slog.Int("port", p.port)) labels := Labels{ LabelCluster: p.clusterName, @@ -218,7 +217,7 @@ func (p *Provider) startWatchingClusterAsync(c *cluster.Cluster) { func (p *Provider) startWatchingCluster() error { selector := fmt.Sprintf("%s=%s", LabelCluster, p.clusterName) - p.cluster.Logger().Debug(fmt.Sprintf("Starting to watch pods with %s", selector), slog.String("selector", selector)) + p.cluster.Logger().Debug("Starting to watch pods", slog.String("selector", selector)) ctx, cancel := context.WithCancel(context.Background()) p.cancelWatch = cancel @@ -365,7 +364,7 @@ func mapPodsToMembers(clusterPods map[types.UID]*v1.Pod, logger *slog.Logger) [] // deregister itself as a member from a k8s cluster func (p *Provider) deregisterMember(timeout time.Duration) error { - p.cluster.Logger().Info(fmt.Sprintf("Deregistering service %s from %s", p.podName, p.address)) + p.cluster.Logger().Info("Deregistering service from Kubernetes", slog.String("podName", p.podName), slog.String("address", p.address)) ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() @@ -419,7 +418,7 @@ func (p *Provider) retrieveNamespace() string { filename := filepath.Join(string(filepath.Separator), "var", "run", "secrets", "kubernetes.io", "serviceaccount", "namespace") content, err := os.ReadFile(filename) if err != nil { - p.cluster.Logger().Warn(fmt.Sprintf("Could not read %s contents defaulting to empty namespace: %s", filename, err.Error())) + p.cluster.Logger().Warn("Could not read contents, defaulting to empty namespace", slog.String("filename", filename), slog.Any("error", err)) return p.namespace } p.namespace = string(content) diff --git a/cluster/default_context.go b/cluster/default_context.go index 50f0afaa..3eba6f71 100644 --- a/cluster/default_context.go +++ b/cluster/default_context.go @@ -6,6 +6,7 @@ import ( "context" "fmt" "log/slog" + "reflect" "time" "github.com/asynkron/protoactor-go/actor" @@ -50,7 +51,7 @@ func (dcc *DefaultContext) Request(identity, kind string, message interface{}, o start := time.Now() - dcc.cluster.Logger().Debug(fmt.Sprintf("Requesting %s:%s Message %#v", identity, kind, message)) + dcc.cluster.Logger().Debug("Requesting", slog.String("identity", identity), slog.String("kind", kind), slog.String("type", reflect.TypeOf(message).String()), slog.Any("message", message)) // crate a new Timeout Context ttl := callConfig.Timeout @@ -120,7 +121,7 @@ func (dcc *DefaultContext) RequestFuture(identity string, kind string, message i _context := callConfig.Context - dcc.cluster.Logger().Debug(fmt.Sprintf("Requesting future %s:%s Message %#v", identity, kind, message)) + dcc.cluster.Logger().Debug("Requesting future", slog.String("identity", identity), slog.String("kind", kind), slog.String("type", reflect.TypeOf(message).String()), slog.Any("message", message)) // crate a new Timeout Context ttl := callConfig.Timeout diff --git a/cluster/gossiper.go b/cluster/gossiper.go index 9aa02a31..e4fc24c6 100644 --- a/cluster/gossiper.go +++ b/cluster/gossiper.go @@ -70,7 +70,7 @@ func newGossiper(cl *Cluster, opts ...Option) (*Gossiper, error) { func (g *Gossiper) GetState(key string) (map[string]*GossipKeyValue, error) { if g.throttler() == actor.Open { - g.cluster.Logger().Debug(fmt.Sprintf("Gossiper getting state from %s", g.pid)) + g.cluster.Logger().Debug("Gossiper getting state", slog.String("key", key), slog.String("remote", g.pid.String())) } msg := NewGetGossipStateRequest(key) @@ -104,7 +104,7 @@ func (g *Gossiper) GetState(key string) (map[string]*GossipKeyValue, error) { // SetState Sends fire and forget message to update member state func (g *Gossiper) SetState(key string, value proto.Message) { if g.throttler() == actor.Open { - g.cluster.Logger().Debug(fmt.Sprintf("Gossiper setting state %s to %s", key, g.pid)) + g.cluster.Logger().Debug("Gossiper setting state", slog.String("key", key), slog.String("remote", g.pid.String())) } if g.pid == nil { @@ -118,7 +118,7 @@ func (g *Gossiper) SetState(key string, value proto.Message) { // SetStateRequest Sends a Request (that blocks) to update member state func (g *Gossiper) SetStateRequest(key string, value proto.Message) error { if g.throttler() == actor.Open { - g.cluster.Logger().Debug(fmt.Sprintf("Gossiper setting state %s to %s", key, g.pid)) + g.cluster.Logger().Debug("Gossiper setting state", slog.String("key", key), slog.String("remote", g.pid.String())) } if g.pid == nil { @@ -186,7 +186,6 @@ func (g *Gossiper) StartGossiping() error { system, ) }), g.GossipActorName) - if err != nil { g.cluster.Logger().Error("Failed to start gossip actor", slog.Any("error", err)) return err @@ -300,5 +299,5 @@ func (g *Gossiper) blockGracefullyLeft() { } func (g *Gossiper) throttledLog(counter int32) { - g.cluster.Logger().Debug(fmt.Sprintf("[Gossiper] Gossiper Setting State to %s", g.pid), slog.Int("throttled", int(counter))) + g.cluster.Logger().Debug("Gossiper Setting State", slog.String("remote", g.pid.String()), slog.Int("throttled", int(counter))) } From 856e4c89eb8855f651d98cb40ba4040f1ea439b0 Mon Sep 17 00:00:00 2001 From: evilolipop Date: Mon, 18 Mar 2024 09:58:40 +0800 Subject: [PATCH 2/3] fix: fix nil logger --- actor/throttler.go | 4 ++-- cluster/pubsub_topic.go | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/actor/throttler.go b/actor/throttler.go index b8007379..e925c7ab 100644 --- a/actor/throttler.go +++ b/actor/throttler.go @@ -58,7 +58,7 @@ func NewThrottle(maxEventsInPeriod int32, period time.Duration, throttledCallBac func NewThrottleWithLogger(logger *slog.Logger, maxEventsInPeriod int32, period time.Duration, throttledCallBack func(*slog.Logger, int32)) ShouldThrottle { currentEvents := int32(0) - startTimer := func(duration time.Duration, back func(*slog.Logger, int32)) { + startTimer := func(duration time.Duration) { go func() { // crete ticker to mimic sleep, we do not want to put the goroutine to sleep // as it will schedule it out of the P making a syscall, we just want it to @@ -77,7 +77,7 @@ func NewThrottleWithLogger(logger *slog.Logger, maxEventsInPeriod int32, period return func() Valve { tries := atomic.AddInt32(¤tEvents, 1) if tries == 1 { - startTimer(period, throttledCallBack) + startTimer(period) } if tries == maxEventsInPeriod { diff --git a/cluster/pubsub_topic.go b/cluster/pubsub_topic.go index 11dc47e1..76feba20 100644 --- a/cluster/pubsub_topic.go +++ b/cluster/pubsub_topic.go @@ -170,14 +170,14 @@ func (t *TopicActor) logDeliveryErrors(reports []*SubscriberDeliveryReport, logg } // unsubscribeUnreachablePidSubscribers deletes all subscribers that have a PID that is unreachable -func (t *TopicActor) unsubscribeUnreachablePidSubscribers(_ actor.Context, allInvalidDeliveryReports []*SubscriberDeliveryReport) { +func (t *TopicActor) unsubscribeUnreachablePidSubscribers(c actor.Context, allInvalidDeliveryReports []*SubscriberDeliveryReport) { subscribers := make([]subscribeIdentityStruct, 0, len(allInvalidDeliveryReports)) for _, r := range allInvalidDeliveryReports { if r.Subscriber.GetPid() != nil && r.Status == DeliveryStatus_SubscriberNoLongerReachable { subscribers = append(subscribers, newSubscribeIdentityStruct(r.Subscriber)) } } - t.removeSubscribers(subscribers, nil) + t.removeSubscribers(subscribers, c.Logger()) } // onClusterTopologyChanged handles a ClusterTopology message @@ -217,7 +217,7 @@ func (t *TopicActor) unsubscribeSubscribersOnMembersThatLeft(c actor.Context) { } } } - t.removeSubscribers(subscribersThatLeft, nil) + t.removeSubscribers(subscribersThatLeft, c.Logger()) } // removeSubscribers remove subscribers from the topic From 3c1f244254fe8c3a564c87d40f30039674bfe0de Mon Sep 17 00:00:00 2001 From: evilolipop Date: Mon, 18 Mar 2024 16:28:37 +0800 Subject: [PATCH 3/3] feat: return all keys --- cluster/cluster_test_tool/pubsub_cluster_fixture.go | 11 ++++++++++- cluster/key_value_store.go | 10 ++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/cluster/cluster_test_tool/pubsub_cluster_fixture.go b/cluster/cluster_test_tool/pubsub_cluster_fixture.go index c995dff7..403f20aa 100644 --- a/cluster/cluster_test_tool/pubsub_cluster_fixture.go +++ b/cluster/cluster_test_tool/pubsub_cluster_fixture.go @@ -208,7 +208,7 @@ type Delivery struct { Data int } -func NewInMemorySubscriberStore() *InMemorySubscribersStore[*cluster.Subscribers] { +func NewInMemorySubscriberStore() cluster.KeyValueStore[*cluster.Subscribers] { return &InMemorySubscribersStore[*cluster.Subscribers]{ store: &sync.Map{}, } @@ -236,3 +236,12 @@ func (i *InMemorySubscribersStore[T]) Clear(_ context.Context, key string) error i.store.Delete(key) return nil } + +func (i *InMemorySubscribersStore[T]) Keys(_ context.Context) ([]string, error) { + keys := make([]string, 0) + i.store.Range(func(key, value interface{}) bool { + keys = append(keys, key.(string)) + return true + }) + return keys, nil +} diff --git a/cluster/key_value_store.go b/cluster/key_value_store.go index 1d90ab37..41df768c 100644 --- a/cluster/key_value_store.go +++ b/cluster/key_value_store.go @@ -10,11 +10,17 @@ type KeyValueStore[T any] interface { Get(ctx context.Context, key string) (T, error) // Clear the value for the given key. Clear(ctx context.Context, key string) error + // Keys returns all the keys in the store. + Keys(ctx context.Context) ([]string, error) } // EmptyKeyValueStore is a key value store that does nothing. type EmptyKeyValueStore[T any] struct{} +func NewEmptyKeyValueStore[T any]() KeyValueStore[T] { + return &EmptyKeyValueStore[T]{} +} + func (e *EmptyKeyValueStore[T]) Set(_ context.Context, _ string, _ T) error { return nil } func (e *EmptyKeyValueStore[T]) Get(_ context.Context, _ string) (T, error) { @@ -23,3 +29,7 @@ func (e *EmptyKeyValueStore[T]) Get(_ context.Context, _ string) (T, error) { } func (e *EmptyKeyValueStore[T]) Clear(_ context.Context, _ string) error { return nil } + +func (e *EmptyKeyValueStore[T]) Keys(_ context.Context) ([]string, error) { + return make([]string, 0), nil +}