Skip to content

Commit

Permalink
kgo: add EnsureProduceConnectionIsOpen
Browse files Browse the repository at this point in the history
This can help reduce latency if you produce infrequently, but know
you'll be producing shortly.

Closes #807.
  • Loading branch information
twmb committed Oct 15, 2024
1 parent b66ceb7 commit d0d94b2
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 1 deletion.
1 change: 0 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ linters:
- asciicheck
- bidichk
- bodyclose
- copyloopvar
- durationcheck
- exhaustive
- gocritic
Expand Down
8 changes: 8 additions & 0 deletions pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ func (p *pinReq) SetVersion(v int16) {
p.Request.SetVersion(v)
}

type forceOpenReq struct{ kmsg.Request }

type promisedReq struct {
ctx context.Context
req kmsg.Request
Expand Down Expand Up @@ -408,6 +410,12 @@ start:
default:
}

if _, isForceOpen := req.(*forceOpenReq); isForceOpen {
cxn.lastWrite.Store(time.Now().UnixNano())
pr.promise(nil, nil)
return
}

// Produce requests (and only produce requests) can be written
// without receiving a reply. If we see required acks is 0,
// then we immediately call the promise with no response.
Expand Down
75 changes: 75 additions & 0 deletions pkg/kgo/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,81 @@ func (cl *Client) BufferedProduceBytes() int64 {
return cl.producer.bufferedBytes + cl.producer.blockedBytes
}

// EnsureProduceConnectionIsOpen attempts to open a produce connection to all
// specified brokers, or all brokers if `brokers` is empty or contains -1.
//
// This can be used in an attempt to reduce the latency when producing if your
// application produces infrequently: you can force open a produce connection a
// bit earlier than you intend to produce, rather than at the moment you
// produce. In rare circumstances, it is possible that a connection that was
// ensured to be open may close before you produce.
//
// This returns an errors.Join'd error that merges a message for all brokers
// that failed to be opened, as well as why.
//
// Note that if opening connections to all brokers, this only opens connections
// to all discovered brokers. If no internal metadata request has ever been
// issued and returned, this will not open a connection to any broker.
func (cl *Client) EnsureProduceConnectionIsOpen(ctx context.Context, brokers ...int32) error {
var (
keep = brokers[:0]
all bool
wg sync.WaitGroup
mu sync.Mutex
errs []error
)
for _, b := range brokers {
switch {
case b < -1:
case b == -1:
all = true
case b > -1:
keep = append(keep, b)
}
}
var toOpen []*broker
if all || len(brokers) == 0 {
cl.brokersMu.RLock()
toOpen = cl.brokers
cl.brokersMu.RUnlock()
} else {
for _, b := range brokers {
b := b
wg.Add(1)
go func() {
defer wg.Done()

br, err := cl.brokerOrErr(ctx, b, errUnknownBroker)

mu.Lock()
defer mu.Unlock()
if err != nil {
errs = append(errs, fmt.Errorf("%d: %w", b, err))
return
}
toOpen = append(toOpen, br)
}()
}
wg.Wait()
}

for _, br := range toOpen {
br := br
wg.Add(1)
br.do(ctx, &forceOpenReq{new(kmsg.ProduceRequest)}, func(_ kmsg.Response, err error) {
defer wg.Done()
if err != nil {
mu.Lock()
errs = append(errs, fmt.Errorf("%d: %w", br.meta.NodeID, err))
mu.Unlock()
}
})
}
wg.Wait()

return errors.Join(errs...)
}

type unknownTopicProduces struct {
buffered []promisedRec
wait chan error // retryable errors
Expand Down

0 comments on commit d0d94b2

Please sign in to comment.