From f7898c99d7dce3354414e4fa3012d8271aa1fb1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Fri, 28 May 2021 15:31:22 +0300 Subject: [PATCH 1/7] store: proxy: deduplicate same Series() requests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Deduplicate identical Series() requests occurring at the same moment. This is pretty crucial because query-frontend caches responses depending on the query itself, and dashboards such as node_exporter sometimes ask for the same series but with different functions. In the popular node_exporter dashboard I can count at least 9 different queries that use the same `node_cpu_seconds_total`, albeit with different label selectors & functions. My tests with this improvement show that there is indeed a lot of overlap. After opening the node_exporter dashboard a few times: ``` thanos_proxy_store_deduplicated_stream_requests_total 633 ``` Signed-off-by: Giedrius Statkevičius --- pkg/store/proxy.go | 169 ++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 159 insertions(+), 10 deletions(-) diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 74d4612ed0..2ad8ed1eef 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -28,6 +28,7 @@ import ( "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" ) @@ -61,10 +62,23 @@ type ProxyStore struct { responseTimeout time.Duration metrics *proxyStoreMetrics + + // Request -> add yourself to list of listeners that are listening on that store+request. + // At the end, send the same data to each worker. + // Delete the request from the map at the end! + requestListeners map[string]*requestListenerVal + requestListenerMtx *sync.Mutex +} + +type requestListenerVal struct { + listeners []chan *storepb.SeriesResponse + listenerCnt int + sentFirstResponse bool } type proxyStoreMetrics struct { - emptyStreamResponses prometheus.Counter + emptyStreamResponses prometheus.Counter + deduplicatedStreamRequests prometheus.Counter } func newProxyStoreMetrics(reg prometheus.Registerer) *proxyStoreMetrics { @@ -75,6 +89,11 @@ func newProxyStoreMetrics(reg prometheus.Registerer) *proxyStoreMetrics { Help: "Total number of empty responses received.", }) + m.deduplicatedStreamRequests = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_proxy_store_deduplicated_stream_requests_total", + Help: "How many requests we've avoided sending due to deduplication.", + }) + return &m } @@ -100,12 +119,14 @@ func NewProxyStore( metrics := newProxyStoreMetrics(reg) s := &ProxyStore{ - logger: logger, - stores: stores, - component: component, - selectorLabels: selectorLabels, - responseTimeout: responseTimeout, - metrics: metrics, + logger: logger, + stores: stores, + component: component, + selectorLabels: selectorLabels, + responseTimeout: responseTimeout, + metrics: metrics, + requestListenerMtx: &sync.Mutex{}, + requestListeners: make(map[string]*requestListenerVal), } return s } @@ -185,9 +206,137 @@ func (s cancelableRespSender) send(r *storepb.SeriesResponse) { } } -// Series returns all series for a requested time range and label matcher. Requested series are taken from other -// stores and proxied to RPC client. NOTE: Resulted data are not trimmed exactly to min and max time range.
+type broadcastingSeriesServer struct { + cacheKey string + s *ProxyStore + srv storepb.Store_SeriesServer + setFirstResponse *sync.Once +} + +// Send is like a regular Send() but it fans out those responses to multiple channels. +func (b *broadcastingSeriesServer) Send(resp *storepb.SeriesResponse) error { + b.s.requestListenerMtx.Lock() + defer b.s.requestListenerMtx.Unlock() + + b.setFirstResponse.Do(func() { + b.s.requestListeners[b.cacheKey].sentFirstResponse = true + }) + + for _, listener := range b.s.requestListeners[b.cacheKey].listeners { + select { + case listener <- resp: + case <-b.Context().Done(): + for _, l := range b.s.requestListeners[b.cacheKey].listeners { + close(l) + } + return b.Context().Err() + } + } + return nil +} + +func (b *broadcastingSeriesServer) Context() context.Context { + return b.srv.Context() +} + +func (b *broadcastingSeriesServer) Close() { + b.s.requestListenerMtx.Lock() + defer b.s.requestListenerMtx.Unlock() + for _, l := range b.s.requestListeners[b.cacheKey].listeners { + close(l) + } +} + +func (b *broadcastingSeriesServer) RecvMsg(m interface{}) error { return b.srv.RecvMsg(m) } +func (b *broadcastingSeriesServer) SendMsg(m interface{}) error { return b.srv.SendMsg(m) } +func (b *broadcastingSeriesServer) SetHeader(m metadata.MD) error { return b.srv.SetHeader(m) } +func (b *broadcastingSeriesServer) SendHeader(m metadata.MD) error { return b.srv.SendHeader(m) } +func (b *broadcastingSeriesServer) SetTrailer(m metadata.MD) { b.srv.SetTrailer(m) } + +// Memoized version of realSeries() - it doesn't perform any Series() call unless such a request +// isn't happening already. This helps a lot in cases when a dashboard gets opened with lots +// of different queries that use the same metrics. func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { + var ( + cacheKey string + shouldSendQuery bool + dataIn chan *storepb.SeriesResponse = make(chan *storepb.SeriesResponse) + cacheKeyID int + ) + stores := s.stores() + for _, st := range stores { + cacheKey += st.String() + } + cacheKey += r.String() + + // If already sending response back -> create a new cache key. 
+ s.requestListenerMtx.Lock() + finalCacheKey := fmt.Sprintf("%s-%d", cacheKey, cacheKeyID) + if s.requestListeners[finalCacheKey] == nil { + s.requestListeners[finalCacheKey] = &requestListenerVal{} + } + if s.requestListeners[finalCacheKey].sentFirstResponse { + for { + cacheKeyID++ + finalCacheKey := fmt.Sprintf("%s-%d", cacheKey, cacheKeyID) + if s.requestListeners[finalCacheKey] == nil { + s.requestListeners[finalCacheKey] = &requestListenerVal{} + break + } + if !s.requestListeners[finalCacheKey].sentFirstResponse { + break + } + } + } + shouldSendQuery = s.requestListeners[finalCacheKey].listenerCnt == 0 + s.requestListeners[finalCacheKey].listenerCnt++ + s.requestListeners[finalCacheKey].listeners = append(s.requestListeners[finalCacheKey].listeners, dataIn) + s.requestListenerMtx.Unlock() + + g, _ := errgroup.WithContext(srv.Context()) + + bss := &broadcastingSeriesServer{ + finalCacheKey, + s, + srv, + &sync.Once{}, + } + + if shouldSendQuery { + g.Go(func() error { + return s.realSeries(stores, r, bss) + }) + } else { + s.metrics.deduplicatedStreamRequests.Inc() + } + + g.Go(func() error { + for din := range dataIn { + if err := srv.Send(din); err != nil { + return errors.Wrap(err, "sending cached Series() response") + } + } + return nil + }) + + if err := g.Wait(); err != nil { + return err + } + + s.requestListenerMtx.Lock() + s.requestListeners[finalCacheKey].listenerCnt-- + if s.requestListeners[finalCacheKey].listenerCnt == 0 { + delete(s.requestListeners, finalCacheKey) + } + s.requestListenerMtx.Unlock() + + return nil +} + +// realSeries returns all series for a requested time range and label matcher. Requested series are taken from other +// stores and proxied to RPC client. NOTE: Resulted data are not trimmed exactly to min and max time range. +func (s *ProxyStore) realSeries(stores []Client, r *storepb.SeriesRequest, srv *broadcastingSeriesServer) error { + defer srv.Close() // TODO(bwplotka): This should be part of request logger, otherwise it does not make much sense. Also, could be // tiggered by tracing span to reduce cognitive load. reqLogger := log.With(s.logger, "component", "proxy", "request", r.String()) @@ -234,7 +383,7 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe close(respCh) }() - for _, st := range s.stores() { + for _, st := range stores { // We might be able to skip the store if its meta information indicates it cannot have series matching our query. 
if ok, reason := storeMatches(gctx, st, r.MinTime, r.MaxTime, matchers...); !ok { storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s filtered out: %v", st, reason)) From 2e0ef9fb1209f8e4cdbf9d469b482798f1c54de5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Mon, 31 May 2021 12:29:38 +0300 Subject: [PATCH 2/7] store: add fixes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Giedrius Statkevičius --- CHANGELOG.md | 2 +- pkg/store/proxy.go | 175 ++++++++++++++++++++++------------------ pkg/store/proxy_test.go | 21 +++-- 3 files changed, 112 insertions(+), 86 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ce256defea..0176285cae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,7 +22,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re ### Changed -- +- [#4290](https://github.com/thanos-io/thanos/pull/4290) proxy: coalesce multiple requests for the same data; greatly improves performance when opening a dashboard without query-frontend where there are a lot of different panels (queries) asking for the same data ## [v0.21.0](https://github.com/thanos-io/thanos/releases/tag/v0.21.0) - 2021.05.28 diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 2ad8ed1eef..e7ce5f54c4 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -21,6 +21,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/pkg/labels" "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/strutil" @@ -63,7 +64,7 @@ type ProxyStore struct { responseTimeout time.Duration metrics *proxyStoreMetrics - // Request -> add yourself to list of listeners that are listening on that store+request. + // Request -> add yourself to list of listeners that are listening on those stores+request. // At the end, send the same data to each worker. // Delete the request from the map at the end! 
requestListeners map[string]*requestListenerVal @@ -71,14 +72,12 @@ type ProxyStore struct { } type requestListenerVal struct { - listeners []chan *storepb.SeriesResponse - listenerCnt int - sentFirstResponse bool + listeners []chan *storepb.SeriesResponse } type proxyStoreMetrics struct { - emptyStreamResponses prometheus.Counter - deduplicatedStreamRequests prometheus.Counter + emptyStreamResponses prometheus.Counter + coalescedSeriesRequests prometheus.Counter } func newProxyStoreMetrics(reg prometheus.Registerer) *proxyStoreMetrics { @@ -89,9 +88,9 @@ func newProxyStoreMetrics(reg prometheus.Registerer) *proxyStoreMetrics { Help: "Total number of empty responses received.", }) - m.deduplicatedStreamRequests = promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_proxy_store_deduplicated_stream_requests_total", - Help: "How many requests we've avoided sending due to deduplication.", + m.coalescedSeriesRequests = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_proxy_store_coalesced_series_requests_total", + Help: "How many Series() requests we've avoided sending due to coalescing.", }) return &m @@ -207,44 +206,94 @@ func (s cancelableRespSender) send(r *storepb.SeriesResponse) { } type broadcastingSeriesServer struct { - cacheKey string - s *ProxyStore - srv storepb.Store_SeriesServer - setFirstResponse *sync.Once + ctx context.Context + + cacheKey string + s *ProxyStore + srv storepb.Store_SeriesServer + resps []*storepb.SeriesResponse } // Send is like a regular Send() but it fans out those responses to multiple channels. func (b *broadcastingSeriesServer) Send(resp *storepb.SeriesResponse) error { - b.s.requestListenerMtx.Lock() - defer b.s.requestListenerMtx.Unlock() - - b.setFirstResponse.Do(func() { - b.s.requestListeners[b.cacheKey].sentFirstResponse = true - }) - - for _, listener := range b.s.requestListeners[b.cacheKey].listeners { - select { - case listener <- resp: - case <-b.Context().Done(): - for _, l := range b.s.requestListeners[b.cacheKey].listeners { - close(l) - } - return b.Context().Err() - } - } + b.resps = append(b.resps, resp) return nil } func (b *broadcastingSeriesServer) Context() context.Context { - return b.srv.Context() + return b.ctx } -func (b *broadcastingSeriesServer) Close() { +// copySeriesResponse makes a copy of the given SeriesResponse if it is a Series. +// If not then the original response is returned. +func copySeriesResponse(r *storepb.SeriesResponse) *storepb.SeriesResponse { + originalSeries := r.GetSeries() + if originalSeries == nil { + return r + } + resp := &storepb.SeriesResponse{} + + newLabels := labels.Labels{} + for _, lbl := range originalSeries.Labels { + newLabels = append(newLabels, labels.Label{ + Name: lbl.Name, + Value: lbl.Value, + }) + } + + series := &storepb.Series{ + Labels: labelpb.ZLabelsFromPromLabels(newLabels), + } + + if len(originalSeries.Chunks) > 0 { + chunks := make([]storepb.AggrChunk, len(originalSeries.Chunks)) + copy(chunks, originalSeries.Chunks) + series.Chunks = chunks + } + + resp.Result = &storepb.SeriesResponse_Series{ + Series: series, + } + + return resp +} + +func (b *broadcastingSeriesServer) Close() error { b.s.requestListenerMtx.Lock() defer b.s.requestListenerMtx.Unlock() - for _, l := range b.s.requestListeners[b.cacheKey].listeners { + defer func() { + delete(b.s.requestListeners, b.cacheKey) + }() + + for li, l := range b.s.requestListeners[b.cacheKey].listeners { + for _, resp := range b.resps { + // Make a copy here if it is sent to other listeners. 
+ // This is because _a lot_ of upper-level code assumes + // that they have sole ownership of the series. + // For example, the replica labels might be different from query to query + // and deduplication happens in the upper layer. + // TODO(GiedriusS): remove this assumption. + if li > 0 { + resp = copySeriesResponse(resp) + } + + select { + case l <- resp: + case <-b.srv.Context().Done(): + err := b.srv.Context().Err() + for _, lc := range b.s.requestListeners[b.cacheKey].listeners { + select { + case lc <- storepb.NewWarnSeriesResponse(err): + default: + } + close(lc) + } + return b.srv.Context().Err() + } + } close(l) } + return nil } func (b *broadcastingSeriesServer) RecvMsg(m interface{}) error { return b.srv.RecvMsg(m) } @@ -261,7 +310,6 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe cacheKey string shouldSendQuery bool dataIn chan *storepb.SeriesResponse = make(chan *storepb.SeriesResponse) - cacheKeyID int ) stores := s.stores() for _, st := range stores { @@ -269,45 +317,29 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe } cacheKey += r.String() - // If already sending response back -> create a new cache key. + g, gctx := errgroup.WithContext(srv.Context()) + s.requestListenerMtx.Lock() - finalCacheKey := fmt.Sprintf("%s-%d", cacheKey, cacheKeyID) - if s.requestListeners[finalCacheKey] == nil { - s.requestListeners[finalCacheKey] = &requestListenerVal{} - } - if s.requestListeners[finalCacheKey].sentFirstResponse { - for { - cacheKeyID++ - finalCacheKey := fmt.Sprintf("%s-%d", cacheKey, cacheKeyID) - if s.requestListeners[finalCacheKey] == nil { - s.requestListeners[finalCacheKey] = &requestListenerVal{} - break - } - if !s.requestListeners[finalCacheKey].sentFirstResponse { - break - } - } + if s.requestListeners[cacheKey] == nil { + s.requestListeners[cacheKey] = &requestListenerVal{} } - shouldSendQuery = s.requestListeners[finalCacheKey].listenerCnt == 0 - s.requestListeners[finalCacheKey].listenerCnt++ - s.requestListeners[finalCacheKey].listeners = append(s.requestListeners[finalCacheKey].listeners, dataIn) + shouldSendQuery = len(s.requestListeners[cacheKey].listeners) == 0 + s.requestListeners[cacheKey].listeners = append(s.requestListeners[cacheKey].listeners, dataIn) s.requestListenerMtx.Unlock() - g, _ := errgroup.WithContext(srv.Context()) - - bss := &broadcastingSeriesServer{ - finalCacheKey, - s, - srv, - &sync.Once{}, - } - if shouldSendQuery { + bss := &broadcastingSeriesServer{ + gctx, + cacheKey, + s, + srv, + []*storepb.SeriesResponse{}, + } g.Go(func() error { return s.realSeries(stores, r, bss) }) } else { - s.metrics.deduplicatedStreamRequests.Inc() + s.metrics.coalescedSeriesRequests.Inc() } g.Go(func() error { @@ -319,24 +351,13 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe return nil }) - if err := g.Wait(); err != nil { - return err - } - - s.requestListenerMtx.Lock() - s.requestListeners[finalCacheKey].listenerCnt-- - if s.requestListeners[finalCacheKey].listenerCnt == 0 { - delete(s.requestListeners, finalCacheKey) - } - s.requestListenerMtx.Unlock() - - return nil + return g.Wait() } // realSeries returns all series for a requested time range and label matcher. Requested series are taken from other // stores and proxied to RPC client. NOTE: Resulted data are not trimmed exactly to min and max time range. 
func (s *ProxyStore) realSeries(stores []Client, r *storepb.SeriesRequest, srv *broadcastingSeriesServer) error { - defer srv.Close() + defer runutil.CloseWithLogOnErr(s.logger, srv, "closing broadcastingSeriesServer") // TODO(bwplotka): This should be part of request logger, otherwise it does not make much sense. Also, could be // tiggered by tracing span to reduce cognitive load. reqLogger := log.With(s.logger, "component", "proxy", "request", r.String()) diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 366d16ee77..9d93b57543 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -12,6 +12,7 @@ import ( "math/rand" "os" "path/filepath" + "sync" "testing" "time" @@ -1742,10 +1743,12 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) { logger := log.NewNopLogger() store := &ProxyStore{ - logger: logger, - stores: func() []Client { return clients }, - metrics: newProxyStoreMetrics(nil), - responseTimeout: 0, + logger: logger, + stores: func() []Client { return clients }, + metrics: newProxyStoreMetrics(nil), + responseTimeout: 0, + requestListeners: make(map[string]*requestListenerVal), + requestListenerMtx: &sync.Mutex{}, } var allResps []*storepb.SeriesResponse @@ -1863,10 +1866,12 @@ func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) { logger := log.NewNopLogger() p := &ProxyStore{ - logger: logger, - stores: func() []Client { return clients }, - metrics: newProxyStoreMetrics(nil), - responseTimeout: 0, + logger: logger, + stores: func() []Client { return clients }, + metrics: newProxyStoreMetrics(nil), + responseTimeout: 0, + requestListeners: make(map[string]*requestListenerVal), + requestListenerMtx: &sync.Mutex{}, } t.Run("failling send", func(t *testing.T) { From 3293e91ac73d57678d816bc8e1239c18dd0e911a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Tue, 1 Jun 2021 12:39:07 +0300 Subject: [PATCH 3/7] proxy: set aggregations to some default value MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Set the aggregations to some default value if only 0s-resolution data has been requested, because in that case it doesn't matter what aggregations we're asking for: at this resolution we only have RAW data. This can potentially lead to even more savings. Signed-off-by: Giedrius Statkevičius --- pkg/store/proxy.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index e7ce5f54c4..c404656f86 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -317,6 +317,13 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe } cacheKey += r.String() + // For RAW data, it doesn't matter what aggregation has been set underneath + // so set the aggregations to some "default" value to save even more. + // TODO(GiedriusS): remove this once query push-down becomes a reality.
+ if r.MaxResolutionWindow == 0 { + r.Aggregates = []storepb.Aggr{storepb.Aggr_RAW} + } + g, gctx := errgroup.WithContext(srv.Context()) s.requestListenerMtx.Lock() From 7c1e0862dfd446c412674d9aeaa5a73072f5713e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Wed, 2 Jun 2021 15:46:22 +0300 Subject: [PATCH 4/7] proxy: use LRU, granular locks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Giedrius Statkevičius --- pkg/store/proxy.go | 174 +++++++++++++++++++++++++++------------- pkg/store/proxy_test.go | 36 ++++++--- 2 files changed, 141 insertions(+), 69 deletions(-) diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index c404656f86..b7881caa85 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -15,6 +15,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/tracing" + lru "github.com/hashicorp/golang-lru/simplelru" "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -35,6 +36,8 @@ import ( type ctxKey int +const rlkLRUSize = 250_000 + // StoreMatcherKey is the context key for the store's allow list. const StoreMatcherKey = ctxKey(0) @@ -67,12 +70,13 @@ type ProxyStore struct { // Request -> add yourself to list of listeners that are listening on those stores+request. // At the end, send the same data to each worker. // Delete the request from the map at the end! - requestListeners map[string]*requestListenerVal - requestListenerMtx *sync.Mutex + requestListenersLRU *lru.LRU + requestListenersLock *sync.Mutex } type requestListenerVal struct { listeners []chan *storepb.SeriesResponse + valLock *sync.Mutex } type proxyStoreMetrics struct { @@ -117,15 +121,16 @@ func NewProxyStore( } metrics := newProxyStoreMetrics(reg) + l, _ := lru.NewLRU(rlkLRUSize, nil) s := &ProxyStore{ - logger: logger, - stores: stores, - component: component, - selectorLabels: selectorLabels, - responseTimeout: responseTimeout, - metrics: metrics, - requestListenerMtx: &sync.Mutex{}, - requestListeners: make(map[string]*requestListenerVal), + logger: logger, + stores: stores, + component: component, + selectorLabels: selectorLabels, + responseTimeout: responseTimeout, + metrics: metrics, + requestListenersLRU: l, + requestListenersLock: &sync.Mutex{}, } return s } @@ -259,29 +264,38 @@ func copySeriesResponse(r *storepb.SeriesResponse) *storepb.SeriesResponse { } func (b *broadcastingSeriesServer) Close() error { - b.s.requestListenerMtx.Lock() - defer b.s.requestListenerMtx.Unlock() + val, ok := b.s.requestListenersLRU.Get(b.cacheKey) + if !ok { + return fmt.Errorf("%s key not found", b.cacheKey) + } + rlk := val.(*requestListenerVal) + + rlk.valLock.Lock() defer func() { - delete(b.s.requestListeners, b.cacheKey) + rlk.listeners = rlk.listeners[:0] + rlk.valLock.Unlock() }() - for li, l := range b.s.requestListeners[b.cacheKey].listeners { - for _, resp := range b.resps { - // Make a copy here if it is sent to other listeners. - // This is because _a lot_ of upper-level code assumes - // that they have sole ownership of the series. - // For example, the replica labels might be different from query to query - // and deduplication happens in the upper layer. - // TODO(GiedriusS): remove this assumption. 
- if li > 0 { - resp = copySeriesResponse(resp) + for li, l := range rlk.listeners { + if li > 0 { + wg := &sync.WaitGroup{} + for _, resp := range b.resps { + resp := resp + go func() { + resp = copySeriesResponse(resp) + select { + case l <- resp: + case <-b.srv.Context().Done(): + return + } + }() } - select { - case l <- resp: - case <-b.srv.Context().Done(): - err := b.srv.Context().Err() - for _, lc := range b.s.requestListeners[b.cacheKey].listeners { + wg.Wait() + + err := b.srv.Context().Err() + if err != nil { + for _, lc := range rlk.listeners { select { case lc <- storepb.NewWarnSeriesResponse(err): default: @@ -290,8 +304,26 @@ func (b *broadcastingSeriesServer) Close() error { } return b.srv.Context().Err() } + close(l) + + } else { + for _, resp := range b.resps { + select { + case l <- resp: + case <-b.srv.Context().Done(): + err := b.srv.Context().Err() + for _, lc := range rlk.listeners { + select { + case lc <- storepb.NewWarnSeriesResponse(err): + default: + } + close(lc) + } + return b.srv.Context().Err() + } + } + close(l) } - close(l) } return nil } @@ -302,42 +334,62 @@ func (b *broadcastingSeriesServer) SetHeader(m metadata.MD) error { return b.sr func (b *broadcastingSeriesServer) SendHeader(m metadata.MD) error { return b.srv.SendHeader(m) } func (b *broadcastingSeriesServer) SetTrailer(m metadata.MD) { b.srv.SetTrailer(m) } +func generateListenerKey(stores []Client, r *storepb.SeriesRequest) string { + var sb strings.Builder + + for _, st := range stores { + fmt.Fprint(&sb, st.String()) + } + + fmt.Fprintf(&sb, "%d%d%v%v%v%v%v", r.MaxTime, r.MinTime, r.Matchers, r.MaxResolutionWindow, r.PartialResponseStrategy, r.PartialResponseDisabled, r.Hints.String()) + + // For RAW data it doesn't matter what the aggregates are. + // TODO(GiedriusS): remove this once query push-down becomes a reality. + if r.MaxResolutionWindow != 0 { + fmt.Fprintf(&sb, "%v", r.Aggregates) + } + + return sb.String() +} + // Memoized version of realSeries() - it doesn't perform any Series() call unless such a request // isn't happening already. This helps a lot in cases when a dashboard gets opened with lots // of different queries that use the same metrics. func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { var ( - cacheKey string shouldSendQuery bool dataIn chan *storepb.SeriesResponse = make(chan *storepb.SeriesResponse) + ctx context.Context = srv.Context() + g *errgroup.Group ) stores := s.stores() - for _, st := range stores { - cacheKey += st.String() - } - cacheKey += r.String() + listenerKey := generateListenerKey(stores, r) - // For RAW data, it doesn't matter what aggregation has been set underneath - // so set the aggregations to some "default" value to save even more. - // TODO(GiedriusS): remove this once query push-down becomes a reality. 
- if r.MaxResolutionWindow == 0 { - r.Aggregates = []storepb.Aggr{storepb.Aggr_RAW} + s.requestListenersLock.Lock() + val, ok := s.requestListenersLRU.Get(listenerKey) + if !ok { + val = &requestListenerVal{ + valLock: &sync.Mutex{}, + } + s.requestListenersLRU.Add(listenerKey, val) } + s.requestListenersLock.Unlock() - g, gctx := errgroup.WithContext(srv.Context()) + rlk := val.(*requestListenerVal) - s.requestListenerMtx.Lock() - if s.requestListeners[cacheKey] == nil { - s.requestListeners[cacheKey] = &requestListenerVal{} - } - shouldSendQuery = len(s.requestListeners[cacheKey].listeners) == 0 - s.requestListeners[cacheKey].listeners = append(s.requestListeners[cacheKey].listeners, dataIn) - s.requestListenerMtx.Unlock() + rlk.valLock.Lock() + shouldSendQuery = len(rlk.listeners) == 0 + rlk.listeners = append(rlk.listeners, dataIn) + rlk.valLock.Unlock() if shouldSendQuery { + gr, gctx := errgroup.WithContext(ctx) + ctx = gctx + g = gr + bss := &broadcastingSeriesServer{ - gctx, - cacheKey, + ctx, + listenerKey, s, srv, []*storepb.SeriesResponse{}, @@ -349,16 +401,26 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe s.metrics.coalescedSeriesRequests.Inc() } - g.Go(func() error { - for din := range dataIn { - if err := srv.Send(din); err != nil { - return errors.Wrap(err, "sending cached Series() response") + if shouldSendQuery { + g.Go(func() error { + for din := range dataIn { + if err := srv.Send(din); err != nil { + return errors.Wrap(err, "sending cached Series() response") + } } + return nil + }) + + return g.Wait() + } + + for din := range dataIn { + if err := srv.Send(din); err != nil { + return errors.Wrap(err, "sending cached Series() response") } - return nil - }) + } + return nil - return g.Wait() } // realSeries returns all series for a requested time range and label matcher. 
Requested series are taken from other diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 9d93b57543..86a37f9b64 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -16,6 +16,8 @@ import ( "testing" "time" + lru "github.com/hashicorp/golang-lru/simplelru" + "github.com/go-kit/kit/log" "github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/types" @@ -1742,13 +1744,14 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) { } logger := log.NewNopLogger() + l, _ := lru.NewLRU(rlkLRUSize, nil) store := &ProxyStore{ - logger: logger, - stores: func() []Client { return clients }, - metrics: newProxyStoreMetrics(nil), - responseTimeout: 0, - requestListeners: make(map[string]*requestListenerVal), - requestListenerMtx: &sync.Mutex{}, + logger: logger, + stores: func() []Client { return clients }, + metrics: newProxyStoreMetrics(nil), + responseTimeout: 0, + requestListenersLRU: l, + requestListenersLock: &sync.Mutex{}, } var allResps []*storepb.SeriesResponse @@ -1773,7 +1776,13 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) { } chunkLen := len(allResps[len(allResps)-1].GetSeries().Chunks) - maxTime := allResps[len(allResps)-1].GetSeries().Chunks[chunkLen-1].MaxTime + var maxTime int64 + + if chunkLen == 0 { + maxTime = math.MaxInt64 + } else { + maxTime = allResps[len(allResps)-1].GetSeries().Chunks[chunkLen-1].MaxTime + } storetestutil.TestServerSeries(t, store, &storetestutil.SeriesCase{ Name: fmt.Sprintf("%d client with %d samples, %d series each", numOfClients, samplesPerSeriesPerClient, seriesPerClient), @@ -1865,13 +1874,14 @@ func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) { } logger := log.NewNopLogger() + l, _ := lru.NewLRU(rlkLRUSize, nil) p := &ProxyStore{ - logger: logger, - stores: func() []Client { return clients }, - metrics: newProxyStoreMetrics(nil), - responseTimeout: 0, - requestListeners: make(map[string]*requestListenerVal), - requestListenerMtx: &sync.Mutex{}, + logger: logger, + stores: func() []Client { return clients }, + metrics: newProxyStoreMetrics(nil), + responseTimeout: 0, + requestListenersLRU: l, + requestListenersLock: &sync.Mutex{}, } t.Run("failling send", func(t *testing.T) { From de61a4ca620a0d35bd0969c321575b0e965fa566 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Wed, 2 Jun 2021 15:46:44 +0300 Subject: [PATCH 5/7] store: rename cacheKey MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Giedrius Statkevičius --- pkg/store/proxy.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index b7881caa85..67561f1a42 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -213,10 +213,10 @@ func (s cancelableRespSender) send(r *storepb.SeriesResponse) { type broadcastingSeriesServer struct { ctx context.Context - cacheKey string - s *ProxyStore - srv storepb.Store_SeriesServer - resps []*storepb.SeriesResponse + listenerKey string + s *ProxyStore + srv storepb.Store_SeriesServer + resps []*storepb.SeriesResponse } // Send is like a regular Send() but it fans out those responses to multiple channels. 
@@ -264,9 +264,9 @@ func copySeriesResponse(r *storepb.SeriesResponse) *storepb.SeriesResponse { } func (b *broadcastingSeriesServer) Close() error { - val, ok := b.s.requestListenersLRU.Get(b.cacheKey) + val, ok := b.s.requestListenersLRU.Get(b.listenerKey) if !ok { - return fmt.Errorf("%s key not found", b.cacheKey) + return fmt.Errorf("%s key not found", b.listenerKey) } rlk := val.(*requestListenerVal) From a4f6eaa4f4373c33ec916b77aa51a5f8d82f4c03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Wed, 2 Jun 2021 16:14:39 +0300 Subject: [PATCH 6/7] proxy: simplify MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Giedrius Statkevičius --- pkg/store/proxy.go | 63 +++++++++++----------------------------------- 1 file changed, 15 insertions(+), 48 deletions(-) diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 67561f1a42..d430d34cab 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -36,7 +36,8 @@ import ( type ctxKey int -const rlkLRUSize = 250_000 +// Seems good enough. In the worst case, there are going to be more allocations. +const rlkLRUSize = 1_000_000 // StoreMatcherKey is the context key for the store's allow list. const StoreMatcherKey = ctxKey(0) @@ -213,10 +214,9 @@ func (s cancelableRespSender) send(r *storepb.SeriesResponse) { type broadcastingSeriesServer struct { ctx context.Context - listenerKey string - s *ProxyStore - srv storepb.Store_SeriesServer - resps []*storepb.SeriesResponse + rlk *requestListenerVal + srv storepb.Store_SeriesServer + resps []*storepb.SeriesResponse } // Send is like a regular Send() but it fans out those responses to multiple channels. @@ -264,11 +264,7 @@ func copySeriesResponse(r *storepb.SeriesResponse) *storepb.SeriesResponse { } func (b *broadcastingSeriesServer) Close() error { - val, ok := b.s.requestListenersLRU.Get(b.listenerKey) - if !ok { - return fmt.Errorf("%s key not found", b.listenerKey) - } - rlk := val.(*requestListenerVal) + rlk := b.rlk rlk.valLock.Lock() defer func() { @@ -277,24 +273,14 @@ func (b *broadcastingSeriesServer) Close() error { }() for li, l := range rlk.listeners { - if li > 0 { - wg := &sync.WaitGroup{} - for _, resp := range b.resps { - resp := resp - go func() { - resp = copySeriesResponse(resp) - select { - case l <- resp: - case <-b.srv.Context().Done(): - return - } - }() + for _, resp := range b.resps { + if li > 0 { + resp = copySeriesResponse(resp) } - - wg.Wait() - - err := b.srv.Context().Err() - if err != nil { + select { + case l <- resp: + case <-b.srv.Context().Done(): + err := b.srv.Context().Err() for _, lc := range rlk.listeners { select { case lc <- storepb.NewWarnSeriesResponse(err): @@ -304,26 +290,8 @@ func (b *broadcastingSeriesServer) Close() error { } return b.srv.Context().Err() } - close(l) - - } else { - for _, resp := range b.resps { - select { - case l <- resp: - case <-b.srv.Context().Done(): - err := b.srv.Context().Err() - for _, lc := range rlk.listeners { - select { - case lc <- storepb.NewWarnSeriesResponse(err): - default: - } - close(lc) - } - return b.srv.Context().Err() - } - } - close(l) } + close(l) } return nil } @@ -389,8 +357,7 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe bss := &broadcastingSeriesServer{ ctx, - listenerKey, - s, + rlk, srv, []*storepb.SeriesResponse{}, } From e39b97f8eb1c839b45877b9e3422405829c352d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Thu, 9 Sep 2021 22:30:51 +0300 
Subject: [PATCH 7/7] store: add better key matching logic MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Giedrius Statkevičius --- pkg/store/proxy.go | 46 ++++++++++++++++++++++++++++++------ pkg/store/proxy_test.go | 52 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+), 7 deletions(-) diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 146a6588f8..5e7ae089e0 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -302,14 +302,18 @@ func (b *broadcastingSeriesServer) SetHeader(m metadata.MD) error { return b.sr func (b *broadcastingSeriesServer) SendHeader(m metadata.MD) error { return b.srv.SendHeader(m) } func (b *broadcastingSeriesServer) SetTrailer(m metadata.MD) { b.srv.SetTrailer(m) } -func generateListenerKey(stores []Client, r *storepb.SeriesRequest) string { +// findMostMatchingKey generates a most fitting listener key. Must be called under +// a lock. +func findMostMatchingKey(stores []Client, r *storepb.SeriesRequest, listeners *lru.LRU) string { var sb strings.Builder + const marker rune = 0xffff + for _, st := range stores { fmt.Fprint(&sb, st.String()) } - fmt.Fprintf(&sb, "%d%d%v%v%v%v%v", r.MaxTime, r.MinTime, r.Matchers, r.MaxResolutionWindow, r.PartialResponseStrategy, r.PartialResponseDisabled, r.Hints.String()) + fmt.Fprintf(&sb, "%d%d%v%v%v%v", r.MaxTime, r.MinTime, r.MaxResolutionWindow, r.PartialResponseStrategy, r.PartialResponseDisabled, r.Hints.String()) // For RAW data it doesn't matter what the aggregates are. // TODO(GiedriusS): remove this once query push-down becomes a reality. @@ -317,7 +321,37 @@ func generateListenerKey(stores []Client, r *storepb.SeriesRequest) string { fmt.Fprintf(&sb, "%v", r.Aggregates) } - return sb.String() + fmt.Fprintf(&sb, "%c", marker) + + markers := 0 + if len(r.Matchers) > 0 { + markers = len(r.Matchers) - 1 + } + markerPositions := make([]int, markers) + + for i, m := range r.Matchers { + if i > 0 { + markerPositions = append(markerPositions, sb.Len()) + } + fmt.Fprintf(&sb, "%s%c%s%c", m.Name, marker, m.Value, marker) + } + + _, ok := listeners.Get(sb.String()) + // Easy path - direct match. 
+ if ok { + return sb.String() + } + + originalKey := sb.String() + + for _, markerPos := range markerPositions { + currentKey := originalKey[:markerPos] + _, ok := listeners.Get(currentKey) + if ok { + return currentKey + } + } + return originalKey } // Memoized version of realSeries() - it doesn't perform any Series() call unless such a request @@ -331,9 +365,9 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe g *errgroup.Group ) stores := s.stores() - listenerKey := generateListenerKey(stores, r) s.requestListenersLock.Lock() + listenerKey := findMostMatchingKey(stores, r, s.requestListenersLRU) val, ok := s.requestListenersLRU.Get(listenerKey) if !ok { val = &requestListenerVal{ @@ -351,9 +385,7 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe rlk.valLock.Unlock() if shouldSendQuery { - gr, gctx := errgroup.WithContext(ctx) - ctx = gctx - g = gr + g, ctx = errgroup.WithContext(ctx) bss := &broadcastingSeriesServer{ ctx, diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 86a37f9b64..17b6283e6f 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -1913,3 +1913,55 @@ func TestProxyStore_storeMatchMetadata(t *testing.T) { testutil.Assert(t, ok) testutil.Equals(t, "", reason) } + +// TestProxyStore_MatchingKey tests that a subkey matches the upper +// key since it queries for a subset of the key. +func TestProxyStore_MatchingKey(t *testing.T) { + listeners, err := lru.NewLRU(100, nil) + testutil.Ok(t, err) + + testKey := findMostMatchingKey(nil, &storepb.SeriesRequest{ + MinTime: 123, + MaxTime: 123, + Matchers: []storepb.LabelMatcher{ + { + Type: storepb.LabelMatcher_EQ, + Name: "test", + Value: "test", + }, + }, + }, listeners) + + listeners.Add(testKey, "test") + testSubkey := findMostMatchingKey(nil, &storepb.SeriesRequest{ + MinTime: 123, + MaxTime: 123, + Matchers: []storepb.LabelMatcher{ + { + Type: storepb.LabelMatcher_EQ, + Name: "test", + Value: "test", + }, + { + Type: storepb.LabelMatcher_EQ, + Name: "test", + Value: "test123", + }, + }, + }, listeners) + + testNotMatchingKey := findMostMatchingKey(nil, &storepb.SeriesRequest{ + MinTime: 123, + MaxTime: 123, + Matchers: []storepb.LabelMatcher{ + { + Type: storepb.LabelMatcher_EQ, + Name: "test", + Value: "test123", + }, + }, + }, listeners) + + testutil.Assert(t, testKey == testSubkey) + testutil.Assert(t, testNotMatchingKey != testKey) +}
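The coalescing technique these patches build up (the first request for a key performs the real fan-out to the stores, concurrent duplicates only register a listener and get the buffered responses replayed) is essentially a streaming variant of the single-flight pattern from golang.org/x/sync/singleflight. As a rough, self-contained sketch of the idea only — the `coalescer` type and `Do` function below are illustrative names and are not part of Thanos or of this PR:

```
package main

import (
	"fmt"
	"sync"
)

// call buffers the results of one in-flight request so that duplicate
// callers can replay them once it completes.
type call struct {
	wg      sync.WaitGroup
	results []string
	err     error
}

// coalescer runs at most one fn per key among concurrent callers.
type coalescer struct {
	mu    sync.Mutex
	calls map[string]*call
}

func newCoalescer() *coalescer {
	return &coalescer{calls: make(map[string]*call)}
}

// Do executes fn for the first caller of a key; callers that arrive while it
// is still running wait for it and receive a copy of its buffered results.
func (c *coalescer) Do(key string, fn func() ([]string, error)) ([]string, error) {
	c.mu.Lock()
	if existing, ok := c.calls[key]; ok {
		c.mu.Unlock()
		existing.wg.Wait()
		// Copy so every caller owns its own slice, mirroring the intent of
		// copySeriesResponse in the patches above.
		out := make([]string, len(existing.results))
		copy(out, existing.results)
		return out, existing.err
	}
	cl := &call{}
	cl.wg.Add(1)
	c.calls[key] = cl
	c.mu.Unlock()

	cl.results, cl.err = fn()
	cl.wg.Done()

	// Drop the in-flight entry so a later request triggers fresh work.
	c.mu.Lock()
	delete(c.calls, key)
	c.mu.Unlock()

	return cl.results, cl.err
}

func main() {
	c := newCoalescer()
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			res, err := c.Do("node_cpu_seconds_total{mode=\"idle\"}", func() ([]string, error) {
				// Stands in for the expensive fan-out to all stores.
				fmt.Println("running the real Series() fan-out")
				return []string{"series-a", "series-b"}, nil
			})
			fmt.Println(res, err)
		}()
	}
	wg.Wait()
}
```

The actual implementation in this series differs in that it streams storepb.SeriesResponse values over per-listener channels, copies each response for every extra listener because upper layers assume sole ownership of the series, keys the in-flight map through an LRU (PATCH 4), and falls back to prefix matching on the matcher portion of the key (PATCH 7).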