From da12692c9ff004a6973a0f14d04701f049da22a8 Mon Sep 17 00:00:00 2001 From: jannfis Date: Thu, 29 Feb 2024 15:32:07 +0000 Subject: [PATCH] fix: Make agent reconnect when principal reappears Signed-off-by: jannfis --- agent/agent.go | 6 ++++++ agent/connection.go | 24 ++++++++++++++++++------ 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 7c3feed7..a544eeb6 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -152,10 +152,16 @@ func (a *Agent) Stop() error { return nil } +// IsConnected returns whether the agent is connected to the principal func (a *Agent) IsConnected() bool { return a.remote != nil && a.connected.Load() } +// SetConnected sets the connection state of the agent +func (a *Agent) SetConnected(connected bool) { + a.connected.Store(connected) +} + func log() *logrus.Entry { return logrus.WithField("module", "Agent") } diff --git a/agent/connection.go b/agent/connection.go index a5fbf93b..427bf224 100644 --- a/agent/connection.go +++ b/agent/connection.go @@ -1,6 +1,7 @@ package agent import ( + "errors" "io" "time" @@ -18,7 +19,7 @@ func (a *Agent) maintainConnection() error { go func() { var err error for { - if !a.connected.Load() { + if !a.IsConnected() { err = a.remote.Connect(a.context, false) if err != nil { log().Warnf("Could not connect to %s: %v", a.remote.Addr(), err) @@ -27,7 +28,7 @@ func (a *Agent) maintainConnection() error { if err != nil { log().Warnf("Could not create agent queue pair: %v", err) } else { - a.connected.Store(true) + a.SetConnected(true) } } } else { @@ -103,7 +104,7 @@ func (a *Agent) handleStreamEvents() error { "direction": "Send", }) logCtx.Info("Starting to send events to event stream") - for a.connected.Load() { + for a.IsConnected() { select { case <-a.context.Done(): logCtx.Info("Context canceled") @@ -146,12 +147,19 @@ func (a *Agent) handleStreamEvents() error { if err != nil { status, ok := status.FromError(err) if !ok { - logCtx.Errorf("Error sending data: %v", err) + if errors.Is(err, io.EOF) { + logCtx.Errorf("Remote disappeared") + a.SetConnected(false) + close(syncCh) + return + } else { + logCtx.Errorf("Error sending data: %v", err) + } continue } if status.Code() == codes.Unavailable { logCtx.Info("Agent has closed the connection during send, closing send loop") - a.cancelFn() + close(syncCh) return } } @@ -165,7 +173,11 @@ func (a *Agent) handleStreamEvents() error { case <-a.context.Done(): return nil case <-syncCh: - log().WithField("componet", "EventHandller").Info("Stream closed") + log().WithField("component", "EventHandler").Info("Stream closed") + err := a.queues.Delete(a.remote.ClientID(), true) + if err != nil { + log().Errorf("Could not remove agent queue: %v", err) + } return nil default: time.Sleep(100 * time.Millisecond)