Skip to content

Commit

Permalink
postgres cdc: update mirror lsn_offset when wal processing raises con…
Browse files Browse the repository at this point in the history
…sumedXLogPos

Not doing so could cause failures if consumption restarted,
since retrieval of mirror lsn_offset would start before updated lsn sent to pg,
making possible that pg had gotten rid of that wal in meantime
  • Loading branch information
serprex committed Dec 14, 2023
1 parent 46208f1 commit fceefb1
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 21 deletions.
35 changes: 24 additions & 11 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/sha256"
"fmt"
"log/slog"
"strconv"
"time"

"github.com/PeerDB-io/peer-flow/connectors/utils"
Expand All @@ -28,10 +29,10 @@ type PostgresCDCSource struct {
SrcTableIDNameMapping map[uint32]string
TableNameMapping map[string]model.NameAndExclude
slot string
metadataSchema string
publication string
relationMessageMapping model.RelationMessageMapping
typeMap *pgtype.Map
startLSN pglogrepl.LSN
commitLock bool
customTypeMapping map[uint32]string

Expand All @@ -44,6 +45,7 @@ type PostgresCDCConfig struct {
AppContext context.Context
Connection *pgxpool.Pool
Slot string
MetadataSchema string
Publication string
SrcTableIDNameMapping map[uint32]string
TableNameMapping map[string]model.NameAndExclude
Expand All @@ -64,6 +66,7 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32
SrcTableIDNameMapping: cdcConfig.SrcTableIDNameMapping,
TableNameMapping: cdcConfig.TableNameMapping,
slot: cdcConfig.Slot,
metadataSchema: cdcConfig.MetadataSchema,
publication: cdcConfig.Publication,
relationMessageMapping: cdcConfig.RelationMessageMapping,
typeMap: pgtype.NewMap(),
Expand Down Expand Up @@ -142,19 +145,20 @@ func (p *PostgresCDCSource) PullRecords(req *model.PullRecordsRequest) error {
sysident.SystemID, sysident.Timeline, sysident.XLogPos, sysident.DBName))

// start replication
p.startLSN = 0
var clientXLogPos, startLSN pglogrepl.LSN
if req.LastSyncState != nil && req.LastSyncState.Checkpoint > 0 {
p.logger.Info("starting replication from last sync state", slog.Int64("last checkpoint", req.LastSyncState.Checkpoint))
p.startLSN = pglogrepl.LSN(req.LastSyncState.Checkpoint + 1)
clientXLogPos = pglogrepl.LSN(req.LastSyncState.Checkpoint)
startLSN = clientXLogPos + 1
}

err = pglogrepl.StartReplication(p.ctx, pgConn, replicationSlot, p.startLSN, replicationOpts)
err = pglogrepl.StartReplication(p.ctx, pgConn, replicationSlot, startLSN, replicationOpts)
if err != nil {
return fmt.Errorf("error starting replication at startLsn - %d: %w", p.startLSN, err)
return fmt.Errorf("error starting replication at startLsn - %d: %w", startLSN, err)
}
p.logger.Info(fmt.Sprintf("started replication on slot %s at startLSN: %d", p.slot, p.startLSN))
p.logger.Info(fmt.Sprintf("started replication on slot %s at startLSN: %d", p.slot, startLSN))

return p.consumeStream(pgConn, req, p.startLSN, req.RecordStream)
return p.consumeStream(pgConn, req, clientXLogPos, req.RecordStream)
}

// start consuming the cdc stream
Expand All @@ -171,14 +175,23 @@ func (p *PostgresCDCSource) consumeStream(
}
}()

// clientXLogPos is the last checkpoint id + 1, we need to ack that we have processed
// until clientXLogPos - 1 each time we send a standby status update.
// clientXLogPos is the last checkpoint id, we need to ack that we have processed
// until clientXLogPos each time we send a standby status update.
// consumedXLogPos is the lsn that has been committed on the destination.
consumedXLogPos := pglogrepl.LSN(0)
if clientXLogPos > 0 {
consumedXLogPos = clientXLogPos - 1
consumedXLogPos = clientXLogPos

err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn,
result := conn.ExecParams(
p.ctx,
fmt.Sprintf("UPDATE %s.%s SET lsn_offset=GREATEST(lsn_offset, $1)", p.metadataSchema, mirrorJobsTableIdentifier),
[][]byte{[]byte(strconv.FormatInt(int64(consumedXLogPos), 10))}, nil, nil, nil)
_, err := result.Close()
if err != nil {
return fmt.Errorf("[initial-flush] storing updated LSN failed: %w", err)
}

err = pglogrepl.SendStandbyStatusUpdate(p.ctx, conn,
pglogrepl.StandbyStatusUpdate{WALWritePosition: consumedXLogPos})
if err != nil {
return fmt.Errorf("[initial-flush] SendStandbyStatusUpdate failed: %w", err)
Expand Down
23 changes: 14 additions & 9 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const (

insertJobMetadataSQL = "INSERT INTO %s.%s VALUES ($1,$2,$3,$4)"
checkIfJobMetadataExistsSQL = "SELECT COUNT(1)::TEXT::BOOL FROM %s.%s WHERE mirror_job_name=$1"
updateMetadataForSyncRecordsSQL = "UPDATE %s.%s SET lsn_offset=$1, sync_batch_id=$2 WHERE mirror_job_name=$3"
updateMetadataForSyncRecordsSQL = "UPDATE %s.%s SET lsn_offset=GREATEST(lsn_offset, $1), sync_batch_id=$2 WHERE mirror_job_name=$3"
updateMetadataForNormalizeRecordsSQL = "UPDATE %s.%s SET normalize_batch_id=$1 WHERE mirror_job_name=$2"

getTableNameToUnchangedToastColsSQL = `SELECT _peerdb_destination_table_name,
Expand Down Expand Up @@ -412,19 +412,24 @@ func (c *PostgresConnector) getLastNormalizeBatchID(jobName string) (int64, erro
}

func (c *PostgresConnector) jobMetadataExists(jobName string) (bool, error) {
rows, err := c.pool.Query(c.ctx,
fmt.Sprintf(checkIfJobMetadataExistsSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName)
var result pgtype.Bool
err := c.pool.QueryRow(c.ctx,
fmt.Sprintf(checkIfJobMetadataExistsSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName).Scan(&result)
if err != nil {
return false, fmt.Errorf("failed to check if job exists: %w", err)
return false, fmt.Errorf("error reading result row: %w", err)
}
defer rows.Close()

return result.Bool, nil
}

func (c *PostgresConnector) jobMetadataExistsTx(tx pgx.Tx, jobName string) (bool, error) {
var result pgtype.Bool
rows.Next()
err = rows.Scan(&result)
err := tx.QueryRow(c.ctx,
fmt.Sprintf(checkIfJobMetadataExistsSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName).Scan(&result)
if err != nil {
return false, fmt.Errorf("error reading result row: %w", err)
}

return result.Bool, nil
}

Expand All @@ -440,7 +445,7 @@ func (c *PostgresConnector) majorVersionCheck(majorVersion int) (bool, error) {

func (c *PostgresConnector) updateSyncMetadata(flowJobName string, lastCP int64, syncBatchID int64,
syncRecordsTx pgx.Tx) error {
jobMetadataExists, err := c.jobMetadataExists(flowJobName)
jobMetadataExists, err := c.jobMetadataExistsTx(syncRecordsTx, flowJobName)
if err != nil {
return fmt.Errorf("failed to get sync status for flow job: %w", err)
}
Expand All @@ -466,7 +471,7 @@ func (c *PostgresConnector) updateSyncMetadata(flowJobName string, lastCP int64,

func (c *PostgresConnector) updateNormalizeMetadata(flowJobName string, normalizeBatchID int64,
normalizeRecordsTx pgx.Tx) error {
jobMetadataExists, err := c.jobMetadataExists(flowJobName)
jobMetadataExists, err := c.jobMetadataExistsTx(normalizeRecordsTx, flowJobName)
if err != nil {
return fmt.Errorf("failed to get sync status for flow job: %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ func (c *PostgresConnector) PullRecords(req *model.PullRecordsRequest) error {
Connection: c.replPool.Pool,
SrcTableIDNameMapping: req.SrcTableIDNameMapping,
Slot: slotName,
MetadataSchema: c.metadataSchema,
Publication: publicationName,
TableNameMapping: req.TableNameMapping,
RelationMessageMapping: req.RelationMessageMapping,
Expand Down Expand Up @@ -273,7 +274,7 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
if err != nil {
return nil, fmt.Errorf("failed to get previous syncBatchID: %w", err)
}
syncBatchID = syncBatchID + 1
syncBatchID += 1
records := make([][]interface{}, 0)
tableNameRowsMapping := make(map[string]uint32)

Expand Down

0 comments on commit fceefb1

Please sign in to comment.