diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index 92ebeaa3..c7069d4b 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -160,10 +160,11 @@ type cfg struct { balancers []GroupBalancer // balancers we can use protocol string // "consumer" by default, expected to never be overridden - sessionTimeout time.Duration - rebalanceTimeout time.Duration - heartbeatInterval time.Duration - requireStable bool + sessionTimeout time.Duration + rebalanceTimeout time.Duration + heartbeatInterval time.Duration + requireStable bool + assumeConsumersRequireStable bool onAssigned func(context.Context, *Client, map[string][]int32) onRevoked func(context.Context, *Client, map[string][]int32) @@ -1503,10 +1504,21 @@ func HeartbeatInterval(interval time.Duration) GroupOpt { // transactional timeouts to a small value (10s) rather than the default 60s. // Lowering the transactional timeout will reduce the chance that consumers are // entirely blocked. +// +// If all consumers in your group also require stable fetch offsets, you may +// want to additionally use [AssumeConsumersRequireStable]. func RequireStableFetchOffsets() GroupOpt { return groupOpt{func(cfg *cfg) { cfg.requireStable = true }} } +// AssumeConsumersRequireStable opts the [GroupTransactSession] into NOT +// aborting whenever rebalance occur (i.e., opts this client into assuming +// every other client in the group also requires stable offsets). This +// option should be used in tandem with [RequireStableFetchOffsets]. +func AssumeConsumersRequireStable() GroupOpt { + return groupOpt{func(cfg *cfg) { cfg.assumeConsumersRequireStable = true }} +} + // BlockRebalanceOnPoll switches the client to block rebalances whenever you // poll until you explicitly call AllowRebalance. This option also ensures that // any OnPartitions{Assigned,Revoked,Lost} callbacks are only called when you diff --git a/pkg/kgo/group_test.go b/pkg/kgo/group_test.go index c0fa8d28..a604e911 100644 --- a/pkg/kgo/group_test.go +++ b/pkg/kgo/group_test.go @@ -101,6 +101,7 @@ func TestGroupETL(t *testing.T) { errs, false, tc.balancer, + false, ) }) } diff --git a/pkg/kgo/helpers_test.go b/pkg/kgo/helpers_test.go index 078fbf77..7957dacc 100644 --- a/pkg/kgo/helpers_test.go +++ b/pkg/kgo/helpers_test.go @@ -404,9 +404,9 @@ func (c *testConsumer) wait() { c.wg.Wait() } -func (c *testConsumer) goRun(transactional bool, etlsBeforeQuit int) { +func (c *testConsumer) goRun(transactional bool, etlsBeforeQuit int, assumeStable bool) { if transactional { - c.goTransact(etlsBeforeQuit) + c.goTransact(etlsBeforeQuit, assumeStable) } else { c.goGroupETL(etlsBeforeQuit) } @@ -419,6 +419,7 @@ func testChainETL( errs chan error, transactional bool, balancer GroupBalancer, + assumeStable bool, ) { var ( ///////////// @@ -484,24 +485,24 @@ func testChainETL( //////////////////// for i := 0; i < 3; i++ { // three consumers start with standard poll&commit behavior - consumers1.goRun(transactional, -1) - consumers2.goRun(transactional, -1) - consumers3.goRun(transactional, -1) + consumers1.goRun(transactional, -1, assumeStable) + consumers2.goRun(transactional, -1, assumeStable) + consumers3.goRun(transactional, -1, assumeStable) } - consumers1.goRun(transactional, 0) // bail immediately - consumers1.goRun(transactional, 2) // bail after two txns - consumers2.goRun(transactional, 2) // same + consumers1.goRun(transactional, 0, assumeStable) // bail immediately + consumers1.goRun(transactional, 2, assumeStable) // bail after two txns + consumers2.goRun(transactional, 2, assumeStable) // same time.Sleep(5 * time.Second) for i := 0; i < 3; i++ { // trigger rebalance after 5s with more consumers - consumers1.goRun(transactional, -1) - consumers2.goRun(transactional, -1) - consumers3.goRun(transactional, -1) + consumers1.goRun(transactional, -1, assumeStable) + consumers2.goRun(transactional, -1, assumeStable) + consumers3.goRun(transactional, -1, assumeStable) } - consumers2.goRun(transactional, 0) // bail immediately - consumers1.goRun(transactional, 1) // bail after one txn + consumers2.goRun(transactional, 0, assumeStable) // bail immediately + consumers1.goRun(transactional, 1, assumeStable) // bail after one txn doneConsume := make(chan struct{}) go func() { diff --git a/pkg/kgo/txn.go b/pkg/kgo/txn.go index 25cfd443..f62f6218 100644 --- a/pkg/kgo/txn.go +++ b/pkg/kgo/txn.go @@ -31,8 +31,10 @@ const ( // (EOS). // // If you are running Kafka 2.5+, it is strongly recommended that you also use -// RequireStableFetchOffsets. See that config option's documentation for more -// details. +// [RequireStableFetchOffsets]. See that config option's documentation for more +// details. By default, if the client detects any rebalance, any active transaction +// is aborted for safety. You can use the [AssumeConsumersRequireStable] to opt into +// NOT aborting automatically on rebalance. See issue 754 for more detail. type GroupTransactSession struct { cl *Client @@ -94,18 +96,27 @@ func NewGroupTransactSession(opts ...Opt) (*GroupTransactSession, error) { userRevoked := cfg.onRevoked cfg.onRevoked = func(ctx context.Context, cl *Client, rev map[string][]int32) { s.failMu.Lock() - defer s.failMu.Unlock() if s.revoked { + s.failMu.Unlock() return } if cl.consumer.g.cooperative.Load() && len(rev) == 0 && !s.revoked { cl.cfg.logger.Log(LogLevelInfo, "transact session in on_revoke with nothing to revoke; allowing next commit") + } else if cl.cfg.assumeConsumersRequireStable { + cl.cfg.logger.Log(LogLevelInfo, "transact session in on_revoke, but we are assuming all consumers require stable; allowing commit while in user revoked") + defer func() { + s.failMu.Lock() + s.revoked = true + close(s.revokedCh) + s.failMu.Unlock() + }() } else { cl.cfg.logger.Log(LogLevelInfo, "transact session in on_revoke; aborting next commit if we are currently in a transaction") s.revoked = true close(s.revokedCh) } + s.failMu.Unlock() if userRevoked != nil { userRevoked(ctx, cl, rev) @@ -115,14 +126,15 @@ func NewGroupTransactSession(opts ...Opt) (*GroupTransactSession, error) { userLost := cfg.onLost cfg.onLost = func(ctx context.Context, cl *Client, lost map[string][]int32) { s.failMu.Lock() - defer s.failMu.Unlock() if s.lost { + s.failMu.Unlock() return } cl.cfg.logger.Log(LogLevelInfo, "transact session in on_lost; aborting next commit if we are currently in a transaction") s.lost = true close(s.lostCh) + s.failMu.Unlock() if userLost != nil { userLost(ctx, cl, lost) diff --git a/pkg/kgo/txn_test.go b/pkg/kgo/txn_test.go index fc1fdbe8..2fcd27e7 100644 --- a/pkg/kgo/txn_test.go +++ b/pkg/kgo/txn_test.go @@ -108,13 +108,15 @@ func TestTxnEtl(t *testing.T) { //////////////////////////// for _, tc := range []struct { - name string - balancer GroupBalancer + name string + balancer GroupBalancer + assumeStable bool }{ - {"roundrobin", RoundRobinBalancer()}, - {"range", RangeBalancer()}, - {"sticky", StickyBalancer()}, - {"cooperative-sticky", CooperativeStickyBalancer()}, + {"roundrobin", RoundRobinBalancer(), false}, + {"range", RangeBalancer(), true}, + {"sticky", StickyBalancer(), false}, + {"cooperative-sticky", CooperativeStickyBalancer(), true}, + {"cooperative-sticky", CooperativeStickyBalancer(), false}, } { t.Run(tc.name, func(t *testing.T) { testChainETL( @@ -124,17 +126,18 @@ func TestTxnEtl(t *testing.T) { errs, true, tc.balancer, + tc.assumeStable, ) }) } } -func (c *testConsumer) goTransact(txnsBeforeQuit int) { +func (c *testConsumer) goTransact(txnsBeforeQuit int, assumeStable bool) { c.wg.Add(1) - go c.transact(txnsBeforeQuit) + go c.transact(txnsBeforeQuit, assumeStable) } -func (c *testConsumer) transact(txnsBeforeQuit int) { +func (c *testConsumer) transact(txnsBeforeQuit int, assumeStable bool) { defer c.wg.Done() opts := []Opt{ @@ -155,12 +158,21 @@ func (c *testConsumer) transact(txnsBeforeQuit int) { Balancers(c.balancer), MaxBufferedRecords(10000), } - if requireStableFetch { - opts = append(opts, RequireStableFetchOffsets()) + var txnSess *GroupTransactSession + if assumeStable { + opts = append(opts, OnPartitionsRevoked(func(ctx context.Context, cl *Client, _ map[string][]int32) { + txnSess.End(ctx, TryCommit) + })) + } + if requireStableFetch || assumeStable { + opts = append(opts, + RequireStableFetchOffsets(), + AssumeConsumersRequireStable(), + ) } opts = append(opts, testClientOpts()...) - txnSess, _ := NewGroupTransactSession(opts...) + txnSess, _ = NewGroupTransactSession(opts...) defer txnSess.Close() ntxns := 0 // for if txnsBeforeQuit is non-negative