From 7c1d89e3c6ea0381ab482f77d07f221dc0931120 Mon Sep 17 00:00:00 2001 From: Abhipsa20 Date: Fri, 20 Sep 2024 18:18:09 +0530 Subject: [PATCH 1/4] Update virtual hosts + delete resources --- pkg/cache/v3/simple.go | 149 +++++++++++++++++++++++++++++++---------- 1 file changed, 114 insertions(+), 35 deletions(-) diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go index 91da738e27..44fac9bb9e 100644 --- a/pkg/cache/v3/simple.go +++ b/pkg/cache/v3/simple.go @@ -101,6 +101,8 @@ type SnapshotCache interface { DeleteResources(ctx context.Context, node string, typ string, resourcesToDeleted []string) error DrainResources(ctx context.Context, node string, typ string, resourcesToDeleted []string) error + + UpdateVirtualHosts(ctx context.Context, node string, typ string, resources map[string]map[string]*types.ResourceWithTTL) error } type snapshotCache struct { @@ -365,55 +367,132 @@ func (cache *snapshotCache) UpsertResources(ctx context.Context, node string, ty return nil } -func (cache *snapshotCache) DeleteResources(ctx context.Context, _ string, typ string, resourcesToDeleted []string) error { +func (cache *snapshotCache) UpdateVirtualHosts(ctx context.Context, _ string, typ string, resources map[string]map[string]*types.ResourceWithTTL) error { cache.mu.Lock() defer cache.mu.Unlock() - fmt.Printf("local DeleteResources %v", resourcesToDeleted) - - resourceToDelete := resourcesToDeleted[0] - resourceToDeleteParts := strings.Split(resourcesToDeleted[0], "/") - serviceName := resourceToDeleteParts[4] - zone := resourceToDeleteParts[5] - portString := strings.Split(resourcesToDeleted[0], "_")[1] - claName := fmt.Sprintf("xdstp://nexus/%s/%s/%s", strings.Split(resource.EndpointType, "/")[1], serviceName, portString) + index := GetResponseType(typ) - for node_, snapshot := range cache.snapshots { - currentResources := snapshot.(*Snapshot).Resources[types.Endpoint] - if rsc, found := currentResources.Items[claName]; found { - cla := rsc.Resource.(*endpoint.ClusterLoadAssignment) - for i, _ := range cla.Endpoints { - if cla.Endpoints[i].Locality.Zone == zone { - newEndpoints := make([]*endpoint.LbEndpoint, 0) - for _, lbEndpoint := range cla.Endpoints[i].LbEndpoints { - if resourceToDelete == GetResourceName(lbEndpoint) { - continue - } - newEndpoints = append(newEndpoints, lbEndpoint) - } + for node, r := range resources { + snapshot := cache.snapshots[node] + prevResources := snapshot.(*Snapshot).Resources[index] + newResources := false + if prevResources.Items == nil { + newResources = true + prevResources.Items = make(map[string]types.ResourceWithTTL) + } + currentVersion := cache.ParseSystemVersionInfo(prevResources.Version) - cla.Endpoints[i].LbEndpoints = newEndpoints - } + for k, v := range prevResources.Items { + _, ok := r[k] + if newResources && !ok { + prevResources.Items[k] = v + } else if !newResources && !ok { + delete(prevResources.Items, k) } - - currentResources.Items[claName] = types.ResourceWithTTL{ - Resource: cla, + } + for k, v := range r { + _, ok := prevResources.Items[k] + if !ok { + prevResources.Items[k] = *v } } + currentVersion++ + prevResources.Version = fmt.Sprintf("%d", currentVersion) // Update - currentVersion := cache.ParseSystemVersionInfo(currentResources.Version) - currentVersion++ - currentResources.Version = fmt.Sprintf("%d", currentVersion) + snapshot.(*Snapshot).Resources[index] = prevResources + cache.snapshots[node] = snapshot - snapshot.(*Snapshot).Resources[types.Endpoint] = currentResources - cache.snapshots[node_] = snapshot + // Respond deltas + if info, ok := cache.status[node]; ok { + info.mu.Lock() + defer info.mu.Unlock() + + // Respond to delta watches for the node. + return cache.respondDeltaWatches(ctx, info, snapshot) + } + } + + return nil +} + +func (cache *snapshotCache) DeleteResources(ctx context.Context, node string, typ string, resourcesToDeleted []string) error { + cache.mu.Lock() + defer cache.mu.Unlock() + + fmt.Printf("local DeleteResources %v", resourcesToDeleted) + + if typ == resource.ClusterType { + index := GetResponseType(typ) + snapshot := cache.snapshots[node] + prevResources := snapshot.(*Snapshot).Resources[index] + currentVersion := cache.ParseSystemVersionInfo(prevResources.Version) + + for _, k := range resourcesToDeleted { + delete(prevResources.Items, k) + } + + currentVersion++ + prevResources.Version = fmt.Sprintf("%d", currentVersion) + // Update + snapshot.(*Snapshot).Resources[index] = prevResources + cache.snapshots[node] = snapshot // Respond deltas - if info, ok := cache.status[node_]; ok { + if info, ok := cache.status[node]; ok { info.mu.Lock() - _ = cache.respondDeltaWatches(ctx, info, snapshot) - info.mu.Unlock() + defer info.mu.Unlock() + + // Respond to delta watches for the node. + return cache.respondDeltaWatches(ctx, info, snapshot) + } + + } else if typ == resource.EndpointType { + resourceToDelete := resourcesToDeleted[0] + resourceToDeleteParts := strings.Split(resourcesToDeleted[0], "/") + serviceName := resourceToDeleteParts[4] + zone := resourceToDeleteParts[5] + portString := strings.Split(resourcesToDeleted[0], "_")[1] + claName := fmt.Sprintf("xdstp://nexus/%s/%s/%s", strings.Split(resource.EndpointType, "/")[1], serviceName, portString) + + for node_, snapshot := range cache.snapshots { + currentResources := snapshot.(*Snapshot).Resources[types.Endpoint] + if rsc, found := currentResources.Items[claName]; found { + cla := rsc.Resource.(*endpoint.ClusterLoadAssignment) + for i, _ := range cla.Endpoints { + if cla.Endpoints[i].Locality.Zone == zone { + newEndpoints := make([]*endpoint.LbEndpoint, 0) + for _, lbEndpoint := range cla.Endpoints[i].LbEndpoints { + if resourceToDelete == GetResourceName(lbEndpoint) { + continue + } + newEndpoints = append(newEndpoints, lbEndpoint) + } + + cla.Endpoints[i].LbEndpoints = newEndpoints + } + } + + currentResources.Items[claName] = types.ResourceWithTTL{ + Resource: cla, + } + } + + // Update + currentVersion := cache.ParseSystemVersionInfo(currentResources.Version) + currentVersion++ + currentResources.Version = fmt.Sprintf("%d", currentVersion) + + snapshot.(*Snapshot).Resources[types.Endpoint] = currentResources + cache.snapshots[node_] = snapshot + + // Respond deltas + if info, ok := cache.status[node_]; ok { + info.mu.Lock() + _ = cache.respondDeltaWatches(ctx, info, snapshot) + info.mu.Unlock() + } } } From f5668771bc58ade9a9b50661681b473362b10c03 Mon Sep 17 00:00:00 2001 From: Abhipsa20 Date: Fri, 20 Sep 2024 18:42:06 +0530 Subject: [PATCH 2/4] added waitgroups --- pkg/cache/v3/simple.go | 82 ++++++++++++++++++++++++------------------ 1 file changed, 47 insertions(+), 35 deletions(-) diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go index 44fac9bb9e..04314207b5 100644 --- a/pkg/cache/v3/simple.go +++ b/pkg/cache/v3/simple.go @@ -368,51 +368,63 @@ func (cache *snapshotCache) UpsertResources(ctx context.Context, node string, ty } func (cache *snapshotCache) UpdateVirtualHosts(ctx context.Context, _ string, typ string, resources map[string]map[string]*types.ResourceWithTTL) error { - cache.mu.Lock() - defer cache.mu.Unlock() - index := GetResponseType(typ) + var wg sync.WaitGroup for node, r := range resources { - snapshot := cache.snapshots[node] - prevResources := snapshot.(*Snapshot).Resources[index] - newResources := false - if prevResources.Items == nil { - newResources = true - prevResources.Items = make(map[string]types.ResourceWithTTL) - } - currentVersion := cache.ParseSystemVersionInfo(prevResources.Version) + wg.Add(1) + go func(node string, resourcesUpserted map[string]*types.ResourceWithTTL) { + cache.mu.Lock() + defer cache.mu.Unlock() + defer wg.Done() - for k, v := range prevResources.Items { - _, ok := r[k] - if newResources && !ok { - prevResources.Items[k] = v - } else if !newResources && !ok { - delete(prevResources.Items, k) - } - } - for k, v := range r { - _, ok := prevResources.Items[k] + snapshot, ok := cache.snapshots[node] if !ok { - prevResources.Items[k] = *v + return } - } - currentVersion++ - prevResources.Version = fmt.Sprintf("%d", currentVersion) + prevResources := snapshot.(*Snapshot).Resources[index] + newResources := false + if prevResources.Items == nil { + newResources = true + prevResources.Items = make(map[string]types.ResourceWithTTL) + } + currentVersion := cache.ParseSystemVersionInfo(prevResources.Version) + + for k, v := range prevResources.Items { + _, ok := r[k] + if newResources && !ok { + prevResources.Items[k] = v + } else if !newResources && !ok { + delete(prevResources.Items, k) + } + } + for k, v := range r { + _, ok := prevResources.Items[k] + if !ok { + prevResources.Items[k] = *v + } + } + currentVersion++ + prevResources.Version = fmt.Sprintf("%d", currentVersion) - // Update - snapshot.(*Snapshot).Resources[index] = prevResources - cache.snapshots[node] = snapshot + // Update + snapshot.(*Snapshot).Resources[index] = prevResources + cache.snapshots[node] = snapshot - // Respond deltas - if info, ok := cache.status[node]; ok { - info.mu.Lock() - defer info.mu.Unlock() + // Respond deltas + if info, ok := cache.status[node]; ok { + info.mu.Lock() + defer info.mu.Unlock() - // Respond to delta watches for the node. - return cache.respondDeltaWatches(ctx, info, snapshot) - } + // Respond to delta watches for the node. + err := cache.respondDeltaWatches(ctx, info, snapshot) + if err != nil { + return + } + } + }(node, r) } + wg.Wait() return nil } From f3b8f521acfb8e4db5cb38366b5902944c7fda85 Mon Sep 17 00:00:00 2001 From: Abhipsa20 Date: Tue, 24 Sep 2024 11:40:56 +0530 Subject: [PATCH 3/4] debug --- pkg/cache/v3/simple.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go index 04314207b5..310c666364 100644 --- a/pkg/cache/v3/simple.go +++ b/pkg/cache/v3/simple.go @@ -391,14 +391,14 @@ func (cache *snapshotCache) UpdateVirtualHosts(ctx context.Context, _ string, ty currentVersion := cache.ParseSystemVersionInfo(prevResources.Version) for k, v := range prevResources.Items { - _, ok := r[k] + _, ok := resourcesUpserted[k] if newResources && !ok { prevResources.Items[k] = v } else if !newResources && !ok { delete(prevResources.Items, k) } } - for k, v := range r { + for k, v := range resourcesUpserted { _, ok := prevResources.Items[k] if !ok { prevResources.Items[k] = *v From 0c665904f03c15232a01d2482f5aa23ae28ca6cd Mon Sep 17 00:00:00 2001 From: Abhipsa20 Date: Tue, 24 Sep 2024 12:37:33 +0530 Subject: [PATCH 4/4] removed logs --- pkg/cache/v3/simple.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go index 310c666364..e7e3662197 100644 --- a/pkg/cache/v3/simple.go +++ b/pkg/cache/v3/simple.go @@ -433,8 +433,6 @@ func (cache *snapshotCache) DeleteResources(ctx context.Context, node string, ty cache.mu.Lock() defer cache.mu.Unlock() - fmt.Printf("local DeleteResources %v", resourcesToDeleted) - if typ == resource.ClusterType { index := GetResponseType(typ) snapshot := cache.snapshots[node]