Skip to content

Commit

Permalink
Implement metadata API limit in Ingester (#6128)
Browse files Browse the repository at this point in the history
Co-authored-by: Alan Protasio <[email protected]>
  • Loading branch information
harry671003 and alanprot authored Aug 13, 2024
1 parent a046044 commit 22245aa
Show file tree
Hide file tree
Showing 12 changed files with 419 additions and 186 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
* [ENHANCEMENT] Compactor: Split cleaner cycle for active and deleted tenants. #6112
* [ENHANCEMENT] Compactor: Introduce cleaner visit marker. #6113
* [ENHANCEMENT] Query Frontend: Add cortex_query_samples_total metric. #6142
* [ENHANCEMENT] Ingester: Implement metadata API limit. #6128
* [BUGFIX] Configsdb: Fix endline issue in db password. #5920
* [BUGFIX] Ingester: Fix `user` and `type` labels for the `cortex_ingester_tsdb_head_samples_appended_total` TSDB metric. #5952
* [BUGFIX] Querier: Enforce max query length check for `/api/v1/series` API even though `ignoreMaxQueryLength` is set to true. #6018
Expand Down
62 changes: 43 additions & 19 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/instrument"
"github.com/weaveworks/common/user"
Expand Down Expand Up @@ -671,7 +672,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
return nil, err
}
// If there wasn't an error but removeReplica is false that means we didn't find both HA labels.
if !removeReplica {
if !removeReplica { // False, Nil
d.nonHASamples.WithLabelValues(userID).Add(float64(numFloatSamples + numHistogramSamples))
}
}
Expand Down Expand Up @@ -1021,7 +1022,7 @@ func (d *Distributor) ForReplicationSet(ctx context.Context, replicationSet ring
})
}

func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, to model.Time, labelName model.LabelName, f func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error), matchers ...*labels.Matcher) ([]string, error) {
func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, to model.Time, labelName model.LabelName, hints *storage.LabelHints, f func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error), matchers ...*labels.Matcher) ([]string, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "Distributor.LabelValues", opentracing.Tags{
"name": labelName,
"start": from.Unix(),
Expand All @@ -1032,8 +1033,8 @@ func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, t
if err != nil {
return nil, err
}

req, err := ingester_client.ToLabelValuesRequest(labelName, from, to, matchers)
limit := getLimitFromLabelHints(hints)
req, err := ingester_client.ToLabelValuesRequest(labelName, from, to, limit, matchers)
if err != nil {
return nil, err
}
Expand All @@ -1053,13 +1054,16 @@ func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, t
if err != nil {
return nil, err
}
if limit > 0 && len(r) > limit {
r = r[:limit]
}
span.SetTag("result_length", len(r))
return r, nil
}

// LabelValuesForLabelName returns all the label values that are associated with a given label name.
func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, labelName model.LabelName, matchers ...*labels.Matcher) ([]string, error) {
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, labelName model.LabelName, hint *storage.LabelHints, matchers ...*labels.Matcher) ([]string, error) {
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
resp, err := client.LabelValues(ctx, req)
if err != nil {
Expand All @@ -1071,8 +1075,8 @@ func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to mode
}

// LabelValuesForLabelNameStream returns all the label values that are associated with a given label name.
func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, to model.Time, labelName model.LabelName, matchers ...*labels.Matcher) ([]string, error) {
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, to model.Time, labelName model.LabelName, hint *storage.LabelHints, matchers ...*labels.Matcher) ([]string, error) {
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
stream, err := client.LabelValuesStream(ctx, req)
if err != nil {
Expand All @@ -1096,7 +1100,7 @@ func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, t
}, matchers...)
}

func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time, f func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error)) ([]string, error) {
func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time, hints *storage.LabelHints, f func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error)) ([]string, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "Distributor.LabelNames", opentracing.Tags{
"start": from.Unix(),
"end": to.Unix(),
Expand All @@ -1107,9 +1111,11 @@ func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time,
return nil, err
}

limit := getLimitFromLabelHints(hints)
req := &ingester_client.LabelNamesRequest{
StartTimestampMs: int64(from),
EndTimestampMs: int64(to),
Limit: int64(limit),
}
resps, err := f(ctx, replicationSet, req)
if err != nil {
Expand All @@ -1126,13 +1132,17 @@ func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time,
if err != nil {
return nil, err
}
if limit > 0 && len(r) > limit {
r = r[:limit]
}

span.SetTag("result_length", len(r))

return r, nil
}

func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time) ([]string, error) {
return d.LabelNamesCommon(ctx, from, to, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time, hints *storage.LabelHints) ([]string, error) {
return d.LabelNamesCommon(ctx, from, to, hints, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
stream, err := client.LabelNamesStream(ctx, req)
if err != nil {
Expand All @@ -1157,8 +1167,8 @@ func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time)
}

// LabelNames returns all the label names.
func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time) ([]string, error) {
return d.LabelNamesCommon(ctx, from, to, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, hint *storage.LabelHints) ([]string, error) {
return d.LabelNamesCommon(ctx, from, to, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
resp, err := client.LabelNames(ctx, req)
if err != nil {
Expand All @@ -1170,8 +1180,8 @@ func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time) ([]st
}

// MetricsForLabelMatchers gets the metrics that match said matchers
func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]model.Metric, error) {
return d.metricsForLabelMatchersCommon(ctx, from, through, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]model.Metric, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error {
func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, hint *storage.SelectHints, matchers ...*labels.Matcher) ([]model.Metric, error) {
return d.metricsForLabelMatchersCommon(ctx, from, through, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]model.Metric, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error {
_, err := d.ForReplicationSet(ctx, rs, false, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
resp, err := client.MetricsForLabelMatchers(ctx, req)
if err != nil {
Expand Down Expand Up @@ -1199,8 +1209,8 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
}, matchers...)
}

func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]model.Metric, error) {
return d.metricsForLabelMatchersCommon(ctx, from, through, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]model.Metric, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error {
func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, through model.Time, hint *storage.SelectHints, matchers ...*labels.Matcher) ([]model.Metric, error) {
return d.metricsForLabelMatchersCommon(ctx, from, through, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]model.Metric, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error {
_, err := d.ForReplicationSet(ctx, rs, false, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
stream, err := client.MetricsForLabelMatchersStream(ctx, req)
if err != nil {
Expand Down Expand Up @@ -1239,14 +1249,14 @@ func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, t
}, matchers...)
}

func (d *Distributor) metricsForLabelMatchersCommon(ctx context.Context, from, through model.Time, f func(context.Context, ring.ReplicationSet, *ingester_client.MetricsForLabelMatchersRequest, *map[model.Fingerprint]model.Metric, *sync.Mutex, *limiter.QueryLimiter) error, matchers ...*labels.Matcher) ([]model.Metric, error) {
func (d *Distributor) metricsForLabelMatchersCommon(ctx context.Context, from, through model.Time, hints *storage.SelectHints, f func(context.Context, ring.ReplicationSet, *ingester_client.MetricsForLabelMatchersRequest, *map[model.Fingerprint]model.Metric, *sync.Mutex, *limiter.QueryLimiter) error, matchers ...*labels.Matcher) ([]model.Metric, error) {
replicationSet, err := d.GetIngestersForMetadata(ctx)
queryLimiter := limiter.QueryLimiterFromContextWithFallback(ctx)
if err != nil {
return nil, err
}

req, err := ingester_client.ToMetricsForLabelMatchersRequest(from, through, matchers)
req, err := ingester_client.ToMetricsForLabelMatchersRequest(from, through, getLimitFromSelectHints(hints), matchers)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1438,3 +1448,17 @@ func findHALabels(replicaLabel, clusterLabel string, labels []cortexpb.LabelAdap

return cluster, replica
}

func getLimitFromLabelHints(hints *storage.LabelHints) int {
if hints != nil {
return hints.Limit
}
return 0
}

func getLimitFromSelectHints(hints *storage.SelectHints) int {
if hints != nil {
return hints.Limit
}
return 0
}
14 changes: 7 additions & 7 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1904,7 +1904,7 @@ func BenchmarkDistributor_GetLabelsValues(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
_, err := ds[0].LabelValuesForLabelName(ctx, model.Time(time.Now().UnixMilli()), model.Time(time.Now().UnixMilli()), "__name__")
_, err := ds[0].LabelValuesForLabelName(ctx, model.Time(time.Now().UnixMilli()), model.Time(time.Now().UnixMilli()), "__name__", nil)
require.NoError(b, err)
}
})
Expand Down Expand Up @@ -2270,7 +2270,7 @@ func TestDistributor_MetricsForLabelMatchers_SingleSlowIngester(t *testing.T) {
}

for i := 0; i < 50; i++ {
_, err := ds[0].MetricsForLabelMatchers(ctx, now, now, mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test"))
_, err := ds[0].MetricsForLabelMatchers(ctx, now, now, nil, mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test"))
require.NoError(t, err)
}
}
Expand Down Expand Up @@ -2439,7 +2439,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
}

{
metrics, err := ds[0].MetricsForLabelMatchers(ctx, now, now, testData.matchers...)
metrics, err := ds[0].MetricsForLabelMatchers(ctx, now, now, nil, testData.matchers...)

if testData.expectedErr != nil {
assert.ErrorIs(t, err, testData.expectedErr)
Expand All @@ -2457,7 +2457,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
}

{
metrics, err := ds[0].MetricsForLabelMatchersStream(ctx, now, now, testData.matchers...)
metrics, err := ds[0].MetricsForLabelMatchersStream(ctx, now, now, nil, testData.matchers...)
if testData.expectedErr != nil {
assert.ErrorIs(t, err, testData.expectedErr)
return
Expand Down Expand Up @@ -2544,7 +2544,7 @@ func BenchmarkDistributor_MetricsForLabelMatchers(b *testing.B) {

for n := 0; n < b.N; n++ {
now := model.Now()
metrics, err := ds[0].MetricsForLabelMatchers(ctx, now, now, testData.matchers...)
metrics, err := ds[0].MetricsForLabelMatchers(ctx, now, now, nil, testData.matchers...)

if testData.expectedErr != nil {
assert.EqualError(b, err, testData.expectedErr.Error())
Expand Down Expand Up @@ -3197,7 +3197,7 @@ func (i *mockIngester) MetricsForLabelMatchersStream(ctx context.Context, req *c
return nil, errFail
}

_, _, multiMatchers, err := client.FromMetricsForLabelMatchersRequest(req)
_, _, _, multiMatchers, err := client.FromMetricsForLabelMatchersRequest(req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -3229,7 +3229,7 @@ func (i *mockIngester) MetricsForLabelMatchers(ctx context.Context, req *client.
return nil, errFail
}

_, _, multiMatchers, err := client.FromMetricsForLabelMatchersRequest(req)
_, _, _, multiMatchers, err := client.FromMetricsForLabelMatchersRequest(req)
if err != nil {
return nil, err
}
Expand Down
18 changes: 10 additions & 8 deletions pkg/ingester/client/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func ToQueryResponse(matrix model.Matrix) *QueryResponse {
}

// ToMetricsForLabelMatchersRequest builds a MetricsForLabelMatchersRequest proto
func ToMetricsForLabelMatchersRequest(from, to model.Time, matchers []*labels.Matcher) (*MetricsForLabelMatchersRequest, error) {
func ToMetricsForLabelMatchersRequest(from, to model.Time, limit int, matchers []*labels.Matcher) (*MetricsForLabelMatchersRequest, error) {
ms, err := toLabelMatchers(matchers)
if err != nil {
return nil, err
Expand All @@ -124,6 +124,7 @@ func ToMetricsForLabelMatchersRequest(from, to model.Time, matchers []*labels.Ma
StartTimestampMs: int64(from),
EndTimestampMs: int64(to),
MatchersSet: []*LabelMatchers{{Matchers: ms}},
Limit: int64(limit),
}, nil
}

Expand Down Expand Up @@ -174,22 +175,22 @@ func SeriesSetToQueryResponse(s storage.SeriesSet) (*QueryResponse, error) {
}

// FromMetricsForLabelMatchersRequest unpacks a MetricsForLabelMatchersRequest proto
func FromMetricsForLabelMatchersRequest(req *MetricsForLabelMatchersRequest) (model.Time, model.Time, [][]*labels.Matcher, error) {
func FromMetricsForLabelMatchersRequest(req *MetricsForLabelMatchersRequest) (model.Time, model.Time, int, [][]*labels.Matcher, error) {
matchersSet := make([][]*labels.Matcher, 0, len(req.MatchersSet))
for _, matchers := range req.MatchersSet {
matchers, err := FromLabelMatchers(matchers.Matchers)
if err != nil {
return 0, 0, nil, err
return 0, 0, 0, nil, err
}
matchersSet = append(matchersSet, matchers)
}
from := model.Time(req.StartTimestampMs)
to := model.Time(req.EndTimestampMs)
return from, to, matchersSet, nil
return from, to, int(req.Limit), matchersSet, nil
}

// ToLabelValuesRequest builds a LabelValuesRequest proto
func ToLabelValuesRequest(labelName model.LabelName, from, to model.Time, matchers []*labels.Matcher) (*LabelValuesRequest, error) {
func ToLabelValuesRequest(labelName model.LabelName, from, to model.Time, limit int, matchers []*labels.Matcher) (*LabelValuesRequest, error) {
ms, err := toLabelMatchers(matchers)
if err != nil {
return nil, err
Expand All @@ -200,22 +201,23 @@ func ToLabelValuesRequest(labelName model.LabelName, from, to model.Time, matche
StartTimestampMs: int64(from),
EndTimestampMs: int64(to),
Matchers: &LabelMatchers{Matchers: ms},
Limit: int64(limit),
}, nil
}

// FromLabelValuesRequest unpacks a LabelValuesRequest proto
func FromLabelValuesRequest(req *LabelValuesRequest) (string, int64, int64, []*labels.Matcher, error) {
func FromLabelValuesRequest(req *LabelValuesRequest) (string, int64, int64, int, []*labels.Matcher, error) {
var err error
var matchers []*labels.Matcher

if req.Matchers != nil {
matchers, err = FromLabelMatchers(req.Matchers.Matchers)
if err != nil {
return "", 0, 0, nil, err
return "", 0, 0, 0, nil, err
}
}

return req.LabelName, req.StartTimestampMs, req.EndTimestampMs, matchers, nil
return req.LabelName, req.StartTimestampMs, req.EndTimestampMs, int(req.Limit), matchers, nil
}

func toLabelMatchers(matchers []*labels.Matcher) ([]*LabelMatcher, error) {
Expand Down
Loading

0 comments on commit 22245aa

Please sign in to comment.