wip: counter resets ooono #9918

Draft · wants to merge 11 commits into main

2 changes: 1 addition & 1 deletion go.mod
@@ -284,7 +284,7 @@ require (
)

// Using a fork of Prometheus with Mimir-specific changes.
-replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20241112081901-33bfe839e4ef
+replace github.com/prometheus/prometheus => github.com/grafana/mimir-prometheus v0.0.0-20241113161758-2dd468643947

// Replace memberlist with our fork which includes some fixes that haven't been
// merged upstream yet:
4 changes: 2 additions & 2 deletions go.sum
@@ -1272,8 +1272,8 @@ github.com/grafana/gomemcache v0.0.0-20241016125027-0a5bcc5aef40 h1:1TeKhyS+pvzO
github.com/grafana/gomemcache v0.0.0-20241016125027-0a5bcc5aef40/go.mod h1:IGRj8oOoxwJbHBYl1+OhS9UjQR0dv6SQOep7HqmtyFU=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
-github.com/grafana/mimir-prometheus v0.0.0-20241112081901-33bfe839e4ef h1:DH1rR0nzlOIPXvLDQjmlyEtr4g+rqrN/iklns+m3w0U=
-github.com/grafana/mimir-prometheus v0.0.0-20241112081901-33bfe839e4ef/go.mod h1:M4xmfU7SsnzjkLwJfvNen/MxAZp4DJPfipLzeib+0gQ=
+github.com/grafana/mimir-prometheus v0.0.0-20241113161758-2dd468643947 h1:73Vnxb4q7ET0+1TkCgzrozaU7YJz6gSDcQkQOoq31Ys=
+github.com/grafana/mimir-prometheus v0.0.0-20241113161758-2dd468643947/go.mod h1:M4xmfU7SsnzjkLwJfvNen/MxAZp4DJPfipLzeib+0gQ=
github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956 h1:em1oddjXL8c1tL0iFdtVtPloq2hRPen2MJQKoAWpxu0=
github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956/go.mod h1:qtI1ogk+2JhVPIXVc6q+NHziSmy2W5GbdQZFUHADCBU=
github.com/grafana/prometheus-alertmanager v0.25.1-0.20240930132144-b5e64e81e8d3 h1:6D2gGAwyQBElSrp3E+9lSr7k8gLuP3Aiy20rweLWeBw=
216 changes: 216 additions & 0 deletions pkg/ingester/ingester_test.go
@@ -28,6 +28,8 @@ import (
"testing"
"time"

"github.com/prometheus/prometheus/model/histogram"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/grpcutil"
@@ -5749,6 +5751,220 @@ func TestIngester_QueryExemplars(t *testing.T) {
})
}

// This test shows that a single ingester returns compacted OOO and in-order chunks separately, even if they overlap.
func TestIngester_QueryStream_CounterResets(t *testing.T) {
// Create ingester.
cfg := defaultIngesterTestConfig(t)
cfg.BlocksStorageConfig.TSDB.HeadCompactionInterval = 1 * time.Hour // Long enough to not be reached during the test.
cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout = 1 * time.Second
cfg.BlocksStorageConfig.TSDB.HeadCompactionIntervalJitterEnabled = false
cfg.TSDBConfigUpdatePeriod = 1 * time.Second

// Set the OOO window to 30 minutes and enable native histograms.
limits := map[string]*validation.Limits{
userID: {
OutOfOrderTimeWindow: model.Duration(30 * time.Minute),
OOONativeHistogramsIngestionEnabled: true,
NativeHistogramsIngestionEnabled: true,
},
}
override, err := validation.NewOverrides(defaultLimitsTestConfig(), validation.NewMockTenantLimits(limits))
require.NoError(t, err)

i, err := prepareIngesterWithBlockStorageAndOverrides(t, cfg, override, nil, "", "", nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

// Wait until it's healthy.
test.Poll(t, 1*time.Second, 1, func() interface{} {
return i.lifecycler.HealthyInstancesCount()
})

// Push series: in-order samples at t=0, 2, 4 (values 4, 6, 8), followed by OOO samples at t=1, 3 (values 2, 3).
ctx := user.InjectOrgID(context.Background(), userID)

histLbls := labels.FromStrings(labels.MetricName, "foo", "series_id", strconv.Itoa(0), "type", "histogram")
histReq := mockHistogramWriteRequest(histLbls, int64(0), 4, false)
_, err = i.Push(ctx, histReq)
require.NoError(t, err)

histReq = mockHistogramWriteRequest(histLbls, int64(2), 6, false)
_, err = i.Push(ctx, histReq)
require.NoError(t, err)

histReq = mockHistogramWriteRequest(histLbls, int64(4), 8, false)
_, err = i.Push(ctx, histReq)
require.NoError(t, err)

histReq = mockHistogramWriteRequest(histLbls, int64(1), 2, false)
_, err = i.Push(ctx, histReq)
require.NoError(t, err)

histReq = mockHistogramWriteRequest(histLbls, int64(3), 3, false)
_, err = i.Push(ctx, histReq)
require.NoError(t, err)

// Sorted: 4, 2 (OOO), 6, 3 (OOO), 8
// Merged chunks: [4], [2 (OOO), 6], [3 (OOO), 8]
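// A counter reset should be detected at the start of the second and third merged chunks:
// merging in the OOO samples makes the value drop from 4 to 2 and from 6 to 3.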

// Query chunks before compaction

// Create a GRPC server used to query back the data.
serv := grpc.NewServer(grpc.StreamInterceptor(middleware.StreamServerUserHeaderInterceptor))
defer serv.GracefulStop()
client.RegisterIngesterServer(serv, i)

listener, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)

go func() {
require.NoError(t, serv.Serve(listener))
}()

// Query back the series using GRPC streaming.
inst := ring.InstanceDesc{Id: "test", Addr: listener.Addr().String()}
c, err := client.MakeIngesterClient(inst, defaultClientTestConfig(), client.NewMetrics(nil))
require.NoError(t, err)
defer c.Close()

s, err := c.QueryStream(ctx, &client.QueryRequest{
StartTimestampMs: 0,
EndTimestampMs: 5,

Matchers: []*client.LabelMatcher{{
Type: client.EQUAL,
Name: model.MetricNameLabel,
Value: "foo",
}},
})
require.NoError(t, err)

recvMsgs := 0

chunks := []client.Chunk{}
for {
resp, err := s.Recv()
if errors.Is(err, io.EOF) {
break
}
require.NoError(t, err)

for _, c := range resp.Chunkseries {
chunks = append(chunks, c.Chunks...)
}
recvMsgs++
}

require.Equal(t, 1, recvMsgs)
require.Len(t, chunks, 3)
// Sort chunks by time
sort.Slice(chunks, func(i, j int) bool {
return chunks[i].StartTimestampMs < chunks[j].StartTimestampMs
})

headers := []chunkenc.CounterResetHeader{}
for _, c := range chunks {
require.Equal(t, int32(chunk.PrometheusHistogramChunk), c.Encoding)
chk, err := chunkenc.FromData(chunkenc.EncHistogram, c.Data)
require.NoError(t, err)

headers = append(headers, chk.(*chunkenc.HistogramChunk).GetCounterResetHeader())
}
// The second and third chunks have counter resets detected, because samples from the in-order
// and OOO chunks are merged together before counter reset detection runs.
// TODO: we shouldn't care what the counter reset headers are (they will always be treated as unknown).
// TODO: check the samples in each chunk.
require.Equal(t, []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset, chunkenc.CounterReset}, headers)

// Wait for the head to be idle for longer than the compaction idle timeout (plus jitter).
time.Sleep(time.Duration(float64(cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout) * (1 + compactionIdleTimeoutJitter)))

// Compact the head. In-order and OOO data are compacted into separate blocks.
i.compactBlocks(context.Background(), false, 0, nil) // Should be compacted because the TSDB is idle.
verifyCompactedHead(t, i, true)

// Query chunks after compaction
s, err = c.QueryStream(ctx, &client.QueryRequest{
StartTimestampMs: 0,
EndTimestampMs: 5,

Matchers: []*client.LabelMatcher{{
Type: client.EQUAL,
Name: model.MetricNameLabel,
Value: "foo",
}},
})
require.NoError(t, err)

recvMsgs = 0

chunks = []client.Chunk{}
for {
resp, err := s.Recv()
if errors.Is(err, io.EOF) {
break
}
require.NoError(t, err)

for _, c := range resp.Chunkseries {
chunks = append(chunks, c.Chunks...)
}
recvMsgs++
}

require.Equal(t, 1, recvMsgs)
// Two chunks - in-order and OOO blocks are first compacted separately.
// When querying, chunks from blocks in an ingester aren't merged together as the UnorderedChunkQuerier is used.
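// The first chunk holds the in-order samples (t=0, 2, 4) and the second holds the OOO samples (t=1, 3),
// as asserted below.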
require.Len(t, chunks, 2)
// Sort chunks by time
sort.Slice(chunks, func(i, j int) bool {
return chunks[i].StartTimestampMs < chunks[j].StartTimestampMs
})

var samples [][]sample
headers = []chunkenc.CounterResetHeader{}
for _, c := range chunks {
require.Equal(t, int32(chunk.PrometheusHistogramChunk), c.Encoding)
chk, err := chunkenc.FromData(chunkenc.EncHistogram, c.Data)
require.NoError(t, err)

s := []sample{}
it := chk.Iterator(nil)
for it.Next() != chunkenc.ValNone {
ts, h := it.AtHistogram(nil)
s = append(s, sample{t: ts, h: h})
}
samples = append(samples, s)
headers = append(headers, chk.(*chunkenc.HistogramChunk).GetCounterResetHeader())
}
require.Equal(t, []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.UnknownCounterReset}, headers)
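// With unknown chunk headers, the first sample in each chunk carries an UnknownCounterReset hint,
// while the following samples are explicitly NotCounterReset.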
require.Equal(t, [][]sample{
{
{t: 0, h: HistogramWithHint(4, histogram.UnknownCounterReset)},
{t: 2, h: HistogramWithHint(6, histogram.NotCounterReset)},
{t: 4, h: HistogramWithHint(8, histogram.NotCounterReset)},
},
{
{t: 1, h: HistogramWithHint(2, histogram.UnknownCounterReset)},
{t: 3, h: HistogramWithHint(3, histogram.NotCounterReset)},
},
}, samples)
}

// HistogramWithHint returns a generated test histogram for the given index with its
// CounterResetHint set to hint.
func HistogramWithHint(idx int, hint histogram.CounterResetHint) *histogram.Histogram {
h := util_test.GenerateTestHistogram(idx)
h.CounterResetHint = hint
return h
}

// sample holds a float, histogram, or float histogram value for a timestamp.
type sample struct {
t int64
v float64
h *histogram.Histogram
fh *histogram.FloatHistogram
}

func writeRequestSingleSeries(lbls labels.Labels, samples []mimirpb.Sample) *mimirpb.WriteRequest {
req := &mimirpb.WriteRequest{
Source: mimirpb.API,
Expand Down
8 changes: 5 additions & 3 deletions pkg/querier/batch/merge.go
@@ -51,6 +51,8 @@ func newMergeIterator(it iterator, cs []GenericChunk) *mergeIterator {
}
for i, cs := range css {
c.its[i] = newNonOverlappingIterator(c.its[i], cs, &c.hPool, &c.fhPool)
// Give each sub-iterator an id so that buildNextBatch can tell batches.merge which
// iterator produced each batch.
// TODO: pass the id into the constructor instead; set here for now to avoid updating all the tests.
c.its[i].id = i
}

for _, iter := range c.its {
@@ -138,7 +140,7 @@ func (c *mergeIterator) buildNextBatch(size int) chunkenc.ValueType {
// is before all iterators next entry.
for len(c.h) > 0 && (c.batches.len() == 0 || c.nextBatchEndTime() >= c.h[0].AtTime()) {
batch := c.h[0].Batch()
-c.batches.merge(&batch, size)
+c.batches.merge(&batch, size, c.h[0].id)

if c.h[0].Next(size) != chunkenc.ValNone {
heap.Fix(&c.h, 0)
@@ -165,7 +167,7 @@ func (c *mergeIterator) Err() error {
return c.currErr
}

-type iteratorHeap []iterator
+type iteratorHeap []*nonOverlappingIterator // concrete element type so heap entries expose their id

func (h *iteratorHeap) Len() int { return len(*h) }
func (h *iteratorHeap) Swap(i, j int) { (*h)[i], (*h)[j] = (*h)[j], (*h)[i] }
@@ -177,7 +179,7 @@ func (h *iteratorHeap) Less(i, j int) bool {
}

func (h *iteratorHeap) Push(x interface{}) {
-*h = append(*h, x.(iterator))
+*h = append(*h, x.(*nonOverlappingIterator))
}

func (h *iteratorHeap) Pop() interface{} {
2 changes: 1 addition & 1 deletion pkg/querier/batch/merge_test.go
@@ -18,7 +18,7 @@ import (
)

func TestMergeIter(t *testing.T) {
-for _, enc := range []chunk.Encoding{chunk.PrometheusXorChunk, chunk.PrometheusHistogramChunk, chunk.PrometheusFloatHistogramChunk} {
+for _, enc := range []chunk.Encoding{ /*chunk.PrometheusXorChunk,*/ chunk.PrometheusHistogramChunk /*, chunk.PrometheusFloatHistogramChunk*/ } { // TODO(wip): re-enable the XOR and float histogram encodings
t.Run(enc.String(), func(t *testing.T) {
chunk1 := mkGenericChunk(t, 0, 100, enc)
chunk2 := mkGenericChunk(t, model.TimeFromUnix(25), 100, enc)
1 change: 1 addition & 0 deletions pkg/querier/batch/non_overlapping.go
@@ -17,6 +17,7 @@ type nonOverlappingIterator struct {
curr int
chunks []GenericChunk
iter chunkIterator
id int // identifies this iterator among the mergeIterator's sub-iterators
}

// newNonOverlappingIterator returns a single iterator over a slice of sorted,