diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go index e29d411cc4..8d92841e32 100644 --- a/pkg/cache/v3/simple.go +++ b/pkg/cache/v3/simple.go @@ -102,6 +102,8 @@ type SnapshotCache interface { DeleteResources(ctx context.Context, node string, typ string, resourcesToDeleted []string) error DrainResources(ctx context.Context, node string, typ string, resourcesToDeleted []string) error + + UpdateVirtualHosts(ctx context.Context, node string, typ string, resources map[string]map[string]*types.ResourceWithTTL) error } type snapshotCache struct { @@ -390,55 +392,142 @@ func (cache *snapshotCache) UpsertResources(ctx context.Context, node string, ty return nil } -func (cache *snapshotCache) DeleteResources(ctx context.Context, _ string, typ string, resourcesToDeleted []string) error { - cache.mu.Lock() - defer cache.mu.Unlock() +func (cache *snapshotCache) UpdateVirtualHosts(ctx context.Context, _ string, typ string, resources map[string]map[string]*types.ResourceWithTTL) error { + index := GetResponseType(typ) - fmt.Printf("local DeleteResources %v", resourcesToDeleted) + var wg sync.WaitGroup + for node, r := range resources { + wg.Add(1) + go func(node string, resourcesUpserted map[string]*types.ResourceWithTTL) { + cache.mu.Lock() + defer cache.mu.Unlock() + defer wg.Done() - resourceToDelete := resourcesToDeleted[0] - resourceToDeleteParts := strings.Split(resourcesToDeleted[0], "/") - serviceName := resourceToDeleteParts[4] - zone := resourceToDeleteParts[5] - portString := strings.Split(resourcesToDeleted[0], "_")[1] - claName := fmt.Sprintf("xdstp://nexus/%s/%s/%s", strings.Split(resource.EndpointType, "/")[1], serviceName, portString) + snapshot, ok := cache.snapshots[node] + if !ok { + return + } + prevResources := snapshot.(*Snapshot).Resources[index] + newResources := false + if prevResources.Items == nil { + newResources = true + prevResources.Items = make(map[string]types.ResourceWithTTL) + } + currentVersion := cache.ParseSystemVersionInfo(prevResources.Version) + + for k, v := range prevResources.Items { + _, ok := resourcesUpserted[k] + if newResources && !ok { + prevResources.Items[k] = v + } else if !newResources && !ok { + delete(prevResources.Items, k) + } + } + for k, v := range resourcesUpserted { + _, ok := prevResources.Items[k] + if !ok { + prevResources.Items[k] = *v + } + } + currentVersion++ + prevResources.Version = fmt.Sprintf("%d", currentVersion) - for node_, snapshot := range cache.snapshots { - currentResources := snapshot.(*Snapshot).Resources[types.Endpoint] - if rsc, found := currentResources.Items[claName]; found { - cla := rsc.Resource.(*endpoint.ClusterLoadAssignment) - for i, _ := range cla.Endpoints { - if cla.Endpoints[i].Locality.Zone == zone { - newEndpoints := make([]*endpoint.LbEndpoint, 0) - for _, lbEndpoint := range cla.Endpoints[i].LbEndpoints { - if resourceToDelete == GetResourceName(lbEndpoint) { - continue - } - newEndpoints = append(newEndpoints, lbEndpoint) - } + // Update + snapshot.(*Snapshot).Resources[index] = prevResources + cache.snapshots[node] = snapshot - cla.Endpoints[i].LbEndpoints = newEndpoints + // Respond deltas + if info, ok := cache.status[node]; ok { + info.mu.Lock() + defer info.mu.Unlock() + + // Respond to delta watches for the node. + err := cache.respondDeltaWatches(ctx, info, snapshot) + if err != nil { + return } } + }(node, r) + } + wg.Wait() - currentResources.Items[claName] = types.ResourceWithTTL{ - Resource: cla, - } + return nil +} + +func (cache *snapshotCache) DeleteResources(ctx context.Context, node string, typ string, resourcesToDeleted []string) error { + cache.mu.Lock() + defer cache.mu.Unlock() + + if typ == resource.ClusterType { + index := GetResponseType(typ) + snapshot := cache.snapshots[node] + prevResources := snapshot.(*Snapshot).Resources[index] + currentVersion := cache.ParseSystemVersionInfo(prevResources.Version) + + for _, k := range resourcesToDeleted { + delete(prevResources.Items, k) } - // Update - currentVersion := cache.ParseSystemVersionInfo(currentResources.Version) currentVersion++ - currentResources.Version = fmt.Sprintf("%d", currentVersion) - - snapshot.(*Snapshot).Resources[types.Endpoint] = currentResources - cache.snapshots[node_] = snapshot + prevResources.Version = fmt.Sprintf("%d", currentVersion) + // Update + snapshot.(*Snapshot).Resources[index] = prevResources + cache.snapshots[node] = snapshot // Respond deltas - if info, ok := cache.status[node_]; ok { + if info, ok := cache.status[node]; ok { info.mu.Lock() - _ = cache.respondDeltaWatches(ctx, info, snapshot) - info.mu.Unlock() + defer info.mu.Unlock() + + // Respond to delta watches for the node. + return cache.respondDeltaWatches(ctx, info, snapshot) + } + + } else if typ == resource.EndpointType { + resourceToDelete := resourcesToDeleted[0] + resourceToDeleteParts := strings.Split(resourcesToDeleted[0], "/") + serviceName := resourceToDeleteParts[4] + zone := resourceToDeleteParts[5] + portString := strings.Split(resourcesToDeleted[0], "_")[1] + claName := fmt.Sprintf("xdstp://nexus/%s/%s/%s", strings.Split(resource.EndpointType, "/")[1], serviceName, portString) + + for node_, snapshot := range cache.snapshots { + currentResources := snapshot.(*Snapshot).Resources[types.Endpoint] + if rsc, found := currentResources.Items[claName]; found { + cla := rsc.Resource.(*endpoint.ClusterLoadAssignment) + for i, _ := range cla.Endpoints { + if cla.Endpoints[i].Locality.Zone == zone { + newEndpoints := make([]*endpoint.LbEndpoint, 0) + for _, lbEndpoint := range cla.Endpoints[i].LbEndpoints { + if resourceToDelete == GetResourceName(lbEndpoint) { + continue + } + newEndpoints = append(newEndpoints, lbEndpoint) + } + + cla.Endpoints[i].LbEndpoints = newEndpoints + } + } + + currentResources.Items[claName] = types.ResourceWithTTL{ + Resource: cla, + } + } + + // Update + currentVersion := cache.ParseSystemVersionInfo(currentResources.Version) + currentVersion++ + currentResources.Version = fmt.Sprintf("%d", currentVersion) + + snapshot.(*Snapshot).Resources[types.Endpoint] = currentResources + cache.snapshots[node_] = snapshot + + // Respond deltas + if info, ok := cache.status[node_]; ok { + info.mu.Lock() + _ = cache.respondDeltaWatches(ctx, info, snapshot) + info.mu.Unlock() + } } }