Skip to content

Commit

Permalink
Merge pull request #715 from twmb/tiny
Browse files Browse the repository at this point in the history
misc
  • Loading branch information
twmb authored Apr 30, 2024
2 parents 6a58760 + c08635f commit 8b53958
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 17 deletions.
30 changes: 30 additions & 0 deletions .github/workflows/i386.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Inspired by github.com/IBM/sarama/pull/2874 (MIT)

name: i386

on:
push:
branches: ["*"]
paths-ignore:
- '**/*.md'
pull_request:
branches: ["*"]
paths-ignore:
- '**/*.md'

jobs:
atomicalign:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v4
with:
go-version: 'stable'
- name: staticcheck
env:
GOARCH: 386
GOFLAGS: -tags=functional
run: |
git clone --depth=1 https://github.com/dominikh/go-tools /tmp/go-tools
( cd /tmp/go-tools/cmd/staticcheck && go build -o /tmp/staticcheck )
/tmp/staticcheck -checks SA1027 ./...
4 changes: 1 addition & 3 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# We may as well allow multiple golangci-lint invocations at once.
run:
allow-parallel-runners: true
go: "1.21"
go: "1.22"

# golangci-lint by default ignores some staticcheck and vet raised issues that
# are actually important to catch. The following ensures that we do not ignore
Expand Down Expand Up @@ -68,7 +68,6 @@ linters-settings:
#
# https://github.com/mvdan/gofumpt/issues/137
gofumpt:
lang-version: "1.21"
extra-rules: true

gosec:
Expand Down Expand Up @@ -163,5 +162,4 @@ linters-settings:
# contexts for beneficial reasons, and we disable the SSLv3 deprecation
# warning because this is solely for a debug log.
staticcheck:
go: "1.21"
checks: ["all", "-SA1012", "-SA1019"]
2 changes: 1 addition & 1 deletion pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2269,7 +2269,7 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
backoff := cl.cfg.retryBackoff(tries)
if err != nil &&
(reshardable && isPinned && errors.Is(err, errBrokerTooOld) && tries <= 3) ||
(retryTimeout == 0 || time.Now().Add(backoff).Sub(start) < retryTimeout) && cl.shouldRetry(tries, err) && cl.waitTries(ctx, backoff) {
(retryTimeout == 0 || time.Now().Add(backoff).Sub(start) <= retryTimeout) && cl.shouldRetry(tries, err) && cl.waitTries(ctx, backoff) {
// Non-reshardable re-requests just jump back to the
// top where the broker is loaded. This is the case on
// requests where the original request is split to
Expand Down
24 changes: 14 additions & 10 deletions pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,8 +708,10 @@ func RequestRetries(n int) Opt {
return clientOpt{func(cfg *cfg) { cfg.retries = int64(n) }}
}

// RetryTimeout sets the upper limit on how long we allow requests to retry,
// overriding the default of:
// RetryTimeout sets the upper limit on how long we allow a request to be
// issued and then reissued on failure. That is, this control the total
// end-to-end maximum time we allow for trying a request, This overrides the
// default of:
//
// JoinGroup: cfg.SessionTimeout (default 45s)
// SyncGroup: cfg.SessionTimeout (default 45s)
Expand All @@ -721,15 +723,17 @@ func RequestRetries(n int) Opt {
//
// A value of zero indicates no request timeout.
//
// The timeout is evaluated after a request is issued. If a retry backoff
// places the next request past the retry timeout deadline, the request will
// still be tried once more once the backoff expires.
// The timeout is evaluated after a request errors. If the time since the start
// of the first request plus any backoff for the latest failure is less than
// the retry timeout, the request will be issued again.
func RetryTimeout(t time.Duration) Opt {
return RetryTimeoutFn(func(int16) time.Duration { return t })
}

// RetryTimeoutFn sets the per-request upper limit on how long we allow
// requests to retry, overriding the default of:
// RetryTimeoutFn sets the upper limit on how long we allow a request to be
// issued and then reissued on failure. That is, this control the total
// end-to-end maximum time we allow for trying a request, This overrides the
// default of:
//
// JoinGroup: cfg.SessionTimeout (default 45s)
// SyncGroup: cfg.SessionTimeout (default 45s)
Expand All @@ -745,9 +749,9 @@ func RetryTimeout(t time.Duration) Opt {
//
// If the function returns zero, there is no retry timeout.
//
// The timeout is evaluated after a request is issued. If a retry backoff
// places the next request past the retry timeout deadline, the request will
// still be tried once more once the backoff expires.
// The timeout is evaluated after a request errors. If the time since the start
// of the first request plus any backoff for the latest failure is less than
// the retry timeout, the request will be issued again.
func RetryTimeoutFn(t func(int16) time.Duration) Opt {
return clientOpt{func(cfg *cfg) { cfg.retryTimeout = t }}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kgo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1027,7 +1027,7 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
case assignPurgeMatching:
// This is slightly different than invalidate in that
// we invalidate whole topics.
loadOffsets.keepFilter(func(t string, p int32) bool {
loadOffsets.keepFilter(func(t string, _ int32) bool {
_, ok := assignments[t]
return !ok // assignments are topics to purge -- do NOT keep the topic if it is being purged
})
Expand Down
4 changes: 2 additions & 2 deletions pkg/kgo/consumer_direct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func TestPauseIssue489(t *testing.T) {
r := StringRecord("v")
r.Partition = int32(which % 3)
which++
cl.Produce(ctx, r, func(r *Record, err error) {
cl.Produce(ctx, r, func(_ *Record, err error) {
if err == context.Canceled {
exit.Store(true)
}
Expand Down Expand Up @@ -372,7 +372,7 @@ func TestPauseIssueOct2023(t *testing.T) {
r := StringRecord("v")
r.Topic = ts[which%len(ts)]
which++
cl.Produce(ctx, r, func(r *Record, err error) {
cl.Produce(ctx, r, func(_ *Record, err error) {
if err == context.Canceled {
exit.Store(true)
}
Expand Down

0 comments on commit 8b53958

Please sign in to comment.