diff --git a/pkg/cache/inmemory.go b/pkg/cache/inmemory.go index e21b5c38f4b..b45c43da8b6 100644 --- a/pkg/cache/inmemory.go +++ b/pkg/cache/inmemory.go @@ -65,9 +65,9 @@ type InMemoryCache struct { hitsExpired prometheus.Counter // The input cache value would be copied to an inmemory array // instead of simply using the one sent by the caller. - added prometheus.Counter - current prometheus.Gauge - singleflightsaved prometheus.Counter + added prometheus.Counter + current prometheus.Gauge + sfSaved prometheus.Counter currentSize prometheus.Gauge totalCurrentSize prometheus.Gauge @@ -121,7 +121,7 @@ func NewInMemoryCacheWithConfig(name string, logger log.Logger, reg prometheus.R if config.Singleflight { c.subs = make(map[string]*pubsub) - c.singleflightsaved = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + c.sfSaved = promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_cache_inmemory_singleflight_saved_calls_total", Help: "Total number of calls saved by the singleflight mechanism.", ConstLabels: prometheus.Labels{"name": name}, @@ -342,29 +342,33 @@ func (c *InMemoryCache) Fetch(ctx context.Context, keys []string) map[string][]b for _, key := range keys { if b, ok := c.get(key); ok { results[key] = b - } else if c.singleFlight { - c.mu.Lock() - if c.subs[key] == nil { + continue + } + if !c.singleFlight { + continue + } + + c.mu.Lock() + if c.subs[key] == nil { + c.subs[key] = &pubsub{originalCtx: ctx} + c.mu.Unlock() + } else { + if c.subs[key].originalCtx.Err() != nil { c.subs[key] = &pubsub{originalCtx: ctx} - c.mu.Unlock() - } else { - if c.subs[key].originalCtx.Err() != nil { - c.subs[key] = &pubsub{originalCtx: ctx} - } - originalCtx := c.subs[key].originalCtx - respReceiver := make(chan []byte) - c.subs[key].listeners = append(c.subs[key].listeners, respReceiver) - c.mu.Unlock() + } + originalCtx := c.subs[key].originalCtx + respReceiver := make(chan []byte) + c.subs[key].listeners = append(c.subs[key].listeners, respReceiver) + c.mu.Unlock() - select { - case b := <-respReceiver: - results[key] = b - c.singleflightsaved.Inc() - case <-ctx.Done(): - return results - case <-originalCtx.Done(): - continue - } + select { + case b := <-respReceiver: + results[key] = b + c.sfSaved.Inc() + case <-ctx.Done(): + return results + case <-originalCtx.Done(): + continue } } } diff --git a/pkg/cache/inmemory_test.go b/pkg/cache/inmemory_test.go index 887c2a348f7..281c1346e5f 100644 --- a/pkg/cache/inmemory_test.go +++ b/pkg/cache/inmemory_test.go @@ -55,7 +55,7 @@ single_flight: true testutil.Ok(t, g.Wait()) - testutil.Equals(t, 1.0, prom_testutil.ToFloat64(c.singleflightsaved)) + testutil.Equals(t, 1.0, prom_testutil.ToFloat64(c.sfSaved)) } // TestInmemorySingleflightMultipleKeys tests whether single-flight mechanism works @@ -94,7 +94,7 @@ single_flight: true cancel() testutil.Ok(t, g.Wait()) - testutil.Equals(t, 1.0, prom_testutil.ToFloat64(c.singleflightsaved)) + testutil.Equals(t, 1.0, prom_testutil.ToFloat64(c.sfSaved)) } // TestInmemorySingleflightInterrupted tests whether single-flight mechanism still works @@ -133,7 +133,7 @@ single_flight: true time.Sleep(1 * time.Second) testutil.Ok(t, g.Wait()) - testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.singleflightsaved)) + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.sfSaved)) } func TestInmemoryCache(t *testing.T) {