Skip to content

Commit

Permalink
Merge pull request #17 from castai/fix-try
Browse files Browse the repository at this point in the history
add timeouts and return
  • Loading branch information
ValyaB authored Oct 3, 2024
2 parents 1f2dd97 + 8f9cfb5 commit 4952af9
Show file tree
Hide file tree
Showing 6 changed files with 634 additions and 705 deletions.
54 changes: 2 additions & 52 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,6 @@ import (
"time"

"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/encoding/gzip"

"cloud-proxy/internal/cloud/gcp"
"cloud-proxy/internal/cloud/gcp/gcpauth"
Expand All @@ -39,53 +34,8 @@ func main() {
logger.WithError(err).Panicf("Failed to create GCP credentials source")
}

dialOpts := make([]grpc.DialOption, 0)
if cfg.CastAI.DisableGRPCTLS {
// ONLY For testing purposes.
dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
} else {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(nil)))
}

connectParams := grpc.ConnectParams{
Backoff: backoff.Config{
BaseDelay: 2 * time.Second,
Jitter: 0.1,
MaxDelay: 5 * time.Second,
Multiplier: 1.2,
},
}
dialOpts = append(dialOpts, grpc.WithConnectParams(connectParams))

if cfg.UseCompression {
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(
grpc.UseCompressor(gzip.Name),
))
}

logger.Infof(
"Creating grpc channel against (%s) with connection config (%+v) and TLS enabled=%v",
cfg.CastAI.GrpcURL,
connectParams,
!cfg.CastAI.DisableGRPCTLS,
)
conn, err := grpc.NewClient(cfg.CastAI.GrpcURL, dialOpts...)
if err != nil {
logger.Panicf("Failed to connect to server: %v", err)
panic(err)
}

defer func(conn *grpc.ClientConn) {
logger.Info("Closing grpc connection")
err := conn.Close()
if err != nil {
logger.Panicf("Failed to close gRPC connection: %v", err)
panic(err)
}
}(conn)

client := proxy.New(conn, gcp.New(tokenSource), logger,
cfg.GetPodName(), cfg.ClusterID, GetVersion(), cfg.CastAI.APIKey, cfg.KeepAlive, cfg.KeepAliveTimeout)
client := proxy.New(gcp.New(tokenSource), logger,
GetVersion(), &cfg)

go startHealthServer(logger, cfg.HealthAddress)

Expand Down
30 changes: 16 additions & 14 deletions internal/e2etest/roundtripper.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,22 @@ func (p *HTTPOverGrpcRoundTripper) RoundTrip(request *http.Request) (*http.Respo

protoReq := &cloudproxyv1alpha.StreamCloudProxyResponse{
MessageId: requestID,
HttpRequest: &cloudproxyv1alpha.HTTPRequest{
Method: request.Method,
Path: request.URL.String(),
Headers: headers,
Body: func() []byte {
if request.Body == nil {
return []byte{}
}
body, err := io.ReadAll(request.Body)
if err != nil {
panic(fmt.Sprintf("Failed to read body: %v", err))
}
return body
}(),
Response: &cloudproxyv1alpha.StreamCloudProxyResponse_HttpRequest{
HttpRequest: &cloudproxyv1alpha.HTTPRequest{
Method: request.Method,
Path: request.URL.String(),
Headers: headers,
Body: func() []byte {
if request.Body == nil {
return []byte{}
}
body, err := io.ReadAll(request.Body)
if err != nil {
panic(fmt.Sprintf("Failed to read body: %v", err))
}
return body
}(),
},
},
}
waiter, err := p.dispatcher.SendRequest(protoReq)
Expand Down
Loading

0 comments on commit 4952af9

Please sign in to comment.