Skip to content

Commit

Permalink
feat: send cluster deltas instead of full snapshots (#22)
Browse files Browse the repository at this point in the history
  • Loading branch information
saumas authored May 12, 2021
1 parent cd7715b commit 7397bbe
Show file tree
Hide file tree
Showing 22 changed files with 694 additions and 887 deletions.
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@ go 1.16

require (
github.com/aws/aws-sdk-go v1.37.23
github.com/cenkalti/backoff/v4 v4.1.0
github.com/go-resty/resty/v2 v2.5.0
github.com/golang/mock v1.4.1
github.com/google/uuid v1.1.2
github.com/jarcoal/httpmock v1.0.8
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/sirupsen/logrus v1.7.0
github.com/spf13/viper v1.7.1
github.com/stretchr/testify v1.6.1
Expand Down
9 changes: 5 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc=
github.com/cenkalti/backoff/v4 v4.1.0/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
Expand Down Expand Up @@ -91,6 +89,7 @@ github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/evanphx/json-patch v4.5.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/evanphx/json-patch v4.9.0+incompatible h1:kLcOMZeuLAJvL2BPWLMIj5oaZQobrkAqrL+WFZwQses=
github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
Expand Down Expand Up @@ -130,6 +129,7 @@ github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4er
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e h1:1r7pUrabqp18hOBcwBwiTsbnFeTZHV9eER/QT5JVZxY=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
Expand Down Expand Up @@ -204,6 +204,7 @@ github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/b
github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
Expand Down Expand Up @@ -295,13 +296,12 @@ github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1y
github.com/onsi/gomega v1.10.2 h1:aY/nuoWlKJud2J6U0E3NWsjlg+0GtwXxgEqthRdzlcs=
github.com/onsi/gomega v1.10.2/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand Down Expand Up @@ -687,6 +687,7 @@ k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y=
k8s.io/klog/v2 v2.4.0 h1:7+X0fUguPyrKEC4WjH8iGDg3laWgMo5tMnRTIGTTxGQ=
k8s.io/klog/v2 v2.4.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y=
k8s.io/kube-openapi v0.0.0-20201113171705-d219536bb9fd h1:sOHNzJIkytDF6qadMNKhhDRpc6ODik8lVC6nOur7B2c=
k8s.io/kube-openapi v0.0.0-20201113171705-d219536bb9fd/go.mod h1:WOJ3KddDSol4tAGcJo0Tvi+dK12EcqSLqcWsryKMpfM=
k8s.io/utils v0.0.0-20201110183641-67b214c5f920/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20210111153108-fddb29f9d009 h1:0T5IaWHO3sJTEmCP6mUlBvMukxPKUQWqiI/YuiBNMiQ=
Expand Down
107 changes: 32 additions & 75 deletions internal/castai/castai.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,10 @@ import (
"encoding/json"
"fmt"
"io"
"mime/multipart"
"net/http"
"net/textproto"
"net/url"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/go-resty/resty/v2"
"github.com/sirupsen/logrus"

Expand All @@ -27,21 +24,19 @@ const (
)

var (
hdrContentType = http.CanonicalHeaderKey("Content-Type")
hdrContentDisposition = http.CanonicalHeaderKey("Content-Disposition")
hdrContentType = http.CanonicalHeaderKey("Content-Type")
hdrAPIKey = http.CanonicalHeaderKey(headerAPIKey)
)

// Client responsible for communication between the agent and CAST AI API.
type Client interface {
// RegisterCluster sends a request to CAST AI containing discovered cluster properties used to authenticate the
// cluster and register it.
RegisterCluster(ctx context.Context, req *RegisterClusterRequest) (*RegisterClusterResponse, error)
// SendClusterSnapshot sends a cluster snapshot to CAST AI to enable savings estimations / autoscaling / etc.
SendClusterSnapshot(ctx context.Context, snap *Snapshot) error
// SendClusterSnapshotWithRetry sends cluster snapshot with retries to CAST AI to enable savings estimations / autoscaling / etc.
SendClusterSnapshotWithRetry(ctx context.Context, snap *Snapshot) error
// GetAgentCfg is used to poll CAST AI for agent configuration which can be updated via UI or other means.
GetAgentCfg(ctx context.Context, clusterID string) (*AgentCfgResponse, error)
// SendDelta sends the kubernetes state change to CAST AI. Function is noop when items are empty.
SendDelta(ctx context.Context, delta *Delta) error
}

// NewClient creates and configures the CAST AI client.
Expand All @@ -60,7 +55,7 @@ func NewDefaultClient() *resty.Client {
client.SetHostURL(fmt.Sprintf("https://%s", cfg.URL))
client.SetRetryCount(defaultRetryCount)
client.SetTimeout(defaultTimeout)
client.Header.Set(headerAPIKey, cfg.Key)
client.Header.Set(hdrAPIKey, cfg.Key)

return client
}
Expand All @@ -70,63 +65,41 @@ type client struct {
rest *resty.Client
}

func (c *client) RegisterCluster(ctx context.Context, req *RegisterClusterRequest) (*RegisterClusterResponse, error) {
body := &RegisterClusterResponse{}
resp, err := c.rest.R().
SetBody(req).
SetResult(body).
SetContext(ctx).
Post("/v1/kubernetes/external-clusters")
if err != nil {
return nil, err
}
if resp.IsError() {
return nil, fmt.Errorf("request error status_code=%d body=%s", resp.StatusCode(), resp.Body())
}

c.log.Infof("cluster registered: %+v", body)

return body, nil
}
func (c *client) SendDelta(ctx context.Context, delta *Delta) error {
c.log.Infof("sending delta with items[%d]", len(delta.Items))

func (c *client) SendClusterSnapshot(ctx context.Context, snap *Snapshot) error {
cfg := config.Get().API

uri, err := url.Parse(fmt.Sprintf("https://%s/v1/agent/snapshot", cfg.URL))
uri, err := url.Parse(fmt.Sprintf("https://%s/v1/agent/cluster-delta", cfg.URL))
if err != nil {
return err
return fmt.Errorf("invalid url: %w", err)
}

r, w := io.Pipe()
mw := multipart.NewWriter(w)

go func() {
defer func() {
if err := w.Close(); err != nil {
c.log.Errorf("closing pipe: %v", err)
}
}()
defer func() {
if err := mw.Close(); err != nil {
c.log.Errorf("closing multipart writer: %w", err)
}
}()
if err := writeSnapshotPart(mw, snap); err != nil {
c.log.Errorf("writing snapshot content: %v", err)

if err := json.NewEncoder(w).Encode(delta); err != nil {
c.log.Errorf("marshaling delta: %v", err)
}
}()

req, err := http.NewRequestWithContext(ctx, http.MethodPost, uri.String(), r)
if err != nil {
return err
return fmt.Errorf("creating delta request: %w", err)
}

req.Header.Set(hdrContentType, mw.FormDataContentType())
req.Header.Set(headerAPIKey, cfg.Key)
req.Header.Set(hdrContentType, "application/json")
req.Header.Set(hdrAPIKey, cfg.Key)

resp, err := c.rest.GetClient().Do(req)
if err != nil {
return err
return fmt.Errorf("sending delta request: %w", err)
}
defer func() {
if err := resp.Body.Close(); err != nil {
Expand All @@ -139,30 +112,31 @@ func (c *client) SendClusterSnapshot(ctx context.Context, snap *Snapshot) error
if _, err := buf.ReadFrom(resp.Body); err != nil {
c.log.Errorf("failed reading error response body: %v", err)
}
return fmt.Errorf("request failed with status_code=%d", resp.StatusCode)
return fmt.Errorf("delta request error status_code=%d body=%s", resp.StatusCode, buf.String())
}

c.log.Infof(
"snapshot with nodes[%d], pods[%d] sent, response_code=%d",
len(snap.NodeList.Items),
len(snap.PodList.Items),
resp.StatusCode,
)
c.log.Infof("delta with items[%d] sent, response_code=%d", len(delta.Items), resp.StatusCode)

return nil
}

func (c *client) SendClusterSnapshotWithRetry(ctx context.Context, snap *Snapshot) error {
b := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
op := func() error {
return c.SendClusterSnapshot(ctx, snap)
func (c *client) RegisterCluster(ctx context.Context, req *RegisterClusterRequest) (*RegisterClusterResponse, error) {
body := &RegisterClusterResponse{}
resp, err := c.rest.R().
SetBody(req).
SetResult(body).
SetContext(ctx).
Post("/v1/kubernetes/external-clusters")
if err != nil {
return nil, err
}

if err := backoff.Retry(op, b); err != nil {
return fmt.Errorf("sending snapshot data: %v", err)
if resp.IsError() {
return nil, fmt.Errorf("request error status_code=%d body=%s", resp.StatusCode(), resp.Body())
}

return nil
c.log.Infof("cluster registered: %+v", body)

return body, nil
}

func (c *client) GetAgentCfg(ctx context.Context, clusterID string) (*AgentCfgResponse, error) {
Expand All @@ -180,20 +154,3 @@ func (c *client) GetAgentCfg(ctx context.Context, clusterID string) (*AgentCfgRe

return body, nil
}

func writeSnapshotPart(mw *multipart.Writer, snap *Snapshot) error {
header := textproto.MIMEHeader{}
header.Set(hdrContentDisposition, `form-data; name="payload"; filename="payload.json"`)
header.Set(hdrContentType, "application/json")

bw, err := mw.CreatePart(header)
if err != nil {
return fmt.Errorf("creating payload part: %w", err)
}

if err := json.NewEncoder(bw).Encode(snap); err != nil {
return fmt.Errorf("marshaling snapshot payload: %w", err)
}

return nil
}
58 changes: 0 additions & 58 deletions internal/castai/castai_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,13 @@ import (
"context"
"encoding/json"
"net/http"
"os"
"testing"

"github.com/go-resty/resty/v2"
"github.com/google/uuid"
"github.com/jarcoal/httpmock"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"castai-agent/internal/services/collector"
)

func TestClient_RegisterCluster(t *testing.T) {
Expand All @@ -40,56 +35,3 @@ func TestClient_RegisterCluster(t *testing.T) {
require.NoError(t, err)
require.Equal(t, registerClusterResp, got)
}

func TestClient_SendClusterSnapshot(t *testing.T) {
require.NoError(t, os.Setenv("API_KEY", "api-key"))
require.NoError(t, os.Setenv("API_URL", "localhost"))

rest := resty.New()
httpmock.ActivateNonDefault(rest.GetClient())
defer httpmock.Reset()

c := NewClient(logrus.New(), rest)

snapshot := &Snapshot{
ClusterID: uuid.New().String(),
ClusterData: &collector.ClusterData{
NodeList: &corev1.NodeList{
Items: []corev1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
},
},
},
PodList: &corev1.PodList{
Items: []corev1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
},
},
},
},
}

httpmock.RegisterResponder(http.MethodPost, "https://localhost/v1/agent/snapshot", func(req *http.Request) (*http.Response, error) {
f, _, err := req.FormFile("payload")
require.NoError(t, err)

actualRequest := &Snapshot{}
require.NoError(t, json.NewDecoder(f).Decode(actualRequest))

require.Equal(t, snapshot, actualRequest)

require.Equal(t, "api-key", req.Header.Get(headerAPIKey))

return httpmock.NewStringResponse(http.StatusNoContent, "ok"), nil
})

err := c.SendClusterSnapshot(context.Background(), snapshot)

require.NoError(t, err)
}
26 changes: 6 additions & 20 deletions internal/castai/mock/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 7397bbe

Please sign in to comment.