From 43d57e1684979354b05cfe6726e6fd50cfe8a273 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Saulius=20Ma=C5=A1nauskas?= Date: Wed, 4 Aug 2021 15:22:25 +0300 Subject: [PATCH] feat: gzip encode delta request content (#34) --- internal/castai/castai.go | 26 ++++++++++++----- internal/castai/castai_test.go | 53 ++++++++++++++++++++++++++++++++++ main.go | 1 - 3 files changed, 71 insertions(+), 9 deletions(-) diff --git a/internal/castai/castai.go b/internal/castai/castai.go index 52b7ec02..c304462d 100644 --- a/internal/castai/castai.go +++ b/internal/castai/castai.go @@ -3,6 +3,7 @@ package castai import ( "bytes" + "compress/gzip" "context" "encoding/json" "fmt" @@ -24,8 +25,9 @@ const ( ) var ( - hdrContentType = http.CanonicalHeaderKey("Content-Type") - hdrAPIKey = http.CanonicalHeaderKey(headerAPIKey) + hdrContentType = http.CanonicalHeaderKey("Content-Type") + hdrContentEncoding = http.CanonicalHeaderKey("Content-Encoding") + hdrAPIKey = http.CanonicalHeaderKey(headerAPIKey) ) var DoNotSendLogs = struct{}{} @@ -86,26 +88,34 @@ func (c *client) SendDelta(ctx context.Context, clusterID string, delta *Delta) return fmt.Errorf("invalid url: %w", err) } - r, w := io.Pipe() + pipeReader, pipeWriter := io.Pipe() go func() { defer func() { - if err := w.Close(); err != nil { - c.log.Errorf("closing pipe: %v", err) + if err := pipeWriter.Close(); err != nil { + c.log.Errorf("closing gzip pipe: %v", err) } }() - if err := json.NewEncoder(w).Encode(delta); err != nil { - c.log.Errorf("marshaling delta: %v", err) + gzipWriter := gzip.NewWriter(pipeWriter) + defer func() { + if err := gzipWriter.Close(); err != nil { + c.log.Errorf("closing gzip writer: %v", err) + } + }() + + if err := json.NewEncoder(gzipWriter).Encode(delta); err != nil { + c.log.Errorf("compressing json: %v", err) } }() - req, err := http.NewRequestWithContext(ctx, http.MethodPost, uri.String(), r) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, uri.String(), pipeReader) if err != nil { return fmt.Errorf("creating delta request: %w", err) } req.Header.Set(hdrContentType, "application/json") + req.Header.Set(hdrContentEncoding, "gzip") req.Header.Set(hdrAPIKey, cfg.Key) rc := c.rest.GetClient() diff --git a/internal/castai/castai_test.go b/internal/castai/castai_test.go index 71c66193..c962dd77 100644 --- a/internal/castai/castai_test.go +++ b/internal/castai/castai_test.go @@ -1,10 +1,14 @@ package castai import ( + "compress/gzip" "context" "encoding/json" + "fmt" "net/http" + "os" "testing" + "time" "github.com/go-resty/resty/v2" "github.com/google/uuid" @@ -35,3 +39,52 @@ func TestClient_RegisterCluster(t *testing.T) { require.NoError(t, err) require.Equal(t, registerClusterResp, got) } + +func TestClient_SendDelta(t *testing.T) { + rest := resty.New() + httpmock.ActivateNonDefault(rest.GetClient()) + defer httpmock.Reset() + + c := NewClient(logrus.New(), rest) + + delta := &Delta{ + ClusterID: uuid.New().String(), + ClusterVersion: "1.19+", + FullSnapshot: true, + Items: []*DeltaItem{ + { + Event: EventAdd, + Kind: "Pod", + Data: "data", + CreatedAt: time.Now().UTC(), + }, + }, + } + + require.NoError(t, os.Setenv("API_KEY", "key")) + require.NoError(t, os.Setenv("API_URL", "example.com")) + + expectedURI := fmt.Sprintf("https://example.com/v1/kubernetes/clusters/%s/agent-deltas", delta.ClusterID) + + httpmock.RegisterResponder(http.MethodPost, expectedURI, func(r *http.Request) (*http.Response, error) { + defer r.Body.Close() + + require.Equal(t, "key", r.Header.Get(headerAPIKey)) + require.Equal(t, "application/json", r.Header.Get("Content-Type")) + require.Equal(t, "gzip", r.Header.Get("Content-Encoding")) + + zr, err := gzip.NewReader(r.Body) + require.NoError(t, err) + defer zr.Close() + + actualDelta := &Delta{} + require.NoError(t, json.NewDecoder(zr).Decode(actualDelta)) + require.Equal(t, delta, actualDelta) + + return httpmock.NewStringResponse(203, ""), nil + }) + + err := c.SendDelta(context.Background(), delta.ClusterID, delta) + + require.NoError(t, err) +} diff --git a/main.go b/main.go index 9724e4db..6e7ee86f 100644 --- a/main.go +++ b/main.go @@ -72,7 +72,6 @@ func run(ctx context.Context, castaiclient castai.Client, logger *logrus.Logger) Version: Version, } - fields["version"] = agentVersion.Version log := logger.WithFields(fields) log.Infof("running agent version: %v", agentVersion)