Skip to content

Commit

Permalink
added waitgroups
Browse files Browse the repository at this point in the history
  • Loading branch information
Abhipsa20 committed Sep 20, 2024
1 parent 7c1d89e commit f566877
Showing 1 changed file with 47 additions and 35 deletions.
82 changes: 47 additions & 35 deletions pkg/cache/v3/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,51 +368,63 @@ func (cache *snapshotCache) UpsertResources(ctx context.Context, node string, ty
}

func (cache *snapshotCache) UpdateVirtualHosts(ctx context.Context, _ string, typ string, resources map[string]map[string]*types.ResourceWithTTL) error {
cache.mu.Lock()
defer cache.mu.Unlock()

index := GetResponseType(typ)

var wg sync.WaitGroup
for node, r := range resources {
snapshot := cache.snapshots[node]
prevResources := snapshot.(*Snapshot).Resources[index]
newResources := false
if prevResources.Items == nil {
newResources = true
prevResources.Items = make(map[string]types.ResourceWithTTL)
}
currentVersion := cache.ParseSystemVersionInfo(prevResources.Version)
wg.Add(1)
go func(node string, resourcesUpserted map[string]*types.ResourceWithTTL) {
cache.mu.Lock()
defer cache.mu.Unlock()
defer wg.Done()

for k, v := range prevResources.Items {
_, ok := r[k]
if newResources && !ok {
prevResources.Items[k] = v
} else if !newResources && !ok {
delete(prevResources.Items, k)
}
}
for k, v := range r {
_, ok := prevResources.Items[k]
snapshot, ok := cache.snapshots[node]
if !ok {
prevResources.Items[k] = *v
return
}
}
currentVersion++
prevResources.Version = fmt.Sprintf("%d", currentVersion)
prevResources := snapshot.(*Snapshot).Resources[index]
newResources := false
if prevResources.Items == nil {
newResources = true
prevResources.Items = make(map[string]types.ResourceWithTTL)
}
currentVersion := cache.ParseSystemVersionInfo(prevResources.Version)

for k, v := range prevResources.Items {
_, ok := r[k]
if newResources && !ok {
prevResources.Items[k] = v
} else if !newResources && !ok {
delete(prevResources.Items, k)
}
}
for k, v := range r {
_, ok := prevResources.Items[k]
if !ok {
prevResources.Items[k] = *v
}
}
currentVersion++
prevResources.Version = fmt.Sprintf("%d", currentVersion)

// Update
snapshot.(*Snapshot).Resources[index] = prevResources
cache.snapshots[node] = snapshot
// Update
snapshot.(*Snapshot).Resources[index] = prevResources
cache.snapshots[node] = snapshot

// Respond deltas
if info, ok := cache.status[node]; ok {
info.mu.Lock()
defer info.mu.Unlock()
// Respond deltas
if info, ok := cache.status[node]; ok {
info.mu.Lock()
defer info.mu.Unlock()

// Respond to delta watches for the node.
return cache.respondDeltaWatches(ctx, info, snapshot)
}
// Respond to delta watches for the node.
err := cache.respondDeltaWatches(ctx, info, snapshot)
if err != nil {
return
}
}
}(node, r)
}
wg.Wait()

return nil
}
Expand Down

0 comments on commit f566877

Please sign in to comment.