diff --git a/internal/proxy/client.go b/internal/proxy/client.go index ac5b98d..690215e 100644 --- a/internal/proxy/client.go +++ b/internal/proxy/client.go @@ -127,6 +127,19 @@ func (c *Client) run(ctx context.Context, stream cloudproxyv1alpha.CloudProxyAPI defer cancel() go c.sendKeepAlive(ctxWithCancel, stream) + go func() { + c.log.Info("Polling stream for messages") + + for { + in, err := stream.Recv() + if err != nil { + c.log.Errorf("stream.Recv: %v", err) + } + + go c.handleMessage(in, stream) + } + }() + for { if ctx.Err() != nil { return ctx.Err() @@ -134,14 +147,7 @@ func (c *Client) run(ctx context.Context, stream cloudproxyv1alpha.CloudProxyAPI if !c.isAlive() { return fmt.Errorf("last seen too old, closing stream") } - c.log.Info("Polling stream for messages") - in, err := stream.Recv() - if err != nil { - return fmt.Errorf("stream.Recv: %w", err) - } - - c.log.Info("Handling message from castai") - go c.handleMessage(in, stream) + time.Sleep(time.Duration(c.keepAliveTimeout.Load())) } }