From 64654488a7d69a6b2990491609beddcb65ee6480 Mon Sep 17 00:00:00 2001
From: Kaushik Iska
Date: Tue, 30 Jan 2024 08:27:42 -0500
Subject: [PATCH] Fix hang when logging WAL segment removed (#1174)

When we encounter an error, we don't get any new messages until we
re-establish START_REPLICATION. The added complexity isn't worth
special-handling this error.
---
 flow/connectors/postgres/cdc.go | 18 +-----------------
 1 file changed, 1 insertion(+), 17 deletions(-)

diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go
index 18cada42c7..c99bf40a79 100644
--- a/flow/connectors/postgres/cdc.go
+++ b/flow/connectors/postgres/cdc.go
@@ -26,8 +26,6 @@ import (
 	"github.com/PeerDB-io/peer-flow/shared"
 )
 
-const maxRetriesForWalSegmentRemoved = 5
-
 type PostgresCDCSource struct {
 	ctx        context.Context
 	replConn   *pgx.Conn
@@ -266,7 +264,6 @@ func (p *PostgresCDCSource) consumeStream(
 
 	pkmRequiresResponse := false
 	waitingForCommit := false
-	retryAttemptForWALSegmentRemoved := 0
 
 	for {
 		if pkmRequiresResponse {
@@ -349,20 +346,7 @@ func (p *PostgresCDCSource) consumeStream(
 		}
 
 		if errMsg, ok := rawMsg.(*pgproto3.ErrorResponse); ok {
-			if errMsg.Severity == "ERROR" && errMsg.Code == "XX000" {
-				if p.walSegmentRemovedRegex.MatchString(errMsg.Message) {
-					retryAttemptForWALSegmentRemoved++
-					if retryAttemptForWALSegmentRemoved > maxRetriesForWalSegmentRemoved {
-						return fmt.Errorf("max retries for WAL segment removed exceeded: %+v", errMsg)
-					} else {
-						p.logger.Warn(
-							"WAL segment removed, restarting replication retrying in 30 seconds...",
-							slog.Any("error", errMsg), slog.Int("retryAttempt", retryAttemptForWALSegmentRemoved))
-						time.Sleep(30 * time.Second)
-						continue
-					}
-				}
-			}
+			p.logger.Error(fmt.Sprintf("received Postgres WAL error: %+v", errMsg))
 			return fmt.Errorf("received Postgres WAL error: %+v", errMsg)
 		}
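
For context on the behavior the commit message describes: after the server sends an
ErrorResponse, the replication stream yields no further messages, so recovery has to
happen in the caller by tearing the connection down and issuing a fresh
START_REPLICATION rather than sleeping and retrying inside the stream loop. Below is a
minimal caller-side sketch of that pattern; the names (replicationSource, ConsumeStream,
Reconnect, runWithReconnect) are illustrative assumptions and not the actual PeerDB API.

    package cdcretry

    import (
        "context"
        "fmt"
        "log/slog"
        "time"
    )

    // replicationSource abstracts the CDC stream for this sketch; the real
    // PostgresCDCSource exposes different methods.
    type replicationSource interface {
        // ConsumeStream reads replication messages until an error (such as the
        // ErrorResponse above) forces it to return.
        ConsumeStream(ctx context.Context) error
        // Reconnect re-dials and re-issues START_REPLICATION (assumed helper).
        Reconnect(ctx context.Context) error
    }

    // runWithReconnect re-establishes replication after any stream error instead
    // of special-casing "WAL segment removed" inside the stream loop.
    func runWithReconnect(ctx context.Context, src replicationSource, logger *slog.Logger) error {
        for {
            err := src.ConsumeStream(ctx)
            if err == nil {
                return nil
            }
            logger.Error("replication stream failed, re-establishing", slog.Any("error", err))

            if ctx.Err() != nil {
                return ctx.Err()
            }
            if rerr := src.Reconnect(ctx); rerr != nil {
                return fmt.Errorf("re-establishing replication: %w", rerr)
            }
            time.Sleep(5 * time.Second) // crude backoff; a real retry policy would live in the workflow layer
        }
    }

Keeping the retry/backoff decision in an outer loop like this is what lets the stream
code simply log and return on any ErrorResponse, which is the simplification the patch
above makes.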