Skip to content

Commit

Permalink
Fix hang when logging WAL segment removed (#1174)
Browse files · Browse the repository at this point in the history
When we encounter an error, we don't get any new messages until we
re-establish START_REPLICATION. The added complexity isn't worth
special-casing this error.
  • Loading branch information
iskakaushik authored Jan 30, 2024
1 parent 632f9f3 commit 6465448
Showing 1 changed file with 1 addition and 17 deletions.
18 changes: 1 addition & 17 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ import (
"github.com/PeerDB-io/peer-flow/shared"
)

const maxRetriesForWalSegmentRemoved = 5

type PostgresCDCSource struct {
ctx context.Context
replConn *pgx.Conn
Expand Down Expand Up @@ -266,7 +264,6 @@ func (p *PostgresCDCSource) consumeStream(

pkmRequiresResponse := false
waitingForCommit := false
retryAttemptForWALSegmentRemoved := 0

for {
if pkmRequiresResponse {
Expand Down Expand Up @@ -349,20 +346,7 @@ func (p *PostgresCDCSource) consumeStream(
}

if errMsg, ok := rawMsg.(*pgproto3.ErrorResponse); ok {
if errMsg.Severity == "ERROR" && errMsg.Code == "XX000" {
if p.walSegmentRemovedRegex.MatchString(errMsg.Message) {
retryAttemptForWALSegmentRemoved++
if retryAttemptForWALSegmentRemoved > maxRetriesForWalSegmentRemoved {
return fmt.Errorf("max retries for WAL segment removed exceeded: %+v", errMsg)
} else {
p.logger.Warn(
"WAL segment removed, restarting replication retrying in 30 seconds...",
slog.Any("error", errMsg), slog.Int("retryAttempt", retryAttemptForWALSegmentRemoved))
time.Sleep(30 * time.Second)
continue
}
}
}
p.logger.Error(fmt.Sprintf("received Postgres WAL error: %+v", errMsg))
return fmt.Errorf("received Postgres WAL error: %+v", errMsg)
}

Expand Down

0 comments on commit 6465448

Please sign in to comment.