Skip to content

Commit

Permalink
updated simple.go
Browse files Browse the repository at this point in the history
  • Loading branch information
prateek010 committed Nov 12, 2024
1 parent 105b6f9 commit b1f5227
Showing 1 changed file with 58 additions and 32 deletions.
90 changes: 58 additions & 32 deletions pkg/cache/v3/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,19 @@ package cache
import (
"context"
"fmt"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/log"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
log2 "github.com/rs/zerolog/log"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
)

// ResourceSnapshot is an abstract snapshot of a collection of resources that
Expand Down Expand Up @@ -60,6 +61,25 @@ type ResourceSnapshot interface {
GetVersionMap(typeURL string) map[string]string
}

type CustomSnapshotCacheOpts struct {
NodeId string
ResourceTypeUrl string
Resources map[string]*types.ResourceWithTTL
ResourcesBatch map[string]map[string]*types.ResourceWithTTL
ResourcesToRemove []string
Operation OperationOpts
}

type OperationOpts struct {
Id string
Checker func(nodestr string, ops OperationOpts) bool
AllowedNodesForOps []string
}

const (
CanaryOpsId = "canary"
)

// SnapshotCache is a snapshot-based cache that maintains a single versioned
// snapshot of responses per node. SnapshotCache consistently replies with the
// latest snapshot. For the protocol to work correctly in ADS mode, EDS/RDS
Expand Down Expand Up @@ -96,7 +116,7 @@ type SnapshotCache interface {

UpsertResources(ctx context.Context, node string, typ string, resourcesUpserted map[string]*types.ResourceWithTTL, isCanary bool) error

BatchUpsertResources(ctx context.Context, typ string, resourcesUpserted map[string]map[string]*types.ResourceWithTTL, isCanary bool) error
BatchUpsertResources(ctx context.Context, opts *CustomSnapshotCacheOpts) error

DeleteResources(ctx context.Context, node string, typ string, resourcesToDeleted []string, isCanary bool) error

Expand Down Expand Up @@ -244,20 +264,21 @@ func (cache *snapshotCache) ParseSystemVersionInfo(version string) int64 {
return parsed
}

func (cache *snapshotCache) BatchUpsertResources(ctx context.Context, typ string, batchResourcesUpserted map[string]map[string]*types.ResourceWithTTL, isCanary bool) error {
func (cache *snapshotCache) BatchUpsertResources(ctx context.Context, opts *CustomSnapshotCacheOpts) error {
cache.mu.Lock()
defer cache.mu.Unlock()
for node, resourcesUpserted := range batchResourcesUpserted {
for node, resourcesUpserted := range opts.ResourcesBatch {
if snapshot, ok := cache.snapshots[node]; ok {
finalSnapshot := snapshot
// Add new/updated resources to the Resources map
index := GetResponseType(typ)
index := GetResponseType(opts.ResourceTypeUrl)
currentResources := snapshot.(*Snapshot).Resources[index]
currentVersion := cache.ParseSystemVersionInfo(currentResources.Version)

if currentResources.Items == nil {
// Fresh resources
// currentResources.Items = make(map[string]types.ResourceWithTTL)
log2.Info().Msgf("BatchUpsertResources: Not writing to cache as snapshot does not exist [node=%s][typeUrl=%s]", node, typ)
log2.Info().Msgf("BatchUpsertResources: Not writing to cache as snapshot does not exist [node=%s][typeUrl=%s]", node, opts.ResourceTypeUrl)
return nil
}

Expand All @@ -275,39 +296,34 @@ func (cache *snapshotCache) BatchUpsertResources(ctx context.Context, typ string
currentVersion++
currentResources.Version = fmt.Sprintf("%d", currentVersion)

// Update
if !isCanary {
// Prepare temporary snapshot for canary and do not update the cache
if opts.Operation.Id == CanaryOpsId {
snap := &Snapshot{
Resources: [10]Resources{},
VersionMap: cache.snapshots[node].(*Snapshot).VersionMap,
}
snap.Resources[index] = currentResources
finalSnapshot = snap
} else {
snapshot.(*Snapshot).Resources[index] = currentResources
cache.snapshots[node] = snapshot
finalSnapshot = snapshot
}

// Respond deltas
if info, ok := cache.status[node]; ok {
info.mu.Lock()

// Respond to delta watches for the node.
if !isCanary {
err := cache.respondDeltaWatches(ctx, info, snapshot, isCanary)
if err != nil {
info.mu.Unlock()
continue
}
} else {
snap := &Snapshot{
Resources: [10]Resources{},
VersionMap: cache.snapshots[node].(*Snapshot).VersionMap,
}
snap.Resources[index] = currentResources
err := cache.respondDeltaWatches(ctx, info, snap, isCanary)
if err != nil {
info.mu.Unlock()
continue
}
err := cache.respondDeltaWatches(ctx, info, finalSnapshot, opts.Operation)
if err != nil {
info.mu.Unlock()
continue
}
info.mu.Unlock()
}
} else {
log2.Info().Msgf("BatchUpsertResources: Not writing to cache as snapshot does not exist [node=%s][typeUrl=%s]", node, typ)
log2.Info().Msgf("BatchUpsertResources: Not writing to cache as snapshot does not exist [node=%s][typeUrl=%s]", node, opts.ResourceTypeUrl)
return nil

//resources := make(map[resource.Type][]types.ResourceWithTTL)
Expand Down Expand Up @@ -380,7 +396,7 @@ func (cache *snapshotCache) UpsertResources(ctx context.Context, node string, ty

// Respond to delta watches for the node.
if !isCanary {
return cache.respondDeltaWatches(ctx, info, snapshot, isCanary)
return cache.respondDeltaWatches(ctx, info, snapshot, nil)
} else {
snap := &Snapshot{
Resources: [10]Resources{},
Expand Down Expand Up @@ -735,7 +751,7 @@ func (cache *snapshotCache) respondSOTWWatches(ctx context.Context, info *status
return nil
}

func (cache *snapshotCache) respondDeltaWatches(ctx context.Context, info *statusInfo, snapshot ResourceSnapshot, isCanary bool) error {
func (cache *snapshotCache) respondDeltaWatches(ctx context.Context, info *statusInfo, snapshot ResourceSnapshot, ops *OperationOpts) error {
// We only calculate version hashes when using delta. We don't
// want to do this when using SOTW so we can avoid unnecessary
// computational cost if not using delta.
Expand All @@ -755,6 +771,12 @@ func (cache *snapshotCache) respondDeltaWatches(ctx context.Context, info *statu
info.orderResponseDeltaWatches()
for _, key := range info.orderedDeltaWatches {
watch := info.deltaWatches[key.ID]

// Check if Checker allows to proceed
if !ops.Checker(GetEnvoyNodeStr(watch.Request.GetNode()), ops) {
return nil
}

res, err := cache.respondDelta(
ctx,
snapshot,
Expand All @@ -773,6 +795,10 @@ func (cache *snapshotCache) respondDeltaWatches(ctx context.Context, info *statu
}
} else {
for id, watch := range info.deltaWatches {
// Check if Checker allows to proceed
if !ops.Checker(GetEnvoyNodeStr(watch.Request.GetNode()), ops) {
return nil
}
res, err := cache.respondDelta(
ctx,
snapshot,
Expand Down

0 comments on commit b1f5227

Please sign in to comment.