From 142b2e21d2c6838534f87ebd88ccd3f9bbf7bd68 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Mon, 29 Apr 2024 23:16:36 -0600 Subject: [PATCH 1/3] kgo: update RetryTimeout docs and minor conditional Hopefully a little clearer. The conditional now just mirrors the other (almost) identical conditional below. The difference is a nanosecond, so, not really worth talking about more. --- pkg/kgo/client.go | 2 +- pkg/kgo/config.go | 24 ++++++++++++++---------- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 586227b7..285becad 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -2269,7 +2269,7 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res backoff := cl.cfg.retryBackoff(tries) if err != nil && (reshardable && isPinned && errors.Is(err, errBrokerTooOld) && tries <= 3) || - (retryTimeout == 0 || time.Now().Add(backoff).Sub(start) < retryTimeout) && cl.shouldRetry(tries, err) && cl.waitTries(ctx, backoff) { + (retryTimeout == 0 || time.Now().Add(backoff).Sub(start) <= retryTimeout) && cl.shouldRetry(tries, err) && cl.waitTries(ctx, backoff) { // Non-reshardable re-requests just jump back to the // top where the broker is loaded. This is the case on // requests where the original request is split to diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index 221095cf..b63ae9d6 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -708,8 +708,10 @@ func RequestRetries(n int) Opt { return clientOpt{func(cfg *cfg) { cfg.retries = int64(n) }} } -// RetryTimeout sets the upper limit on how long we allow requests to retry, -// overriding the default of: +// RetryTimeout sets the upper limit on how long we allow a request to be +// issued and then reissued on failure. That is, this control the total +// end-to-end maximum time we allow for trying a request, This overrides the +// default of: // // JoinGroup: cfg.SessionTimeout (default 45s) // SyncGroup: cfg.SessionTimeout (default 45s) @@ -721,15 +723,17 @@ func RequestRetries(n int) Opt { // // A value of zero indicates no request timeout. // -// The timeout is evaluated after a request is issued. If a retry backoff -// places the next request past the retry timeout deadline, the request will -// still be tried once more once the backoff expires. +// The timeout is evaluated after a request errors. If the time since the start +// of the first request plus any backoff for the latest failure is less than +// the retry timeout, the request will be issued again. func RetryTimeout(t time.Duration) Opt { return RetryTimeoutFn(func(int16) time.Duration { return t }) } -// RetryTimeoutFn sets the per-request upper limit on how long we allow -// requests to retry, overriding the default of: +// RetryTimeoutFn sets the upper limit on how long we allow a request to be +// issued and then reissued on failure. That is, this control the total +// end-to-end maximum time we allow for trying a request, This overrides the +// default of: // // JoinGroup: cfg.SessionTimeout (default 45s) // SyncGroup: cfg.SessionTimeout (default 45s) @@ -745,9 +749,9 @@ func RetryTimeout(t time.Duration) Opt { // // If the function returns zero, there is no retry timeout. // -// The timeout is evaluated after a request is issued. If a retry backoff -// places the next request past the retry timeout deadline, the request will -// still be tried once more once the backoff expires. +// The timeout is evaluated after a request errors. If the time since the start +// of the first request plus any backoff for the latest failure is less than +// the retry timeout, the request will be issued again. func RetryTimeoutFn(t func(int16) time.Duration) Opt { return clientOpt{func(cfg *cfg) { cfg.retryTimeout = t }} } From a4f259a4c9eb05a633bd0acb8db8bc8f2e71b5f7 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Mon, 29 Apr 2024 23:28:53 -0600 Subject: [PATCH 2/3] fix new linter errors --- .golangci.yml | 4 +--- pkg/kgo/consumer.go | 2 +- pkg/kgo/consumer_direct_test.go | 4 ++-- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 600552cc..2895cca6 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,7 +1,7 @@ # We may as well allow multiple golangci-lint invocations at once. run: allow-parallel-runners: true - go: "1.21" + go: "1.22" # golangci-lint by default ignores some staticcheck and vet raised issues that # are actually important to catch. The following ensures that we do not ignore @@ -68,7 +68,6 @@ linters-settings: # # https://github.com/mvdan/gofumpt/issues/137 gofumpt: - lang-version: "1.21" extra-rules: true gosec: @@ -163,5 +162,4 @@ linters-settings: # contexts for beneficial reasons, and we disable the SSLv3 deprecation # warning because this is solely for a debug log. staticcheck: - go: "1.21" checks: ["all", "-SA1012", "-SA1019"] diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index 76ba397b..10e0fb44 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -1027,7 +1027,7 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how case assignPurgeMatching: // This is slightly different than invalidate in that // we invalidate whole topics. - loadOffsets.keepFilter(func(t string, p int32) bool { + loadOffsets.keepFilter(func(t string, _ int32) bool { _, ok := assignments[t] return !ok // assignments are topics to purge -- do NOT keep the topic if it is being purged }) diff --git a/pkg/kgo/consumer_direct_test.go b/pkg/kgo/consumer_direct_test.go index f02b0d69..b63f5343 100644 --- a/pkg/kgo/consumer_direct_test.go +++ b/pkg/kgo/consumer_direct_test.go @@ -292,7 +292,7 @@ func TestPauseIssue489(t *testing.T) { r := StringRecord("v") r.Partition = int32(which % 3) which++ - cl.Produce(ctx, r, func(r *Record, err error) { + cl.Produce(ctx, r, func(_ *Record, err error) { if err == context.Canceled { exit.Store(true) } @@ -372,7 +372,7 @@ func TestPauseIssueOct2023(t *testing.T) { r := StringRecord("v") r.Topic = ts[which%len(ts)] which++ - cl.Produce(ctx, r, func(r *Record, err error) { + cl.Produce(ctx, r, func(_ *Record, err error) { if err == context.Canceled { exit.Store(true) } From c08635f2e09a54b20e0ff568b998486c22983c1f Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Mon, 29 Apr 2024 23:35:39 -0600 Subject: [PATCH 3/3] GHA: add i386 atomicalign Inspo from github.com/IBM/sarama/pull/2874 (MIT) --- .github/workflows/i386.yml | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 .github/workflows/i386.yml diff --git a/.github/workflows/i386.yml b/.github/workflows/i386.yml new file mode 100644 index 00000000..3696a6bb --- /dev/null +++ b/.github/workflows/i386.yml @@ -0,0 +1,30 @@ +# Inspired by github.com/IBM/sarama/pull/2874 (MIT) + +name: i386 + +on: + push: + branches: ["*"] + paths-ignore: + - '**/*.md' + pull_request: + branches: ["*"] + paths-ignore: + - '**/*.md' + +jobs: + atomicalign: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-go@v4 + with: + go-version: 'stable' + - name: staticcheck + env: + GOARCH: 386 + GOFLAGS: -tags=functional + run: | + git clone --depth=1 https://github.com/dominikh/go-tools /tmp/go-tools + ( cd /tmp/go-tools/cmd/staticcheck && go build -o /tmp/staticcheck ) + /tmp/staticcheck -checks SA1027 ./...