Skip to content

Commit

Permalink
add retries
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Dec 29, 2023
1 parent 5d3e9f2 commit bbf04be
Showing 1 changed file with 13 additions and 3 deletions.
16 changes: 13 additions & 3 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"go.temporal.io/sdk/activity"
)

const maxRetriesForWalSegmentRemoved = 5

type PostgresCDCSource struct {
ctx context.Context
replPool *pgxpool.Pool
Expand Down Expand Up @@ -279,6 +281,7 @@ func (p *PostgresCDCSource) consumeStream(

pkmRequiresResponse := false
waitingForCommit := false
retryAttemptForWALSegmentRemoved := 0

for {
if pkmRequiresResponse {
Expand Down Expand Up @@ -355,9 +358,16 @@ func (p *PostgresCDCSource) consumeStream(
if errMsg, ok := rawMsg.(*pgproto3.ErrorResponse); ok {
if errMsg.Severity == "ERROR" && errMsg.Code == "XX000" {
if p.walSegmentRemovedRegex.MatchString(errMsg.Message) {
p.logger.Warn(fmt.Sprintf("WAL segment removed, restarting replication retrying in 30 seconds..."))
time.Sleep(30 * time.Second)
continue
retryAttemptForWALSegmentRemoved++
if retryAttemptForWALSegmentRemoved > maxRetriesForWalSegmentRemoved {
return fmt.Errorf("max retries for WAL segment removed exceeded: %+v", errMsg)
} else {
p.logger.Warn(fmt.Sprintf(
"WAL segment removed, restarting replication retrying in 30 seconds..."),
slog.Any("error", errMsg), slog.Int("retryAttempt", retryAttemptForWALSegmentRemoved))
time.Sleep(30 * time.Second)
continue
}
}
}
return fmt.Errorf("received Postgres WAL error: %+v", errMsg)
Expand Down

0 comments on commit bbf04be

Please sign in to comment.