Skip to content

Commit

Permalink
cleaned proto and fixed tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ValyaB committed Oct 1, 2024
1 parent 3b95298 commit da055a0
Show file tree
Hide file tree
Showing 4 changed files with 358 additions and 523 deletions.
65 changes: 19 additions & 46 deletions internal/proxy/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ type CloudClient interface {
type Client struct {
cfg *config.Config

cloudClient CloudClient
log *logrus.Logger
podName string
clusterID string
cloudClient CloudClient
log *logrus.Logger
podName string
clusterID string
streamRuntimeName string

errCount atomic.Int64
processedCount atomic.Int64
Expand Down Expand Up @@ -141,30 +142,6 @@ func (c *Client) getStream(ctx context.Context) (cloudproxyv1alpha.CloudProxyAPI
return stream, cancelFunc, nil
}

func (c *Client) sendInitialRequest(stream cloudproxyv1alpha.CloudProxyAPI_StreamCloudProxyClient) error {
c.log.Info("Sending initial request to castai")

err := stream.Send(&cloudproxyv1alpha.StreamCloudProxyRequest{
Request: &cloudproxyv1alpha.StreamCloudProxyRequest_InitialRequest{
InitialRequest: &cloudproxyv1alpha.InitialCloudProxyRequest{
ClientMetadata: &cloudproxyv1alpha.ClientMetadata{
PodName: c.podName,
ClusterId: c.clusterID,
},
Version: c.version,
},
},
})
if err != nil {
return fmt.Errorf("stream.Send: initial request %w", err)
}
c.lastSeenSend.Store(time.Now().UnixNano())

c.log.Info("Stream to castai started successfully")

return nil
}

func (c *Client) prepareAndRun(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand All @@ -175,18 +152,14 @@ func (c *Client) prepareAndRun(ctx context.Context) error {
}
defer closeConnection()

c.streamRuntimeName = fmt.Sprintf("stream-%s-%v", c.podName, time.Now().UnixNano())
c.lastSeenReceive.Store(time.Now().UnixNano())
c.lastSeenSend.Store(time.Now().UnixNano())

return c.sendAndReceive(ctx, stream)
}

func (c *Client) sendAndReceive(ctx context.Context, stream cloudproxyv1alpha.CloudProxyAPI_StreamCloudProxyClient) error {
err := c.sendInitialRequest(stream)
if err != nil {
return fmt.Errorf("c.Connect: %w", err)
}

eg, egctx := errgroup.WithContext(ctx)

sendCh := make(chan *cloudproxyv1alpha.StreamCloudProxyRequest, 10)
Expand Down Expand Up @@ -216,7 +189,7 @@ func (c *Client) sendAndReceive(ctx context.Context, stream cloudproxyv1alpha.Cl
}
}()

err = eg.Wait()
err := eg.Wait()
if err != nil {
c.log.Errorf("sendAndReceive: closing with error: %v", err)
}
Expand All @@ -237,7 +210,7 @@ func (c *Client) send(ctx context.Context, stream cloudproxyv1alpha.CloudProxyAP
case <-stream.Context().Done():
return fmt.Errorf("stream closed %w", stream.Context().Err())
case req := <-sendCh:
c.log.Printf("Sending message to stream %v", req.GetResponse().GetMessageId())
c.log.Printf("Sending message to stream %v len=%v", req.GetResponse().GetMessageId(), len(req.GetResponse().GetHttpResponse().GetBody()))
if err := stream.Send(req); err != nil {
c.log.WithError(err).Warn("failed to send message to stream")
return fmt.Errorf("failed to send message to stream: %w", err)
Expand Down Expand Up @@ -276,7 +249,7 @@ func (c *Client) receive(ctx context.Context, stream cloudproxyv1alpha.CloudProx

c.lastSeenReceive.Store(time.Now().UnixNano())
c.log.Debugf("Handling message from castai")
go c.handleMessage(stream.Context(), in, respCh)
go c.handleMessage(ctx, in, respCh)
}
}

Expand Down Expand Up @@ -310,7 +283,7 @@ func (c *Client) handleMessage(ctx context.Context, in *cloudproxyv1alpha.Stream
Request: &cloudproxyv1alpha.StreamCloudProxyRequest_Response{
Response: &cloudproxyv1alpha.ClusterResponse{
ClientMetadata: &cloudproxyv1alpha.ClientMetadata{
PodName: c.podName,
PodName: c.streamRuntimeName,
ClusterId: c.clusterID,
},
MessageId: in.GetMessageId(),
Expand All @@ -323,15 +296,15 @@ func (c *Client) handleMessage(ctx context.Context, in *cloudproxyv1alpha.Stream
}

func (c *Client) processConfigurationRequest(in *cloudproxyv1alpha.StreamCloudProxyResponse) {
if in.ConfigurationRequest == nil {
if in.GetConfigurationRequest() == nil {
return
}

if in.ConfigurationRequest.GetKeepAlive() != 0 {
c.keepAlive.Store(in.ConfigurationRequest.GetKeepAlive())
if in.GetConfigurationRequest().GetKeepAlive() != 0 {
c.keepAlive.Store(in.GetConfigurationRequest().GetKeepAlive())
}
if in.ConfigurationRequest.GetKeepAliveTimeout() != 0 {
c.keepAliveTimeout.Store(in.ConfigurationRequest.GetKeepAliveTimeout())
if in.GetConfigurationRequest().GetKeepAliveTimeout() != 0 {
c.keepAliveTimeout.Store(in.GetConfigurationRequest().GetKeepAliveTimeout())
}

c.log.Debugf("Updated keep-alive configuration to %v and keep-alive timeout to %v", time.Duration(c.keepAlive.Load()).Seconds(), time.Duration(c.keepAliveTimeout.Load()).Seconds())
Expand Down Expand Up @@ -371,8 +344,8 @@ func (c *Client) isAlive() error {
lastSeenSendDiff := time.Now().UnixNano() - lastSeenSend

if lastSeenReceiveDiff > keepAliveTimeout || lastSeenSendDiff > keepAliveTimeout {
c.log.Warnf("last seen receive %v, last seen send %v",
time.Duration(lastSeenReceiveDiff).Seconds(), time.Duration(lastSeenSendDiff).Seconds())
c.log.Warnf("last seen receive %v, last seen send %v %v",
time.Duration(lastSeenReceiveDiff).Seconds(), time.Duration(lastSeenSendDiff).Seconds(), errAlive)
return errAlive
}

Expand All @@ -398,11 +371,11 @@ func (c *Client) sendKeepAlive(ctx context.Context, stream cloudproxyv1alpha.Clo
Request: &cloudproxyv1alpha.StreamCloudProxyRequest_ClientStats{
ClientStats: &cloudproxyv1alpha.ClientStats{
ClientMetadata: &cloudproxyv1alpha.ClientMetadata{
PodName: c.podName,
PodName: c.streamRuntimeName,
ClusterId: c.clusterID,
Version: c.version,
},
Stats: &cloudproxyv1alpha.ClientStats_Stats{
Status: cloudproxyv1alpha.ClientStats_Stats_OK,
Timestamp: time.Now().UnixNano(),
},
},
Expand Down
18 changes: 14 additions & 4 deletions internal/proxy/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,11 @@ func TestClient_handleMessage(t *testing.T) {
args: args{
in: &cloudproxyv1alpha.StreamCloudProxyResponse{
MessageId: KeepAliveMessageID,
ConfigurationRequest: &cloudproxyv1alpha.ConfigurationRequest{
KeepAlive: 1,
KeepAliveTimeout: 2,
Response: &cloudproxyv1alpha.StreamCloudProxyResponse_ConfigurationRequest{
ConfigurationRequest: &cloudproxyv1alpha.ConfigurationRequest{
KeepAlive: 1,
KeepAliveTimeout: 2,
},
},
},
},
Expand Down Expand Up @@ -465,6 +467,12 @@ func TestClient_sendAndReceive(t *testing.T) {
},
tuneMockStream: func(m *mock_proxy.MockCloudProxyAPI_StreamCloudProxyClient) {
m.EXPECT().Send(gomock.Any()).Return(fmt.Errorf("test error"))
m.EXPECT().Recv().DoAndReturn(func() (*cloudproxyv1alpha.StreamCloudProxyResponse, error) {
time.Sleep(time.Millisecond)
return &cloudproxyv1alpha.StreamCloudProxyResponse{}, nil
}).AnyTimes()
m.EXPECT().Context().Return(context.Background()).AnyTimes()
m.EXPECT().CloseSend().Return(nil)
},
},
wantErr: true,
Expand Down Expand Up @@ -518,8 +526,10 @@ func TestClient_sendAndReceive(t *testing.T) {
PodName: "podName",
},
KeepAlive: time.Second,
KeepAliveTimeout: time.Minute,
KeepAliveTimeout: time.Second * 2,
})
c.lastSeenReceive.Store(time.Now().UnixNano())
c.lastSeenSend.Store(time.Now().UnixNano())
stream := mock_proxy.NewMockCloudProxyAPI_StreamCloudProxyClient(ctrl)
if tt.args.tuneMockStream != nil {
tt.args.tuneMockStream(stream)
Expand Down
Loading

0 comments on commit da055a0

Please sign in to comment.