Skip to content

Commit

Permalink
Merge branch 'main' into flatten-qvalue-avro
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Dec 20, 2023
2 parents d99f7de + 9521994 commit 8a00957
Show file tree
Hide file tree
Showing 11 changed files with 85 additions and 23 deletions.
3 changes: 3 additions & 0 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,9 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName,
RelationMessageMapping: input.RelationMessageMapping,
RecordStream: recordBatch,
SetLastOffset: func(lastOffset int64) error {
return dstConn.SetLastOffset(input.FlowConnectionConfigs.FlowJobName, lastOffset)
},
})
})

Expand Down
17 changes: 17 additions & 0 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,23 @@ func (c *BigQueryConnector) GetLastOffset(jobName string) (int64, error) {
}
}

func (c *BigQueryConnector) SetLastOffset(jobName string, lastOffset int64) error {
query := fmt.Sprintf(
"UPDATE %s.%s SET offset = GREATEST(offset, %d) WHERE mirror_job_name = '%s'",
c.datasetID,
MirrorJobsTable,
lastOffset,
jobName,
)
q := c.client.Query(query)
_, err := q.Read(c.ctx)
if err != nil {
return fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err)
}

return nil
}

func (c *BigQueryConnector) GetLastSyncBatchID(jobName string) (int64, error) {
query := fmt.Sprintf("SELECT sync_batch_id FROM %s.%s WHERE mirror_job_name = '%s'",
c.datasetID, MirrorJobsTable, jobName)
Expand Down
3 changes: 3 additions & 0 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ type CDCSyncConnector interface {
// GetLastOffset gets the last offset from the metadata table on the destination
GetLastOffset(jobName string) (int64, error)

// SetLastOffset updates the last offset on the metadata table on the destination
SetLastOffset(jobName string, lastOffset int64) error

// GetLastSyncBatchID gets the last batch synced to the destination from the metadata table
GetLastSyncBatchID(jobName string) (int64, error)

Expand Down
6 changes: 3 additions & 3 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (c *EventHubConnector) GetLastOffset(jobName string) (int64, error) {
return c.pgMetadata.FetchLastOffset(jobName)
}

func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error {
func (c *EventHubConnector) SetLastOffset(jobName string, offset int64) error {
err := c.pgMetadata.UpdateLastOffset(jobName, offset)
if err != nil {
c.logger.Error(fmt.Sprintf("failed to update last offset: %v", err))
Expand Down Expand Up @@ -187,7 +187,7 @@ func (c *EventHubConnector) processBatch(
}

if lastSeenLSN > lastUpdatedOffset {
err = c.updateLastOffset(flowJobName, lastSeenLSN)
err = c.SetLastOffset(flowJobName, lastSeenLSN)
lastUpdatedOffset = lastSeenLSN
c.logger.Info("processBatch", slog.Int64("updated last offset", lastSeenLSN))
if err != nil {
Expand Down Expand Up @@ -233,7 +233,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
return nil, err
}

err = c.updateLastOffset(req.FlowJobName, lastCheckpoint)
err = c.SetLastOffset(req.FlowJobName, lastCheckpoint)
if err != nil {
c.logger.Error("failed to update last offset", slog.Any("error", err))
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/external_metadata/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) {
var offset pgtype.Int8
err := rows.Scan(&offset)
if err != nil {
// if the job doesn't exist, return 0
if err.Error() == "no rows in result set" {
return 0, nil
}
Expand Down Expand Up @@ -198,7 +197,8 @@ func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) e
INSERT INTO `+p.schemaName+`.`+lastSyncStateTableName+` (job_name, last_offset, sync_batch_id)
VALUES ($1, $2, $3)
ON CONFLICT (job_name)
DO UPDATE SET last_offset = $2, updated_at = NOW()
DO UPDATE SET last_offset = GREATEST(`+lastSyncStateTableName+`.last_offset, excluded.last_offset),
updated_at = NOW()
`, jobName, offset, 0)

if err != nil {
Expand Down
38 changes: 25 additions & 13 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ type PostgresCDCSource struct {
SrcTableIDNameMapping map[uint32]string
TableNameMapping map[string]model.NameAndExclude
slot string
SetLastOffset func(int64) error
publication string
relationMessageMapping model.RelationMessageMapping
typeMap *pgtype.Map
startLSN pglogrepl.LSN
commitLock bool
customTypeMapping map[uint32]string

Expand All @@ -56,6 +56,7 @@ type PostgresCDCConfig struct {
RelationMessageMapping model.RelationMessageMapping
CatalogPool *pgxpool.Pool
FlowJobName string
SetLastOffset func(int64) error
}

// Create a new PostgresCDCSource
Expand All @@ -72,6 +73,7 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32
SrcTableIDNameMapping: cdcConfig.SrcTableIDNameMapping,
TableNameMapping: cdcConfig.TableNameMapping,
slot: cdcConfig.Slot,
SetLastOffset: cdcConfig.SetLastOffset,
publication: cdcConfig.Publication,
relationMessageMapping: cdcConfig.RelationMessageMapping,
typeMap: pgtype.NewMap(),
Expand Down Expand Up @@ -152,19 +154,20 @@ func (p *PostgresCDCSource) PullRecords(req *model.PullRecordsRequest) error {
sysident.SystemID, sysident.Timeline, sysident.XLogPos, sysident.DBName))

// start replication
p.startLSN = 0
var clientXLogPos, startLSN pglogrepl.LSN
if req.LastOffset > 0 {
p.logger.Info("starting replication from last sync state", slog.Int64("last checkpoint", req.LastOffset))
p.startLSN = pglogrepl.LSN(req.LastOffset + 1)
clientXLogPos = pglogrepl.LSN(req.LastOffset)
startLSN = clientXLogPos + 1
}

err = pglogrepl.StartReplication(p.ctx, pgConn, replicationSlot, p.startLSN, replicationOpts)
err = pglogrepl.StartReplication(p.ctx, pgConn, replicationSlot, startLSN, replicationOpts)
if err != nil {
return fmt.Errorf("error starting replication at startLsn - %d: %w", p.startLSN, err)
return fmt.Errorf("error starting replication at startLsn - %d: %w", startLSN, err)
}
p.logger.Info(fmt.Sprintf("started replication on slot %s at startLSN: %d", p.slot, p.startLSN))
p.logger.Info(fmt.Sprintf("started replication on slot %s at startLSN: %d", p.slot, startLSN))

return p.consumeStream(pgConn, req, p.startLSN, req.RecordStream)
return p.consumeStream(pgConn, req, clientXLogPos, req.RecordStream)
}

// start consuming the cdc stream
Expand All @@ -181,19 +184,20 @@ func (p *PostgresCDCSource) consumeStream(
}
}()

// clientXLogPos is the last checkpoint id + 1, we need to ack that we have processed
// until clientXLogPos - 1 each time we send a standby status update.
// clientXLogPos is the last checkpoint id, we need to ack that we have processed
// until clientXLogPos each time we send a standby status update.
// consumedXLogPos is the lsn that has been committed on the destination.
consumedXLogPos := pglogrepl.LSN(0)
if clientXLogPos > 0 {
consumedXLogPos = clientXLogPos - 1
consumedXLogPos = clientXLogPos

err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn,
pglogrepl.StandbyStatusUpdate{WALWritePosition: consumedXLogPos})
if err != nil {
return fmt.Errorf("[initial-flush] SendStandbyStatusUpdate failed: %w", err)
}
}
proposedConsumedXLogPos := consumedXLogPos

var standByLastLogged time.Time
cdcRecordsStorage := cdc_records.NewCDCRecordsStore(p.flowJobName)
Expand Down Expand Up @@ -252,19 +256,27 @@ func (p *PostgresCDCSource) consumeStream(
if pkmRequiresResponse {
// Update XLogPos to the last processed position, we can only confirm
// that this is the last row committed on the destination.
if proposedConsumedXLogPos > consumedXLogPos {
p.logger.Info(fmt.Sprintf("Heartbeat adjusting lsn from %d to %d", consumedXLogPos, proposedConsumedXLogPos))
consumedXLogPos = proposedConsumedXLogPos
err := p.SetLastOffset(int64(consumedXLogPos))
if err != nil {
return fmt.Errorf("storing updated LSN failed: %w", err)
}
}

err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn,
pglogrepl.StandbyStatusUpdate{WALWritePosition: consumedXLogPos})
if err != nil {
return fmt.Errorf("SendStandbyStatusUpdate failed: %w", err)
}
pkmRequiresResponse = false

if time.Since(standByLastLogged) > 10*time.Second {
numRowsProcessedMessage := fmt.Sprintf("processed %d rows", cdcRecordsStorage.Len())
p.logger.Info(fmt.Sprintf("Sent Standby status message. %s", numRowsProcessedMessage))
standByLastLogged = time.Now()
}

pkmRequiresResponse = false
}

if (cdcRecordsStorage.Len() >= int(req.MaxBatchSize)) && !p.commitLock {
Expand Down Expand Up @@ -469,7 +481,7 @@ func (p *PostgresCDCSource) consumeStream(
if cdcRecordsStorage.IsEmpty() {
// given that we have no records it is safe to update the flush wal position
// to the clientXLogPos. clientXLogPos can be moved forward due to PKM messages.
consumedXLogPos = clientXLogPos
proposedConsumedXLogPos = clientXLogPos
records.UpdateLatestCheckpoint(int64(clientXLogPos))
}
}
Expand Down
4 changes: 3 additions & 1 deletion flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@ const (
createRawTableDstTableIndexSQL = "CREATE INDEX IF NOT EXISTS %s_dst_table_idx ON %s.%s(_peerdb_destination_table_name)"

getLastOffsetSQL = "SELECT lsn_offset FROM %s.%s WHERE mirror_job_name=$1"
setLastOffsetSQL = "UPDATE %s.%s SET lsn_offset=GREATEST(lsn_offset, $1) WHERE mirror_job_name=$2"
getLastSyncBatchID_SQL = "SELECT sync_batch_id FROM %s.%s WHERE mirror_job_name=$1"
getLastNormalizeBatchID_SQL = "SELECT normalize_batch_id FROM %s.%s WHERE mirror_job_name=$1"
createNormalizedTableSQL = "CREATE TABLE IF NOT EXISTS %s(%s)"

insertJobMetadataSQL = "INSERT INTO %s.%s VALUES ($1,$2,$3,$4)"
checkIfJobMetadataExistsSQL = "SELECT COUNT(1)::TEXT::BOOL FROM %s.%s WHERE mirror_job_name=$1"
updateMetadataForSyncRecordsSQL = "UPDATE %s.%s SET lsn_offset=$1, sync_batch_id=$2 WHERE mirror_job_name=$3"
updateMetadataForSyncRecordsSQL = "UPDATE %s.%s SET lsn_offset=GREATEST(lsn_offset, $1), sync_batch_id=$2 WHERE mirror_job_name=$3"
updateMetadataForNormalizeRecordsSQL = "UPDATE %s.%s SET normalize_batch_id=$1 WHERE mirror_job_name=$2"

getTableNameToUnchangedToastColsSQL = `SELECT _peerdb_destination_table_name,
Expand Down Expand Up @@ -486,6 +487,7 @@ func (c *PostgresConnector) jobMetadataExistsTx(tx pgx.Tx, jobName string) (bool
if err != nil {
return false, fmt.Errorf("error reading result row: %w", err)
}

return result.Bool, nil
}

Expand Down
14 changes: 13 additions & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,24 @@ func (c *PostgresConnector) GetLastOffset(jobName string) (int64, error) {
if err != nil {
return 0, fmt.Errorf("error while reading result row: %w", err)
}

if result.Int64 == 0 {
c.logger.Warn("Assuming zero offset means no sync has happened")
}

return result.Int64, nil
}

// SetLastOffset updates the last synced offset for a job.
func (c *PostgresConnector) SetLastOffset(jobName string, lastOffset int64) error {
_, err := c.pool.
Exec(c.ctx, fmt.Sprintf(setLastOffsetSQL, c.metadataSchema, mirrorJobsTableIdentifier), lastOffset, jobName)
if err != nil {
return fmt.Errorf("error setting last offset for job %s: %w", jobName, err)
}

return nil
}

// PullRecords pulls records from the source.
func (c *PostgresConnector) PullRecords(catalogPool *pgxpool.Pool, req *model.PullRecordsRequest) error {
defer func() {
Expand Down Expand Up @@ -238,6 +249,7 @@ func (c *PostgresConnector) PullRecords(catalogPool *pgxpool.Pool, req *model.Pu
RelationMessageMapping: req.RelationMessageMapping,
CatalogPool: catalogPool,
FlowJobName: req.FlowJobName,
SetLastOffset: req.SetLastOffset,
}, c.customTypesMapping)
if err != nil {
return fmt.Errorf("failed to create cdc source: %w", err)
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (c *S3Connector) GetLastOffset(jobName string) (int64, error) {
}

// update offset for a job
func (c *S3Connector) updateLastOffset(jobName string, offset int64) error {
func (c *S3Connector) SetLastOffset(jobName string, offset int64) error {
err := c.pgMetadata.UpdateLastOffset(jobName, offset)
if err != nil {
c.logger.Error("failed to update last offset: ", slog.Any("error", err))
Expand Down Expand Up @@ -218,7 +218,7 @@ func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncRes
return nil, fmt.Errorf("failed to get last checkpoint: %w", err)
}

err = c.updateLastOffset(req.FlowJobName, lastCheckpoint)
err = c.SetLastOffset(req.FlowJobName, lastCheckpoint)
if err != nil {
c.logger.Error("failed to update last offset for s3 cdc", slog.Any("error", err))
return nil, err
Expand Down
13 changes: 12 additions & 1 deletion flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ const (
WHERE TABLE_SCHEMA=? and TABLE_NAME=?`
checkIfJobMetadataExistsSQL = "SELECT TO_BOOLEAN(COUNT(1)) FROM %s.%s WHERE MIRROR_JOB_NAME=?"
getLastOffsetSQL = "SELECT OFFSET FROM %s.%s WHERE MIRROR_JOB_NAME=?"
setLastOffsetSQL = "UPDATE %s.%s SET OFFSET=GREATEST(OFFSET, ?) WHERE MIRROR_JOB_NAME=?"
getLastSyncBatchID_SQL = "SELECT SYNC_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?"
getLastNormalizeBatchID_SQL = "SELECT NORMALIZE_BATCH_ID FROM %s.%s WHERE MIRROR_JOB_NAME=?"
dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s"
Expand Down Expand Up @@ -301,7 +302,7 @@ func (c *SnowflakeConnector) GetLastOffset(jobName string) (int64, error) {
}()

if !rows.Next() {
c.logger.Warn("No row found ,returning nil")
c.logger.Warn("No row found, returning 0")
return 0, nil
}
var result pgtype.Int8
Expand All @@ -311,10 +312,20 @@ func (c *SnowflakeConnector) GetLastOffset(jobName string) (int64, error) {
}
if result.Int64 == 0 {
c.logger.Warn("Assuming zero offset means no sync has happened")
return 0, nil
}
return result.Int64, nil
}

func (c *SnowflakeConnector) SetLastOffset(jobName string, lastOffset int64) error {
_, err := c.database.ExecContext(c.ctx, fmt.Sprintf(setLastOffsetSQL,
c.metadataSchema, mirrorJobsTableIdentifier), lastOffset, jobName)
if err != nil {
return fmt.Errorf("error querying Snowflake peer for last syncedID: %w", err)
}
return nil
}

func (c *SnowflakeConnector) GetLastSyncBatchID(jobName string) (int64, error) {
rows, err := c.database.QueryContext(c.ctx, fmt.Sprintf(getLastSyncBatchID_SQL, c.metadataSchema,
mirrorJobsTableIdentifier), jobName)
Expand Down
2 changes: 2 additions & 0 deletions flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type PullRecordsRequest struct {
RelationMessageMapping RelationMessageMapping
// record batch for pushing changes into
RecordStream *CDCRecordStream
// last offset may be forwarded while processing records
SetLastOffset func(int64) error
}

type Record interface {
Expand Down

0 comments on commit 8a00957

Please sign in to comment.