Return only at commit message in Postgres CDC #503

Merged: 6 commits, merged Oct 12, 2023
Changes from 4 commits
8 changes: 4 additions & 4 deletions flow/activities/flowable.go
@@ -249,7 +249,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
metrics.LogCDCRawThroughputMetrics(ctx, input.FlowConnectionConfigs.FlowJobName, 0)
return &model.SyncResponse{
RelationMessageMapping: recordsWithTableSchemaDelta.RelationMessageMapping,
TableSchemaDelta: recordsWithTableSchemaDelta.TableSchemaDelta,
TableSchemaDeltas: recordsWithTableSchemaDelta.TableSchemaDeltas,
}, nil
}

@@ -297,7 +297,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
if err != nil {
return nil, err
}
res.TableSchemaDelta = recordsWithTableSchemaDelta.TableSchemaDelta
res.TableSchemaDeltas = recordsWithTableSchemaDelta.TableSchemaDeltas
res.RelationMessageMapping = recordsWithTableSchemaDelta.RelationMessageMapping

pushedRecordsWithCount := fmt.Sprintf("pushed %d records", numRecords)
@@ -372,7 +372,7 @@ func (a *FlowableActivity) StartNormalize(
return res, nil
}

func (a *FlowableActivity) ReplayTableSchemaDelta(
func (a *FlowableActivity) ReplayTableSchemaDeltas(
ctx context.Context,
input *protos.ReplayTableSchemaDeltaInput,
) error {
@@ -384,7 +384,7 @@ func (a *FlowableActivity) ReplayTableSchemaDelta(
}
defer connectors.CloseConnector(dest)

return dest.ReplayTableSchemaDelta(input.FlowConnectionConfigs.FlowJobName, input.TableSchemaDelta)
return dest.ReplayTableSchemaDeltas(input.FlowConnectionConfigs.FlowJobName, input.TableSchemaDeltas)
}

// SetupQRepMetadataTables sets up the metadata tables for QReplication.
63 changes: 28 additions & 35 deletions flow/connectors/bigquery/bigquery.go
@@ -205,39 +205,29 @@ func (c *BigQueryConnector) InitializeTableSchema(req map[string]*protos.TableSc
return nil
}

// ReplayTableSchemaDelta changes a destination table to match the schema at source
// ReplayTableSchemaDeltas changes a destination table to match the schema at source
// This could involve adding or dropping multiple columns.
func (c *BigQueryConnector) ReplayTableSchemaDelta(flowJobName string,
schemaDelta *protos.TableSchemaDelta) error {
if (schemaDelta == nil) || (len(schemaDelta.AddedColumns) == 0 && len(schemaDelta.DroppedColumns) == 0) {
return nil
}

for _, droppedColumn := range schemaDelta.DroppedColumns {
_, err := c.client.Query(fmt.Sprintf("ALTER TABLE %s.%s DROP COLUMN `%s`", c.datasetID,
schemaDelta.DstTableName, droppedColumn)).Read(c.ctx)
if err != nil {
return fmt.Errorf("failed to drop column %s for table %s: %w", droppedColumn,
schemaDelta.SrcTableName, err)
func (c *BigQueryConnector) ReplayTableSchemaDeltas(flowJobName string,
schemaDeltas []*protos.TableSchemaDelta) error {
for _, schemaDelta := range schemaDeltas {
if schemaDelta == nil || len(schemaDelta.AddedColumns) == 0 {
return nil
}
log.WithFields(log.Fields{
"flowName": flowJobName,
"tableName": schemaDelta.SrcTableName,
}).Infof("[schema delta replay] dropped column %s", droppedColumn)
}
for _, addedColumn := range schemaDelta.AddedColumns {
_, err := c.client.Query(fmt.Sprintf("ALTER TABLE %s.%s ADD COLUMN `%s` %s", c.datasetID,
schemaDelta.DstTableName, addedColumn.ColumnName,
qValueKindToBigQueryType(addedColumn.ColumnType))).Read(c.ctx)
if err != nil {
return fmt.Errorf("failed to add column %s for table %s: %w", addedColumn.ColumnName,
schemaDelta.SrcTableName, err)

for _, addedColumn := range schemaDelta.AddedColumns {
_, err := c.client.Query(fmt.Sprintf("ALTER TABLE %s.%s ADD COLUMN `%s` %s", c.datasetID,
schemaDelta.DstTableName, addedColumn.ColumnName,
qValueKindToBigQueryType(addedColumn.ColumnType))).Read(c.ctx)
if err != nil {
return fmt.Errorf("failed to add column %s for table %s: %w", addedColumn.ColumnName,
schemaDelta.SrcTableName, err)
}
log.WithFields(log.Fields{
"flowName": flowJobName,
"tableName": schemaDelta.SrcTableName,
}).Infof("[schema delta replay] added column %s with data type %s", addedColumn.ColumnName,
addedColumn.ColumnType)
}
log.WithFields(log.Fields{
"flowName": flowJobName,
"tableName": schemaDelta.SrcTableName,
}).Infof("[schema delta replay] added column %s with data type %s", addedColumn.ColumnName,
addedColumn.ColumnType)
}

return nil
@@ -299,7 +289,8 @@ func (c *BigQueryConnector) GetLastOffset(jobName string) (*protos.LastSyncState
}

func (c *BigQueryConnector) GetLastSyncBatchID(jobName string) (int64, error) {
query := fmt.Sprintf("SELECT sync_batch_id FROM %s.%s WHERE mirror_job_name = '%s'", c.datasetID, MirrorJobsTable, jobName)
query := fmt.Sprintf("SELECT sync_batch_id FROM %s.%s WHERE mirror_job_name = '%s'",
c.datasetID, MirrorJobsTable, jobName)
q := c.client.Query(query)
it, err := q.Read(c.ctx)
if err != nil {
@@ -323,7 +314,8 @@ func (c *BigQueryConnector) GetLastSyncBatchID(jobName string) (int64, error) {
}

func (c *BigQueryConnector) GetLastNormalizeBatchID(jobName string) (int64, error) {
query := fmt.Sprintf("SELECT normalize_batch_id FROM %s.%s WHERE mirror_job_name = '%s'", c.datasetID, MirrorJobsTable, jobName)
query := fmt.Sprintf("SELECT normalize_batch_id FROM %s.%s WHERE mirror_job_name = '%s'",
c.datasetID, MirrorJobsTable, jobName)
q := c.client.Query(query)
it, err := q.Read(c.ctx)
if err != nil {
@@ -981,7 +973,7 @@ func (c *BigQueryConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr
{Name: "_peerdb_unchanged_toast_columns", Type: bigquery.StringFieldType},
}

staging_schema := bigquery.Schema{
stagingSchema := bigquery.Schema{
{Name: "_peerdb_uid", Type: bigquery.StringFieldType},
{Name: "_peerdb_timestamp", Type: bigquery.TimestampFieldType},
{Name: "_peerdb_timestamp_nanos", Type: bigquery.IntegerFieldType},
@@ -1022,7 +1014,7 @@ func (c *BigQueryConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr
stagingTableName := c.getStagingTableName(req.FlowJobName)
stagingTable := c.client.Dataset(c.datasetID).Table(stagingTableName)
err = stagingTable.Create(c.ctx, &bigquery.TableMetadata{
Schema: staging_schema,
Schema: stagingSchema,
})
if err != nil {
return nil, fmt.Errorf("failed to create table %s.%s: %w", c.datasetID, stagingTableName, err)
@@ -1034,7 +1026,8 @@ func (c *BigQueryConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr
}

// getUpdateMetadataStmt updates the metadata tables for a given job.
func (c *BigQueryConnector) getUpdateMetadataStmt(jobName string, lastSyncedCheckpointID int64, batchID int64) (string, error) {
func (c *BigQueryConnector) getUpdateMetadataStmt(jobName string, lastSyncedCheckpointID int64,
batchID int64) (string, error) {
hasJob, err := c.metadataHasJob(jobName)
if err != nil {
return "", fmt.Errorf("failed to check if job exists: %w", err)
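As a side note, a minimal sketch (hypothetical dataset, table, and column names) of the DDL string the per-column replay loop above assembles; only added columns are replayed now, since dropped columns are no longer propagated:

package main

import "fmt"

func main() {
	// Hypothetical values; in the connector they come from the TableSchemaDelta message.
	datasetID, dstTable := "my_dataset", "users"
	columnName, bqType := "email", "STRING"

	// Same fmt.Sprintf shape as the replay loop in bigquery.go.
	ddl := fmt.Sprintf("ALTER TABLE %s.%s ADD COLUMN `%s` %s",
		datasetID, dstTable, columnName, bqType)
	fmt.Println(ddl) // ALTER TABLE my_dataset.users ADD COLUMN `email` STRING
}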
2 changes: 1 addition & 1 deletion flow/connectors/core.go
@@ -90,7 +90,7 @@ type CDCNormalizeConnector interface {

// ReplayTableSchemaDelta changes a destination table to match the schema at source
// This could involve adding or dropping multiple columns.
ReplayTableSchemaDelta(flowJobName string, schemaDelta *protos.TableSchemaDelta) error
ReplayTableSchemaDeltas(flowJobName string, schemaDeltas []*protos.TableSchemaDelta) error
}

type QRepPullConnector interface {
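For reference, a self-contained sketch of the reshaped contract (simplified local types standing in for the protos package): one call now receives every delta collected during a sync, and the connector iterates over them rather than being invoked once per table:

package main

import "fmt"

// Simplified stand-in for protos.TableSchemaDelta.
type TableSchemaDelta struct {
	DstTableName string
	AddedColumns []string
}

// Mirrors the updated method on CDCNormalizeConnector: plural name, slice argument.
type schemaReplayer interface {
	ReplayTableSchemaDeltas(flowJobName string, schemaDeltas []*TableSchemaDelta) error
}

// Toy connector that only logs what it would apply.
type loggingConnector struct{}

func (loggingConnector) ReplayTableSchemaDeltas(flowJobName string, schemaDeltas []*TableSchemaDelta) error {
	for _, d := range schemaDeltas {
		if d == nil || len(d.AddedColumns) == 0 {
			continue // nothing to replay for this table
		}
		fmt.Printf("flow %s: would add %v to %s\n", flowJobName, d.AddedColumns, d.DstTableName)
	}
	return nil
}

func main() {
	var c schemaReplayer = loggingConnector{}
	_ = c.ReplayTableSchemaDeltas("demo_flow", []*TableSchemaDelta{
		{DstTableName: "users", AddedColumns: []string{"email"}},
	})
}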
68 changes: 36 additions & 32 deletions flow/connectors/postgres/cdc.go
@@ -29,6 +29,7 @@ type PostgresCDCSource struct {
relationMessageMapping model.RelationMessageMapping
typeMap *pgtype.Map
startLSN pglogrepl.LSN
commitLock bool
}

type PostgresCDCConfig struct {
@@ -52,6 +53,7 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) (*PostgresCDCSource, err
publication: cdcConfig.Publication,
relationMessageMapping: cdcConfig.RelationMessageMapping,
typeMap: pgtype.NewMap(),
commitLock: false,
}, nil
}

@@ -125,13 +127,12 @@ func (p *PostgresCDCSource) consumeStream(
}
result := &model.RecordsWithTableSchemaDelta{
RecordBatch: records,
TableSchemaDelta: nil,
TableSchemaDeltas: nil,
RelationMessageMapping: p.relationMessageMapping,
}

standbyMessageTimeout := req.IdleTimeout
nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)
earlyReturn := false

defer func() {
err := conn.Close(p.ctx)
@@ -142,20 +143,21 @@
}
}()

// clientXLogPos is the last checkpoint id + 1, we need to ack that we have processed
// until clientXLogPos - 1 each time we send a standby status update.
// consumedXLogPos is the lsn that has been committed on the destination.
consumedXLogPos := pglogrepl.LSN(0)
if clientXLogPos > 0 {
consumedXLogPos = clientXLogPos - 1
}

for {
if time.Now().After(nextStandbyMessageDeadline) ||
earlyReturn ||
(len(records.Records) == int(req.MaxBatchSize)) {
// update the WALWritePosition to be clientXLogPos - 1
// as the clientXLogPos is the last checkpoint id + 1
// and we want to send the last checkpoint id as the last
// checkpoint id that we have processed.
lastProcessedXLogPos := clientXLogPos
if clientXLogPos > 0 {
lastProcessedXLogPos = clientXLogPos - 1
}
// Update XLogPos to the last processed position, we can only confirm
// that this is the last row committed on the destination.
err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn,
pglogrepl.StandbyStatusUpdate{WALWritePosition: lastProcessedXLogPos})
pglogrepl.StandbyStatusUpdate{WALWritePosition: consumedXLogPos})
if err != nil {
return nil, fmt.Errorf("SendStandbyStatusUpdate failed: %w", err)
}
@@ -165,20 +167,21 @@ func (p *PostgresCDCSource) consumeStream(
log.Infof("Sent Standby status message. %s", numRowsProcessedMessage)
nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)

if earlyReturn || (len(records.Records) == int(req.MaxBatchSize)) {
if !p.commitLock && (len(records.Records) == int(req.MaxBatchSize)) {
return result, nil
}
}

ctx, cancel := context.WithDeadline(p.ctx, nextStandbyMessageDeadline)
rawMsg, err := conn.ReceiveMessage(ctx)
cancel()
if err != nil {
if err != nil && !p.commitLock {
if pgconn.Timeout(err) {
log.Infof("Idle timeout reached, returning currently accumulated records")
return result, nil
} else {
return nil, fmt.Errorf("ReceiveMessage failed: %w", err)
}
return nil, fmt.Errorf("ReceiveMessage failed: %w", err)
}

if errMsg, ok := rawMsg.(*pgproto3.ErrorResponse); ok {
@@ -285,12 +288,11 @@ func (p *PostgresCDCSource) consumeStream(
case *model.DeleteRecord:
records.Records = append(records.Records, rec)
case *model.RelationRecord:
tableSchemaDelta := rec.(*model.RelationRecord).TableSchemaDelta
if len(tableSchemaDelta.AddedColumns) > 0 || len(tableSchemaDelta.DroppedColumns) > 0 {
result.TableSchemaDelta = tableSchemaDelta
log.Infof("Detected schema change for table %s, returning currently accumulated records",
result.TableSchemaDelta.SrcTableName)
earlyReturn = true
tableSchemaDelta := r.TableSchemaDelta
if len(tableSchemaDelta.AddedColumns) > 0 {
log.Infof("Detected schema change for table %s, addedColumns: %v",
tableSchemaDelta.SrcTableName, tableSchemaDelta.AddedColumns)
result.TableSchemaDeltas = append(result.TableSchemaDeltas, tableSchemaDelta)
}
}
}
@@ -311,7 +313,9 @@ func (p *PostgresCDCSource) processMessage(batch *model.RecordBatch, xld pglogre

switch msg := logicalMsg.(type) {
case *pglogrepl.BeginMessage:
log.Debugf("Ignoring BeginMessage")
log.Debugf("BeginMessage => FinalLSN: %v, XID: %v", msg.FinalLSN, msg.Xid)
log.Debugf("Locking PullRecords at BeginMessage, awaiting CommitMessage")
p.commitLock = true
case *pglogrepl.InsertMessage:
return p.processInsertMessage(xld.WALStart, msg)
case *pglogrepl.UpdateMessage:
@@ -320,7 +324,10 @@ func (p *PostgresCDCSource) processMessage(batch *model.RecordBatch, xld pglogre
return p.processDeleteMessage(xld.WALStart, msg)
case *pglogrepl.CommitMessage:
// for a commit message, update the last checkpoint id for the record batch.
log.Debugf("CommitMessage => CommitLSN: %v, TransactionEndLSN: %v",
msg.CommitLSN, msg.TransactionEndLSN)
batch.LastCheckPointID = int64(xld.WALStart)
p.commitLock = false
case *pglogrepl.RelationMessage:
// TODO (kaushik): consider persistent state for a mirror job
// to be stored somewhere in temporal state. We might need to persist
@@ -563,10 +570,9 @@ func (p *PostgresCDCSource) processRelationMessage(
schemaDelta := &protos.TableSchemaDelta{
// set it to the source table for now, so we can update the schema on the source side
// then at the Workflow level we set it t
SrcTableName: p.SrcTableIDNameMapping[currRel.RelationId],
DstTableName: p.TableNameMapping[p.SrcTableIDNameMapping[currRel.RelationId]],
AddedColumns: make([]*protos.DeltaAddedColumn, 0),
DroppedColumns: make([]string, 0),
SrcTableName: p.SrcTableIDNameMapping[currRel.RelationId],
DstTableName: p.TableNameMapping[p.SrcTableIDNameMapping[currRel.RelationId]],
AddedColumns: make([]*protos.DeltaAddedColumn, 0),
}
for _, column := range currRel.Columns {
// not present in previous relation message, but in current one, so added.
@@ -578,17 +584,15 @@ func (p *PostgresCDCSource) processRelationMessage(
// present in previous and current relation messages, but data types have changed.
// so we add it to AddedColumns and DroppedColumns, knowing that we process DroppedColumns first.
} else if prevRelMap[column.Name].RelId != currRelMap[column.Name].RelId {
schemaDelta.DroppedColumns = append(schemaDelta.DroppedColumns, column.Name)
schemaDelta.AddedColumns = append(schemaDelta.AddedColumns, &protos.DeltaAddedColumn{
ColumnName: column.Name,
ColumnType: string(postgresOIDToQValueKind(column.DataType)),
})
log.Warnf("Detected dropped column %s in table %s, but not propagating", column,
schemaDelta.SrcTableName)
}
}
for _, column := range prevRel.Columns {
// present in previous relation message, but not in current one, so dropped.
if currRelMap[column.Name] == nil {
schemaDelta.DroppedColumns = append(schemaDelta.DroppedColumns, column.Name)
log.Warnf("Detected dropped column %s in table %s, but not propagating", column,
schemaDelta.SrcTableName)
}
}

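The heart of this change is the commitLock gate: a BeginMessage raises it, the matching CommitMessage lowers it, and the batch may only be returned while it is down, so PullRecords never cuts a batch in the middle of a transaction. A stripped-down sketch of that control flow (hypothetical message type, none of the real replication plumbing):

package main

import "fmt"

// Hypothetical stand-in for the pglogrepl messages seen on the wire.
type walMessage struct {
	kind string // "begin", "insert", "commit"
	data string
}

type cdcSource struct {
	commitLock bool
	records    []string
}

// process mirrors processMessage: Begin locks, Commit unlocks.
func (s *cdcSource) process(m walMessage) {
	switch m.kind {
	case "begin":
		s.commitLock = true // transaction in flight: don't return yet
	case "insert":
		s.records = append(s.records, m.data)
	case "commit":
		s.commitLock = false // transaction fully consumed: safe to return
	}
}

// canReturn mirrors the consumeStream check: batch is full *and* no open transaction.
func (s *cdcSource) canReturn(maxBatchSize int) bool {
	return !s.commitLock && len(s.records) >= maxBatchSize
}

func main() {
	s := &cdcSource{}
	stream := []walMessage{
		{"begin", ""}, {"insert", "row1"}, {"insert", "row2"}, {"commit", ""},
	}
	for _, m := range stream {
		s.process(m)
		fmt.Printf("after %-6s commitLock=%v canReturn=%v\n", m.kind, s.commitLock, s.canReturn(2))
	}
}

In the real consumeStream the same flag also keeps the idle-timeout path from returning mid-transaction, and the standby status update always reports consumedXLogPos, the position already confirmed committed on the destination.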
58 changes: 24 additions & 34 deletions flow/connectors/postgres/postgres.go
@@ -667,16 +667,13 @@ func (c *PostgresConnector) InitializeTableSchema(req map[string]*protos.TableSc

// ReplayTableSchemaDelta changes a destination table to match the schema at source
// This could involve adding or dropping multiple columns.
func (c *PostgresConnector) ReplayTableSchemaDelta(flowJobName string, schemaDelta *protos.TableSchemaDelta) error {
if (schemaDelta == nil) || (len(schemaDelta.AddedColumns) == 0 && len(schemaDelta.DroppedColumns) == 0) {
return nil
}

func (c *PostgresConnector) ReplayTableSchemaDeltas(flowJobName string,
schemaDeltas []*protos.TableSchemaDelta) error {
// Postgres is cool and supports transactional DDL. So we use a transaction.
tableSchemaModifyTx, err := c.pool.Begin(c.ctx)
if err != nil {
return fmt.Errorf("error starting transaction for schema modification for table %s: %w",
schemaDelta.DstTableName, err)
return fmt.Errorf("error starting transaction for schema modification: %w",
err)
}
defer func() {
deferErr := tableSchemaModifyTx.Rollback(c.ctx)
@@ -687,39 +684,32 @@ func (c *PostgresConnector) ReplayTableSchemaDelta(flowJobName string, schemaDel
}
}()

for _, droppedColumn := range schemaDelta.DroppedColumns {
_, err = tableSchemaModifyTx.Exec(c.ctx, fmt.Sprintf("ALTER TABLE %s DROP COLUMN \"%s\"",
schemaDelta.DstTableName, droppedColumn))
if err != nil {
return fmt.Errorf("failed to drop column %s for table %s: %w", droppedColumn,
schemaDelta.DstTableName, err)
for _, schemaDelta := range schemaDeltas {
if schemaDelta == nil || len(schemaDelta.AddedColumns) == 0 {
return nil
}
log.WithFields(log.Fields{
"flowName": flowJobName,
"srcTableName": schemaDelta.SrcTableName,
"dstTableName": schemaDelta.DstTableName,
}).Infof("[schema delta replay] dropped column %s", droppedColumn)
}
for _, addedColumn := range schemaDelta.AddedColumns {
_, err = tableSchemaModifyTx.Exec(c.ctx, fmt.Sprintf("ALTER TABLE %s ADD COLUMN \"%s\" %s",
schemaDelta.DstTableName, addedColumn.ColumnName,
qValueKindToPostgresType(addedColumn.ColumnType)))
if err != nil {
return fmt.Errorf("failed to add column %s for table %s: %w", addedColumn.ColumnName,
schemaDelta.DstTableName, err)

for _, addedColumn := range schemaDelta.AddedColumns {
_, err = tableSchemaModifyTx.Exec(c.ctx, fmt.Sprintf("ALTER TABLE %s ADD COLUMN \"%s\" %s",
schemaDelta.DstTableName, addedColumn.ColumnName,
qValueKindToPostgresType(addedColumn.ColumnType)))
if err != nil {
return fmt.Errorf("failed to add column %s for table %s: %w", addedColumn.ColumnName,
schemaDelta.DstTableName, err)
}
log.WithFields(log.Fields{
"flowName": flowJobName,
"srcTableName": schemaDelta.SrcTableName,
"dstTableName": schemaDelta.DstTableName,
}).Infof("[schema delta replay] added column %s with data type %s",
addedColumn.ColumnName, addedColumn.ColumnType)
}
log.WithFields(log.Fields{
"flowName": flowJobName,
"srcTableName": schemaDelta.SrcTableName,
"dstTableName": schemaDelta.DstTableName,
}).Infof("[schema delta replay] added column %s with data type %s",
addedColumn.ColumnName, addedColumn.ColumnType)
}

err = tableSchemaModifyTx.Commit(c.ctx)
if err != nil {
return fmt.Errorf("failed to commit transaction for table schema modification for table %s: %w",
schemaDelta.DstTableName, err)
return fmt.Errorf("failed to commit transaction for table schema modification: %w",
err)
}

return nil
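Since Postgres supports transactional DDL, the connector wraps every ALTER from every delta in a single transaction, so a failure partway through leaves the destination schema untouched. A library-style sketch of that pattern (assuming pgx/v5 and illustrative table and column names; the project may pin a different pgx version):

package replay

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5/pgxpool"
)

// addColumnsTransactionally applies every ADD COLUMN inside one transaction,
// mirroring ReplayTableSchemaDeltas: either all columns land or none do.
func addColumnsTransactionally(ctx context.Context, pool *pgxpool.Pool,
	dstTable string, columns map[string]string) error {
	tx, err := pool.Begin(ctx)
	if err != nil {
		return fmt.Errorf("begin failed: %w", err)
	}
	// Rollback after a successful Commit returns pgx.ErrTxClosed, which is safe to ignore.
	defer tx.Rollback(ctx)

	for name, pgType := range columns {
		ddl := fmt.Sprintf(`ALTER TABLE %s ADD COLUMN "%s" %s`, dstTable, name, pgType)
		if _, err := tx.Exec(ctx, ddl); err != nil {
			return fmt.Errorf("adding column %s failed: %w", name, err)
		}
	}
	return tx.Commit(ctx)
}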