Skip to content

Commit

Permalink
Merge branch 'main' into soft-delete-tests-bq
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Dec 21, 2023
2 parents dbe2ad6 + 65c585a commit 7dc93c7
Show file tree
Hide file tree
Showing 30 changed files with 710 additions and 423 deletions.
3 changes: 3 additions & 0 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,9 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName,
RelationMessageMapping: input.RelationMessageMapping,
RecordStream: recordBatch,
SetLastOffset: func(lastOffset int64) error {
return dstConn.SetLastOffset(input.FlowConnectionConfigs.FlowJobName, lastOffset)
},
})
})

Expand Down
17 changes: 17 additions & 0 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,23 @@ func (c *BigQueryConnector) GetLastOffset(jobName string) (int64, error) {
}
}

// SetLastOffset records lastOffset for jobName in the mirror jobs metadata
// table of the connector's dataset. The UPDATE uses GREATEST(offset, ...), so
// the stored offset is only ever moved forward and never decreased by this call.
// It returns a wrapped error, including the query text, if the query fails.
func (c *BigQueryConnector) SetLastOffset(jobName string, lastOffset int64) error {
	const updateTmpl = "UPDATE %s.%s SET offset = GREATEST(offset, %d) WHERE mirror_job_name = '%s'"
	query := fmt.Sprintf(updateTmpl, c.datasetID, MirrorJobsTable, lastOffset, jobName)

	// The statement yields no rows of interest; only the error matters.
	if _, err := c.client.Query(query).Read(c.ctx); err != nil {
		return fmt.Errorf("failed to run query %s on BigQuery:\n %w", query, err)
	}
	return nil
}

func (c *BigQueryConnector) GetLastSyncBatchID(jobName string) (int64, error) {
query := fmt.Sprintf("SELECT sync_batch_id FROM %s.%s WHERE mirror_job_name = '%s'",
c.datasetID, MirrorJobsTable, jobName)
Expand Down
13 changes: 13 additions & 0 deletions flow/connectors/bigquery/merge_statement_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,19 @@ func (m *mergeStmtGenerator) generateUpdateStatements(
(_peerdb_deduped._peerdb_record_type != 2) AND _peerdb_unchanged_toast_columns='%s'
THEN UPDATE SET %s `, cols, ssep)
updateStmts = append(updateStmts, updateStmt)

// generates update statements for the case where updates and deletes happen in the same branch
// the backfill has happened from the pull side already, so treat the DeleteRecord as an update
// and then set soft-delete to true.
if peerdbCols.SoftDelete && (peerdbCols.SoftDeleteColName != "") {
tmpArray = append(tmpArray[:len(tmpArray)-1],
fmt.Sprintf("`%s` = TRUE", peerdbCols.SoftDeleteColName))
ssep := strings.Join(tmpArray, ", ")
updateStmt := fmt.Sprintf(`WHEN MATCHED AND
(_peerdb_deduped._peerdb_record_type = 2) AND _peerdb_unchanged_toast_columns='%s'
THEN UPDATE SET %s `, cols, ssep)
updateStmts = append(updateStmts, updateStmt)
}
}
return updateStmts
}
50 changes: 30 additions & 20 deletions flow/connectors/bigquery/merge_stmt_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,31 @@ func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {
unchangedToastCols := []string{"", "col2, col3", "col2", "col3"}

expected := []string{
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type != 2)" +
" AND _peerdb_unchanged_toast_columns='' " +
"THEN UPDATE SET `col1` = _peerdb_deduped.col1," +
" `col2` = _peerdb_deduped.col2," +
" `col3` = _peerdb_deduped.col3," +
"`synced_at`=CURRENT_TIMESTAMP," + "`deleted`=FALSE",
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type != 2)" +
" AND _peerdb_unchanged_toast_columns='col2, col3' " +
"THEN UPDATE SET `col1` = _peerdb_deduped.col1," +
"`synced_at`=CURRENT_TIMESTAMP," + "`deleted`=FALSE",
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type != 2)" +
" AND _peerdb_unchanged_toast_columns='col2'" +
"THEN UPDATE SET `col1` = _peerdb_deduped.col1," +
" `col3` = _peerdb_deduped.col3," +
"`synced_at`=CURRENT_TIMESTAMP," + "`deleted`=FALSE",
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type != 2)" +
" AND _peerdb_unchanged_toast_columns='col3'" +
"THEN UPDATE SET `col1` = _peerdb_deduped.col1," +
" `col2` = _peerdb_deduped.col2," + "`synced_at`=CURRENT_TIMESTAMP," + "`deleted`=FALSE",
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type!=2) AND _peerdb_unchanged_toast_columns=''" +
" THEN UPDATE SET `col1`=_peerdb_deduped.col1,`col2`=_peerdb_deduped.col2,`col3`=_peerdb_deduped.col3," +
"`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE",
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type=2) " +
"AND _peerdb_unchanged_toast_columns='' " +
"THEN UPDATE SET `col1`=_peerdb_deduped.col1,`col2`=_peerdb_deduped.col2," +
"`col3`=_peerdb_deduped.col3,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type!=2) AND _peerdb_unchanged_toast_columns='col2,col3' " +
"THEN UPDATE SET `col1`=_peerdb_deduped.col1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE ",
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type=2) AND _peerdb_unchanged_toast_columns='col2,col3' " +
"THEN UPDATE SET `col1`=_peerdb_deduped.col1,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type!=2) " +
"AND _peerdb_unchanged_toast_columns='col2' " +
"THEN UPDATE SET `col1`=_peerdb_deduped.col1,`col3`=_peerdb_deduped.col3," +
"`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE",
"WHEN MATCHED AND(_peerdb_deduped._peerdb_record_type=2) " +
"AND _peerdb_unchanged_toast_columns='col2' " +
"THEN UPDATE SET `col1`=_peerdb_deduped.col1,`col3`=_peerdb_deduped.col3," +
"`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE ",
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type!=2) AND _peerdb_unchanged_toast_columns='col3' " +
"THEN UPDATE SET `col1`=_peerdb_deduped.col1," +
"`col2`=_peerdb_deduped.col2,`synced_at`=CURRENT_TIMESTAMP,`deleted`=FALSE ",
"WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type=2) AND _peerdb_unchanged_toast_columns='col3' " +
"THEN UPDATE SET `col1`=_peerdb_deduped.col1," +
"`col2`=_peerdb_deduped.col2,`synced_at`=CURRENT_TIMESTAMP,`deleted`=TRUE",
}

result := m.generateUpdateStatements(allCols, unchangedToastCols, &protos.PeerDBColumns{
Expand All @@ -47,7 +53,7 @@ func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {
}

if !reflect.DeepEqual(result, expected) {
t.Errorf("Unexpected result. Expected: %v, but got: %v", expected, result)
t.Errorf("Unexpected result. Expected: %v,\nbut got: %v", expected, result)
}
}

Expand All @@ -65,6 +71,10 @@ func TestGenerateUpdateStatement_NoUnchangedToastCols(t *testing.T) {
" `col3` = _peerdb_deduped.col3," +
" `synced_at`=CURRENT_TIMESTAMP," +
"`deleted`=FALSE",
"WHEN MATCHED AND" +
"(_peerdb_deduped._peerdb_record_type = 2) AND _peerdb_unchanged_toast_columns=''" +
"THEN UPDATE SET `col1` = _peerdb_deduped.col1, `col2` = _peerdb_deduped.col2, " +
"`col3` = _peerdb_deduped.col3, `synced_at` = CURRENT_TIMESTAMP, `deleted` = TRUE",
}

result := m.generateUpdateStatements(allCols, unchangedToastCols,
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (c *BigQueryConnector) SyncQRepRecords(

avroSync := &QRepAvroSyncMethod{connector: c, gcsBucket: config.StagingPath}
return avroSync.SyncQRepRecords(config.FlowJobName, destTable, partition,
tblMetadata, stream, config.SyncedAtColName)
tblMetadata, stream, config.SyncedAtColName, config.SoftDeleteColName)
}

func (c *BigQueryConnector) replayTableSchemaDeltasQRep(config *protos.QRepConfig, partition *protos.QRepPartition,
Expand Down
13 changes: 9 additions & 4 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
flowJobName, dstTableName, syncBatchID),
)
// You will need to define your Avro schema as a string
avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata, "")
avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata, "", "")
if err != nil {
return 0, fmt.Errorf("failed to define Avro schema: %w", err)
}
Expand Down Expand Up @@ -108,6 +108,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
dstTableMetadata *bigquery.TableMetadata,
stream *model.QRecordStream,
syncedAtCol string,
softDeleteCol string,
) (int, error) {
startTime := time.Now()
flowLog := slog.Group("sync_metadata",
Expand All @@ -116,7 +117,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
slog.String("destinationTable", dstTableName),
)
// You will need to define your Avro schema as a string
avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata, syncedAtCol)
avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata, syncedAtCol, softDeleteCol)
if err != nil {
return 0, fmt.Errorf("failed to define Avro schema: %w", err)
}
Expand All @@ -139,8 +140,11 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
stmts := []string{"BEGIN TRANSACTION;"}

selector := "*"
if softDeleteCol != "" { // PeerDB column
selector += ", FALSE"
}
if syncedAtCol != "" { // PeerDB column
selector = "*, CURRENT_TIMESTAMP"
selector += ", CURRENT_TIMESTAMP"
}
// Insert the records from the staging table into the destination table
insertStmt := fmt.Sprintf("INSERT INTO `%s.%s` SELECT %s FROM `%s.%s`;",
Expand Down Expand Up @@ -187,12 +191,13 @@ type AvroSchema struct {
func DefineAvroSchema(dstTableName string,
dstTableMetadata *bigquery.TableMetadata,
syncedAtCol string,
softDeleteCol string,
) (*model.QRecordAvroSchemaDefinition, error) {
avroFields := []AvroField{}
nullableFields := make(map[string]struct{})

for _, bqField := range dstTableMetadata.Schema {
if bqField.Name == syncedAtCol {
if bqField.Name == syncedAtCol || bqField.Name == softDeleteCol {
continue
}
avroType, err := GetAvroType(bqField)
Expand Down
3 changes: 3 additions & 0 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ type CDCSyncConnector interface {
// GetLastOffset gets the last offset from the metadata table on the destination
GetLastOffset(jobName string) (int64, error)

// SetLastOffset updates the last offset on the metadata table on the destination
SetLastOffset(jobName string, lastOffset int64) error

// GetLastSyncBatchID gets the last batch synced to the destination from the metadata table
GetLastSyncBatchID(jobName string) (int64, error)

Expand Down
6 changes: 3 additions & 3 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (c *EventHubConnector) GetLastOffset(jobName string) (int64, error) {
return c.pgMetadata.FetchLastOffset(jobName)
}

func (c *EventHubConnector) updateLastOffset(jobName string, offset int64) error {
func (c *EventHubConnector) SetLastOffset(jobName string, offset int64) error {
err := c.pgMetadata.UpdateLastOffset(jobName, offset)
if err != nil {
c.logger.Error(fmt.Sprintf("failed to update last offset: %v", err))
Expand Down Expand Up @@ -187,7 +187,7 @@ func (c *EventHubConnector) processBatch(
}

if lastSeenLSN > lastUpdatedOffset {
err = c.updateLastOffset(flowJobName, lastSeenLSN)
err = c.SetLastOffset(flowJobName, lastSeenLSN)
lastUpdatedOffset = lastSeenLSN
c.logger.Info("processBatch", slog.Int64("updated last offset", lastSeenLSN))
if err != nil {
Expand Down Expand Up @@ -233,7 +233,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
return nil, err
}

err = c.updateLastOffset(req.FlowJobName, lastCheckpoint)
err = c.SetLastOffset(req.FlowJobName, lastCheckpoint)
if err != nil {
c.logger.Error("failed to update last offset", slog.Any("error", err))
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/external_metadata/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ func (p *PostgresMetadataStore) FetchLastOffset(jobName string) (int64, error) {
var offset pgtype.Int8
err := rows.Scan(&offset)
if err != nil {
// if the job doesn't exist, return 0
if err.Error() == "no rows in result set" {
return 0, nil
}
Expand Down Expand Up @@ -198,7 +197,8 @@ func (p *PostgresMetadataStore) UpdateLastOffset(jobName string, offset int64) e
INSERT INTO `+p.schemaName+`.`+lastSyncStateTableName+` (job_name, last_offset, sync_batch_id)
VALUES ($1, $2, $3)
ON CONFLICT (job_name)
DO UPDATE SET last_offset = $2, updated_at = NOW()
DO UPDATE SET last_offset = GREATEST(`+lastSyncStateTableName+`.last_offset, excluded.last_offset),
updated_at = NOW()
`, jobName, offset, 0)

if err != nil {
Expand Down
38 changes: 25 additions & 13 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ type PostgresCDCSource struct {
SrcTableIDNameMapping map[uint32]string
TableNameMapping map[string]model.NameAndExclude
slot string
SetLastOffset func(int64) error
publication string
relationMessageMapping model.RelationMessageMapping
typeMap *pgtype.Map
startLSN pglogrepl.LSN
commitLock bool
customTypeMapping map[uint32]string

Expand All @@ -56,6 +56,7 @@ type PostgresCDCConfig struct {
RelationMessageMapping model.RelationMessageMapping
CatalogPool *pgxpool.Pool
FlowJobName string
SetLastOffset func(int64) error
}

// Create a new PostgresCDCSource
Expand All @@ -72,6 +73,7 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32
SrcTableIDNameMapping: cdcConfig.SrcTableIDNameMapping,
TableNameMapping: cdcConfig.TableNameMapping,
slot: cdcConfig.Slot,
SetLastOffset: cdcConfig.SetLastOffset,
publication: cdcConfig.Publication,
relationMessageMapping: cdcConfig.RelationMessageMapping,
typeMap: pgtype.NewMap(),
Expand Down Expand Up @@ -152,19 +154,20 @@ func (p *PostgresCDCSource) PullRecords(req *model.PullRecordsRequest) error {
sysident.SystemID, sysident.Timeline, sysident.XLogPos, sysident.DBName))

// start replication
p.startLSN = 0
var clientXLogPos, startLSN pglogrepl.LSN
if req.LastOffset > 0 {
p.logger.Info("starting replication from last sync state", slog.Int64("last checkpoint", req.LastOffset))
p.startLSN = pglogrepl.LSN(req.LastOffset + 1)
clientXLogPos = pglogrepl.LSN(req.LastOffset)
startLSN = clientXLogPos + 1
}

err = pglogrepl.StartReplication(p.ctx, pgConn, replicationSlot, p.startLSN, replicationOpts)
err = pglogrepl.StartReplication(p.ctx, pgConn, replicationSlot, startLSN, replicationOpts)
if err != nil {
return fmt.Errorf("error starting replication at startLsn - %d: %w", p.startLSN, err)
return fmt.Errorf("error starting replication at startLsn - %d: %w", startLSN, err)
}
p.logger.Info(fmt.Sprintf("started replication on slot %s at startLSN: %d", p.slot, p.startLSN))
p.logger.Info(fmt.Sprintf("started replication on slot %s at startLSN: %d", p.slot, startLSN))

return p.consumeStream(pgConn, req, p.startLSN, req.RecordStream)
return p.consumeStream(pgConn, req, clientXLogPos, req.RecordStream)
}

// start consuming the cdc stream
Expand All @@ -181,19 +184,20 @@ func (p *PostgresCDCSource) consumeStream(
}
}()

// clientXLogPos is the last checkpoint id + 1, we need to ack that we have processed
// until clientXLogPos - 1 each time we send a standby status update.
// clientXLogPos is the last checkpoint id, we need to ack that we have processed
// until clientXLogPos each time we send a standby status update.
// consumedXLogPos is the lsn that has been committed on the destination.
consumedXLogPos := pglogrepl.LSN(0)
if clientXLogPos > 0 {
consumedXLogPos = clientXLogPos - 1
consumedXLogPos = clientXLogPos

err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn,
pglogrepl.StandbyStatusUpdate{WALWritePosition: consumedXLogPos})
if err != nil {
return fmt.Errorf("[initial-flush] SendStandbyStatusUpdate failed: %w", err)
}
}
proposedConsumedXLogPos := consumedXLogPos

var standByLastLogged time.Time
cdcRecordsStorage := cdc_records.NewCDCRecordsStore(p.flowJobName)
Expand Down Expand Up @@ -252,19 +256,27 @@ func (p *PostgresCDCSource) consumeStream(
if pkmRequiresResponse {
// Update XLogPos to the last processed position, we can only confirm
// that this is the last row committed on the destination.
if proposedConsumedXLogPos > consumedXLogPos {
p.logger.Info(fmt.Sprintf("Heartbeat adjusting lsn from %d to %d", consumedXLogPos, proposedConsumedXLogPos))
consumedXLogPos = proposedConsumedXLogPos
err := p.SetLastOffset(int64(consumedXLogPos))
if err != nil {
return fmt.Errorf("storing updated LSN failed: %w", err)
}
}

err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn,
pglogrepl.StandbyStatusUpdate{WALWritePosition: consumedXLogPos})
if err != nil {
return fmt.Errorf("SendStandbyStatusUpdate failed: %w", err)
}
pkmRequiresResponse = false

if time.Since(standByLastLogged) > 10*time.Second {
numRowsProcessedMessage := fmt.Sprintf("processed %d rows", cdcRecordsStorage.Len())
p.logger.Info(fmt.Sprintf("Sent Standby status message. %s", numRowsProcessedMessage))
standByLastLogged = time.Now()
}

pkmRequiresResponse = false
}

if (cdcRecordsStorage.Len() >= int(req.MaxBatchSize)) && !p.commitLock {
Expand Down Expand Up @@ -469,7 +481,7 @@ func (p *PostgresCDCSource) consumeStream(
if cdcRecordsStorage.IsEmpty() {
// given that we have no records it is safe to update the flush wal position
// to the clientXLogPos. clientXLogPos can be moved forward due to PKM messages.
consumedXLogPos = clientXLogPos
proposedConsumedXLogPos = clientXLogPos
records.UpdateLatestCheckpoint(int64(clientXLogPos))
}
}
Expand Down
Loading

0 comments on commit 7dc93c7

Please sign in to comment.