Skip to content

Commit

Permalink
kgo tests: support TLS via KGO_TEST_TLS
Browse files Browse the repository at this point in the history
small dsl, hopefully simple enough
  • Loading branch information
twmb committed Dec 31, 2023
1 parent 98f95bc commit 5f0a3fa
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 27 deletions.
4 changes: 1 addition & 3 deletions pkg/kgo/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,7 @@ func TestUnknownGroupOffsetFetchPinned(t *testing.T) {
req := kmsg.NewOffsetFetchRequest()
req.Group = "unknown-" + strconv.FormatInt(time.Now().UnixNano(), 10)

cl, _ := NewClient(
getSeedBrokers(),
)
cl, _ := newTestClient()
defer cl.Close()
defer func() {
if err := recover(); err != nil {
Expand Down
24 changes: 8 additions & 16 deletions pkg/kgo/consumer_direct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ func TestIssue325(t *testing.T) {
topic, cleanup := tmpTopic(t)
defer cleanup()

cl, _ := NewClient(
getSeedBrokers(),
cl, _ := newTestClient(
DefaultProduceTopic(topic),
UnknownTopicRetries(-1),
)
Expand All @@ -45,8 +44,7 @@ func TestIssue337(t *testing.T) {
topic, cleanup := tmpTopicPartitions(t, 2)
defer cleanup()

cl, _ := NewClient(
getSeedBrokers(),
cl, _ := newTestClient(
DefaultProduceTopic(topic),
RecordPartitioner(ManualPartitioner()),
UnknownTopicRetries(-1),
Expand Down Expand Up @@ -92,8 +90,7 @@ func TestDirectPartitionPurge(t *testing.T) {
topic, cleanup := tmpTopicPartitions(t, 2)
defer cleanup()

cl, _ := NewClient(
getSeedBrokers(),
cl, _ := newTestClient(
DefaultProduceTopic(topic),
RecordPartitioner(ManualPartitioner()),
UnknownTopicRetries(-1),
Expand Down Expand Up @@ -155,8 +152,7 @@ func TestIssue434(t *testing.T) {
defer cleanup1()
defer cleanup2()

cl, _ := NewClient(
getSeedBrokers(),
cl, _ := newTestClient(
UnknownTopicRetries(-1),
ConsumeTopics(fmt.Sprintf("(%s|%s)", t1, t2)),
ConsumeRegex(),
Expand Down Expand Up @@ -209,8 +205,7 @@ func TestAddRemovePartitions(t *testing.T) {
t1, cleanup := tmpTopicPartitions(t, 2)
defer cleanup()

cl, _ := NewClient(
getSeedBrokers(),
cl, _ := newTestClient(
UnknownTopicRetries(-1),
RecordPartitioner(ManualPartitioner()),
FetchMaxWait(100*time.Millisecond),
Expand Down Expand Up @@ -278,8 +273,7 @@ func TestPauseIssue489(t *testing.T) {
t1, cleanup := tmpTopicPartitions(t, 3)
defer cleanup()

cl, _ := NewClient(
getSeedBrokers(),
cl, _ := newTestClient(
UnknownTopicRetries(-1),
DefaultProduceTopic(t1),
RecordPartitioner(ManualPartitioner()),
Expand Down Expand Up @@ -360,8 +354,7 @@ func TestPauseIssueOct2023(t *testing.T) {
defer cleanup3()
ts := []string{t1, t2, t3}

cl, _ := NewClient(
getSeedBrokers(),
cl, _ := newTestClient(
UnknownTopicRetries(-1),
ConsumeTopics(ts...),
MetadataMinAge(50*time.Millisecond),
Expand Down Expand Up @@ -438,8 +431,7 @@ func TestIssue523(t *testing.T) {
g1, gcleanup := tmpGroup(t)
defer gcleanup()

cl, _ := NewClient(
getSeedBrokers(),
cl, _ := newTestClient(
DefaultProduceTopic(t1),
ConsumeTopics(".*"+t1+".*"),
ConsumeRegex(),
Expand Down
6 changes: 2 additions & 4 deletions pkg/kgo/group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ func TestGroupETL(t *testing.T) {
////////////////////

go func() {
cl, _ := NewClient(
getSeedBrokers(),
cl, _ := newTestClient(
WithLogger(BasicLogger(os.Stderr, testLogLevel, nil)),
MaxBufferedRecords(10000),
MaxBufferedBytes(50000),
Expand Down Expand Up @@ -118,7 +117,6 @@ func (c *testConsumer) etl(etlsBeforeQuit int) {
netls := 0 // for if etlsBeforeQuit is non-negative

opts := []Opt{
getSeedBrokers(),
UnknownTopicRetries(-1), // see txn_test comment
WithLogger(testLogger()),
ConsumerGroup(c.group),
Expand Down Expand Up @@ -152,7 +150,7 @@ func (c *testConsumer) etl(etlsBeforeQuit int) {
OnPartitionsLost(func(context.Context, *Client, map[string][]int32) {}),
}

cl, _ := NewClient(opts...)
cl, _ := newTestClient(opts...)
defer cl.Close()

defer func() {
Expand Down
54 changes: 53 additions & 1 deletion pkg/kgo/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package kgo
import (
"context"
"crypto/sha256"
"crypto/tls"
"crypto/x509"
"encoding/hex"
"errors"
"fmt"
Expand Down Expand Up @@ -36,6 +38,9 @@ var (
// cannot use EndAndBeginTransaction with EndBeginTxnUnsafe.
allowUnsafe = false

// DSL syntax is ({ca|cert|key}:path),{1,3}
testCert *tls.Config

// We create topics with a different number of partitions to exercise
// a few extra code paths; we index into npartitions with npartitionsAt,
// an atomic that we modulo after load.
Expand All @@ -45,7 +50,7 @@ var (

func init() {
var err error
adm, err = NewClient(getSeedBrokers())
adm, err = newTestClient()
if err != nil {
panic(fmt.Sprintf("unable to create admin client: %v", err))
}
Expand All @@ -62,6 +67,53 @@ func init() {
if _, exists := os.LookupEnv("KGO_TEST_UNSAFE"); exists {
allowUnsafe = true
}
if paths, exists := os.LookupEnv("KGO_TEST_TLS"); exists {
var caPath, certPath, keyPath string
for _, path := range strings.Split(paths, ",") {
switch {
case strings.HasPrefix(path, "ca:"):
caPath = path[3:]
case strings.HasPrefix(path, "cert:"):
certPath = path[5:]
case strings.HasPrefix(path, "key:"):
keyPath = path[4:]
default:
panic(fmt.Sprintf("invalid tls format %q", path))
}
}
if caPath != "" {
ca, err := os.ReadFile(caPath)

Check failure on line 85 in pkg/kgo/helpers_test.go

View workflow job for this annotation

GitHub Actions / golangci-lint on amd64

G304: Potential file inclusion via variable (gosec)
if err != nil {
panic(fmt.Sprintf("unable to read ca: %v", err))
}
testCert = &tls.Config{RootCAs: x509.NewCertPool()}

Check failure on line 89 in pkg/kgo/helpers_test.go

View workflow job for this annotation

GitHub Actions / golangci-lint on amd64

G402: TLS MinVersion too low. (gosec)
if !testCert.RootCAs.AppendCertsFromPEM(ca) {
panic("unable to append ca")
}
}
if certPath != "" || keyPath != "" {
if certPath == "" || keyPath == "" {
panic("both cert path and key path must be specified")
}
cert, err := tls.LoadX509KeyPair(certPath, keyPath)
if err != nil {
panic(fmt.Sprintf("unable to load cert/key pair: %v", err))
}
testCert.Certificates = append(testCert.Certificates, cert)
}
}
}

func testClientOpts(opts ...Opt) []Opt {
opts = append(opts, getSeedBrokers())
if testCert != nil {
opts = append(opts, DialTLSConfig(testCert))
}
return opts
}

func newTestClient(opts ...Opt) (*Client, error) {
return NewClient(testClientOpts(opts...)...)
}

func getSeedBrokers() Opt {
Expand Down
5 changes: 2 additions & 3 deletions pkg/kgo/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ func TestTxnEtl(t *testing.T) {
////////////////////

go func() {
cl, err := NewClient(
getSeedBrokers(),
cl, err := newTestClient(
WithLogger(BasicLogger(os.Stderr, testLogLevel, func() string {
return time.Now().UTC().Format("15:04:05.999") + " "
})),
Expand Down Expand Up @@ -139,7 +138,6 @@ func (c *testConsumer) transact(txnsBeforeQuit int) {
defer c.wg.Done()

opts := []Opt{
getSeedBrokers(),
// Kraft sometimes returns success from topic creation, and
// then returns UnknownTopicXyz for a while in metadata loads.
// It also returns NotLeaderXyz; we handle both problems.
Expand All @@ -160,6 +158,7 @@ func (c *testConsumer) transact(txnsBeforeQuit int) {
if requireStableFetch {
opts = append(opts, RequireStableFetchOffsets())
}
opts = append(opts, testClientOpts()...)

txnSess, _ := NewGroupTransactSession(opts...)
defer txnSess.Close()
Expand Down

0 comments on commit 5f0a3fa

Please sign in to comment.