Skip to content

Commit

Permalink
Retry at the transaction level
Browse files Browse the repository at this point in the history
Instead of restarting the entire replication thread after a failure
  • Loading branch information
tirsen committed May 8, 2023
1 parent e989120 commit 241b679
Showing 1 changed file with 14 additions and 2 deletions.
16 changes: 14 additions & 2 deletions pkg/clone/transactionwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (w *TransactionWriter) runSequential(ctx context.Context, b backoff.BackOff
return ctx.Err()
}

err := autotx.TransactWithOptions(ctx, w.target, &sql.TxOptions{Isolation: sql.LevelReadCommitted}, func(tx *sql.Tx) error {
err := w.transact(ctx, func(tx *sql.Tx) error {
for _, mutation := range transaction.Mutations {
err := w.handleMutation(ctx, tx, mutation)
if err != nil {
Expand Down Expand Up @@ -325,7 +325,7 @@ func (s *transactionSequence) Run(ctx context.Context) error {
if len(transaction.transaction.Mutations) == 0 {
continue
}
err := autotx.TransactWithOptions(ctx, s.writer.target, &sql.TxOptions{Isolation: sql.LevelReadCommitted}, func(tx *sql.Tx) error {
err := s.writer.transact(ctx, func(tx *sql.Tx) error {
for _, mutation := range transaction.transaction.Mutations {
err := s.writer.handleMutation(ctx, tx, mutation)
if err != nil {
Expand Down Expand Up @@ -737,6 +737,18 @@ func (w *TransactionWriter) writeCheckpoint(ctx context.Context, tx *sql.Tx, pos
return nil
}

func (w *TransactionWriter) transact(ctx context.Context, f func(tx *sql.Tx) error) error {
return errors.WithStack(autotx.TransactWithRetryAndOptions(ctx,
w.target,
&sql.TxOptions{Isolation: sql.LevelReadCommitted},
autotx.RetryOptions{
MaxRetries: int(w.config.WriteRetries),
IsRetryable: func(err error) bool {
return !isSchemaError(err)
},
}, f))
}

// repair synchronously diffs and writes the chunk to the target (diff and write)
// the writes are made synchronously in the replication stream to maintain strong consistency
func (m *Mutation) repair(ctx context.Context, tx DBWriter) (rowCount int, sizeBytes uint64, err error) {
Expand Down

0 comments on commit 241b679

Please sign in to comment.