Return only at commit message in Postgres CDC (#503)
Co-authored-by: Kevin Biju <[email protected]>
iskakaushik and heavycrystal authored Oct 12, 2023
1 parent 1716500 commit 4d42a0e
Showing 21 changed files with 415 additions and 667 deletions.
8 changes: 4 additions & 4 deletions flow/activities/flowable.go
@@ -249,7 +249,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
 		metrics.LogCDCRawThroughputMetrics(ctx, input.FlowConnectionConfigs.FlowJobName, 0)
 		return &model.SyncResponse{
 			RelationMessageMapping: recordsWithTableSchemaDelta.RelationMessageMapping,
-			TableSchemaDelta:       recordsWithTableSchemaDelta.TableSchemaDelta,
+			TableSchemaDeltas:      recordsWithTableSchemaDelta.TableSchemaDeltas,
 		}, nil
 	}

@@ -297,7 +297,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
 	if err != nil {
 		return nil, err
 	}
-	res.TableSchemaDelta = recordsWithTableSchemaDelta.TableSchemaDelta
+	res.TableSchemaDeltas = recordsWithTableSchemaDelta.TableSchemaDeltas
 	res.RelationMessageMapping = recordsWithTableSchemaDelta.RelationMessageMapping

 	pushedRecordsWithCount := fmt.Sprintf("pushed %d records", numRecords)
@@ -372,7 +372,7 @@ func (a *FlowableActivity) StartNormalize(
 	return res, nil
 }

-func (a *FlowableActivity) ReplayTableSchemaDelta(
+func (a *FlowableActivity) ReplayTableSchemaDeltas(
 	ctx context.Context,
 	input *protos.ReplayTableSchemaDeltaInput,
 ) error {
@@ -384,7 +384,7 @@ func (a *FlowableActivity) ReplayTableSchemaDelta(
 	}
 	defer connectors.CloseConnector(dest)

-	return dest.ReplayTableSchemaDelta(input.FlowConnectionConfigs.FlowJobName, input.TableSchemaDelta)
+	return dest.ReplayTableSchemaDeltas(input.FlowConnectionConfigs.FlowJobName, input.TableSchemaDeltas)
 }

// SetupQRepMetadataTables sets up the metadata tables for QReplication.
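The activity-level change above is mechanical: the single TableSchemaDelta field becomes a TableSchemaDeltas slice, and the activity forwards the whole slice to the destination connector in one call. A minimal, self-contained sketch of what that means for a caller; the types here are simplified stand-ins for the generated protos, not code from this PR:

package main

import "fmt"

// Sketch only: SyncResponse previously carried a single *TableSchemaDelta;
// it now carries every delta observed during the sync.
type TableSchemaDelta struct {
	SrcTableName string
	DstTableName string
}

type SyncResponse struct {
	TableSchemaDeltas []*TableSchemaDelta // was: TableSchemaDelta *TableSchemaDelta
}

func main() {
	res := &SyncResponse{TableSchemaDeltas: []*TableSchemaDelta{
		{SrcTableName: "public.users", DstTableName: "users"},
		{SrcTableName: "public.orders", DstTableName: "orders"},
	}}
	// Every schema change seen during the sync is preserved, in order,
	// instead of only the last one before an early return.
	for i, delta := range res.TableSchemaDeltas {
		fmt.Printf("delta %d: %s -> %s\n", i, delta.SrcTableName, delta.DstTableName)
	}
}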
63 changes: 28 additions & 35 deletions flow/connectors/bigquery/bigquery.go
@@ -205,39 +205,29 @@ func (c *BigQueryConnector) InitializeTableSchema(req map[string]*protos.TableSc
 	return nil
 }

-// ReplayTableSchemaDelta changes a destination table to match the schema at source
+// ReplayTableSchemaDeltas changes a destination table to match the schema at source
 // This could involve adding or dropping multiple columns.
-func (c *BigQueryConnector) ReplayTableSchemaDelta(flowJobName string,
-	schemaDelta *protos.TableSchemaDelta) error {
-	if (schemaDelta == nil) || (len(schemaDelta.AddedColumns) == 0 && len(schemaDelta.DroppedColumns) == 0) {
-		return nil
-	}
-
-	for _, droppedColumn := range schemaDelta.DroppedColumns {
-		_, err := c.client.Query(fmt.Sprintf("ALTER TABLE %s.%s DROP COLUMN `%s`", c.datasetID,
-			schemaDelta.DstTableName, droppedColumn)).Read(c.ctx)
-		if err != nil {
-			return fmt.Errorf("failed to drop column %s for table %s: %w", droppedColumn,
-				schemaDelta.SrcTableName, err)
-		}
-		log.WithFields(log.Fields{
-			"flowName":  flowJobName,
-			"tableName": schemaDelta.SrcTableName,
-		}).Infof("[schema delta replay] dropped column %s", droppedColumn)
-	}
-	for _, addedColumn := range schemaDelta.AddedColumns {
-		_, err := c.client.Query(fmt.Sprintf("ALTER TABLE %s.%s ADD COLUMN `%s` %s", c.datasetID,
-			schemaDelta.DstTableName, addedColumn.ColumnName,
-			qValueKindToBigQueryType(addedColumn.ColumnType))).Read(c.ctx)
-		if err != nil {
-			return fmt.Errorf("failed to add column %s for table %s: %w", addedColumn.ColumnName,
-				schemaDelta.SrcTableName, err)
-		}
-		log.WithFields(log.Fields{
-			"flowName":  flowJobName,
-			"tableName": schemaDelta.SrcTableName,
-		}).Infof("[schema delta replay] added column %s with data type %s", addedColumn.ColumnName,
-			addedColumn.ColumnType)
-	}
+func (c *BigQueryConnector) ReplayTableSchemaDeltas(flowJobName string,
+	schemaDeltas []*protos.TableSchemaDelta) error {
+	for _, schemaDelta := range schemaDeltas {
+		if schemaDelta == nil || len(schemaDelta.AddedColumns) == 0 {
+			return nil
+		}
+
+		for _, addedColumn := range schemaDelta.AddedColumns {
+			_, err := c.client.Query(fmt.Sprintf("ALTER TABLE %s.%s ADD COLUMN `%s` %s", c.datasetID,
+				schemaDelta.DstTableName, addedColumn.ColumnName,
+				qValueKindToBigQueryType(addedColumn.ColumnType))).Read(c.ctx)
+			if err != nil {
+				return fmt.Errorf("failed to add column %s for table %s: %w", addedColumn.ColumnName,
+					schemaDelta.SrcTableName, err)
+			}
+			log.WithFields(log.Fields{
+				"flowName":  flowJobName,
+				"tableName": schemaDelta.SrcTableName,
+			}).Infof("[schema delta replay] added column %s with data type %s", addedColumn.ColumnName,
+				addedColumn.ColumnType)
+		}
+	}

 	return nil
@@ -299,7 +289,8 @@ func (c *BigQueryConnector) GetLastOffset(jobName string) (*protos.LastSyncState
 }

 func (c *BigQueryConnector) GetLastSyncBatchID(jobName string) (int64, error) {
-	query := fmt.Sprintf("SELECT sync_batch_id FROM %s.%s WHERE mirror_job_name = '%s'", c.datasetID, MirrorJobsTable, jobName)
+	query := fmt.Sprintf("SELECT sync_batch_id FROM %s.%s WHERE mirror_job_name = '%s'",
+		c.datasetID, MirrorJobsTable, jobName)
 	q := c.client.Query(query)
 	it, err := q.Read(c.ctx)
 	if err != nil {
@@ -323,7 +314,8 @@ func (c *BigQueryConnector) GetLastSyncBatchID(jobName string) (int64, error) {
 }

 func (c *BigQueryConnector) GetLastNormalizeBatchID(jobName string) (int64, error) {
-	query := fmt.Sprintf("SELECT normalize_batch_id FROM %s.%s WHERE mirror_job_name = '%s'", c.datasetID, MirrorJobsTable, jobName)
+	query := fmt.Sprintf("SELECT normalize_batch_id FROM %s.%s WHERE mirror_job_name = '%s'",
+		c.datasetID, MirrorJobsTable, jobName)
 	q := c.client.Query(query)
 	it, err := q.Read(c.ctx)
 	if err != nil {
@@ -981,7 +973,7 @@ func (c *BigQueryConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr
 		{Name: "_peerdb_unchanged_toast_columns", Type: bigquery.StringFieldType},
 	}

-	staging_schema := bigquery.Schema{
+	stagingSchema := bigquery.Schema{
 		{Name: "_peerdb_uid", Type: bigquery.StringFieldType},
 		{Name: "_peerdb_timestamp", Type: bigquery.TimestampFieldType},
 		{Name: "_peerdb_timestamp_nanos", Type: bigquery.IntegerFieldType},
@@ -1022,7 +1014,7 @@ func (c *BigQueryConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr
 	stagingTableName := c.getStagingTableName(req.FlowJobName)
 	stagingTable := c.client.Dataset(c.datasetID).Table(stagingTableName)
 	err = stagingTable.Create(c.ctx, &bigquery.TableMetadata{
-		Schema: staging_schema,
+		Schema: stagingSchema,
 	})
 	if err != nil {
 		return nil, fmt.Errorf("failed to create table %s.%s: %w", c.datasetID, stagingTableName, err)
@@ -1034,7 +1026,8 @@ func (c *BigQueryConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr
 }

 // getUpdateMetadataStmt updates the metadata tables for a given job.
-func (c *BigQueryConnector) getUpdateMetadataStmt(jobName string, lastSyncedCheckpointID int64, batchID int64) (string, error) {
+func (c *BigQueryConnector) getUpdateMetadataStmt(jobName string, lastSyncedCheckpointID int64,
+	batchID int64) (string, error) {
 	hasJob, err := c.metadataHasJob(jobName)
 	if err != nil {
 		return "", fmt.Errorf("failed to check if job exists: %w", err)
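A standalone sketch of the per-delta replay loop for BigQuery, with one deliberate difference worth flagging: as committed, the nil/empty guard inside the loop does `return nil`, which would silently skip any deltas queued after an empty one; `continue` (used below) preserves the rest of the batch. The types and the `run` callback are simplified stand-ins for the protos and the BigQuery client, not this repository's code:

package main

import "fmt"

// Simplified stand-ins for protos.DeltaAddedColumn / protos.TableSchemaDelta.
type DeltaAddedColumn struct{ ColumnName, ColumnType string }

type TableSchemaDelta struct {
	SrcTableName, DstTableName string
	AddedColumns               []*DeltaAddedColumn
}

func replayDeltas(datasetID string, deltas []*TableSchemaDelta,
	run func(query string) error) error {
	for _, d := range deltas {
		if d == nil || len(d.AddedColumns) == 0 {
			continue // skip this delta but keep replaying the rest
		}
		for _, col := range d.AddedColumns {
			q := fmt.Sprintf("ALTER TABLE %s.%s ADD COLUMN `%s` %s",
				datasetID, d.DstTableName, col.ColumnName, col.ColumnType)
			if err := run(q); err != nil {
				return fmt.Errorf("add column %s on %s: %w", col.ColumnName, d.DstTableName, err)
			}
		}
	}
	return nil
}

func main() {
	deltas := []*TableSchemaDelta{
		nil, // must not abort the whole replay
		{DstTableName: "events", AddedColumns: []*DeltaAddedColumn{{"note", "STRING"}}},
	}
	_ = replayDeltas("mydataset", deltas, func(q string) error {
		fmt.Println(q) // stand-in for client.Query(q).Read(ctx)
		return nil
	})
}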
2 changes: 1 addition & 1 deletion flow/connectors/core.go
@@ -90,7 +90,7 @@ type CDCNormalizeConnector interface {

 	// ReplayTableSchemaDelta changes a destination table to match the schema at source
 	// This could involve adding or dropping multiple columns.
-	ReplayTableSchemaDelta(flowJobName string, schemaDelta *protos.TableSchemaDelta) error
+	ReplayTableSchemaDeltas(flowJobName string, schemaDeltas []*protos.TableSchemaDelta) error
 }

type QRepPullConnector interface {
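The interface change means every destination connector now receives the whole batch of deltas in one call. A hedged, self-contained sketch of the pluralized method with a stub connector; the stand-in delta type replaces the real protos.TableSchemaDelta:

package main

import "fmt"

type TableSchemaDelta struct{ SrcTableName string }

// Mirrors the signature change above; the stub is illustrative only.
type CDCNormalizeConnector interface {
	ReplayTableSchemaDeltas(flowJobName string, schemaDeltas []*TableSchemaDelta) error
}

type stubConnector struct{}

func (stubConnector) ReplayTableSchemaDeltas(flowJobName string, schemaDeltas []*TableSchemaDelta) error {
	fmt.Printf("flow %s: replaying %d delta(s) in one call\n", flowJobName, len(schemaDeltas))
	return nil
}

func main() {
	var c CDCNormalizeConnector = stubConnector{}
	_ = c.ReplayTableSchemaDeltas("demo-flow", []*TableSchemaDelta{{SrcTableName: "public.events"}})
}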
68 changes: 36 additions & 32 deletions flow/connectors/postgres/cdc.go
@@ -29,6 +29,7 @@ type PostgresCDCSource struct {
 	relationMessageMapping model.RelationMessageMapping
 	typeMap                *pgtype.Map
 	startLSN               pglogrepl.LSN
+	commitLock             bool
 }

 type PostgresCDCConfig struct {
@@ -52,6 +53,7 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) (*PostgresCDCSource, err
 		publication:            cdcConfig.Publication,
 		relationMessageMapping: cdcConfig.RelationMessageMapping,
 		typeMap:                pgtype.NewMap(),
+		commitLock:             false,
 	}, nil
 }

@@ -125,13 +127,12 @@ func (p *PostgresCDCSource) consumeStream(
 	}
 	result := &model.RecordsWithTableSchemaDelta{
 		RecordBatch:            records,
-		TableSchemaDelta:       nil,
+		TableSchemaDeltas:      nil,
 		RelationMessageMapping: p.relationMessageMapping,
 	}

 	standbyMessageTimeout := req.IdleTimeout
 	nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)
-	earlyReturn := false

 	defer func() {
 		err := conn.Close(p.ctx)
@@ -142,20 +143,21 @@
 		}
 	}()

+	// clientXLogPos is the last checkpoint id + 1, we need to ack that we have processed
+	// until clientXLogPos - 1 each time we send a standby status update.
+	// consumedXLogPos is the lsn that has been committed on the destination.
+	consumedXLogPos := pglogrepl.LSN(0)
+	if clientXLogPos > 0 {
+		consumedXLogPos = clientXLogPos - 1
+	}
+
 	for {
 		if time.Now().After(nextStandbyMessageDeadline) ||
-			earlyReturn ||
 			(len(records.Records) == int(req.MaxBatchSize)) {
-			// update the WALWritePosition to be clientXLogPos - 1
-			// as the clientXLogPos is the last checkpoint id + 1
-			// and we want to send the last checkpoint id as the last
-			// checkpoint id that we have processed.
-			lastProcessedXLogPos := clientXLogPos
-			if clientXLogPos > 0 {
-				lastProcessedXLogPos = clientXLogPos - 1
-			}
+			// Update XLogPos to the last processed position, we can only confirm
+			// that this is the last row committed on the destination.
 			err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn,
-				pglogrepl.StandbyStatusUpdate{WALWritePosition: lastProcessedXLogPos})
+				pglogrepl.StandbyStatusUpdate{WALWritePosition: consumedXLogPos})
 			if err != nil {
 				return nil, fmt.Errorf("SendStandbyStatusUpdate failed: %w", err)
 			}
@@ -165,20 +167,21 @@
 			log.Infof("Sent Standby status message. %s", numRowsProcessedMessage)
 			nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)

-			if earlyReturn || (len(records.Records) == int(req.MaxBatchSize)) {
+			if !p.commitLock && (len(records.Records) == int(req.MaxBatchSize)) {
 				return result, nil
 			}
 		}

 		ctx, cancel := context.WithDeadline(p.ctx, nextStandbyMessageDeadline)
 		rawMsg, err := conn.ReceiveMessage(ctx)
 		cancel()
-		if err != nil {
+		if err != nil && !p.commitLock {
 			if pgconn.Timeout(err) {
 				log.Infof("Idle timeout reached, returning currently accumulated records")
 				return result, nil
-			} else {
-				return nil, fmt.Errorf("ReceiveMessage failed: %w", err)
 			}
+			return nil, fmt.Errorf("ReceiveMessage failed: %w", err)
 		}

 		if errMsg, ok := rawMsg.(*pgproto3.ErrorResponse); ok {
@@ -285,12 +288,11 @@
 			case *model.DeleteRecord:
 				records.Records = append(records.Records, rec)
 			case *model.RelationRecord:
-				tableSchemaDelta := rec.(*model.RelationRecord).TableSchemaDelta
-				if len(tableSchemaDelta.AddedColumns) > 0 || len(tableSchemaDelta.DroppedColumns) > 0 {
-					result.TableSchemaDelta = tableSchemaDelta
-					log.Infof("Detected schema change for table %s, returning currently accumulated records",
-						result.TableSchemaDelta.SrcTableName)
-					earlyReturn = true
+				tableSchemaDelta := r.TableSchemaDelta
+				if len(tableSchemaDelta.AddedColumns) > 0 {
+					log.Infof("Detected schema change for table %s, addedColumns: %v",
+						tableSchemaDelta.SrcTableName, tableSchemaDelta.AddedColumns)
+					result.TableSchemaDeltas = append(result.TableSchemaDeltas, tableSchemaDelta)
 				}
 			}
 		}
@@ -311,7 +313,9 @@ func (p *PostgresCDCSource) processMessage(batch *model.RecordBatch, xld pglogre

 	switch msg := logicalMsg.(type) {
 	case *pglogrepl.BeginMessage:
-		log.Debugf("Ignoring BeginMessage")
+		log.Debugf("BeginMessage => FinalLSN: %v, XID: %v", msg.FinalLSN, msg.Xid)
+		log.Debugf("Locking PullRecords at BeginMessage, awaiting CommitMessage")
+		p.commitLock = true
 	case *pglogrepl.InsertMessage:
 		return p.processInsertMessage(xld.WALStart, msg)
 	case *pglogrepl.UpdateMessage:
@@ -320,7 +324,10 @@
 		return p.processDeleteMessage(xld.WALStart, msg)
 	case *pglogrepl.CommitMessage:
 		// for a commit message, update the last checkpoint id for the record batch.
+		log.Debugf("CommitMessage => CommitLSN: %v, TransactionEndLSN: %v",
+			msg.CommitLSN, msg.TransactionEndLSN)
 		batch.LastCheckPointID = int64(xld.WALStart)
+		p.commitLock = false
 	case *pglogrepl.RelationMessage:
 		// TODO (kaushik): consider persistent state for a mirror job
 		// to be stored somewhere in temporal state. We might need to persist
@@ -563,10 +570,9 @@ func (p *PostgresCDCSource) processRelationMessage(
 	schemaDelta := &protos.TableSchemaDelta{
 		// set it to the source table for now, so we can update the schema on the source side
 		// then at the Workflow level we set it t
-		SrcTableName:   p.SrcTableIDNameMapping[currRel.RelationId],
-		DstTableName:   p.TableNameMapping[p.SrcTableIDNameMapping[currRel.RelationId]],
-		AddedColumns:   make([]*protos.DeltaAddedColumn, 0),
-		DroppedColumns: make([]string, 0),
+		SrcTableName: p.SrcTableIDNameMapping[currRel.RelationId],
+		DstTableName: p.TableNameMapping[p.SrcTableIDNameMapping[currRel.RelationId]],
+		AddedColumns: make([]*protos.DeltaAddedColumn, 0),
 	}
 	for _, column := range currRel.Columns {
 		// not present in previous relation message, but in current one, so added.
@@ -578,17 +584,15 @@
 		// present in previous and current relation messages, but data types have changed.
 		// so we add it to AddedColumns and DroppedColumns, knowing that we process DroppedColumns first.
 		} else if prevRelMap[column.Name].RelId != currRelMap[column.Name].RelId {
-			schemaDelta.DroppedColumns = append(schemaDelta.DroppedColumns, column.Name)
-			schemaDelta.AddedColumns = append(schemaDelta.AddedColumns, &protos.DeltaAddedColumn{
-				ColumnName: column.Name,
-				ColumnType: string(postgresOIDToQValueKind(column.DataType)),
-			})
+			log.Warnf("Detected dropped column %s in table %s, but not propagating", column,
+				schemaDelta.SrcTableName)
 		}
 	}
 	for _, column := range prevRel.Columns {
 		// present in previous relation message, but not in current one, so dropped.
 		if currRelMap[column.Name] == nil {
-			schemaDelta.DroppedColumns = append(schemaDelta.DroppedColumns, column.Name)
+			log.Warnf("Detected dropped column %s in table %s, but not propagating", column,
+				schemaDelta.SrcTableName)
 		}
 	}

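The commitLock flag is the heart of this PR: consumeStream previously could return mid-transaction (on idle timeout, on MaxBatchSize, or via earlyReturn on a schema change), so a destination could flush half a transaction. Now BeginMessage sets the lock and CommitMessage clears it, and a batch is only cut at a transaction boundary. A self-contained toy model of that gating; the message and field names are simplified stand-ins, not the pglogrepl API:

package main

import "fmt"

type message struct {
	kind string // "begin", "row", "commit"
	data string
}

type consumer struct {
	commitLock bool
	records    []string
}

// process models the gating: even when the batch is "full", it is only
// returned once the transaction that is currently open has committed.
func (c *consumer) process(msg message, maxBatch int) (batch []string, done bool) {
	switch msg.kind {
	case "begin":
		c.commitLock = true // mid-transaction: suppress early returns
	case "row":
		c.records = append(c.records, msg.data)
	case "commit":
		c.commitLock = false // transaction closed: safe to cut a batch here
	}
	if !c.commitLock && len(c.records) >= maxBatch {
		return c.records, true
	}
	return nil, false
}

func main() {
	c := &consumer{}
	msgs := []message{
		{"begin", ""}, {"row", "a"}, {"row", "b"}, {"commit", ""},
	}
	for _, m := range msgs {
		// With maxBatch = 2, "b" fills the batch, but nothing is returned
		// until the commit arrives.
		if batch, ok := c.process(m, 2); ok {
			fmt.Println("batch:", batch)
		}
	}
}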
58 changes: 24 additions & 34 deletions flow/connectors/postgres/postgres.go
@@ -667,16 +667,13 @@ func (c *PostgresConnector) InitializeTableSchema(req map[string]*protos.TableSc

 // ReplayTableSchemaDelta changes a destination table to match the schema at source
 // This could involve adding or dropping multiple columns.
-func (c *PostgresConnector) ReplayTableSchemaDelta(flowJobName string, schemaDelta *protos.TableSchemaDelta) error {
-	if (schemaDelta == nil) || (len(schemaDelta.AddedColumns) == 0 && len(schemaDelta.DroppedColumns) == 0) {
-		return nil
-	}
-
+func (c *PostgresConnector) ReplayTableSchemaDeltas(flowJobName string,
+	schemaDeltas []*protos.TableSchemaDelta) error {
 	// Postgres is cool and supports transactional DDL. So we use a transaction.
 	tableSchemaModifyTx, err := c.pool.Begin(c.ctx)
 	if err != nil {
-		return fmt.Errorf("error starting transaction for schema modification for table %s: %w",
-			schemaDelta.DstTableName, err)
+		return fmt.Errorf("error starting transaction for schema modification: %w",
+			err)
 	}
 	defer func() {
 		deferErr := tableSchemaModifyTx.Rollback(c.ctx)
@@ -687,39 +684,32 @@
 		}
 	}()

-	for _, droppedColumn := range schemaDelta.DroppedColumns {
-		_, err = tableSchemaModifyTx.Exec(c.ctx, fmt.Sprintf("ALTER TABLE %s DROP COLUMN \"%s\"",
-			schemaDelta.DstTableName, droppedColumn))
-		if err != nil {
-			return fmt.Errorf("failed to drop column %s for table %s: %w", droppedColumn,
-				schemaDelta.DstTableName, err)
-		}
-		log.WithFields(log.Fields{
-			"flowName":     flowJobName,
-			"srcTableName": schemaDelta.SrcTableName,
-			"dstTableName": schemaDelta.DstTableName,
-		}).Infof("[schema delta replay] dropped column %s", droppedColumn)
-	}
-	for _, addedColumn := range schemaDelta.AddedColumns {
-		_, err = tableSchemaModifyTx.Exec(c.ctx, fmt.Sprintf("ALTER TABLE %s ADD COLUMN \"%s\" %s",
-			schemaDelta.DstTableName, addedColumn.ColumnName,
-			qValueKindToPostgresType(addedColumn.ColumnType)))
-		if err != nil {
-			return fmt.Errorf("failed to add column %s for table %s: %w", addedColumn.ColumnName,
-				schemaDelta.DstTableName, err)
-		}
-		log.WithFields(log.Fields{
-			"flowName":     flowJobName,
-			"srcTableName": schemaDelta.SrcTableName,
-			"dstTableName": schemaDelta.DstTableName,
-		}).Infof("[schema delta replay] added column %s with data type %s",
-			addedColumn.ColumnName, addedColumn.ColumnType)
-	}
+	for _, schemaDelta := range schemaDeltas {
+		if schemaDelta == nil || len(schemaDelta.AddedColumns) == 0 {
+			return nil
+		}
+
+		for _, addedColumn := range schemaDelta.AddedColumns {
+			_, err = tableSchemaModifyTx.Exec(c.ctx, fmt.Sprintf("ALTER TABLE %s ADD COLUMN \"%s\" %s",
+				schemaDelta.DstTableName, addedColumn.ColumnName,
+				qValueKindToPostgresType(addedColumn.ColumnType)))
+			if err != nil {
+				return fmt.Errorf("failed to add column %s for table %s: %w", addedColumn.ColumnName,
+					schemaDelta.DstTableName, err)
+			}
+			log.WithFields(log.Fields{
+				"flowName":     flowJobName,
+				"srcTableName": schemaDelta.SrcTableName,
+				"dstTableName": schemaDelta.DstTableName,
+			}).Infof("[schema delta replay] added column %s with data type %s",
+				addedColumn.ColumnName, addedColumn.ColumnType)
+		}
+	}

 	err = tableSchemaModifyTx.Commit(c.ctx)
 	if err != nil {
-		return fmt.Errorf("failed to commit transaction for table schema modification for table %s: %w",
-			schemaDelta.DstTableName, err)
+		return fmt.Errorf("failed to commit transaction for table schema modification: %w",
+			err)
 	}

 	return nil
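Because Postgres supports transactional DDL, the connector wraps the whole replay in one transaction: if any ADD COLUMN fails, the deferred rollback leaves the destination untouched. A minimal sketch of that pattern with pgx v5; the connection string, table, and column values are illustrative assumptions, not the connector's code:

package main

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

type addedColumn struct{ Name, PgType string }

// replayAddedColumns applies every ADD COLUMN in a single transaction, so a
// partial failure never leaves the table half-migrated.
func replayAddedColumns(ctx context.Context, conn *pgx.Conn,
	dstTable string, cols []addedColumn) error {
	tx, err := conn.Begin(ctx)
	if err != nil {
		return fmt.Errorf("begin: %w", err)
	}
	defer tx.Rollback(ctx) // no-op once Commit succeeds

	for _, col := range cols {
		_, err := tx.Exec(ctx, fmt.Sprintf(`ALTER TABLE %s ADD COLUMN %q %s`,
			dstTable, col.Name, col.PgType))
		if err != nil {
			return fmt.Errorf("add column %s: %w", col.Name, err)
		}
	}
	return tx.Commit(ctx)
}

func main() {
	ctx := context.Background()
	conn, err := pgx.Connect(ctx, "postgres://localhost:5432/postgres") // assumed DSN
	if err != nil {
		panic(err)
	}
	defer conn.Close(ctx)
	_ = replayAddedColumns(ctx, conn, "public.events",
		[]addedColumn{{"added_at", "TIMESTAMP"}, {"note", "TEXT"}})
}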