Skip to content

Commit

Permalink
Merge pull request #9 from ShareChat/v0.12.0-v2.9.0-beta.3
Browse files Browse the repository at this point in the history
V0.12.0 v2.9.0 beta.3
  • Loading branch information
krhitesh7 authored Sep 24, 2024
2 parents addca50 + 0c66590 commit ef534dd
Showing 1 changed file with 125 additions and 36 deletions.
161 changes: 125 additions & 36 deletions pkg/cache/v3/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ type SnapshotCache interface {
DeleteResources(ctx context.Context, node string, typ string, resourcesToDeleted []string) error

DrainResources(ctx context.Context, node string, typ string, resourcesToDeleted []string) error

UpdateVirtualHosts(ctx context.Context, node string, typ string, resources map[string]map[string]*types.ResourceWithTTL) error
}

type snapshotCache struct {
Expand Down Expand Up @@ -390,55 +392,142 @@ func (cache *snapshotCache) UpsertResources(ctx context.Context, node string, ty
return nil
}

func (cache *snapshotCache) DeleteResources(ctx context.Context, _ string, typ string, resourcesToDeleted []string) error {
cache.mu.Lock()
defer cache.mu.Unlock()
func (cache *snapshotCache) UpdateVirtualHosts(ctx context.Context, _ string, typ string, resources map[string]map[string]*types.ResourceWithTTL) error {
index := GetResponseType(typ)

fmt.Printf("local DeleteResources %v", resourcesToDeleted)
var wg sync.WaitGroup
for node, r := range resources {
wg.Add(1)
go func(node string, resourcesUpserted map[string]*types.ResourceWithTTL) {
cache.mu.Lock()
defer cache.mu.Unlock()
defer wg.Done()

resourceToDelete := resourcesToDeleted[0]
resourceToDeleteParts := strings.Split(resourcesToDeleted[0], "/")
serviceName := resourceToDeleteParts[4]
zone := resourceToDeleteParts[5]
portString := strings.Split(resourcesToDeleted[0], "_")[1]
claName := fmt.Sprintf("xdstp://nexus/%s/%s/%s", strings.Split(resource.EndpointType, "/")[1], serviceName, portString)
snapshot, ok := cache.snapshots[node]
if !ok {
return
}
prevResources := snapshot.(*Snapshot).Resources[index]
newResources := false
if prevResources.Items == nil {
newResources = true
prevResources.Items = make(map[string]types.ResourceWithTTL)
}
currentVersion := cache.ParseSystemVersionInfo(prevResources.Version)

for k, v := range prevResources.Items {
_, ok := resourcesUpserted[k]
if newResources && !ok {
prevResources.Items[k] = v
} else if !newResources && !ok {
delete(prevResources.Items, k)
}
}
for k, v := range resourcesUpserted {
_, ok := prevResources.Items[k]
if !ok {
prevResources.Items[k] = *v
}
}
currentVersion++
prevResources.Version = fmt.Sprintf("%d", currentVersion)

for node_, snapshot := range cache.snapshots {
currentResources := snapshot.(*Snapshot).Resources[types.Endpoint]
if rsc, found := currentResources.Items[claName]; found {
cla := rsc.Resource.(*endpoint.ClusterLoadAssignment)
for i, _ := range cla.Endpoints {
if cla.Endpoints[i].Locality.Zone == zone {
newEndpoints := make([]*endpoint.LbEndpoint, 0)
for _, lbEndpoint := range cla.Endpoints[i].LbEndpoints {
if resourceToDelete == GetResourceName(lbEndpoint) {
continue
}
newEndpoints = append(newEndpoints, lbEndpoint)
}
// Update
snapshot.(*Snapshot).Resources[index] = prevResources
cache.snapshots[node] = snapshot

cla.Endpoints[i].LbEndpoints = newEndpoints
// Respond deltas
if info, ok := cache.status[node]; ok {
info.mu.Lock()
defer info.mu.Unlock()

// Respond to delta watches for the node.
err := cache.respondDeltaWatches(ctx, info, snapshot)
if err != nil {
return
}
}
}(node, r)
}
wg.Wait()

currentResources.Items[claName] = types.ResourceWithTTL{
Resource: cla,
}
return nil
}

func (cache *snapshotCache) DeleteResources(ctx context.Context, node string, typ string, resourcesToDeleted []string) error {
cache.mu.Lock()
defer cache.mu.Unlock()

if typ == resource.ClusterType {
index := GetResponseType(typ)
snapshot := cache.snapshots[node]
prevResources := snapshot.(*Snapshot).Resources[index]
currentVersion := cache.ParseSystemVersionInfo(prevResources.Version)

for _, k := range resourcesToDeleted {
delete(prevResources.Items, k)
}

// Update
currentVersion := cache.ParseSystemVersionInfo(currentResources.Version)
currentVersion++
currentResources.Version = fmt.Sprintf("%d", currentVersion)

snapshot.(*Snapshot).Resources[types.Endpoint] = currentResources
cache.snapshots[node_] = snapshot
prevResources.Version = fmt.Sprintf("%d", currentVersion)
// Update
snapshot.(*Snapshot).Resources[index] = prevResources
cache.snapshots[node] = snapshot

// Respond deltas
if info, ok := cache.status[node_]; ok {
if info, ok := cache.status[node]; ok {
info.mu.Lock()
_ = cache.respondDeltaWatches(ctx, info, snapshot)
info.mu.Unlock()
defer info.mu.Unlock()

// Respond to delta watches for the node.
return cache.respondDeltaWatches(ctx, info, snapshot)
}

} else if typ == resource.EndpointType {
resourceToDelete := resourcesToDeleted[0]
resourceToDeleteParts := strings.Split(resourcesToDeleted[0], "/")
serviceName := resourceToDeleteParts[4]
zone := resourceToDeleteParts[5]
portString := strings.Split(resourcesToDeleted[0], "_")[1]
claName := fmt.Sprintf("xdstp://nexus/%s/%s/%s", strings.Split(resource.EndpointType, "/")[1], serviceName, portString)

for node_, snapshot := range cache.snapshots {
currentResources := snapshot.(*Snapshot).Resources[types.Endpoint]
if rsc, found := currentResources.Items[claName]; found {
cla := rsc.Resource.(*endpoint.ClusterLoadAssignment)
for i, _ := range cla.Endpoints {
if cla.Endpoints[i].Locality.Zone == zone {
newEndpoints := make([]*endpoint.LbEndpoint, 0)
for _, lbEndpoint := range cla.Endpoints[i].LbEndpoints {
if resourceToDelete == GetResourceName(lbEndpoint) {
continue
}
newEndpoints = append(newEndpoints, lbEndpoint)
}

cla.Endpoints[i].LbEndpoints = newEndpoints
}
}

currentResources.Items[claName] = types.ResourceWithTTL{
Resource: cla,
}
}

// Update
currentVersion := cache.ParseSystemVersionInfo(currentResources.Version)
currentVersion++
currentResources.Version = fmt.Sprintf("%d", currentVersion)

snapshot.(*Snapshot).Resources[types.Endpoint] = currentResources
cache.snapshots[node_] = snapshot

// Respond deltas
if info, ok := cache.status[node_]; ok {
info.mu.Lock()
_ = cache.respondDeltaWatches(ctx, info, snapshot)
info.mu.Unlock()
}
}
}

Expand Down

0 comments on commit ef534dd

Please sign in to comment.