
cache: implement single-flight caching in the inmemory cache #4379

Closed
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -11,6 +11,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
## Unreleased

### Added
- [#4378](https://github.com/thanos-io/thanos/pull/4378) Implemented a single-flight mechanism for the in-memory cache provider. It can be enabled with `single_flight: true`. It greatly reduces the number of calls to remote object storage at the cost of a very small locking overhead on each cache operation. It also means that the timeout of a given request becomes "attached" to an earlier in-flight request for the same key. The `thanos_cache_inmemory_singleflight_saved_calls_total` metric shows how many calls have been saved (a minimal configuration sketch follows after this list).
- [#4453](https://github.com/thanos-io/thanos/pull/4453) Tools: Add flag `--selector.relabel-config-file` / `--selector.relabel-config` / `--max-time` / `--min-time` to filter served blocks.
- [#4482](https://github.com/thanos-io/thanos/pull/4482) COS: Add http_config for cos object store client.
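
For reference, here is a minimal sketch of how the new option can be enabled, mirroring the configuration and constructor used by the tests further down in this diff. The import paths and the "example" instance name are illustrative assumptions, not part of the change:

```go
package main

import (
	"context"
	"time"

	"github.com/go-kit/kit/log"

	"github.com/thanos-io/thanos/pkg/cache"
)

func main() {
	// Same YAML shape as in the tests below; single_flight enables the
	// deduplication of concurrent Fetch() calls for the same key.
	conf := []byte(`
max_size: 1MB
max_item_size: 2KB
single_flight: true
`)

	c, err := cache.NewInMemoryCache("example", log.NewNopLogger(), nil, conf)
	if err != nil {
		panic(err)
	}

	ctx := context.Background()
	c.Store(ctx, map[string][]byte{"key": []byte("value")}, time.Minute)
	_ = c.Fetch(ctx, []string{"key"})
}
```

As in the tests, nil is passed for the Prometheus registerer here purely to keep the sketch short.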

1 change: 1 addition & 0 deletions go.mod
@@ -68,6 +68,7 @@ require (
	golang.org/x/oauth2 v0.0.0-20210427180440-81ed05c6b58c
	golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
	golang.org/x/text v0.3.6
	golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac
	google.golang.org/api v0.46.0
	google.golang.org/genproto v0.0.0-20210429181445-86c259c2b4ab
	google.golang.org/grpc v1.37.0
3 changes: 2 additions & 1 deletion go.sum
@@ -1664,8 +1664,9 @@ golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxb
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba h1:O8mE0/t419eoIwhTFpKVkHiTs/Igowgfkj25AcZrtiE=
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac h1:7zkz7BUtwNFFqcowJ+RIgu2MaV/MapERkDIy+mwPyjs=
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
77 changes: 77 additions & 0 deletions pkg/cache/inmemory.go
@@ -35,12 +35,26 @@ type InMemoryCacheConfig struct {
	MaxSize model.Bytes `yaml:"max_size"`
	// MaxItemSize represents maximum size of single item.
	MaxItemSize model.Bytes `yaml:"max_item_size"`
	// Singleflight represents whether we should try to avoid cache stampede
	// by only doing a request once. In practice, this means some very small
	// locking on each cache operation, and that Fetch() context's deadline
	// becomes "attached" to the other in-flight request.
	Singleflight bool `yaml:"single_flight"`
}

type pubsub struct {
	listeners   []chan []byte
	originalCtx context.Context
}

type InMemoryCache struct {
	logger           log.Logger
	maxSizeBytes     uint64
	maxItemSizeBytes uint64
	singleFlight     bool

	subs map[string]*pubsub
	mu   sync.RWMutex

	mtx     sync.Mutex
	curSize uint64
@@ -53,6 +67,7 @@ type InMemoryCache struct {
	// instead of simply using the one sent by the caller.
	added            prometheus.Counter
	current          prometheus.Gauge
	sfSaved          prometheus.Counter
	currentSize      prometheus.Gauge
	totalCurrentSize prometheus.Gauge
	overflow         prometheus.Counter
@@ -100,6 +115,16 @@ func NewInMemoryCacheWithConfig(name string, logger log.Logger, reg prometheus.R
		logger:           logger,
		maxSizeBytes:     uint64(config.MaxSize),
		maxItemSizeBytes: uint64(config.MaxItemSize),
		singleFlight:     config.Singleflight,
	}

	if config.Singleflight {
		c.subs = make(map[string]*pubsub)
		c.sfSaved = promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name:        "thanos_cache_inmemory_singleflight_saved_calls_total",
			Help:        "Total number of calls saved by the singleflight mechanism.",
			ConstLabels: prometheus.Labels{"name": name},
		})
	}

	c.evicted = promauto.With(reg).NewCounter(prometheus.CounterOpts{
@@ -289,6 +314,24 @@ func (c *InMemoryCache) reset() {
func (c *InMemoryCache) Store(ctx context.Context, data map[string][]byte, ttl time.Duration) {
Contributor:

Why is this implemented on the InMemoryCache and not a remote cache? I would have thought that the benefits of this code would be much greater with a remote cache 🤔 Perhaps this is a 'test' implementation to get right before implementing the remote cache?

@GiedriusS (Member, Author), Aug 3, 2021:

The answer probably is that popular remote caches do not have such functionality - as far as I can tell from my research, it is impossible to block until a key appears on Redis and/or Memcached. For example, here is one implementation: https://github.com/sqlalchemy/dogpile.cache/blob/dbf57e4a7b8f4061b735be8b24bbb880fb75802f/dogpile/cache/backends/memcached.py#L42

As you can see, the "locking" works by creating another key and then sleeping on it. No bueno. Something like this https://github.com/mailgun/groupcache needs to be used but before that mailgun/groupcache#9 should be implemented. For now, let's implement the same functionality in memory - these two features are completely orthogonal :)
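
To make the comparison concrete, below is a rough sketch of the "lock key" pattern described above, loosely modeled on the linked dogpile.cache memcached backend. The KV interface and getOrCompute helper are hypothetical stand-ins for illustration, not a real client API:

```go
package example

import "time"

// KV is a hypothetical key-value client with memcached-like semantics:
// Add succeeds only if the key does not already exist.
type KV interface {
	Get(key string) ([]byte, bool)
	Set(key string, value []byte, ttl time.Duration)
	Add(key string, value []byte, ttl time.Duration) bool
	Delete(key string)
}

// getOrCompute illustrates the dogpile-style approach: the first caller
// creates "<key>.lock" and computes the value; everyone else can only
// sleep and poll, since there is no way to block until the key appears.
func getOrCompute(kv KV, key string, compute func() []byte) []byte {
	const lockTTL = 10 * time.Second
	for {
		if v, ok := kv.Get(key); ok {
			return v
		}
		if kv.Add(key+".lock", []byte("1"), lockTTL) {
			v := compute()
			kv.Set(key, v, time.Minute)
			kv.Delete(key + ".lock")
			return v
		}
		// The "sleeping on it" part criticized above.
		time.Sleep(100 * time.Millisecond)
	}
}
```

The pubsub approach in this PR avoids the polling entirely: waiters block on a channel that Store() sends to and closes.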

Member:

Off-topic: My attempt to add groupcache failed because of the drastic changes that need to be introduced to the cache interfaces. The Get issue would actually help to solve this. It would be amazing to migrate to groupcache and simplify what we have (we could drop memcached support when we move).

	for key, val := range data {
		c.set(key, val, ttl)
		if !c.singleFlight {
			continue
		}

		c.mu.Lock()
Contributor:

Is it OK to be sharing this mutex across the read and write path? The naive argument would be that this might actually increase the time it takes to respond, since we're waiting around a bunch.

@GiedriusS (Member, Author), Jul 7, 2021:

Reading and writing a map from more than one goroutine panics the whole program, hence this lock is needed. Yes, the benchmarks show a negligible increase in the time it takes to respond, but those extra nanoseconds are much smaller than the cost of requesting data from remote object storage.
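
As a standalone illustration (not part of this PR), the sketch below shows why the subscriber map needs a lock: unguarded concurrent writes to a Go map are a fatal runtime error, while the same writes guarded by a sync.RWMutex, which is what c.mu does for c.subs, are safe:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	m := map[string]int{}
	var mu sync.RWMutex
	var wg sync.WaitGroup

	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// Without the surrounding mutex the runtime aborts the whole
			// program with "fatal error: concurrent map writes".
			mu.Lock()
			m[fmt.Sprint(i)] = i
			mu.Unlock()

			mu.RLock()
			_ = m[fmt.Sprint(i)]
			mu.RUnlock()
		}(i)
	}
	wg.Wait()
	fmt.Println("entries:", len(m))
}
```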


		if _, ok := c.subs[key]; !ok {
			c.mu.Unlock()
			continue
		}

		for _, listener := range c.subs[key].listeners {
			listener <- val
			close(listener)
		}

		delete(c.subs, key)
		c.mu.Unlock()
	}
}

@@ -299,7 +342,41 @@ func (c *InMemoryCache) Fetch(ctx context.Context, keys []string) map[string][]b
	for _, key := range keys {
		if b, ok := c.get(key); ok {
			results[key] = b
			continue
		}
		if !c.singleFlight {
			continue
		}

		// Singleflight works via contexts. If a context already exists and it has expired
		// then it means that it is our turn to get the resource.
		// If it has not expired yet then it means that we will be able to get results.
		// In the worst case, the old context could expire after we check but that's OK,
		// it only means that some data will need to be fetched.
		c.mu.Lock()
		if _, ok := c.subs[key]; !ok {
			c.subs[key] = &pubsub{originalCtx: ctx}
			c.mu.Unlock()
		} else {
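			// If the context of the earlier in-flight request has already expired,
			// start a fresh pubsub entry tied to this caller's context instead.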
			if c.subs[key].originalCtx.Err() != nil {
				c.subs[key] = &pubsub{originalCtx: ctx}
			}
			originalCtx := c.subs[key].originalCtx
			respReceiver := make(chan []byte)
			c.subs[key].listeners = append(c.subs[key].listeners, respReceiver)
			c.mu.Unlock()

			select {
			case b := <-respReceiver:
				results[key] = b
				c.sfSaved.Inc()
			case <-ctx.Done():
				return results
			case <-originalCtx.Done():
				continue
			}
		}
	}

	return results
}
170 changes: 170 additions & 0 deletions pkg/cache/inmemory_test.go
@@ -5,8 +5,12 @@ package cache

import (
	"context"
	"fmt"
	"sync"

	"github.com/go-kit/kit/log"
	"github.com/pkg/errors"
	"golang.org/x/sync/errgroup"

	"testing"
	"time"
@@ -15,6 +19,128 @@ import (
"github.com/thanos-io/thanos/pkg/testutil"
)

// TestInmemorySingleflight tests whether the in flight mechanism works.
func TestInmemorySingleflight(t *testing.T) {
	t.Parallel()

	conf := []byte(`
max_size: 1MB
max_item_size: 2KB
single_flight: true
`)
	const testKey = "test"

	c, _ := NewInMemoryCache("test", log.NewNopLogger(), nil, conf)

	ctx := context.Background()

	// Miss! We only block on further Fetch() calls
	// due to the single-flight mode. Thus, we can continue.
	hits := c.Fetch(ctx, []string{testKey})
	testutil.Assert(t, len(hits) == 0)

	g := &errgroup.Group{}

	g.Go(func() error {
		// This blocks :(
		hits := c.Fetch(ctx, []string{testKey})
		if len(hits) == 0 {
			return errors.New("no hits")
		}
		return nil
	})

	// We need to wait until the previous goroutine has spawned and blocked on the Fetch() call. In practice, this will take only a few nano/milliseconds, so we wait for that to happen.
	time.Sleep(1 * time.Second)
	// This unblocks the other goroutine.
	c.Store(ctx, map[string][]byte{testKey: []byte("aa")}, 1*time.Minute)

	testutil.Ok(t, g.Wait())

	testutil.Equals(t, 1.0, prom_testutil.ToFloat64(c.sfSaved))
}

// TestInmemorySingleflightMultipleKeys tests whether single-flight mechanism works
// when a multiple key request comes.
func TestInmemorySingleflightMultipleKeys(t *testing.T) {
	t.Parallel()

	conf := []byte(`
max_size: 1MB
max_item_size: 2KB
single_flight: true
`)

	c, _ := NewInMemoryCache("test", log.NewNopLogger(), nil, conf)

	ctx, cancel := context.WithCancel(context.Background())

	testKeys := []string{"test", "test2"}

	hits := c.Fetch(ctx, testKeys)
	testutil.Assert(t, len(hits) == 0)

	g := &errgroup.Group{}

	g.Go(func() error {
		// This blocks :(
		hits := c.Fetch(context.Background(), testKeys)
Contributor:

Suggested change:
-	hits := c.Fetch(context.Background(), testKeys)
+	// This blocks :(
+	hits := c.Fetch(context.Background(), testKeys)

To be consistent with the other test cases :)

		if len(hits) != 1 {
			return errors.New("expected to have 1 hit")
		}
		return nil
	})

	// We need to wait until c.Fetch() has been called. In practice, we should only wait for a millisecond or so but a second is here to be on the safe side.
	time.Sleep(1 * time.Second)
Contributor:

Why do we need these sleeps? Magic sleeps always make me wonder what wasn't working before these were added 😅

@GiedriusS (Member, Author):

To wait until the previous goroutine has spawned and blocked on the Fetch() call. In practice, that will take only a few nano/milliseconds. It would be hard to implement some kind of synchronization primitive here because we are actually calling the caching code, hence a simple time.Sleep() has been added.

Contributor:

Suggested change:
-	time.Sleep(1 * time.Second)
+	// We need to wait until the previous goroutine has spawned and blocked on the Fetch() call. In practice, this will take only a few nano/milliseconds, so we wait for that to happen.
+	time.Sleep(1 * time.Second)

Ok - let's make that explicit then 👍

	c.Store(context.Background(), map[string][]byte{testKeys[0]: []byte("foobar")}, 1*time.Minute)
	time.Sleep(1 * time.Second)
	cancel()

	testutil.Ok(t, g.Wait())
	testutil.Equals(t, 1.0, prom_testutil.ToFloat64(c.sfSaved))
}

// TestInmemorySingleflightInterrupted tests whether single-flight mechanism still works
// properly when Store() never comes.
func TestInmemorySingleflightInterrupted(t *testing.T) {
	t.Parallel()

	conf := []byte(`
max_size: 1MB
max_item_size: 2KB
single_flight: true
`)

	c, _ := NewInMemoryCache("test", log.NewNopLogger(), nil, conf)

	ctx, cancel := context.WithCancel(context.Background())

	const testKey = "test"
Contributor:

nit: does this need to be a const?

@GiedriusS (Member, Author):

Why not? It's constant and doesn't change. Ideally this would be a const slice, but no such thing exists in Go (:

Contributor:

Haha fair point - on the whole I only really see consts defined at the package-scope instead of in the function-scope. Not a blocker ¯\_(ツ)_/¯


	// Miss! We only block on further Fetch() calls
	// due to the single-flight mode. Thus, we can continue.
	hits := c.Fetch(ctx, []string{testKey})
	testutil.Assert(t, len(hits) == 0)

	g := &errgroup.Group{}

	g.Go(func() error {
		// This blocks :(
		hits := c.Fetch(ctx, []string{testKey})

		if len(hits) != 0 {
			return errors.New("got hits")
		}
		return nil
	})
	cancel()

	time.Sleep(1 * time.Second)
Contributor:

Ditto - are these magic sleeps needed?

@GiedriusS (Member, Author):

Yep, because we want to wait until c.Fetch() has been called. In practice, we should only wait for a millisecond or so but a second is here to be on the safe side.

Contributor:

Suggested change:
-	time.Sleep(1 * time.Second)
+	// We need to wait until c.Fetch() has been called. In practice, we should only wait for a millisecond or so but a second is here to be on the safe side.
+	time.Sleep(1 * time.Second)

	testutil.Ok(t, g.Wait())
	testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.sfSaved))
}

func TestInmemoryCache(t *testing.T) {
	t.Parallel()

@@ -113,3 +239,47 @@ max_item_size: 1MB
	testutil.NotOk(t, err)
	testutil.Equals(t, (*InMemoryCache)(nil), cache)
}

func benchCacheGetSet(b *testing.B, cache *InMemoryCache, numKeys, concurrency int) {
	wg := &sync.WaitGroup{}

	for k := 0; k < numKeys; k++ {
		wg.Add(concurrency)
		for i := 0; i < concurrency; i++ {
			go func() {
				defer wg.Done()
				for j := 0; j < b.N; j++ {
					cache.Fetch(context.Background(), []string{fmt.Sprintf("%d", k)})
					cache.Store(context.Background(), map[string][]byte{fmt.Sprintf("%d", k): {}}, 1*time.Minute)
				}
			}()
		}
		wg.Wait()
	}
}

func BenchmarkInmemorySingleflight(b *testing.B) {
	conf := []byte(`max_size: 2KB
max_item_size: 1KB`)
	cache, err := NewInMemoryCache("test", log.NewNopLogger(), nil, conf)
	testutil.Ok(b, err)

	singleflightConf := []byte(`
max_size: 2KB
max_item_size: 1KB
single_flight: true`)
	sfCache, err := NewInMemoryCache("testsf", log.NewNopLogger(), nil, singleflightConf)
	testutil.Ok(b, err)

	for _, numKeys := range []int{100, 1000, 10000} {
		for _, concurrency := range []int{1, 5, 10} {
			b.Run(fmt.Sprintf("inmemory_get_set_keys%d_c%d", numKeys, concurrency), func(b *testing.B) {
				benchCacheGetSet(b, cache, numKeys, concurrency)
			})

			b.Run(fmt.Sprintf("inmemory_singleflight_get_set_keys%d_conc%d", numKeys, concurrency), func(b *testing.B) {
				benchCacheGetSet(b, sfCache, numKeys, concurrency)
			})
		}
	}
}