From dc77358dbfa9498d501f4ba89adabc787d709afb Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sat, 30 Dec 2023 23:39:07 -0700 Subject: [PATCH 1/5] kgo tests: support TLS via KGO_TEST_TLS small dsl, hopefully simple enough --- pkg/kgo/client_test.go | 4 +-- pkg/kgo/consumer_direct_test.go | 24 +++++-------- pkg/kgo/group_test.go | 6 ++-- pkg/kgo/helpers_test.go | 61 ++++++++++++++++++++++++++++++++- pkg/kgo/txn_test.go | 5 ++- 5 files changed, 73 insertions(+), 27 deletions(-) diff --git a/pkg/kgo/client_test.go b/pkg/kgo/client_test.go index 8b29fa11..968609ee 100644 --- a/pkg/kgo/client_test.go +++ b/pkg/kgo/client_test.go @@ -113,9 +113,7 @@ func TestUnknownGroupOffsetFetchPinned(t *testing.T) { req := kmsg.NewOffsetFetchRequest() req.Group = "unknown-" + strconv.FormatInt(time.Now().UnixNano(), 10) - cl, _ := NewClient( - getSeedBrokers(), - ) + cl, _ := newTestClient() defer cl.Close() defer func() { if err := recover(); err != nil { diff --git a/pkg/kgo/consumer_direct_test.go b/pkg/kgo/consumer_direct_test.go index f4ddc4db..8f74fa89 100644 --- a/pkg/kgo/consumer_direct_test.go +++ b/pkg/kgo/consumer_direct_test.go @@ -18,8 +18,7 @@ func TestIssue325(t *testing.T) { topic, cleanup := tmpTopic(t) defer cleanup() - cl, _ := NewClient( - getSeedBrokers(), + cl, _ := newTestClient( DefaultProduceTopic(topic), UnknownTopicRetries(-1), ) @@ -45,8 +44,7 @@ func TestIssue337(t *testing.T) { topic, cleanup := tmpTopicPartitions(t, 2) defer cleanup() - cl, _ := NewClient( - getSeedBrokers(), + cl, _ := newTestClient( DefaultProduceTopic(topic), RecordPartitioner(ManualPartitioner()), UnknownTopicRetries(-1), @@ -92,8 +90,7 @@ func TestDirectPartitionPurge(t *testing.T) { topic, cleanup := tmpTopicPartitions(t, 2) defer cleanup() - cl, _ := NewClient( - getSeedBrokers(), + cl, _ := newTestClient( DefaultProduceTopic(topic), RecordPartitioner(ManualPartitioner()), UnknownTopicRetries(-1), @@ -155,8 +152,7 @@ func TestIssue434(t *testing.T) { defer cleanup1() defer cleanup2() - cl, _ := NewClient( - getSeedBrokers(), + cl, _ := newTestClient( UnknownTopicRetries(-1), ConsumeTopics(fmt.Sprintf("(%s|%s)", t1, t2)), ConsumeRegex(), @@ -209,8 +205,7 @@ func TestAddRemovePartitions(t *testing.T) { t1, cleanup := tmpTopicPartitions(t, 2) defer cleanup() - cl, _ := NewClient( - getSeedBrokers(), + cl, _ := newTestClient( UnknownTopicRetries(-1), RecordPartitioner(ManualPartitioner()), FetchMaxWait(100*time.Millisecond), @@ -278,8 +273,7 @@ func TestPauseIssue489(t *testing.T) { t1, cleanup := tmpTopicPartitions(t, 3) defer cleanup() - cl, _ := NewClient( - getSeedBrokers(), + cl, _ := newTestClient( UnknownTopicRetries(-1), DefaultProduceTopic(t1), RecordPartitioner(ManualPartitioner()), @@ -360,8 +354,7 @@ func TestPauseIssueOct2023(t *testing.T) { defer cleanup3() ts := []string{t1, t2, t3} - cl, _ := NewClient( - getSeedBrokers(), + cl, _ := newTestClient( UnknownTopicRetries(-1), ConsumeTopics(ts...), MetadataMinAge(50*time.Millisecond), @@ -438,8 +431,7 @@ func TestIssue523(t *testing.T) { g1, gcleanup := tmpGroup(t) defer gcleanup() - cl, _ := NewClient( - getSeedBrokers(), + cl, _ := newTestClient( DefaultProduceTopic(t1), ConsumeTopics(".*"+t1+".*"), ConsumeRegex(), diff --git a/pkg/kgo/group_test.go b/pkg/kgo/group_test.go index 6e06da23..c0fa8d28 100644 --- a/pkg/kgo/group_test.go +++ b/pkg/kgo/group_test.go @@ -35,8 +35,7 @@ func TestGroupETL(t *testing.T) { //////////////////// go func() { - cl, _ := NewClient( - getSeedBrokers(), + cl, _ := newTestClient( WithLogger(BasicLogger(os.Stderr, testLogLevel, nil)), MaxBufferedRecords(10000), MaxBufferedBytes(50000), @@ -118,7 +117,6 @@ func (c *testConsumer) etl(etlsBeforeQuit int) { netls := 0 // for if etlsBeforeQuit is non-negative opts := []Opt{ - getSeedBrokers(), UnknownTopicRetries(-1), // see txn_test comment WithLogger(testLogger()), ConsumerGroup(c.group), @@ -152,7 +150,7 @@ func (c *testConsumer) etl(etlsBeforeQuit int) { OnPartitionsLost(func(context.Context, *Client, map[string][]int32) {}), } - cl, _ := NewClient(opts...) + cl, _ := newTestClient(opts...) defer cl.Close() defer func() { diff --git a/pkg/kgo/helpers_test.go b/pkg/kgo/helpers_test.go index f7b8e97f..8f009021 100644 --- a/pkg/kgo/helpers_test.go +++ b/pkg/kgo/helpers_test.go @@ -3,6 +3,8 @@ package kgo import ( "context" "crypto/sha256" + "crypto/tls" + "crypto/x509" "encoding/hex" "errors" "fmt" @@ -36,6 +38,9 @@ var ( // cannot use EndAndBeginTransaction with EndBeginTxnUnsafe. allowUnsafe = false + // DSL syntax is ({ca|cert|key}:path),{1,3} + testCert *tls.Config + // We create topics with a different number of partitions to exercise // a few extra code paths; we index into npartitions with npartitionsAt, // an atomic that we modulo after load. @@ -45,7 +50,7 @@ var ( func init() { var err error - adm, err = NewClient(getSeedBrokers()) + adm, err = newTestClient() if err != nil { panic(fmt.Sprintf("unable to create admin client: %v", err)) } @@ -62,6 +67,60 @@ func init() { if _, exists := os.LookupEnv("KGO_TEST_UNSAFE"); exists { allowUnsafe = true } + if paths, exists := os.LookupEnv("KGO_TEST_TLS"); exists { + var caPath, certPath, keyPath string + for _, path := range strings.Split(paths, ",") { + switch { + case strings.HasPrefix(path, "ca:"): + caPath = path[3:] + case strings.HasPrefix(path, "cert:"): + certPath = path[5:] + case strings.HasPrefix(path, "key:"): + keyPath = path[4:] + default: + panic(fmt.Sprintf("invalid tls format %q", path)) + } + } + inittls := func() { + if testCert == nil { + testCert = &tls.Config{MinVersion: tls.VersionTLS12} + } + } + if caPath != "" { + ca, err := os.ReadFile(caPath) //nolint:gosec // we are deliberately including a file from a variable + if err != nil { + panic(fmt.Sprintf("unable to read ca: %v", err)) + } + inittls() + testCert.RootCAs = x509.NewCertPool() + if !testCert.RootCAs.AppendCertsFromPEM(ca) { + panic("unable to append ca") + } + } + if certPath != "" || keyPath != "" { + if certPath == "" || keyPath == "" { + panic("both cert path and key path must be specified") + } + cert, err := tls.LoadX509KeyPair(certPath, keyPath) + if err != nil { + panic(fmt.Sprintf("unable to load cert/key pair: %v", err)) + } + inittls() + testCert.Certificates = append(testCert.Certificates, cert) + } + } +} + +func testClientOpts(opts ...Opt) []Opt { + opts = append(opts, getSeedBrokers()) + if testCert != nil { + opts = append(opts, DialTLSConfig(testCert)) + } + return opts +} + +func newTestClient(opts ...Opt) (*Client, error) { + return NewClient(testClientOpts(opts...)...) } func getSeedBrokers() Opt { diff --git a/pkg/kgo/txn_test.go b/pkg/kgo/txn_test.go index 63a6f5c8..fc1fdbe8 100644 --- a/pkg/kgo/txn_test.go +++ b/pkg/kgo/txn_test.go @@ -26,8 +26,7 @@ func TestTxnEtl(t *testing.T) { //////////////////// go func() { - cl, err := NewClient( - getSeedBrokers(), + cl, err := newTestClient( WithLogger(BasicLogger(os.Stderr, testLogLevel, func() string { return time.Now().UTC().Format("15:04:05.999") + " " })), @@ -139,7 +138,6 @@ func (c *testConsumer) transact(txnsBeforeQuit int) { defer c.wg.Done() opts := []Opt{ - getSeedBrokers(), // Kraft sometimes returns success from topic creation, and // then returns UnknownTopicXyz for a while in metadata loads. // It also returns NotLeaderXyz; we handle both problems. @@ -160,6 +158,7 @@ func (c *testConsumer) transact(txnsBeforeQuit int) { if requireStableFetch { opts = append(opts, RequireStableFetchOffsets()) } + opts = append(opts, testClientOpts()...) txnSess, _ := NewGroupTransactSession(opts...) defer txnSess.Close() From 1f1c1c24cf8f7065ddeb155e120caa0e490406c3 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sat, 30 Dec 2023 23:47:03 -0700 Subject: [PATCH 2/5] kgo tests: add KGO_TEST_SCRAM --- pkg/kgo/helpers_test.go | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/pkg/kgo/helpers_test.go b/pkg/kgo/helpers_test.go index 8f009021..1d4a57ab 100644 --- a/pkg/kgo/helpers_test.go +++ b/pkg/kgo/helpers_test.go @@ -20,6 +20,8 @@ import ( "github.com/twmb/franz-go/pkg/kerr" "github.com/twmb/franz-go/pkg/kmsg" + "github.com/twmb/franz-go/pkg/sasl" + "github.com/twmb/franz-go/pkg/sasl/scram" ) var ( @@ -38,9 +40,12 @@ var ( // cannot use EndAndBeginTransaction with EndBeginTxnUnsafe. allowUnsafe = false - // DSL syntax is ({ca|cert|key}:path),{1,3} + // KGO_TEST_TLS: DSL syntax is ({ca|cert|key}:path),{1,3} testCert *tls.Config + // KGO_TEST_SCRAM: DSL is user:pass; we require SCRAM-SHA-256 + saslScram sasl.Mechanism + // We create topics with a different number of partitions to exercise // a few extra code paths; we index into npartitions with npartitionsAt, // an atomic that we modulo after load. @@ -109,6 +114,16 @@ func init() { testCert.Certificates = append(testCert.Certificates, cert) } } + if saslStr, exists := os.LookupEnv("KGO_TEST_SCRAM"); exists { + split := strings.Split(saslStr, ":") + if len(split) != 2 { + panic(fmt.Sprintf("invalid scram format %q", saslStr)) + } + saslScram = (scram.Auth{ + User: split[0], + Pass: split[1], + }).AsSha256Mechanism() + } } func testClientOpts(opts ...Opt) []Opt { @@ -116,6 +131,9 @@ func testClientOpts(opts ...Opt) []Opt { if testCert != nil { opts = append(opts, DialTLSConfig(testCert)) } + if saslScram != nil { + opts = append(opts, SASL(saslScram)) + } return opts } From faa45b984caec8f62d9a3c37a9507f6872ddba00 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sat, 30 Dec 2023 23:52:24 -0700 Subject: [PATCH 3/5] kgo tests: support 512 in KGO_TEST_SCRAM --- pkg/kgo/helpers_test.go | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/pkg/kgo/helpers_test.go b/pkg/kgo/helpers_test.go index 1d4a57ab..a36cc5c0 100644 --- a/pkg/kgo/helpers_test.go +++ b/pkg/kgo/helpers_test.go @@ -43,7 +43,7 @@ var ( // KGO_TEST_TLS: DSL syntax is ({ca|cert|key}:path),{1,3} testCert *tls.Config - // KGO_TEST_SCRAM: DSL is user:pass; we require SCRAM-SHA-256 + // KGO_TEST_SCRAM: DSL is user:pass(:num); we assume 256 saslScram sasl.Mechanism // We create topics with a different number of partitions to exercise @@ -116,13 +116,26 @@ func init() { } if saslStr, exists := os.LookupEnv("KGO_TEST_SCRAM"); exists { split := strings.Split(saslStr, ":") - if len(split) != 2 { + if len(split) != 2 && len(split) != 3 { panic(fmt.Sprintf("invalid scram format %q", saslStr)) } - saslScram = (scram.Auth{ + a := scram.Auth{ User: split[0], Pass: split[1], - }).AsSha256Mechanism() + } + saslScram = a.AsSha256Mechanism() + if len(split) == 3 { + n, err := strconv.Atoi(split[2]) + if err != nil { + panic(fmt.Sprintf("invalid scram alg %q: %v", split[2], err)) + } + if n != 256 && n != 512 { + panic(fmt.Sprintf("invalid scram alg %q: must be 256 or 512", split[2])) + } + if n == 512 { + saslScram = a.AsSha512Mechanism() + } + } } } From b08a33800e52bbda673da77b48ac48670d83ac68 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sat, 30 Dec 2023 23:55:17 -0700 Subject: [PATCH 4/5] kgo tests: initialize admin client after env var parsing --- pkg/kgo/helpers_test.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/pkg/kgo/helpers_test.go b/pkg/kgo/helpers_test.go index a36cc5c0..0460f7b8 100644 --- a/pkg/kgo/helpers_test.go +++ b/pkg/kgo/helpers_test.go @@ -55,11 +55,6 @@ var ( func init() { var err error - adm, err = newTestClient() - if err != nil { - panic(fmt.Sprintf("unable to create admin client: %v", err)) - } - if n, _ := strconv.Atoi(os.Getenv("KGO_TEST_RF")); n > 0 { testrf = n } @@ -137,6 +132,10 @@ func init() { } } } + adm, err = newTestClient() + if err != nil { + panic(fmt.Sprintf("unable to create admin client: %v", err)) + } } func testClientOpts(opts ...Opt) []Opt { From afd95b8216a3c7c66fe1e10a3306dfc0517ce1bf Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sun, 31 Dec 2023 00:16:14 -0700 Subject: [PATCH 5/5] kgo tests: allow lowering test topic partitions --- pkg/kgo/helpers_test.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/pkg/kgo/helpers_test.go b/pkg/kgo/helpers_test.go index 0460f7b8..ccf32865 100644 --- a/pkg/kgo/helpers_test.go +++ b/pkg/kgo/helpers_test.go @@ -49,6 +49,8 @@ var ( // We create topics with a different number of partitions to exercise // a few extra code paths; we index into npartitions with npartitionsAt, // an atomic that we modulo after load. + // + // We can lower bound these numbers with KGO_TEST_MAX_TOPIC_PARTS. npartitions = []int{7, 11, 31} npartitionsAt int64 ) @@ -132,6 +134,20 @@ func init() { } } } + if maxTParts, exists := os.LookupEnv("KGO_TEST_MAX_TOPIC_PARTS"); exists { + n, err := strconv.Atoi(maxTParts) + if err != nil { + panic(fmt.Sprintf("invalid max topic parts %q: %v", maxTParts, err)) + } + if n < 1 { + n = 1 + } + for i, v := range npartitions { + if v > n { + npartitions[i] = n + } + } + } adm, err = newTestClient() if err != nil { panic(fmt.Sprintf("unable to create admin client: %v", err))