From ed66fc73b3148fab7fdd10af25c27486af40e59e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 14 Dec 2023 14:50:00 +0000 Subject: [PATCH] postgres cdc: update mirror lsn_offset when wal processing raises consumedXLogPos Not doing so could cause failures if consumption restarted, since retrieval of the mirror lsn_offset would start before the updated lsn sent to pg, making it possible that pg had gotten rid of that wal in the meantime --- flow/connectors/postgres/cdc.go | 35 +++++++++++++++++++--------- flow/connectors/postgres/client.go | 23 +++++++++++------- flow/connectors/postgres/postgres.go | 3 ++- 3 files changed, 40 insertions(+), 21 deletions(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index cf75fb2e2c..3637974f52 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -5,6 +5,7 @@ import ( "crypto/sha256" "fmt" "log/slog" + "strconv" "time" "github.com/PeerDB-io/peer-flow/connectors/utils" @@ -28,10 +29,10 @@ type PostgresCDCSource struct { SrcTableIDNameMapping map[uint32]string TableNameMapping map[string]model.NameAndExclude slot string + metadataSchema string publication string relationMessageMapping model.RelationMessageMapping typeMap *pgtype.Map - startLSN pglogrepl.LSN commitLock bool customTypeMapping map[uint32]string @@ -44,6 +45,7 @@ type PostgresCDCConfig struct { AppContext context.Context Connection *pgxpool.Pool Slot string + MetadataSchema string Publication string SrcTableIDNameMapping map[uint32]string TableNameMapping map[string]model.NameAndExclude @@ -64,6 +66,7 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32 SrcTableIDNameMapping: cdcConfig.SrcTableIDNameMapping, TableNameMapping: cdcConfig.TableNameMapping, slot: cdcConfig.Slot, + metadataSchema: cdcConfig.MetadataSchema, publication: cdcConfig.Publication, relationMessageMapping: cdcConfig.RelationMessageMapping, typeMap: pgtype.NewMap(), @@ -142,19 +145,20 @@ func (p
*PostgresCDCSource) PullRecords(req *model.PullRecordsRequest) error { sysident.SystemID, sysident.Timeline, sysident.XLogPos, sysident.DBName)) // start replication - p.startLSN = 0 + var clientXLogPos, startLSN pglogrepl.LSN if req.LastSyncState != nil && req.LastSyncState.Checkpoint > 0 { p.logger.Info("starting replication from last sync state", slog.Int64("last checkpoint", req.LastSyncState.Checkpoint)) - p.startLSN = pglogrepl.LSN(req.LastSyncState.Checkpoint + 1) + clientXLogPos = pglogrepl.LSN(req.LastSyncState.Checkpoint) + startLSN = clientXLogPos + 1 } - err = pglogrepl.StartReplication(p.ctx, pgConn, replicationSlot, p.startLSN, replicationOpts) + err = pglogrepl.StartReplication(p.ctx, pgConn, replicationSlot, startLSN, replicationOpts) if err != nil { - return fmt.Errorf("error starting replication at startLsn - %d: %w", p.startLSN, err) + return fmt.Errorf("error starting replication at startLsn - %d: %w", startLSN, err) } - p.logger.Info(fmt.Sprintf("started replication on slot %s at startLSN: %d", p.slot, p.startLSN)) + p.logger.Info(fmt.Sprintf("started replication on slot %s at startLSN: %d", p.slot, startLSN)) - return p.consumeStream(pgConn, req, p.startLSN, req.RecordStream) + return p.consumeStream(pgConn, req, clientXLogPos, req.RecordStream) } // start consuming the cdc stream @@ -171,14 +175,23 @@ func (p *PostgresCDCSource) consumeStream( } }() - // clientXLogPos is the last checkpoint id + 1, we need to ack that we have processed - // until clientXLogPos - 1 each time we send a standby status update. + // clientXLogPos is the last checkpoint id, we need to ack that we have processed + // until clientXLogPos each time we send a standby status update. // consumedXLogPos is the lsn that has been committed on the destination. 
consumedXLogPos := pglogrepl.LSN(0) if clientXLogPos > 0 { - consumedXLogPos = clientXLogPos - 1 + consumedXLogPos = clientXLogPos - err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn, + result := conn.ExecParams( + p.ctx, + fmt.Sprintf("UPDATE %s.%s SET lsn_offset=GREATEST(lsn_offset, $1)", p.metadataSchema, mirrorJobsTableIdentifier), + [][]byte{[]byte(strconv.FormatInt(consumedXLogPos, 10))}, nil, nil, nil) + _, err := result.Close() + if err != nil { + return fmt.Errorf("[initial-flush] storing updated LSN failed: %w", err) + } + + err = pglogrepl.SendStandbyStatusUpdate(p.ctx, conn, pglogrepl.StandbyStatusUpdate{WALWritePosition: consumedXLogPos}) if err != nil { return fmt.Errorf("[initial-flush] SendStandbyStatusUpdate failed: %w", err) diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index 871fd7403b..ef99d7e56d 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -38,7 +38,7 @@ const ( insertJobMetadataSQL = "INSERT INTO %s.%s VALUES ($1,$2,$3,$4)" checkIfJobMetadataExistsSQL = "SELECT COUNT(1)::TEXT::BOOL FROM %s.%s WHERE mirror_job_name=$1" - updateMetadataForSyncRecordsSQL = "UPDATE %s.%s SET lsn_offset=$1, sync_batch_id=$2 WHERE mirror_job_name=$3" + updateMetadataForSyncRecordsSQL = "UPDATE %s.%s SET lsn_offset=GREATEST(lsn_offset, $1), sync_batch_id=$2 WHERE mirror_job_name=$3" updateMetadataForNormalizeRecordsSQL = "UPDATE %s.%s SET normalize_batch_id=$1 WHERE mirror_job_name=$2" getTableNameToUnchangedToastColsSQL = `SELECT _peerdb_destination_table_name, @@ -412,19 +412,24 @@ func (c *PostgresConnector) getLastNormalizeBatchID(jobName string) (int64, erro } func (c *PostgresConnector) jobMetadataExists(jobName string) (bool, error) { - rows, err := c.pool.Query(c.ctx, - fmt.Sprintf(checkIfJobMetadataExistsSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName) + var result pgtype.Bool + err := c.pool.QueryRow(c.ctx, + fmt.Sprintf(checkIfJobMetadataExistsSQL, 
c.metadataSchema, mirrorJobsTableIdentifier), jobName).Scan(&result) if err != nil { - return false, fmt.Errorf("failed to check if job exists: %w", err) + return false, fmt.Errorf("error reading result row: %w", err) } - defer rows.Close() + return result.Bool, nil +} + +func (c *PostgresConnector) jobMetadataExistsTx(tx pgx.Tx, jobName string) (bool, error) { var result pgtype.Bool - rows.Next() - err = rows.Scan(&result) + err := tx.QueryRow(c.ctx, + fmt.Sprintf(checkIfJobMetadataExistsSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName).Scan(&result) if err != nil { return false, fmt.Errorf("error reading result row: %w", err) } + return result.Bool, nil } @@ -440,7 +445,7 @@ func (c *PostgresConnector) majorVersionCheck(majorVersion int) (bool, error) { func (c *PostgresConnector) updateSyncMetadata(flowJobName string, lastCP int64, syncBatchID int64, syncRecordsTx pgx.Tx) error { - jobMetadataExists, err := c.jobMetadataExists(flowJobName) + jobMetadataExists, err := c.jobMetadataExistsTx(syncRecordsTx, flowJobName) if err != nil { return fmt.Errorf("failed to get sync status for flow job: %w", err) } @@ -466,7 +471,7 @@ func (c *PostgresConnector) updateSyncMetadata(flowJobName string, lastCP int64, func (c *PostgresConnector) updateNormalizeMetadata(flowJobName string, normalizeBatchID int64, normalizeRecordsTx pgx.Tx) error { - jobMetadataExists, err := c.jobMetadataExists(flowJobName) + jobMetadataExists, err := c.jobMetadataExistsTx(normalizeRecordsTx, flowJobName) if err != nil { return fmt.Errorf("failed to get sync status for flow job: %w", err) } diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index e5bec86b06..681200f3ac 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -236,6 +236,7 @@ func (c *PostgresConnector) PullRecords(req *model.PullRecordsRequest) error { Connection: c.replPool.Pool, SrcTableIDNameMapping: req.SrcTableIDNameMapping, Slot: slotName, 
+ MetadataSchema: c.metadataSchema, Publication: publicationName, TableNameMapping: req.TableNameMapping, RelationMessageMapping: req.RelationMessageMapping, @@ -273,7 +274,7 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S if err != nil { return nil, fmt.Errorf("failed to get previous syncBatchID: %w", err) } - syncBatchID = syncBatchID + 1 + syncBatchID += 1 records := make([][]interface{}, 0) tableNameRowsMapping := make(map[string]uint32)