diff --git a/internal/proxy/client.go b/internal/proxy/client.go index bcbe3b9..7c07b88 100644 --- a/internal/proxy/client.go +++ b/internal/proxy/client.go @@ -404,7 +404,7 @@ func (c *Client) sendKeepAlive(ctx context.Context, stream cloudproxyv1alpha.Clo return fmt.Errorf("stream ended with %w", stream.Context().Err()) } if err := c.isAlive(); err != nil { - return err + return fmt.Errorf("isAlive: %w", err) } } } diff --git a/internal/proxy/client_test.go b/internal/proxy/client_test.go index f9dec3e..30a5b01 100644 --- a/internal/proxy/client_test.go +++ b/internal/proxy/client_test.go @@ -491,3 +491,99 @@ func TestClient_Run(t *testing.T) { }) } } + +func TestClient_sendKeepAlive(t *testing.T) { + t.Parallel() + + type args struct { + ctx func() context.Context + tuneMockStream func(m *mock_proxy.MockCloudProxyAPI_StreamCloudProxyClient) + lastSeenSend int64 + lastSeenReceive int64 + keepAlive int64 + keepAliveTimeout int64 + sendCh chan *cloudproxyv1alpha.StreamCloudProxyRequest + } + tests := []struct { + name string + args args + wantErrMsg string + }{ + { + name: "context done", + args: args{ + ctx: func() context.Context { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + return ctx + }, + tuneMockStream: func(m *mock_proxy.MockCloudProxyAPI_StreamCloudProxyClient) { // expected 0 or 1 times. + m.EXPECT().Context().Return(context.Background()).AnyTimes() // expected 0 or 1 times. + }, + lastSeenSend: time.Now().UnixNano(), + lastSeenReceive: time.Now().UnixNano(), + keepAlive: int64(time.Hour), + keepAliveTimeout: int64(time.Hour), + sendCh: make(chan *cloudproxyv1alpha.StreamCloudProxyRequest, 100), + }, + wantErrMsg: "context ended with context canceled", + }, + { + name: "skip keep alive", + args: args{ + ctx: func() context.Context { + ctx, _ := context.WithTimeout(context.Background(), time.Second) + return ctx + }, + lastSeenReceive: time.Now().Add(time.Minute).UnixNano(), + lastSeenSend: time.Now().Add(time.Minute).UnixNano(), + keepAlive: int64(time.Millisecond), + keepAliveTimeout: int64(time.Hour), + sendCh: make(chan *cloudproxyv1alpha.StreamCloudProxyRequest), + }, + wantErrMsg: "context ended with context deadline exceeded", + }, + { + name: "sendch is full and exit with not alive", + args: args{ + ctx: func() context.Context { + ctx, _ := context.WithTimeout(context.Background(), time.Second*2) + return ctx + }, + lastSeenReceive: time.Now().Add(-time.Minute).UnixNano(), + lastSeenSend: time.Now().Add(-time.Minute).UnixNano(), + keepAlive: int64(time.Millisecond), + keepAliveTimeout: int64(time.Second * 2), + sendCh: make(chan *cloudproxyv1alpha.StreamCloudProxyRequest, 1), + tuneMockStream: func(m *mock_proxy.MockCloudProxyAPI_StreamCloudProxyClient) { + m.EXPECT().Context().Return(context.Background()).AnyTimes() + }, + }, + wantErrMsg: "isAlive: client connection is not alive", + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + c := &Client{ + log: logrus.New(), + } + c.keepAlive.Store(tt.args.keepAlive) + c.keepAliveTimeout.Store(tt.args.keepAliveTimeout) + c.lastSeenSend.Store(tt.args.lastSeenSend) + c.lastSeenReceive.Store(tt.args.lastSeenReceive) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + stream := mock_proxy.NewMockCloudProxyAPI_StreamCloudProxyClient(ctrl) + if tt.args.tuneMockStream != nil { + tt.args.tuneMockStream(stream) + } + err := c.sendKeepAlive(tt.args.ctx(), stream) + require.Equal(t, len(tt.wantErrMsg) > 0, err != nil, "error: %v", err) + if err != nil { + require.Equal(t, tt.wantErrMsg, err.Error(), "error: %v", err) + } + }) + } +}