Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat:get all keys #1040

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions actor/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func NewThrottle(maxEventsInPeriod int32, period time.Duration, throttledCallBac
func NewThrottleWithLogger(logger *slog.Logger, maxEventsInPeriod int32, period time.Duration, throttledCallBack func(*slog.Logger, int32)) ShouldThrottle {
currentEvents := int32(0)

startTimer := func(duration time.Duration, back func(*slog.Logger, int32)) {
startTimer := func(duration time.Duration) {
go func() {
// crete ticker to mimic sleep, we do not want to put the goroutine to sleep
// as it will schedule it out of the P making a syscall, we just want it to
Expand All @@ -77,7 +77,7 @@ func NewThrottleWithLogger(logger *slog.Logger, maxEventsInPeriod int32, period
return func() Valve {
tries := atomic.AddInt32(&currentEvents, 1)
if tries == 1 {
startTimer(period, throttledCallBack)
startTimer(period)
}

if tries == maxEventsInPeriod {
Expand Down
11 changes: 10 additions & 1 deletion cluster/cluster_test_tool/pubsub_cluster_fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ type Delivery struct {
Data int
}

func NewInMemorySubscriberStore() *InMemorySubscribersStore[*cluster.Subscribers] {
func NewInMemorySubscriberStore() cluster.KeyValueStore[*cluster.Subscribers] {
return &InMemorySubscribersStore[*cluster.Subscribers]{
store: &sync.Map{},
}
Expand Down Expand Up @@ -236,3 +236,12 @@ func (i *InMemorySubscribersStore[T]) Clear(_ context.Context, key string) error
i.store.Delete(key)
return nil
}

func (i *InMemorySubscribersStore[T]) Keys(_ context.Context) ([]string, error) {
keys := make([]string, 0)
i.store.Range(func(key, value interface{}) bool {
keys = append(keys, key.(string))
return true
})
return keys, nil
}
11 changes: 5 additions & 6 deletions cluster/clusterproviders/k8s/k8s_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ func (p *Provider) startClusterMonitor(c *cluster.Cluster) error {
p.clusterMonitor, err = c.ActorSystem.Root.SpawnNamed(actor.PropsFromProducer(func() actor.Actor {
return newClusterMonitor(p)
}), "k8s-cluster-monitor")

if err != nil {
p.cluster.Logger().Error("Failed to start k8s-cluster-monitor actor", slog.Any("error", err))
return err
Expand All @@ -177,7 +176,7 @@ func (p *Provider) registerMemberAsync(c *cluster.Cluster) {

// registers itself as a member in k8s cluster
func (p *Provider) registerMember(timeout time.Duration) error {
p.cluster.Logger().Info(fmt.Sprintf("Registering service %s on %s", p.podName, p.address))
p.cluster.Logger().Info("Registering service in Kubernetes", slog.String("podName", p.podName), slog.String("address", p.address))

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
Expand All @@ -187,7 +186,7 @@ func (p *Provider) registerMember(timeout time.Duration) error {
return fmt.Errorf("unable to get own pod information for %s: %w", p.podName, err)
}

p.cluster.Logger().Info(fmt.Sprintf("Using Kubernetes namespace: %s\nUsing Kubernetes port: %d", pod.Namespace, p.port))
p.cluster.Logger().Info("Using Kubernetes namespace", slog.String("namespace", pod.Namespace), slog.Int("port", p.port))

labels := Labels{
LabelCluster: p.clusterName,
Expand Down Expand Up @@ -218,7 +217,7 @@ func (p *Provider) startWatchingClusterAsync(c *cluster.Cluster) {
func (p *Provider) startWatchingCluster() error {
selector := fmt.Sprintf("%s=%s", LabelCluster, p.clusterName)

p.cluster.Logger().Debug(fmt.Sprintf("Starting to watch pods with %s", selector), slog.String("selector", selector))
p.cluster.Logger().Debug("Starting to watch pods", slog.String("selector", selector))

ctx, cancel := context.WithCancel(context.Background())
p.cancelWatch = cancel
Expand Down Expand Up @@ -365,7 +364,7 @@ func mapPodsToMembers(clusterPods map[types.UID]*v1.Pod, logger *slog.Logger) []

// deregister itself as a member from a k8s cluster
func (p *Provider) deregisterMember(timeout time.Duration) error {
p.cluster.Logger().Info(fmt.Sprintf("Deregistering service %s from %s", p.podName, p.address))
p.cluster.Logger().Info("Deregistering service from Kubernetes", slog.String("podName", p.podName), slog.String("address", p.address))

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
Expand Down Expand Up @@ -419,7 +418,7 @@ func (p *Provider) retrieveNamespace() string {
filename := filepath.Join(string(filepath.Separator), "var", "run", "secrets", "kubernetes.io", "serviceaccount", "namespace")
content, err := os.ReadFile(filename)
if err != nil {
p.cluster.Logger().Warn(fmt.Sprintf("Could not read %s contents defaulting to empty namespace: %s", filename, err.Error()))
p.cluster.Logger().Warn("Could not read contents, defaulting to empty namespace", slog.String("filename", filename), slog.Any("error", err))
return p.namespace
}
p.namespace = string(content)
Expand Down
5 changes: 3 additions & 2 deletions cluster/default_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"fmt"
"log/slog"
"reflect"
"time"

"github.com/asynkron/protoactor-go/actor"
Expand Down Expand Up @@ -50,7 +51,7 @@ func (dcc *DefaultContext) Request(identity, kind string, message interface{}, o

start := time.Now()

dcc.cluster.Logger().Debug(fmt.Sprintf("Requesting %s:%s Message %#v", identity, kind, message))
dcc.cluster.Logger().Debug("Requesting", slog.String("identity", identity), slog.String("kind", kind), slog.String("type", reflect.TypeOf(message).String()), slog.Any("message", message))

// crate a new Timeout Context
ttl := callConfig.Timeout
Expand Down Expand Up @@ -120,7 +121,7 @@ func (dcc *DefaultContext) RequestFuture(identity string, kind string, message i

_context := callConfig.Context

dcc.cluster.Logger().Debug(fmt.Sprintf("Requesting future %s:%s Message %#v", identity, kind, message))
dcc.cluster.Logger().Debug("Requesting future", slog.String("identity", identity), slog.String("kind", kind), slog.String("type", reflect.TypeOf(message).String()), slog.Any("message", message))

// crate a new Timeout Context
ttl := callConfig.Timeout
Expand Down
9 changes: 4 additions & 5 deletions cluster/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func newGossiper(cl *Cluster, opts ...Option) (*Gossiper, error) {

func (g *Gossiper) GetState(key string) (map[string]*GossipKeyValue, error) {
if g.throttler() == actor.Open {
g.cluster.Logger().Debug(fmt.Sprintf("Gossiper getting state from %s", g.pid))
g.cluster.Logger().Debug("Gossiper getting state", slog.String("key", key), slog.String("remote", g.pid.String()))
}

msg := NewGetGossipStateRequest(key)
Expand Down Expand Up @@ -104,7 +104,7 @@ func (g *Gossiper) GetState(key string) (map[string]*GossipKeyValue, error) {
// SetState Sends fire and forget message to update member state
func (g *Gossiper) SetState(key string, value proto.Message) {
if g.throttler() == actor.Open {
g.cluster.Logger().Debug(fmt.Sprintf("Gossiper setting state %s to %s", key, g.pid))
g.cluster.Logger().Debug("Gossiper setting state", slog.String("key", key), slog.String("remote", g.pid.String()))
}

if g.pid == nil {
Expand All @@ -118,7 +118,7 @@ func (g *Gossiper) SetState(key string, value proto.Message) {
// SetStateRequest Sends a Request (that blocks) to update member state
func (g *Gossiper) SetStateRequest(key string, value proto.Message) error {
if g.throttler() == actor.Open {
g.cluster.Logger().Debug(fmt.Sprintf("Gossiper setting state %s to %s", key, g.pid))
g.cluster.Logger().Debug("Gossiper setting state", slog.String("key", key), slog.String("remote", g.pid.String()))
}

if g.pid == nil {
Expand Down Expand Up @@ -186,7 +186,6 @@ func (g *Gossiper) StartGossiping() error {
system,
)
}), g.GossipActorName)

if err != nil {
g.cluster.Logger().Error("Failed to start gossip actor", slog.Any("error", err))
return err
Expand Down Expand Up @@ -300,5 +299,5 @@ func (g *Gossiper) blockGracefullyLeft() {
}

func (g *Gossiper) throttledLog(counter int32) {
g.cluster.Logger().Debug(fmt.Sprintf("[Gossiper] Gossiper Setting State to %s", g.pid), slog.Int("throttled", int(counter)))
g.cluster.Logger().Debug("Gossiper Setting State", slog.String("remote", g.pid.String()), slog.Int("throttled", int(counter)))
}
10 changes: 10 additions & 0 deletions cluster/key_value_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,17 @@ type KeyValueStore[T any] interface {
Get(ctx context.Context, key string) (T, error)
// Clear the value for the given key.
Clear(ctx context.Context, key string) error
// Keys returns all the keys in the store.
Keys(ctx context.Context) ([]string, error)
}

// EmptyKeyValueStore is a key value store that does nothing.
type EmptyKeyValueStore[T any] struct{}

func NewEmptyKeyValueStore[T any]() KeyValueStore[T] {
return &EmptyKeyValueStore[T]{}
}

func (e *EmptyKeyValueStore[T]) Set(_ context.Context, _ string, _ T) error { return nil }

func (e *EmptyKeyValueStore[T]) Get(_ context.Context, _ string) (T, error) {
Expand All @@ -23,3 +29,7 @@ func (e *EmptyKeyValueStore[T]) Get(_ context.Context, _ string) (T, error) {
}

func (e *EmptyKeyValueStore[T]) Clear(_ context.Context, _ string) error { return nil }

func (e *EmptyKeyValueStore[T]) Keys(_ context.Context) ([]string, error) {
return make([]string, 0), nil
}
6 changes: 3 additions & 3 deletions cluster/pubsub_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,14 +170,14 @@ func (t *TopicActor) logDeliveryErrors(reports []*SubscriberDeliveryReport, logg
}

// unsubscribeUnreachablePidSubscribers deletes all subscribers that have a PID that is unreachable
func (t *TopicActor) unsubscribeUnreachablePidSubscribers(_ actor.Context, allInvalidDeliveryReports []*SubscriberDeliveryReport) {
func (t *TopicActor) unsubscribeUnreachablePidSubscribers(c actor.Context, allInvalidDeliveryReports []*SubscriberDeliveryReport) {
subscribers := make([]subscribeIdentityStruct, 0, len(allInvalidDeliveryReports))
for _, r := range allInvalidDeliveryReports {
if r.Subscriber.GetPid() != nil && r.Status == DeliveryStatus_SubscriberNoLongerReachable {
subscribers = append(subscribers, newSubscribeIdentityStruct(r.Subscriber))
}
}
t.removeSubscribers(subscribers, nil)
t.removeSubscribers(subscribers, c.Logger())
}

// onClusterTopologyChanged handles a ClusterTopology message
Expand Down Expand Up @@ -217,7 +217,7 @@ func (t *TopicActor) unsubscribeSubscribersOnMembersThatLeft(c actor.Context) {
}
}
}
t.removeSubscribers(subscribersThatLeft, nil)
t.removeSubscribers(subscribersThatLeft, c.Logger())
}

// removeSubscribers remove subscribers from the topic
Expand Down
Loading