From 10a5c654feabe9e74999be3aa28ad73c69ca9a41 Mon Sep 17 00:00:00 2001 From: Valentyna Bukhalova Date: Fri, 13 Sep 2024 12:38:43 +0200 Subject: [PATCH 1/7] upd --- internal/proxy/client.go | 8 +- internal/proxy/client_test.go | 247 ++++++++++++++++++++++++++++++++++ 2 files changed, 253 insertions(+), 2 deletions(-) diff --git a/internal/proxy/client.go b/internal/proxy/client.go index d6f6fcd..ea220aa 100644 --- a/internal/proxy/client.go +++ b/internal/proxy/client.go @@ -41,9 +41,9 @@ type Client struct { version string } -func New(executor CloudClient, logger *logrus.Logger, clusterID, version string) *Client { +func New(cloudClient CloudClient, logger *logrus.Logger, clusterID, version string) *Client { c := &Client{ - cloudClient: executor, + cloudClient: cloudClient, log: logger, clusterID: clusterID, version: version, @@ -123,6 +123,10 @@ func (c *Client) run(ctx context.Context, grpcClient cloudproxyv1alpha.CloudProx } func (c *Client) handleMessage(in *cloudproxyv1alpha.StreamCloudProxyResponse, stream StreamCloudProxyClient) { + if in == nil { + c.log.Error("nil message") + return + } c.processConfigurationRequest(in) // skip processing http request if keep alive message diff --git a/internal/proxy/client_test.go b/internal/proxy/client_test.go index c40d01d..85cf4ef 100644 --- a/internal/proxy/client_test.go +++ b/internal/proxy/client_test.go @@ -2,13 +2,19 @@ package proxy import ( "bytes" + mock_cloud "cloud-proxy/internal/proxy/mock" + cloudproxyv1alpha "cloud-proxy/proto/v1alpha" proto "cloud-proxy/proto/v1alpha" "context" + "fmt" + "github.com/golang/mock/gomock" "github.com/samber/lo" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" "io" "net/http" "net/url" + "reflect" "testing" ) @@ -84,6 +90,7 @@ func TestClient_toResponse(t *testing.T) { func TestClient_toHTTPRequest(t *testing.T) { t.Parallel() + type args struct { req *proto.HTTPRequest } @@ -147,3 +154,243 @@ func TestClient_toHTTPRequest(t *testing.T) { }) } } + +func TestClient_handleMessage(t *testing.T) { + t.Parallel() + + type fields struct { + tuneMockCloudClient func(m *mock_cloud.MockCloudClient) + } + type args struct { + in *cloudproxyv1alpha.StreamCloudProxyResponse + tuneMockStream func(m *mock_cloud.MockStreamCloudProxyClient) + } + tests := []struct { + name string + fields fields + args args + wantLastSeenUpdated bool + wantKeepAlive int64 + wantKeepAliveTimeout int64 + wantErrCount int64 + }{ + { + name: "nil response", + wantLastSeenUpdated: false, + wantKeepAlive: int64(KeepAliveDefault), + wantKeepAliveTimeout: int64(KeepAliveTimeoutDefault), + }, + { + name: "keep alive", + args: args{ + in: &cloudproxyv1alpha.StreamCloudProxyResponse{ + MessageId: KeepAliveMessageID, + }, + }, + wantLastSeenUpdated: true, + wantKeepAlive: int64(KeepAliveDefault), + wantKeepAliveTimeout: int64(KeepAliveTimeoutDefault), + }, + { + name: "keep alive timeout and keepalive", + args: args{ + in: &cloudproxyv1alpha.StreamCloudProxyResponse{ + MessageId: KeepAliveMessageID, + ConfigurationRequest: &proto.ConfigurationRequest{ + KeepAlive: 1, + KeepAliveTimeout: 2, + }, + }, + }, + wantLastSeenUpdated: true, + wantKeepAlive: 1, + wantKeepAliveTimeout: 2, + }, + { + name: "http error, send error", + args: args{ + in: &cloudproxyv1alpha.StreamCloudProxyResponse{ + MessageId: "msgID", + HttpRequest: &proto.HTTPRequest{}, + }, + tuneMockStream: func(m *mock_cloud.MockStreamCloudProxyClient) { + m.EXPECT().Send(&cloudproxyv1alpha.StreamCloudProxyRequest{ + Request: &cloudproxyv1alpha.StreamCloudProxyRequest_Response{ + Response: &cloudproxyv1alpha.ClusterResponse{ + ClientMetadata: &cloudproxyv1alpha.ClientMetadata{ + ClusterId: "clusterID", + }, + MessageId: "msgID", + HttpResponse: &cloudproxyv1alpha.HTTPResponse{ + Error: lo.ToPtr("c.cloudClient.DoHTTPRequest: error"), + }, + }, + }, + }).Return(fmt.Errorf("error")) + }, + }, + fields: fields{ + tuneMockCloudClient: func(m *mock_cloud.MockCloudClient) { + m.EXPECT().DoHTTPRequest(gomock.Any()).Return(nil, fmt.Errorf("error")) + }, + }, + wantLastSeenUpdated: false, + wantKeepAlive: int64(KeepAliveDefault), + wantKeepAliveTimeout: int64(KeepAliveTimeoutDefault), + wantErrCount: 1, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + cloudClient := mock_cloud.NewMockCloudClient(ctrl) + if tt.fields.tuneMockCloudClient != nil { + tt.fields.tuneMockCloudClient(cloudClient) + } + c := New(cloudClient, logrus.New(), "clusterID", "version") + stream := mock_cloud.NewMockStreamCloudProxyClient(ctrl) + if tt.args.tuneMockStream != nil { + tt.args.tuneMockStream(stream) + } + + c.handleMessage(tt.args.in, stream) + require.Equal(t, tt.wantLastSeenUpdated, c.lastSeen.Load() > 0, "lastSeen: %v", c.lastSeen.Load()) + require.Equal(t, tt.wantKeepAlive, c.keepAlive.Load(), "keepAlive: %v", c.keepAlive.Load()) + require.Equal(t, tt.wantKeepAliveTimeout, c.keepAliveTimeout.Load(), "keepAliveTimeout: %v", c.keepAliveTimeout.Load()) + require.Equal(t, tt.wantErrCount, c.errCount.Load(), "errCount: %v", c.errCount.Load()) + }) + } +} + +func TestClient_processHttpRequest(t *testing.T) { + t.Parallel() + type fields struct { + tuneMockCloudClient func(m *mock_cloud.MockCloudClient) + } + type args struct { + req *cloudproxyv1alpha.HTTPRequest + } + tests := []struct { + name string + fields fields + args args + want *cloudproxyv1alpha.HTTPResponse + wantProcessCount int64 + }{ + { + name: "nil request", + want: &cloudproxyv1alpha.HTTPResponse{ + Error: lo.ToPtr("nil http request"), + }, + }, + { + name: "error creating http request", + args: args{ + req: &proto.HTTPRequest{ + Path: "\n\t\f", + }, + }, + want: &cloudproxyv1alpha.HTTPResponse{ + Error: lo.ToPtr("toHTTPRequest: http.NewRequest: error: parse \"\\n\\t\\f\": net/url: invalid control character in URL"), + }, + }, + { + name: "cloud client error", + args: args{ + req: &proto.HTTPRequest{}, + }, + fields: fields{ + tuneMockCloudClient: func(m *mock_cloud.MockCloudClient) { + m.EXPECT().DoHTTPRequest(gomock.Any()).Return(nil, fmt.Errorf("error")) + }, + }, + want: &cloudproxyv1alpha.HTTPResponse{ + Error: lo.ToPtr("c.cloudClient.DoHTTPRequest: error"), + }, + }, + { + name: "success", + args: args{ + req: &proto.HTTPRequest{}, + }, + fields: fields{ + tuneMockCloudClient: func(m *mock_cloud.MockCloudClient) { + m.EXPECT().DoHTTPRequest(gomock.Any()).Return(&http.Response{}, nil) + }, + }, + want: &cloudproxyv1alpha.HTTPResponse{}, + wantProcessCount: 1, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + cloudClient := mock_cloud.NewMockCloudClient(ctrl) + if tt.fields.tuneMockCloudClient != nil { + tt.fields.tuneMockCloudClient(cloudClient) + } + c := New(cloudClient, logrus.New(), "clusterID", "version") + if got := c.processHttpRequest(tt.args.req); !reflect.DeepEqual(got, tt.want) { + t.Errorf("processHttpRequest() = %v, want %v", got, tt.want) + } + require.Equal(t, tt.wantProcessCount, c.processedCount.Load(), "processedCount: %v", c.processedCount.Load()) + }) + } +} + +func TestClient_sendKeepAlive(t *testing.T) { + t.Parallel() + + type args struct { + ctx func() context.Context + tuneMockStream func(m *mock_cloud.MockStreamCloudProxyClient) + keepAlive int64 + } + tests := []struct { + name string + args args + }{ + { + name: "end of ticker", + args: args{ + ctx: func() context.Context { + return context.Background() + }, + keepAlive: 0, + }, + }, + { + name: "context done", + args: args{ + ctx: func() context.Context { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + return ctx + }, + }, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + c := New(nil, logrus.New(), "clusterID", "version") + c.keepAlive.Store(tt.args.keepAlive) + + stream := mock_cloud.NewMockStreamCloudProxyClient(ctrl) + if tt.args.tuneMockStream != nil { + tt.args.tuneMockStream(stream) + } + c.sendKeepAlive(tt.args.ctx(), stream) + }) + } +} From cfc3d54182750613e9aa4705de29297fd3826b33 Mon Sep 17 00:00:00 2001 From: Valentyna Bukhalova Date: Fri, 13 Sep 2024 15:50:02 +0200 Subject: [PATCH 2/7] upd --- internal/proxy/client_test.go | 31 ++++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/internal/proxy/client_test.go b/internal/proxy/client_test.go index 85cf4ef..529e79b 100644 --- a/internal/proxy/client_test.go +++ b/internal/proxy/client_test.go @@ -16,6 +16,7 @@ import ( "net/url" "reflect" "testing" + "time" ) type mockReadCloserErr struct{} @@ -348,13 +349,15 @@ func TestClient_sendKeepAlive(t *testing.T) { t.Parallel() type args struct { - ctx func() context.Context - tuneMockStream func(m *mock_cloud.MockStreamCloudProxyClient) - keepAlive int64 + ctx func() context.Context + tuneMockStream func(m *mock_cloud.MockStreamCloudProxyClient) + keepAlive int64 + keepAliveTimeout int64 } tests := []struct { - name string - args args + name string + args args + isLastSeenZero bool }{ { name: "end of ticker", @@ -375,6 +378,20 @@ func TestClient_sendKeepAlive(t *testing.T) { }, }, }, + { + name: "send returned error, should exit", + args: args{ + ctx: func() context.Context { + return context.Background() + }, + tuneMockStream: func(m *mock_cloud.MockStreamCloudProxyClient) { + m.EXPECT().Send(gomock.Any()).Return(fmt.Errorf("error")) + }, + keepAlive: int64(time.Second), + keepAliveTimeout: int64(10 * time.Minute), + }, + isLastSeenZero: true, + }, } for _, tt := range tests { tt := tt @@ -385,12 +402,16 @@ func TestClient_sendKeepAlive(t *testing.T) { c := New(nil, logrus.New(), "clusterID", "version") c.keepAlive.Store(tt.args.keepAlive) + c.keepAliveTimeout.Store(tt.args.keepAliveTimeout) stream := mock_cloud.NewMockStreamCloudProxyClient(ctrl) if tt.args.tuneMockStream != nil { tt.args.tuneMockStream(stream) } + c.lastSeen.Store(time.Now().UnixNano()) + c.sendKeepAlive(tt.args.ctx(), stream) + require.Equal(t, tt.isLastSeenZero, c.lastSeen.Load() == 0, "lastSeen: %v", c.lastSeen.Load()) }) } } From ed52617d8a075b258a1501ea6df769e2db84344b Mon Sep 17 00:00:00 2001 From: Valentyna Bukhalova Date: Fri, 13 Sep 2024 15:54:52 +0200 Subject: [PATCH 3/7] upd --- internal/proxy/client_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/proxy/client_test.go b/internal/proxy/client_test.go index 529e79b..7e4fc3a 100644 --- a/internal/proxy/client_test.go +++ b/internal/proxy/client_test.go @@ -376,6 +376,8 @@ func TestClient_sendKeepAlive(t *testing.T) { cancel() return ctx }, + keepAlive: int64(time.Second), + keepAliveTimeout: int64(10 * time.Minute), }, }, { From 88a7d10739a4c730c5d6cd8d14bb46aa2480d9b8 Mon Sep 17 00:00:00 2001 From: Valentyna Bukhalova Date: Fri, 13 Sep 2024 18:36:55 +0200 Subject: [PATCH 4/7] upd --- Makefile | 5 +- cmd/proxy/main.go | 5 +- e2e/setup.go | 14 +- go.mod | 3 + internal/castai/dummy/mock_cast.go | 16 +-- internal/e2etest/dispatcher.go | 20 +-- internal/e2etest/roundtripper.go | 10 +- internal/proxy/client.go | 86 +++++++----- internal/proxy/client_test.go | 131 +++++++++++++----- internal/proxy/mock/{cloud.go => client.go} | 14 +- internal/proxy/mock/stream.go | 78 +++++------ proto/{ => gen/proto}/v1alpha/proxy.pb.go | 9 +- .../{ => gen/proto}/v1alpha/proxy_grpc.pb.go | 0 proto/v1alpha/proxy.proto | 2 +- 14 files changed, 238 insertions(+), 155 deletions(-) rename internal/proxy/mock/{cloud.go => client.go} (72%) rename proto/{ => gen/proto}/v1alpha/proxy.pb.go (99%) rename proto/{ => gen/proto}/v1alpha/proxy_grpc.pb.go (100%) diff --git a/Makefile b/Makefile index 958317b..5f706cf 100644 --- a/Makefile +++ b/Makefile @@ -23,6 +23,9 @@ deploy: build push .PHONY: deploy generate-grpc: - protoc proto/v1alpha/proxy.proto --go_out=paths=source_relative:. --go-grpc_out=paths=source_relative:. + mkdir -p proto/gen + protoc proto/v1alpha/proxy.proto \ + --go_out=proto/gen --go_opt paths=source_relative \ + --go-grpc_out=proto/gen --go-grpc_opt paths=source_relative .PHONY: generate-grpc diff --git a/cmd/proxy/main.go b/cmd/proxy/main.go index d56e2fb..1c92733 100644 --- a/cmd/proxy/main.go +++ b/cmd/proxy/main.go @@ -3,7 +3,6 @@ package main import ( "cloud-proxy/internal/cloud/gcp" "cloud-proxy/internal/cloud/gcp/gcpauth" - proto "cloud-proxy/proto/v1alpha" "context" "fmt" "net/http" @@ -79,8 +78,8 @@ func main() { "authorization", fmt.Sprintf("Token %s", cfg.CastAI.ApiKey), )) - client := proxy.New(gcp.New(gcpauth.NewCredentialsSource(), http.DefaultClient), logger, cfg.ClusterID, GetVersion()) - err = client.Run(ctx, proto.NewCloudProxyAPIClient(conn)) + client := proxy.New(conn, gcp.New(gcpauth.NewCredentialsSource(), http.DefaultClient), logger, cfg.ClusterID, GetVersion()) + err = client.Run(ctx) if err != nil { logger.Panicf("Failed to run client: %v", err) } diff --git a/e2e/setup.go b/e2e/setup.go index 87f1264..1d3da01 100644 --- a/e2e/setup.go +++ b/e2e/setup.go @@ -9,9 +9,9 @@ import ( "net/http" "cloud-proxy/internal/e2etest" - proto "cloud-proxy/proto/v1alpha" compute "cloud.google.com/go/compute/apiv1" container "cloud.google.com/go/container/apiv1" + cloudproxyv1alpha "github.com/castai/cloud-proxy/proto/gen/proto/v1alpha" "golang.org/x/sync/errgroup" "google.golang.org/api/option" "google.golang.org/grpc" @@ -24,7 +24,7 @@ type Server interface { } type TestSetup struct { - proto.UnimplementedCloudProxyAPIServer + cloudproxyv1alpha.UnimplementedCloudProxyAPIServer result bool @@ -32,14 +32,14 @@ type TestSetup struct { dispatcher *e2etest.Dispatcher roundTripper *e2etest.HttpOverGrpcRoundTripper - requestChan chan *proto.StreamCloudProxyResponse - responseChan chan *proto.StreamCloudProxyRequest + requestChan chan *cloudproxyv1alpha.StreamCloudProxyResponse + responseChan chan *cloudproxyv1alpha.StreamCloudProxyRequest logger *log.Logger } func NewTestSetup(logger *log.Logger) *TestSetup { - requestChan, respChan := make(chan *proto.StreamCloudProxyResponse), make(chan *proto.StreamCloudProxyRequest) + requestChan, respChan := make(chan *cloudproxyv1alpha.StreamCloudProxyResponse), make(chan *cloudproxyv1alpha.StreamCloudProxyRequest) dispatcher := e2etest.NewDispatcher(requestChan, respChan, logger) roundTrip := e2etest.NewHttpOverGrpcRoundTripper(dispatcher, logger) @@ -62,7 +62,7 @@ func (srv *TestSetup) StartServer() error { } srv.grpcServer = grpc.NewServer() - proto.RegisterCloudProxyAPIServer(srv.grpcServer, srv) + cloudproxyv1alpha.RegisterCloudProxyAPIServer(srv.grpcServer, srv) go func() { if err := srv.grpcServer.Serve(list); err != nil { @@ -81,7 +81,7 @@ func (srv *TestSetup) GracefulStopServer() { srv.grpcServer.Stop() } -func (srv *TestSetup) StreamCloudProxy(stream proto.CloudProxyAPI_StreamCloudProxyServer) error { +func (srv *TestSetup) StreamCloudProxy(stream cloudproxyv1alpha.CloudProxyAPI_StreamCloudProxyServer) error { srv.logger.Println("Received a proxy connection from client") //md, ok := metadata.FromIncomingContext(stream.Context()) diff --git a/go.mod b/go.mod index ffe942a..4544eee 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.23.1 require ( cloud.google.com/go/compute v1.28.0 cloud.google.com/go/container v1.39.0 + github.com/castai/cloud-proxy v0.0.0-00010101000000-000000000000 github.com/golang/mock v1.6.0 github.com/google/uuid v1.6.0 github.com/samber/lo v1.47.0 @@ -63,3 +64,5 @@ require ( gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +replace github.com/castai/cloud-proxy => ./ diff --git a/internal/castai/dummy/mock_cast.go b/internal/castai/dummy/mock_cast.go index c79ebfe..ec1989b 100644 --- a/internal/castai/dummy/mock_cast.go +++ b/internal/castai/dummy/mock_cast.go @@ -1,8 +1,8 @@ package dummy import ( - proto "cloud-proxy/proto/v1alpha" "fmt" + cloudproxyv1alpha "github.com/castai/cloud-proxy/proto/gen/proto/v1alpha" "io" "log" "net" @@ -25,7 +25,7 @@ type MockCast struct { func (mc *MockCast) Run() error { logger := log.New(os.Stderr, "[CAST-MOCK] ", log.LstdFlags) - requestChan, respChan := make(chan *proto.StreamCloudProxyResponse), make(chan *proto.StreamCloudProxyRequest) + requestChan, respChan := make(chan *cloudproxyv1alpha.StreamCloudProxyResponse), make(chan *cloudproxyv1alpha.StreamCloudProxyRequest) // Start the mock server listener, err := net.Listen("tcp", ":50051") @@ -34,7 +34,7 @@ func (mc *MockCast) Run() error { } grpcServer := grpc.NewServer() - proto.RegisterCloudProxyAPIServer(grpcServer, NewMockCastServer(requestChan, respChan, logger)) + cloudproxyv1alpha.RegisterCloudProxyAPIServer(grpcServer, NewMockCastServer(requestChan, respChan, logger)) dispatcher := e2etest.NewDispatcher(requestChan, respChan, logger) @@ -68,15 +68,15 @@ func (mc *MockCast) Run() error { } type MockCastServer struct { - proto.UnimplementedCloudProxyAPIServer + cloudproxyv1alpha.UnimplementedCloudProxyAPIServer - requestChan <-chan *proto.StreamCloudProxyResponse - responseChan chan<- *proto.StreamCloudProxyRequest + requestChan <-chan *cloudproxyv1alpha.StreamCloudProxyResponse + responseChan chan<- *cloudproxyv1alpha.StreamCloudProxyRequest logger *log.Logger } -func NewMockCastServer(requestChan <-chan *proto.StreamCloudProxyResponse, responseChan chan<- *proto.StreamCloudProxyRequest, logger *log.Logger) *MockCastServer { +func NewMockCastServer(requestChan <-chan *cloudproxyv1alpha.StreamCloudProxyResponse, responseChan chan<- *cloudproxyv1alpha.StreamCloudProxyRequest, logger *log.Logger) *MockCastServer { return &MockCastServer{ requestChan: requestChan, responseChan: responseChan, @@ -84,7 +84,7 @@ func NewMockCastServer(requestChan <-chan *proto.StreamCloudProxyResponse, respo } } -func (msrv *MockCastServer) Proxy(stream proto.CloudProxyAPI_StreamCloudProxyServer) error { +func (msrv *MockCastServer) Proxy(stream cloudproxyv1alpha.CloudProxyAPI_StreamCloudProxyServer) error { msrv.logger.Println("Received a proxy connection from client") var eg errgroup.Group diff --git a/internal/e2etest/dispatcher.go b/internal/e2etest/dispatcher.go index 088c42c..a0cdba4 100644 --- a/internal/e2etest/dispatcher.go +++ b/internal/e2etest/dispatcher.go @@ -4,22 +4,22 @@ import ( "log" "sync" - proto "cloud-proxy/proto/v1alpha" + cloudproxyv1alpha "github.com/castai/cloud-proxy/proto/gen/proto/v1alpha" ) type Dispatcher struct { - pendingRequests map[string]chan *proto.StreamCloudProxyRequest + pendingRequests map[string]chan *cloudproxyv1alpha.StreamCloudProxyRequest locker sync.Mutex - proxyRequestChan chan<- *proto.StreamCloudProxyResponse - proxyResponseChan <-chan *proto.StreamCloudProxyRequest + proxyRequestChan chan<- *cloudproxyv1alpha.StreamCloudProxyResponse + proxyResponseChan <-chan *cloudproxyv1alpha.StreamCloudProxyRequest logger *log.Logger } -func NewDispatcher(requestChan chan<- *proto.StreamCloudProxyResponse, responseChan <-chan *proto.StreamCloudProxyRequest, logger *log.Logger) *Dispatcher { +func NewDispatcher(requestChan chan<- *cloudproxyv1alpha.StreamCloudProxyResponse, responseChan <-chan *cloudproxyv1alpha.StreamCloudProxyRequest, logger *log.Logger) *Dispatcher { return &Dispatcher{ - pendingRequests: make(map[string]chan *proto.StreamCloudProxyRequest), + pendingRequests: make(map[string]chan *cloudproxyv1alpha.StreamCloudProxyRequest), locker: sync.Mutex{}, proxyRequestChan: requestChan, proxyResponseChan: responseChan, @@ -40,21 +40,21 @@ func (d *Dispatcher) Run() { }() } -func (d *Dispatcher) SendRequest(req *proto.StreamCloudProxyResponse) (<-chan *proto.StreamCloudProxyRequest, error) { +func (d *Dispatcher) SendRequest(req *cloudproxyv1alpha.StreamCloudProxyResponse) (<-chan *cloudproxyv1alpha.StreamCloudProxyRequest, error) { waiter := d.addRequestToWaitingList(req.MessageId) d.proxyRequestChan <- req return waiter, nil } -func (d *Dispatcher) addRequestToWaitingList(requestID string) <-chan *proto.StreamCloudProxyRequest { - waiter := make(chan *proto.StreamCloudProxyRequest, 1) +func (d *Dispatcher) addRequestToWaitingList(requestID string) <-chan *cloudproxyv1alpha.StreamCloudProxyRequest { + waiter := make(chan *cloudproxyv1alpha.StreamCloudProxyRequest, 1) d.locker.Lock() d.pendingRequests[requestID] = waiter d.locker.Unlock() return waiter } -func (d *Dispatcher) findWaiterForResponse(requestID string) chan *proto.StreamCloudProxyRequest { +func (d *Dispatcher) findWaiterForResponse(requestID string) chan *cloudproxyv1alpha.StreamCloudProxyRequest { d.locker.Lock() val, ok := d.pendingRequests[requestID] if !ok { diff --git a/internal/e2etest/roundtripper.go b/internal/e2etest/roundtripper.go index 03fef86..895584d 100644 --- a/internal/e2etest/roundtripper.go +++ b/internal/e2etest/roundtripper.go @@ -2,8 +2,8 @@ package e2etest import ( "bytes" - proto "cloud-proxy/proto/v1alpha" "fmt" + cloudproxyv1alpha "github.com/castai/cloud-proxy/proto/gen/proto/v1alpha" "io" "log" "net/http" @@ -24,14 +24,14 @@ func NewHttpOverGrpcRoundTripper(dispatcher *Dispatcher, logger *log.Logger) *Ht func (p *HttpOverGrpcRoundTripper) RoundTrip(request *http.Request) (*http.Response, error) { requestID := uuid.New().String() - headers := make(map[string]*proto.HeaderValue) + headers := make(map[string]*cloudproxyv1alpha.HeaderValue) for h, v := range request.Header { - headers[h] = &proto.HeaderValue{Value: v} + headers[h] = &cloudproxyv1alpha.HeaderValue{Value: v} } - protoReq := &proto.StreamCloudProxyResponse{ + protoReq := &cloudproxyv1alpha.StreamCloudProxyResponse{ MessageId: requestID, - HttpRequest: &proto.HTTPRequest{ + HttpRequest: &cloudproxyv1alpha.HTTPRequest{ Method: request.Method, Path: request.URL.String(), Headers: headers, diff --git a/internal/proxy/client.go b/internal/proxy/client.go index ea220aa..f4475a7 100644 --- a/internal/proxy/client.go +++ b/internal/proxy/client.go @@ -1,15 +1,16 @@ -//go:generate mockgen -destination ./mock/cloud.go -package=mock_cloud cloud-proxy/internal/proxy CloudClient -//go:generate mockgen -destination ./mock/stream.go -package=mock_cloud cloud-proxy/internal/proxy StreamCloudProxyClient +//go:generate mockgen -package=mock_proxy -source $GOFILE -destination mock/$GOFILE . +//go:generate mockgen -package=mock_proxy -destination mock/stream.go github.com/castai/cloud-proxy/proto/gen/proto/v1alpha CloudProxyAPI_StreamCloudProxyClient + package proxy import ( "bytes" - cloudproxyv1alpha "cloud-proxy/proto/v1alpha" - proto "cloud-proxy/proto/v1alpha" "context" "fmt" + cloudproxyv1alpha "github.com/castai/cloud-proxy/proto/gen/proto/v1alpha" "github.com/samber/lo" "github.com/sirupsen/logrus" + "google.golang.org/grpc" "io" "net/http" "sync/atomic" @@ -22,12 +23,12 @@ const ( KeepAliveTimeoutDefault = time.Minute ) -type StreamCloudProxyClient = cloudproxyv1alpha.CloudProxyAPI_StreamCloudProxyClient type CloudClient interface { DoHTTPRequest(request *http.Request) (*http.Response, error) } type Client struct { + grpcConn *grpc.ClientConn cloudClient CloudClient log *logrus.Logger clusterID string @@ -41,8 +42,9 @@ type Client struct { version string } -func New(cloudClient CloudClient, logger *logrus.Logger, clusterID, version string) *Client { +func New(grpcConn *grpc.ClientConn, cloudClient CloudClient, logger *logrus.Logger, clusterID, version string) *Client { c := &Client{ + grpcConn: grpcConn, cloudClient: cloudClient, log: logger, clusterID: clusterID, @@ -54,27 +56,43 @@ func New(cloudClient CloudClient, logger *logrus.Logger, clusterID, version stri return c } -func (c *Client) Run(ctx context.Context, grpcClient cloudproxyv1alpha.CloudProxyAPIClient) error { +func (c *Client) Run(ctx context.Context) error { for { if ctx.Err() != nil { return nil } - err := c.run(ctx, grpcClient) + stream, closeStream, err := c.getStream() + if err != nil { + c.log.Errorf("c.getStream: %v", err) + time.Sleep(time.Second) + continue + } + err = c.run(ctx, stream, closeStream) if err != nil { - c.log.Printf("c.run: %v", err) + c.log.Errorf("c.run: %v", err) } } } -func (c *Client) initializeStream(ctx context.Context, proxyCastAIClient cloudproxyv1alpha.CloudProxyAPIClient) (StreamCloudProxyClient, error) { - c.log.Println("Connecting to castai") - - stream, err := proxyCastAIClient.StreamCloudProxy(ctx) +func (c *Client) getStream() (cloudproxyv1alpha.CloudProxyAPI_StreamCloudProxyClient, func(), error) { + apiClient := cloudproxyv1alpha.NewCloudProxyAPIClient(c.grpcConn) + stream, err := apiClient.StreamCloudProxy(context.Background()) if err != nil { - return nil, fmt.Errorf("proxyCastAIClient.StreamCloudProxy: %w", err) + return nil, nil, fmt.Errorf("proxyCastAIClient.StreamCloudProxy: %w", err) } - err = stream.Send(&cloudproxyv1alpha.StreamCloudProxyRequest{ + return stream, func() { + err := stream.CloseSend() + if err != nil { + c.log.Errorf("error closing stream %v", err) + } + }, nil +} + +func (c *Client) sendInitialRequest(stream cloudproxyv1alpha.CloudProxyAPI_StreamCloudProxyClient) error { + c.log.Info("Seding initial request to castai") + + err := stream.Send(&cloudproxyv1alpha.StreamCloudProxyRequest{ Request: &cloudproxyv1alpha.StreamCloudProxyRequest_InitialRequest{ InitialRequest: &cloudproxyv1alpha.InitialCloudProxyRequest{ ClientMetadata: &cloudproxyv1alpha.ClientMetadata{ @@ -85,31 +103,29 @@ func (c *Client) initializeStream(ctx context.Context, proxyCastAIClient cloudpr }, }) if err != nil { - return nil, fmt.Errorf("stream.Send: initial request %w", err) + return fmt.Errorf("stream.Send: initial request %w", err) } c.lastSeen.Store(time.Now().UnixNano()) - return stream, nil + return nil } -func (c *Client) run(ctx context.Context, grpcClient cloudproxyv1alpha.CloudProxyAPIClient) error { - stream, err := c.initializeStream(ctx, grpcClient) +func (c *Client) run(ctx context.Context, stream cloudproxyv1alpha.CloudProxyAPI_StreamCloudProxyClient, closeStream func()) error { + defer closeStream() + + err := c.sendInitialRequest(stream) if err != nil { return fmt.Errorf("c.Connect: %w", err) } - defer func() { - err := stream.CloseSend() - if err != nil { - c.log.Println("error closing stream", err) - } - }() - ctxWithCancel, cancel := context.WithCancel(ctx) defer cancel() go c.sendKeepAlive(ctxWithCancel, stream) for { + if ctx.Err() != nil { + return nil + } if !c.isAlive() { return fmt.Errorf("last seen too old, closing stream") } @@ -122,7 +138,7 @@ func (c *Client) run(ctx context.Context, grpcClient cloudproxyv1alpha.CloudProx } } -func (c *Client) handleMessage(in *cloudproxyv1alpha.StreamCloudProxyResponse, stream StreamCloudProxyClient) { +func (c *Client) handleMessage(in *cloudproxyv1alpha.StreamCloudProxyResponse, stream cloudproxyv1alpha.CloudProxyAPI_StreamCloudProxyClient) { if in == nil { c.log.Error("nil message") return @@ -196,7 +212,7 @@ func (c *Client) isAlive() bool { return true } -func (c *Client) sendKeepAlive(ctx context.Context, stream StreamCloudProxyClient) { +func (c *Client) sendKeepAlive(ctx context.Context, stream cloudproxyv1alpha.CloudProxyAPI_StreamCloudProxyClient) { ticker := time.NewTimer(time.Duration(c.keepAlive.Load())) defer ticker.Stop() @@ -230,7 +246,7 @@ func (c *Client) sendKeepAlive(ctx context.Context, stream StreamCloudProxyClien var errBadRequest = fmt.Errorf("bad request") -func (c *Client) toHTTPRequest(req *proto.HTTPRequest) (*http.Request, error) { +func (c *Client) toHTTPRequest(req *cloudproxyv1alpha.HTTPRequest) (*http.Request, error) { if req == nil { return nil, fmt.Errorf("nil http request %w", errBadRequest) } @@ -249,17 +265,17 @@ func (c *Client) toHTTPRequest(req *proto.HTTPRequest) (*http.Request, error) { return reqHTTP, nil } -func (c *Client) toResponse(resp *http.Response) *proto.HTTPResponse { +func (c *Client) toResponse(resp *http.Response) *cloudproxyv1alpha.HTTPResponse { if resp == nil { - return &proto.HTTPResponse{ + return &cloudproxyv1alpha.HTTPResponse{ Error: lo.ToPtr("nil response"), } } - var headers map[string]*proto.HeaderValue + var headers map[string]*cloudproxyv1alpha.HeaderValue if resp.Header != nil { - headers = make(map[string]*proto.HeaderValue) + headers = make(map[string]*cloudproxyv1alpha.HeaderValue) for h, v := range resp.Header { - headers[h] = &proto.HeaderValue{Value: v} + headers[h] = &cloudproxyv1alpha.HeaderValue{Value: v} } } var bodyResp []byte @@ -273,7 +289,7 @@ func (c *Client) toResponse(resp *http.Response) *proto.HTTPResponse { } } - return &proto.HTTPResponse{ + return &cloudproxyv1alpha.HTTPResponse{ Body: bodyResp, Error: errMessage, Status: int32(resp.StatusCode), diff --git a/internal/proxy/client_test.go b/internal/proxy/client_test.go index 7e4fc3a..a8408b6 100644 --- a/internal/proxy/client_test.go +++ b/internal/proxy/client_test.go @@ -2,11 +2,10 @@ package proxy import ( "bytes" - mock_cloud "cloud-proxy/internal/proxy/mock" - cloudproxyv1alpha "cloud-proxy/proto/v1alpha" - proto "cloud-proxy/proto/v1alpha" + mock_proxy "cloud-proxy/internal/proxy/mock" "context" "fmt" + cloudproxyv1alpha "github.com/castai/cloud-proxy/proto/gen/proto/v1alpha" "github.com/golang/mock/gomock" "github.com/samber/lo" "github.com/sirupsen/logrus" @@ -40,11 +39,11 @@ func TestClient_toResponse(t *testing.T) { name string fields fields args args - want *proto.HTTPResponse + want *cloudproxyv1alpha.HTTPResponse }{ { name: "nil response", - want: &proto.HTTPResponse{ + want: &cloudproxyv1alpha.HTTPResponse{ Error: lo.ToPtr("nil response"), }, }, @@ -55,7 +54,7 @@ func TestClient_toResponse(t *testing.T) { Body: &mockReadCloserErr{}, }, }, - want: &proto.HTTPResponse{ + want: &cloudproxyv1alpha.HTTPResponse{ Error: lo.ToPtr("io.ReadAll: body for error: unexpected EOF"), }, }, @@ -69,10 +68,10 @@ func TestClient_toResponse(t *testing.T) { Body: io.NopCloser(bytes.NewReader([]byte("body"))), }, }, - want: &proto.HTTPResponse{ + want: &cloudproxyv1alpha.HTTPResponse{ Body: []byte("body"), Status: 200, - Headers: map[string]*proto.HeaderValue{"header": {Value: []string{"value"}}}, + Headers: map[string]*cloudproxyv1alpha.HeaderValue{"header": {Value: []string{"value"}}}, }, }, } @@ -80,7 +79,7 @@ func TestClient_toResponse(t *testing.T) { tt := tt t.Run(tt.name, func(t *testing.T) { t.Parallel() - c := New(nil, nil, "clusterID", "version") + c := New(nil, nil, nil, "clusterID", "version") got := c.toResponse(tt.args.resp) //diff := cmp.Diff(got, tt.want, protocmp.Transform()) //require.Empty(t, diff) @@ -93,7 +92,7 @@ func TestClient_toHTTPRequest(t *testing.T) { t.Parallel() type args struct { - req *proto.HTTPRequest + req *cloudproxyv1alpha.HTTPRequest } tests := []struct { name string @@ -108,7 +107,7 @@ func TestClient_toHTTPRequest(t *testing.T) { { name: "error creating http request", args: args{ - req: &proto.HTTPRequest{ + req: &cloudproxyv1alpha.HTTPRequest{ Path: "\n\t\f", }, }, @@ -117,9 +116,9 @@ func TestClient_toHTTPRequest(t *testing.T) { { name: "success", args: args{ - req: &proto.HTTPRequest{ + req: &cloudproxyv1alpha.HTTPRequest{ Method: "GET", - Headers: map[string]*proto.HeaderValue{ + Headers: map[string]*cloudproxyv1alpha.HeaderValue{ "header": {Value: []string{"value"}}, }, Body: []byte("body"), @@ -143,7 +142,7 @@ func TestClient_toHTTPRequest(t *testing.T) { tt := tt t.Run(tt.name, func(t *testing.T) { t.Parallel() - c := New(nil, nil, "clusterID", "version") + c := New(nil, nil, nil, "clusterID", "version") got, err := c.toHTTPRequest(tt.args.req) require.Equal(t, tt.wantErr, err != nil, err) if err != nil { @@ -160,11 +159,11 @@ func TestClient_handleMessage(t *testing.T) { t.Parallel() type fields struct { - tuneMockCloudClient func(m *mock_cloud.MockCloudClient) + tuneMockCloudClient func(m *mock_proxy.MockCloudClient) } type args struct { in *cloudproxyv1alpha.StreamCloudProxyResponse - tuneMockStream func(m *mock_cloud.MockStreamCloudProxyClient) + tuneMockStream func(m *mock_proxy.MockCloudProxyAPI_StreamCloudProxyClient) } tests := []struct { name string @@ -197,7 +196,7 @@ func TestClient_handleMessage(t *testing.T) { args: args{ in: &cloudproxyv1alpha.StreamCloudProxyResponse{ MessageId: KeepAliveMessageID, - ConfigurationRequest: &proto.ConfigurationRequest{ + ConfigurationRequest: &cloudproxyv1alpha.ConfigurationRequest{ KeepAlive: 1, KeepAliveTimeout: 2, }, @@ -212,9 +211,9 @@ func TestClient_handleMessage(t *testing.T) { args: args{ in: &cloudproxyv1alpha.StreamCloudProxyResponse{ MessageId: "msgID", - HttpRequest: &proto.HTTPRequest{}, + HttpRequest: &cloudproxyv1alpha.HTTPRequest{}, }, - tuneMockStream: func(m *mock_cloud.MockStreamCloudProxyClient) { + tuneMockStream: func(m *mock_proxy.MockCloudProxyAPI_StreamCloudProxyClient) { m.EXPECT().Send(&cloudproxyv1alpha.StreamCloudProxyRequest{ Request: &cloudproxyv1alpha.StreamCloudProxyRequest_Response{ Response: &cloudproxyv1alpha.ClusterResponse{ @@ -231,7 +230,7 @@ func TestClient_handleMessage(t *testing.T) { }, }, fields: fields{ - tuneMockCloudClient: func(m *mock_cloud.MockCloudClient) { + tuneMockCloudClient: func(m *mock_proxy.MockCloudClient) { m.EXPECT().DoHTTPRequest(gomock.Any()).Return(nil, fmt.Errorf("error")) }, }, @@ -247,12 +246,12 @@ func TestClient_handleMessage(t *testing.T) { t.Parallel() ctrl := gomock.NewController(t) defer ctrl.Finish() - cloudClient := mock_cloud.NewMockCloudClient(ctrl) + cloudClient := mock_proxy.NewMockCloudClient(ctrl) if tt.fields.tuneMockCloudClient != nil { tt.fields.tuneMockCloudClient(cloudClient) } - c := New(cloudClient, logrus.New(), "clusterID", "version") - stream := mock_cloud.NewMockStreamCloudProxyClient(ctrl) + c := New(nil, cloudClient, logrus.New(), "clusterID", "version") + stream := mock_proxy.NewMockCloudProxyAPI_StreamCloudProxyClient(ctrl) if tt.args.tuneMockStream != nil { tt.args.tuneMockStream(stream) } @@ -269,7 +268,7 @@ func TestClient_handleMessage(t *testing.T) { func TestClient_processHttpRequest(t *testing.T) { t.Parallel() type fields struct { - tuneMockCloudClient func(m *mock_cloud.MockCloudClient) + tuneMockCloudClient func(m *mock_proxy.MockCloudClient) } type args struct { req *cloudproxyv1alpha.HTTPRequest @@ -290,7 +289,7 @@ func TestClient_processHttpRequest(t *testing.T) { { name: "error creating http request", args: args{ - req: &proto.HTTPRequest{ + req: &cloudproxyv1alpha.HTTPRequest{ Path: "\n\t\f", }, }, @@ -301,10 +300,10 @@ func TestClient_processHttpRequest(t *testing.T) { { name: "cloud client error", args: args{ - req: &proto.HTTPRequest{}, + req: &cloudproxyv1alpha.HTTPRequest{}, }, fields: fields{ - tuneMockCloudClient: func(m *mock_cloud.MockCloudClient) { + tuneMockCloudClient: func(m *mock_proxy.MockCloudClient) { m.EXPECT().DoHTTPRequest(gomock.Any()).Return(nil, fmt.Errorf("error")) }, }, @@ -315,10 +314,10 @@ func TestClient_processHttpRequest(t *testing.T) { { name: "success", args: args{ - req: &proto.HTTPRequest{}, + req: &cloudproxyv1alpha.HTTPRequest{}, }, fields: fields{ - tuneMockCloudClient: func(m *mock_cloud.MockCloudClient) { + tuneMockCloudClient: func(m *mock_proxy.MockCloudClient) { m.EXPECT().DoHTTPRequest(gomock.Any()).Return(&http.Response{}, nil) }, }, @@ -332,11 +331,11 @@ func TestClient_processHttpRequest(t *testing.T) { t.Parallel() ctrl := gomock.NewController(t) defer ctrl.Finish() - cloudClient := mock_cloud.NewMockCloudClient(ctrl) + cloudClient := mock_proxy.NewMockCloudClient(ctrl) if tt.fields.tuneMockCloudClient != nil { tt.fields.tuneMockCloudClient(cloudClient) } - c := New(cloudClient, logrus.New(), "clusterID", "version") + c := New(nil, cloudClient, logrus.New(), "clusterID", "version") if got := c.processHttpRequest(tt.args.req); !reflect.DeepEqual(got, tt.want) { t.Errorf("processHttpRequest() = %v, want %v", got, tt.want) } @@ -350,7 +349,7 @@ func TestClient_sendKeepAlive(t *testing.T) { type args struct { ctx func() context.Context - tuneMockStream func(m *mock_cloud.MockStreamCloudProxyClient) + tuneMockStream func(m *mock_proxy.MockCloudProxyAPI_StreamCloudProxyClient) keepAlive int64 keepAliveTimeout int64 } @@ -386,7 +385,7 @@ func TestClient_sendKeepAlive(t *testing.T) { ctx: func() context.Context { return context.Background() }, - tuneMockStream: func(m *mock_cloud.MockStreamCloudProxyClient) { + tuneMockStream: func(m *mock_proxy.MockCloudProxyAPI_StreamCloudProxyClient) { m.EXPECT().Send(gomock.Any()).Return(fmt.Errorf("error")) }, keepAlive: int64(time.Second), @@ -402,11 +401,11 @@ func TestClient_sendKeepAlive(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - c := New(nil, logrus.New(), "clusterID", "version") + c := New(nil, nil, logrus.New(), "clusterID", "version") c.keepAlive.Store(tt.args.keepAlive) c.keepAliveTimeout.Store(tt.args.keepAliveTimeout) - stream := mock_cloud.NewMockStreamCloudProxyClient(ctrl) + stream := mock_proxy.NewMockCloudProxyAPI_StreamCloudProxyClient(ctrl) if tt.args.tuneMockStream != nil { tt.args.tuneMockStream(stream) } @@ -417,3 +416,65 @@ func TestClient_sendKeepAlive(t *testing.T) { }) } } + +func TestClient_run(t *testing.T) { + t.Parallel() + type fields struct { + } + type args struct { + ctx func() context.Context + tuneMockStream func(m *mock_proxy.MockCloudProxyAPI_StreamCloudProxyClient) + } + tests := []struct { + name string + fields fields + args args + wantErr bool + wantLastSeenUpdated bool + }{ + { + name: "send initial error", + args: args{ + ctx: func() context.Context { + return context.Background() + }, + tuneMockStream: func(m *mock_proxy.MockCloudProxyAPI_StreamCloudProxyClient) { + m.EXPECT().Send(gomock.Any()).Return(fmt.Errorf("test error")) + }, + }, + wantErr: true, + }, + { + name: "context done", + args: args{ + ctx: func() context.Context { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + return ctx + }, + tuneMockStream: func(m *mock_proxy.MockCloudProxyAPI_StreamCloudProxyClient) { + m.EXPECT().Send(gomock.Any()).Return(nil) + }, + }, + wantLastSeenUpdated: true, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + c := New(nil, nil, logrus.New(), "clusterID", "version") + stream := mock_proxy.NewMockCloudProxyAPI_StreamCloudProxyClient(ctrl) + if tt.args.tuneMockStream != nil { + tt.args.tuneMockStream(stream) + } + if err := c.run(tt.args.ctx(), stream, func() {}); (err != nil) != tt.wantErr { + t.Errorf("run() error = %v, wantErr %v", err, tt.wantErr) + } + require.Equal(t, tt.wantLastSeenUpdated, c.lastSeen.Load() > 0, "lastSeen: %v", c.lastSeen.Load()) + }) + } +} diff --git a/internal/proxy/mock/cloud.go b/internal/proxy/mock/client.go similarity index 72% rename from internal/proxy/mock/cloud.go rename to internal/proxy/mock/client.go index fcacb6a..9e565cc 100644 --- a/internal/proxy/mock/cloud.go +++ b/internal/proxy/mock/client.go @@ -1,8 +1,8 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: cloud-proxy/internal/proxy (interfaces: CloudClient) +// Source: client.go -// Package mock_cloud is a generated GoMock package. -package mock_cloud +// Package mock_proxy is a generated GoMock package. +package mock_proxy import ( http "net/http" @@ -35,16 +35,16 @@ func (m *MockCloudClient) EXPECT() *MockCloudClientMockRecorder { } // DoHTTPRequest mocks base method. -func (m *MockCloudClient) DoHTTPRequest(arg0 *http.Request) (*http.Response, error) { +func (m *MockCloudClient) DoHTTPRequest(request *http.Request) (*http.Response, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DoHTTPRequest", arg0) + ret := m.ctrl.Call(m, "DoHTTPRequest", request) ret0, _ := ret[0].(*http.Response) ret1, _ := ret[1].(error) return ret0, ret1 } // DoHTTPRequest indicates an expected call of DoHTTPRequest. -func (mr *MockCloudClientMockRecorder) DoHTTPRequest(arg0 interface{}) *gomock.Call { +func (mr *MockCloudClientMockRecorder) DoHTTPRequest(request interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DoHTTPRequest", reflect.TypeOf((*MockCloudClient)(nil).DoHTTPRequest), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DoHTTPRequest", reflect.TypeOf((*MockCloudClient)(nil).DoHTTPRequest), request) } diff --git a/internal/proxy/mock/stream.go b/internal/proxy/mock/stream.go index 6a4836f..11f66ed 100644 --- a/internal/proxy/mock/stream.go +++ b/internal/proxy/mock/stream.go @@ -1,43 +1,43 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: cloud-proxy/internal/proxy (interfaces: StreamCloudProxyClient) +// Source: github.com/castai/cloud-proxy/proto/gen/proto/v1alpha (interfaces: CloudProxyAPI_StreamCloudProxyClient) -// Package mock_cloud is a generated GoMock package. -package mock_cloud +// Package mock_proxy is a generated GoMock package. +package mock_proxy import ( - cloudproxyv1alpha "cloud-proxy/proto/v1alpha" context "context" reflect "reflect" + cloudproxyv1alpha "github.com/castai/cloud-proxy/proto/gen/proto/v1alpha" gomock "github.com/golang/mock/gomock" metadata "google.golang.org/grpc/metadata" ) -// MockStreamCloudProxyClient is a mock of StreamCloudProxyClient interface. -type MockStreamCloudProxyClient struct { +// MockCloudProxyAPI_StreamCloudProxyClient is a mock of CloudProxyAPI_StreamCloudProxyClient interface. +type MockCloudProxyAPI_StreamCloudProxyClient struct { ctrl *gomock.Controller - recorder *MockStreamCloudProxyClientMockRecorder + recorder *MockCloudProxyAPI_StreamCloudProxyClientMockRecorder } -// MockStreamCloudProxyClientMockRecorder is the mock recorder for MockStreamCloudProxyClient. -type MockStreamCloudProxyClientMockRecorder struct { - mock *MockStreamCloudProxyClient +// MockCloudProxyAPI_StreamCloudProxyClientMockRecorder is the mock recorder for MockCloudProxyAPI_StreamCloudProxyClient. +type MockCloudProxyAPI_StreamCloudProxyClientMockRecorder struct { + mock *MockCloudProxyAPI_StreamCloudProxyClient } -// NewMockStreamCloudProxyClient creates a new mock instance. -func NewMockStreamCloudProxyClient(ctrl *gomock.Controller) *MockStreamCloudProxyClient { - mock := &MockStreamCloudProxyClient{ctrl: ctrl} - mock.recorder = &MockStreamCloudProxyClientMockRecorder{mock} +// NewMockCloudProxyAPI_StreamCloudProxyClient creates a new mock instance. +func NewMockCloudProxyAPI_StreamCloudProxyClient(ctrl *gomock.Controller) *MockCloudProxyAPI_StreamCloudProxyClient { + mock := &MockCloudProxyAPI_StreamCloudProxyClient{ctrl: ctrl} + mock.recorder = &MockCloudProxyAPI_StreamCloudProxyClientMockRecorder{mock} return mock } // EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockStreamCloudProxyClient) EXPECT() *MockStreamCloudProxyClientMockRecorder { +func (m *MockCloudProxyAPI_StreamCloudProxyClient) EXPECT() *MockCloudProxyAPI_StreamCloudProxyClientMockRecorder { return m.recorder } // CloseSend mocks base method. -func (m *MockStreamCloudProxyClient) CloseSend() error { +func (m *MockCloudProxyAPI_StreamCloudProxyClient) CloseSend() error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "CloseSend") ret0, _ := ret[0].(error) @@ -45,13 +45,13 @@ func (m *MockStreamCloudProxyClient) CloseSend() error { } // CloseSend indicates an expected call of CloseSend. -func (mr *MockStreamCloudProxyClientMockRecorder) CloseSend() *gomock.Call { +func (mr *MockCloudProxyAPI_StreamCloudProxyClientMockRecorder) CloseSend() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseSend", reflect.TypeOf((*MockStreamCloudProxyClient)(nil).CloseSend)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseSend", reflect.TypeOf((*MockCloudProxyAPI_StreamCloudProxyClient)(nil).CloseSend)) } // Context mocks base method. -func (m *MockStreamCloudProxyClient) Context() context.Context { +func (m *MockCloudProxyAPI_StreamCloudProxyClient) Context() context.Context { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Context") ret0, _ := ret[0].(context.Context) @@ -59,13 +59,13 @@ func (m *MockStreamCloudProxyClient) Context() context.Context { } // Context indicates an expected call of Context. -func (mr *MockStreamCloudProxyClientMockRecorder) Context() *gomock.Call { +func (mr *MockCloudProxyAPI_StreamCloudProxyClientMockRecorder) Context() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockStreamCloudProxyClient)(nil).Context)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockCloudProxyAPI_StreamCloudProxyClient)(nil).Context)) } // Header mocks base method. -func (m *MockStreamCloudProxyClient) Header() (metadata.MD, error) { +func (m *MockCloudProxyAPI_StreamCloudProxyClient) Header() (metadata.MD, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Header") ret0, _ := ret[0].(metadata.MD) @@ -74,13 +74,13 @@ func (m *MockStreamCloudProxyClient) Header() (metadata.MD, error) { } // Header indicates an expected call of Header. -func (mr *MockStreamCloudProxyClientMockRecorder) Header() *gomock.Call { +func (mr *MockCloudProxyAPI_StreamCloudProxyClientMockRecorder) Header() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Header", reflect.TypeOf((*MockStreamCloudProxyClient)(nil).Header)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Header", reflect.TypeOf((*MockCloudProxyAPI_StreamCloudProxyClient)(nil).Header)) } // Recv mocks base method. -func (m *MockStreamCloudProxyClient) Recv() (*cloudproxyv1alpha.StreamCloudProxyResponse, error) { +func (m *MockCloudProxyAPI_StreamCloudProxyClient) Recv() (*cloudproxyv1alpha.StreamCloudProxyResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Recv") ret0, _ := ret[0].(*cloudproxyv1alpha.StreamCloudProxyResponse) @@ -89,13 +89,13 @@ func (m *MockStreamCloudProxyClient) Recv() (*cloudproxyv1alpha.StreamCloudProxy } // Recv indicates an expected call of Recv. -func (mr *MockStreamCloudProxyClientMockRecorder) Recv() *gomock.Call { +func (mr *MockCloudProxyAPI_StreamCloudProxyClientMockRecorder) Recv() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Recv", reflect.TypeOf((*MockStreamCloudProxyClient)(nil).Recv)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Recv", reflect.TypeOf((*MockCloudProxyAPI_StreamCloudProxyClient)(nil).Recv)) } // RecvMsg mocks base method. -func (m *MockStreamCloudProxyClient) RecvMsg(arg0 interface{}) error { +func (m *MockCloudProxyAPI_StreamCloudProxyClient) RecvMsg(arg0 interface{}) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RecvMsg", arg0) ret0, _ := ret[0].(error) @@ -103,13 +103,13 @@ func (m *MockStreamCloudProxyClient) RecvMsg(arg0 interface{}) error { } // RecvMsg indicates an expected call of RecvMsg. -func (mr *MockStreamCloudProxyClientMockRecorder) RecvMsg(arg0 interface{}) *gomock.Call { +func (mr *MockCloudProxyAPI_StreamCloudProxyClientMockRecorder) RecvMsg(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*MockStreamCloudProxyClient)(nil).RecvMsg), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*MockCloudProxyAPI_StreamCloudProxyClient)(nil).RecvMsg), arg0) } // Send mocks base method. -func (m *MockStreamCloudProxyClient) Send(arg0 *cloudproxyv1alpha.StreamCloudProxyRequest) error { +func (m *MockCloudProxyAPI_StreamCloudProxyClient) Send(arg0 *cloudproxyv1alpha.StreamCloudProxyRequest) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Send", arg0) ret0, _ := ret[0].(error) @@ -117,13 +117,13 @@ func (m *MockStreamCloudProxyClient) Send(arg0 *cloudproxyv1alpha.StreamCloudPro } // Send indicates an expected call of Send. -func (mr *MockStreamCloudProxyClientMockRecorder) Send(arg0 interface{}) *gomock.Call { +func (mr *MockCloudProxyAPI_StreamCloudProxyClientMockRecorder) Send(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockStreamCloudProxyClient)(nil).Send), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockCloudProxyAPI_StreamCloudProxyClient)(nil).Send), arg0) } // SendMsg mocks base method. -func (m *MockStreamCloudProxyClient) SendMsg(arg0 interface{}) error { +func (m *MockCloudProxyAPI_StreamCloudProxyClient) SendMsg(arg0 interface{}) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SendMsg", arg0) ret0, _ := ret[0].(error) @@ -131,13 +131,13 @@ func (m *MockStreamCloudProxyClient) SendMsg(arg0 interface{}) error { } // SendMsg indicates an expected call of SendMsg. -func (mr *MockStreamCloudProxyClientMockRecorder) SendMsg(arg0 interface{}) *gomock.Call { +func (mr *MockCloudProxyAPI_StreamCloudProxyClientMockRecorder) SendMsg(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockStreamCloudProxyClient)(nil).SendMsg), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockCloudProxyAPI_StreamCloudProxyClient)(nil).SendMsg), arg0) } // Trailer mocks base method. -func (m *MockStreamCloudProxyClient) Trailer() metadata.MD { +func (m *MockCloudProxyAPI_StreamCloudProxyClient) Trailer() metadata.MD { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Trailer") ret0, _ := ret[0].(metadata.MD) @@ -145,7 +145,7 @@ func (m *MockStreamCloudProxyClient) Trailer() metadata.MD { } // Trailer indicates an expected call of Trailer. -func (mr *MockStreamCloudProxyClientMockRecorder) Trailer() *gomock.Call { +func (mr *MockCloudProxyAPI_StreamCloudProxyClientMockRecorder) Trailer() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Trailer", reflect.TypeOf((*MockStreamCloudProxyClient)(nil).Trailer)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Trailer", reflect.TypeOf((*MockCloudProxyAPI_StreamCloudProxyClient)(nil).Trailer)) } diff --git a/proto/v1alpha/proxy.pb.go b/proto/gen/proto/v1alpha/proxy.pb.go similarity index 99% rename from proto/v1alpha/proxy.pb.go rename to proto/gen/proto/v1alpha/proxy.pb.go index d97649c..b8a0e45 100644 --- a/proto/v1alpha/proxy.pb.go +++ b/proto/gen/proto/v1alpha/proxy.pb.go @@ -1100,11 +1100,12 @@ var file_proto_v1alpha_proxy_proto_rawDesc = []byte{ 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x54, 0x6f, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x22, 0x00, 0x42, 0x3f, 0x5a, 0x3d, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, + 0x73, 0x65, 0x22, 0x00, 0x42, 0x49, 0x5a, 0x47, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x61, 0x73, 0x74, 0x61, 0x69, 0x2f, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2d, - 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x76, 0x31, 0x61, 0x6c, - 0x70, 0x68, 0x61, 0x3b, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x76, 0x31, - 0x61, 0x6c, 0x70, 0x68, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x65, 0x6e, 0x2f, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x3b, 0x63, 0x6c, + 0x6f, 0x75, 0x64, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/proto/v1alpha/proxy_grpc.pb.go b/proto/gen/proto/v1alpha/proxy_grpc.pb.go similarity index 100% rename from proto/v1alpha/proxy_grpc.pb.go rename to proto/gen/proto/v1alpha/proxy_grpc.pb.go diff --git a/proto/v1alpha/proxy.proto b/proto/v1alpha/proxy.proto index b52b3b5..a01db38 100644 --- a/proto/v1alpha/proxy.proto +++ b/proto/v1alpha/proxy.proto @@ -2,7 +2,7 @@ syntax = "proto3"; package cloud.proxy.v1alpha; -option go_package = "github.com/castai/cloud-proxy/proto/v1alpha;cloudproxyv1alpha"; +option go_package = "github.com/castai/cloud-proxy/proto/gen/proto/v1alpha;cloudproxyv1alpha"; // CloudProxyAPI provides the API for proxying cloud requests for CAST AI External Provisioner. service CloudProxyAPI { From 992b491ad5aa18c72e9b7a6f4a599169eae67f03 Mon Sep 17 00:00:00 2001 From: Valentyna Bukhalova Date: Fri, 13 Sep 2024 18:41:23 +0200 Subject: [PATCH 5/7] add podname to proto --- proto/gen/proto/v1alpha/proxy.pb.go | 76 ++++++++++++++++++----------- proto/v1alpha/proxy.proto | 2 + 2 files changed, 50 insertions(+), 28 deletions(-) diff --git a/proto/gen/proto/v1alpha/proxy.pb.go b/proto/gen/proto/v1alpha/proxy.pb.go index b8a0e45..39cbe4a 100644 --- a/proto/gen/proto/v1alpha/proxy.pb.go +++ b/proto/gen/proto/v1alpha/proxy.pb.go @@ -863,6 +863,8 @@ type ClientMetadata struct { ClusterId string `protobuf:"bytes,1,opt,name=cluster_id,json=clusterId,proto3" json:"cluster_id,omitempty"` OrganizationId *string `protobuf:"bytes,2,opt,name=organization_id,json=organizationId,proto3,oneof" json:"organization_id,omitempty"` + Namespace string `protobuf:"bytes,3,opt,name=namespace,proto3" json:"namespace,omitempty"` + PodName string `protobuf:"bytes,4,opt,name=pod_name,json=podName,proto3" json:"pod_name,omitempty"` } func (x *ClientMetadata) Reset() { @@ -911,6 +913,20 @@ func (x *ClientMetadata) GetOrganizationId() string { return "" } +func (x *ClientMetadata) GetNamespace() string { + if x != nil { + return x.Namespace + } + return "" +} + +func (x *ClientMetadata) GetPodName() string { + if x != nil { + return x.PodName + } + return "" +} + var File_proto_v1alpha_proxy_proto protoreflect.FileDescriptor var file_proto_v1alpha_proxy_proto_rawDesc = []byte{ @@ -1078,34 +1094,38 @@ var file_proto_v1alpha_proxy_proto_rawDesc = []byte{ 0x70, 0x68, 0x61, 0x2e, 0x48, 0x54, 0x54, 0x50, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x52, 0x0c, 0x68, 0x74, 0x74, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, - 0x72, 0x72, 0x6f, 0x72, 0x22, 0x71, 0x0a, 0x0e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x4d, 0x65, - 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, - 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x63, 0x6c, 0x75, 0x73, - 0x74, 0x65, 0x72, 0x49, 0x64, 0x12, 0x2c, 0x0a, 0x0f, 0x6f, 0x72, 0x67, 0x61, 0x6e, 0x69, 0x7a, - 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, - 0x52, 0x0e, 0x6f, 0x72, 0x67, 0x61, 0x6e, 0x69, 0x7a, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, - 0x88, 0x01, 0x01, 0x42, 0x12, 0x0a, 0x10, 0x5f, 0x6f, 0x72, 0x67, 0x61, 0x6e, 0x69, 0x7a, 0x61, - 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x32, 0xea, 0x01, 0x0a, 0x0d, 0x43, 0x6c, 0x6f, 0x75, - 0x64, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x41, 0x50, 0x49, 0x12, 0x75, 0x0a, 0x10, 0x53, 0x74, 0x72, - 0x65, 0x61, 0x6d, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x12, 0x2c, 0x2e, - 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x76, 0x31, 0x61, 0x6c, - 0x70, 0x68, 0x61, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x50, - 0x72, 0x6f, 0x78, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2d, 0x2e, 0x63, 0x6c, - 0x6f, 0x75, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, - 0x61, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x50, 0x72, 0x6f, - 0x78, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, - 0x12, 0x62, 0x0a, 0x0b, 0x53, 0x65, 0x6e, 0x64, 0x54, 0x6f, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x12, - 0x27, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x76, 0x31, - 0x61, 0x6c, 0x70, 0x68, 0x61, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x54, 0x6f, 0x50, 0x72, 0x6f, 0x78, - 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, - 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x2e, 0x53, - 0x65, 0x6e, 0x64, 0x54, 0x6f, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x22, 0x00, 0x42, 0x49, 0x5a, 0x47, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, - 0x6f, 0x6d, 0x2f, 0x63, 0x61, 0x73, 0x74, 0x61, 0x69, 0x2f, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2d, - 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x65, 0x6e, 0x2f, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x3b, 0x63, 0x6c, - 0x6f, 0x75, 0x64, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x62, - 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x72, 0x72, 0x6f, 0x72, 0x22, 0xaa, 0x01, 0x0a, 0x0e, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x4d, + 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x6c, 0x75, 0x73, 0x74, + 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x63, 0x6c, 0x75, + 0x73, 0x74, 0x65, 0x72, 0x49, 0x64, 0x12, 0x2c, 0x0a, 0x0f, 0x6f, 0x72, 0x67, 0x61, 0x6e, 0x69, + 0x7a, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x48, + 0x00, 0x52, 0x0e, 0x6f, 0x72, 0x67, 0x61, 0x6e, 0x69, 0x7a, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x49, + 0x64, 0x88, 0x01, 0x01, 0x12, 0x1c, 0x0a, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, + 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, + 0x63, 0x65, 0x12, 0x19, 0x0a, 0x08, 0x70, 0x6f, 0x64, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x04, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x70, 0x6f, 0x64, 0x4e, 0x61, 0x6d, 0x65, 0x42, 0x12, 0x0a, + 0x10, 0x5f, 0x6f, 0x72, 0x67, 0x61, 0x6e, 0x69, 0x7a, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, + 0x64, 0x32, 0xea, 0x01, 0x0a, 0x0d, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x50, 0x72, 0x6f, 0x78, 0x79, + 0x41, 0x50, 0x49, 0x12, 0x75, 0x0a, 0x10, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x43, 0x6c, 0x6f, + 0x75, 0x64, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x12, 0x2c, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, + 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x2e, 0x53, 0x74, + 0x72, 0x65, 0x61, 0x6d, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2d, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x70, 0x72, + 0x6f, 0x78, 0x79, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x2e, 0x53, 0x74, 0x72, 0x65, + 0x61, 0x6d, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x62, 0x0a, 0x0b, 0x53, 0x65, + 0x6e, 0x64, 0x54, 0x6f, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x12, 0x27, 0x2e, 0x63, 0x6c, 0x6f, 0x75, + 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x2e, + 0x53, 0x65, 0x6e, 0x64, 0x54, 0x6f, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, + 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x54, 0x6f, 0x50, + 0x72, 0x6f, 0x78, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x49, + 0x5a, 0x47, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x61, 0x73, + 0x74, 0x61, 0x69, 0x2f, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x2d, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2f, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, + 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x3b, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x70, 0x72, 0x6f, + 0x78, 0x79, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x33, } var ( diff --git a/proto/v1alpha/proxy.proto b/proto/v1alpha/proxy.proto index a01db38..ea53657 100644 --- a/proto/v1alpha/proxy.proto +++ b/proto/v1alpha/proxy.proto @@ -110,4 +110,6 @@ message SendToProxyResponse { message ClientMetadata { string cluster_id = 1; optional string organization_id = 2; + string namespace = 3; + string pod_name = 4; } From 379475ee10f0751e14a530ee47823aa1a7abe60d Mon Sep 17 00:00:00 2001 From: Valentyna Bukhalova Date: Mon, 16 Sep 2024 10:28:22 +0200 Subject: [PATCH 6/7] upd --- cmd/proxy/main.go | 2 +- internal/proxy/client.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/proxy/main.go b/cmd/proxy/main.go index 1c92733..e9ac15c 100644 --- a/cmd/proxy/main.go +++ b/cmd/proxy/main.go @@ -81,7 +81,7 @@ func main() { client := proxy.New(conn, gcp.New(gcpauth.NewCredentialsSource(), http.DefaultClient), logger, cfg.ClusterID, GetVersion()) err = client.Run(ctx) if err != nil { - logger.Panicf("Failed to run client: %v", err) + logger.Errorf("Failed to run client: %v", err) } } diff --git a/internal/proxy/client.go b/internal/proxy/client.go index f4475a7..98360b4 100644 --- a/internal/proxy/client.go +++ b/internal/proxy/client.go @@ -124,7 +124,7 @@ func (c *Client) run(ctx context.Context, stream cloudproxyv1alpha.CloudProxyAPI for { if ctx.Err() != nil { - return nil + return ctx.Err() } if !c.isAlive() { return fmt.Errorf("last seen too old, closing stream") From fda889052f21e8d30346e874d0ae3ebee2e71f26 Mon Sep 17 00:00:00 2001 From: Valentyna Bukhalova Date: Mon, 16 Sep 2024 10:34:54 +0200 Subject: [PATCH 7/7] upd --- e2e/setup.go | 2 +- go.mod | 3 --- internal/castai/dummy/mock_cast.go | 2 +- internal/e2etest/dispatcher.go | 2 +- internal/e2etest/roundtripper.go | 2 +- internal/proxy/client.go | 4 ++-- internal/proxy/client_test.go | 3 ++- internal/proxy/mock/stream.go | 4 ++-- 8 files changed, 10 insertions(+), 12 deletions(-) diff --git a/e2e/setup.go b/e2e/setup.go index 1d3da01..1ac94de 100644 --- a/e2e/setup.go +++ b/e2e/setup.go @@ -9,9 +9,9 @@ import ( "net/http" "cloud-proxy/internal/e2etest" + cloudproxyv1alpha "cloud-proxy/proto/gen/proto/v1alpha" compute "cloud.google.com/go/compute/apiv1" container "cloud.google.com/go/container/apiv1" - cloudproxyv1alpha "github.com/castai/cloud-proxy/proto/gen/proto/v1alpha" "golang.org/x/sync/errgroup" "google.golang.org/api/option" "google.golang.org/grpc" diff --git a/go.mod b/go.mod index 4544eee..ffe942a 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.23.1 require ( cloud.google.com/go/compute v1.28.0 cloud.google.com/go/container v1.39.0 - github.com/castai/cloud-proxy v0.0.0-00010101000000-000000000000 github.com/golang/mock v1.6.0 github.com/google/uuid v1.6.0 github.com/samber/lo v1.47.0 @@ -64,5 +63,3 @@ require ( gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) - -replace github.com/castai/cloud-proxy => ./ diff --git a/internal/castai/dummy/mock_cast.go b/internal/castai/dummy/mock_cast.go index ec1989b..ff595fe 100644 --- a/internal/castai/dummy/mock_cast.go +++ b/internal/castai/dummy/mock_cast.go @@ -1,8 +1,8 @@ package dummy import ( + cloudproxyv1alpha "cloud-proxy/proto/gen/proto/v1alpha" "fmt" - cloudproxyv1alpha "github.com/castai/cloud-proxy/proto/gen/proto/v1alpha" "io" "log" "net" diff --git a/internal/e2etest/dispatcher.go b/internal/e2etest/dispatcher.go index a0cdba4..df90caa 100644 --- a/internal/e2etest/dispatcher.go +++ b/internal/e2etest/dispatcher.go @@ -4,7 +4,7 @@ import ( "log" "sync" - cloudproxyv1alpha "github.com/castai/cloud-proxy/proto/gen/proto/v1alpha" + cloudproxyv1alpha "cloud-proxy/proto/gen/proto/v1alpha" ) type Dispatcher struct { diff --git a/internal/e2etest/roundtripper.go b/internal/e2etest/roundtripper.go index 895584d..7914d77 100644 --- a/internal/e2etest/roundtripper.go +++ b/internal/e2etest/roundtripper.go @@ -2,8 +2,8 @@ package e2etest import ( "bytes" + cloudproxyv1alpha "cloud-proxy/proto/gen/proto/v1alpha" "fmt" - cloudproxyv1alpha "github.com/castai/cloud-proxy/proto/gen/proto/v1alpha" "io" "log" "net/http" diff --git a/internal/proxy/client.go b/internal/proxy/client.go index 98360b4..6384cce 100644 --- a/internal/proxy/client.go +++ b/internal/proxy/client.go @@ -1,13 +1,13 @@ //go:generate mockgen -package=mock_proxy -source $GOFILE -destination mock/$GOFILE . -//go:generate mockgen -package=mock_proxy -destination mock/stream.go github.com/castai/cloud-proxy/proto/gen/proto/v1alpha CloudProxyAPI_StreamCloudProxyClient +//go:generate mockgen -package=mock_proxy -destination mock/stream.go cloud-proxy/proto/gen/proto/v1alpha CloudProxyAPI_StreamCloudProxyClient package proxy import ( "bytes" + cloudproxyv1alpha "cloud-proxy/proto/gen/proto/v1alpha" "context" "fmt" - cloudproxyv1alpha "github.com/castai/cloud-proxy/proto/gen/proto/v1alpha" "github.com/samber/lo" "github.com/sirupsen/logrus" "google.golang.org/grpc" diff --git a/internal/proxy/client_test.go b/internal/proxy/client_test.go index a8408b6..3d573d3 100644 --- a/internal/proxy/client_test.go +++ b/internal/proxy/client_test.go @@ -3,9 +3,9 @@ package proxy import ( "bytes" mock_proxy "cloud-proxy/internal/proxy/mock" + cloudproxyv1alpha "cloud-proxy/proto/gen/proto/v1alpha" "context" "fmt" - cloudproxyv1alpha "github.com/castai/cloud-proxy/proto/gen/proto/v1alpha" "github.com/golang/mock/gomock" "github.com/samber/lo" "github.com/sirupsen/logrus" @@ -457,6 +457,7 @@ func TestClient_run(t *testing.T) { }, }, wantLastSeenUpdated: true, + wantErr: true, }, } for _, tt := range tests { diff --git a/internal/proxy/mock/stream.go b/internal/proxy/mock/stream.go index 11f66ed..b5f91f3 100644 --- a/internal/proxy/mock/stream.go +++ b/internal/proxy/mock/stream.go @@ -1,14 +1,14 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/castai/cloud-proxy/proto/gen/proto/v1alpha (interfaces: CloudProxyAPI_StreamCloudProxyClient) +// Source: cloud-proxy/proto/gen/proto/v1alpha (interfaces: CloudProxyAPI_StreamCloudProxyClient) // Package mock_proxy is a generated GoMock package. package mock_proxy import ( + cloudproxyv1alpha "cloud-proxy/proto/gen/proto/v1alpha" context "context" reflect "reflect" - cloudproxyv1alpha "github.com/castai/cloud-proxy/proto/gen/proto/v1alpha" gomock "github.com/golang/mock/gomock" metadata "google.golang.org/grpc/metadata" )