From d3c12da94f756027cf9cb3800978d53584041a0d Mon Sep 17 00:00:00 2001 From: krhitesh7 Date: Thu, 16 Jan 2025 21:33:35 +0530 Subject: [PATCH] Concurrency --- pkg/cache/v3/simple.go | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go index 478309c95..ca2a162bb 100644 --- a/pkg/cache/v3/simple.go +++ b/pkg/cache/v3/simple.go @@ -585,18 +585,19 @@ func (cache *snapshotCache) respondDeltaWatches(ctx context.Context, info *statu start := time.Now() info.orderResponseDeltaWatches() size := len(info.orderedDeltaWatches) + toDelete := make([]int64, 0) wg := sync.WaitGroup{} for _, k := range info.orderedDeltaWatches { wg.Add(1) - go func(k key) { + watch := info.deltaWatches[k.ID] + go func(k key, w DeltaResponseWatch) { defer wg.Done() - watch := info.deltaWatches[k.ID] res, err := cache.respondDelta( ctx, snapshot, - watch.Request, - watch.Response, - watch.StreamState, + w.Request, + w.Response, + w.StreamState, ) if err != nil { return @@ -604,13 +605,16 @@ func (cache *snapshotCache) respondDeltaWatches(ctx context.Context, info *statu // If we detect a nil response here, that means there has been no state change // so we don't want to respond or remove any existing resource watches if res != nil { - delete(info.deltaWatches, k.ID) + toDelete = append(toDelete, k.ID) } - }(k) + }(k, watch) } wg.Wait() + for _, id := range toDelete { + delete(info.deltaWatches, id) + } elapsed := time.Since(start) - fmt.Printf("respondDeltaWatches took %s for %d watches\n", elapsed, size) + fmt.Printf("respondDeltaWatches took %s for %d watches and node %s\n", elapsed, size, info.node.Id) } else { for id, watch := range info.deltaWatches { res, err := cache.respondDelta(