From 3369a9b16470283172c7285ac720657cec0eb511 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Tue, 30 Jan 2024 08:32:45 -0500 Subject: [PATCH] Revert Logging for wal segment removed --- flow/connectors/postgres/cdc.go | 18 +----------------- 1 file changed, 1 insertion(+), 17 deletions(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 4fd1b3dd79..f8f1fa2416 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -26,8 +26,6 @@ import ( "go.temporal.io/sdk/activity" ) -const maxRetriesForWalSegmentRemoved = 5 - type PostgresCDCSource struct { ctx context.Context replPool *pgxpool.Pool @@ -279,7 +277,6 @@ func (p *PostgresCDCSource) consumeStream( pkmRequiresResponse := false waitingForCommit := false - retryAttemptForWALSegmentRemoved := 0 for { if pkmRequiresResponse { @@ -362,20 +359,7 @@ func (p *PostgresCDCSource) consumeStream( } if errMsg, ok := rawMsg.(*pgproto3.ErrorResponse); ok { - if errMsg.Severity == "ERROR" && errMsg.Code == "XX000" { - if p.walSegmentRemovedRegex.MatchString(errMsg.Message) { - retryAttemptForWALSegmentRemoved++ - if retryAttemptForWALSegmentRemoved > maxRetriesForWalSegmentRemoved { - return fmt.Errorf("max retries for WAL segment removed exceeded: %+v", errMsg) - } else { - p.logger.Warn( - "WAL segment removed, restarting replication retrying in 30 seconds...", - slog.Any("error", errMsg), slog.Int("retryAttempt", retryAttemptForWALSegmentRemoved)) - time.Sleep(30 * time.Second) - continue - } - } - } + p.logger.Error(fmt.Sprintf("received Postgres WAL error: %+v", errMsg)) return fmt.Errorf("received Postgres WAL error: %+v", errMsg) }