Skip to content

Commit

Permalink
Changes
Browse files Browse the repository at this point in the history
  • Loading branch information
krhitesh7 committed Jan 16, 2025
1 parent d87ad3f commit 06eb56f
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 84 deletions.
63 changes: 23 additions & 40 deletions pkg/cache/v3/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package cache

import (
"context"
"fmt"
"strings"

"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
Expand Down Expand Up @@ -78,20 +79,24 @@ func containsPrefixedKeyResources(data map[string]VTMarshaledResource, keyLike s
func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.StreamState, resources resourceContainer) *RawDeltaResponse {
// variables to build our response with
var nextVersionMap map[string]string
var filtered map[string]VTMarshaledResource
var filtered []VTMarshaledResource
var toRemove []string

// If we are handling a wildcard request, we want to respond with all resources
switch {
case state.IsWildcard():
filtered = make(map[string]VTMarshaledResource)
nextVersionMap = make(map[string]string, 0)
if len(state.GetResourceVersions()) == 0 {
filtered = make([]VTMarshaledResource, 0, len(resources.resourceMap))
}
nextVersionMap = make(map[string]string, len(resources.resourceMap))
for name, r := range resources.resourceMap {
// Since we've already precomputed the version hashes of the new snapshot,
// we can just set it here to be used for comparison later
version := resources.versionMap[name]
nextVersionMap[name] = version
prevVersion, found := state.GetResourceVersions()[name]
if !found || (prevVersion != version) {
filtered[name] = r
filtered = append(filtered, r)
}
}

Expand All @@ -103,55 +108,33 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St
}
}
default:
filtered = make(map[string]VTMarshaledResource)
nextVersionMap = make(map[string]string, 0)
nextVersionMap = make(map[string]string, len(state.GetSubscribedResourceNames()))
// state.GetResourceVersions() may include resources no longer subscribed
// In the current code this gets silently cleaned when updating the version map
for name := range state.GetSubscribedResourceNames() {
dirResourceName := name
if strings.Contains(dirResourceName, "*") {
dirResourceName = strings.Split(dirResourceName, "*")[0]
prevVersions, _ := containsPrefixedKey(state.GetResourceVersions(), dirResourceName)
currVersions, _ := containsPrefixedKeyResources(resources.resourceMap, dirResourceName)
combinedVersions := combineUnique(prevVersions, currVersions)

for _, versionName := range combinedVersions {
prevVersion, found := state.GetResourceVersions()[versionName]
if r, ok := resources.resourceMap[versionName]; ok {
nextVersion := resources.versionMap[versionName]
if prevVersion != nextVersion {
filtered[r.Name] = r
}
nextVersionMap[versionName] = nextVersion
} else if found {
toRemove = append(toRemove, versionName)
}
}
} else {
prevVersion, found := state.GetResourceVersions()[name]
if r, ok := resources.resourceMap[name]; ok {
nextVersion := resources.versionMap[name]
if prevVersion != nextVersion {
filtered[r.Name] = r
}
nextVersionMap[name] = nextVersion
} else if found {
toRemove = append(toRemove, name)
prevVersion, found := state.GetResourceVersions()[name]
if r, ok := resources.resourceMap[name]; ok {
nextVersion := resources.versionMap[name]
if prevVersion != nextVersion {
filtered = append(filtered, r)
}
nextVersionMap[name] = nextVersion
} else if found {
toRemove = append(toRemove, name)
}
}
}

filteredResources := make([]VTMarshaledResource, 0)
filteredResourceNames := make([]string, 0)
for name, r := range filtered {
filteredResources = append(filteredResources, r)
filteredResourceNames = append(filteredResourceNames, name)
for _, f := range filtered {
filteredResourceNames = append(filteredResourceNames, f.Name)
}

fmt.Printf("type %s, state %v filtered %v toRemove %v versionMap %v\n", req.GetTypeUrl(), state.GetResourceVersions(), filteredResourceNames, toRemove, resources.versionMap)

return &RawDeltaResponse{
DeltaRequest: req,
Resources: filteredResources,
Resources: filtered,
RemovedResources: toRemove,
NextVersionMap: nextVersionMap,
SystemVersionInfo: resources.systemVersion,
Expand Down
79 changes: 36 additions & 43 deletions pkg/cache/v3/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,9 @@ func (cache *snapshotCache) BatchUpsertResources(ctx context.Context, typ string
currentVersion := cache.ParseSystemVersionInfo(currentResources.Version)

if currentResources.Items == nil {
currentResources.Items = make(map[string]VTMarshaledResource)
// Batched resource are not state of the world. It is the delta resources.
// Only put state of the world items in the resources map.
return nil
}

for name, r := range resourcesUpserted {
Expand Down Expand Up @@ -293,29 +295,29 @@ func (cache *snapshotCache) BatchUpsertResources(ctx context.Context, typ string
info.mu.Unlock()
}
} else {
resources := make(map[resource.Type][]types.ResourceWithTTL)
resources[typ] = make([]types.ResourceWithTTL, 0)
for _, r := range resourcesUpserted {
resources[typ] = append(resources[typ], *r)
}
s, err := NewSnapshotWithTTLs("0", resources)
if err != nil {
continue
}
cache.snapshots[node] = s

// Respond deltas
if info, ok := cache.status[node]; ok {
info.mu.Lock()

// Respond to delta watches for the node.
err := cache.respondDeltaWatches(ctx, info, s)
if err != nil {
info.mu.Unlock()
continue
}
info.mu.Unlock()
}
// resources := make(map[resource.Type][]types.ResourceWithTTL)
// resources[typ] = make([]types.ResourceWithTTL, 0)
// for _, r := range resourcesUpserted {
// resources[typ] = append(resources[typ], *r)
// }
// s, err := NewSnapshotWithTTLs("0", resources)
// if err != nil {
// continue
// }
// cache.snapshots[node] = s

// // Respond deltas
// if info, ok := cache.status[node]; ok {
// info.mu.Lock()

// // Respond to delta watches for the node.
// err := cache.respondDeltaWatches(ctx, info, s)
// if err != nil {
// info.mu.Unlock()
// continue
// }
// info.mu.Unlock()
// }
}
}

Expand All @@ -324,6 +326,7 @@ func (cache *snapshotCache) BatchUpsertResources(ctx context.Context, typ string

func (cache *snapshotCache) UpsertResources(ctx context.Context, node string, typ string, resourcesUpserted map[string]*types.ResourceWithTTL) error {
cache.mu.Lock()
fmt.Printf("UpsertResources node %s, typ %s, resourcesUpserted %v\n", node, typ, resourcesUpserted)
if snapshot, ok := cache.snapshots[node]; ok {
defer cache.mu.Unlock()
// Add new/updated resources to the Resources map
Expand All @@ -334,6 +337,7 @@ func (cache *snapshotCache) UpsertResources(ctx context.Context, node string, ty
if currentResources.Items == nil {
// Fresh resources
currentResources.Items = make(map[string]VTMarshaledResource)
currentVersion = 0
}

for name, r := range resourcesUpserted {
Expand Down Expand Up @@ -367,22 +371,18 @@ func (cache *snapshotCache) UpsertResources(ctx context.Context, node string, ty
return cache.respondDeltaWatches(ctx, info, snapshot)
}
} else {
cache.mu.Unlock()
// Snapshot is not found. Create a new snapshot with these new resources.
resources := make(map[resource.Type][]types.ResourceWithTTL)
resources[typ] = make([]types.ResourceWithTTL, 0)
for _, r := range resourcesUpserted {
//if typ == resource.EndpointType {
// cla := r.Resource.(*endpoint.ClusterLoadAssignment)
// if len(cla.Endpoints) == 0 {
// log2.Info().Msgf("UpsertResources: Writing claname=%s endpoints=%d", cla.ClusterName, len(cla.Endpoints))
// }
//}
resources[typ] = append(resources[typ], *r)
}
s, err := NewSnapshotWithTTLs("0", resources)
if err != nil {
return err
}

cache.mu.Unlock()
err = cache.SetSnapshot(ctx, node, s)
if err != nil {
return err
Expand Down Expand Up @@ -914,9 +914,9 @@ func createResponse(ctx context.Context, request *Request, resources map[string]
}
}

// CreateDeltaWatch returns a watch for a delta xDS request which implements the Simple SnapshotCache.
func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) (func(), bool) {
nodeID := cache.hash.ID(request.GetNode())
fmt.Printf("CreateDeltaWatch node %s, typ %s\n", nodeID, request.GetTypeUrl())
t := request.GetTypeUrl()

cache.mu.Lock()
Expand All @@ -939,7 +939,6 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream
// - a snapshot exists, but we failed to initialize its version map
// - we attempted to issue a response, but the caller is already up to date
delayedResponse := !exists
resourcesLength := 0
if exists {
err := snapshot.ConstructVersionMap()
if err != nil {
Expand All @@ -949,13 +948,8 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream
if err != nil {
cache.log.Errorf("failed to respond with delta response: %s", err)
}

delayedResponse = response == nil
if !delayedResponse {
versionMap := snapshot.GetVersionMap(request.GetTypeUrl())
if versionMap != nil {
resourcesLength = len(versionMap)
}
}
}

if delayedResponse {
Expand All @@ -968,11 +962,10 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream
}

info.setDeltaResponseWatch(watchID, DeltaResponseWatch{Request: request, Response: value, StreamState: state})
return cache.cancelDeltaWatch(nodeID, watchID), true
} else {
watchID := cache.nextDeltaWatchID()
return cache.cancelDeltaWatch(nodeID, watchID), resourcesLength == 0
return cache.cancelDeltaWatch(nodeID, watchID), delayedResponse
}

return nil, false
}

func GetEnvoyNodeStr(node *core.Node) string {
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/delta/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
s.subscribe(req.GetResourceNamesSubscribe(), &watch.state)
s.unsubscribe(req.GetResourceNamesUnsubscribe(), &watch.state)

watch.cancel = s.cache.CreateDeltaWatch(req, watch.state, watches.deltaMuxedResponses)
watch.cancel, _ = s.cache.CreateDeltaWatch(req, watch.state, watches.deltaMuxedResponses)
watches.deltaWatches[typeURL] = watch
}
}
Expand Down

0 comments on commit 06eb56f

Please sign in to comment.