Skip to content

Commit

Permalink
cache: make adjustments according to the comments
Browse files Browse the repository at this point in the history
Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
  • Loading branch information
GiedriusS committed Jun 30, 2021
1 parent 48c85ba commit 706f07b
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 28 deletions.
54 changes: 29 additions & 25 deletions pkg/cache/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ type InMemoryCache struct {
hitsExpired prometheus.Counter
// The input cache value would be copied to an inmemory array
// instead of simply using the one sent by the caller.
added prometheus.Counter
current prometheus.Gauge
singleflightsaved prometheus.Counter
added prometheus.Counter
current prometheus.Gauge
sfSaved prometheus.Counter

currentSize prometheus.Gauge
totalCurrentSize prometheus.Gauge
Expand Down Expand Up @@ -121,7 +121,7 @@ func NewInMemoryCacheWithConfig(name string, logger log.Logger, reg prometheus.R

if config.Singleflight {
c.subs = make(map[string]*pubsub)
c.singleflightsaved = promauto.With(reg).NewCounter(prometheus.CounterOpts{
c.sfSaved = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_cache_inmemory_singleflight_saved_calls_total",
Help: "Total number of calls saved by the singleflight mechanism.",
ConstLabels: prometheus.Labels{"name": name},
Expand Down Expand Up @@ -342,29 +342,33 @@ func (c *InMemoryCache) Fetch(ctx context.Context, keys []string) map[string][]b
for _, key := range keys {
if b, ok := c.get(key); ok {
results[key] = b
} else if c.singleFlight {
c.mu.Lock()
if c.subs[key] == nil {
continue
}
if !c.singleFlight {
continue
}

c.mu.Lock()
if c.subs[key] == nil {
c.subs[key] = &pubsub{originalCtx: ctx}
c.mu.Unlock()
} else {
if c.subs[key].originalCtx.Err() != nil {
c.subs[key] = &pubsub{originalCtx: ctx}
c.mu.Unlock()
} else {
if c.subs[key].originalCtx.Err() != nil {
c.subs[key] = &pubsub{originalCtx: ctx}
}
originalCtx := c.subs[key].originalCtx
respReceiver := make(chan []byte)
c.subs[key].listeners = append(c.subs[key].listeners, respReceiver)
c.mu.Unlock()
}
originalCtx := c.subs[key].originalCtx
respReceiver := make(chan []byte)
c.subs[key].listeners = append(c.subs[key].listeners, respReceiver)
c.mu.Unlock()

select {
case b := <-respReceiver:
results[key] = b
c.singleflightsaved.Inc()
case <-ctx.Done():
return results
case <-originalCtx.Done():
continue
}
select {
case b := <-respReceiver:
results[key] = b
c.sfSaved.Inc()
case <-ctx.Done():
return results
case <-originalCtx.Done():
continue
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/cache/inmemory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ single_flight: true

testutil.Ok(t, g.Wait())

testutil.Equals(t, 1.0, prom_testutil.ToFloat64(c.singleflightsaved))
testutil.Equals(t, 1.0, prom_testutil.ToFloat64(c.sfSaved))
}

// TestInmemorySingleflightMultipleKeys tests whether single-flight mechanism works
Expand Down Expand Up @@ -94,7 +94,7 @@ single_flight: true
cancel()

testutil.Ok(t, g.Wait())
testutil.Equals(t, 1.0, prom_testutil.ToFloat64(c.singleflightsaved))
testutil.Equals(t, 1.0, prom_testutil.ToFloat64(c.sfSaved))
}

// TestInmemorySingleflightInterrupted tests whether single-flight mechanism still works
Expand Down Expand Up @@ -133,7 +133,7 @@ single_flight: true

time.Sleep(1 * time.Second)
testutil.Ok(t, g.Wait())
testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.singleflightsaved))
testutil.Equals(t, 0.0, prom_testutil.ToFloat64(c.sfSaved))
}

func TestInmemoryCache(t *testing.T) {
Expand Down

0 comments on commit 706f07b

Please sign in to comment.