Skip to content

Commit

Permalink
Merge pull request #10 from castai/tests
Browse files Browse the repository at this point in the history
rename proto package and unit tests
  • Loading branch information
ValyaB authored Sep 16, 2024
2 parents 0beddc5 + 5d67cb5 commit c33cc70
Show file tree
Hide file tree
Showing 13 changed files with 735 additions and 370 deletions.
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ deploy: build push
.PHONY: deploy

generate-grpc:
protoc proto/v1alpha/proxy.proto --go_out=paths=source_relative:. --go-grpc_out=paths=source_relative:.
mkdir -p proto/gen
protoc proto/v1alpha/proxy.proto \
--go_out=proto/gen --go_opt paths=source_relative \
--go-grpc_out=proto/gen --go-grpc_opt paths=source_relative
.PHONY: generate-grpc

10 changes: 4 additions & 6 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
package main

import (
"cloud-proxy/internal/cloud/gcp"
"cloud-proxy/internal/cloud/gcp/gcpauth"
"context"
"fmt"
"net/http"
"path"
"runtime"
"time"

"cloud-proxy/internal/cloud/gcp"
"cloud-proxy/internal/cloud/gcp/gcpauth"
proto "cloud-proxy/proto/v1alpha"

"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -92,8 +90,8 @@ func main() {
"authorization", fmt.Sprintf("Token %s", cfg.CastAI.ApiKey),
))

client := proxy.New(gcp.New(gcpauth.NewCredentialsSource(), http.DefaultClient), logger, cfg.ClusterID, GetVersion())
err = client.Run(ctx, proto.NewCloudProxyAPIClient(conn))
client := proxy.New(conn, gcp.New(gcpauth.NewCredentialsSource(), http.DefaultClient), logger, cfg.ClusterID, GetVersion())
err = client.Run(ctx)
if err != nil {
logger.Panicf("Failed to run client: %v", err)
panic(err)
Expand Down
14 changes: 7 additions & 7 deletions e2e/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"net/http"

"cloud-proxy/internal/e2etest"
proto "cloud-proxy/proto/v1alpha"
cloudproxyv1alpha "cloud-proxy/proto/gen/proto/v1alpha"
compute "cloud.google.com/go/compute/apiv1"
container "cloud.google.com/go/container/apiv1"
"golang.org/x/sync/errgroup"
Expand All @@ -24,22 +24,22 @@ type Server interface {
}

type TestSetup struct {
proto.UnimplementedCloudProxyAPIServer
cloudproxyv1alpha.UnimplementedCloudProxyAPIServer

result bool

grpcServer *grpc.Server
dispatcher *e2etest.Dispatcher
roundTripper *e2etest.HttpOverGrpcRoundTripper

requestChan chan *proto.StreamCloudProxyResponse
responseChan chan *proto.StreamCloudProxyRequest
requestChan chan *cloudproxyv1alpha.StreamCloudProxyResponse
responseChan chan *cloudproxyv1alpha.StreamCloudProxyRequest

logger *log.Logger
}

func NewTestSetup(logger *log.Logger) *TestSetup {
requestChan, respChan := make(chan *proto.StreamCloudProxyResponse), make(chan *proto.StreamCloudProxyRequest)
requestChan, respChan := make(chan *cloudproxyv1alpha.StreamCloudProxyResponse), make(chan *cloudproxyv1alpha.StreamCloudProxyRequest)
dispatcher := e2etest.NewDispatcher(requestChan, respChan, logger)
roundTrip := e2etest.NewHttpOverGrpcRoundTripper(dispatcher, logger)

Expand All @@ -62,7 +62,7 @@ func (srv *TestSetup) StartServer() error {
}

srv.grpcServer = grpc.NewServer()
proto.RegisterCloudProxyAPIServer(srv.grpcServer, srv)
cloudproxyv1alpha.RegisterCloudProxyAPIServer(srv.grpcServer, srv)

go func() {
if err := srv.grpcServer.Serve(list); err != nil {
Expand All @@ -81,7 +81,7 @@ func (srv *TestSetup) GracefulStopServer() {
srv.grpcServer.Stop()
}

func (srv *TestSetup) StreamCloudProxy(stream proto.CloudProxyAPI_StreamCloudProxyServer) error {
func (srv *TestSetup) StreamCloudProxy(stream cloudproxyv1alpha.CloudProxyAPI_StreamCloudProxyServer) error {
srv.logger.Println("Received a proxy connection from client")

//md, ok := metadata.FromIncomingContext(stream.Context())
Expand Down
16 changes: 8 additions & 8 deletions internal/castai/dummy/mock_cast.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package dummy

import (
proto "cloud-proxy/proto/v1alpha"
cloudproxyv1alpha "cloud-proxy/proto/gen/proto/v1alpha"
"fmt"
"io"
"log"
Expand All @@ -25,7 +25,7 @@ type MockCast struct {
func (mc *MockCast) Run() error {
logger := log.New(os.Stderr, "[CAST-MOCK] ", log.LstdFlags)

requestChan, respChan := make(chan *proto.StreamCloudProxyResponse), make(chan *proto.StreamCloudProxyRequest)
requestChan, respChan := make(chan *cloudproxyv1alpha.StreamCloudProxyResponse), make(chan *cloudproxyv1alpha.StreamCloudProxyRequest)

// Start the mock server
listener, err := net.Listen("tcp", ":50051")
Expand All @@ -34,7 +34,7 @@ func (mc *MockCast) Run() error {
}

grpcServer := grpc.NewServer()
proto.RegisterCloudProxyAPIServer(grpcServer, NewMockCastServer(requestChan, respChan, logger))
cloudproxyv1alpha.RegisterCloudProxyAPIServer(grpcServer, NewMockCastServer(requestChan, respChan, logger))

dispatcher := e2etest.NewDispatcher(requestChan, respChan, logger)

Expand Down Expand Up @@ -68,23 +68,23 @@ func (mc *MockCast) Run() error {
}

type MockCastServer struct {
proto.UnimplementedCloudProxyAPIServer
cloudproxyv1alpha.UnimplementedCloudProxyAPIServer

requestChan <-chan *proto.StreamCloudProxyResponse
responseChan chan<- *proto.StreamCloudProxyRequest
requestChan <-chan *cloudproxyv1alpha.StreamCloudProxyResponse
responseChan chan<- *cloudproxyv1alpha.StreamCloudProxyRequest

logger *log.Logger
}

func NewMockCastServer(requestChan <-chan *proto.StreamCloudProxyResponse, responseChan chan<- *proto.StreamCloudProxyRequest, logger *log.Logger) *MockCastServer {
func NewMockCastServer(requestChan <-chan *cloudproxyv1alpha.StreamCloudProxyResponse, responseChan chan<- *cloudproxyv1alpha.StreamCloudProxyRequest, logger *log.Logger) *MockCastServer {
return &MockCastServer{
requestChan: requestChan,
responseChan: responseChan,
logger: logger,
}
}

func (msrv *MockCastServer) Proxy(stream proto.CloudProxyAPI_StreamCloudProxyServer) error {
func (msrv *MockCastServer) Proxy(stream cloudproxyv1alpha.CloudProxyAPI_StreamCloudProxyServer) error {
msrv.logger.Println("Received a proxy connection from client")

var eg errgroup.Group
Expand Down
20 changes: 10 additions & 10 deletions internal/e2etest/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,22 @@ import (
"log"
"sync"

proto "cloud-proxy/proto/v1alpha"
cloudproxyv1alpha "cloud-proxy/proto/gen/proto/v1alpha"
)

type Dispatcher struct {
pendingRequests map[string]chan *proto.StreamCloudProxyRequest
pendingRequests map[string]chan *cloudproxyv1alpha.StreamCloudProxyRequest
locker sync.Mutex

proxyRequestChan chan<- *proto.StreamCloudProxyResponse
proxyResponseChan <-chan *proto.StreamCloudProxyRequest
proxyRequestChan chan<- *cloudproxyv1alpha.StreamCloudProxyResponse
proxyResponseChan <-chan *cloudproxyv1alpha.StreamCloudProxyRequest

logger *log.Logger
}

func NewDispatcher(requestChan chan<- *proto.StreamCloudProxyResponse, responseChan <-chan *proto.StreamCloudProxyRequest, logger *log.Logger) *Dispatcher {
func NewDispatcher(requestChan chan<- *cloudproxyv1alpha.StreamCloudProxyResponse, responseChan <-chan *cloudproxyv1alpha.StreamCloudProxyRequest, logger *log.Logger) *Dispatcher {
return &Dispatcher{
pendingRequests: make(map[string]chan *proto.StreamCloudProxyRequest),
pendingRequests: make(map[string]chan *cloudproxyv1alpha.StreamCloudProxyRequest),
locker: sync.Mutex{},
proxyRequestChan: requestChan,
proxyResponseChan: responseChan,
Expand All @@ -40,21 +40,21 @@ func (d *Dispatcher) Run() {
}()
}

func (d *Dispatcher) SendRequest(req *proto.StreamCloudProxyResponse) (<-chan *proto.StreamCloudProxyRequest, error) {
func (d *Dispatcher) SendRequest(req *cloudproxyv1alpha.StreamCloudProxyResponse) (<-chan *cloudproxyv1alpha.StreamCloudProxyRequest, error) {
waiter := d.addRequestToWaitingList(req.MessageId)
d.proxyRequestChan <- req
return waiter, nil
}

func (d *Dispatcher) addRequestToWaitingList(requestID string) <-chan *proto.StreamCloudProxyRequest {
waiter := make(chan *proto.StreamCloudProxyRequest, 1)
func (d *Dispatcher) addRequestToWaitingList(requestID string) <-chan *cloudproxyv1alpha.StreamCloudProxyRequest {
waiter := make(chan *cloudproxyv1alpha.StreamCloudProxyRequest, 1)
d.locker.Lock()
d.pendingRequests[requestID] = waiter
d.locker.Unlock()
return waiter
}

func (d *Dispatcher) findWaiterForResponse(requestID string) chan *proto.StreamCloudProxyRequest {
func (d *Dispatcher) findWaiterForResponse(requestID string) chan *cloudproxyv1alpha.StreamCloudProxyRequest {
d.locker.Lock()
val, ok := d.pendingRequests[requestID]
if !ok {
Expand Down
10 changes: 5 additions & 5 deletions internal/e2etest/roundtripper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package e2etest

import (
"bytes"
proto "cloud-proxy/proto/v1alpha"
cloudproxyv1alpha "cloud-proxy/proto/gen/proto/v1alpha"
"fmt"
"io"
"log"
Expand All @@ -24,14 +24,14 @@ func NewHttpOverGrpcRoundTripper(dispatcher *Dispatcher, logger *log.Logger) *Ht
func (p *HttpOverGrpcRoundTripper) RoundTrip(request *http.Request) (*http.Response, error) {
requestID := uuid.New().String()

headers := make(map[string]*proto.HeaderValue)
headers := make(map[string]*cloudproxyv1alpha.HeaderValue)
for h, v := range request.Header {
headers[h] = &proto.HeaderValue{Value: v}
headers[h] = &cloudproxyv1alpha.HeaderValue{Value: v}
}

protoReq := &proto.StreamCloudProxyResponse{
protoReq := &cloudproxyv1alpha.StreamCloudProxyResponse{
MessageId: requestID,
HttpRequest: &proto.HTTPRequest{
HttpRequest: &cloudproxyv1alpha.HTTPRequest{
Method: request.Method,
Path: request.URL.String(),
Headers: headers,
Expand Down
Loading

0 comments on commit c33cc70

Please sign in to comment.