From dc3ddfa730e9a1ddb2f74c34ade885ae7b6d245f Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Tue, 19 Nov 2024 10:16:04 +0100 Subject: [PATCH] kafka replay speed: add alert for when we miss records in Kafka (#9921) * kafka replay speed: don't trim fetchWants I realized that trimming `fetchWant`s can end up discarding offsets in extreme circumstances. ### How it works If the fetchWant is so big that its size would exceed 2GiB, then we trim it. We trim it by reducing the end offset. The idea is that the next fetchWant will pick up from where this one left off. ### How it can break We trim the `fetchWant` in `UpdateBytesPerRecord` too. `UpdateBytesPerRecord` can be invoked in `concurrentFEtchers.run` after the `fetchWant` is dispatched. In that case the next `fetchWant` would have already been calculated. And we would end up with a gap. ### Did it break? It's hard to tell, but it's very unlikely. To reach 2GiB we would have needed to have the estimation for bytes per record be 2 MiB. While these large records are possible, they should be rare and our rolling average estimation for records size shouldn't reach it. Signed-off-by: Dimitar Dimitrov * kafka replay speed: add alert for when we miss records in Kafka Signed-off-by: Dimitar Dimitrov * Restore local config Signed-off-by: Dimitar Dimitrov * Assert there are no missed records at the end of every test Signed-off-by: Dimitar Dimitrov * make doc Signed-off-by: Dimitar Dimitrov * Fix rebase Signed-off-by: Dimitar Dimitrov * Add support for gaps within a Fetch Signed-off-by: Dimitar Dimitrov * Reword runbook Co-authored-by: Taylor C <41653732+tacole02@users.noreply.github.com> * Update log fields Co-authored-by: Marco Pracucci * Update docs/sources/mimir/manage/mimir-runbooks/_index.md * Add TestFindGapsInRecords Signed-off-by: Dimitar Dimitrov --------- Signed-off-by: Dimitar Dimitrov Co-authored-by: Taylor C <41653732+tacole02@users.noreply.github.com> Co-authored-by: Marco Pracucci --- .../mimir-ingest-storage/config/mimir.yaml | 2 - .../docker-compose.jsonnet | 4 +- .../mimir-ingest-storage/docker-compose.yml | 2 +- .../mimir/manage/mimir-runbooks/_index.md | 19 ++++ .../metamonitoring/mixin-alerts.yaml | 9 ++ .../alerts.yaml | 9 ++ operations/mimir-mixin-compiled/alerts.yaml | 9 ++ .../alerts/ingest-storage.libsonnet | 15 ++++ pkg/storage/ingest/fetcher.go | 49 +++++++++-- pkg/storage/ingest/fetcher_test.go | 86 +++++++++++++++++++ pkg/storage/ingest/reader.go | 5 ++ 11 files changed, 198 insertions(+), 11 deletions(-) diff --git a/development/mimir-ingest-storage/config/mimir.yaml b/development/mimir-ingest-storage/config/mimir.yaml index d9acd954384..2d36ab22bc8 100644 --- a/development/mimir-ingest-storage/config/mimir.yaml +++ b/development/mimir-ingest-storage/config/mimir.yaml @@ -16,9 +16,7 @@ ingest_storage: topic: mimir-ingest last_produced_offset_poll_interval: 500ms startup_fetch_concurrency: 15 - startup_records_per_fetch: 2400 ongoing_fetch_concurrency: 2 - ongoing_records_per_fetch: 30 ingester: track_ingester_owned_series: true diff --git a/development/mimir-ingest-storage/docker-compose.jsonnet b/development/mimir-ingest-storage/docker-compose.jsonnet index 1a54972f61f..b120b416836 100644 --- a/development/mimir-ingest-storage/docker-compose.jsonnet +++ b/development/mimir-ingest-storage/docker-compose.jsonnet @@ -54,10 +54,10 @@ std.manifestYamlDoc({ '-ingester.ring.prefix=exclusive-prefix', '-ingest-storage.kafka.consume-from-position-at-startup=end', '-ingest-storage.kafka.consume-from-timestamp-at-startup=0', - '-ingest-storage.kafka.ingestion-concurrency=2', - '-ingest-storage.kafka.ingestion-concurrency-batch-size=150', '-ingest-storage.kafka.startup-fetch-concurrency=15', '-ingest-storage.kafka.ongoing-fetch-concurrency=2', + '-ingest-storage.kafka.ingestion-concurrency-max=2', + '-ingest-storage.kafka.ingestion-concurrency-batch-size=150', ], extraVolumes: ['.data-mimir-write-zone-c-61:/data:delegated'], }), diff --git a/development/mimir-ingest-storage/docker-compose.yml b/development/mimir-ingest-storage/docker-compose.yml index 37f8d9bef54..acb9a6cbb93 100644 --- a/development/mimir-ingest-storage/docker-compose.yml +++ b/development/mimir-ingest-storage/docker-compose.yml @@ -322,7 +322,7 @@ "command": - "sh" - "-c" - - "exec ./mimir -config.file=./config/mimir.yaml -target=ingester -activity-tracker.filepath=/activity/mimir-write-zone-c-61 -ingester.ring.instance-availability-zone=zone-c -ingester.ring.instance-id=ingester-zone-c-61 -ingester.partition-ring.prefix=exclusive-prefix -ingester.ring.prefix=exclusive-prefix -ingest-storage.kafka.consume-from-position-at-startup=end -ingest-storage.kafka.consume-from-timestamp-at-startup=0 -ingest-storage.kafka.ingestion-concurrency=2 -ingest-storage.kafka.ingestion-concurrency-batch-size=150 -ingest-storage.kafka.startup-fetch-concurrency=15 -ingest-storage.kafka.ongoing-fetch-concurrency=2" + - "exec ./mimir -config.file=./config/mimir.yaml -target=ingester -activity-tracker.filepath=/activity/mimir-write-zone-c-61 -ingester.ring.instance-availability-zone=zone-c -ingester.ring.instance-id=ingester-zone-c-61 -ingester.partition-ring.prefix=exclusive-prefix -ingester.ring.prefix=exclusive-prefix -ingest-storage.kafka.consume-from-position-at-startup=end -ingest-storage.kafka.consume-from-timestamp-at-startup=0 -ingest-storage.kafka.startup-fetch-concurrency=15 -ingest-storage.kafka.ongoing-fetch-concurrency=2 -ingest-storage.kafka.ingestion-concurrency-max=2 -ingest-storage.kafka.ingestion-concurrency-batch-size=150" "depends_on": "kafka_1": "condition": "service_healthy" diff --git a/docs/sources/mimir/manage/mimir-runbooks/_index.md b/docs/sources/mimir/manage/mimir-runbooks/_index.md index 2eb98cdad9a..8dd771c8c0a 100644 --- a/docs/sources/mimir/manage/mimir-runbooks/_index.md +++ b/docs/sources/mimir/manage/mimir-runbooks/_index.md @@ -1532,6 +1532,25 @@ How to **investigate**: - If the call exists and it's waiting on a lock then there may be a deadlock. - If the call doesn't exist then it could either mean processing is not stuck (false positive) or the `pushToStorage` wasn't called at all, and so you should investigate the callers in the code. +### MimirIngesterMissedRecordsFromKafka + +This alert fires when an ingester has missed processing some records from Kafka. In other words, there has been a gap in offsets. + +How it **works**: + +- The ingester reads records from Kafka and processes them sequentially. It keeps track of the offset of the last record it's processed. +- Upon fetching the next batch of records, it checks if the first available record has an offset of one greater than the last processed offset. If the first available offset is larger than that, then the ingester has missed some records. +- Kafka doesn't guarantee sequential offsets. If a record has been manually deleted from Kafka or if the records have been produced in a transaction and the transaction was aborted, then there may be a gap. +- Mimir doesn't produce in transactions and does not delete records. +- When the ingester starts, it attempts to resume from the last offset it processed. If the ingester has been unavailable for long enough that the next record is already removed due to retention, then the ingester misses some records. + +How to **investigate**: + +- Find the offsets which were missed. The ingester logs them along with the message `there is a gap in consumed offsets`. +- Verify that there have been no deleted records in your Kafka cluster. +- Verify that the ingester hasn't been down for longer than the retention on the Kafka partition. +- Report a bug. + ### MimirStrongConsistencyEnforcementFailed This alert fires when too many read requests with strong consistency are failing. diff --git a/operations/helm/tests/metamonitoring-values-generated/mimir-distributed/templates/metamonitoring/mixin-alerts.yaml b/operations/helm/tests/metamonitoring-values-generated/mimir-distributed/templates/metamonitoring/mixin-alerts.yaml index ebb3c5fd135..929e6927b57 100644 --- a/operations/helm/tests/metamonitoring-values-generated/mimir-distributed/templates/metamonitoring/mixin-alerts.yaml +++ b/operations/helm/tests/metamonitoring-values-generated/mimir-distributed/templates/metamonitoring/mixin-alerts.yaml @@ -1133,6 +1133,15 @@ spec: for: 5m labels: severity: critical + - alert: MimirIngesterMissedRecordsFromKafka + annotations: + message: Mimir {{ $labels.pod }} in {{ $labels.cluster }}/{{ $labels.namespace }} missed processing records from Kafka. There may be data loss. + runbook_url: https://grafana.com/docs/mimir/latest/operators-guide/mimir-runbooks/#mimiringestermissedrecordsfromkafka + expr: | + # Alert if the ingester missed some records from Kafka. + increase(cortex_ingest_storage_reader_missed_records_total[10m]) > 0 + labels: + severity: critical - alert: MimirStrongConsistencyEnforcementFailed annotations: message: Mimir {{ $labels.pod }} in {{ $labels.cluster }}/{{ $labels.namespace }} fails to enforce strong-consistency on read-path. diff --git a/operations/mimir-mixin-compiled-baremetal/alerts.yaml b/operations/mimir-mixin-compiled-baremetal/alerts.yaml index 77846f2f2d6..9283e7e8eea 100644 --- a/operations/mimir-mixin-compiled-baremetal/alerts.yaml +++ b/operations/mimir-mixin-compiled-baremetal/alerts.yaml @@ -1107,6 +1107,15 @@ groups: for: 5m labels: severity: critical + - alert: MimirIngesterMissedRecordsFromKafka + annotations: + message: Mimir {{ $labels.instance }} in {{ $labels.cluster }}/{{ $labels.namespace }} missed processing records from Kafka. There may be data loss. + runbook_url: https://grafana.com/docs/mimir/latest/operators-guide/mimir-runbooks/#mimiringestermissedrecordsfromkafka + expr: | + # Alert if the ingester missed some records from Kafka. + increase(cortex_ingest_storage_reader_missed_records_total[10m]) > 0 + labels: + severity: critical - alert: MimirStrongConsistencyEnforcementFailed annotations: message: Mimir {{ $labels.instance }} in {{ $labels.cluster }}/{{ $labels.namespace }} fails to enforce strong-consistency on read-path. diff --git a/operations/mimir-mixin-compiled/alerts.yaml b/operations/mimir-mixin-compiled/alerts.yaml index cef80213c98..be782a551da 100644 --- a/operations/mimir-mixin-compiled/alerts.yaml +++ b/operations/mimir-mixin-compiled/alerts.yaml @@ -1121,6 +1121,15 @@ groups: for: 5m labels: severity: critical + - alert: MimirIngesterMissedRecordsFromKafka + annotations: + message: Mimir {{ $labels.pod }} in {{ $labels.cluster }}/{{ $labels.namespace }} missed processing records from Kafka. There may be data loss. + runbook_url: https://grafana.com/docs/mimir/latest/operators-guide/mimir-runbooks/#mimiringestermissedrecordsfromkafka + expr: | + # Alert if the ingester missed some records from Kafka. + increase(cortex_ingest_storage_reader_missed_records_total[10m]) > 0 + labels: + severity: critical - alert: MimirStrongConsistencyEnforcementFailed annotations: message: Mimir {{ $labels.pod }} in {{ $labels.cluster }}/{{ $labels.namespace }} fails to enforce strong-consistency on read-path. diff --git a/operations/mimir-mixin/alerts/ingest-storage.libsonnet b/operations/mimir-mixin/alerts/ingest-storage.libsonnet index 3cca0186d4b..b223a0514b7 100644 --- a/operations/mimir-mixin/alerts/ingest-storage.libsonnet +++ b/operations/mimir-mixin/alerts/ingest-storage.libsonnet @@ -161,6 +161,21 @@ }, }, + // Alert firing is an ingester is reading from Kafka, there are buffered records to process, but processing is stuck. + { + alert: $.alertName('IngesterMissedRecordsFromKafka'), + expr: ||| + # Alert if the ingester missed some records from Kafka. + increase(cortex_ingest_storage_reader_missed_records_total[%s]) > 0 + ||| % $.alertRangeInterval(10), + labels: { + severity: 'critical', + }, + annotations: { + message: '%(product)s {{ $labels.%(per_instance_label)s }} in %(alert_aggregation_variables)s missed processing records from Kafka. There may be data loss.' % $._config, + }, + }, + // Alert firing if Mimir is failing to enforce strong read consistency. { alert: $.alertName('StrongConsistencyEnforcementFailed'), diff --git a/pkg/storage/ingest/fetcher.go b/pkg/storage/ingest/fetcher.go index 710c7f5a6d4..044e17009f8 100644 --- a/pkg/storage/ingest/fetcher.go +++ b/pkg/storage/ingest/fetcher.go @@ -16,6 +16,7 @@ import ( "github.com/go-kit/log/level" "github.com/grafana/dskit/backoff" "github.com/opentracing/opentracing-go" + "github.com/prometheus/client_golang/prometheus" "github.com/twmb/franz-go/pkg/kadm" "github.com/twmb/franz-go/pkg/kerr" "github.com/twmb/franz-go/pkg/kgo" @@ -227,7 +228,7 @@ type concurrentFetchers struct { // ordering. orderedFetches chan fetchResult - lastReturnedRecord int64 + lastReturnedOffset int64 startOffsets *genericOffsetReader[int64] // trackCompressedBytes controls whether to calculate MaxBytes for fetch requests based on previous responses' compressed or uncompressed bytes. @@ -283,7 +284,7 @@ func newConcurrentFetchers( partitionID: partition, metrics: metrics, minBytesWaitTime: minBytesWaitTime, - lastReturnedRecord: startOffset - 1, + lastReturnedOffset: startOffset - 1, startOffsets: startOffsetsReader, trackCompressedBytes: trackCompressedBytes, maxBufferedBytesLimit: maxBufferedBytesLimit, @@ -343,7 +344,7 @@ func (r *concurrentFetchers) Stop() { r.bufferedFetchedRecords.Store(0) r.bufferedFetchedBytes.Store(0) - level.Info(r.logger).Log("msg", "stopped concurrent fetchers", "last_returned_record", r.lastReturnedRecord) + level.Info(r.logger).Log("msg", "stopped concurrent fetchers", "last_returned_offset", r.lastReturnedOffset) } // Update implements fetcher @@ -352,7 +353,7 @@ func (r *concurrentFetchers) Update(ctx context.Context, concurrency int) { r.done = make(chan struct{}) r.wg.Add(1) - go r.start(ctx, r.lastReturnedRecord+1, concurrency) + go r.start(ctx, r.lastReturnedOffset+1, concurrency) } // PollFetches implements fetcher @@ -369,12 +370,13 @@ func (r *concurrentFetchers) PollFetches(ctx context.Context) (kgo.Fetches, cont // PollFetches() calls). r.bufferedFetchedRecords.Sub(int64(len(f.Records))) - firstUnreturnedRecordIdx := recordIndexAfterOffset(f.Records, r.lastReturnedRecord) + firstUnreturnedRecordIdx := recordIndexAfterOffset(f.Records, r.lastReturnedOffset) r.recordOrderedFetchTelemetry(f, firstUnreturnedRecordIdx, waitStartTime) f.Records = f.Records[firstUnreturnedRecordIdx:] if len(f.Records) > 0 { - r.lastReturnedRecord = f.Records[len(f.Records)-1].Offset + instrumentGaps(findGapsInRecords(f.Records, r.lastReturnedOffset), r.metrics.missedRecords, r.logger) + r.lastReturnedOffset = f.Records[len(f.Records)-1].Offset } return kgo.Fetches{{ @@ -388,6 +390,41 @@ func (r *concurrentFetchers) PollFetches(ctx context.Context) (kgo.Fetches, cont } } +func instrumentGaps(gaps []offsetRange, records prometheus.Counter, logger log.Logger) { + for _, gap := range gaps { + level.Error(logger).Log( + "msg", "there is a gap in consumed offsets; it is likely that there was data loss; see runbook for MimirIngesterMissedRecordsFromKafka", + "records_offset_gap_start_inclusive", gap.start, + "records_offset_gap_end_exclusive", gap.end, + ) + records.Add(float64(gap.numOffsets())) + level.Error(logger).Log("msg", "found gap in records", "start", gap.start, "end", gap.end) + } +} + +type offsetRange struct { + // start is inclusive + start int64 + + // end is exclusive + end int64 +} + +func (g offsetRange) numOffsets() int64 { + return g.end - g.start +} + +func findGapsInRecords(records []*kgo.Record, lastReturnedOffset int64) []offsetRange { + var gaps []offsetRange + for _, r := range records { + if r.Offset != lastReturnedOffset+1 { + gaps = append(gaps, offsetRange{start: lastReturnedOffset + 1, end: r.Offset}) + } + lastReturnedOffset = r.Offset + } + return gaps +} + func recordIndexAfterOffset(records []*kgo.Record, offset int64) int { for i, r := range records { if r.Offset > offset { diff --git a/pkg/storage/ingest/fetcher_test.go b/pkg/storage/ingest/fetcher_test.go index 92b45f15296..deb7a066440 100644 --- a/pkg/storage/ingest/fetcher_test.go +++ b/pkg/storage/ingest/fetcher_test.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "math" + "strings" "sync" "testing" "time" @@ -17,6 +18,7 @@ import ( "github.com/grafana/dskit/services" "github.com/grafana/dskit/test" "github.com/prometheus/client_golang/prometheus" + promtest "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/twmb/franz-go/pkg/kerr" @@ -1149,6 +1151,15 @@ func createConcurrentFetchers(ctx context.Context, t *testing.T, client *kgo.Cli reg := prometheus.NewPedanticRegistry() metrics := newReaderMetrics(partition, reg, noopReaderMetricsSource{}) + t.Cleanup(func() { + // Assuming none of the tests intentionally create gaps in offsets, there should be no missed records. + assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_ingest_storage_reader_missed_records_total The number of offsets that were never consumed by the reader because they weren't fetched. + # TYPE cortex_ingest_storage_reader_missed_records_total counter + cortex_ingest_storage_reader_missed_records_total 0 + `), "cortex_ingest_storage_reader_missed_records_total")) + }) + // This instantiates the fields of kprom. // This is usually done by franz-go, but since now we use the metrics ourselves, we need to instantiate the metrics ourselves. metrics.kprom.OnNewClient(client) @@ -1322,3 +1333,78 @@ func TestFetchWant_UpdateBytesPerRecord(t *testing.T) { }) } } + +func TestFindGapsInRecords(t *testing.T) { + tests := map[string]struct { + records []*kgo.Record + lastReturnedOffset int64 + want []offsetRange + }{ + "no gaps": { + records: []*kgo.Record{ + {Offset: 1}, + {Offset: 2}, + {Offset: 3}, + }, + lastReturnedOffset: 0, + want: nil, + }, + "single gap": { + records: []*kgo.Record{ + {Offset: 5}, + }, + lastReturnedOffset: 2, + want: []offsetRange{ + {start: 3, end: 5}, + }, + }, + "multiple gaps": { + records: []*kgo.Record{ + {Offset: 3}, + {Offset: 7}, + {Offset: 10}, + }, + lastReturnedOffset: 1, + want: []offsetRange{ + {start: 2, end: 3}, + {start: 4, end: 7}, + {start: 8, end: 10}, + }, + }, + "empty records": { + records: []*kgo.Record{}, + lastReturnedOffset: 5, + want: nil, + }, + "gap at start": { + records: []*kgo.Record{ + {Offset: 10}, + {Offset: 11}, + }, + lastReturnedOffset: 5, + want: []offsetRange{ + {start: 6, end: 10}, + }, + }, + "gap at start and middle": { + records: []*kgo.Record{ + {Offset: 10}, + {Offset: 11}, + {Offset: 15}, + {Offset: 16}, + }, + lastReturnedOffset: 5, + want: []offsetRange{ + {start: 6, end: 10}, + {start: 12, end: 15}, + }, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + got := findGapsInRecords(tc.records, tc.lastReturnedOffset) + assert.Equal(t, tc.want, got) + }) + } +} diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index 9a4bf73426b..009b4fe039e 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -1007,6 +1007,7 @@ type readerMetrics struct { lastConsumedOffset prometheus.Gauge consumeLatency prometheus.Histogram kprom *kprom.Metrics + missedRecords prometheus.Counter } type readerMetricsSource interface { @@ -1083,6 +1084,10 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer, metricsSourc strongConsistencyInstrumentation: NewStrongReadConsistencyInstrumentation[struct{}](component, reg), lastConsumedOffset: lastConsumedOffset, kprom: NewKafkaReaderClientMetrics(component, reg), + missedRecords: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "cortex_ingest_storage_reader_missed_records_total", + Help: "The number of offsets that were never consumed by the reader because they weren't fetched.", + }), } m.Service = services.NewTimerService(100*time.Millisecond, nil, func(context.Context) error {