diff --git a/.golangci.yml b/.golangci.yml index 193366c2..9fd813e8 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -24,7 +24,6 @@ linters: - asciicheck - bidichk - bodyclose - - copyloopvar - durationcheck - exhaustive - gocritic diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go index c3d5a9a7..5a3db1b2 100644 --- a/pkg/kgo/broker.go +++ b/pkg/kgo/broker.go @@ -41,6 +41,8 @@ func (p *pinReq) SetVersion(v int16) { p.Request.SetVersion(v) } +type forceOpenReq struct{ kmsg.Request } + type promisedReq struct { ctx context.Context req kmsg.Request @@ -408,6 +410,12 @@ start: default: } + if _, isForceOpen := req.(*forceOpenReq); isForceOpen { + cxn.lastWrite.Store(time.Now().UnixNano()) + pr.promise(nil, nil) + return + } + // Produce requests (and only produce requests) can be written // without receiving a reply. If we see required acks is 0, // then we immediately call the promise with no response. diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go index 1ea2a29f..322439db 100644 --- a/pkg/kgo/producer.go +++ b/pkg/kgo/producer.go @@ -101,6 +101,81 @@ func (cl *Client) BufferedProduceBytes() int64 { return cl.producer.bufferedBytes + cl.producer.blockedBytes } +// EnsureProduceConnectionIsOpen attempts to open a produce connection to all +// specified brokers, or all brokers if `brokers` is empty or contains -1. +// +// This can be used in an attempt to reduce the latency when producing if your +// application produces infrequently: you can force open a produce connection a +// bit earlier than you intend to produce, rather than at the moment you +// produce. In rare circumstances, it is possible that a connection that was +// ensured to be open may close before you produce. +// +// This returns an errors.Join'd error that merges a message for all brokers +// that failed to be opened, as well as why. +// +// Note that if opening connections to all brokers, this only opens connections +// to all discovered brokers. If no internal metadata request has ever been +// issued and returned, this will not open a connection to any broker. +func (cl *Client) EnsureProduceConnectionIsOpen(ctx context.Context, brokers ...int32) error { + var ( + keep = brokers[:0] + all bool + wg sync.WaitGroup + mu sync.Mutex + errs []error + ) + for _, b := range brokers { + switch { + case b < -1: + case b == -1: + all = true + case b > -1: + keep = append(keep, b) + } + } + var toOpen []*broker + if all || len(brokers) == 0 { + cl.brokersMu.RLock() + toOpen = cl.brokers + cl.brokersMu.RUnlock() + } else { + for _, b := range brokers { + b := b + wg.Add(1) + go func() { + defer wg.Done() + + br, err := cl.brokerOrErr(ctx, b, errUnknownBroker) + + mu.Lock() + defer mu.Unlock() + if err != nil { + errs = append(errs, fmt.Errorf("%d: %w", b, err)) + return + } + toOpen = append(toOpen, br) + }() + } + wg.Wait() + } + + for _, br := range toOpen { + br := br + wg.Add(1) + br.do(ctx, &forceOpenReq{new(kmsg.ProduceRequest)}, func(_ kmsg.Response, err error) { + defer wg.Done() + if err != nil { + mu.Lock() + errs = append(errs, fmt.Errorf("%d: %w", br.meta.NodeID, err)) + mu.Unlock() + } + }) + } + wg.Wait() + + return errors.Join(errs...) +} + type unknownTopicProduces struct { buffered []promisedRec wait chan error // retryable errors