From de7bda42f6cb76d0dd97a9ff2d417f678d7258e4 Mon Sep 17 00:00:00 2001 From: Tyler Rockwood Date: Wed, 22 Jan 2025 14:59:07 +0000 Subject: [PATCH 1/2] list_offsets: timequeries are inclusive This commit updates the docs for ListOffsetsAfterMilli to be explicit that timestamps returned are inclusive. Both Redpanda and Kafka use inclusive search conditions. Kafka: https://github.com/apache/kafka/blob/084fcbd3275ffd22ec0e1841b54de87d986afe0c/core/src/main/scala/kafka/log/UnifiedLog.scala#L1222-L1223 Redpanda: https://github.com/redpanda-data/redpanda/blob/997dd5a76b16b78e249b85843b00d591d2f91adc/src/v/storage/log_reader.cc#L605 --- pkg/kadm/metadata.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/kadm/metadata.go b/pkg/kadm/metadata.go index 6b3f5b27..d436b10f 100644 --- a/pkg/kadm/metadata.go +++ b/pkg/kadm/metadata.go @@ -420,7 +420,7 @@ func (cl *Client) ListCommittedOffsets(ctx context.Context, topics ...string) (L return cl.listOffsets(ctx, 1, -1, topics) } -// ListOffsetsAfterMilli returns the first offsets after the requested +// ListOffsetsAfterMilli returns the first offsets at or after the requested // millisecond timestamp. Unlike listing start/end/committed offsets, offsets // returned from this function also include the timestamp of the offset. If no // topics are specified, all topics are listed. If a partition has no offsets From 4303e98c1ded42648dc6791fd0e56713a9469cf9 Mon Sep 17 00:00:00 2001 From: Tyler Rockwood Date: Wed, 22 Jan 2025 16:49:09 +0000 Subject: [PATCH 2/2] kfake: add a test for ListOffsetsAtOrAfterMilli --- pkg/kfake/go.mod | 1 + pkg/kfake/go.sum | 2 + pkg/kfake/issues_test.go | 128 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 131 insertions(+) diff --git a/pkg/kfake/go.mod b/pkg/kfake/go.mod index fd073c16..325cdaec 100644 --- a/pkg/kfake/go.mod +++ b/pkg/kfake/go.mod @@ -6,6 +6,7 @@ toolchain go1.22.0 require ( github.com/twmb/franz-go v1.18.1 + github.com/twmb/franz-go/pkg/kadm v1.15.0 github.com/twmb/franz-go/pkg/kmsg v1.9.0 golang.org/x/crypto v0.32.0 ) diff --git a/pkg/kfake/go.sum b/pkg/kfake/go.sum index 98212ab9..183166c5 100644 --- a/pkg/kfake/go.sum +++ b/pkg/kfake/go.sum @@ -4,6 +4,8 @@ github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/twmb/franz-go v1.18.1 h1:D75xxCDyvTqBSiImFx2lkPduE39jz1vaD7+FNc+vMkc= github.com/twmb/franz-go v1.18.1/go.mod h1:Uzo77TarcLTUZeLuGq+9lNpSkfZI+JErv7YJhlDjs9M= +github.com/twmb/franz-go/pkg/kadm v1.15.0 h1:Yo3NAPfcsx3Gg9/hdhq4vmwO77TqRRkvpUcGWzjworc= +github.com/twmb/franz-go/pkg/kadm v1.15.0/go.mod h1:MUdcUtnf9ph4SFBLLA/XxE29rvLhWYLM9Ygb8dfSCvw= github.com/twmb/franz-go/pkg/kmsg v1.9.0 h1:JojYUph2TKAau6SBtErXpXGC7E3gg4vGZMv9xFU/B6M= github.com/twmb/franz-go/pkg/kmsg v1.9.0/go.mod h1:CMbfazviCyY6HM0SXuG5t9vOwYDHRCSrJJyBAe5paqg= golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc= diff --git a/pkg/kfake/issues_test.go b/pkg/kfake/issues_test.go index 9ec2151c..99ed7eb3 100644 --- a/pkg/kfake/issues_test.go +++ b/pkg/kfake/issues_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/twmb/franz-go/pkg/kadm" "github.com/twmb/franz-go/pkg/kerr" "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kmsg" @@ -158,3 +159,130 @@ func TestIssue885(t *testing.T) { consumed += fs.NumRecords() } } + +func TestIssueTimestampInclusivity(t *testing.T) { + t.Skip("kfake needs #803 fixed to be able to properly serve this API, but this test has been validated against a real broker at least.") + + const ( + testTopic = "bar" + producedBatches = 5 + followerLogStart = 3 + ) + + c, err := NewCluster( + NumBrokers(2), + SeedTopics(1, testTopic), + ) + if err != nil { + t.Fatal(err) + } + defer c.Close() + + // Inline anonymous function so that we can defer and cleanup within scope. + func() { + cl, err := kgo.NewClient( + kgo.DefaultProduceTopic(testTopic), + kgo.SeedBrokers(c.ListenAddrs()...), + ) + if err != nil { + t.Fatal(err) + } + defer cl.Close() + + for i := 0; i < producedBatches; i++ { + _, cancel := context.WithTimeout(context.Background(), 3*time.Second) + offset := i * 4 + r1 := kgo.StringRecord(strconv.Itoa(offset)) + r1.Timestamp = time.UnixMilli(10_000 + int64(offset)) + offset += 1 + r2 := kgo.StringRecord(strconv.Itoa(offset)) + r2.Timestamp = time.UnixMilli(10_000 + int64(offset)) + offset += 1 + r3 := kgo.StringRecord(strconv.Itoa(offset)) + r3.Timestamp = time.UnixMilli(10_000 + int64(offset)) + err := cl.ProduceSync(context.TODO(), r1, r2, r3).FirstErr() + cancel() + if err != nil { + t.Fatal(err) + } + } + }() + + cl, err := kgo.NewClient( + kgo.SeedBrokers(c.ListenAddrs()...), + ) + if err != nil { + t.Fatal(err) + } + defer cl.Close() + adm := kadm.NewClient(cl) + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + tests := []struct { + SearchTimestamp int64 + ExpectedOffset int64 + ExpectedTimestamp int64 + }{ + { + SearchTimestamp: 99, + ExpectedOffset: 0, + ExpectedTimestamp: 10_000, + }, + { + SearchTimestamp: 10_000, + ExpectedOffset: 0, + ExpectedTimestamp: 10_000, + }, + { + SearchTimestamp: 10_001, + ExpectedOffset: 1, + ExpectedTimestamp: 10_001, + }, + { + SearchTimestamp: 10_003, + ExpectedOffset: 3, + ExpectedTimestamp: 10_004, + }, + { + SearchTimestamp: 10_004, + ExpectedOffset: 3, + ExpectedTimestamp: 10_004, + }, + { + SearchTimestamp: 10_015, + ExpectedOffset: 12, + ExpectedTimestamp: 10_016, + }, + { + SearchTimestamp: 10_018, + ExpectedOffset: 14, + ExpectedTimestamp: 10_018, + }, + { + SearchTimestamp: 11_000, + ExpectedOffset: 15, + ExpectedTimestamp: -1, + }, + } + for _, test := range tests { + offsets, err := adm.ListOffsetsAfterMilli(ctx, test.SearchTimestamp) + if err != nil { + t.Fatal(err) + } + offset, ok := offsets.Lookup(testTopic, 0) + if !ok { + t.Fatal("missing partition") + } + if offset.Offset != test.ExpectedOffset || offset.Timestamp != test.ExpectedTimestamp { + t.Fatalf( + "searching for %d got: %+v, want offset %d, timestamp %d", + test.SearchTimestamp, + offset, + test.ExpectedOffset, + test.ExpectedTimestamp, + ) + } + } +}