diff --git a/pkg/cacheutil/memcached_client.go b/pkg/cacheutil/memcached_client.go index 8c10a9a874..54a596fb36 100644 --- a/pkg/cacheutil/memcached_client.go +++ b/pkg/cacheutil/memcached_client.go @@ -14,6 +14,7 @@ import ( "github.com/bradfitz/gomemcache/memcache" "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/golang/groupcache/singleflight" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -206,6 +207,8 @@ type memcachedClient struct { p *AsyncOperationProcessor setAsyncCircuitBreaker CircuitBreaker + + g singleflight.Group } // AddressProvider performs node address resolution given a list of clusters. @@ -388,11 +391,14 @@ func (c *memcachedClient) SetAsync(key string, value []byte, ttl time.Duration) c.operations.WithLabelValues(opSet).Inc() err := c.setAsyncCircuitBreaker.Execute(func() error { - return c.client.Set(&memcache.Item{ - Key: key, - Value: value, - Expiration: int32(time.Now().Add(ttl).Unix()), + _, err := c.g.Do(key, func() (interface{}, error) { + return nil, c.client.Set(&memcache.Item{ + Key: key, + Value: value, + Expiration: int32(time.Now().Add(ttl).Unix()), + }) }) + return err }) if err != nil { if errors.Is(err, gobreaker.ErrOpenState) || errors.Is(err, gobreaker.ErrTooManyRequests) { diff --git a/pkg/cacheutil/redis_client.go b/pkg/cacheutil/redis_client.go index 09e664b4dc..19aa3cd29d 100644 --- a/pkg/cacheutil/redis_client.go +++ b/pkg/cacheutil/redis_client.go @@ -13,6 +13,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/golang/groupcache/singleflight" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -160,6 +161,8 @@ type RedisClient struct { p *AsyncOperationProcessor setAsyncCircuitBreaker CircuitBreaker + + g singleflight.Group } // NewRedisClient makes a new RedisClient. @@ -262,7 +265,10 @@ func (c *RedisClient) SetAsync(key string, value []byte, ttl time.Duration) erro return c.p.EnqueueAsync(func() { start := time.Now() err := c.setAsyncCircuitBreaker.Execute(func() error { - return c.client.Do(context.Background(), c.client.B().Set().Key(key).Value(rueidis.BinaryString(value)).ExSeconds(int64(ttl.Seconds())).Build()).Error() + _, err := c.g.Do(key, func() (interface{}, error) { + return nil, c.client.Do(context.Background(), c.client.B().Set().Key(key).Value(rueidis.BinaryString(value)).ExSeconds(int64(ttl.Seconds())).Build()).Error() + }) + return err }) if err != nil { level.Warn(c.logger).Log("msg", "failed to set item into redis", "err", err, "key", key, "value_size", len(value))