Skip to content

Commit

Permalink
Concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
krhitesh7 committed Jan 16, 2025
1 parent 3ca8faf commit d3c12da
Showing 1 changed file with 12 additions and 8 deletions.
20 changes: 12 additions & 8 deletions pkg/cache/v3/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,32 +585,36 @@ func (cache *snapshotCache) respondDeltaWatches(ctx context.Context, info *statu
start := time.Now()
info.orderResponseDeltaWatches()
size := len(info.orderedDeltaWatches)
toDelete := make([]int64, 0)
wg := sync.WaitGroup{}
for _, k := range info.orderedDeltaWatches {
wg.Add(1)
go func(k key) {
watch := info.deltaWatches[k.ID]
go func(k key, w DeltaResponseWatch) {
defer wg.Done()
watch := info.deltaWatches[k.ID]
res, err := cache.respondDelta(
ctx,
snapshot,
watch.Request,
watch.Response,
watch.StreamState,
w.Request,
w.Response,
w.StreamState,
)
if err != nil {
return
}
// If we detect a nil response here, that means there has been no state change
// so we don't want to respond or remove any existing resource watches
if res != nil {
delete(info.deltaWatches, k.ID)
toDelete = append(toDelete, k.ID)
}
}(k)
}(k, watch)
}
wg.Wait()
for _, id := range toDelete {
delete(info.deltaWatches, id)
}
elapsed := time.Since(start)
fmt.Printf("respondDeltaWatches took %s for %d watches\n", elapsed, size)
fmt.Printf("respondDeltaWatches took %s for %d watches and node %s\n", elapsed, size, info.node.Id)
} else {
for id, watch := range info.deltaWatches {
res, err := cache.respondDelta(
Expand Down

0 comments on commit d3c12da

Please sign in to comment.