Skip to content

Commit

Permalink
Merge pull request #48 from castai/lost_delete_delta_fix
Browse files Browse the repository at this point in the history
keep delete delta in cache
  • Loading branch information
mindaugasCast authored Dec 15, 2021
2 parents d57c2cd + 7b32b15 commit dbe5a2a
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 6 deletions.
143 changes: 143 additions & 0 deletions internal/services/controller/controller_exclude_race_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
//go:build !race
// +build !race

package controller

import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"

"castai-agent/internal/castai"
mock_castai "castai-agent/internal/castai/mock"
"castai-agent/internal/config"
mock_types "castai-agent/internal/services/providers/types/mock"
mock_version "castai-agent/internal/services/version/mock"
"github.com/golang/mock/gomock"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
)

func TestController_ShouldKeepDeltaAfterDelete(t *testing.T) {
mockctrl := gomock.NewController(t)
castaiclient := mock_castai.NewMockClient(mockctrl)
version := mock_version.NewMockInterface(mockctrl)
provider := mock_types.NewMockProvider(mockctrl)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: v1.NamespaceDefault, Name: "pod1"}}
podData, err := encode(pod)
require.NoError(t, err)

clientset := fake.NewSimpleClientset()
f := informers.NewSharedInformerFactory(clientset, 0)

version.EXPECT().MinorInt().Return(19)
version.EXPECT().Full().Return("1.19+")

clusterID := uuid.New()

var invocations int64

// initial full snapshot
castaiclient.EXPECT().
SendDelta(gomock.Any(), clusterID.String(), gomock.Any()).
DoAndReturn(func(_ context.Context, clusterID string, d *castai.Delta) error {
defer atomic.AddInt64(&invocations, 1)

require.Equal(t, clusterID, d.ClusterID)
require.Equal(t, "1.19+", d.ClusterVersion)
require.True(t, d.FullSnapshot)
require.Len(t, d.Items, 0)

clientset.CoreV1().Pods("default").Create(ctx, pod, metav1.CreateOptions{})

return nil
})

// first delta add pod - fail and trigger pod delete
castaiclient.EXPECT().
SendDelta(gomock.Any(), clusterID.String(), gomock.Any()).
DoAndReturn(func(_ context.Context, clusterID string, d *castai.Delta) error {
defer atomic.AddInt64(&invocations, 1)

require.Equal(t, clusterID, d.ClusterID)
require.Equal(t, "1.19+", d.ClusterVersion)
require.False(t, d.FullSnapshot)
require.Len(t, d.Items, 1)

var actualValues []string
for _, item := range d.Items {
actualValues = append(actualValues, fmt.Sprintf("%s-%s-%s", item.Event, item.Kind, item.Data))
}

require.Contains(t, actualValues, fmt.Sprintf("%s-%s-%s", castai.EventAdd, "Pod", podData))

clientset.CoreV1().Pods("default").Delete(ctx, pod.Name, metav1.DeleteOptions{})

return fmt.Errorf("testError")
})

// second attempt to send data when pod delete is received
castaiclient.EXPECT().
SendDelta(gomock.Any(), clusterID.String(), gomock.Any()).
DoAndReturn(func(_ context.Context, clusterID string, d *castai.Delta) error {
defer atomic.AddInt64(&invocations, 1)

require.Equal(t, clusterID, d.ClusterID)
require.Equal(t, "1.19+", d.ClusterVersion)
require.False(t, d.FullSnapshot)
require.Len(t, d.Items, 1)

var actualValues []string
for _, item := range d.Items {
actualValues = append(actualValues, fmt.Sprintf("%s-%s-%s", item.Event, item.Kind, item.Data))
}

require.Contains(t, actualValues, fmt.Sprintf("%s-%s-%s", castai.EventDelete, "Pod", podData))

return nil
})

agentVersion := &config.AgentVersion{Version: "1.2.3"}
castaiclient.EXPECT().ExchangeAgentTelemetry(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().
Return(&castai.AgentTelemetryResponse{}, nil).
Do(func(ctx context.Context, clusterID string, req *castai.AgentTelemetryRequest) {
require.Equalf(t, "1.2.3", req.AgentVersion, "got request: %+v", req)
})

log := logrus.New()
log.SetLevel(logrus.DebugLevel)
ctrl := New(
log,
f,
castaiclient,
provider,
clusterID.String(),
2*time.Second,
2*time.Second,
10*time.Millisecond,
version,
agentVersion,
)

f.Start(ctx.Done())

go ctrl.Run(ctx)

wait.Until(func() {
if atomic.LoadInt64(&invocations) >= 3 {
cancel()
}
}, 10*time.Millisecond, ctx.Done())
}
2 changes: 1 addition & 1 deletion internal/services/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestController_HappyPath(t *testing.T) {
clusterID.String(),
15*time.Second,
2*time.Second,
10 * time.Millisecond,
10*time.Millisecond,
version,
agentVersion,
)
Expand Down
4 changes: 1 addition & 3 deletions internal/services/controller/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ type delta struct {
func (d *delta) add(i *item) {
key := mustKeyObject(i.obj)

if other, ok := d.cache[key]; ok && other.event == eventAdd && i.event == eventDelete {
delete(d.cache, key)
} else if ok && other.event == eventAdd && i.event == eventUpdate {
if other, ok := d.cache[key]; ok && other.event == eventAdd && i.event == eventUpdate {
i.event = eventAdd
d.cache[key] = i
} else if ok && other.event == eventDelete && (i.event == eventAdd || i.event == eventUpdate) {
Expand Down
11 changes: 9 additions & 2 deletions internal/services/controller/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestDelta(t *testing.T) {
},
},
{
name: "debounce: entirely remove added item when it is deleted",
name: "debounce: keep only delete event when an added item is deleted",
items: []*item{
{
obj: pod1,
Expand All @@ -106,6 +106,13 @@ func TestDelta(t *testing.T) {
ClusterID: clusterID,
ClusterVersion: version,
FullSnapshot: true,
Items: []*castai.DeltaItem{
{
Event: castai.EventDelete,
Kind: "Pod",
Data: mustEncode(t, pod1),
},
},
},
},
{
Expand Down Expand Up @@ -198,7 +205,7 @@ func TestDelta(t *testing.T) {
require.Equal(t, clusterID, got.ClusterID)
require.Equal(t, version, got.ClusterVersion)
require.True(t, got.FullSnapshot)
require.Equal(t, len(got.Items), len(test.expected.Items))
require.Equal(t, len(test.expected.Items), len(got.Items))
for _, expectedItem := range test.expected.Items {
requireContains(t, got.Items, expectedItem)
}
Expand Down

0 comments on commit dbe5a2a

Please sign in to comment.