diff --git a/development/mimir-ingest-storage/config/mimir.yaml b/development/mimir-ingest-storage/config/mimir.yaml
index d9acd954384..2d36ab22bc8 100644
--- a/development/mimir-ingest-storage/config/mimir.yaml
+++ b/development/mimir-ingest-storage/config/mimir.yaml
@@ -16,9 +16,7 @@ ingest_storage:
     topic:   mimir-ingest
     last_produced_offset_poll_interval: 500ms
     startup_fetch_concurrency: 15
-    startup_records_per_fetch: 2400
     ongoing_fetch_concurrency: 2
-    ongoing_records_per_fetch: 30
 
 ingester:
   track_ingester_owned_series: true
diff --git a/development/mimir-ingest-storage/docker-compose.jsonnet b/development/mimir-ingest-storage/docker-compose.jsonnet
index 1a54972f61f..b120b416836 100644
--- a/development/mimir-ingest-storage/docker-compose.jsonnet
+++ b/development/mimir-ingest-storage/docker-compose.jsonnet
@@ -54,10 +54,10 @@ std.manifestYamlDoc({
         '-ingester.ring.prefix=exclusive-prefix',
         '-ingest-storage.kafka.consume-from-position-at-startup=end',
         '-ingest-storage.kafka.consume-from-timestamp-at-startup=0',
-        '-ingest-storage.kafka.ingestion-concurrency=2',
-        '-ingest-storage.kafka.ingestion-concurrency-batch-size=150',
         '-ingest-storage.kafka.startup-fetch-concurrency=15',
         '-ingest-storage.kafka.ongoing-fetch-concurrency=2',
+        '-ingest-storage.kafka.ingestion-concurrency-max=2',
+        '-ingest-storage.kafka.ingestion-concurrency-batch-size=150',
       ],
       extraVolumes: ['.data-mimir-write-zone-c-61:/data:delegated'],
     }),
diff --git a/development/mimir-ingest-storage/docker-compose.yml b/development/mimir-ingest-storage/docker-compose.yml
index 37f8d9bef54..acb9a6cbb93 100644
--- a/development/mimir-ingest-storage/docker-compose.yml
+++ b/development/mimir-ingest-storage/docker-compose.yml
@@ -322,7 +322,7 @@
     "command":
       - "sh"
       - "-c"
-      - "exec ./mimir -config.file=./config/mimir.yaml -target=ingester -activity-tracker.filepath=/activity/mimir-write-zone-c-61 -ingester.ring.instance-availability-zone=zone-c -ingester.ring.instance-id=ingester-zone-c-61 -ingester.partition-ring.prefix=exclusive-prefix -ingester.ring.prefix=exclusive-prefix -ingest-storage.kafka.consume-from-position-at-startup=end -ingest-storage.kafka.consume-from-timestamp-at-startup=0 -ingest-storage.kafka.ingestion-concurrency=2 -ingest-storage.kafka.ingestion-concurrency-batch-size=150 -ingest-storage.kafka.startup-fetch-concurrency=15 -ingest-storage.kafka.ongoing-fetch-concurrency=2"
+      - "exec ./mimir -config.file=./config/mimir.yaml -target=ingester -activity-tracker.filepath=/activity/mimir-write-zone-c-61 -ingester.ring.instance-availability-zone=zone-c -ingester.ring.instance-id=ingester-zone-c-61 -ingester.partition-ring.prefix=exclusive-prefix -ingester.ring.prefix=exclusive-prefix -ingest-storage.kafka.consume-from-position-at-startup=end -ingest-storage.kafka.consume-from-timestamp-at-startup=0 -ingest-storage.kafka.startup-fetch-concurrency=15 -ingest-storage.kafka.ongoing-fetch-concurrency=2 -ingest-storage.kafka.ingestion-concurrency-max=2 -ingest-storage.kafka.ingestion-concurrency-batch-size=150"
     "depends_on":
       "kafka_1":
         "condition": "service_healthy"
diff --git a/docs/sources/mimir/manage/mimir-runbooks/_index.md b/docs/sources/mimir/manage/mimir-runbooks/_index.md
index 2eb98cdad9a..8dd771c8c0a 100644
--- a/docs/sources/mimir/manage/mimir-runbooks/_index.md
+++ b/docs/sources/mimir/manage/mimir-runbooks/_index.md
@@ -1532,6 +1532,25 @@ How to **investigate**:
   - If the call exists and it's waiting on a lock then there may be a deadlock.
   - If the call doesn't exist then it could either mean processing is not stuck (false positive) or the `pushToStorage` wasn't called at all, and so you should investigate the callers in the code.
 
+### MimirIngesterMissedRecordsFromKafka
+
+This alert fires when an ingester has missed processing some records from Kafka. In other words, there has been a gap in offsets.
+
+How it **works**:
+
+- The ingester reads records from Kafka and processes them sequentially, keeping track of the offset of the last record it has processed.
+- Upon fetching the next batch of records, it checks whether the first available record has an offset exactly one greater than the last processed offset. If the first available offset is larger than that, then the ingester has missed some records. For example, if the last processed offset is 9 and the first fetched record has offset 12, then offsets 10 and 11 were missed.
+- Kafka doesn't guarantee sequential offsets. If a record has been manually deleted from Kafka, or if records were produced in a transaction that was later aborted, then there may be a gap.
+- Mimir doesn't produce records in transactions and doesn't delete records.
+- When the ingester starts, it attempts to resume from the last offset it processed. If the ingester has been unavailable for long enough that the next record has already been removed due to retention, then the ingester misses some records.
+
+How to **investigate**:
+
+- Find the offsets that were missed. The ingester logs them along with the message `there is a gap in consumed offsets`; see the example log line after this list.
+- Verify that there have been no deleted records in your Kafka cluster.
+- Verify that the ingester hasn't been down for longer than the retention on the Kafka partition.
+- If none of the above explains the gap, report a bug.
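+
+For example, the gap log line emitted by the ingester looks similar to the following (the offset values here are illustrative):
+
+```
+level=error msg="there is a gap in consumed offsets; it is likely that there was data loss; see runbook for MimirIngesterMissedRecordsFromKafka" records_offset_gap_start_inclusive=1005 records_offset_gap_end_exclusive=1010
+```
+
+To find the affected ingesters, you can run the same query the alert is based on:
+
+```
+increase(cortex_ingest_storage_reader_missed_records_total[10m]) > 0
+```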
+
 ### MimirStrongConsistencyEnforcementFailed
 
 This alert fires when too many read requests with strong consistency are failing.
diff --git a/operations/helm/tests/metamonitoring-values-generated/mimir-distributed/templates/metamonitoring/mixin-alerts.yaml b/operations/helm/tests/metamonitoring-values-generated/mimir-distributed/templates/metamonitoring/mixin-alerts.yaml
index ebb3c5fd135..929e6927b57 100644
--- a/operations/helm/tests/metamonitoring-values-generated/mimir-distributed/templates/metamonitoring/mixin-alerts.yaml
+++ b/operations/helm/tests/metamonitoring-values-generated/mimir-distributed/templates/metamonitoring/mixin-alerts.yaml
@@ -1133,6 +1133,15 @@ spec:
             for: 5m
             labels:
               severity: critical
+          - alert: MimirIngesterMissedRecordsFromKafka
+            annotations:
+              message: Mimir {{ $labels.pod }} in {{ $labels.cluster }}/{{ $labels.namespace }} missed processing records from Kafka. There may be data loss.
+              runbook_url: https://grafana.com/docs/mimir/latest/operators-guide/mimir-runbooks/#mimiringestermissedrecordsfromkafka
+            expr: |
+              # Alert if the ingester missed some records from Kafka.
+              increase(cortex_ingest_storage_reader_missed_records_total[10m]) > 0
+            labels:
+              severity: critical
           - alert: MimirStrongConsistencyEnforcementFailed
             annotations:
               message: Mimir {{ $labels.pod }} in {{ $labels.cluster }}/{{ $labels.namespace }} fails to enforce strong-consistency on read-path.
diff --git a/operations/mimir-mixin-compiled-baremetal/alerts.yaml b/operations/mimir-mixin-compiled-baremetal/alerts.yaml
index 77846f2f2d6..9283e7e8eea 100644
--- a/operations/mimir-mixin-compiled-baremetal/alerts.yaml
+++ b/operations/mimir-mixin-compiled-baremetal/alerts.yaml
@@ -1107,6 +1107,15 @@ groups:
           for: 5m
           labels:
             severity: critical
+        - alert: MimirIngesterMissedRecordsFromKafka
+          annotations:
+            message: Mimir {{ $labels.instance }} in {{ $labels.cluster }}/{{ $labels.namespace }} missed processing records from Kafka. There may be data loss.
+            runbook_url: https://grafana.com/docs/mimir/latest/operators-guide/mimir-runbooks/#mimiringestermissedrecordsfromkafka
+          expr: |
+            # Alert if the ingester missed some records from Kafka.
+            increase(cortex_ingest_storage_reader_missed_records_total[10m]) > 0
+          labels:
+            severity: critical
         - alert: MimirStrongConsistencyEnforcementFailed
           annotations:
             message: Mimir {{ $labels.instance }} in {{ $labels.cluster }}/{{ $labels.namespace }} fails to enforce strong-consistency on read-path.
diff --git a/operations/mimir-mixin-compiled/alerts.yaml b/operations/mimir-mixin-compiled/alerts.yaml
index cef80213c98..be782a551da 100644
--- a/operations/mimir-mixin-compiled/alerts.yaml
+++ b/operations/mimir-mixin-compiled/alerts.yaml
@@ -1121,6 +1121,15 @@ groups:
           for: 5m
           labels:
             severity: critical
+        - alert: MimirIngesterMissedRecordsFromKafka
+          annotations:
+            message: Mimir {{ $labels.pod }} in {{ $labels.cluster }}/{{ $labels.namespace }} missed processing records from Kafka. There may be data loss.
+            runbook_url: https://grafana.com/docs/mimir/latest/operators-guide/mimir-runbooks/#mimiringestermissedrecordsfromkafka
+          expr: |
+            # Alert if the ingester missed some records from Kafka.
+            increase(cortex_ingest_storage_reader_missed_records_total[10m]) > 0
+          labels:
+            severity: critical
         - alert: MimirStrongConsistencyEnforcementFailed
           annotations:
             message: Mimir {{ $labels.pod }} in {{ $labels.cluster }}/{{ $labels.namespace }} fails to enforce strong-consistency on read-path.
diff --git a/operations/mimir-mixin/alerts/ingest-storage.libsonnet b/operations/mimir-mixin/alerts/ingest-storage.libsonnet
index 3cca0186d4b..b223a0514b7 100644
--- a/operations/mimir-mixin/alerts/ingest-storage.libsonnet
+++ b/operations/mimir-mixin/alerts/ingest-storage.libsonnet
@@ -161,6 +161,21 @@
           },
         },
 
+        // Alert firing if an ingester missed processing some records from Kafka, which means there was a gap in the consumed offsets and potentially data loss.
+        {
+          alert: $.alertName('IngesterMissedRecordsFromKafka'),
+          expr: |||
+            # Alert if the ingester missed some records from Kafka.
+            increase(cortex_ingest_storage_reader_missed_records_total[%s]) > 0
+          ||| % $.alertRangeInterval(10),
+          labels: {
+            severity: 'critical',
+          },
+          annotations: {
+            message: '%(product)s {{ $labels.%(per_instance_label)s }} in %(alert_aggregation_variables)s missed processing records from Kafka. There may be data loss.' % $._config,
+          },
+        },
+
         // Alert firing if Mimir is failing to enforce strong read consistency.
         {
           alert: $.alertName('StrongConsistencyEnforcementFailed'),
diff --git a/pkg/storage/ingest/fetcher.go b/pkg/storage/ingest/fetcher.go
index 710c7f5a6d4..044e17009f8 100644
--- a/pkg/storage/ingest/fetcher.go
+++ b/pkg/storage/ingest/fetcher.go
@@ -16,6 +16,7 @@ import (
 	"github.com/go-kit/log/level"
 	"github.com/grafana/dskit/backoff"
 	"github.com/opentracing/opentracing-go"
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/twmb/franz-go/pkg/kadm"
 	"github.com/twmb/franz-go/pkg/kerr"
 	"github.com/twmb/franz-go/pkg/kgo"
@@ -227,7 +228,7 @@ type concurrentFetchers struct {
 	// ordering.
 	orderedFetches chan fetchResult
 
-	lastReturnedRecord int64
+	lastReturnedOffset int64
 	startOffsets       *genericOffsetReader[int64]
 
 	// trackCompressedBytes controls whether to calculate MaxBytes for fetch requests based on previous responses' compressed or uncompressed bytes.
@@ -283,7 +284,7 @@ func newConcurrentFetchers(
 		partitionID:             partition,
 		metrics:                 metrics,
 		minBytesWaitTime:        minBytesWaitTime,
-		lastReturnedRecord:      startOffset - 1,
+		lastReturnedOffset:      startOffset - 1,
 		startOffsets:            startOffsetsReader,
 		trackCompressedBytes:    trackCompressedBytes,
 		maxBufferedBytesLimit:   maxBufferedBytesLimit,
@@ -343,7 +344,7 @@ func (r *concurrentFetchers) Stop() {
 	r.bufferedFetchedRecords.Store(0)
 	r.bufferedFetchedBytes.Store(0)
 
-	level.Info(r.logger).Log("msg", "stopped concurrent fetchers", "last_returned_record", r.lastReturnedRecord)
+	level.Info(r.logger).Log("msg", "stopped concurrent fetchers", "last_returned_offset", r.lastReturnedOffset)
 }
 
 // Update implements fetcher
@@ -352,7 +353,7 @@ func (r *concurrentFetchers) Update(ctx context.Context, concurrency int) {
 	r.done = make(chan struct{})
 
 	r.wg.Add(1)
-	go r.start(ctx, r.lastReturnedRecord+1, concurrency)
+	go r.start(ctx, r.lastReturnedOffset+1, concurrency)
 }
 
 // PollFetches implements fetcher
@@ -369,12 +370,13 @@ func (r *concurrentFetchers) PollFetches(ctx context.Context) (kgo.Fetches, cont
 		// PollFetches() calls).
 		r.bufferedFetchedRecords.Sub(int64(len(f.Records)))
 
-		firstUnreturnedRecordIdx := recordIndexAfterOffset(f.Records, r.lastReturnedRecord)
+		firstUnreturnedRecordIdx := recordIndexAfterOffset(f.Records, r.lastReturnedOffset)
 		r.recordOrderedFetchTelemetry(f, firstUnreturnedRecordIdx, waitStartTime)
 
 		f.Records = f.Records[firstUnreturnedRecordIdx:]
 		if len(f.Records) > 0 {
-			r.lastReturnedRecord = f.Records[len(f.Records)-1].Offset
+			instrumentGaps(findGapsInRecords(f.Records, r.lastReturnedOffset), r.metrics.missedRecords, r.logger)
+			r.lastReturnedOffset = f.Records[len(f.Records)-1].Offset
 		}
 
 		return kgo.Fetches{{
@@ -388,6 +390,41 @@ func (r *concurrentFetchers) PollFetches(ctx context.Context) (kgo.Fetches, cont
 	}
 }
 
+// instrumentGaps logs an error and increments the missed-records counter for each range of offsets that was never consumed.
+func instrumentGaps(gaps []offsetRange, records prometheus.Counter, logger log.Logger) {
+	for _, gap := range gaps {
+		level.Error(logger).Log(
+			"msg", "there is a gap in consumed offsets; it is likely that there was data loss; see runbook for MimirIngesterMissedRecordsFromKafka",
+			"records_offset_gap_start_inclusive", gap.start,
+			"records_offset_gap_end_exclusive", gap.end,
+		)
+		records.Add(float64(gap.numOffsets()))
+	}
+}
+
+type offsetRange struct {
+	// start is inclusive
+	start int64
+
+	// end is exclusive
+	end int64
+}
+
+func (g offsetRange) numOffsets() int64 {
+	return g.end - g.start
+}
+
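+// findGapsInRecords returns the ranges of offsets that are missing between lastReturnedOffset and the fetched records.
+// It assumes records are sorted by ascending offset.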
+func findGapsInRecords(records []*kgo.Record, lastReturnedOffset int64) []offsetRange {
+	var gaps []offsetRange
+	for _, r := range records {
+		if r.Offset != lastReturnedOffset+1 {
+			gaps = append(gaps, offsetRange{start: lastReturnedOffset + 1, end: r.Offset})
+		}
+		lastReturnedOffset = r.Offset
+	}
+	return gaps
+}
+
 func recordIndexAfterOffset(records []*kgo.Record, offset int64) int {
 	for i, r := range records {
 		if r.Offset > offset {
diff --git a/pkg/storage/ingest/fetcher_test.go b/pkg/storage/ingest/fetcher_test.go
index 92b45f15296..deb7a066440 100644
--- a/pkg/storage/ingest/fetcher_test.go
+++ b/pkg/storage/ingest/fetcher_test.go
@@ -8,6 +8,7 @@ import (
 	"errors"
 	"fmt"
 	"math"
+	"strings"
 	"sync"
 	"testing"
 	"time"
@@ -17,6 +18,7 @@ import (
 	"github.com/grafana/dskit/services"
 	"github.com/grafana/dskit/test"
 	"github.com/prometheus/client_golang/prometheus"
+	promtest "github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"github.com/twmb/franz-go/pkg/kerr"
@@ -1149,6 +1151,15 @@ func createConcurrentFetchers(ctx context.Context, t *testing.T, client *kgo.Cli
 	reg := prometheus.NewPedanticRegistry()
 	metrics := newReaderMetrics(partition, reg, noopReaderMetricsSource{})
 
+	t.Cleanup(func() {
+		// Assuming none of the tests intentionally create gaps in offsets, there should be no missed records.
+		assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(`
+			# HELP cortex_ingest_storage_reader_missed_records_total The number of offsets that were never consumed by the reader because they weren't fetched.
+			# TYPE cortex_ingest_storage_reader_missed_records_total counter
+			cortex_ingest_storage_reader_missed_records_total 0
+		`), "cortex_ingest_storage_reader_missed_records_total"))
+	})
+
 	// This instantiates the fields of kprom.
 	// This is usually done by franz-go, but since now we use the metrics ourselves, we need to instantiate the metrics ourselves.
 	metrics.kprom.OnNewClient(client)
@@ -1322,3 +1333,78 @@ func TestFetchWant_UpdateBytesPerRecord(t *testing.T) {
 		})
 	}
 }
+
+func TestFindGapsInRecords(t *testing.T) {
+	tests := map[string]struct {
+		records            []*kgo.Record
+		lastReturnedOffset int64
+		want               []offsetRange
+	}{
+		"no gaps": {
+			records: []*kgo.Record{
+				{Offset: 1},
+				{Offset: 2},
+				{Offset: 3},
+			},
+			lastReturnedOffset: 0,
+			want:               nil,
+		},
+		"single gap": {
+			records: []*kgo.Record{
+				{Offset: 5},
+			},
+			lastReturnedOffset: 2,
+			want: []offsetRange{
+				{start: 3, end: 5},
+			},
+		},
+		"multiple gaps": {
+			records: []*kgo.Record{
+				{Offset: 3},
+				{Offset: 7},
+				{Offset: 10},
+			},
+			lastReturnedOffset: 1,
+			want: []offsetRange{
+				{start: 2, end: 3},
+				{start: 4, end: 7},
+				{start: 8, end: 10},
+			},
+		},
+		"empty records": {
+			records:            []*kgo.Record{},
+			lastReturnedOffset: 5,
+			want:               nil,
+		},
+		"gap at start": {
+			records: []*kgo.Record{
+				{Offset: 10},
+				{Offset: 11},
+			},
+			lastReturnedOffset: 5,
+			want: []offsetRange{
+				{start: 6, end: 10},
+			},
+		},
+		"gap at start and middle": {
+			records: []*kgo.Record{
+				{Offset: 10},
+				{Offset: 11},
+				{Offset: 15},
+				{Offset: 16},
+			},
+			lastReturnedOffset: 5,
+			want: []offsetRange{
+				{start: 6, end: 10},
+				{start: 12, end: 15},
+			},
+		},
+	}
+
+	for name, tc := range tests {
+		t.Run(name, func(t *testing.T) {
+			got := findGapsInRecords(tc.records, tc.lastReturnedOffset)
+			assert.Equal(t, tc.want, got)
+		})
+	}
+}
diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go
index 9a4bf73426b..009b4fe039e 100644
--- a/pkg/storage/ingest/reader.go
+++ b/pkg/storage/ingest/reader.go
@@ -1007,6 +1007,7 @@ type readerMetrics struct {
 	lastConsumedOffset               prometheus.Gauge
 	consumeLatency                   prometheus.Histogram
 	kprom                            *kprom.Metrics
+	missedRecords                    prometheus.Counter
 }
 
 type readerMetricsSource interface {
@@ -1083,6 +1084,10 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer, metricsSourc
 		strongConsistencyInstrumentation: NewStrongReadConsistencyInstrumentation[struct{}](component, reg),
 		lastConsumedOffset:               lastConsumedOffset,
 		kprom:                            NewKafkaReaderClientMetrics(component, reg),
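+		// Tracks offsets that were skipped over and never consumed; incremented by instrumentGaps.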
+		missedRecords: promauto.With(reg).NewCounter(prometheus.CounterOpts{
+			Name: "cortex_ingest_storage_reader_missed_records_total",
+			Help: "The number of offsets that were never consumed by the reader because they weren't fetched.",
+		}),
 	}
 
 	m.Service = services.NewTimerService(100*time.Millisecond, nil, func(context.Context) error {