diff --git a/v2/managedsettling.go b/v2/managedsettling.go index f9ef735..bf7088a 100644 --- a/v2/managedsettling.go +++ b/v2/managedsettling.go @@ -38,7 +38,9 @@ func (m *ManagedSettler) Handle(ctx context.Context, settler MessageSettler, mes m.options.OnError(ctx, m.options, settler, message, err) return } - if err := settler.CompleteMessage(ctx, message, nil); err != nil { + settleCtx, cancel := context.WithTimeout(ctx, settlementTimeout) + defer cancel() + if err := settler.CompleteMessage(settleCtx, message, nil); err != nil { logger.Error(fmt.Sprintf("error completing message: %s", err)) m.options.OnAbandoned(ctx, message, err) return diff --git a/v2/settlehandler.go b/v2/settlehandler.go index 320efd0..fdc8022 100644 --- a/v2/settlehandler.go +++ b/v2/settlehandler.go @@ -3,11 +3,14 @@ package shuttle import ( "context" "fmt" + "time" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" "github.com/devigned/tab" ) +const settlementTimeout = 30 * time.Second + // Settlement represents an action to take on a message. Abandon, Complete, DeadLetter, Defer, NoOp type Settlement interface { Settle(context.Context, MessageSettler, *azservicebus.ReceivedMessage) @@ -107,7 +110,10 @@ type settlement[T any] struct { func (s settlement[T]) settle(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage, options T) { span := tab.FromContext(ctx) span.Logger().Info(fmt.Sprintf("%s message", s.name)) - if err := s.settleFunc(ctx, settler, message, options); err != nil { + settleCtx, cancel := context.WithTimeout(ctx, settlementTimeout) + defer cancel() + getLogger(ctx).Info(fmt.Sprintf("%s message with ID: %s", s.name, message.MessageID)) + if err := s.settleFunc(settleCtx, settler, message, options); err != nil { wrapped := fmt.Errorf("%s settlement failed: %w", s.name, err) getLogger(ctx).Error(wrapped.Error()) span.Logger().Error(wrapped)