diff --git a/v2/managedsettling.go b/v2/managedsettling.go index 049e6d81..ac77a0ac 100644 --- a/v2/managedsettling.go +++ b/v2/managedsettling.go @@ -23,7 +23,7 @@ type ManagedSettler struct { func (m *ManagedSettler) Handle(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage) { if err := m.next(ctx, message); err != nil { log(ctx, "error returned from the handler. Calling ManagedSettler error handler") - m.handleError(ctx, settler, message, err) + m.options.OnError(ctx, m.options, settler, message, err) return } if err := settler.CompleteMessage(ctx, message, nil); err != nil { @@ -70,6 +70,11 @@ func (s *ConstantDelayStrategy) GetDelay(_ uint32) time.Duration { // ManagedSettlingOptions allows to configure the ManagedSettling middleware type ManagedSettlingOptions struct { + // Allows to override the built-in error handling logic. + // OnError is called before any message settling action is taken. + // the ManagedSettlingOptions struct is passed as an argument so that the configuration + // like RetryDecision, RetryDelayStrategy and the post-settlement hooks can be reused and composed differently + OnError func(ctx context.Context, opts *ManagedSettlingOptions, settler MessageSettler, message *azservicebus.ReceivedMessage, handleErr error) // RetryDecision is invoked to decide whether an error should be retried. // the default is to retry 5 times before moving the message to the deadletter. RetryDecision RetryDecision @@ -95,21 +100,11 @@ type ManagedSettlingOptions struct { // the RetryDecision can be overridden and can inspect the error returned to decide to retry the message or not. // this allows to define error types that shouldn't be retried (and moved directly to the deadletter queue) func NewManagedSettlingHandler(opts *ManagedSettlingOptions, handler ManagedSettlingFunc) *ManagedSettler { - const ( - defaultRetryDecisionMaxAttempts = 5 - defaultDelay = 5 * time.Second - ) - options := &ManagedSettlingOptions{ - RetryDecision: &MaxAttemptsRetryDecision{MaxAttempts: defaultRetryDecisionMaxAttempts}, - RetryDelayStrategy: &ConstantDelayStrategy{Delay: defaultDelay}, - OnCompleted: func(_ context.Context, _ *azservicebus.ReceivedMessage) { - }, - OnAbandoned: func(_ context.Context, _ *azservicebus.ReceivedMessage, _ error) { - }, - OnDeadLettered: func(_ context.Context, _ *azservicebus.ReceivedMessage, _ error) { - }, - } + options := defaultManagedSettlingOptions() if opts != nil { + if opts.OnError != nil { + options.OnError = opts.OnError + } if opts.RetryDecision != nil { options.RetryDecision = opts.RetryDecision } @@ -132,11 +127,33 @@ func NewManagedSettlingHandler(opts *ManagedSettlingOptions, handler ManagedSett } } -func (m *ManagedSettler) handleError(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage, handleErr error) { +func defaultManagedSettlingOptions() *ManagedSettlingOptions { + const ( + defaultRetryDecisionMaxAttempts = 5 + defaultDelay = 5 * time.Second + ) + return &ManagedSettlingOptions{ + OnError: handleError, + RetryDecision: &MaxAttemptsRetryDecision{MaxAttempts: defaultRetryDecisionMaxAttempts}, + RetryDelayStrategy: &ConstantDelayStrategy{Delay: defaultDelay}, + OnCompleted: func(_ context.Context, _ *azservicebus.ReceivedMessage) { + }, + OnAbandoned: func(_ context.Context, _ *azservicebus.ReceivedMessage, _ error) { + }, + OnDeadLettered: func(_ context.Context, _ *azservicebus.ReceivedMessage, _ error) { + }, + } +} + +func handleError(ctx context.Context, + options *ManagedSettlingOptions, + settler MessageSettler, + message *azservicebus.ReceivedMessage, + handleErr error) { if handleErr == nil { handleErr = fmt.Errorf("nil error: %w", handleErr) } - if !m.options.RetryDecision.CanRetry(handleErr, message) { + if !options.RetryDecision.CanRetry(handleErr, message) { log(ctx, "moving message to dead letter queue because processing failed to an error: %s", handleErr) deadLetterSettlement.settle(ctx, settler, message, &azservicebus.DeadLetterOptions{ Reason: to.Ptr("ManagedSettlingHandlerDeadLettering"), @@ -144,14 +161,14 @@ func (m *ManagedSettler) handleError(ctx context.Context, settler MessageSettler PropertiesToModify: nil, }) // this could be a special hook to have more control on deadlettering, but keeping it simple for now - m.options.OnDeadLettered(ctx, message, handleErr) + options.OnDeadLettered(ctx, message, handleErr) return } // the delay is implemented as an in-memory sleep before calling abandon. // this will continue renewing the lock on the message while we wait for this delay to pass. - delay := m.options.RetryDelayStrategy.GetDelay(message.DeliveryCount) + delay := options.RetryDelayStrategy.GetDelay(message.DeliveryCount) log(ctx, "delay strategy return delay of %s", delay) time.Sleep(delay) abandonSettlement.settle(ctx, settler, message, nil) - m.options.OnAbandoned(ctx, message, handleErr) + options.OnAbandoned(ctx, message, handleErr) } diff --git a/v2/managedsettling_test.go b/v2/managedsettling_test.go index dc7d2a50..303c6f0a 100644 --- a/v2/managedsettling_test.go +++ b/v2/managedsettling_test.go @@ -153,9 +153,8 @@ func TestManagedSettler_Handle(t *testing.T) { } func Test_NilErr_WrappedInDeadLetter(t *testing.T) { - h := NewManagedSettlingHandler(nil, nil) settler := &fakeSettler{} - h.handleError(context.TODO(), settler, &azservicebus.ReceivedMessage{DeliveryCount: 6}, nil) + handleError(context.TODO(), defaultManagedSettlingOptions(), settler, &azservicebus.ReceivedMessage{DeliveryCount: 6}, nil) g := NewWithT(t) g.Expect(*settler.deadletterOptions.ErrorDescription).To(HavePrefix("nil error:")) } @@ -173,15 +172,31 @@ func TestDefaultOptions_CallDefaultHooks(t *testing.T) { g.Expect(settler.completed).To(BeTrue()) settler = &fakeSettler{} - h.handleError(context.TODO(), settler, &azservicebus.ReceivedMessage{DeliveryCount: 0}, fmt.Errorf("oops")) + defaultOptions := defaultManagedSettlingOptions() + handleError(context.TODO(), defaultOptions, settler, &azservicebus.ReceivedMessage{DeliveryCount: 0}, fmt.Errorf("oops")) g.Expect(settler.abandoned).To(BeTrue()) settler = &fakeSettler{} - h.handleError(context.TODO(), settler, &azservicebus.ReceivedMessage{DeliveryCount: 6}, fmt.Errorf("oops")) + handleError(context.TODO(), defaultOptions, settler, &azservicebus.ReceivedMessage{DeliveryCount: 6}, fmt.Errorf("oops")) g.Expect(settler.deadlettered).To(BeTrue()) g.Expect(*settler.deadletterOptions.ErrorDescription).To(Equal("oops")) } +func TestOnErrorOverride(t *testing.T) { + g := NewWithT(t) + settler := &fakeSettler{} + opts := defaultManagedSettlingOptions() + var onErrorCalled bool + opts.OnError = func(ctx context.Context, opts *ManagedSettlingOptions, settler MessageSettler, message *azservicebus.ReceivedMessage, handleErr error) { + onErrorCalled = true + } + h := NewManagedSettlingHandler(opts, func(_ context.Context, _ *azservicebus.ReceivedMessage) error { + return fmt.Errorf("failed") + }) + h.Handle(context.Background(), settler, &azservicebus.ReceivedMessage{}) + g.Expect(onErrorCalled).To(BeTrue()) +} + func TestMaxAttemptsRetryDecision(t *testing.T) { for _, tc := range []struct { maxAttempts uint32