cache: implement single-flight caching in the inmemory cache #4379
pkg/cache/inmemory.go

```go
@@ -35,12 +35,26 @@ type InMemoryCacheConfig struct {
	MaxSize model.Bytes `yaml:"max_size"`
	// MaxItemSize represents maximum size of single item.
	MaxItemSize model.Bytes `yaml:"max_item_size"`
	// Singleflight represents whether we should try to avoid cache stampede
	// by only doing a request once. In practice, this means some very small
	// locking on each cache operation, and that Fetch() context's deadline
	// becomes "attached" to the other in-flight request.
	Singleflight bool `yaml:"single_flight"`
}

type pubsub struct {
	listeners   []chan []byte
	originalCtx context.Context
}

type InMemoryCache struct {
	logger           log.Logger
	maxSizeBytes     uint64
	maxItemSizeBytes uint64
	singleFlight     bool

	subs map[string]*pubsub
	mu   sync.RWMutex

	mtx     sync.Mutex
	curSize uint64
```
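For reference, here is a minimal sketch (not part of the diff) of how a caller could turn the new option on. It reuses the exact YAML keys from the config struct above; the cache name "example" and the nop logger are illustrative placeholders:

```go
package main

import (
	"github.com/go-kit/kit/log"

	"github.com/thanos-io/thanos/pkg/cache"
)

func main() {
	conf := []byte(`
max_size: 1MB
max_item_size: 2KB
single_flight: true
`)
	// A nil Registerer is accepted, as in the tests below; the singleflight
	// metrics are then simply not registered anywhere.
	c, err := cache.NewInMemoryCache("example", log.NewNopLogger(), nil, conf)
	if err != nil {
		panic(err)
	}
	_ = c
}
```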
```go
@@ -53,6 +67,7 @@ type InMemoryCache struct {
	// instead of simply using the one sent by the caller.
	added            prometheus.Counter
	current          prometheus.Gauge
	sfSaved          prometheus.Counter
	currentSize      prometheus.Gauge
	totalCurrentSize prometheus.Gauge
	overflow         prometheus.Counter
```
```go
@@ -100,6 +115,16 @@ func NewInMemoryCacheWithConfig(name string, logger log.Logger, reg prometheus.Registerer
		logger:           logger,
		maxSizeBytes:     uint64(config.MaxSize),
		maxItemSizeBytes: uint64(config.MaxItemSize),
		singleFlight:     config.Singleflight,
	}

	if config.Singleflight {
		c.subs = make(map[string]*pubsub)
		c.sfSaved = promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name:        "thanos_cache_inmemory_singleflight_saved_calls_total",
			Help:        "Total number of calls saved by the singleflight mechanism.",
			ConstLabels: prometheus.Labels{"name": name},
		})
	}

	c.evicted = promauto.With(reg).NewCounter(prometheus.CounterOpts{
```
```go
@@ -289,6 +314,24 @@ func (c *InMemoryCache) reset() {
func (c *InMemoryCache) Store(ctx context.Context, data map[string][]byte, ttl time.Duration) {
	for key, val := range data {
		c.set(key, val, ttl)
		if !c.singleFlight {
			continue
		}

		c.mu.Lock()

		if _, ok := c.subs[key]; !ok {
			c.mu.Unlock()
			continue
		}

		for _, listener := range c.subs[key].listeners {
			listener <- val
			close(listener)
		}

		delete(c.subs, key)
		c.mu.Unlock()
	}
}
```

Review comment (on the `c.mu.Lock()` in `Store()`): Is it ok to be sharing this mutex across the read and write path? The naive argument would say: might this actually increase the time it takes to respond, as we're waiting around a bunch?

Reply: Reading and writing from a …
```go
@@ -299,7 +342,41 @@ func (c *InMemoryCache) Fetch(ctx context.Context, keys []string) map[string][]byte
	for _, key := range keys {
		if b, ok := c.get(key); ok {
			results[key] = b
			continue
		}
		if !c.singleFlight {
			continue
		}

		// Singleflight works via contexts. If a context already exists and it has expired
		// then it means that it is our turn to get the resource.
		// If it has not expired yet then it means that we will be able to get results.
		// In the worst case, the old context could expire after we check but that's OK,
		// it only means that some data will need to be fetched.
		c.mu.Lock()
		if _, ok := c.subs[key]; !ok {
			c.subs[key] = &pubsub{originalCtx: ctx}
			c.mu.Unlock()
		} else {
			if c.subs[key].originalCtx.Err() != nil {
				c.subs[key] = &pubsub{originalCtx: ctx}
			}

			originalCtx := c.subs[key].originalCtx
			respReceiver := make(chan []byte)
			c.subs[key].listeners = append(c.subs[key].listeners, respReceiver)
			c.mu.Unlock()

			select {
			case b := <-respReceiver:
				results[key] = b
				c.sfSaved.Inc()
			case <-ctx.Done():
				return results
			case <-originalCtx.Done():
				continue
			}
		}
	}

	return results
}
```
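To make the semantics concrete, here is a hedged usage sketch (not from the PR) showing the pub/sub handoff end to end: the first Fetch() misses and registers the in-flight context, a second Fetch() blocks as a listener, and Store() publishes the value to it. The key and value strings and the fixed sleep are illustrative only, mirroring the style of the tests below:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-kit/kit/log"

	"github.com/thanos-io/thanos/pkg/cache"
)

func main() {
	conf := []byte(`
max_size: 1MB
max_item_size: 2KB
single_flight: true
`)
	c, err := cache.NewInMemoryCache("example", log.NewNopLogger(), nil, conf)
	if err != nil {
		panic(err)
	}

	ctx := context.Background()
	const key = "postings"

	// Miss: registers ctx as the original in-flight context for this key.
	_ = c.Fetch(ctx, []string{key})

	done := make(chan struct{})
	go func() {
		defer close(done)
		// Blocks as a listener until Store() publishes the value
		// (or until one of the contexts is done).
		hits := c.Fetch(context.Background(), []string{key})
		fmt.Println("second fetch got:", string(hits[key]))
	}()

	// Crude wait for the goroutine to subscribe, as the tests below do.
	time.Sleep(100 * time.Millisecond)
	c.Store(ctx, map[string][]byte{key: []byte("payload")}, time.Minute)
	<-done
}
```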
pkg/cache/inmemory_test.go
```go
@@ -5,8 +5,12 @@ package cache

import (
	"context"
	"fmt"
	"sync"

	"github.com/go-kit/kit/log"
	"github.com/pkg/errors"
	"golang.org/x/sync/errgroup"

	"testing"
	"time"

@@ -15,6 +19,128 @@ import (
	"github.com/thanos-io/thanos/pkg/testutil"
)
```
```go
// TestInmemorySingleflight tests whether the in flight mechanism works.
func TestInmemorySingleflight(t *testing.T) {
	t.Parallel()

	conf := []byte(`
max_size: 1MB
max_item_size: 2KB
single_flight: true
`)
	const testKey = "test"

	c, _ := NewInMemoryCache("test", log.NewNopLogger(), nil, conf)

	ctx := context.Background()

	// Miss! We only block on further Fetch() calls
	// due to the single-flight mode. Thus, we can continue.
	hits := c.Fetch(ctx, []string{testKey})
	testutil.Assert(t, len(hits) == 0)

	g := &errgroup.Group{}

	g.Go(func() error {
		// This blocks :(
		hits := c.Fetch(ctx, []string{testKey})
		if len(hits) == 0 {
			return errors.New("no hits")
		}
		return nil
	})

	// We need to wait until the previous goroutine has spawned and blocked on
	// the Fetch() call. In practice, this will take only a few
	// nano/milliseconds, so we wait for that to happen.
	time.Sleep(1 * time.Second)
	// This unblocks the other goroutine.
	c.Store(ctx, map[string][]byte{testKey: []byte("aa")}, 1*time.Minute)

	testutil.Ok(t, g.Wait())

	testutil.Equals(t, 1.0, prom_testutil.ToFloat64(c.sfSaved))
}
```
```go
// TestInmemorySingleflightMultipleKeys tests whether single-flight mechanism works
// when a multiple key request comes.
func TestInmemorySingleflightMultipleKeys(t *testing.T) {
	t.Parallel()

	conf := []byte(`
max_size: 1MB
max_item_size: 2KB
single_flight: true
`)

	c, _ := NewInMemoryCache("test", log.NewNopLogger(), nil, conf)

	ctx, cancel := context.WithCancel(context.Background())

	testKeys := []string{"test", "test2"}

	hits := c.Fetch(ctx, testKeys)
	testutil.Assert(t, len(hits) == 0)

	g := &errgroup.Group{}

	g.Go(func() error {
		// This blocks :(
		hits := c.Fetch(context.Background(), testKeys)
		if len(hits) != 1 {
			return errors.New("expected to have 1 hit")
		}
		return nil
	})

	// We need to wait until c.Fetch() has been called. In practice, we should
	// only wait for a millisecond or so but a second is here to be on the safe side.
	time.Sleep(1 * time.Second)

	c.Store(context.Background(), map[string][]byte{testKeys[0]: []byte("foobar")}, 1*time.Minute)
	time.Sleep(1 * time.Second)
	cancel()

	testutil.Ok(t, g.Wait())
	testutil.Equals(t, 1.0, prom_testutil.ToFloat64(c.sfSaved))
}
```

Review comment (on the `c.Fetch(context.Background(), testKeys)` line, with a suggested change): To be consistent with the other test cases :)

Review thread (on the `time.Sleep(1 * time.Second)` calls):

Why do we need these sleeps? Magic sleeps always make me wonder what wasn't working before these were added 😅

Reply: To wait until the previous goroutine has spawned and blocked on the Fetch() call. In practice, that will take only a few nano/milliseconds. It would be hard to implement some kind of synchronization primitive here because we are actually calling the caching code, hence a simple sleep.

Reply: Ok - let's make that explicit then 👍 (a suggested change added the explanatory comment above the sleep).
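If one wanted to drop the magic sleeps entirely, a possible direction (hypothetical, not in this PR) is an in-package test helper that polls the unexported subscription state until the second Fetch() has actually subscribed; since these tests live in the same `cache` package, they can reach `c.mu` and `c.subs` directly:

```go
// waitForListener is a hypothetical helper for these tests: it polls until a
// listener is registered for key, replacing the fixed time.Sleep(1 * time.Second).
func waitForListener(t *testing.T, c *InMemoryCache, key string) {
	t.Helper()
	for i := 0; i < 1000; i++ {
		c.mu.RLock()
		ps, ok := c.subs[key]
		subscribed := ok && len(ps.listeners) > 0
		c.mu.RUnlock()
		if subscribed {
			return
		}
		time.Sleep(time.Millisecond)
	}
	t.Fatal("no listener subscribed in time")
}
```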
```go
// TestInmemorySingleflightInterrupted tests whether single-flight mechanism still works
// properly when Store() never comes.
func TestInmemorySingleflightInterrupted(t *testing.T) {
	t.Parallel()

	conf := []byte(`
max_size: 1MB
max_item_size: 2KB
single_flight: true
`)

	c, _ := NewInMemoryCache("test", log.NewNopLogger(), nil, conf)

	ctx, cancel := context.WithCancel(context.Background())

	const testKey = "test"

	// Miss! We only block on further Fetch() calls
	// due to the single-flight mode. Thus, we can continue.
	hits := c.Fetch(ctx, []string{testKey})
	testutil.Assert(t, len(hits) == 0)

	g := &errgroup.Group{}

	g.Go(func() error {
		// This blocks :(
		hits := c.Fetch(ctx, []string{testKey})

		if len(hits) != 0 {
			return errors.New("got hits")
		}
		return nil
	})
	cancel()

	time.Sleep(1 * time.Second)

	testutil.Ok(t, g.Wait())
	testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.sfSaved))
}
```

Review thread (on `const testKey = "test"`):

nit: does this need to be a const?

Reply: Why not? It's constant and doesn't change. Ideally this would be a const slice but no such things exist in Go (:

Reply: Haha fair point - on the whole I only really see …

Review thread (on the `time.Sleep(1 * time.Second)` call):

Ditto - are these magic sleeps needed?

Reply: Yep, because we want to wait until c.Fetch() has been called. In practice, we should only wait for a millisecond or so but a second is here to be on the safe side.
```go
func TestInmemoryCache(t *testing.T) {
	t.Parallel()

@@ -113,3 +239,47 @@ max_item_size: 1MB
	testutil.NotOk(t, err)
	testutil.Equals(t, (*InMemoryCache)(nil), cache)
}

func benchCacheGetSet(b *testing.B, cache *InMemoryCache, numKeys, concurrency int) {
	wg := &sync.WaitGroup{}

	for k := 0; k < numKeys; k++ {
		wg.Add(concurrency)
		for i := 0; i < concurrency; i++ {
			go func() {
				defer wg.Done()
				for j := 0; j < b.N; j++ {
					cache.Fetch(context.Background(), []string{fmt.Sprintf("%d", k)})
					cache.Store(context.Background(), map[string][]byte{fmt.Sprintf("%d", k): {}}, 1*time.Minute)
				}
			}()
		}
		wg.Wait()
	}
}

func BenchmarkInmemorySingleflight(b *testing.B) {
	conf := []byte(`max_size: 2KB
max_item_size: 1KB`)
	cache, err := NewInMemoryCache("test", log.NewNopLogger(), nil, conf)
	testutil.Ok(b, err)

	singleflightConf := []byte(`
max_size: 2KB
max_item_size: 1KB
single_flight: true`)
	sfCache, err := NewInMemoryCache("testsf", log.NewNopLogger(), nil, singleflightConf)
	testutil.Ok(b, err)

	for _, numKeys := range []int{100, 1000, 10000} {
		for _, concurrency := range []int{1, 5, 10} {
			b.Run(fmt.Sprintf("inmemory_get_set_keys%d_c%d", numKeys, concurrency), func(b *testing.B) {
				benchCacheGetSet(b, cache, numKeys, concurrency)
			})

			b.Run(fmt.Sprintf("inmemory_singleflight_get_set_keys%d_conc%d", numKeys, concurrency), func(b *testing.B) {
				benchCacheGetSet(b, sfCache, numKeys, concurrency)
			})
		}
	}
}
```
Review comment: Why is this implemented on the `InMemoryCache` and not a remote cache? I would have thought that the benefits of this code would be much greater with a remote cache 🤔 Perhaps this is a 'test' implementation to get right before implementing the remote cache?

Reply: The answer probably is that popular remote caches do not have such functionality - as far as I can tell from my research, it is impossible to block until a key appears on Redis and/or Memcached. For example, here is one implementation: https://github.com/sqlalchemy/dogpile.cache/blob/dbf57e4a7b8f4061b735be8b24bbb880fb75802f/dogpile/cache/backends/memcached.py#L42

As you can see, the "locking" works by creating another key and then sleeping on it. No bueno. Something like https://github.com/mailgun/groupcache needs to be used, but before that mailgun/groupcache#9 should be implemented. For now, let's implement the same functionality in memory - these two features are completely orthogonal :)
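For illustration, the dogpile-style memcached "lock" referenced above amounts to roughly the following sketch. The `kvClient` interface is a hypothetical stand-in for any memcached-like client, not an API from this PR or a specific library; the point is that a sidecar lock key is claimed with an add-if-absent operation, and every other reader just sleeps and retries, since there is no way to block until the real key appears:

```go
package dogpile

import "time"

// kvClient is a hypothetical, minimal memcached-like client.
// Add must succeed only when the key does not exist yet.
type kvClient interface {
	Add(key string, value []byte, ttl time.Duration) error
	Delete(key string) error
}

// acquire emulates the dogpile.cache approach: claim "<key>.lock" and,
// if someone else holds it, poll with sleeps until it frees up or we give up.
// The polling loop is exactly the "no bueno" part.
func acquire(c kvClient, key string, ttl time.Duration) bool {
	lockKey := key + ".lock"
	for i := 0; i < 100; i++ {
		if err := c.Add(lockKey, []byte("1"), ttl); err == nil {
			return true // We now hold the lock and should fetch and store the value.
		}
		time.Sleep(10 * time.Millisecond)
	}
	return false
}

// release drops the sidecar lock key once the real value has been stored.
func release(c kvClient, key string) {
	_ = c.Delete(key + ".lock")
}
```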
Reply: Off-topic: My attempt to add groupcache failed because of the drastic changes that need to be introduced to the cache interfaces. The `Get` issue actually would help to solve this. It would be amazing to migrate to groupcache and simplify what we have (we can drop memcached support when we move).