Skip to content

Commit

Permalink
add timeouts and return
Browse files Browse the repository at this point in the history
  • Loading branch information
ValyaB committed Sep 27, 2024
1 parent 84a1cde commit b949c2d
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 11 deletions.
3 changes: 2 additions & 1 deletion cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ func main() {
MaxDelay: 5 * time.Second,
Multiplier: 1.2,
},
MinConnectTimeout: 10 * time.Second,
}
dialOpts = append(dialOpts, grpc.WithConnectParams(connectParams))
dialOpts = append(dialOpts, grpc.WithConnectParams(connectParams), grpc.WithIdleTimeout(cfg.KeepAliveTimeout))

logger.Infof(
"Creating grpc channel against (%s) with connection config (%+v) and TLS enabled=%v",
Expand Down
21 changes: 11 additions & 10 deletions internal/proxy/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,7 @@ func (c *Client) Run(ctx context.Context) error {
return authCtx.Err()
case <-t.C:
c.log.Info("Starting proxy client")
stream, closeStream, err := c.getStream(authCtx)
if err != nil {
c.log.Errorf("Could not get stream, restarting proxy client in %vs: %v", time.Duration(c.keepAlive.Load()).Seconds(), err)
t.Reset(time.Duration(c.keepAlive.Load()))
continue
}

err = c.run(authCtx, stream, closeStream)
err := c.run(authCtx)
if err != nil {
c.log.Errorf("Restarting proxy client in %vs: due to error: %v", time.Duration(c.keepAlive.Load()).Seconds(), err)
t.Reset(time.Duration(c.keepAlive.Load()))
Expand Down Expand Up @@ -133,10 +126,17 @@ func (c *Client) sendInitialRequest(stream cloudproxyv1alpha.CloudProxyAPI_Strea
return nil
}

func (c *Client) run(ctx context.Context, stream cloudproxyv1alpha.CloudProxyAPI_StreamCloudProxyClient, closeStream func()) error {
func (c *Client) run(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

stream, closeStream, err := c.getStream(ctx)
if err != nil {
return fmt.Errorf("c.getStream: %w", err)
}
defer closeStream()

err := c.sendInitialRequest(stream)
err = c.sendInitialRequest(stream)
if err != nil {
return fmt.Errorf("c.Connect: %w", err)
}
Expand Down Expand Up @@ -189,6 +189,7 @@ func (c *Client) run(ctx context.Context, stream cloudproxyv1alpha.CloudProxyAPI
case req := <-messageRespCh:
if err := stream.Send(req); err != nil {
c.log.WithError(err).Warn("failed to send message response")
return fmt.Errorf("stream.Send: %w", err)
}
case <-time.After(time.Duration(c.keepAlive.Load())):
if !c.isAlive() {
Expand Down

0 comments on commit b949c2d

Please sign in to comment.