diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 0fe159a895..d5de8947c5 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -25,6 +25,8 @@ import ( "go.temporal.io/sdk/activity" ) +const maxRetriesForWalSegmentRemoved = 5 + type PostgresCDCSource struct { ctx context.Context replPool *pgxpool.Pool @@ -279,6 +281,7 @@ func (p *PostgresCDCSource) consumeStream( pkmRequiresResponse := false waitingForCommit := false + retryAttemptForWALSegmentRemoved := 0 for { if pkmRequiresResponse { @@ -355,9 +358,16 @@ func (p *PostgresCDCSource) consumeStream( if errMsg, ok := rawMsg.(*pgproto3.ErrorResponse); ok { if errMsg.Severity == "ERROR" && errMsg.Code == "XX000" { if p.walSegmentRemovedRegex.MatchString(errMsg.Message) { - p.logger.Warn(fmt.Sprintf("WAL segment removed, restarting replication retrying in 30 seconds...")) - time.Sleep(30 * time.Second) - continue + retryAttemptForWALSegmentRemoved++ + if retryAttemptForWALSegmentRemoved > maxRetriesForWalSegmentRemoved { + return fmt.Errorf("max retries for WAL segment removed exceeded: %+v", errMsg) + } else { + p.logger.Warn(fmt.Sprintf( + "WAL segment removed, restarting replication retrying in 30 seconds..."), + slog.Any("error", errMsg), slog.Int("retryAttempt", retryAttemptForWALSegmentRemoved)) + time.Sleep(30 * time.Second) + continue + } } } return fmt.Errorf("received Postgres WAL error: %+v", errMsg)