diff --git a/CHANGELOG.md b/CHANGELOG.md index 1510350093..092a6e682d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#4327](https://github.com/thanos-io/thanos/pull/4327) Add environment variable substitution to all YAML configuration flags. - [#4239](https://github.com/thanos-io/thanos/pull/4239) Add penalty based deduplication mode for compactor. - [#4292](https://github.com/thanos-io/thanos/pull/4292) Receive: Enable exemplars ingestion and querying. +- [#4378](https://github.com/thanos-io/thanos/pull/4378) Implemented single-flight mechanism for the in-memory cache provider. It can be enabled by `single_flight: true`. It potentially greatly reduces the number of calls to remote object storage with very small locking overhead on each cache operation. Also, it means that the timeout of a given request becomes "attached" to a previous one. `thanos_cache_inmemory_singleflight_saved_calls_total` metric will show how many calls have been saved. ### Fixed diff --git a/pkg/cache/inmemory.go b/pkg/cache/inmemory.go index 6a6036a03a..e21b5c38f4 100644 --- a/pkg/cache/inmemory.go +++ b/pkg/cache/inmemory.go @@ -35,12 +35,26 @@ type InMemoryCacheConfig struct { MaxSize model.Bytes `yaml:"max_size"` // MaxItemSize represents maximum size of single item. MaxItemSize model.Bytes `yaml:"max_item_size"` + // Singleflight represents whether we should try to avoid cache stampede + // by only doing a request once. In practice, this means some very small + // locking on each cache operation, and that Fetch() context's deadline + // becomes "attached" to the other in-flight request. 
+ Singleflight bool `yaml:"single_flight"` +} + +type pubsub struct { + listeners []chan []byte + originalCtx context.Context } type InMemoryCache struct { logger log.Logger maxSizeBytes uint64 maxItemSizeBytes uint64 + singleFlight bool + + subs map[string]*pubsub + mu sync.RWMutex mtx sync.Mutex curSize uint64 @@ -51,8 +65,10 @@ type InMemoryCache struct { hitsExpired prometheus.Counter // The input cache value would be copied to an inmemory array // instead of simply using the one sent by the caller. - added prometheus.Counter - current prometheus.Gauge + added prometheus.Counter + current prometheus.Gauge + singleflightsaved prometheus.Counter + currentSize prometheus.Gauge totalCurrentSize prometheus.Gauge overflow prometheus.Counter @@ -100,6 +116,16 @@ func NewInMemoryCacheWithConfig(name string, logger log.Logger, reg prometheus.R logger: logger, maxSizeBytes: uint64(config.MaxSize), maxItemSizeBytes: uint64(config.MaxItemSize), + singleFlight: config.Singleflight, + } + + if config.Singleflight { + c.subs = make(map[string]*pubsub) + c.singleflightsaved = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_cache_inmemory_singleflight_saved_calls_total", + Help: "Total number of calls saved by the singleflight mechanism.", + ConstLabels: prometheus.Labels{"name": name}, + }) } c.evicted = promauto.With(reg).NewCounter(prometheus.CounterOpts{ @@ -289,6 +315,23 @@ func (c *InMemoryCache) reset() { func (c *InMemoryCache) Store(ctx context.Context, data map[string][]byte, ttl time.Duration) { for key, val := range data { c.set(key, val, ttl) + + if c.singleFlight { + c.mu.Lock() + + if c.subs[key] == nil { + c.mu.Unlock() + continue + } + + for _, listener := range c.subs[key].listeners { + listener <- val + close(listener) + } + + delete(c.subs, key) + c.mu.Unlock() + } } } @@ -299,7 +342,32 @@ func (c *InMemoryCache) Fetch(ctx context.Context, keys []string) map[string][]b for _, key := range keys { if b, ok := c.get(key); ok { results[key] 
= b + } else if c.singleFlight { + c.mu.Lock() + // Become the leader if nobody is fetching this key or if the previous + // leader's context is done; waiting on a dead leader would stall the caller. + if c.subs[key] == nil || c.subs[key].originalCtx.Err() != nil { + c.subs[key] = &pubsub{originalCtx: ctx} + c.mu.Unlock() + } else { + originalCtx := c.subs[key].originalCtx + // Buffered: Store() sends while holding c.mu, so it must never block on a waiter that already returned. + respReceiver := make(chan []byte, 1) + c.subs[key].listeners = append(c.subs[key].listeners, respReceiver) + c.mu.Unlock() + + select { + case b := <-respReceiver: + results[key] = b + c.singleflightsaved.Inc() + case <-ctx.Done(): + return results + case <-originalCtx.Done(): + continue + } + } } } + return results } diff --git a/pkg/cache/inmemory_test.go b/pkg/cache/inmemory_test.go index 17e3ce9504..622abbf3de 100644 --- a/pkg/cache/inmemory_test.go +++ b/pkg/cache/inmemory_test.go @@ -7,6 +7,8 @@ import ( "context" "github.com/go-kit/kit/log" + "github.com/pkg/errors" + "golang.org/x/sync/errgroup" "testing" "time" @@ -15,6 +17,123 @@ import ( "github.com/thanos-io/thanos/pkg/testutil" ) +// TestInmemorySingleflight tests whether the in flight mechanism works. +func TestInmemorySingleflight(t *testing.T) { + t.Parallel() + + conf := []byte(` +max_size: 1MB +max_item_size: 2KB +single_flight: true +`) + const testKey = "test" + + c, err := NewInMemoryCache("test", log.NewNopLogger(), nil, conf) + testutil.Ok(t, err) + ctx := context.Background() + + // Miss! Not blocking, we can continue. + hits := c.Fetch(ctx, []string{testKey}) + testutil.Assert(t, len(hits) == 0) + + g := &errgroup.Group{} + + g.Go(func() error { + // This blocks :( + hits := c.Fetch(ctx, []string{testKey}) + if len(hits) == 0 { + return errors.New("no hits") + } + return nil + }) + + time.Sleep(1 * time.Second) + // This unblocks the other goroutine. 
+ c.Store(ctx, map[string][]byte{testKey: []byte("aa")}, 1*time.Minute) + + testutil.Ok(t, g.Wait()) + + testutil.Equals(t, 1.0, prom_testutil.ToFloat64(c.singleflightsaved)) +} + +// TestInmemorySingleflightMultipleKeys tests whether single-flight mechanism works +// when a request for multiple keys comes in. +func TestInmemorySingleflightMultipleKeys(t *testing.T) { + t.Parallel() + + conf := []byte(` +max_size: 1MB +max_item_size: 2KB +single_flight: true +`) + + c, err := NewInMemoryCache("test", log.NewNopLogger(), nil, conf) + testutil.Ok(t, err) + ctx, cancel := context.WithCancel(context.Background()) + + testKeys := []string{"test", "test2"} + + hits := c.Fetch(ctx, testKeys) + testutil.Assert(t, len(hits) == 0) + + g := &errgroup.Group{} + + g.Go(func() error { + hits := c.Fetch(context.Background(), testKeys) + if len(hits) != 1 { + return errors.New("expected to have 1 hit") + } + return nil + }) + + time.Sleep(1 * time.Second) + c.Store(context.Background(), map[string][]byte{testKeys[0]: []byte("foobar")}, 1*time.Minute) + time.Sleep(1 * time.Second) + cancel() + + testutil.Ok(t, g.Wait()) + testutil.Equals(t, 1.0, prom_testutil.ToFloat64(c.singleflightsaved)) +} + +// TestInmemorySingleflightInterrupted tests whether single-flight mechanism still works +// properly when Store() never comes. +func TestInmemorySingleflightInterrupted(t *testing.T) { + t.Parallel() + + conf := []byte(` +max_size: 1MB +max_item_size: 2KB +single_flight: true +`) + + c, err := NewInMemoryCache("test", log.NewNopLogger(), nil, conf) + testutil.Ok(t, err) + ctx, cancel := context.WithCancel(context.Background()) + + const testKey = "test" + + // Miss! Not blocking, we can continue. 
+ hits := c.Fetch(ctx, []string{testKey}) + testutil.Assert(t, len(hits) == 0) + + g := &errgroup.Group{} + + g.Go(func() error { + // This blocks :( + hits := c.Fetch(ctx, []string{testKey}) + + if len(hits) != 0 { + return errors.New("got hits") + } + return nil + }) + cancel() + + time.Sleep(1 * time.Second) + testutil.Ok(t, g.Wait()) + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.singleflightsaved)) +} + func TestInmemoryCache(t *testing.T) { t.Parallel()