diff --git a/internal/batch/batch_processor.go b/internal/batch/batch_processor.go index c4496f0ac..a708c6d55 100644 --- a/internal/batch/batch_processor.go +++ b/internal/batch/batch_processor.go @@ -633,6 +633,12 @@ func (bp *batchProcessor) dispatchBatch(payload *DispatchPayload) error { } } } + conflictErr, conflictTestOk := err.(operations.ConflictError) + if conflictTestOk && conflictErr.IsConflictError() { + // We know that the connector has received our batch, so we shouldn't need to retry + payload.addMessageUpdate(payload.Messages, core.MessageStateReady, core.MessageStateSent) + return true, nil + } } else { if core.IsPinned(payload.Batch.TX.Type) { payload.addMessageUpdate(payload.Messages, core.MessageStateReady, core.MessageStateSent) diff --git a/internal/batch/batch_processor_test.go b/internal/batch/batch_processor_test.go index fde042660..f5704b60e 100644 --- a/internal/batch/batch_processor_test.go +++ b/internal/batch/batch_processor_test.go @@ -75,6 +75,18 @@ func mockRunAsGroupPassthrough(mdi *databasemocks.Plugin) { } } +type testConflictError struct { + err error +} + +func (tce *testConflictError) Error() string { + return tce.err.Error() +} + +func (tce *testConflictError) IsConflictError() bool { + return true +} + func TestUnfilledBatch(t *testing.T) { log.SetLevel("debug") coreconfig.Reset() @@ -129,6 +141,17 @@ func TestUnfilledBatch(t *testing.T) { mim.AssertExpectations(t) } +func TestHandleDispatchConflictError(t *testing.T) { + cancel, _, bp := newTestBatchProcessor(t, func(c context.Context, state *DispatchPayload) error { + conflictErr := testConflictError{err: fmt.Errorf("pop")} + return &conflictErr + }) + defer cancel() + bp.dispatchBatch(&DispatchPayload{}) + bp.cancelCtx() + <-bp.done +} + func TestBatchSizeOverflow(t *testing.T) { log.SetLevel("debug") coreconfig.Reset()