Skip to content

Commit

Permalink
Merge pull request #1038 from qazwsxedckll/dev
Browse files Browse the repository at this point in the history
bug fixes
  • Loading branch information
rogeralsing authored Mar 19, 2024
2 parents ef91a6a + 225f8e7 commit cdac50e
Show file tree
Hide file tree
Showing 17 changed files with 55 additions and 32 deletions.
4 changes: 2 additions & 2 deletions actor/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func NewThrottle(maxEventsInPeriod int32, period time.Duration, throttledCallBac
func NewThrottleWithLogger(logger *slog.Logger, maxEventsInPeriod int32, period time.Duration, throttledCallBack func(*slog.Logger, int32)) ShouldThrottle {
currentEvents := int32(0)

startTimer := func(duration time.Duration, back func(*slog.Logger, int32)) {
startTimer := func(duration time.Duration) {
go func() {
// crete ticker to mimic sleep, we do not want to put the goroutine to sleep
// as it will schedule it out of the P making a syscall, we just want it to
Expand All @@ -77,7 +77,7 @@ func NewThrottleWithLogger(logger *slog.Logger, maxEventsInPeriod int32, period
return func() Valve {
tries := atomic.AddInt32(&currentEvents, 1)
if tries == 1 {
startTimer(period, throttledCallBack)
startTimer(period)
}

if tries == maxEventsInPeriod {
Expand Down
11 changes: 5 additions & 6 deletions cluster/clusterproviders/k8s/k8s_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ func (p *Provider) startClusterMonitor(c *cluster.Cluster) error {
p.clusterMonitor, err = c.ActorSystem.Root.SpawnNamed(actor.PropsFromProducer(func() actor.Actor {
return newClusterMonitor(p)
}), "k8s-cluster-monitor")

if err != nil {
p.cluster.Logger().Error("Failed to start k8s-cluster-monitor actor", slog.Any("error", err))
return err
Expand All @@ -177,7 +176,7 @@ func (p *Provider) registerMemberAsync(c *cluster.Cluster) {

// registers itself as a member in k8s cluster
func (p *Provider) registerMember(timeout time.Duration) error {
p.cluster.Logger().Info(fmt.Sprintf("Registering service %s on %s", p.podName, p.address))
p.cluster.Logger().Info("Registering service in Kubernetes", slog.String("podName", p.podName), slog.String("address", p.address))

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
Expand All @@ -187,7 +186,7 @@ func (p *Provider) registerMember(timeout time.Duration) error {
return fmt.Errorf("unable to get own pod information for %s: %w", p.podName, err)
}

p.cluster.Logger().Info(fmt.Sprintf("Using Kubernetes namespace: %s\nUsing Kubernetes port: %d", pod.Namespace, p.port))
p.cluster.Logger().Info("Using Kubernetes namespace", slog.String("namespace", pod.Namespace), slog.Int("port", p.port))

labels := Labels{
LabelCluster: p.clusterName,
Expand Down Expand Up @@ -218,7 +217,7 @@ func (p *Provider) startWatchingClusterAsync(c *cluster.Cluster) {
func (p *Provider) startWatchingCluster() error {
selector := fmt.Sprintf("%s=%s", LabelCluster, p.clusterName)

p.cluster.Logger().Debug(fmt.Sprintf("Starting to watch pods with %s", selector), slog.String("selector", selector))
p.cluster.Logger().Debug("Starting to watch pods", slog.String("selector", selector))

ctx, cancel := context.WithCancel(context.Background())
p.cancelWatch = cancel
Expand Down Expand Up @@ -365,7 +364,7 @@ func mapPodsToMembers(clusterPods map[types.UID]*v1.Pod, logger *slog.Logger) []

// deregister itself as a member from a k8s cluster
func (p *Provider) deregisterMember(timeout time.Duration) error {
p.cluster.Logger().Info(fmt.Sprintf("Deregistering service %s from %s", p.podName, p.address))
p.cluster.Logger().Info("Deregistering service from Kubernetes", slog.String("podName", p.podName), slog.String("address", p.address))

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
Expand Down Expand Up @@ -419,7 +418,7 @@ func (p *Provider) retrieveNamespace() string {
filename := filepath.Join(string(filepath.Separator), "var", "run", "secrets", "kubernetes.io", "serviceaccount", "namespace")
content, err := os.ReadFile(filename)
if err != nil {
p.cluster.Logger().Warn(fmt.Sprintf("Could not read %s contents defaulting to empty namespace: %s", filename, err.Error()))
p.cluster.Logger().Warn("Could not read contents, defaulting to empty namespace", slog.String("filename", filename), slog.Any("error", err))
return p.namespace
}
p.namespace = string(content)
Expand Down
2 changes: 1 addition & 1 deletion cluster/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Config struct {
RemoteConfig *remote.Config
RequestTimeoutTime time.Duration
RequestsLogThrottlePeriod time.Duration
RequestLog bool
MaxNumberOfEventsInRequestLogThrottledPeriod int
ClusterContextProducer ContextProducer
MemberStrategyBuilder func(cluster *Cluster, kind string) MemberStrategy
Expand Down Expand Up @@ -62,7 +63,6 @@ func Configure(clusterName string, clusterProvider ClusterProvider, identityLook
// into a valid ClusterContextConfig value and returns a pointer to its memory
func (c *Config) ToClusterContextConfig(logger *slog.Logger) *ClusterContextConfig {
clusterContextConfig := ClusterContextConfig{

RequestsLogThrottlePeriod: c.RequestsLogThrottlePeriod,
MaxNumberOfEventsInRequestLogThrottledPeriod: c.MaxNumberOfEventsInRequestLogThrottledPeriod,

Expand Down
6 changes: 6 additions & 0 deletions cluster/config_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,9 @@ func WithHeartbeatExpiration(t time.Duration) ConfigOption {
c.HeartbeatExpiration = t
}
}

func WithRequestLog(enabled bool) ConfigOption {
return func(c *Config) {
c.RequestLog = enabled
}
}
5 changes: 3 additions & 2 deletions cluster/default_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"fmt"
"log/slog"
"reflect"
"time"

"github.com/asynkron/protoactor-go/actor"
Expand Down Expand Up @@ -50,7 +51,7 @@ func (dcc *DefaultContext) Request(identity, kind string, message interface{}, o

start := time.Now()

dcc.cluster.Logger().Debug(fmt.Sprintf("Requesting %s:%s Message %#v", identity, kind, message))
dcc.cluster.Logger().Debug("Requesting", slog.String("identity", identity), slog.String("kind", kind), slog.String("type", reflect.TypeOf(message).String()), slog.Any("message", message))

// crate a new Timeout Context
ttl := callConfig.Timeout
Expand Down Expand Up @@ -120,7 +121,7 @@ func (dcc *DefaultContext) RequestFuture(identity string, kind string, message i

_context := callConfig.Context

dcc.cluster.Logger().Debug(fmt.Sprintf("Requesting future %s:%s Message %#v", identity, kind, message))
dcc.cluster.Logger().Debug("Requesting future", slog.String("identity", identity), slog.String("kind", kind), slog.String("type", reflect.TypeOf(message).String()), slog.Any("message", message))

// crate a new Timeout Context
ttl := callConfig.Timeout
Expand Down
9 changes: 4 additions & 5 deletions cluster/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func newGossiper(cl *Cluster, opts ...Option) (*Gossiper, error) {

func (g *Gossiper) GetState(key string) (map[string]*GossipKeyValue, error) {
if g.throttler() == actor.Open {
g.cluster.Logger().Debug(fmt.Sprintf("Gossiper getting state from %s", g.pid))
g.cluster.Logger().Debug("Gossiper getting state", slog.String("key", key), slog.String("remote", g.pid.String()))
}

msg := NewGetGossipStateRequest(key)
Expand Down Expand Up @@ -104,7 +104,7 @@ func (g *Gossiper) GetState(key string) (map[string]*GossipKeyValue, error) {
// SetState Sends fire and forget message to update member state
func (g *Gossiper) SetState(key string, value proto.Message) {
if g.throttler() == actor.Open {
g.cluster.Logger().Debug(fmt.Sprintf("Gossiper setting state %s to %s", key, g.pid))
g.cluster.Logger().Debug("Gossiper setting state", slog.String("key", key), slog.String("remote", g.pid.String()))
}

if g.pid == nil {
Expand All @@ -118,7 +118,7 @@ func (g *Gossiper) SetState(key string, value proto.Message) {
// SetStateRequest Sends a Request (that blocks) to update member state
func (g *Gossiper) SetStateRequest(key string, value proto.Message) error {
if g.throttler() == actor.Open {
g.cluster.Logger().Debug(fmt.Sprintf("Gossiper setting state %s to %s", key, g.pid))
g.cluster.Logger().Debug("Gossiper setting state", slog.String("key", key), slog.String("remote", g.pid.String()))
}

if g.pid == nil {
Expand Down Expand Up @@ -186,7 +186,6 @@ func (g *Gossiper) StartGossiping() error {
system,
)
}), g.GossipActorName)

if err != nil {
g.cluster.Logger().Error("Failed to start gossip actor", slog.Any("error", err))
return err
Expand Down Expand Up @@ -300,5 +299,5 @@ func (g *Gossiper) blockGracefullyLeft() {
}

func (g *Gossiper) throttledLog(counter int32) {
g.cluster.Logger().Debug(fmt.Sprintf("[Gossiper] Gossiper Setting State to %s", g.pid), slog.Int("throttled", int(counter)))
g.cluster.Logger().Debug("Gossiper Setting State", slog.String("remote", g.pid.String()), slog.Int("throttled", int(counter)))
}
6 changes: 3 additions & 3 deletions cluster/pubsub_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,14 +170,14 @@ func (t *TopicActor) logDeliveryErrors(reports []*SubscriberDeliveryReport, logg
}

// unsubscribeUnreachablePidSubscribers deletes all subscribers that have a PID that is unreachable
func (t *TopicActor) unsubscribeUnreachablePidSubscribers(_ actor.Context, allInvalidDeliveryReports []*SubscriberDeliveryReport) {
func (t *TopicActor) unsubscribeUnreachablePidSubscribers(c actor.Context, allInvalidDeliveryReports []*SubscriberDeliveryReport) {
subscribers := make([]subscribeIdentityStruct, 0, len(allInvalidDeliveryReports))
for _, r := range allInvalidDeliveryReports {
if r.Subscriber.GetPid() != nil && r.Status == DeliveryStatus_SubscriberNoLongerReachable {
subscribers = append(subscribers, newSubscribeIdentityStruct(r.Subscriber))
}
}
t.removeSubscribers(subscribers, nil)
t.removeSubscribers(subscribers, c.Logger())
}

// onClusterTopologyChanged handles a ClusterTopology message
Expand Down Expand Up @@ -217,7 +217,7 @@ func (t *TopicActor) unsubscribeSubscribersOnMembersThatLeft(c actor.Context) {
}
}
}
t.removeSubscribers(subscribersThatLeft, nil)
t.removeSubscribers(subscribersThatLeft, c.Logger())
}

// removeSubscribers remove subscribers from the topic
Expand Down
3 changes: 3 additions & 0 deletions protobuf/protoc-gen-go-grain/templates/grain.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ func (g *{{ $service.Name }}GrainClient) {{ $method.Name }}Future(r *{{ $method.
{{ end }}
// {{ $method.Name }} requests the execution on to the cluster with CallOptions
func (g *{{ $service.Name }}GrainClient) {{ $method.Name }}(r *{{ $method.Input }}, opts ...cluster.GrainCallOption) (*{{ $method.Output }}, error) {
if g.cluster.Config.RequestLog {
g.cluster.Logger().Info("Requesting", slog.String("identity", g.Identity), slog.String("kind", "{{ $service.Name }}"), slog.String("method", "{{ $method.Name }}"), slog.Any("request", r))
}
bytes, err := proto.Marshal(r)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion protobuf/protoc-gen-go-grain/test/error/error.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions protobuf/protoc-gen-go-grain/test/error/error_grain.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion protobuf/protoc-gen-go-grain/test/hello/hello.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions protobuf/protoc-gen-go-grain/test/hello/hello_grain.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion protobuf/protoc-gen-go-grain/test/reenter/hello.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 8 additions & 2 deletions protobuf/protoc-gen-go-grain/test/reenter/hello_grain.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion protobuf/protoc-gen-go-grain/version.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package main

const version = "v0.6.1"
const version = "v0.7.0"

0 comments on commit cdac50e

Please sign in to comment.