Skip to content

Commit

Permalink
Merge pull request #1595 from kaleido-io/idempotent-retry-fix
Browse files Browse the repository at this point in the history
break out of infinite retry for submitted ops
  • Loading branch information
shorsher authored Nov 1, 2024
2 parents f81d957 + a116123 commit 8951c92
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 0 deletions.
6 changes: 6 additions & 0 deletions internal/batch/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,12 @@ func (bp *batchProcessor) dispatchBatch(payload *DispatchPayload) error {
}
}
}
conflictErr, conflictTestOk := err.(operations.ConflictError)
if conflictTestOk && conflictErr.IsConflictError() {
// We know that the connector has received our batch, so we shouldn't need to retry
payload.addMessageUpdate(payload.Messages, core.MessageStateReady, core.MessageStateSent)
return true, nil
}
} else {
if core.IsPinned(payload.Batch.TX.Type) {
payload.addMessageUpdate(payload.Messages, core.MessageStateReady, core.MessageStateSent)
Expand Down
23 changes: 23 additions & 0 deletions internal/batch/batch_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,18 @@ func mockRunAsGroupPassthrough(mdi *databasemocks.Plugin) {
}
}

type testConflictError struct {
err error
}

func (tce *testConflictError) Error() string {
return tce.err.Error()
}

func (tce *testConflictError) IsConflictError() bool {
return true
}

func TestUnfilledBatch(t *testing.T) {
log.SetLevel("debug")
coreconfig.Reset()
Expand Down Expand Up @@ -129,6 +141,17 @@ func TestUnfilledBatch(t *testing.T) {
mim.AssertExpectations(t)
}

func TestHandleDispatchConflictError(t *testing.T) {
cancel, _, bp := newTestBatchProcessor(t, func(c context.Context, state *DispatchPayload) error {
conflictErr := testConflictError{err: fmt.Errorf("pop")}
return &conflictErr
})
defer cancel()
bp.dispatchBatch(&DispatchPayload{})
bp.cancelCtx()
<-bp.done
}

func TestBatchSizeOverflow(t *testing.T) {
log.SetLevel("debug")
coreconfig.Reset()
Expand Down

0 comments on commit 8951c92

Please sign in to comment.