Skip to content

Commit

Permalink
cache: implement single-flight caching in the inmemory cache
Browse files Browse the repository at this point in the history
Implement a single-flight mode for the inmemory cache. Since our code
goes and does something when Fetch() misses, let's implement the
single-flight mode based on this assumption - wait for the results of
the previous request and use it.

https://pkg.go.dev/golang.org/x/sync/singleflight is not used to avoid
complexity - there is no single function that we can call to get results
of some operation. Potentially it could be wrapped around the `Bucket`
interface but then the caching/remote object storage interfaces would be
muddled together and I really think we should avoid that.

Tested on a cluster and with unit tests.

Signed-off-by: Giedrius Statkevičius <[email protected]>
  • Loading branch information
GiedriusS committed Jun 28, 2021
1 parent bf96323 commit 052bfc3
Show file tree
Hide file tree
Showing 3 changed files with 190 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#4327](https://github.com/thanos-io/thanos/pull/4327) Add environment variable substitution to all YAML configuration flags.
- [#4239](https://github.com/thanos-io/thanos/pull/4239) Add penalty based deduplication mode for compactor.
- [#4292](https://github.com/thanos-io/thanos/pull/4292) Receive: Enable exemplars ingestion and querying.
- [#4378](https://github.com/thanos-io/thanos/pull/4378) Implemented single-flight mechanism for the in-memory cache provider. It can be enabled by `single_flight: true`. It potentially greatly reduces the number of calls to remote object storage with very small locking overhead on each cache operation. Also, it means that the timeout of a given request becomes "attached" to a previous one. `thanos_cache_inmemory_singleflight_saved_calls_total` metric will show how many calls have been saved.

### Fixed

Expand Down
72 changes: 70 additions & 2 deletions pkg/cache/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,26 @@ type InMemoryCacheConfig struct {
MaxSize model.Bytes `yaml:"max_size"`
// MaxItemSize represents maximum size of single item.
MaxItemSize model.Bytes `yaml:"max_item_size"`
// Singleflight represents whether we should try to avoid cache stampede
// by only doing a request once. In practice, this means some very small
// locking on each cache operation, and that Fetch() context's deadline
// becomes "attached" to the other in-flight request.
Singleflight bool `yaml:"single_flight"`
}

type pubsub struct {
listeners []chan []byte
originalCtx context.Context
}

type InMemoryCache struct {
logger log.Logger
maxSizeBytes uint64
maxItemSizeBytes uint64
singleFlight bool

subs map[string]*pubsub
mu sync.RWMutex

mtx sync.Mutex
curSize uint64
Expand All @@ -51,8 +65,10 @@ type InMemoryCache struct {
hitsExpired prometheus.Counter
// The input cache value would be copied to an inmemory array
// instead of simply using the one sent by the caller.
added prometheus.Counter
current prometheus.Gauge
added prometheus.Counter
current prometheus.Gauge
singleflightsaved prometheus.Counter

currentSize prometheus.Gauge
totalCurrentSize prometheus.Gauge
overflow prometheus.Counter
Expand Down Expand Up @@ -100,6 +116,16 @@ func NewInMemoryCacheWithConfig(name string, logger log.Logger, reg prometheus.R
logger: logger,
maxSizeBytes: uint64(config.MaxSize),
maxItemSizeBytes: uint64(config.MaxItemSize),
singleFlight: config.Singleflight,
}

if config.Singleflight {
c.subs = make(map[string]*pubsub)
c.singleflightsaved = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_cache_inmemory_singleflight_saved_calls_total",
Help: "Total number of calls saved by the singleflight mechanism.",
ConstLabels: prometheus.Labels{"name": name},
})
}

c.evicted = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Expand Down Expand Up @@ -289,6 +315,23 @@ func (c *InMemoryCache) reset() {
func (c *InMemoryCache) Store(ctx context.Context, data map[string][]byte, ttl time.Duration) {
for key, val := range data {
c.set(key, val, ttl)

if c.singleFlight {
c.mu.Lock()

if c.subs[key] == nil {
c.mu.Unlock()
continue
}

for _, listener := range c.subs[key].listeners {
listener <- val
close(listener)
}

delete(c.subs, key)
c.mu.Unlock()
}
}
}

Expand All @@ -299,7 +342,32 @@ func (c *InMemoryCache) Fetch(ctx context.Context, keys []string) map[string][]b
for _, key := range keys {
if b, ok := c.get(key); ok {
results[key] = b
} else if c.singleFlight {
c.mu.Lock()
if c.subs[key] == nil {
c.subs[key] = &pubsub{originalCtx: ctx}
c.mu.Unlock()
} else {
if c.subs[key].originalCtx.Err() != nil {
c.subs[key] = &pubsub{originalCtx: ctx}
}
originalCtx := c.subs[key].originalCtx
respReceiver := make(chan []byte)
c.subs[key].listeners = append(c.subs[key].listeners, respReceiver)
c.mu.Unlock()

select {
case b := <-respReceiver:
results[key] = b
c.singleflightsaved.Inc()
case <-ctx.Done():
return results
case <-originalCtx.Done():
continue
}
}
}
}

return results
}
119 changes: 119 additions & 0 deletions pkg/cache/inmemory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"context"

"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"

"testing"
"time"
Expand All @@ -15,6 +17,123 @@ import (
"github.com/thanos-io/thanos/pkg/testutil"
)

// TestInmemorySingleflight tests whether the in flight mechanism works.
func TestInmemorySingleflight(t *testing.T) {
t.Parallel()

conf := []byte(`
max_size: 1MB
max_item_size: 2KB
single_flight: true
`)
const testKey = "test"

c, _ := NewInMemoryCache("test", log.NewNopLogger(), nil, conf)

ctx := context.Background()

// Miss! Not blocking, we can continue.
hits := c.Fetch(ctx, []string{testKey})
testutil.Assert(t, len(hits) == 0)

g := &errgroup.Group{}

g.Go(func() error {
// This blocks :(
hits := c.Fetch(ctx, []string{testKey})
if len(hits) == 0 {
return errors.New("no hits")
}
return nil
})

time.Sleep(1 * time.Second)
// This unblocks the other goroutine.
c.Store(ctx, map[string][]byte{testKey: []byte("aa")}, 1*time.Minute)

testutil.Ok(t, g.Wait())

testutil.Equals(t, 1.0, prom_testutil.ToFloat64(c.singleflightsaved))
}

// TestInmemorySingleflightMultipleKeys tests whether single-flight mechanism works
// when a multiple key request comes.
func TestInmemorySingleflightMultipleKeys(t *testing.T) {
t.Parallel()

conf := []byte(`
max_size: 1MB
max_item_size: 2KB
single_flight: true
`)

c, _ := NewInMemoryCache("test", log.NewNopLogger(), nil, conf)

ctx, cancel := context.WithCancel(context.Background())

testKeys := []string{"test", "test2"}

hits := c.Fetch(ctx, testKeys)
testutil.Assert(t, len(hits) == 0)

g := &errgroup.Group{}

g.Go(func() error {
hits := c.Fetch(context.Background(), testKeys)
if len(hits) != 1 {
return errors.New("expected to have 1 hit")
}
return nil
})

time.Sleep(1 * time.Second)
c.Store(context.Background(), map[string][]byte{testKeys[0]: []byte("foobar")}, 1*time.Minute)
time.Sleep(1 * time.Second)
cancel()

testutil.Ok(t, g.Wait())
testutil.Equals(t, 1.0, prom_testutil.ToFloat64(c.singleflightsaved))
}

// TestInmemorySingleflightInterrupted tests whether single-flight mechanism still works
// properly when Store() never comes.
func TestInmemorySingleflightInterrupted(t *testing.T) {
t.Parallel()

conf := []byte(`
max_size: 1MB
max_item_size: 2KB
single_flight: true
`)

c, _ := NewInMemoryCache("test", log.NewNopLogger(), nil, conf)

ctx, cancel := context.WithCancel(context.Background())

const testKey = "test"

// Miss! Not blocking, we can continue.
hits := c.Fetch(ctx, []string{testKey})
testutil.Assert(t, len(hits) == 0)

g := &errgroup.Group{}

g.Go(func() error {
// This blocks :(
hits := c.Fetch(ctx, []string{testKey})

if len(hits) != 0 {
return errors.New("got hits")
}
return nil
})
cancel()

time.Sleep(1 * time.Second)
testutil.Ok(t, g.Wait())
testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.singleflightsaved))
}

func TestInmemoryCache(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 052bfc3

Please sign in to comment.