
cache: implement single-flight caching in the inmemory cache #4379

Closed
CHANGELOG.md (1 addition, 0 deletions)
@@ -16,6 +16,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#4327](https://github.com/thanos-io/thanos/pull/4327) Add environment variable substitution to all YAML configuration flags.
- [#4239](https://github.com/thanos-io/thanos/pull/4239) Add penalty based deduplication mode for compactor.
- [#4292](https://github.com/thanos-io/thanos/pull/4292) Receive: Enable exemplars ingestion and querying.
- [#4378](https://github.com/thanos-io/thanos/pull/4378) Implemented a single-flight mechanism for the in-memory cache provider. It can be enabled with `single_flight: true`. It can greatly reduce the number of calls to remote object storage, at the cost of very small locking overhead on each cache operation; it also means that the deadline of a blocked request becomes "attached" to the earlier in-flight request. The `thanos_cache_inmemory_singleflight_saved_calls_total` metric shows how many calls have been saved.
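For context, an in-memory cache configuration enabling the new option might look like this. This is a sketch: only `single_flight` is the field added by this PR; the `type`/`config` wrapper follows the usual Thanos cache configuration layout, and the size values are illustrative.

```yaml
type: IN-MEMORY
config:
  max_size: 1GB
  max_item_size: 128MB
  single_flight: true
```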

### Fixed

pkg/cache/inmemory.go (70 additions, 2 deletions)
@@ -35,12 +35,26 @@ type InMemoryCacheConfig struct {
MaxSize model.Bytes `yaml:"max_size"`
// MaxItemSize represents maximum size of single item.
MaxItemSize model.Bytes `yaml:"max_item_size"`
// Singleflight enables cache-stampede avoidance: concurrent Fetch()
// calls for the same missing key result in only one request. In practice,
// this means some very small locking on each cache operation, and that
// a waiting Fetch() call's deadline becomes "attached" to the earlier
// in-flight request.
Singleflight bool `yaml:"single_flight"`
}

type pubsub struct {
listeners []chan []byte
originalCtx context.Context
}

type InMemoryCache struct {
logger log.Logger
maxSizeBytes uint64
maxItemSizeBytes uint64
singleFlight bool

subs map[string]*pubsub
mu sync.RWMutex

mtx sync.Mutex
curSize uint64
@@ -51,8 +65,10 @@ type InMemoryCache struct {
hitsExpired prometheus.Counter
// The input cache value would be copied to an inmemory array
// instead of simply using the one sent by the caller.
added prometheus.Counter
current prometheus.Gauge
added prometheus.Counter
current prometheus.Gauge
singleflightsaved prometheus.Counter

Contributor: nit: unintentional newline?

currentSize prometheus.Gauge
totalCurrentSize prometheus.Gauge
overflow prometheus.Counter
@@ -100,6 +116,16 @@ func NewInMemoryCacheWithConfig(name string, logger log.Logger, reg prometheus.R
logger: logger,
maxSizeBytes: uint64(config.MaxSize),
maxItemSizeBytes: uint64(config.MaxItemSize),
singleFlight: config.Singleflight,
}

if config.Singleflight {
c.subs = make(map[string]*pubsub)
c.singleflightsaved = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_cache_inmemory_singleflight_saved_calls_total",
Help: "Total number of calls saved by the singleflight mechanism.",
ConstLabels: prometheus.Labels{"name": name},
})
}

c.evicted = promauto.With(reg).NewCounter(prometheus.CounterOpts{
@@ -289,6 +315,23 @@ func (c *InMemoryCache) reset() {
func (c *InMemoryCache) Store(ctx context.Context, data map[string][]byte, ttl time.Duration) {
Contributor: Why is this implemented on the InMemoryCache and not a remote cache? I would have thought that the benefits of this code would be much greater with a remote cache 🤔 Perhaps this is a 'test' implementation to get right before implementing the remote cache?

GiedriusS (Member, Author), Aug 3, 2021: The answer probably is that popular remote caches do not have such functionality - as far as I can tell from my research, it is impossible to block until a key appears on Redis and/or Memcached. For example, here is one implementation: https://github.com/sqlalchemy/dogpile.cache/blob/dbf57e4a7b8f4061b735be8b24bbb880fb75802f/dogpile/cache/backends/memcached.py#L42

As you can see, the "locking" works by creating another key and then sleeping on it. No bueno. Something like https://github.com/mailgun/groupcache needs to be used, but before that mailgun/groupcache#9 should be implemented. For now, let's implement the same functionality in memory - these two features are completely orthogonal :)

Member: Off-topic: my attempt to add groupcache failed because of the drastic changes that need to be introduced to the cache interfaces. The Get issue would actually help to solve this. It would be amazing to migrate to groupcache and simplify what we have (we can drop memcached support when we move).
for key, val := range data {
c.set(key, val, ttl)

if c.singleFlight {
c.mu.Lock()

if c.subs[key] == nil {
c.mu.Unlock()
continue
}

for _, listener := range c.subs[key].listeners {
listener <- val
close(listener)
}

delete(c.subs, key)
c.mu.Unlock()
}
}
}

@@ -299,7 +342,32 @@ func (c *InMemoryCache) Fetch(ctx context.Context, keys []string) map[string][]b
for _, key := range keys {
if b, ok := c.get(key); ok {
results[key] = b
} else if c.singleFlight {
c.mu.Lock()
if c.subs[key] == nil {
c.subs[key] = &pubsub{originalCtx: ctx}
c.mu.Unlock()
} else {
if c.subs[key].originalCtx.Err() != nil {
c.subs[key] = &pubsub{originalCtx: ctx}
}
originalCtx := c.subs[key].originalCtx
respReceiver := make(chan []byte)
c.subs[key].listeners = append(c.subs[key].listeners, respReceiver)
c.mu.Unlock()

select {
case b := <-respReceiver:
results[key] = b
c.singleflightsaved.Inc()
case <-ctx.Done():
return results
case <-originalCtx.Done():
continue
}
}
}
}

return results
}
pkg/cache/inmemory_test.go (119 additions, 0 deletions)
@@ -7,6 +7,8 @@ import (
"context"

"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"

"testing"
"time"
@@ -15,6 +17,123 @@ import (
"github.com/thanos-io/thanos/pkg/testutil"
)

// TestInmemorySingleflight tests whether the single-flight mechanism works.
func TestInmemorySingleflight(t *testing.T) {
t.Parallel()

conf := []byte(`
max_size: 1MB
max_item_size: 2KB
single_flight: true
`)
const testKey = "test"

c, _ := NewInMemoryCache("test", log.NewNopLogger(), nil, conf)

ctx := context.Background()

// Miss! Not blocking, we can continue.
Contributor: IIUC - this is non-blocking because this is the first time we have called Fetch? Is there a reason why the first call does not block, but subsequent ones do? That feels like quite surprising behaviour from the caller's point of view 🤔

Contributor: Ah, from the PR description: "Since our code goes and does something when Fetch() misses, let's implement the single-flight mode based on this assumption." If that is the case, then IMO those assumptions should be documented in comments 🙌
hits := c.Fetch(ctx, []string{testKey})
testutil.Assert(t, len(hits) == 0)

g := &errgroup.Group{}

g.Go(func() error {
// This blocks :(
hits := c.Fetch(ctx, []string{testKey})
if len(hits) == 0 {
return errors.New("no hits")
}
return nil
})

time.Sleep(1 * time.Second)
// This unblocks the other goroutine.
c.Store(ctx, map[string][]byte{testKey: []byte("aa")}, 1*time.Minute)

testutil.Ok(t, g.Wait())

testutil.Equals(t, 1.0, prom_testutil.ToFloat64(c.singleflightsaved))
}

// TestInmemorySingleflightMultipleKeys tests whether the single-flight mechanism
// works when a request with multiple keys comes in.
func TestInmemorySingleflightMultipleKeys(t *testing.T) {
t.Parallel()

conf := []byte(`
max_size: 1MB
max_item_size: 2KB
single_flight: true
`)

c, _ := NewInMemoryCache("test", log.NewNopLogger(), nil, conf)

ctx, cancel := context.WithCancel(context.Background())

testKeys := []string{"test", "test2"}

hits := c.Fetch(ctx, testKeys)
testutil.Assert(t, len(hits) == 0)

g := &errgroup.Group{}

g.Go(func() error {
hits := c.Fetch(context.Background(), testKeys)
Contributor (suggested change): add `// This blocks :(` above this Fetch call, to be consistent with the other test cases :)
if len(hits) != 1 {
return errors.New("expected to have 1 hit")
}
return nil
})

time.Sleep(1 * time.Second)
Contributor: Why do we need these sleeps? Magic sleeps always make me wonder what wasn't working before these were added 😅

GiedriusS (Member, Author): To wait until the previous goroutine has spawned and blocked on the Fetch() call. In practice, that will take only a few nano/milliseconds. It would be hard to implement some kind of synchronization primitive here because we are actually calling the caching code, hence a simple time.Sleep() has been added.

Contributor (suggested change): precede the sleep with `// We need to wait until the previous goroutine has spawned and blocked on the Fetch() call. In practice, this will take only a few nano/milliseconds, so we wait for that to happen.` Ok - let's make that explicit then 👍
c.Store(context.Background(), map[string][]byte{testKeys[0]: []byte("foobar")}, 1*time.Minute)
time.Sleep(1 * time.Second)
cancel()

testutil.Ok(t, g.Wait())
testutil.Equals(t, 1.0, prom_testutil.ToFloat64(c.singleflightsaved))
}

// TestInmemorySingleflightInterrupted tests whether the single-flight mechanism
// still works properly when Store() is never called.
func TestInmemorySingleflightInterrupted(t *testing.T) {
t.Parallel()

conf := []byte(`
max_size: 1MB
max_item_size: 2KB
single_flight: true
`)

c, _ := NewInMemoryCache("test", log.NewNopLogger(), nil, conf)

ctx, cancel := context.WithCancel(context.Background())

const testKey = "test"
Contributor: nit: does this need to be a const?

GiedriusS (Member, Author): Why not? It's constant and doesn't change. Ideally this would be a const slice, but no such thing exists in Go (:

Contributor: Haha, fair point - on the whole I only really see consts defined at package scope instead of function scope. Not a blocker ¯\_(ツ)_/¯

// Miss! Not blocking, we can continue.
hits := c.Fetch(ctx, []string{testKey})
testutil.Assert(t, len(hits) == 0)

g := &errgroup.Group{}

g.Go(func() error {
// This blocks :(
hits := c.Fetch(ctx, []string{testKey})

if len(hits) != 0 {
return errors.New("got hits")
}
return nil
})
cancel()

time.Sleep(1 * time.Second)
Contributor: Ditto - are these magic sleeps needed?

GiedriusS (Member, Author): Yep, because we want to wait until c.Fetch() has been called. In practice, we should only wait for a millisecond or so, but a second is used here to be on the safe side.

Contributor (suggested change): precede the sleep with `// We need to wait until c.Fetch() has been called. In practice, we should only wait for a millisecond or so, but a second is here to be on the safe side.`
testutil.Ok(t, g.Wait())
testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.singleflightsaved))
}

func TestInmemoryCache(t *testing.T) {
t.Parallel()
