diff --git a/CHANGELOG.md b/CHANGELOG.md index cb965c19fd..a509e3c149 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#7907](https://github.com/thanos-io/thanos/pull/7907) Receive: Add `--receive.grpc-service-config` flag to configure gRPC service config for the receivers. - [#7961](https://github.com/thanos-io/thanos/pull/7961) Store Gateway: Add `--store.posting-group-max-keys` flag to mark posting group as lazy if it exceeds number of keys limit. Added `thanos_bucket_store_lazy_expanded_posting_groups_total` for total number of lazy posting groups and corresponding reasons. - [#8000](https://github.com/thanos-io/thanos/pull/8000) Query: Bump promql-engine, pass partial response through options +- [#7353](https://github.com/thanos-io/thanos/pull/7353) Receiver: introduce optional cache for matchers in series calls. ### Changed diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 0372e83690..e3724d65fe 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -27,12 +27,11 @@ import ( "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/wlog" - "google.golang.org/grpc" - "gopkg.in/yaml.v2" - "github.com/thanos-io/objstore" "github.com/thanos-io/objstore/client" objstoretracing "github.com/thanos-io/objstore/tracing/opentracing" + "google.golang.org/grpc" + "gopkg.in/yaml.v2" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/component" @@ -50,6 +49,7 @@ import ( grpcserver "github.com/thanos-io/thanos/pkg/server/grpc" httpserver "github.com/thanos-io/thanos/pkg/server/http" "github.com/thanos-io/thanos/pkg/store" + storecache "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/tenancy" "github.com/thanos-io/thanos/pkg/tls" @@ -225,6 +225,15 @@ func runReceive( return errors.Wrap(err, "parse relabel configuration") } + var cache = storecache.NewNoopMatcherCache() + if conf.matcherCacheSize > 0 { + cache, err = storecache.NewMatchersCache(storecache.WithSize(conf.matcherCacheSize), storecache.WithPromRegistry(reg)) + if err != nil { + return errors.Wrap(err, "failed to create matchers cache") + } + multiTSDBOptions = append(multiTSDBOptions, receive.WithMatchersCache(cache)) + } + dbs := receive.NewMultiTSDB( conf.dataDir, logger, @@ -345,6 +354,7 @@ func runReceive( options := []store.ProxyStoreOption{ store.WithProxyStoreDebugLogging(debugLogging), + store.WithMatcherCache(cache), store.WithoutDedup(), } @@ -893,6 +903,8 @@ type receiveConfig struct { asyncForwardWorkerCount uint + matcherCacheSize int + featureList *[]string headExpandedPostingsCacheSize uint64 @@ -1046,6 +1058,8 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { "about order."). Default("false").Hidden().BoolVar(&rc.allowOutOfOrderUpload) + cmd.Flag("matcher-cache-size", "The size of the cache used for matching against external labels. Using 0 disables caching.").Default("0").IntVar(&rc.matcherCacheSize) + rc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd) rc.writeLimitsConfig = extflag.RegisterPathOrContent(cmd, "receive.limits-config", "YAML file that contains limit configuration.", extflag.WithEnvSubstitution(), extflag.WithHidden()) diff --git a/docs/components/receive.md b/docs/components/receive.md index fc0a64d98b..536b2d2cdb 100644 --- a/docs/components/receive.md +++ b/docs/components/receive.md @@ -407,6 +407,8 @@ Flags: --log.format=logfmt Log format to use. Possible options: logfmt or json. --log.level=info Log filtering level. + --matcher-cache-size=0 The size of the cache used for matching against + external labels. Using 0 disables caching. --objstore.config= Alternative to 'objstore.config-file' flag (mutually exclusive). Content of diff --git a/pkg/query/query_test.go b/pkg/query/query_test.go index 8d1df8593c..cc19f43ab6 100644 --- a/pkg/query/query_test.go +++ b/pkg/query/query_test.go @@ -14,8 +14,10 @@ import ( "github.com/efficientgo/core/testutil" "github.com/go-kit/log" "github.com/prometheus/prometheus/storage" + "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/store" + storecache "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/storepb" storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil" "github.com/thanos-io/thanos/pkg/testutil/custom" @@ -54,6 +56,8 @@ func TestQuerier_Proxy(t *testing.T) { files, err := filepath.Glob("testdata/promql/**/*.test") testutil.Ok(t, err) testutil.Equals(t, 10, len(files), "%v", files) + cache, err := storecache.NewMatchersCache() + testutil.Ok(t, err) logger := log.NewLogfmtLogger(os.Stderr) t.Run("proxy", func(t *testing.T) { @@ -62,7 +66,7 @@ func TestQuerier_Proxy(t *testing.T) { logger, nil, store.NewProxyStore(logger, nil, func() []store.Client { return sc.get() }, - component.Debug, nil, 5*time.Minute, store.EagerRetrieval), + component.Debug, nil, 5*time.Minute, store.EagerRetrieval, store.WithMatcherCache(cache)), 1000000, 5*time.Minute, ) diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index f3a58e128f..5711c67be8 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -6,6 +6,7 @@ package receive import ( "bytes" "context" + goerrors "errors" "fmt" "io" "math" @@ -24,8 +25,6 @@ import ( "gopkg.in/yaml.v3" - goerrors "errors" - "github.com/alecthomas/units" "github.com/efficientgo/core/testutil" "github.com/go-kit/log" diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 2cfa967a2a..96e4e43345 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -18,16 +18,14 @@ import ( "github.com/go-kit/log/level" "github.com/oklog/ulid" "github.com/pkg/errors" - "go.uber.org/atomic" - "golang.org/x/sync/errgroup" - "google.golang.org/grpc" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" - "github.com/thanos-io/objstore" + "go.uber.org/atomic" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" "github.com/thanos-io/thanos/pkg/api/status" "github.com/thanos-io/thanos/pkg/block/metadata" @@ -39,6 +37,7 @@ import ( "github.com/thanos-io/thanos/pkg/receive/expandedpostingscache" "github.com/thanos-io/thanos/pkg/shipper" "github.com/thanos-io/thanos/pkg/store" + storecache "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" ) @@ -64,6 +63,8 @@ type MultiTSDB struct { hashFunc metadata.HashFunc hashringConfigs []HashringConfig + matcherCache storecache.MatchersCache + tsdbClients []store.Client exemplarClients map[string]*exemplars.TSDB @@ -95,6 +96,12 @@ func WithBlockExpandedPostingsCacheSize(size uint64) MultiTSDBOption { } } +func WithMatchersCache(cache storecache.MatchersCache) MultiTSDBOption { + return func(s *MultiTSDB) { + s.matcherCache = cache + } +} + // NewMultiTSDB creates new MultiTSDB. // NOTE: Passed labels must be sorted lexicographically (alphabetically). func NewMultiTSDB( @@ -127,6 +134,7 @@ func NewMultiTSDB( bucket: bucket, allowOutOfOrderUpload: allowOutOfOrderUpload, hashFunc: hashFunc, + matcherCache: storecache.NewNoopMatcherCache(), } for _, option := range options { @@ -755,10 +763,13 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant shipper.DefaultMetaFilename, ) } - options := []store.TSDBStoreOption{} + var options []store.TSDBStoreOption if t.metricNameFilterEnabled { options = append(options, store.WithCuckooMetricNameStoreFilter()) } + if t.matcherCache != nil { + options = append(options, store.WithMatcherCacheInstance(t.matcherCache)) + } tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset, options...), s, ship, exemplars.NewTSDB(s, lset)) t.addTenantLocked(tenantID, tenant) // need to update the client list once store is ready & client != nil level.Info(logger).Log("msg", "TSDB is now ready") diff --git a/pkg/receive/multitsdb_test.go b/pkg/receive/multitsdb_test.go index a36db4b402..eb1a999c94 100644 --- a/pkg/receive/multitsdb_test.go +++ b/pkg/receive/multitsdb_test.go @@ -46,21 +46,14 @@ func TestMultiTSDB(t *testing.T) { logger := log.NewLogfmtLogger(os.Stderr) t.Run("run fresh", func(t *testing.T) { - m := NewMultiTSDB( - dir, logger, prometheus.NewRegistry(), &tsdb.Options{ - MinBlockDuration: (2 * time.Hour).Milliseconds(), - MaxBlockDuration: (2 * time.Hour).Milliseconds(), - RetentionDuration: (6 * time.Hour).Milliseconds(), - NoLockfile: true, - MaxExemplars: 100, - EnableExemplarStorage: true, - }, - labels.FromStrings("replica", "01"), - "tenant_id", - nil, - false, - metadata.NoneFunc, - ) + m := NewMultiTSDB(dir, logger, prometheus.NewRegistry(), &tsdb.Options{ + MinBlockDuration: (2 * time.Hour).Milliseconds(), + MaxBlockDuration: (2 * time.Hour).Milliseconds(), + RetentionDuration: (6 * time.Hour).Milliseconds(), + NoLockfile: true, + MaxExemplars: 100, + EnableExemplarStorage: true, + }, labels.FromStrings("replica", "01"), "tenant_id", nil, false, metadata.NoneFunc) defer func() { testutil.Ok(t, m.Close()) }() testutil.Ok(t, m.Flush()) @@ -175,19 +168,12 @@ func TestMultiTSDB(t *testing.T) { t.Run("flush with one sample produces a block", func(t *testing.T) { const testTenant = "test_tenant" - m := NewMultiTSDB( - dir, logger, prometheus.NewRegistry(), &tsdb.Options{ - MinBlockDuration: (2 * time.Hour).Milliseconds(), - MaxBlockDuration: (2 * time.Hour).Milliseconds(), - RetentionDuration: (6 * time.Hour).Milliseconds(), - NoLockfile: true, - }, - labels.FromStrings("replica", "01"), - "tenant_id", - nil, - false, - metadata.NoneFunc, - ) + m := NewMultiTSDB(dir, logger, prometheus.NewRegistry(), &tsdb.Options{ + MinBlockDuration: (2 * time.Hour).Milliseconds(), + MaxBlockDuration: (2 * time.Hour).Milliseconds(), + RetentionDuration: (6 * time.Hour).Milliseconds(), + NoLockfile: true, + }, labels.FromStrings("replica", "01"), "tenant_id", nil, false, metadata.NoneFunc) defer func() { testutil.Ok(t, m.Close()) }() testutil.Ok(t, m.Flush()) diff --git a/pkg/receive/receive_test.go b/pkg/receive/receive_test.go index bf38cb06ed..1ab909fd5f 100644 --- a/pkg/receive/receive_test.go +++ b/pkg/receive/receive_test.go @@ -8,13 +8,12 @@ import ( "time" "github.com/go-kit/log" - "github.com/stretchr/testify/require" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/tsdb" - + "github.com/stretchr/testify/require" "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/labelpb" diff --git a/pkg/receive/writer_test.go b/pkg/receive/writer_test.go index 2db5e6a341..434512c694 100644 --- a/pkg/receive/writer_test.go +++ b/pkg/receive/writer_test.go @@ -10,8 +10,6 @@ import ( "testing" "time" - "github.com/thanos-io/thanos/pkg/receive/writecapnp" - "github.com/efficientgo/core/testutil" "github.com/go-kit/log" "github.com/pkg/errors" @@ -24,6 +22,7 @@ import ( "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/receive/writecapnp" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" diff --git a/pkg/store/cache/matchers_cache.go b/pkg/store/cache/matchers_cache.go new file mode 100644 index 0000000000..fe4fd4c668 --- /dev/null +++ b/pkg/store/cache/matchers_cache.go @@ -0,0 +1,186 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package storecache + +import ( + "fmt" + + lru "github.com/hashicorp/golang-lru/v2" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/prometheus/model/labels" + "golang.org/x/sync/singleflight" + + "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/store/storepb/prompb" +) + +const DefaultCacheSize = 200 + +type NewItemFunc func(matcher ConversionLabelMatcher) (*labels.Matcher, error) + +type MatchersCache interface { + // GetOrSet retrieves a matcher from cache or creates and stores it if not present. + // If the matcher is not in cache, it uses the provided newItem function to create it. + GetOrSet(key ConversionLabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) +} + +// Ensure implementations satisfy the interface. +var ( + _ MatchersCache = (*LruMatchersCache)(nil) + _ MatchersCache = (*NoopMatcherCache)(nil) +) + +// NoopMatcherCache is a no-op implementation of MatchersCache that doesn't cache anything. +type NoopMatcherCache struct{} + +// NewNoopMatcherCache creates a new no-op matcher cache. +func NewNoopMatcherCache() MatchersCache { + return &NoopMatcherCache{} +} + +// GetOrSet implements MatchersCache by always creating a new matcher without caching. +func (n *NoopMatcherCache) GetOrSet(key ConversionLabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) { + return newItem(key) +} + +// LruMatchersCache implements MatchersCache with an LRU cache and metrics. +type LruMatchersCache struct { + reg prometheus.Registerer + cache *lru.Cache[ConversionLabelMatcher, *labels.Matcher] + metrics *matcherCacheMetrics + size int + sf singleflight.Group +} + +type MatcherCacheOption func(*LruMatchersCache) + +func WithPromRegistry(reg prometheus.Registerer) MatcherCacheOption { + return func(c *LruMatchersCache) { + c.reg = reg + } +} + +func WithSize(size int) MatcherCacheOption { + return func(c *LruMatchersCache) { + c.size = size + } +} + +func NewMatchersCache(opts ...MatcherCacheOption) (*LruMatchersCache, error) { + cache := &LruMatchersCache{ + size: DefaultCacheSize, + } + + for _, opt := range opts { + opt(cache) + } + cache.metrics = newMatcherCacheMetrics(cache.reg) + + lruCache, err := lru.NewWithEvict[ConversionLabelMatcher, *labels.Matcher](cache.size, cache.onEvict) + if err != nil { + return nil, err + } + cache.cache = lruCache + + return cache, nil +} + +func (c *LruMatchersCache) GetOrSet(key ConversionLabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) { + c.metrics.requestsTotal.Inc() + + v, err, _ := c.sf.Do(key.String(), func() (interface{}, error) { + if item, ok := c.cache.Get(key); ok { + c.metrics.hitsTotal.Inc() + return item, nil + } + + item, err := newItem(key) + if err != nil { + return nil, err + } + c.cache.Add(key, item) + c.metrics.numItems.Set(float64(c.cache.Len())) + return item, nil + }) + + if err != nil { + return nil, err + } + return v.(*labels.Matcher), nil +} + +func (c *LruMatchersCache) onEvict(_ ConversionLabelMatcher, _ *labels.Matcher) { + c.metrics.evicted.Inc() + c.metrics.numItems.Set(float64(c.cache.Len())) +} + +type matcherCacheMetrics struct { + requestsTotal prometheus.Counter + hitsTotal prometheus.Counter + numItems prometheus.Gauge + maxItems prometheus.Gauge + evicted prometheus.Counter +} + +func newMatcherCacheMetrics(reg prometheus.Registerer) *matcherCacheMetrics { + return &matcherCacheMetrics{ + requestsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_matchers_cache_requests_total", + Help: "Total number of cache requests for series matchers", + }), + hitsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_matchers_cache_hits_total", + Help: "Total number of cache hits for series matchers", + }), + numItems: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "thanos_matchers_cache_items", + Help: "Total number of cached items", + }), + maxItems: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "thanos_matchers_cache_max_items", + Help: "Maximum number of items that can be cached", + }), + evicted: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_matchers_cache_evicted_total", + Help: "Total number of items evicted from the cache", + }), + } +} + +// MatchersToPromMatchersCached returns Prometheus matchers from proto matchers. +// Works analogously to MatchersToPromMatchers but uses cache to avoid unnecessary allocations and conversions. +// NOTE: It allocates memory. +func MatchersToPromMatchersCached(cache MatchersCache, ms ...storepb.LabelMatcher) ([]*labels.Matcher, error) { + res := make([]*labels.Matcher, 0, len(ms)) + for i := range ms { + pm, err := cache.GetOrSet(&ms[i], MatcherToPromMatcher) + if err != nil { + return nil, err + } + res = append(res, pm) + } + return res, nil +} + +func MatcherToPromMatcher(m ConversionLabelMatcher) (*labels.Matcher, error) { + mi, ok := m.(*storepb.LabelMatcher) + if !ok { + return nil, fmt.Errorf("invalid matcher type. Got: %T", m) + } + + return storepb.MatcherToPromMatcher(*mi) +} + +// ConversionLabelMatcher is a common interface for the Prometheus and Thanos label matchers. +type ConversionLabelMatcher interface { + String() string + GetName() string + GetValue() string +} + +var ( + _ ConversionLabelMatcher = (*storepb.LabelMatcher)(nil) + _ ConversionLabelMatcher = (*prompb.LabelMatcher)(nil) +) diff --git a/pkg/store/cache/matchers_cache_test.go b/pkg/store/cache/matchers_cache_test.go new file mode 100644 index 0000000000..957c10d9ff --- /dev/null +++ b/pkg/store/cache/matchers_cache_test.go @@ -0,0 +1,112 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package storecache_test + +import ( + "testing" + + "github.com/efficientgo/core/testutil" + "github.com/prometheus/prometheus/model/labels" + + storecache "github.com/thanos-io/thanos/pkg/store/cache" + "github.com/thanos-io/thanos/pkg/store/storepb" +) + +func TestMatchersCache(t *testing.T) { + cache, err := storecache.NewMatchersCache(storecache.WithSize(2)) + testutil.Ok(t, err) + + matcher := &storepb.LabelMatcher{ + Type: storepb.LabelMatcher_EQ, + Name: "key", + Value: "val", + } + + matcher2 := &storepb.LabelMatcher{ + Type: storepb.LabelMatcher_RE, + Name: "key2", + Value: "val2|val3", + } + + matcher3 := &storepb.LabelMatcher{ + Type: storepb.LabelMatcher_EQ, + Name: "key3", + Value: "val3", + } + + var cacheHit bool + newItem := func(matcher storecache.ConversionLabelMatcher) (*labels.Matcher, error) { + cacheHit = false + return storecache.MatcherToPromMatcher(matcher) + } + expected := labels.MustNewMatcher(labels.MatchEqual, "key", "val") + expected2 := labels.MustNewMatcher(labels.MatchRegexp, "key2", "val2|val3") + expected3 := labels.MustNewMatcher(labels.MatchEqual, "key3", "val3") + + item, err := cache.GetOrSet(matcher, newItem) + testutil.Ok(t, err) + testutil.Equals(t, false, cacheHit) + testutil.Equals(t, expected.String(), item.String()) + + cacheHit = true + item, err = cache.GetOrSet(matcher, newItem) + testutil.Ok(t, err) + testutil.Equals(t, true, cacheHit) + testutil.Equals(t, expected.String(), item.String()) + + cacheHit = true + item, err = cache.GetOrSet(matcher2, newItem) + testutil.Ok(t, err) + testutil.Equals(t, false, cacheHit) + testutil.Equals(t, expected2.String(), item.String()) + + cacheHit = true + item, err = cache.GetOrSet(matcher2, newItem) + testutil.Ok(t, err) + testutil.Equals(t, true, cacheHit) + testutil.Equals(t, expected2.String(), item.String()) + + cacheHit = true + item, err = cache.GetOrSet(matcher, newItem) + testutil.Ok(t, err) + testutil.Equals(t, true, cacheHit) + testutil.Equals(t, expected, item) + + cacheHit = true + item, err = cache.GetOrSet(matcher3, newItem) + testutil.Ok(t, err) + testutil.Equals(t, false, cacheHit) + testutil.Equals(t, expected3, item) + + cacheHit = true + item, err = cache.GetOrSet(matcher2, newItem) + testutil.Ok(t, err) + testutil.Equals(t, false, cacheHit) + testutil.Equals(t, expected2.String(), item.String()) +} + +func BenchmarkMatchersCache(b *testing.B) { + cache, err := storecache.NewMatchersCache(storecache.WithSize(100)) + if err != nil { + b.Fatalf("failed to create cache: %v", err) + } + + matchers := []*storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "key1", Value: "val1"}, + {Type: storepb.LabelMatcher_EQ, Name: "key2", Value: "val2"}, + {Type: storepb.LabelMatcher_EQ, Name: "key3", Value: "val3"}, + {Type: storepb.LabelMatcher_EQ, Name: "key4", Value: "val4"}, + {Type: storepb.LabelMatcher_RE, Name: "key5", Value: "^(val5|val6|val7|val8|val9).*$"}, + } + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + matcher := matchers[i%len(matchers)] + _, err := cache.GetOrSet(matcher, storecache.MatcherToPromMatcher) + if err != nil { + b.Fatalf("failed to get or set cache item: %v", err) + } + } +} diff --git a/pkg/store/local.go b/pkg/store/local.go index cb80f8f8cb..5d72ee28af 100644 --- a/pkg/store/local.go +++ b/pkg/store/local.go @@ -21,6 +21,7 @@ import ( "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/runutil" + storecache "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" ) @@ -130,7 +131,7 @@ func ScanGRPCCurlProtoStreamMessages(data []byte, atEOF bool) (advance int, toke // Series returns all series for a requested time range and label matcher. The returned data may // exceed the requested time bounds. func (s *LocalStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { - match, matchers, err := matchesExternalLabels(r.Matchers, s.extLabels) + match, matchers, err := matchesExternalLabels(r.Matchers, s.extLabels, storecache.NewNoopMatcherCache()) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) } diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index 11d7f1ff77..a503d15689 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -36,6 +36,7 @@ import ( "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/runutil" + storecache "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" @@ -125,7 +126,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Sto extLset := p.externalLabelsFn() - match, matchers, err := matchesExternalLabels(r.Matchers, extLset) + match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storecache.NewNoopMatcherCache()) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) } @@ -488,8 +489,13 @@ func (p *PrometheusStore) startPromRemoteRead(ctx context.Context, q *prompb.Que // matchesExternalLabels returns false if given matchers are not matching external labels. // If true, matchesExternalLabels also returns Prometheus matchers without those matching external labels. -func matchesExternalLabels(ms []storepb.LabelMatcher, externalLabels labels.Labels) (bool, []*labels.Matcher, error) { - tms, err := storepb.MatchersToPromMatchers(ms...) +func matchesExternalLabels(ms []storepb.LabelMatcher, externalLabels labels.Labels, cache storecache.MatchersCache) (bool, []*labels.Matcher, error) { + var ( + tms []*labels.Matcher + err error + ) + + tms, err = storecache.MatchersToPromMatchersCached(cache, ms...) if err != nil { return false, nil, err } @@ -537,7 +543,7 @@ func (p *PrometheusStore) encodeChunk(ss []prompb.Sample) (storepb.Chunk_Encodin func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { extLset := p.externalLabelsFn() - match, matchers, err := matchesExternalLabels(r.Matchers, extLset) + match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storecache.NewNoopMatcherCache()) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } @@ -600,7 +606,7 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue extLset := p.externalLabelsFn() - match, matchers, err := matchesExternalLabels(r.Matchers, extLset) + match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storecache.NewNoopMatcherCache()) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 498c80e2e7..af1ba9dae1 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -25,6 +25,7 @@ import ( "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/info/infopb" + storecache "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/strutil" @@ -89,6 +90,7 @@ type ProxyStore struct { retrievalStrategy RetrievalStrategy debugLogging bool tsdbSelector *TSDBSelector + matcherCache storecache.MatchersCache enableDedup bool } @@ -113,7 +115,7 @@ func RegisterStoreServer(storeSrv storepb.StoreServer, logger log.Logger) func(* } } -// BucketStoreOption are functions that configure BucketStore. +// ProxyStoreOption are functions that configure the ProxyStore. type ProxyStoreOption func(s *ProxyStore) // WithProxyStoreDebugLogging toggles debug logging. @@ -137,6 +139,13 @@ func WithoutDedup() ProxyStoreOption { } } +// WithMatcherCache sets the matcher cache instance for the proxy. +func WithMatcherCache(cache storecache.MatchersCache) ProxyStoreOption { + return func(s *ProxyStore) { + s.matcherCache = cache + } +} + // NewProxyStore returns a new ProxyStore that uses the given clients that implements storeAPI to fan-in all series to the client. // Note that there is no deduplication support. Deduplication should be done on the highest level (just before PromQL). func NewProxyStore( @@ -168,6 +177,7 @@ func NewProxyStore( retrievalStrategy: retrievalStrategy, tsdbSelector: DefaultSelector, enableDedup: true, + matcherCache: storecache.NewNoopMatcherCache(), } for _, option := range options { @@ -248,7 +258,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. reqLogger = log.With(reqLogger, "request", originalRequest.String()) } - match, matchers, err := matchesExternalLabels(originalRequest.Matchers, s.selectorLabels) + match, matchers, err := matchesExternalLabels(originalRequest.Matchers, s.selectorLabels, s.matcherCache) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) } @@ -353,7 +363,7 @@ func (s *ProxyStore) LabelNames(ctx context.Context, originalRequest *storepb.La if s.debugLogging { reqLogger = log.With(reqLogger, "request", originalRequest.String()) } - match, matchers, err := matchesExternalLabels(originalRequest.Matchers, s.selectorLabels) + match, matchers, err := matchesExternalLabels(originalRequest.Matchers, s.selectorLabels, s.matcherCache) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } @@ -456,7 +466,7 @@ func (s *ProxyStore) LabelValues(ctx context.Context, originalRequest *storepb.L return nil, status.Error(codes.InvalidArgument, "label name parameter cannot be empty") } - match, matchers, err := matchesExternalLabels(originalRequest.Matchers, s.selectorLabels) + match, matchers, err := matchesExternalLabels(originalRequest.Matchers, s.selectorLabels, s.matcherCache) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 1389e89b1d..a9db1d11a0 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -6,22 +6,21 @@ package store import ( "context" "fmt" - "strings" - - "github.com/pkg/errors" - "math" "math/rand" "os" "path/filepath" + "strings" "sync" "testing" "time" "github.com/cespare/xxhash/v2" + "github.com/efficientgo/core/testutil" "github.com/go-kit/log" "github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/types" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/timestamp" @@ -29,11 +28,10 @@ import ( "github.com/prometheus/prometheus/tsdb/chunkenc" "google.golang.org/grpc" - "github.com/efficientgo/core/testutil" - "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/info/infopb" + storecache "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil" @@ -2086,6 +2084,47 @@ func BenchmarkProxySeries(b *testing.B) { }) } +func BenchmarkProxySeriesRegex(b *testing.B) { + tb := testutil.NewTB(b) + + cache, err := storecache.NewMatchersCache(storecache.WithSize(200)) + testutil.Ok(b, err) + + q := NewProxyStore(nil, + nil, + func() []Client { return nil }, + component.Query, + labels.EmptyLabels(), 0*time.Second, EagerRetrieval, + WithMatcherCache(cache), + ) + + words := []string{"foo", "bar", "baz", "qux", "quux", "corge", "grault", "garply", "waldo", "fred", "plugh", "xyzzy", "thud"} + bigRegex := strings.Builder{} + for i := 0; i < 200; i++ { + bigRegex.WriteString(words[rand.Intn(len(words))]) + bigRegex.WriteString("|") + } + + matchers := []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_RE, Name: "foo", Value: ".*"}, + {Type: storepb.LabelMatcher_RE, Name: "bar", Value: bigRegex.String()}, + } + + // Create a regex that matches all series. + req := &storepb.SeriesRequest{ + MinTime: 0, + MaxTime: math.MaxInt64, + Matchers: matchers, + } + s := newStoreSeriesServer(context.Background()) + + tb.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + testutil.Ok(b, q.Series(req, s)) + } +} + func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) { tmpDir := t.TempDir() @@ -2136,6 +2175,7 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) { responseTimeout: 5 * time.Second, retrievalStrategy: EagerRetrieval, tsdbSelector: DefaultSelector, + matcherCache: storecache.NewNoopMatcherCache(), } var allResps []*storepb.SeriesResponse @@ -2272,6 +2312,7 @@ func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) { responseTimeout: 50 * time.Millisecond, retrievalStrategy: respStrategy, tsdbSelector: DefaultSelector, + matcherCache: storecache.NewNoopMatcherCache(), } ctx, cancel := context.WithCancel(context.Background()) @@ -2309,6 +2350,7 @@ func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) { responseTimeout: 50 * time.Millisecond, retrievalStrategy: respStrategy, tsdbSelector: DefaultSelector, + matcherCache: storecache.NewNoopMatcherCache(), } ctx := context.Background() @@ -2469,5 +2511,4 @@ func TestDedupRespHeap_Deduplication(t *testing.T) { tcase.testFn(tcase.responses, h) }) } - } diff --git a/pkg/store/storepb/custom.go b/pkg/store/storepb/custom.go index d5461a5947..b165d76fcc 100644 --- a/pkg/store/storepb/custom.go +++ b/pkg/store/storepb/custom.go @@ -385,30 +385,35 @@ func PromMatchersToMatchers(ms ...*labels.Matcher) ([]LabelMatcher, error) { // NOTE: It allocates memory. func MatchersToPromMatchers(ms ...LabelMatcher) ([]*labels.Matcher, error) { res := make([]*labels.Matcher, 0, len(ms)) - for _, m := range ms { - var t labels.MatchType - - switch m.Type { - case LabelMatcher_EQ: - t = labels.MatchEqual - case LabelMatcher_NEQ: - t = labels.MatchNotEqual - case LabelMatcher_RE: - t = labels.MatchRegexp - case LabelMatcher_NRE: - t = labels.MatchNotRegexp - default: - return nil, errors.Errorf("unrecognized label matcher type %d", m.Type) - } - m, err := labels.NewMatcher(t, m.Name, m.Value) + for i := range ms { + pm, err := MatcherToPromMatcher(ms[i]) if err != nil { return nil, err } - res = append(res, m) + res = append(res, pm) } return res, nil } +// MatcherToPromMatcher converts a Thanos label matcher to Prometheus label matcher. +func MatcherToPromMatcher(m LabelMatcher) (*labels.Matcher, error) { + var t labels.MatchType + + switch m.Type { + case LabelMatcher_EQ: + t = labels.MatchEqual + case LabelMatcher_NEQ: + t = labels.MatchNotEqual + case LabelMatcher_RE: + t = labels.MatchRegexp + case LabelMatcher_NRE: + t = labels.MatchNotRegexp + default: + return nil, errors.Errorf("unrecognized label matcher type %d", m.Type) + } + return labels.NewMatcher(t, m.Name, m.Value) +} + // MatchersToString converts label matchers to string format. // String should be parsable as a valid PromQL query metric selector. func MatchersToString(ms ...LabelMatcher) string { @@ -439,6 +444,14 @@ func (m *LabelMatcher) PromString() string { return fmt.Sprintf("%s%s%q", m.Name, m.Type.PromString(), m.Value) } +func (m *LabelMatcher) GetName() string { + return m.Name +} + +func (m *LabelMatcher) GetValue() string { + return m.Value +} + func (x LabelMatcher_Type) PromString() string { typeToStr := map[LabelMatcher_Type]string{ LabelMatcher_EQ: "=", diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index 737fee3bbd..a62481a53f 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -27,6 +27,7 @@ import ( "github.com/thanos-io/thanos/pkg/filter" "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/thanos/pkg/runutil" + storecache "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" ) @@ -53,6 +54,12 @@ func WithCuckooMetricNameStoreFilter() TSDBStoreOption { } } +func WithMatcherCacheInstance(cache storecache.MatchersCache) TSDBStoreOption { + return func(s *TSDBStore) { + s.matcherCache = cache + } +} + // TSDBStore implements the store API against a local TSDB instance. // It attaches the provided external labels to all results. It only responds with raw data // and does not support downsampling. @@ -62,6 +69,7 @@ type TSDBStore struct { component component.StoreAPI buffers sync.Pool maxBytesPerFrame int + matcherCache storecache.MatchersCache extLset labels.Labels startStoreFilterUpdate bool @@ -112,6 +120,7 @@ func NewTSDBStore( b := make([]byte, 0, initialBufSize) return &b }}, + matcherCache: storecache.NewNoopMatcherCache(), } for _, option := range options { @@ -177,13 +186,13 @@ func (s *TSDBStore) LabelSet() []labelpb.ZLabelSet { return labelSets } -func (p *TSDBStore) TSDBInfos() []infopb.TSDBInfo { - labels := p.LabelSet() +func (s *TSDBStore) TSDBInfos() []infopb.TSDBInfo { + labels := s.LabelSet() if len(labels) == 0 { return []infopb.TSDBInfo{} } - mint, maxt := p.TimeRange() + mint, maxt := s.TimeRange() return []infopb.TSDBInfo{ { Labels: labelpb.ZLabelSet{ @@ -247,7 +256,7 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_Ser srv = fs } - match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset()) + match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset(), s.matcherCache) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) } @@ -370,7 +379,7 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_Ser func (s *TSDBStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) ( *storepb.LabelNamesResponse, error, ) { - match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset()) + match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset(), s.matcherCache) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } @@ -432,7 +441,7 @@ func (s *TSDBStore) LabelValues(ctx context.Context, r *storepb.LabelValuesReque } } - match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset()) + match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset(), s.matcherCache) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) }