Skip to content

Commit

Permalink
feat: gzip encode delta request content (#34)
Browse files Browse the repository at this point in the history
  • Loading branch information
saumas authored Aug 4, 2021
1 parent 2eb562e commit 43d57e1
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 9 deletions.
26 changes: 18 additions & 8 deletions internal/castai/castai.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package castai

import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"fmt"
Expand All @@ -24,8 +25,9 @@ const (
)

var (
hdrContentType = http.CanonicalHeaderKey("Content-Type")
hdrAPIKey = http.CanonicalHeaderKey(headerAPIKey)
hdrContentType = http.CanonicalHeaderKey("Content-Type")
hdrContentEncoding = http.CanonicalHeaderKey("Content-Encoding")
hdrAPIKey = http.CanonicalHeaderKey(headerAPIKey)
)

var DoNotSendLogs = struct{}{}
Expand Down Expand Up @@ -86,26 +88,34 @@ func (c *client) SendDelta(ctx context.Context, clusterID string, delta *Delta)
return fmt.Errorf("invalid url: %w", err)
}

r, w := io.Pipe()
pipeReader, pipeWriter := io.Pipe()

go func() {
defer func() {
if err := w.Close(); err != nil {
c.log.Errorf("closing pipe: %v", err)
if err := pipeWriter.Close(); err != nil {
c.log.Errorf("closing gzip pipe: %v", err)
}
}()

if err := json.NewEncoder(w).Encode(delta); err != nil {
c.log.Errorf("marshaling delta: %v", err)
gzipWriter := gzip.NewWriter(pipeWriter)
defer func() {
if err := gzipWriter.Close(); err != nil {
c.log.Errorf("closing gzip writer: %v", err)
}
}()

if err := json.NewEncoder(gzipWriter).Encode(delta); err != nil {
c.log.Errorf("compressing json: %v", err)
}
}()

req, err := http.NewRequestWithContext(ctx, http.MethodPost, uri.String(), r)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, uri.String(), pipeReader)
if err != nil {
return fmt.Errorf("creating delta request: %w", err)
}

req.Header.Set(hdrContentType, "application/json")
req.Header.Set(hdrContentEncoding, "gzip")
req.Header.Set(hdrAPIKey, cfg.Key)

rc := c.rest.GetClient()
Expand Down
53 changes: 53 additions & 0 deletions internal/castai/castai_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package castai

import (
"compress/gzip"
"context"
"encoding/json"
"fmt"
"net/http"
"os"
"testing"
"time"

"github.com/go-resty/resty/v2"
"github.com/google/uuid"
Expand Down Expand Up @@ -35,3 +39,52 @@ func TestClient_RegisterCluster(t *testing.T) {
require.NoError(t, err)
require.Equal(t, registerClusterResp, got)
}

func TestClient_SendDelta(t *testing.T) {
rest := resty.New()
httpmock.ActivateNonDefault(rest.GetClient())
defer httpmock.Reset()

c := NewClient(logrus.New(), rest)

delta := &Delta{
ClusterID: uuid.New().String(),
ClusterVersion: "1.19+",
FullSnapshot: true,
Items: []*DeltaItem{
{
Event: EventAdd,
Kind: "Pod",
Data: "data",
CreatedAt: time.Now().UTC(),
},
},
}

require.NoError(t, os.Setenv("API_KEY", "key"))
require.NoError(t, os.Setenv("API_URL", "example.com"))

expectedURI := fmt.Sprintf("https://example.com/v1/kubernetes/clusters/%s/agent-deltas", delta.ClusterID)

httpmock.RegisterResponder(http.MethodPost, expectedURI, func(r *http.Request) (*http.Response, error) {
defer r.Body.Close()

require.Equal(t, "key", r.Header.Get(headerAPIKey))
require.Equal(t, "application/json", r.Header.Get("Content-Type"))
require.Equal(t, "gzip", r.Header.Get("Content-Encoding"))

zr, err := gzip.NewReader(r.Body)
require.NoError(t, err)
defer zr.Close()

actualDelta := &Delta{}
require.NoError(t, json.NewDecoder(zr).Decode(actualDelta))
require.Equal(t, delta, actualDelta)

return httpmock.NewStringResponse(203, ""), nil
})

err := c.SendDelta(context.Background(), delta.ClusterID, delta)

require.NoError(t, err)
}
1 change: 0 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ func run(ctx context.Context, castaiclient castai.Client, logger *logrus.Logger)
Version: Version,
}


fields["version"] = agentVersion.Version
log := logger.WithFields(fields)
log.Infof("running agent version: %v", agentVersion)
Expand Down

0 comments on commit 43d57e1

Please sign in to comment.