Skip to content

Commit

Permalink
Revert Logging for wal segment removed
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Jan 30, 2024
1 parent 1875b99 commit 3369a9b
Showing 1 changed file with 1 addition and 17 deletions.
18 changes: 1 addition & 17 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ import (
"go.temporal.io/sdk/activity"
)

const maxRetriesForWalSegmentRemoved = 5

type PostgresCDCSource struct {
ctx context.Context
replPool *pgxpool.Pool
Expand Down Expand Up @@ -279,7 +277,6 @@ func (p *PostgresCDCSource) consumeStream(

pkmRequiresResponse := false
waitingForCommit := false
retryAttemptForWALSegmentRemoved := 0

for {
if pkmRequiresResponse {
Expand Down Expand Up @@ -362,20 +359,7 @@ func (p *PostgresCDCSource) consumeStream(
}

if errMsg, ok := rawMsg.(*pgproto3.ErrorResponse); ok {
if errMsg.Severity == "ERROR" && errMsg.Code == "XX000" {
if p.walSegmentRemovedRegex.MatchString(errMsg.Message) {
retryAttemptForWALSegmentRemoved++
if retryAttemptForWALSegmentRemoved > maxRetriesForWalSegmentRemoved {
return fmt.Errorf("max retries for WAL segment removed exceeded: %+v", errMsg)
} else {
p.logger.Warn(
"WAL segment removed, restarting replication retrying in 30 seconds...",
slog.Any("error", errMsg), slog.Int("retryAttempt", retryAttemptForWALSegmentRemoved))
time.Sleep(30 * time.Second)
continue
}
}
}
p.logger.Error(fmt.Sprintf("received Postgres WAL error: %+v", errMsg))
return fmt.Errorf("received Postgres WAL error: %+v", errMsg)
}

Expand Down

0 comments on commit 3369a9b

Please sign in to comment.