From 952199444159308fda7e6011a07f37a6f62f1a96 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Philip=20Dub=C3=A9?=
Date: Wed, 20 Dec 2023 15:04:01 +0000
Subject: [PATCH] postgres cdc: update mirror lsn_offset when wal processing
 raises consumedXLogPos (#823)

Not doing so could cause failures when consumption restarted: the mirror's
stored lsn_offset could lag behind the LSN already confirmed to Postgres, so
replication would try to resume from WAL that Postgres may have discarded in
the meantime.

Also always use `GREATEST` when updating lsn_offset so the stored offset only
moves forward (monotonic).
---
(See the sketch appended after the diff for an illustration of the intended
offset/feedback behaviour.)

 flow/activities/flowable.go                |  3 ++
 flow/connectors/bigquery/bigquery.go       | 17 ++++++++++
 flow/connectors/core.go                    |  3 ++
 flow/connectors/eventhub/eventhub.go       |  6 ++--
 flow/connectors/external_metadata/store.go |  4 +--
 flow/connectors/postgres/cdc.go            | 38 ++++++++++++++--------
 flow/connectors/postgres/client.go         |  4 ++-
 flow/connectors/postgres/postgres.go       | 14 +++++++-
 flow/connectors/s3/s3.go                   |  4 +--
 flow/connectors/snowflake/snowflake.go     | 13 +++++++-
 flow/model/model.go                        |  2 ++
 11 files changed, 85 insertions(+), 23 deletions(-)

diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index 69628c28d5..11a6d37895 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -264,6 +264,9 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
             OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName,
             RelationMessageMapping: input.RelationMessageMapping,
             RecordStream: recordBatch,
+            SetLastOffset: func(lastOffset int64) error {
+                return dstConn.SetLastOffset(input.FlowConnectionConfigs.FlowJobName, lastOffset)
+            },
         })
     })
 
diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go
index 5f966ecf7f..2749566ced 100644
--- a/flow/connectors/bigquery/bigquery.go
+++ b/flow/connectors/bigquery/bigquery.go
@@ -340,6 +340,23 @@ func (c *BigQueryConnector) GetLastOffset(jobName string) (int64, error) {
     }
 }
 
+func (c *BigQueryConnector) SetLastOffset(jobName string, lastOffset int64) error {
+    query := fmt.Sprintf(
+        "UPDATE %s.%s SET offset = GREATEST(offset, %d) WHERE mirror_job_name = '%s'",
+        c.datasetID,
+        MirrorJobsTable,
+        lastOffset,
+        jobName,
+    )
+    q := c.client.Query(query)
+    _, err := q.Read(c.ctx)
+    if err != nil {
+        return fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err)
+    }
+
+    return nil
+}
+
 func (c *BigQueryConnector) GetLastSyncBatchID(jobName string) (int64, error) {
     query := fmt.Sprintf("SELECT sync_batch_id FROM %s.%s WHERE mirror_job_name = '%s'",
         c.datasetID, MirrorJobsTable, jobName)
diff --git a/flow/connectors/core.go b/flow/connectors/core.go
index 707a7f0b11..c3fb138398 100644
--- a/flow/connectors/core.go
+++ b/flow/connectors/core.go
@@ -62,6 +62,9 @@ type CDCSyncConnector interface {
     // GetLastOffset gets the last offset from the metadata table on the destination
     GetLastOffset(jobName string) (int64, error)
 
+    // SetLastOffset updates the last offset on the metadata table on the destination
+    SetLastOffset(jobName string, lastOffset int64) error
+
     // GetLastSyncBatchID gets the last batch synced to the destination from the metadata table
     GetLastSyncBatchID(jobName string) (int64, error)
 
diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go
index 4be57309f2..027d3027fa 100644
--- a/flow/connectors/eventhub/eventhub.go
+++ b/flow/connectors/eventhub/eventhub.go
@@ -109,7 +109,7 @@ func (c *EventHubConnector) GetLastOffset(jobName string) (int64, error) {
     return c.pgMetadata.FetchLastOffset(jobName)
 }
 
-func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error {
+func (c *EventHubConnector) SetLastOffset(jobName string, offset int64) error {
     err := c.pgMetadata.UpdateLastOffset(jobName, offset)
     if err != nil {
         c.logger.Error(fmt.Sprintf("failed to update last offset: %v", err))
@@ -187,7 +187,7 @@ func (c *EventHubConnector) processBatch(
         }
 
         if lastSeenLSN > lastUpdatedOffset {
-            err = c.updateLastOffset(flowJobName, lastSeenLSN)
+            err = c.SetLastOffset(flowJobName, lastSeenLSN)
             lastUpdatedOffset = lastSeenLSN
             c.logger.Info("processBatch", slog.Int64("updated last offset", lastSeenLSN))
             if err != nil {
@@ -233,7 +233,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
         return nil, err
     }
 
-    err = c.updateLastOffset(req.FlowJobName, lastCheckpoint)
+    err = c.SetLastOffset(req.FlowJobName, lastCheckpoint)
     if err != nil {
         c.logger.Error("failed to update last offset", slog.Any("error", err))
         return nil, err
diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go
index ef2cf5e45b..eee1d4ef66 100644
--- a/flow/connectors/external_metadata/store.go
+++ b/flow/connectors/external_metadata/store.go
@@ -146,7 +146,6 @@ func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) {
     var offset pgtype.Int8
     err := rows.Scan(&offset)
     if err != nil {
-        // if the job doesn't exist, return 0
         if err.Error() == "no rows in result set" {
             return 0, nil
         }
@@ -198,7 +197,8 @@ func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) e
         INSERT INTO `+p.schemaName+`.`+lastSyncStateTableName+` (job_name, last_offset, sync_batch_id)
         VALUES ($1, $2, $3)
         ON CONFLICT (job_name)
-        DO UPDATE SET last_offset = $2, updated_at = NOW()
+        DO UPDATE SET last_offset = GREATEST(`+lastSyncStateTableName+`.last_offset, excluded.last_offset),
+            updated_at = NOW()
     `, jobName, offset, 0)
 
     if err != nil {
diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go
index 979723f930..f2eda2e5f4 100644
--- a/flow/connectors/postgres/cdc.go
+++ b/flow/connectors/postgres/cdc.go
@@ -30,10 +30,10 @@ type PostgresCDCSource struct {
     SrcTableIDNameMapping map[uint32]string
     TableNameMapping map[string]model.NameAndExclude
     slot string
+    SetLastOffset func(int64) error
     publication string
     relationMessageMapping model.RelationMessageMapping
     typeMap *pgtype.Map
-    startLSN pglogrepl.LSN
     commitLock bool
 
     customTypeMapping map[uint32]string
@@ -56,6 +56,7 @@ type PostgresCDCConfig struct {
     RelationMessageMapping model.RelationMessageMapping
     CatalogPool *pgxpool.Pool
     FlowJobName string
+    SetLastOffset func(int64) error
 }
 
 // Create a new PostgresCDCSource
@@ -72,6 +73,7 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32
         SrcTableIDNameMapping: cdcConfig.SrcTableIDNameMapping,
         TableNameMapping: cdcConfig.TableNameMapping,
         slot: cdcConfig.Slot,
+        SetLastOffset: cdcConfig.SetLastOffset,
         publication: cdcConfig.Publication,
         relationMessageMapping: cdcConfig.RelationMessageMapping,
         typeMap: pgtype.NewMap(),
@@ -152,19 +154,20 @@ func (p *PostgresCDCSource) PullRecords(req *model.PullRecordsRequest) error {
         sysident.SystemID, sysident.Timeline, sysident.XLogPos, sysident.DBName))
 
     // start replication
-    p.startLSN = 0
+    var clientXLogPos, startLSN pglogrepl.LSN
     if req.LastOffset > 0 {
         p.logger.Info("starting replication from last sync state", slog.Int64("last checkpoint", req.LastOffset))
-        p.startLSN = pglogrepl.LSN(req.LastOffset + 1)
+        clientXLogPos = pglogrepl.LSN(req.LastOffset)
+        startLSN = clientXLogPos + 1
     }
 
-    err = pglogrepl.StartReplication(p.ctx, pgConn, replicationSlot, p.startLSN, replicationOpts)
+    err = pglogrepl.StartReplication(p.ctx, pgConn, replicationSlot, startLSN, replicationOpts)
     if err != nil {
-        return fmt.Errorf("error starting replication at startLsn - %d: %w", p.startLSN, err)
+        return fmt.Errorf("error starting replication at startLsn - %d: %w", startLSN, err)
     }
-    p.logger.Info(fmt.Sprintf("started replication on slot %s at startLSN: %d", p.slot, p.startLSN))
+    p.logger.Info(fmt.Sprintf("started replication on slot %s at startLSN: %d", p.slot, startLSN))
 
-    return p.consumeStream(pgConn, req, p.startLSN, req.RecordStream)
+    return p.consumeStream(pgConn, req, clientXLogPos, req.RecordStream)
 }
 
 // start consuming the cdc stream
@@ -181,12 +184,12 @@ func (p *PostgresCDCSource) consumeStream(
         }
     }()
 
-    // clientXLogPos is the last checkpoint id + 1, we need to ack that we have processed
-    // until clientXLogPos - 1 each time we send a standby status update.
+    // clientXLogPos is the last checkpoint id, we need to ack that we have processed
+    // until clientXLogPos each time we send a standby status update.
     // consumedXLogPos is the lsn that has been committed on the destination.
     consumedXLogPos := pglogrepl.LSN(0)
     if clientXLogPos > 0 {
-        consumedXLogPos = clientXLogPos - 1
+        consumedXLogPos = clientXLogPos
 
         err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn,
             pglogrepl.StandbyStatusUpdate{WALWritePosition: consumedXLogPos})
@@ -194,6 +197,7 @@ func (p *PostgresCDCSource) consumeStream(
             return fmt.Errorf("[initial-flush] SendStandbyStatusUpdate failed: %w", err)
         }
     }
+    proposedConsumedXLogPos := consumedXLogPos
 
     var standByLastLogged time.Time
     cdcRecordsStorage := cdc_records.NewCDCRecordsStore(p.flowJobName)
@@ -252,19 +256,27 @@ func (p *PostgresCDCSource) consumeStream(
         if pkmRequiresResponse {
             // Update XLogPos to the last processed position, we can only confirm
             // that this is the last row committed on the destination.
+            if proposedConsumedXLogPos > consumedXLogPos {
+                p.logger.Info(fmt.Sprintf("Heartbeat adjusting lsn from %d to %d", consumedXLogPos, proposedConsumedXLogPos))
+                consumedXLogPos = proposedConsumedXLogPos
+                err := p.SetLastOffset(int64(consumedXLogPos))
+                if err != nil {
+                    return fmt.Errorf("storing updated LSN failed: %w", err)
+                }
+            }
+
             err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn,
                 pglogrepl.StandbyStatusUpdate{WALWritePosition: consumedXLogPos})
             if err != nil {
                 return fmt.Errorf("SendStandbyStatusUpdate failed: %w", err)
             }
+            pkmRequiresResponse = false
 
             if time.Since(standByLastLogged) > 10*time.Second {
                 numRowsProcessedMessage := fmt.Sprintf("processed %d rows", cdcRecordsStorage.Len())
                 p.logger.Info(fmt.Sprintf("Sent Standby status message. %s", numRowsProcessedMessage))
                 standByLastLogged = time.Now()
             }
-
-            pkmRequiresResponse = false
         }
 
         if (cdcRecordsStorage.Len() >= int(req.MaxBatchSize)) && !p.commitLock {
@@ -469,7 +481,7 @@ func (p *PostgresCDCSource) consumeStream(
             if cdcRecordsStorage.IsEmpty() {
                 // given that we have no records it is safe to update the flush wal position
                 // to the clientXLogPos. clientXLogPos can be moved forward due to PKM messages.
-                consumedXLogPos = clientXLogPos
+                proposedConsumedXLogPos = clientXLogPos
                 records.UpdateLatestCheckpoint(int64(clientXLogPos))
             }
         }
diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go
index 5e00ca8a7f..d709866239 100644
--- a/flow/connectors/postgres/client.go
+++ b/flow/connectors/postgres/client.go
@@ -34,13 +34,14 @@ const (
     createRawTableDstTableIndexSQL = "CREATE INDEX IF NOT EXISTS %s_dst_table_idx ON %s.%s(_peerdb_destination_table_name)"
 
     getLastOffsetSQL = "SELECT lsn_offset FROM %s.%s WHERE mirror_job_name=$1"
+    setLastOffsetSQL = "UPDATE %s.%s SET lsn_offset=GREATEST(lsn_offset, $1) WHERE mirror_job_name=$2"
     getLastSyncBatchID_SQL = "SELECT sync_batch_id FROM %s.%s WHERE mirror_job_name=$1"
     getLastNormalizeBatchID_SQL = "SELECT normalize_batch_id FROM %s.%s WHERE mirror_job_name=$1"
     createNormalizedTableSQL = "CREATE TABLE IF NOT EXISTS %s(%s)"
 
     insertJobMetadataSQL = "INSERT INTO %s.%s VALUES ($1,$2,$3,$4)"
     checkIfJobMetadataExistsSQL = "SELECT COUNT(1)::TEXT::BOOL FROM %s.%s WHERE mirror_job_name=$1"
-    updateMetadataForSyncRecordsSQL = "UPDATE %s.%s SET lsn_offset=$1, sync_batch_id=$2 WHERE mirror_job_name=$3"
+    updateMetadataForSyncRecordsSQL = "UPDATE %s.%s SET lsn_offset=GREATEST(lsn_offset, $1), sync_batch_id=$2 WHERE mirror_job_name=$3"
     updateMetadataForNormalizeRecordsSQL = "UPDATE %s.%s SET normalize_batch_id=$1 WHERE mirror_job_name=$2"
 
     getTableNameToUnchangedToastColsSQL = `SELECT _peerdb_destination_table_name,
@@ -486,6 +487,7 @@ func (c *PostgresConnector) jobMetadataExistsTx(tx pgx.Tx, jobName string) (bool
     if err != nil {
         return false, fmt.Errorf("error reading result row: %w", err)
     }
+
     return result.Bool, nil
 }
 
diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go
index cad914706f..b848c5a5b5 100644
--- a/flow/connectors/postgres/postgres.go
+++ b/flow/connectors/postgres/postgres.go
@@ -185,13 +185,24 @@ func (c *PostgresConnector) GetLastOffset(jobName string) (int64, error) {
     if err != nil {
         return 0, fmt.Errorf("error while reading result row: %w", err)
     }
+
     if result.Int64 == 0 {
         c.logger.Warn("Assuming zero offset means no sync has happened")
     }
-
     return result.Int64, nil
 }
 
+// SetLastOffset updates the last synced offset for a job.
+func (c *PostgresConnector) SetLastOffset(jobName string, lastOffset int64) error {
+    _, err := c.pool.
+        Exec(c.ctx, fmt.Sprintf(setLastOffsetSQL, c.metadataSchema, mirrorJobsTableIdentifier), lastOffset, jobName)
+    if err != nil {
+        return fmt.Errorf("error setting last offset for job %s: %w", jobName, err)
+    }
+
+    return nil
+}
+
 // PullRecords pulls records from the source.
 func (c *PostgresConnector) PullRecords(catalogPool *pgxpool.Pool, req *model.PullRecordsRequest) error {
     defer func() {
@@ -238,6 +249,7 @@ func (c *PostgresConnector) PullRecords(catalogPool *pgxpool.Pool, req *model.Pu
         RelationMessageMapping: req.RelationMessageMapping,
         CatalogPool: catalogPool,
         FlowJobName: req.FlowJobName,
+        SetLastOffset: req.SetLastOffset,
     }, c.customTypesMapping)
     if err != nil {
         return fmt.Errorf("failed to create cdc source: %w", err)
diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go
index 96d16930cc..c40ef05bd4 100644
--- a/flow/connectors/s3/s3.go
+++ b/flow/connectors/s3/s3.go
@@ -176,7 +176,7 @@ func (c *S3Connector) GetLastOffset(jobName string) (int64, error) {
 }
 
 // update offset for a job
-func (c *S3Connector) updateLastOffset(jobName string, offset int64) error {
+func (c *S3Connector) SetLastOffset(jobName string, offset int64) error {
     err := c.pgMetadata.UpdateLastOffset(jobName, offset)
     if err != nil {
         c.logger.Error("failed to update last offset: ", slog.Any("error", err))
@@ -218,7 +218,7 @@ func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncRes
         return nil, fmt.Errorf("failed to get last checkpoint: %w", err)
     }
 
-    err = c.updateLastOffset(req.FlowJobName, lastCheckpoint)
+    err = c.SetLastOffset(req.FlowJobName, lastCheckpoint)
     if err != nil {
         c.logger.Error("failed to update last offset for s3 cdc", slog.Any("error", err))
         return nil, err
diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go
index f92ed3e33e..ad4d952547 100644
--- a/flow/connectors/snowflake/snowflake.go
+++ b/flow/connectors/snowflake/snowflake.go
@@ -73,6 +73,7 @@ const (
          WHERE TABLE_SCHEMA=? and TABLE_NAME=?`
     checkIfJobMetadataExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM %s.%s WHERE MIRROR_JOB_NAME=?"
     getLastOffsetSQL = "SELECT OFFSET FROM %s.%s WHERE MIRROR_JOB_NAME=?"
+    setLastOffsetSQL = "UPDATE %s.%s SET OFFSET=GREATEST(OFFSET, ?) WHERE MIRROR_JOB_NAME=?"
     getLastSyncBatchID_SQL = "SELECT SYNC_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?"
     getLastNormalizeBatchID_SQL = "SELECT NORMALIZE_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?"
     dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s"
@@ -301,7 +302,7 @@ func (c *SnowflakeConnector) GetLastOffset(jobName string) (int64, error) {
     }()
 
     if !rows.Next() {
-        c.logger.Warn("No row found ,returning nil")
+        c.logger.Warn("No row found, returning 0")
         return 0, nil
     }
     var result pgtype.Int8
@@ -311,10 +312,20 @@ func (c *SnowflakeConnector) GetLastOffset(jobName string) (int64, error) {
     }
     if result.Int64 == 0 {
         c.logger.Warn("Assuming zero offset means no sync has happened")
+        return 0, nil
     }
     return result.Int64, nil
 }
 
+func (c *SnowflakeConnector) SetLastOffset(jobName string, lastOffset int64) error {
+    _, err := c.database.ExecContext(c.ctx, fmt.Sprintf(setLastOffsetSQL,
+        c.metadataSchema, mirrorJobsTableIdentifier), lastOffset, jobName)
+    if err != nil {
+        return fmt.Errorf("error querying Snowflake peer for last syncedID: %w", err)
+    }
+    return nil
+}
+
 func (c *SnowflakeConnector) GetLastSyncBatchID(jobName string) (int64, error) {
     rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getLastSyncBatchID_SQL,
         c.metadataSchema, mirrorJobsTableIdentifier), jobName)
diff --git a/flow/model/model.go b/flow/model/model.go
index 02949f3c2e..487616c531 100644
--- a/flow/model/model.go
+++ b/flow/model/model.go
@@ -50,6 +50,8 @@ type PullRecordsRequest struct {
     RelationMessageMapping RelationMessageMapping
     // record batch for pushing changes into
     RecordStream *CDCRecordStream
+    // last offset may be forwarded while processing records
+    SetLastOffset func(int64) error
 }
 
 type Record interface {
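
Appended sketch (illustrative, not part of the patch): a minimal, self-contained Go
program showing the two behaviours the commit message describes — the destination's
stored offset only moves forward (the GREATEST semantics in the SQL above), and the
LSN reported back to Postgres during a keepalive is only advanced after that offset
has been persisted on the destination, so a restart never resumes from WAL that
Postgres was already told to discard. The names metadataStore and heartbeat are
hypothetical stand-ins for the connector metadata table and the primary-keepalive
branch of consumeStream; the real code reports the position via
pglogrepl.SendStandbyStatusUpdate.

package main

import (
    "fmt"
    "sync"
)

// metadataStore stands in for the destination metadata table
// (e.g. the lsn_offset column updated by setLastOffsetSQL).
type metadataStore struct {
    mu      sync.Mutex
    offsets map[string]int64
}

// SetLastOffset mirrors UPDATE ... SET offset = GREATEST(offset, $1):
// the stored offset never moves backwards.
func (m *metadataStore) SetLastOffset(jobName string, lastOffset int64) error {
    m.mu.Lock()
    defer m.mu.Unlock()
    if lastOffset > m.offsets[jobName] {
        m.offsets[jobName] = lastOffset
    }
    return nil
}

// heartbeat models the keepalive branch: the proposed position (raised only while
// the record buffer is empty) is persisted before it becomes the position
// acknowledged back to Postgres.
func heartbeat(store *metadataStore, job string, consumed, proposed int64) (int64, error) {
    if proposed > consumed {
        if err := store.SetLastOffset(job, proposed); err != nil {
            return consumed, fmt.Errorf("storing updated LSN failed: %w", err)
        }
        consumed = proposed
    }
    // The real code now sends a StandbyStatusUpdate with WALWritePosition = consumed.
    return consumed, nil
}

func main() {
    store := &metadataStore{offsets: map[string]int64{"demo_mirror": 100}}

    // A keepalive arrives after the WAL position advanced to 250 with no pending records.
    consumed, _ := heartbeat(store, "demo_mirror", 100, 250)
    fmt.Println(consumed, store.offsets["demo_mirror"]) // 250 250

    // A stale or retried update can no longer move the stored offset backwards.
    _ = store.SetLastOffset("demo_mirror", 200)
    fmt.Println(store.offsets["demo_mirror"]) // still 250
}

Run as an ordinary main package: both prints show 250 even after the stale update,
which is the invariant the GREATEST clauses and the proposedConsumedXLogPos
bookkeeping establish in the patch.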