From b82229a8d569ed6ee0627182c1c84b1e9337f7f4 Mon Sep 17 00:00:00 2001 From: Valentyna Bukhalova Date: Mon, 16 Sep 2024 14:13:28 +0200 Subject: [PATCH] check for aliveness without blocking on receive from stream --- internal/proxy/client.go | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/internal/proxy/client.go b/internal/proxy/client.go index ac5b98d..22ed9de 100644 --- a/internal/proxy/client.go +++ b/internal/proxy/client.go @@ -127,6 +127,20 @@ func (c *Client) run(ctx context.Context, stream cloudproxyv1alpha.CloudProxyAPI defer cancel() go c.sendKeepAlive(ctxWithCancel, stream) + go func() { + for { + c.log.Debugf("Polling stream for messages") + + in, err := stream.Recv() + if err != nil { + c.log.Errorf("stream.Recv: %v", err) + } + + c.log.Debugf("Handling message from castai") + go c.handleMessage(in, stream) + } + }() + for { if ctx.Err() != nil { return ctx.Err() @@ -134,14 +148,7 @@ func (c *Client) run(ctx context.Context, stream cloudproxyv1alpha.CloudProxyAPI if !c.isAlive() { return fmt.Errorf("last seen too old, closing stream") } - c.log.Info("Polling stream for messages") - in, err := stream.Recv() - if err != nil { - return fmt.Errorf("stream.Recv: %w", err) - } - - c.log.Info("Handling message from castai") - go c.handleMessage(in, stream) + time.Sleep(time.Duration(c.keepAliveTimeout.Load())) } }