From 83d561e17cb0c97542a292e99b54ca313ea6012f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Erbrech?= Date: Wed, 19 Jul 2023 06:00:42 +1000 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20Cancel=20message=20context=20when?= =?UTF-8?q?=20lockrenewer=20stops=20(#140)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * cancel messsage context when lockrenewer stops * add test for deprecated constructor default * fix linter * fix linter --- v2/e2e/e2e_test.go | 2 +- v2/lockrenewer.go | 77 +++++++++++++++++---- v2/lockrenewer_test.go | 107 ++++++++++++++++++++++------- v2/managedsettling_example_test.go | 2 +- v2/processor.go | 5 -- v2/settlehandler_example_test.go | 3 +- 6 files changed, 149 insertions(+), 47 deletions(-) diff --git a/v2/e2e/e2e_test.go b/v2/e2e/e2e_test.go index 99f41ba3..a10dde50 100644 --- a/v2/e2e/e2e_test.go +++ b/v2/e2e/e2e_test.go @@ -30,7 +30,7 @@ func (s *SBSuite) TestPublishAndListen_ConcurrentLockRenewal() { lockRenewalInterval := 2 * time.Second p := shuttle.NewProcessor(receiver, shuttle.NewPanicHandler(nil, - shuttle.NewRenewLockHandler(receiver, &lockRenewalInterval, + shuttle.NewLockRenewalHandler(receiver, &shuttle.LockRenewalOptions{Interval: &lockRenewalInterval}, shuttle.NewSettlementHandler(nil, testHandler(t, success, sendCount)))), &shuttle.ProcessorOptions{MaxConcurrency: 25}) diff --git a/v2/lockrenewer.go b/v2/lockrenewer.go index 301519c0..0d7f5573 100644 --- a/v2/lockrenewer.go +++ b/v2/lockrenewer.go @@ -7,6 +7,7 @@ import ( "sync/atomic" "time" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -14,29 +15,69 @@ import ( "github.com/Azure/go-shuttle/v2/metrics" ) -// NewRenewLockHandler starts a renewlock goroutine for each message received. -func NewRenewLockHandler(lockRenewer LockRenewer, interval *time.Duration, handler Handler) HandlerFunc { +// LockRenewer abstracts the servicebus receiver client to only expose lock renewal +type LockRenewer interface { + RenewMessageLock(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.RenewMessageLockOptions) error +} + +// LockRenewalOptions configures the lock renewal. +type LockRenewalOptions struct { + // Interval defines the frequency at which we renew the lock on the message. Defaults to 10 seconds. + Interval *time.Duration + // CancelMessageContextOnStop will cancel the downstream message context when the renewal handler is stopped. + // Defaults to true. + CancelMessageContextOnStop *bool +} + +// NewLockRenewalHandler returns a middleware handler that will renew the lock on the message at the specified interval. +func NewLockRenewalHandler(lockRenewer LockRenewer, options *LockRenewalOptions, handler Handler) HandlerFunc { + interval := 10 * time.Second + cancelMessageContextOnStop := true + if options != nil { + if options.Interval != nil { + interval = *options.Interval + } + if options.CancelMessageContextOnStop != nil { + cancelMessageContextOnStop = *options.CancelMessageContextOnStop + } + } plr := &peekLockRenewer{ - next: handler, - lockRenewer: lockRenewer, - renewalInterval: interval, - stopped: make(chan struct{}, 1), // buffered channel to ensure we are not blocking + next: handler, + lockRenewer: lockRenewer, + renewalInterval: &interval, + cancelMessageCtxOnStop: cancelMessageContextOnStop, + stopped: make(chan struct{}, 1), // buffered channel to ensure we are not blocking } return func(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage) { - go plr.startPeriodicRenewal(ctx, message) - handler.Handle(ctx, settler, message) - plr.stop(ctx) + renewalCtx, cancel := context.WithCancel(ctx) + plr.cancelMessageCtx = cancel + go plr.startPeriodicRenewal(renewalCtx, message) + handler.Handle(renewalCtx, settler, message) + plr.stop(renewalCtx) } } -// PeekLockRenewer starts a background goroutine that renews the message lock at the given interval until Stop() is called +// Deprecated: use NewLockRenewalHandler +// NewRenewLockHandler starts a renewlock goroutine for each message received. +func NewRenewLockHandler(lockRenewer LockRenewer, interval *time.Duration, handler Handler) HandlerFunc { + return NewLockRenewalHandler(lockRenewer, + &LockRenewalOptions{ + Interval: interval, + // default to false on the old handler signature to keep the same behavior + CancelMessageContextOnStop: to.Ptr(false)}, + handler) +} + +// peekLockRenewer starts a background goroutine that renews the message lock at the given interval until Stop() is called // or until the passed in context is canceled. // it is a pass through handler if the renewalInterval is nil type peekLockRenewer struct { - next Handler - lockRenewer LockRenewer - renewalInterval *time.Duration - alive atomic.Bool + next Handler + lockRenewer LockRenewer + renewalInterval *time.Duration + alive atomic.Bool + cancelMessageCtxOnStop bool + cancelMessageCtx func() // stopped channel allows to short circuit the renewal loop // when we are already waiting on the select. @@ -46,12 +87,17 @@ type peekLockRenewer struct { stopped chan struct{} } +// stop will stop the renewal loop. if LockRenewalOptions.CancelMessageContextOnStop is set to true, it cancels the message context. func (plr *peekLockRenewer) stop(ctx context.Context) { plr.alive.Store(false) // don't send the stop signal to the loop if there is already one in the channel if len(plr.stopped) == 0 { plr.stopped <- struct{}{} } + if plr.cancelMessageCtxOnStop { + log(ctx, "canceling message context") + plr.cancelMessageCtx() + } log(ctx, "stopped periodic renewal") } @@ -86,7 +132,8 @@ func (plr *peekLockRenewer) startPeriodicRenewal(ctx context.Context, message *a span.RecordError(fmt.Errorf("failed to renew lock: %w", err)) // on error, we continue to the next loop iteration. // if the context is Done, we will enter the ctx.Done() case and exit the renewal. - // if the error is anything else, we keep retrying the renewal + // if the error is identified as permanent, we stop the renewal. + // if the error is anything else, we keep trying the renewal. if plr.isPermanent(err) { log(ctx, "stopping periodic renewal for message: ", message.MessageID) plr.stop(ctx) diff --git a/v2/lockrenewer_test.go b/v2/lockrenewer_test.go index 0e37bfff..a809bbc6 100644 --- a/v2/lockrenewer_test.go +++ b/v2/lockrenewer_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" . "github.com/onsi/gomega" @@ -29,11 +30,12 @@ func Test_StopRenewingOnHandlerCompletion(t *testing.T) { settler := &fakeSettler{} g := NewWithT(t) interval := 100 * time.Millisecond - lr := shuttle.NewRenewLockHandler(renewer, &interval, shuttle.HandlerFunc(func(ctx context.Context, settler shuttle.MessageSettler, - message *azservicebus.ReceivedMessage) { - err := settler.CompleteMessage(ctx, message, nil) - g.Expect(err).To(Not(HaveOccurred())) - })) + lr := shuttle.NewLockRenewalHandler(renewer, &shuttle.LockRenewalOptions{Interval: &interval}, + shuttle.HandlerFunc(func(ctx context.Context, settler shuttle.MessageSettler, + message *azservicebus.ReceivedMessage) { + err := settler.CompleteMessage(ctx, message, nil) + g.Expect(err).To(Not(HaveOccurred())) + })) msg := &azservicebus.ReceivedMessage{} ctx, cancel := context.WithTimeout(context.TODO(), 120*time.Millisecond) defer cancel() @@ -48,10 +50,11 @@ func Test_StopRenewingOnHandlerCompletion(t *testing.T) { func Test_RenewPeriodically(t *testing.T) { renewer := &fakeSBLockRenewer{} interval := 50 * time.Millisecond - lr := shuttle.NewRenewLockHandler(renewer, &interval, shuttle.HandlerFunc(func(ctx context.Context, settler shuttle.MessageSettler, - message *azservicebus.ReceivedMessage) { - time.Sleep(150 * time.Millisecond) - })) + lr := shuttle.NewLockRenewalHandler(renewer, &shuttle.LockRenewalOptions{Interval: &interval}, + shuttle.HandlerFunc(func(ctx context.Context, settler shuttle.MessageSettler, + message *azservicebus.ReceivedMessage) { + time.Sleep(150 * time.Millisecond) + })) msg := &azservicebus.ReceivedMessage{} ctx, cancel := context.WithTimeout(context.TODO(), 120*time.Millisecond) defer cancel() @@ -63,11 +66,31 @@ func Test_RenewPeriodically(t *testing.T) { 20*time.Millisecond).Should(Succeed()) } +//nolint:staticcheck // still need to cover the deprecated func +func Test_NewLockRenewerHandler_defaultToNotCancelMessageContext(t *testing.T) { + g := NewWithT(t) + interval := 20 * time.Millisecond + sbRenewer := &fakeSBLockRenewer{ + Err: &azservicebus.Error{Code: azservicebus.CodeLockLost}, + } + + handler := shuttle.NewRenewLockHandler(sbRenewer, &interval, + shuttle.HandlerFunc(func(ctx context.Context, settler shuttle.MessageSettler, message *azservicebus.ReceivedMessage) { + g.Consistently(func(g Gomega) { + g.Expect(ctx.Err()).To(BeNil()) + }, "120ms", "10ms").Should(Succeed()) + })) + handler.Handle(context.Background(), &fakeSettler{}, &azservicebus.ReceivedMessage{}) +} + func Test_RenewPeriodically_Error(t *testing.T) { type testCase struct { - name string - renewer *fakeSBLockRenewer - verify func(g Gomega, tc *testCase) + name string + renewer *fakeSBLockRenewer + isRenewerCanceled bool + cancelCtxOnStop *bool + gotMessageCtx context.Context + verify func(g Gomega, tc *testCase) } testCases := []testCase{ { @@ -81,13 +104,14 @@ func Test_RenewPeriodically_Error(t *testing.T) { }, }, { - name: "stop periodic renewal on context canceled", - renewer: &fakeSBLockRenewer{Err: context.Canceled}, + name: "stop periodic renewal on context canceled", + isRenewerCanceled: true, + renewer: &fakeSBLockRenewer{Err: context.Canceled}, verify: func(g Gomega, tc *testCase) { g.Consistently( - func(g Gomega) { g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(2))) }, + func(g Gomega) { g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(0))) }, 130*time.Millisecond, - 20*time.Millisecond) + 20*time.Millisecond).Should(Succeed()) }, }, { @@ -95,9 +119,34 @@ func Test_RenewPeriodically_Error(t *testing.T) { renewer: &fakeSBLockRenewer{Err: &azservicebus.Error{Code: azservicebus.CodeLockLost}}, verify: func(g Gomega, tc *testCase) { g.Consistently( - func(g Gomega) { g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(2))) }, + func(g Gomega) { g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(1))) }, + 130*time.Millisecond, + 20*time.Millisecond).Should(Succeed()) + }, + }, + { + name: "cancel message context on stop by default", + renewer: &fakeSBLockRenewer{Err: &azservicebus.Error{Code: azservicebus.CodeLockLost}}, + verify: func(g Gomega, tc *testCase) { + g.Consistently( + func(g Gomega) { g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(1))) }, 130*time.Millisecond, - 20*time.Millisecond) + 20*time.Millisecond).Should(Succeed()) + g.Expect(tc.gotMessageCtx.Err()).To(Equal(context.Canceled)) + }, + }, + { + name: "does not cancel message context on stop if disabled", + renewer: &fakeSBLockRenewer{Err: &azservicebus.Error{Code: azservicebus.CodeLockLost}}, + cancelCtxOnStop: to.Ptr(false), + verify: func(g Gomega, tc *testCase) { + g.Consistently( + func(g Gomega) { + g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(1))) + g.Expect(tc.gotMessageCtx.Err()).To(BeNil()) + }, + 100*time.Millisecond, + 20*time.Millisecond).Should(Succeed()) }, }, { @@ -106,7 +155,7 @@ func Test_RenewPeriodically_Error(t *testing.T) { verify: func(g Gomega, tc *testCase) { g.Eventually( func(g Gomega) { g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(2))) }, - 130*time.Millisecond, + 140*time.Millisecond, 20*time.Millisecond).Should(Succeed()) }, }, @@ -116,12 +165,22 @@ func Test_RenewPeriodically_Error(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Parallel() interval := 50 * time.Millisecond - lr := shuttle.NewRenewLockHandler(tc.renewer, &interval, shuttle.HandlerFunc(func(ctx context.Context, settler shuttle.MessageSettler, - message *azservicebus.ReceivedMessage) { - time.Sleep(150 * time.Millisecond) - })) + lr := shuttle.NewLockRenewalHandler(tc.renewer, &shuttle.LockRenewalOptions{Interval: &interval, CancelMessageContextOnStop: tc.cancelCtxOnStop}, + shuttle.HandlerFunc(func(ctx context.Context, settler shuttle.MessageSettler, + message *azservicebus.ReceivedMessage) { + tc.gotMessageCtx = ctx + select { + case <-time.After(110 * time.Millisecond): + break + case <-ctx.Done(): + break + } + })) msg := &azservicebus.ReceivedMessage{} - ctx, cancel := context.WithTimeout(context.TODO(), 120*time.Millisecond) + ctx, cancel := context.WithTimeout(context.TODO(), 200*time.Millisecond) + if tc.isRenewerCanceled { + cancel() + } defer cancel() lr.Handle(ctx, &fakeSettler{}, msg) tc.verify(NewWithT(t), &tc) diff --git a/v2/managedsettling_example_test.go b/v2/managedsettling_example_test.go index cfdbd3dd..8b5a3dcf 100644 --- a/v2/managedsettling_example_test.go +++ b/v2/managedsettling_example_test.go @@ -27,7 +27,7 @@ func ExampleNewManagedSettlingHandler() { lockRenewalInterval := 10 * time.Second p := shuttle.NewProcessor(receiver, shuttle.NewPanicHandler(nil, - shuttle.NewRenewLockHandler(receiver, &lockRenewalInterval, + shuttle.NewLockRenewalHandler(receiver, &shuttle.LockRenewalOptions{Interval: &lockRenewalInterval}, shuttle.NewManagedSettlingHandler(&shuttle.ManagedSettlingOptions{ RetryDecision: &shuttle.MaxAttemptsRetryDecision{MaxAttempts: 2}, RetryDelayStrategy: &shuttle.ConstantDelayStrategy{Delay: 2 * time.Second}, diff --git a/v2/processor.go b/v2/processor.go index 6aa3dcc0..31163495 100644 --- a/v2/processor.go +++ b/v2/processor.go @@ -35,11 +35,6 @@ func (f HandlerFunc) Handle(ctx context.Context, settler MessageSettler, message f(ctx, settler, message) } -// LockRenewer abstracts the servicebus receiver client to only expose lock renewal -type LockRenewer interface { - RenewMessageLock(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.RenewMessageLockOptions) error -} - // Processor encapsulates the message pump and concurrency handling of servicebus. // it exposes a handler API to provides a middleware based message processing pipeline. type Processor struct { diff --git a/v2/settlehandler_example_test.go b/v2/settlehandler_example_test.go index 7368ef5c..67dcbc08 100644 --- a/v2/settlehandler_example_test.go +++ b/v2/settlehandler_example_test.go @@ -6,6 +6,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" + "github.com/Azure/go-shuttle/v2" ) @@ -25,7 +26,7 @@ func ExampleNewSettlementHandler() { lockRenewalInterval := 10 * time.Second p := shuttle.NewProcessor(receiver, shuttle.NewPanicHandler(nil, - shuttle.NewRenewLockHandler(receiver, &lockRenewalInterval, + shuttle.NewLockRenewalHandler(receiver, &shuttle.LockRenewalOptions{Interval: &lockRenewalInterval}, shuttle.NewSettlementHandler(nil, mySettlingHandler()))), &shuttle.ProcessorOptions{MaxConcurrency: 10}) ctx, cancel := context.WithCancel(context.Background())