Skip to content

Commit

Permalink
Update virtual hosts + delete resources
Browse files Browse the repository at this point in the history
  • Loading branch information
Abhipsa20 committed Sep 20, 2024
1 parent 84600a0 commit 7c1d89e
Showing 1 changed file with 114 additions and 35 deletions.
149 changes: 114 additions & 35 deletions pkg/cache/v3/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ type SnapshotCache interface {
DeleteResources(ctx context.Context, node string, typ string, resourcesToDeleted []string) error

DrainResources(ctx context.Context, node string, typ string, resourcesToDeleted []string) error

UpdateVirtualHosts(ctx context.Context, node string, typ string, resources map[string]map[string]*types.ResourceWithTTL) error
}

type snapshotCache struct {
Expand Down Expand Up @@ -365,55 +367,132 @@ func (cache *snapshotCache) UpsertResources(ctx context.Context, node string, ty
return nil
}

func (cache *snapshotCache) DeleteResources(ctx context.Context, _ string, typ string, resourcesToDeleted []string) error {
func (cache *snapshotCache) UpdateVirtualHosts(ctx context.Context, _ string, typ string, resources map[string]map[string]*types.ResourceWithTTL) error {
cache.mu.Lock()
defer cache.mu.Unlock()

fmt.Printf("local DeleteResources %v", resourcesToDeleted)

resourceToDelete := resourcesToDeleted[0]
resourceToDeleteParts := strings.Split(resourcesToDeleted[0], "/")
serviceName := resourceToDeleteParts[4]
zone := resourceToDeleteParts[5]
portString := strings.Split(resourcesToDeleted[0], "_")[1]
claName := fmt.Sprintf("xdstp://nexus/%s/%s/%s", strings.Split(resource.EndpointType, "/")[1], serviceName, portString)
index := GetResponseType(typ)

for node_, snapshot := range cache.snapshots {
currentResources := snapshot.(*Snapshot).Resources[types.Endpoint]
if rsc, found := currentResources.Items[claName]; found {
cla := rsc.Resource.(*endpoint.ClusterLoadAssignment)
for i, _ := range cla.Endpoints {
if cla.Endpoints[i].Locality.Zone == zone {
newEndpoints := make([]*endpoint.LbEndpoint, 0)
for _, lbEndpoint := range cla.Endpoints[i].LbEndpoints {
if resourceToDelete == GetResourceName(lbEndpoint) {
continue
}
newEndpoints = append(newEndpoints, lbEndpoint)
}
for node, r := range resources {
snapshot := cache.snapshots[node]
prevResources := snapshot.(*Snapshot).Resources[index]
newResources := false
if prevResources.Items == nil {
newResources = true
prevResources.Items = make(map[string]types.ResourceWithTTL)
}
currentVersion := cache.ParseSystemVersionInfo(prevResources.Version)

cla.Endpoints[i].LbEndpoints = newEndpoints
}
for k, v := range prevResources.Items {
_, ok := r[k]
if newResources && !ok {
prevResources.Items[k] = v
} else if !newResources && !ok {
delete(prevResources.Items, k)
}

currentResources.Items[claName] = types.ResourceWithTTL{
Resource: cla,
}
for k, v := range r {
_, ok := prevResources.Items[k]
if !ok {
prevResources.Items[k] = *v
}
}
currentVersion++
prevResources.Version = fmt.Sprintf("%d", currentVersion)

// Update
currentVersion := cache.ParseSystemVersionInfo(currentResources.Version)
currentVersion++
currentResources.Version = fmt.Sprintf("%d", currentVersion)
snapshot.(*Snapshot).Resources[index] = prevResources
cache.snapshots[node] = snapshot

snapshot.(*Snapshot).Resources[types.Endpoint] = currentResources
cache.snapshots[node_] = snapshot
// Respond deltas
if info, ok := cache.status[node]; ok {
info.mu.Lock()
defer info.mu.Unlock()

// Respond to delta watches for the node.
return cache.respondDeltaWatches(ctx, info, snapshot)
}
}

return nil
}

func (cache *snapshotCache) DeleteResources(ctx context.Context, node string, typ string, resourcesToDeleted []string) error {
cache.mu.Lock()
defer cache.mu.Unlock()

fmt.Printf("local DeleteResources %v", resourcesToDeleted)

if typ == resource.ClusterType {
index := GetResponseType(typ)
snapshot := cache.snapshots[node]
prevResources := snapshot.(*Snapshot).Resources[index]
currentVersion := cache.ParseSystemVersionInfo(prevResources.Version)

for _, k := range resourcesToDeleted {
delete(prevResources.Items, k)
}

currentVersion++
prevResources.Version = fmt.Sprintf("%d", currentVersion)
// Update
snapshot.(*Snapshot).Resources[index] = prevResources
cache.snapshots[node] = snapshot

// Respond deltas
if info, ok := cache.status[node_]; ok {
if info, ok := cache.status[node]; ok {
info.mu.Lock()
_ = cache.respondDeltaWatches(ctx, info, snapshot)
info.mu.Unlock()
defer info.mu.Unlock()

// Respond to delta watches for the node.
return cache.respondDeltaWatches(ctx, info, snapshot)
}

} else if typ == resource.EndpointType {
resourceToDelete := resourcesToDeleted[0]
resourceToDeleteParts := strings.Split(resourcesToDeleted[0], "/")
serviceName := resourceToDeleteParts[4]
zone := resourceToDeleteParts[5]
portString := strings.Split(resourcesToDeleted[0], "_")[1]
claName := fmt.Sprintf("xdstp://nexus/%s/%s/%s", strings.Split(resource.EndpointType, "/")[1], serviceName, portString)

for node_, snapshot := range cache.snapshots {
currentResources := snapshot.(*Snapshot).Resources[types.Endpoint]
if rsc, found := currentResources.Items[claName]; found {
cla := rsc.Resource.(*endpoint.ClusterLoadAssignment)
for i, _ := range cla.Endpoints {
if cla.Endpoints[i].Locality.Zone == zone {
newEndpoints := make([]*endpoint.LbEndpoint, 0)
for _, lbEndpoint := range cla.Endpoints[i].LbEndpoints {
if resourceToDelete == GetResourceName(lbEndpoint) {
continue
}
newEndpoints = append(newEndpoints, lbEndpoint)
}

cla.Endpoints[i].LbEndpoints = newEndpoints
}
}

currentResources.Items[claName] = types.ResourceWithTTL{
Resource: cla,
}
}

// Update
currentVersion := cache.ParseSystemVersionInfo(currentResources.Version)
currentVersion++
currentResources.Version = fmt.Sprintf("%d", currentVersion)

snapshot.(*Snapshot).Resources[types.Endpoint] = currentResources
cache.snapshots[node_] = snapshot

// Respond deltas
if info, ok := cache.status[node_]; ok {
info.mu.Lock()
_ = cache.respondDeltaWatches(ctx, info, snapshot)
info.mu.Unlock()
}
}
}

Expand Down

0 comments on commit 7c1d89e

Please sign in to comment.