diff --git a/pkg/cache/inmemory.go b/pkg/cache/inmemory.go index e21b5c38f4..e3156e02f1 100644 --- a/pkg/cache/inmemory.go +++ b/pkg/cache/inmemory.go @@ -65,9 +65,9 @@ type InMemoryCache struct { hitsExpired prometheus.Counter // The input cache value would be copied to an inmemory array // instead of simply using the one sent by the caller. - added prometheus.Counter - current prometheus.Gauge - singleflightsaved prometheus.Counter + added prometheus.Counter + current prometheus.Gauge + sfSaved prometheus.Counter currentSize prometheus.Gauge totalCurrentSize prometheus.Gauge @@ -121,7 +121,7 @@ func NewInMemoryCacheWithConfig(name string, logger log.Logger, reg prometheus.R if config.Singleflight { c.subs = make(map[string]*pubsub) - c.singleflightsaved = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + c.sfSaved = promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_cache_inmemory_singleflight_saved_calls_total", Help: "Total number of calls saved by the singleflight mechanism.", ConstLabels: prometheus.Labels{"name": name}, @@ -315,23 +315,24 @@ func (c *InMemoryCache) reset() { func (c *InMemoryCache) Store(ctx context.Context, data map[string][]byte, ttl time.Duration) { for key, val := range data { c.set(key, val, ttl) + if !c.singleFlight { + continue + } - if c.singleFlight { - c.mu.Lock() + c.mu.Lock() - if c.subs[key] == nil { - c.mu.Unlock() - continue - } - - for _, listener := range c.subs[key].listeners { - listener <- val - close(listener) - } - - delete(c.subs, key) + if c.subs[key] == nil { c.mu.Unlock() + continue + } + + for _, listener := range c.subs[key].listeners { + listener <- val + close(listener) } + + delete(c.subs, key) + c.mu.Unlock() } } @@ -342,29 +343,33 @@ func (c *InMemoryCache) Fetch(ctx context.Context, keys []string) map[string][]b for _, key := range keys { if b, ok := c.get(key); ok { results[key] = b - } else if c.singleFlight { - c.mu.Lock() - if c.subs[key] == nil { + continue + } + if !c.singleFlight { + continue + } + + c.mu.Lock() + if c.subs[key] == nil { + c.subs[key] = &pubsub{originalCtx: ctx} + c.mu.Unlock() + } else { + if c.subs[key].originalCtx.Err() != nil { c.subs[key] = &pubsub{originalCtx: ctx} - c.mu.Unlock() - } else { - if c.subs[key].originalCtx.Err() != nil { - c.subs[key] = &pubsub{originalCtx: ctx} - } - originalCtx := c.subs[key].originalCtx - respReceiver := make(chan []byte) - c.subs[key].listeners = append(c.subs[key].listeners, respReceiver) - c.mu.Unlock() - - select { - case b := <-respReceiver: - results[key] = b - c.singleflightsaved.Inc() - case <-ctx.Done(): - return results - case <-originalCtx.Done(): - continue - } + } + originalCtx := c.subs[key].originalCtx + respReceiver := make(chan []byte) + c.subs[key].listeners = append(c.subs[key].listeners, respReceiver) + c.mu.Unlock() + + select { + case b := <-respReceiver: + results[key] = b + c.sfSaved.Inc() + case <-ctx.Done(): + return results + case <-originalCtx.Done(): + continue } } } diff --git a/pkg/cache/inmemory_test.go b/pkg/cache/inmemory_test.go index 887c2a348f..eede53fbba 100644 --- a/pkg/cache/inmemory_test.go +++ b/pkg/cache/inmemory_test.go @@ -55,7 +55,7 @@ single_flight: true testutil.Ok(t, g.Wait()) - testutil.Equals(t, 1.0, prom_testutil.ToFloat64(c.singleflightsaved)) + testutil.Equals(t, 1.0, prom_testutil.ToFloat64(c.sfSaved)) } // TestInmemorySingleflightMultipleKeys tests whether single-flight mechanism works @@ -94,7 +94,7 @@ single_flight: true cancel() testutil.Ok(t, g.Wait()) - testutil.Equals(t, 1.0, prom_testutil.ToFloat64(c.singleflightsaved)) + testutil.Equals(t, 1.0, prom_testutil.ToFloat64(c.sfSaved)) } // TestInmemorySingleflightInterrupted tests whether single-flight mechanism still works @@ -133,7 +133,7 @@ single_flight: true time.Sleep(1 * time.Second) testutil.Ok(t, g.Wait()) - testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.singleflightsaved)) + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.sfSaved)) } func TestInmemoryCache(t *testing.T) { @@ -235,6 +235,24 @@ max_item_size: 1MB testutil.Equals(t, (*InMemoryCache)(nil), cache) } +func benchCacheGetSet(b *testing.B, cache *InMemoryCache, numKeys, concurrency int) { + wg := &sync.WaitGroup{} + + for k := 0; k < numKeys; k++ { + wg.Add(concurrency) + for i := 0; i < concurrency; i++ { + go func() { + defer wg.Done() + for j := 0; j < b.N; j++ { + cache.Fetch(context.Background(), []string{fmt.Sprintf("%d", k)}) + cache.Store(context.Background(), map[string][]byte{fmt.Sprintf("%d", k): {}}, 1*time.Minute) + } + }() + } + wg.Wait() + } +} + func BenchmarkInmemorySingleflight(b *testing.B) { conf := []byte(`max_size: 2KB max_item_size: 1KB`) @@ -248,46 +266,14 @@ single_flight: true`) sfCache, err := NewInMemoryCache("testsf", log.NewNopLogger(), nil, singleflightConf) testutil.Ok(b, err) - var _ = sfCache - for _, numKeys := range []int{100, 1000, 10000} { for _, concurrency := range []int{1, 5, 10} { - wg := &sync.WaitGroup{} - b.Run(fmt.Sprintf("inmemory_get_set_keys%d_c%d", numKeys, concurrency), func(b *testing.B) { - // b.N times get and set random number of numKeys keys. - for k := 0; k < numKeys; k++ { - wg.Add(concurrency) - for i := 0; i < concurrency; i++ { - go func() { - defer wg.Done() - for j := 0; j < b.N; j++ { - cache.Fetch(context.Background(), []string{fmt.Sprintf("%d", k)}) - cache.Store(context.Background(), map[string][]byte{fmt.Sprintf("%d", k): {}}, 1*time.Minute) - } - }() - } - wg.Wait() - } + benchCacheGetSet(b, cache, numKeys, concurrency) }) - wg = &sync.WaitGroup{} - b.Run(fmt.Sprintf("inmemory_singleflight_get_set_keys%d_conc%d", numKeys, concurrency), func(b *testing.B) { - // b.N times get and set random number of numKeys keys. - for k := 0; k < numKeys; k++ { - wg.Add(concurrency) - for i := 0; i < concurrency; i++ { - go func() { - defer wg.Done() - for j := 0; j < b.N; j++ { - sfCache.Fetch(context.Background(), []string{fmt.Sprintf("%d", k)}) - sfCache.Store(context.Background(), map[string][]byte{fmt.Sprintf("%d", k): {}}, 1*time.Minute) - } - }() - } - wg.Wait() - } + benchCacheGetSet(b, sfCache, numKeys, concurrency) }) } }