Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kgo: add EnsureProduceConnectionIsOpen #839

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ linters:
- asciicheck
- bidichk
- bodyclose
- copyloopvar
- durationcheck
- exhaustive
- gocritic
Expand Down
8 changes: 8 additions & 0 deletions pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ func (p *pinReq) SetVersion(v int16) {
p.Request.SetVersion(v)
}

type forceOpenReq struct{ kmsg.Request }

type promisedReq struct {
ctx context.Context
req kmsg.Request
Expand Down Expand Up @@ -408,6 +410,12 @@ start:
default:
}

if _, isForceOpen := req.(*forceOpenReq); isForceOpen {
cxn.lastWrite.Store(time.Now().UnixNano())
pr.promise(nil, nil)
return
}

// Produce requests (and only produce requests) can be written
// without receiving a reply. If we see required acks is 0,
// then we immediately call the promise with no response.
Expand Down
75 changes: 75 additions & 0 deletions pkg/kgo/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,81 @@ func (cl *Client) BufferedProduceBytes() int64 {
return cl.producer.bufferedBytes + cl.producer.blockedBytes
}

// EnsureProduceConnectionIsOpen attempts to open a produce connection to all
// specified brokers, or all brokers if `brokers` is empty or contains -1.
//
// This can be used in an attempt to reduce the latency when producing if your
// application produces infrequently: you can force open a produce connection a
// bit earlier than you intend to produce, rather than at the moment you
// produce. In rare circumstances, it is possible that a connection that was
// ensured to be open may close before you produce.
//
// This returns an errors.Join'd error that merges a message for all brokers
// that failed to be opened, as well as why.
//
// Note that if opening connections to all brokers, this only opens connections
// to all discovered brokers. If no internal metadata request has ever been
// issued and returned, this will not open a connection to any broker.
func (cl *Client) EnsureProduceConnectionIsOpen(ctx context.Context, brokers ...int32) error {
var (
keep = brokers[:0]
all bool
wg sync.WaitGroup
mu sync.Mutex
errs []error
)
for _, b := range brokers {
switch {
case b < -1:
case b == -1:
all = true
case b > -1:
keep = append(keep, b)
}
}
var toOpen []*broker
if all || len(brokers) == 0 {
cl.brokersMu.RLock()
toOpen = cl.brokers
cl.brokersMu.RUnlock()
} else {
for _, b := range brokers {
b := b
wg.Add(1)
go func() {
defer wg.Done()

br, err := cl.brokerOrErr(ctx, b, errUnknownBroker)

mu.Lock()
defer mu.Unlock()
if err != nil {
errs = append(errs, fmt.Errorf("%d: %w", b, err))
return
}
toOpen = append(toOpen, br)
}()
}
wg.Wait()
}

for _, br := range toOpen {
br := br
wg.Add(1)
br.do(ctx, &forceOpenReq{new(kmsg.ProduceRequest)}, func(_ kmsg.Response, err error) {
defer wg.Done()
if err != nil {
mu.Lock()
errs = append(errs, fmt.Errorf("%d: %w", br.meta.NodeID, err))
mu.Unlock()
}
})
}
wg.Wait()

return errors.Join(errs...)
}

type unknownTopicProduces struct {
buffered []promisedRec
wait chan error // retryable errors
Expand Down