Skip to content

Commit

Permalink
✨ Cancel message context when lockrenewer stops (#140)
Browse files Browse the repository at this point in the history
* cancel messsage context when lockrenewer stops

* add test for deprecated constructor default

* fix linter

* fix linter
  • Loading branch information
serbrech authored Jul 18, 2023
1 parent 08ce85d commit 83d561e
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 47 deletions.
2 changes: 1 addition & 1 deletion v2/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (s *SBSuite) TestPublishAndListen_ConcurrentLockRenewal() {
lockRenewalInterval := 2 * time.Second
p := shuttle.NewProcessor(receiver,
shuttle.NewPanicHandler(nil,
shuttle.NewRenewLockHandler(receiver, &lockRenewalInterval,
shuttle.NewLockRenewalHandler(receiver, &shuttle.LockRenewalOptions{Interval: &lockRenewalInterval},
shuttle.NewSettlementHandler(nil,
testHandler(t, success, sendCount)))), &shuttle.ProcessorOptions{MaxConcurrency: 25})

Expand Down
77 changes: 62 additions & 15 deletions v2/lockrenewer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,36 +7,77 @@ import (
"sync/atomic"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/Azure/go-shuttle/v2/metrics"
)

// NewRenewLockHandler starts a renewlock goroutine for each message received.
func NewRenewLockHandler(lockRenewer LockRenewer, interval *time.Duration, handler Handler) HandlerFunc {
// LockRenewer abstracts the servicebus receiver client to only expose lock renewal
type LockRenewer interface {
RenewMessageLock(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.RenewMessageLockOptions) error
}

// LockRenewalOptions configures the lock renewal.
type LockRenewalOptions struct {
// Interval defines the frequency at which we renew the lock on the message. Defaults to 10 seconds.
Interval *time.Duration
// CancelMessageContextOnStop will cancel the downstream message context when the renewal handler is stopped.
// Defaults to true.
CancelMessageContextOnStop *bool
}

// NewLockRenewalHandler returns a middleware handler that will renew the lock on the message at the specified interval.
func NewLockRenewalHandler(lockRenewer LockRenewer, options *LockRenewalOptions, handler Handler) HandlerFunc {
interval := 10 * time.Second
cancelMessageContextOnStop := true
if options != nil {
if options.Interval != nil {
interval = *options.Interval
}
if options.CancelMessageContextOnStop != nil {
cancelMessageContextOnStop = *options.CancelMessageContextOnStop
}
}
plr := &peekLockRenewer{
next: handler,
lockRenewer: lockRenewer,
renewalInterval: interval,
stopped: make(chan struct{}, 1), // buffered channel to ensure we are not blocking
next: handler,
lockRenewer: lockRenewer,
renewalInterval: &interval,
cancelMessageCtxOnStop: cancelMessageContextOnStop,
stopped: make(chan struct{}, 1), // buffered channel to ensure we are not blocking
}
return func(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage) {
go plr.startPeriodicRenewal(ctx, message)
handler.Handle(ctx, settler, message)
plr.stop(ctx)
renewalCtx, cancel := context.WithCancel(ctx)
plr.cancelMessageCtx = cancel
go plr.startPeriodicRenewal(renewalCtx, message)
handler.Handle(renewalCtx, settler, message)
plr.stop(renewalCtx)
}
}

// PeekLockRenewer starts a background goroutine that renews the message lock at the given interval until Stop() is called
// Deprecated: use NewLockRenewalHandler
// NewRenewLockHandler starts a renewlock goroutine for each message received.
func NewRenewLockHandler(lockRenewer LockRenewer, interval *time.Duration, handler Handler) HandlerFunc {
return NewLockRenewalHandler(lockRenewer,
&LockRenewalOptions{
Interval: interval,
// default to false on the old handler signature to keep the same behavior
CancelMessageContextOnStop: to.Ptr(false)},
handler)
}

// peekLockRenewer starts a background goroutine that renews the message lock at the given interval until Stop() is called
// or until the passed in context is canceled.
// it is a pass through handler if the renewalInterval is nil
type peekLockRenewer struct {
next Handler
lockRenewer LockRenewer
renewalInterval *time.Duration
alive atomic.Bool
next Handler
lockRenewer LockRenewer
renewalInterval *time.Duration
alive atomic.Bool
cancelMessageCtxOnStop bool
cancelMessageCtx func()

// stopped channel allows to short circuit the renewal loop
// when we are already waiting on the select.
Expand All @@ -46,12 +87,17 @@ type peekLockRenewer struct {
stopped chan struct{}
}

// stop will stop the renewal loop. if LockRenewalOptions.CancelMessageContextOnStop is set to true, it cancels the message context.
func (plr *peekLockRenewer) stop(ctx context.Context) {
plr.alive.Store(false)
// don't send the stop signal to the loop if there is already one in the channel
if len(plr.stopped) == 0 {
plr.stopped <- struct{}{}
}
if plr.cancelMessageCtxOnStop {
log(ctx, "canceling message context")
plr.cancelMessageCtx()
}
log(ctx, "stopped periodic renewal")
}

Expand Down Expand Up @@ -86,7 +132,8 @@ func (plr *peekLockRenewer) startPeriodicRenewal(ctx context.Context, message *a
span.RecordError(fmt.Errorf("failed to renew lock: %w", err))
// on error, we continue to the next loop iteration.
// if the context is Done, we will enter the ctx.Done() case and exit the renewal.
// if the error is anything else, we keep retrying the renewal
// if the error is identified as permanent, we stop the renewal.
// if the error is anything else, we keep trying the renewal.
if plr.isPermanent(err) {
log(ctx, "stopping periodic renewal for message: ", message.MessageID)
plr.stop(ctx)
Expand Down
107 changes: 83 additions & 24 deletions v2/lockrenewer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
. "github.com/onsi/gomega"

Expand All @@ -29,11 +30,12 @@ func Test_StopRenewingOnHandlerCompletion(t *testing.T) {
settler := &fakeSettler{}
g := NewWithT(t)
interval := 100 * time.Millisecond
lr := shuttle.NewRenewLockHandler(renewer, &interval, shuttle.HandlerFunc(func(ctx context.Context, settler shuttle.MessageSettler,
message *azservicebus.ReceivedMessage) {
err := settler.CompleteMessage(ctx, message, nil)
g.Expect(err).To(Not(HaveOccurred()))
}))
lr := shuttle.NewLockRenewalHandler(renewer, &shuttle.LockRenewalOptions{Interval: &interval},
shuttle.HandlerFunc(func(ctx context.Context, settler shuttle.MessageSettler,
message *azservicebus.ReceivedMessage) {
err := settler.CompleteMessage(ctx, message, nil)
g.Expect(err).To(Not(HaveOccurred()))
}))
msg := &azservicebus.ReceivedMessage{}
ctx, cancel := context.WithTimeout(context.TODO(), 120*time.Millisecond)
defer cancel()
Expand All @@ -48,10 +50,11 @@ func Test_StopRenewingOnHandlerCompletion(t *testing.T) {
func Test_RenewPeriodically(t *testing.T) {
renewer := &fakeSBLockRenewer{}
interval := 50 * time.Millisecond
lr := shuttle.NewRenewLockHandler(renewer, &interval, shuttle.HandlerFunc(func(ctx context.Context, settler shuttle.MessageSettler,
message *azservicebus.ReceivedMessage) {
time.Sleep(150 * time.Millisecond)
}))
lr := shuttle.NewLockRenewalHandler(renewer, &shuttle.LockRenewalOptions{Interval: &interval},
shuttle.HandlerFunc(func(ctx context.Context, settler shuttle.MessageSettler,
message *azservicebus.ReceivedMessage) {
time.Sleep(150 * time.Millisecond)
}))
msg := &azservicebus.ReceivedMessage{}
ctx, cancel := context.WithTimeout(context.TODO(), 120*time.Millisecond)
defer cancel()
Expand All @@ -63,11 +66,31 @@ func Test_RenewPeriodically(t *testing.T) {
20*time.Millisecond).Should(Succeed())
}

//nolint:staticcheck // still need to cover the deprecated func
func Test_NewLockRenewerHandler_defaultToNotCancelMessageContext(t *testing.T) {
g := NewWithT(t)
interval := 20 * time.Millisecond
sbRenewer := &fakeSBLockRenewer{
Err: &azservicebus.Error{Code: azservicebus.CodeLockLost},
}

handler := shuttle.NewRenewLockHandler(sbRenewer, &interval,
shuttle.HandlerFunc(func(ctx context.Context, settler shuttle.MessageSettler, message *azservicebus.ReceivedMessage) {
g.Consistently(func(g Gomega) {
g.Expect(ctx.Err()).To(BeNil())
}, "120ms", "10ms").Should(Succeed())
}))
handler.Handle(context.Background(), &fakeSettler{}, &azservicebus.ReceivedMessage{})
}

func Test_RenewPeriodically_Error(t *testing.T) {
type testCase struct {
name string
renewer *fakeSBLockRenewer
verify func(g Gomega, tc *testCase)
name string
renewer *fakeSBLockRenewer
isRenewerCanceled bool
cancelCtxOnStop *bool
gotMessageCtx context.Context
verify func(g Gomega, tc *testCase)
}
testCases := []testCase{
{
Expand All @@ -81,23 +104,49 @@ func Test_RenewPeriodically_Error(t *testing.T) {
},
},
{
name: "stop periodic renewal on context canceled",
renewer: &fakeSBLockRenewer{Err: context.Canceled},
name: "stop periodic renewal on context canceled",
isRenewerCanceled: true,
renewer: &fakeSBLockRenewer{Err: context.Canceled},
verify: func(g Gomega, tc *testCase) {
g.Consistently(
func(g Gomega) { g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(2))) },
func(g Gomega) { g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(0))) },
130*time.Millisecond,
20*time.Millisecond)
20*time.Millisecond).Should(Succeed())
},
},
{
name: "stop periodic renewal on permanent error (lockLost)",
renewer: &fakeSBLockRenewer{Err: &azservicebus.Error{Code: azservicebus.CodeLockLost}},
verify: func(g Gomega, tc *testCase) {
g.Consistently(
func(g Gomega) { g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(2))) },
func(g Gomega) { g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(1))) },
130*time.Millisecond,
20*time.Millisecond).Should(Succeed())
},
},
{
name: "cancel message context on stop by default",
renewer: &fakeSBLockRenewer{Err: &azservicebus.Error{Code: azservicebus.CodeLockLost}},
verify: func(g Gomega, tc *testCase) {
g.Consistently(
func(g Gomega) { g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(1))) },
130*time.Millisecond,
20*time.Millisecond)
20*time.Millisecond).Should(Succeed())
g.Expect(tc.gotMessageCtx.Err()).To(Equal(context.Canceled))
},
},
{
name: "does not cancel message context on stop if disabled",
renewer: &fakeSBLockRenewer{Err: &azservicebus.Error{Code: azservicebus.CodeLockLost}},
cancelCtxOnStop: to.Ptr(false),
verify: func(g Gomega, tc *testCase) {
g.Consistently(
func(g Gomega) {
g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(1)))
g.Expect(tc.gotMessageCtx.Err()).To(BeNil())
},
100*time.Millisecond,
20*time.Millisecond).Should(Succeed())
},
},
{
Expand All @@ -106,7 +155,7 @@ func Test_RenewPeriodically_Error(t *testing.T) {
verify: func(g Gomega, tc *testCase) {
g.Eventually(
func(g Gomega) { g.Expect(tc.renewer.RenewCount.Load()).To(Equal(int32(2))) },
130*time.Millisecond,
140*time.Millisecond,
20*time.Millisecond).Should(Succeed())
},
},
Expand All @@ -116,12 +165,22 @@ func Test_RenewPeriodically_Error(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
interval := 50 * time.Millisecond
lr := shuttle.NewRenewLockHandler(tc.renewer, &interval, shuttle.HandlerFunc(func(ctx context.Context, settler shuttle.MessageSettler,
message *azservicebus.ReceivedMessage) {
time.Sleep(150 * time.Millisecond)
}))
lr := shuttle.NewLockRenewalHandler(tc.renewer, &shuttle.LockRenewalOptions{Interval: &interval, CancelMessageContextOnStop: tc.cancelCtxOnStop},
shuttle.HandlerFunc(func(ctx context.Context, settler shuttle.MessageSettler,
message *azservicebus.ReceivedMessage) {
tc.gotMessageCtx = ctx
select {
case <-time.After(110 * time.Millisecond):
break
case <-ctx.Done():
break
}
}))
msg := &azservicebus.ReceivedMessage{}
ctx, cancel := context.WithTimeout(context.TODO(), 120*time.Millisecond)
ctx, cancel := context.WithTimeout(context.TODO(), 200*time.Millisecond)
if tc.isRenewerCanceled {
cancel()
}
defer cancel()
lr.Handle(ctx, &fakeSettler{}, msg)
tc.verify(NewWithT(t), &tc)
Expand Down
2 changes: 1 addition & 1 deletion v2/managedsettling_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func ExampleNewManagedSettlingHandler() {
lockRenewalInterval := 10 * time.Second
p := shuttle.NewProcessor(receiver,
shuttle.NewPanicHandler(nil,
shuttle.NewRenewLockHandler(receiver, &lockRenewalInterval,
shuttle.NewLockRenewalHandler(receiver, &shuttle.LockRenewalOptions{Interval: &lockRenewalInterval},
shuttle.NewManagedSettlingHandler(&shuttle.ManagedSettlingOptions{
RetryDecision: &shuttle.MaxAttemptsRetryDecision{MaxAttempts: 2},
RetryDelayStrategy: &shuttle.ConstantDelayStrategy{Delay: 2 * time.Second},
Expand Down
5 changes: 0 additions & 5 deletions v2/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,6 @@ func (f HandlerFunc) Handle(ctx context.Context, settler MessageSettler, message
f(ctx, settler, message)
}

// LockRenewer abstracts the servicebus receiver client to only expose lock renewal
type LockRenewer interface {
RenewMessageLock(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.RenewMessageLockOptions) error
}

// Processor encapsulates the message pump and concurrency handling of servicebus.
// it exposes a handler API to provides a middleware based message processing pipeline.
type Processor struct {
Expand Down
3 changes: 2 additions & 1 deletion v2/settlehandler_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"

"github.com/Azure/go-shuttle/v2"
)

Expand All @@ -25,7 +26,7 @@ func ExampleNewSettlementHandler() {
lockRenewalInterval := 10 * time.Second
p := shuttle.NewProcessor(receiver,
shuttle.NewPanicHandler(nil,
shuttle.NewRenewLockHandler(receiver, &lockRenewalInterval,
shuttle.NewLockRenewalHandler(receiver, &shuttle.LockRenewalOptions{Interval: &lockRenewalInterval},
shuttle.NewSettlementHandler(nil, mySettlingHandler()))), &shuttle.ProcessorOptions{MaxConcurrency: 10})

ctx, cancel := context.WithCancel(context.Background())
Expand Down

0 comments on commit 83d561e

Please sign in to comment.