From cf5b9d14dd040261e46f11bfca9e47e1b84f6db3 Mon Sep 17 00:00:00 2001 From: Valentyna Bukhalova Date: Fri, 4 Oct 2024 12:32:34 +0200 Subject: [PATCH 1/3] fix tests --- internal/proxy/client.go | 13 +- internal/proxy/client_test.go | 285 ++++++++++++++-------------------- 2 files changed, 128 insertions(+), 170 deletions(-) diff --git a/internal/proxy/client.go b/internal/proxy/client.go index f3dce8e..bcbe3b9 100644 --- a/internal/proxy/client.go +++ b/internal/proxy/client.go @@ -222,7 +222,10 @@ func (c *Client) send(ctx context.Context, stream cloudproxyv1alpha.CloudProxyAP return ctx.Err() case <-stream.Context().Done(): return fmt.Errorf("stream closed %w", stream.Context().Err()) - case req := <-c.sendCh: + case req, ok := <-c.sendCh: + if !ok { + return fmt.Errorf("send channel closed %w", ctx.Err()) + } c.log.Printf("Sending message to stream %v len=%v", req.GetResponse().GetMessageId(), len(req.GetResponse().GetHttpResponse().GetBody())) if err := stream.Send(req); err != nil { c.log.WithError(err).Warn("failed to send message to stream") @@ -329,7 +332,7 @@ func (c *Client) processHTTPRequest(req *cloudproxyv1alpha.HTTPRequest) *cloudpr Error: lo.ToPtr("nil http request"), } } - httpReq, err := c.toHTTPRequest(req) + httpReq, err := toHTTPRequest(req) if err != nil { return &cloudproxyv1alpha.HTTPResponse{ Error: lo.ToPtr(fmt.Sprintf("toHTTPRequest: %v", err)), @@ -344,7 +347,7 @@ func (c *Client) processHTTPRequest(req *cloudproxyv1alpha.HTTPRequest) *cloudpr } c.processedCount.Add(1) - return c.toResponse(resp) + return toResponse(resp) } var errAlive = fmt.Errorf("client connection is not alive") @@ -411,7 +414,7 @@ func (c *Client) sendKeepAlive(ctx context.Context, stream cloudproxyv1alpha.Clo var errBadRequest = fmt.Errorf("bad request") -func (c *Client) toHTTPRequest(req *cloudproxyv1alpha.HTTPRequest) (*http.Request, error) { +func toHTTPRequest(req *cloudproxyv1alpha.HTTPRequest) (*http.Request, error) { if req == nil { return nil, fmt.Errorf("nil http request %w", errBadRequest) } @@ -430,7 +433,7 @@ func (c *Client) toHTTPRequest(req *cloudproxyv1alpha.HTTPRequest) (*http.Reques return reqHTTP, nil } -func (c *Client) toResponse(resp *http.Response) *cloudproxyv1alpha.HTTPResponse { +func toResponse(resp *http.Response) *cloudproxyv1alpha.HTTPResponse { if resp == nil { return &cloudproxyv1alpha.HTTPResponse{ Error: lo.ToPtr("nil response"), diff --git a/internal/proxy/client_test.go b/internal/proxy/client_test.go index 0a91cc3..f9dec3e 100644 --- a/internal/proxy/client_test.go +++ b/internal/proxy/client_test.go @@ -1,4 +1,4 @@ -// nolint: gocritic +// nolint: govet,gocritic package proxy import ( @@ -31,19 +31,14 @@ func (m mockReadCloserErr) Close() error { return nil } func TestClient_toResponse(t *testing.T) { t.Parallel() - type fields struct { - // tuneMockCredentials func(m *mock_gcp.MockCredentials) - // httpClient *http.Client. - } type args struct { msgID string resp *http.Response } tests := []struct { - name string - fields fields - args args - want *cloudproxyv1alpha.HTTPResponse + name string + args args + want *cloudproxyv1alpha.HTTPResponse }{ { name: "nil response", @@ -83,17 +78,7 @@ func TestClient_toResponse(t *testing.T) { tt := tt t.Run(tt.name, func(t *testing.T) { t.Parallel() - c := New(nil, logrus.New(), "version", &config.Config{ - ClusterID: "clusterID", - PodMetadata: config.PodMetadata{ - PodName: "podName", - }, - KeepAlive: time.Second, - KeepAliveTimeout: time.Minute, - }) - got := c.toResponse(tt.args.resp) - // diff := cmp.Diff(got, tt.want, protocmp.Transform()) - // require.Empty(t, diff). + got := toResponse(tt.args.resp) require.Equal(t, tt.want, got) }) } @@ -153,15 +138,7 @@ func TestClient_toHTTPRequest(t *testing.T) { tt := tt t.Run(tt.name, func(t *testing.T) { t.Parallel() - c := New(nil, logrus.New(), "version", &config.Config{ - ClusterID: "clusterID", - PodMetadata: config.PodMetadata{ - PodName: "podName", - }, - KeepAlive: time.Second, - KeepAliveTimeout: time.Minute, - }) - got, err := c.toHTTPRequest(tt.args.req) + got, err := toHTTPRequest(tt.args.req) require.Equal(t, tt.wantErr, err != nil, err) if err != nil { return @@ -180,8 +157,9 @@ func TestClient_handleMessage(t *testing.T) { tuneMockCloudClient func(m *mock_proxy.MockCloudClient) } type args struct { - in *cloudproxyv1alpha.StreamCloudProxyResponse - tuneMockStream func(m *mock_proxy.MockCloudProxyAPI_StreamCloudProxyClient) + ctx func() context.Context + in *cloudproxyv1alpha.StreamCloudProxyResponse + sendCh chan *cloudproxyv1alpha.StreamCloudProxyRequest } tests := []struct { name string @@ -190,15 +168,25 @@ func TestClient_handleMessage(t *testing.T) { wantKeepAlive int64 wantKeepAliveTimeout int64 wantErrCount int64 + wantMsgID string + wantMsgError string }{ { - name: "nil response", + name: "nil response", + args: args{ + ctx: func() context.Context { + return context.Background() + }, + }, wantKeepAlive: int64(config.KeepAliveDefault), wantKeepAliveTimeout: int64(config.KeepAliveTimeoutDefault), }, { name: "keep alive", args: args{ + ctx: func() context.Context { + return context.Background() + }, in: &cloudproxyv1alpha.StreamCloudProxyResponse{ MessageId: KeepAliveMessageID, }, @@ -209,6 +197,9 @@ func TestClient_handleMessage(t *testing.T) { { name: "keep alive timeout and keepalive", args: args{ + ctx: func() context.Context { + return context.Background() + }, in: &cloudproxyv1alpha.StreamCloudProxyResponse{ MessageId: KeepAliveMessageID, Response: &cloudproxyv1alpha.StreamCloudProxyResponse_ConfigurationRequest{ @@ -222,40 +213,40 @@ func TestClient_handleMessage(t *testing.T) { wantKeepAlive: 1, wantKeepAliveTimeout: 2, }, - //{ - // name: "http error, send error", - // args: args{ - // in: &cloudproxyv1alpha.StreamCloudProxyResponse{ - // MessageId: "msgID", - // HttpRequest: &cloudproxyv1alpha.HTTPRequest{}, - // }, - // tuneMockStream: func(m *mock_proxy.MockCloudProxyAPI_StreamCloudProxyClient) { - // m.EXPECT().Send(&cloudproxyv1alpha.StreamCloudProxyRequest{ - // Request: &cloudproxyv1alpha.StreamCloudProxyRequest_Response{ - // Response: &cloudproxyv1alpha.ClusterResponse{ - // ClientMetadata: &cloudproxyv1alpha.ClientMetadata{ - // PodName: "podName", - // ClusterId: "clusterID", - // }, - // MessageId: "msgID", - // HttpResponse: &cloudproxyv1alpha.HTTPResponse{ - // Error: lo.ToPtr("c.cloudClient.DoHTTPRequest: error"), - // }, - // }, - // }, - // }).Return(fmt.Errorf("error")) - // }, - // }, - // fields: fields{ - // tuneMockCloudClient: func(m *mock_proxy.MockCloudClient) { - // m.EXPECT().DoHTTPRequest(gomock.Any()).Return(nil, fmt.Errorf("error")) - // }, - // }, - // wantLastSeenUpdated: false, - // wantKeepAlive: int64(config.KeepAliveDefault), - // wantKeepAliveTimeout: int64(config.KeepAliveTimeoutDefault), - // wantErrCount: 1, - //}. + { + name: "http request is nil", + args: args{ + ctx: func() context.Context { + ctx, _ := context.WithTimeout(context.Background(), time.Second*2) + return ctx + }, + sendCh: make(chan *cloudproxyv1alpha.StreamCloudProxyRequest, 1), + in: &cloudproxyv1alpha.StreamCloudProxyResponse{ + MessageId: "msgID", + }, + }, + wantErrCount: 0, + wantKeepAlive: int64(config.KeepAliveDefault), + wantKeepAliveTimeout: int64(config.KeepAliveTimeoutDefault), + wantMsgID: "msgID", + wantMsgError: "nil http request", + }, + { + name: "send is full exit on context done", + args: args{ + ctx: func() context.Context { + ctx, _ := context.WithTimeout(context.Background(), time.Second*2) + return ctx + }, + sendCh: make(chan *cloudproxyv1alpha.StreamCloudProxyRequest), + in: &cloudproxyv1alpha.StreamCloudProxyResponse{ + MessageId: "msgID", + }, + }, + wantErrCount: 0, + wantKeepAlive: int64(config.KeepAliveDefault), + wantKeepAliveTimeout: int64(config.KeepAliveTimeoutDefault), + }, } for _, tt := range tests { tt := tt @@ -267,23 +258,26 @@ func TestClient_handleMessage(t *testing.T) { if tt.fields.tuneMockCloudClient != nil { tt.fields.tuneMockCloudClient(cloudClient) } - c := New(cloudClient, logrus.New(), "version", &config.Config{ - ClusterID: "clusterID", - PodMetadata: config.PodMetadata{ - PodName: "podName", - }, - KeepAlive: config.KeepAliveDefault, - KeepAliveTimeout: config.KeepAliveTimeoutDefault, - }) - stream := mock_proxy.NewMockCloudProxyAPI_StreamCloudProxyClient(ctrl) - if tt.args.tuneMockStream != nil { - tt.args.tuneMockStream(stream) + c := &Client{ + sendCh: tt.args.sendCh, + log: logrus.New(), + cloudClient: cloudClient, } + c.keepAlive.Store(int64(config.KeepAliveDefault)) + c.keepAliveTimeout.Store(int64(config.KeepAliveTimeoutDefault)) - c.handleMessage(context.Background(), tt.args.in) + c.handleMessage(tt.args.ctx(), tt.args.in) require.Equal(t, tt.wantKeepAlive, c.keepAlive.Load(), "keepAlive: %v", c.keepAlive.Load()) require.Equal(t, tt.wantKeepAliveTimeout, c.keepAliveTimeout.Load(), "keepAliveTimeout: %v", c.keepAliveTimeout.Load()) require.Equal(t, tt.wantErrCount, c.errCount.Load(), "errCount: %v", c.errCount.Load()) + var msg *cloudproxyv1alpha.StreamCloudProxyRequest + select { + case msg = <-c.sendCh: + default: + break + } + require.Equal(t, tt.wantMsgID, msg.GetResponse().GetMessageId(), "msgID: %v", c.sendCh) + require.Equal(t, tt.wantMsgError, msg.GetResponse().GetHttpResponse().GetError(), "msg response error: %v", c.sendCh) }) } } @@ -374,74 +368,6 @@ func TestClient_processHttpRequest(t *testing.T) { } } -// nolint -//func TestClient_sendKeepAlive(t *testing.T) { -// t.Parallel() -// -// type args struct { -// tuneMockStream func(m *mock_proxy.MockCloudProxyAPI_StreamCloudProxyClient) -// keepAlive int64 -// keepAliveTimeout int64 -// } -// tests := []struct { -// name string -// args args -// isLastSeenZero bool -// }{ -// { -// name: "end of ticker", -// args: args{ -// keepAlive: 0, -// tuneMockStream: func(m *mock_proxy.MockCloudProxyAPI_StreamCloudProxyClient) { -// m.EXPECT().Send(gomock.Any()).Return(nil).AnyTimes() -// m.EXPECT().Context().Return(context.Background()).AnyTimes() -// }, -// }, -// }, -// { -// name: "send returned error, should exit", -// args: args{ -// tuneMockStream: func(m *mock_proxy.MockCloudProxyAPI_StreamCloudProxyClient) { -// m.EXPECT().Send(gomock.Any()).Return(fmt.Errorf("error")) -// m.EXPECT().Context().Return(context.Background()).AnyTimes() -// }, -// keepAlive: int64(time.Second), -// keepAliveTimeout: int64(10 * time.Minute), -// }, -// isLastSeenZero: true, -// }, -// } -// for _, tt := range tests { -// tt := tt -// t.Run(tt.name, func(t *testing.T) { -// t.Parallel() -// ctrl := gomock.NewController(t) -// defer ctrl.Finish() -// -// c := New(nil, nil, logrus.New(), "podName", "clusterID", -// "version", "apiKey", config.KeepAliveDefault, config.KeepAliveTimeoutDefault) -// c.keepAlive.Store(tt.args.keepAlive) -// c.keepAliveTimeout.Store(tt.args.keepAliveTimeout) -// -// stream := mock_proxy.NewMockCloudProxyAPI_StreamCloudProxyClient(ctrl) -// if tt.args.tuneMockStream != nil { -// tt.args.tuneMockStream(stream) -// } -// c.lastSeen.Store(time.Now().UnixNano()) -// -// kaCh := make(chan *cloudproxyv1alpha.StreamCloudProxyRequest) -// go func() { -// for { -// <-kaCh -// } -// }() -// -// c.sendKeepAlive(stream, kaCh) -// require.Equal(t, tt.isLastSeenZero, c.lastSeen.Load() == 0, "lastSeen: %v", c.lastSeen.Load()) -// }) -// } -//}. - func TestClient_sendAndReceive(t *testing.T) { t.Parallel() @@ -454,24 +380,6 @@ func TestClient_sendAndReceive(t *testing.T) { args args wantErr bool }{ - { - name: "send initial error", - args: args{ - ctx: func() context.Context { - return context.Background() - }, - tuneMockStream: func(m *mock_proxy.MockCloudProxyAPI_StreamCloudProxyClient) { - m.EXPECT().Send(gomock.Any()).Return(fmt.Errorf("test error")) - m.EXPECT().Recv().DoAndReturn(func() (*cloudproxyv1alpha.StreamCloudProxyResponse, error) { - time.Sleep(time.Millisecond) - return &cloudproxyv1alpha.StreamCloudProxyResponse{}, nil - }).AnyTimes() - m.EXPECT().Context().Return(context.Background()).AnyTimes() - m.EXPECT().CloseSend().Return(nil) - }, - }, - wantErr: true, - }, { name: "context done", args: args{ @@ -521,7 +429,7 @@ func TestClient_sendAndReceive(t *testing.T) { PodName: "podName", }, KeepAlive: time.Second, - KeepAliveTimeout: time.Second * 2, + KeepAliveTimeout: time.Hour, }) c.lastSeenReceive.Store(time.Now().UnixNano()) c.lastSeenSend.Store(time.Now().UnixNano()) @@ -536,3 +444,50 @@ func TestClient_sendAndReceive(t *testing.T) { }) } } + +func TestClient_Run(t *testing.T) { + t.Parallel() + type args struct { + ctx func() context.Context + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "context done", + args: args{ + ctx: func() context.Context { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + return ctx + }, + }, + wantErr: true, + }, + { + name: "get stream error", + args: args{ + ctx: func() context.Context { + ctx, _ := context.WithTimeout(context.Background(), time.Second*2) + return ctx + }, + }, + wantErr: true, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + c := New(nil, logrus.New(), "tt.fields.version", &config.Config{KeepAlive: time.Second, KeepAliveTimeout: time.Hour}) + if err := c.Run(tt.args.ctx()); (err != nil) != tt.wantErr { + t.Errorf("Run() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} From b0a2b46119f6d7ec06063980adccdca43b9df81b Mon Sep 17 00:00:00 2001 From: Valentyna Bukhalova Date: Fri, 4 Oct 2024 13:01:43 +0200 Subject: [PATCH 2/3] fix tests --- internal/proxy/client.go | 2 +- internal/proxy/client_test.go | 96 +++++++++++++++++++++++++++++++++++ 2 files changed, 97 insertions(+), 1 deletion(-) diff --git a/internal/proxy/client.go b/internal/proxy/client.go index bcbe3b9..7c07b88 100644 --- a/internal/proxy/client.go +++ b/internal/proxy/client.go @@ -404,7 +404,7 @@ func (c *Client) sendKeepAlive(ctx context.Context, stream cloudproxyv1alpha.Clo return fmt.Errorf("stream ended with %w", stream.Context().Err()) } if err := c.isAlive(); err != nil { - return err + return fmt.Errorf("isAlive: %w", err) } } } diff --git a/internal/proxy/client_test.go b/internal/proxy/client_test.go index f9dec3e..30a5b01 100644 --- a/internal/proxy/client_test.go +++ b/internal/proxy/client_test.go @@ -491,3 +491,99 @@ func TestClient_Run(t *testing.T) { }) } } + +func TestClient_sendKeepAlive(t *testing.T) { + t.Parallel() + + type args struct { + ctx func() context.Context + tuneMockStream func(m *mock_proxy.MockCloudProxyAPI_StreamCloudProxyClient) + lastSeenSend int64 + lastSeenReceive int64 + keepAlive int64 + keepAliveTimeout int64 + sendCh chan *cloudproxyv1alpha.StreamCloudProxyRequest + } + tests := []struct { + name string + args args + wantErrMsg string + }{ + { + name: "context done", + args: args{ + ctx: func() context.Context { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + return ctx + }, + tuneMockStream: func(m *mock_proxy.MockCloudProxyAPI_StreamCloudProxyClient) { // expected 0 or 1 times. + m.EXPECT().Context().Return(context.Background()).AnyTimes() // expected 0 or 1 times. + }, + lastSeenSend: time.Now().UnixNano(), + lastSeenReceive: time.Now().UnixNano(), + keepAlive: int64(time.Hour), + keepAliveTimeout: int64(time.Hour), + sendCh: make(chan *cloudproxyv1alpha.StreamCloudProxyRequest, 100), + }, + wantErrMsg: "context ended with context canceled", + }, + { + name: "skip keep alive", + args: args{ + ctx: func() context.Context { + ctx, _ := context.WithTimeout(context.Background(), time.Second) + return ctx + }, + lastSeenReceive: time.Now().Add(time.Minute).UnixNano(), + lastSeenSend: time.Now().Add(time.Minute).UnixNano(), + keepAlive: int64(time.Millisecond), + keepAliveTimeout: int64(time.Hour), + sendCh: make(chan *cloudproxyv1alpha.StreamCloudProxyRequest), + }, + wantErrMsg: "context ended with context deadline exceeded", + }, + { + name: "sendch is full and exit with not alive", + args: args{ + ctx: func() context.Context { + ctx, _ := context.WithTimeout(context.Background(), time.Second*2) + return ctx + }, + lastSeenReceive: time.Now().Add(-time.Minute).UnixNano(), + lastSeenSend: time.Now().Add(-time.Minute).UnixNano(), + keepAlive: int64(time.Millisecond), + keepAliveTimeout: int64(time.Second * 2), + sendCh: make(chan *cloudproxyv1alpha.StreamCloudProxyRequest, 1), + tuneMockStream: func(m *mock_proxy.MockCloudProxyAPI_StreamCloudProxyClient) { + m.EXPECT().Context().Return(context.Background()).AnyTimes() + }, + }, + wantErrMsg: "isAlive: client connection is not alive", + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + c := &Client{ + log: logrus.New(), + } + c.keepAlive.Store(tt.args.keepAlive) + c.keepAliveTimeout.Store(tt.args.keepAliveTimeout) + c.lastSeenSend.Store(tt.args.lastSeenSend) + c.lastSeenReceive.Store(tt.args.lastSeenReceive) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + stream := mock_proxy.NewMockCloudProxyAPI_StreamCloudProxyClient(ctrl) + if tt.args.tuneMockStream != nil { + tt.args.tuneMockStream(stream) + } + err := c.sendKeepAlive(tt.args.ctx(), stream) + require.Equal(t, len(tt.wantErrMsg) > 0, err != nil, "error: %v", err) + if err != nil { + require.Equal(t, tt.wantErrMsg, err.Error(), "error: %v", err) + } + }) + } +} From 02aa5596ae10700f69149a49118b1c9027c44250 Mon Sep 17 00:00:00 2001 From: Valentyna Bukhalova Date: Fri, 4 Oct 2024 13:34:27 +0200 Subject: [PATCH 3/3] fix tests --- internal/proxy/client_test.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/internal/proxy/client_test.go b/internal/proxy/client_test.go index 30a5b01..f759da9 100644 --- a/internal/proxy/client_test.go +++ b/internal/proxy/client_test.go @@ -451,9 +451,9 @@ func TestClient_Run(t *testing.T) { ctx func() context.Context } tests := []struct { - name string - args args - wantErr bool + name string + args args + wantErrMsg string }{ { name: "context done", @@ -464,17 +464,17 @@ func TestClient_Run(t *testing.T) { return ctx }, }, - wantErr: true, + wantErrMsg: "context canceled", }, { - name: "get stream error", + name: "retrying get stream error till context exceeded", args: args{ ctx: func() context.Context { ctx, _ := context.WithTimeout(context.Background(), time.Second*2) return ctx }, }, - wantErr: true, + wantErrMsg: "context deadline exceeded", }, } for _, tt := range tests { @@ -485,8 +485,10 @@ func TestClient_Run(t *testing.T) { defer ctrl.Finish() c := New(nil, logrus.New(), "tt.fields.version", &config.Config{KeepAlive: time.Second, KeepAliveTimeout: time.Hour}) - if err := c.Run(tt.args.ctx()); (err != nil) != tt.wantErr { - t.Errorf("Run() error = %v, wantErr %v", err, tt.wantErr) + err := c.Run(tt.args.ctx()) + require.Equal(t, len(tt.wantErrMsg) > 0, err != nil, "error: %v", err) + if err != nil { + require.Equal(t, tt.wantErrMsg, err.Error(), "error: %v", err) } }) }