From 241b6795de88c3779248d8cf465c78b149cc4242 Mon Sep 17 00:00:00 2001 From: Jon Tirsen Date: Mon, 8 May 2023 15:18:54 +0200 Subject: [PATCH] Retry at the transaction level Instead of restarting the entire replication thread after a failure --- pkg/clone/transactionwriter.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/pkg/clone/transactionwriter.go b/pkg/clone/transactionwriter.go index 421e98b..9bd6f09 100644 --- a/pkg/clone/transactionwriter.go +++ b/pkg/clone/transactionwriter.go @@ -99,7 +99,7 @@ func (w *TransactionWriter) runSequential(ctx context.Context, b backoff.BackOff return ctx.Err() } - err := autotx.TransactWithOptions(ctx, w.target, &sql.TxOptions{Isolation: sql.LevelReadCommitted}, func(tx *sql.Tx) error { + err := w.transact(ctx, func(tx *sql.Tx) error { for _, mutation := range transaction.Mutations { err := w.handleMutation(ctx, tx, mutation) if err != nil { @@ -325,7 +325,7 @@ func (s *transactionSequence) Run(ctx context.Context) error { if len(transaction.transaction.Mutations) == 0 { continue } - err := autotx.TransactWithOptions(ctx, s.writer.target, &sql.TxOptions{Isolation: sql.LevelReadCommitted}, func(tx *sql.Tx) error { + err := s.writer.transact(ctx, func(tx *sql.Tx) error { for _, mutation := range transaction.transaction.Mutations { err := s.writer.handleMutation(ctx, tx, mutation) if err != nil { @@ -737,6 +737,18 @@ func (w *TransactionWriter) writeCheckpoint(ctx context.Context, tx *sql.Tx, pos return nil } +func (w *TransactionWriter) transact(ctx context.Context, f func(tx *sql.Tx) error) error { + return errors.WithStack(autotx.TransactWithRetryAndOptions(ctx, + w.target, + &sql.TxOptions{Isolation: sql.LevelReadCommitted}, + autotx.RetryOptions{ + MaxRetries: int(w.config.WriteRetries), + IsRetryable: func(err error) bool { + return !isSchemaError(err) + }, + }, f)) +} + // repair synchronously diffs and writes the chunk to the target (diff and write) // the writes are made synchronously in the replication stream to maintain strong consistency func (m *Mutation) repair(ctx context.Context, tx DBWriter) (rowCount int, sizeBytes uint64, err error) {