Skip to content

Commit

Permalink
franz-go: test against redpanda
Browse files Browse the repository at this point in the history
Turns out, redpanda does not like the EndBeginTxnUnsafe option.
That's valid.
  • Loading branch information
twmb committed Oct 19, 2022
1 parent 9387634 commit 31e6e37
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 12 deletions.
24 changes: 21 additions & 3 deletions .github/workflows/lint-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ jobs:
echo "staticcheck ./..."
staticcheck -checks 'all,-ST1003,-SA1012,-ST1016,-SA1019,-SA2001' ./... # actually contains atomicalign check
integration-test:
integration-test-kafka:
if: github.repository == 'twmb/franz-go'
needs: golangci
runs-on: ubuntu-latest
Expand All @@ -80,11 +80,29 @@ jobs:
KAFKA_CFG_BROKER_ID: 1
ALLOW_PLAINTEXT_LISTENER: yes
KAFKA_KRAFT_CLUSTER_ID: XkpGZQ27R3eTl3OdTm2LYA # 16 byte base64-encoded UUID
# BITNAMI_DEBUG: true # Enable this to get more info on startup failures
steps:
- uses: actions/checkout@v3
- run: go test ./...
env:
KGO_TEST_RF: 1
KGO_SEEDS: kafka:9092
KGO_TEST_RECORDS: 50000
KGO_TEST_UNSAFE: true
KGO_TEST_STABLE_FETCH: true

integration-test-redpanda:
if: github.repository == 'twmb/franz-go'
needs: golangci
runs-on: ubuntu-latest
name: "integration test redpanda"
container: golang:1.19.2
services:
redpanda:
image: vectorized/redpanda-nightly:latest
ports:
- 9092:9092
steps:
- uses: actions/checkout@v3
- run: go test ./...
env:
KGO_TEST_RF: 1
KGO_SEEDS: redpanda:9092
17 changes: 17 additions & 0 deletions pkg/kgo/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,17 @@ var (
adm *Client
testrf = 3
testRecordLimit = 500000

// Kraft sometimes has massive hangs internally when completing
// transactions. Against zk Kafka as well as Redpanda, we could rely on
// our internal mitigations to never have KIP-447 problems. Not true
// against Kraft, see #223.
requireStableFetch = false

// Redpanda is a bit more strict with transactions: we must wait for
// EndTxn to return successfully before beginning a new transaction. We
// cannot use EndAndBeginTransaction with EndBeginTxnUnsafe.
allowUnsafe = false
)

func init() {
Expand All @@ -37,6 +48,12 @@ func init() {
if n, _ := strconv.Atoi(os.Getenv("KGO_TEST_RECORDS")); n > 0 {
testRecordLimit = n
}
if _, exists := os.LookupEnv("KGO_TEST_STABLE_FETCH"); exists {
requireStableFetch = true
}
if _, exists := os.LookupEnv("KGO_TEST_UNSAFE"); exists {
allowUnsafe = true
}
}

func getSeedBrokers() Opt {
Expand Down
5 changes: 5 additions & 0 deletions pkg/kgo/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,11 @@ const (
// application with the SAME transactional ID and produce to all the
// same partitions to ensure to resume the transaction and unstick the
// partitions.
//
// Also note: this option does not work on all broker implementations.
// This relies on Kafka internals. Some brokers (notably Redpanda) are
// more strict with enforcing transaction correctness and this option
// cannot be used and will cause errors.
EndBeginTxnUnsafe
)

Expand Down
19 changes: 10 additions & 9 deletions pkg/kgo/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestTxnEtl(t *testing.T) {
// we commit and begin a new one.
if i > 0 && i%10000 == 0 {
how := EndBeginTxnSafe
if safeUnsafe {
if safeUnsafe && allowUnsafe {
how = EndBeginTxnUnsafe
}
safeUnsafe = !safeUnsafe
Expand Down Expand Up @@ -136,19 +136,15 @@ func (c *testConsumer) goTransact(txnsBeforeQuit int) {

func (c *testConsumer) transact(txnsBeforeQuit int) {
defer c.wg.Done()
txnSess, _ := NewGroupTransactSession(

opts := []Opt{
getSeedBrokers(),
// Kraft sometimes has massive hangs internally when completing
// transactions. Against zk Kafka, we could rely on our
// internal mitigations to never have KIP-447 problems.
// Not true against Kraft, see #223.
RequireStableFetchOffsets(),
// Kraft sometimes returns success from topic creation, and
// then returns UnknownTopicXyz for a while in metadata loads.
// It also returns NotLeaderXyz; we handle both problems.
UnknownTopicRetries(-1),
TransactionalID(randsha()),
TransactionTimeout(10*time.Second),
TransactionTimeout(10 * time.Second),
WithLogger(testLogger()),
// Control records have their own unique offset, so for testing,
// we keep the record to ensure we do not doubly consume control
Expand All @@ -159,7 +155,12 @@ func (c *testConsumer) transact(txnsBeforeQuit int) {
FetchIsolationLevel(ReadCommitted()),
Balancers(c.balancer),
MaxBufferedRecords(10000),
)
}
if requireStableFetch {
opts = append(opts, RequireStableFetchOffsets())
}

txnSess, _ := NewGroupTransactSession(opts...)
defer txnSess.Close()

ntxns := 0 // for if txnsBeforeQuit is non-negative
Expand Down

0 comments on commit 31e6e37

Please sign in to comment.