From 5eb609f64db03c75a63204e8326caef873398b80 Mon Sep 17 00:00:00 2001 From: Edoardo Spadolini Date: Fri, 25 Oct 2024 18:24:34 +0200 Subject: [PATCH] Fix connected resource counts after keepalive errors (#47931) (#47949) * Fix connected resource counts after keepalive errors * Log server_id when cleaning up resources --- lib/auth/auth.go | 4 +-- lib/inventory/controller.go | 32 ++++++++++++---------- lib/inventory/controller_test.go | 47 +++++++++++++++++++++++++++++++- 3 files changed, 66 insertions(+), 17 deletions(-) diff --git a/lib/auth/auth.go b/lib/auth/auth.go index 540df63049468..8b66ba87e389b 100644 --- a/lib/auth/auth.go +++ b/lib/auth/auth.go @@ -487,9 +487,9 @@ func NewServer(cfg *InitConfig, opts ...ServerOption) (*Server, error) { log.Warnf("missing connected resources gauge for keep alive %s (this is a bug)", s) } }), - inventory.WithOnDisconnect(func(s string) { + inventory.WithOnDisconnect(func(s string, c int) { if g, ok := connectedResourceGauges[s]; ok { - g.Dec() + g.Sub(float64(c)) } else { log.Warnf("missing connected resources gauge for keep alive %s (this is a bug)", s) } diff --git a/lib/inventory/controller.go b/lib/inventory/controller.go index ae5258cf97630..e401c4e239c01 100644 --- a/lib/inventory/controller.go +++ b/lib/inventory/controller.go @@ -105,7 +105,7 @@ type controllerOptions struct { maxKeepAliveErrs int authID string onConnectFunc func(string) - onDisconnectFunc func(string) + onDisconnectFunc func(string, int) } func (options *controllerOptions) SetDefaults() { @@ -127,11 +127,11 @@ func (options *controllerOptions) SetDefaults() { } if options.onConnectFunc == nil { - options.onConnectFunc = func(s string) {} + options.onConnectFunc = func(string) {} } if options.onDisconnectFunc == nil { - options.onDisconnectFunc = func(s string) {} + options.onDisconnectFunc = func(string, int) {} } } @@ -154,12 +154,12 @@ func WithOnConnect(f func(heartbeatKind string)) ControllerOption { } } -// WithOnDisconnect sets a function to 
be called every time an existing -// instance disconnects from the inventory control stream. The value -// provided to the callback is the keep alive type of the disconnected -// resource. The callback should return quickly so as not to prevent -// processing of heartbeats. -func WithOnDisconnect(f func(heartbeatKind string)) ControllerOption { +// WithOnDisconnect sets a function to be called every time an existing instance +// disconnects from the inventory control stream. The values provided to the +// callback are the keep alive type of the disconnected resource, as well as a +// count of how many resources disconnected at once. The callback should return +// quickly so as not to prevent processing of heartbeats. +func WithOnDisconnect(f func(heartbeatKind string, amount int)) ControllerOption { return func(opts *controllerOptions) { opts.onDisconnectFunc = f } @@ -200,7 +200,7 @@ type Controller struct { usageReporter usagereporter.UsageReporter testEvents chan testEvent onConnectFunc func(string) - onDisconnectFunc func(string) + onDisconnectFunc func(string, int) closeContext context.Context cancel context.CancelFunc } @@ -324,7 +324,10 @@ func (c *Controller) handleControlStream(handle *upstreamHandle) { defer func() { if handle.goodbye.GetDeleteResources() { - log.WithField("apps", len(handle.appServers)).Debug("Cleaning up resources in response to instance termination") + log.WithFields(log.Fields{ + "apps": len(handle.appServers), + "server_id": handle.Hello().ServerID, + }).Debug("Cleaning up resources in response to instance termination") for _, app := range handle.appServers { if err := c.auth.DeleteApplicationServer(c.closeContext, apidefaults.Namespace, app.resource.GetHostID(), app.resource.GetName()); err != nil && !trace.IsNotFound(err) { log.Warnf("Failed to remove app server %q on termination: %v.", handle.Hello().ServerID, err) @@ -341,11 +344,11 @@ func (c *Controller) handleControlStream(handle *upstreamHandle) { handle.ticker.Stop() if 
handle.sshServer != nil { - c.onDisconnectFunc(constants.KeepAliveNode) + c.onDisconnectFunc(constants.KeepAliveNode, 1) } - for range handle.appServers { - c.onDisconnectFunc(constants.KeepAliveApp) + if len(handle.appServers) > 0 { + c.onDisconnectFunc(constants.KeepAliveApp, len(handle.appServers)) } clear(handle.appServers) @@ -677,6 +680,7 @@ func (c *Controller) keepAliveAppServer(handle *upstreamHandle, now time.Time) e if shouldRemove { c.testEvent(appKeepAliveDel) + c.onDisconnectFunc(constants.KeepAliveApp, 1) delete(handle.appServers, name) } } else { diff --git a/lib/inventory/controller_test.go b/lib/inventory/controller_test.go index 323bc712e21b0..66c7bd3ca5940 100644 --- a/lib/inventory/controller_test.go +++ b/lib/inventory/controller_test.go @@ -144,11 +144,14 @@ func TestSSHServerBasics(t *testing.T) { expectAddr: wantAddr, } + rc := &resourceCounter{} controller := NewController( auth, usagereporter.DiscardUsageReporter{}, withServerKeepAlive(time.Millisecond*200), withTestEventsChannel(events), + WithOnConnect(rc.onConnect), + WithOnDisconnect(rc.onDisconnect), ) defer controller.Close() @@ -282,6 +285,9 @@ func TestSSHServerBasics(t *testing.T) { // here). require.Equal(t, int64(0), controller.instanceHBVariableDuration.Count()) + // verify that metrics have been updated correctly + require.Zero(t, rc.count()) + // verify that the peer address of the control stream was used to override // zero-value IPs for heartbeats. auth.mu.Lock() @@ -305,11 +311,14 @@ func TestAppServerBasics(t *testing.T) { auth := &fakeAuth{} + rc := &resourceCounter{} controller := NewController( auth, usagereporter.DiscardUsageReporter{}, withServerKeepAlive(time.Millisecond*200), withTestEventsChannel(events), + WithOnConnect(rc.onConnect), + WithOnDisconnect(rc.onDisconnect), ) defer controller.Close() @@ -500,6 +509,9 @@ func TestAppServerBasics(t *testing.T) { // always *before* closure is propagated to downstream handle, hence being safe to load // here).
require.Equal(t, int64(0), controller.instanceHBVariableDuration.Count()) + + // verify that metrics have been updated correctly + require.Zero(t, rc.count()) } // TestInstanceHeartbeat verifies basic expected behaviors for instance heartbeat. @@ -897,7 +909,6 @@ func TestGoodbye(t *testing.T) { } func TestGetSender(t *testing.T) { - controller := NewController( &fakeAuth{}, usagereporter.DiscardUsageReporter{}, @@ -1008,3 +1019,37 @@ func awaitEvents(t *testing.T, ch <-chan testEvent, opts ...eventOption) { } } } + +type resourceCounter struct { + mu sync.Mutex + c map[string]int +} + +func (r *resourceCounter) onConnect(typ string) { + r.mu.Lock() + defer r.mu.Unlock() + if r.c == nil { + r.c = make(map[string]int) + } + r.c[typ]++ +} + +func (r *resourceCounter) onDisconnect(typ string, amount int) { + r.mu.Lock() + defer r.mu.Unlock() + if r.c == nil { + r.c = make(map[string]int) + } + r.c[typ] -= amount +} + +func (r *resourceCounter) count() int { + r.mu.Lock() + defer r.mu.Unlock() + + var count int + for _, v := range r.c { + count += v + } + return count +}