Skip to content

Commit

Permalink
Basic ADD COLUMN and DROP COLUMN replay support for PG, BQ and SF (#368)
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Sep 6, 2023
1 parent b49c51e commit 40a454a
Show file tree
Hide file tree
Showing 30 changed files with 3,027 additions and 870 deletions.
42 changes: 31 additions & 11 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ func (a *FlowableActivity) CreateNormalizedTable(
}

// StartFlow implements StartFlow.
func (a *FlowableActivity) StartFlow(ctx context.Context, input *protos.StartFlowInput) (*model.SyncResponse, error) {
func (a *FlowableActivity) StartFlow(ctx context.Context,
input *protos.StartFlowInput) (*model.SyncResponse, error) {
conn := input.FlowConnectionConfigs

ctx = context.WithValue(ctx, shared.EnableMetricsKey, a.EnableMetrics)
Expand Down Expand Up @@ -189,7 +190,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, input *protos.StartFlo
}).Info("pulling records...")

startTime := time.Now()
records, err := src.PullRecords(&model.PullRecordsRequest{
recordsWithTableSchemaDelta, err := src.PullRecords(&model.PullRecordsRequest{
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
SrcTableIDNameMapping: input.FlowConnectionConfigs.SrcTableIdNameMapping,
TableNameMapping: input.FlowConnectionConfigs.TableNameMapping,
Expand All @@ -199,11 +200,13 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, input *protos.StartFlo
TableNameSchemaMapping: input.FlowConnectionConfigs.TableNameSchemaMapping,
OverridePublicationName: input.FlowConnectionConfigs.PublicationName,
OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName,
RelationMessageMapping: input.RelationMessageMapping,
})
if err != nil {
return nil, fmt.Errorf("failed to pull records: %w", err)
}
if a.CatalogMirrorMonitor.IsActive() && len(records.Records) > 0 {
recordBatch := recordsWithTableSchemaDelta.RecordBatch
if a.CatalogMirrorMonitor.IsActive() && len(recordBatch.Records) > 0 {
syncBatchID, err := dest.GetLastSyncBatchID(input.FlowConnectionConfigs.FlowJobName)
if err != nil && conn.Destination.Type != protos.DBType_EVENTHUB {
return nil, err
Expand All @@ -212,9 +215,9 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, input *protos.StartFlo
err = a.CatalogMirrorMonitor.AddCDCBatchForFlow(ctx, input.FlowConnectionConfigs.FlowJobName,
monitoring.CDCBatchInfo{
BatchID: syncBatchID + 1,
RowsInBatch: uint32(len(records.Records)),
BatchStartLSN: pglogrepl.LSN(records.FirstCheckPointID),
BatchEndlSN: pglogrepl.LSN(records.LastCheckPointID),
RowsInBatch: uint32(len(recordBatch.Records)),
BatchStartLSN: pglogrepl.LSN(recordBatch.FirstCheckPointID),
BatchEndlSN: pglogrepl.LSN(recordBatch.LastCheckPointID),
StartTime: startTime,
})
if err != nil {
Expand All @@ -223,7 +226,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, input *protos.StartFlo
}

// log the number of records
numRecords := len(records.Records)
numRecords := len(recordBatch.Records)
log.WithFields(log.Fields{
"flowName": input.FlowConnectionConfigs.FlowJobName,
}).Printf("pulled %d records", numRecords)
Expand All @@ -233,11 +236,14 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, input *protos.StartFlo
log.WithFields(log.Fields{
"flowName": input.FlowConnectionConfigs.FlowJobName,
}).Info("no records to push")
return nil, nil
return &model.SyncResponse{
RelationMessageMapping: recordsWithTableSchemaDelta.RelationMessageMapping,
TableSchemaDelta: recordsWithTableSchemaDelta.TableSchemaDelta,
}, nil
}

res, err := dest.SyncRecords(&model.SyncRecordsRequest{
Records: records,
Records: recordBatch,
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
SyncMode: input.FlowConnectionConfigs.CdcSyncMode,
StagingPath: input.FlowConnectionConfigs.CdcStagingPath,
Expand All @@ -254,7 +260,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, input *protos.StartFlo

err = a.CatalogMirrorMonitor.
UpdateLatestLSNAtTargetForCDCFlow(ctx, input.FlowConnectionConfigs.FlowJobName,
pglogrepl.LSN(records.LastCheckPointID))
pglogrepl.LSN(recordBatch.LastCheckPointID))
if err != nil {
return nil, err
}
Expand All @@ -265,7 +271,8 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, input *protos.StartFlo
return nil, err
}
}

res.TableSchemaDelta = recordsWithTableSchemaDelta.TableSchemaDelta
res.RelationMessageMapping = recordsWithTableSchemaDelta.RelationMessageMapping
activity.RecordHeartbeat(ctx, "pushed records")

return res, nil
Expand Down Expand Up @@ -325,6 +332,19 @@ func (a *FlowableActivity) StartNormalize(
return res, nil
}

// ReplayTableSchemaDelta applies the given table schema delta to the flow's
// destination peer by delegating to the destination connector.
//
// It returns an error if the destination connector cannot be obtained;
// otherwise it returns whatever the connector's ReplayTableSchemaDelta returns.
func (a *FlowableActivity) ReplayTableSchemaDelta(
	ctx context.Context,
	input *protos.ReplayTableSchemaDeltaInput,
) error {
	dest, err := connectors.GetConnector(ctx, input.FlowConnectionConfigs.Destination)
	if err != nil {
		return fmt.Errorf("failed to get destination connector: %w", err)
	}
	// Register the close only after the lookup is known to have succeeded, so
	// a failed GetConnector never hands an unusable (nil or typed-nil)
	// connector to CloseConnector.
	defer connectors.CloseConnector(dest)

	return dest.ReplayTableSchemaDelta(input.FlowConnectionConfigs.FlowJobName, input.TableSchemaDelta)
}

// SetupQRepMetadataTables sets up the metadata tables for QReplication.
func (a *FlowableActivity) SetupQRepMetadataTables(ctx context.Context, config *protos.QRepConfig) error {
conn, err := connectors.GetConnector(ctx, config.DestinationPeer)
Expand Down
40 changes: 39 additions & 1 deletion flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,43 @@ func (c *BigQueryConnector) InitializeTableSchema(req map[string]*protos.TableSc
return nil
}

// ReplayTableSchemaDelta changes a destination table to match the schema at source.
// This could involve adding or dropping multiple columns.
//
// Dropped columns are replayed first, followed by added columns. A nil delta,
// or a delta with neither added nor dropped columns, is a no-op. The first
// statement that fails aborts the replay and its error is returned wrapped.
//
// The statements use IF EXISTS / IF NOT EXISTS so that replaying the same delta
// again (for example after a partial failure) does not fail on columns that
// were already dropped or added. Column names are backtick-quoted so that
// reserved words are accepted as identifiers.
// NOTE(review): ADD COLUMN IF NOT EXISTS does not verify that an already
// existing column has the requested type.
func (c *BigQueryConnector) ReplayTableSchemaDelta(flowJobName string,
	schemaDelta *protos.TableSchemaDelta) error {
	if schemaDelta == nil {
		return nil
	}
	if len(schemaDelta.AddedColumns) == 0 && len(schemaDelta.DroppedColumns) == 0 {
		return nil
	}

	for _, droppedColumn := range schemaDelta.DroppedColumns {
		dropStmt := fmt.Sprintf("ALTER TABLE %s.%s DROP COLUMN IF EXISTS `%s`", c.datasetID,
			schemaDelta.DstTableName, droppedColumn)
		if _, err := c.client.Query(dropStmt).Read(c.ctx); err != nil {
			return fmt.Errorf("failed to drop column %s for table %s: %w", droppedColumn,
				schemaDelta.SrcTableName, err)
		}
		log.WithFields(log.Fields{
			"flowName":  flowJobName,
			"tableName": schemaDelta.SrcTableName,
		}).Infof("[schema delta replay] dropped column %s", droppedColumn)
	}

	for _, addedColumn := range schemaDelta.AddedColumns {
		addStmt := fmt.Sprintf("ALTER TABLE %s.%s ADD COLUMN IF NOT EXISTS `%s` %s", c.datasetID,
			schemaDelta.DstTableName, addedColumn.ColumnName, addedColumn.ColumnType)
		if _, err := c.client.Query(addStmt).Read(c.ctx); err != nil {
			return fmt.Errorf("failed to add column %s for table %s: %w", addedColumn.ColumnName,
				schemaDelta.SrcTableName, err)
		}
		log.WithFields(log.Fields{
			"flowName":  flowJobName,
			"tableName": schemaDelta.SrcTableName,
		}).Infof("[schema delta replay] added column %s with data type %s", addedColumn.ColumnName,
			addedColumn.ColumnType)
	}

	return nil
}

// SetupMetadataTables sets up the metadata tables.
func (c *BigQueryConnector) SetupMetadataTables() error {
// check if the dataset exists
Expand Down Expand Up @@ -383,7 +420,8 @@ func (c *BigQueryConnector) getTableNametoUnchangedCols(flowJobName string, sync
}

// PullRecords pulls records from the source.
func (c *BigQueryConnector) PullRecords(req *model.PullRecordsRequest) (*model.RecordBatch, error) {
func (c *BigQueryConnector) PullRecords(
	req *model.PullRecordsRequest,
) (*model.RecordsWithTableSchemaDelta, error) {
	// Pulling records is not implemented for BigQuery; any call panics.
	panic("not implemented")
}

Expand Down
5 changes: 4 additions & 1 deletion flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,15 @@ type Connector interface {

// InitializeTableSchema initializes the table schema of all the destination tables for the connector.
InitializeTableSchema(req map[string]*protos.TableSchema) error
// ReplayTableSchemaDelta changes a destination table to match the schema at source
// This could involve adding or dropping multiple columns.
ReplayTableSchemaDelta(flowJobName string, schemaDelta *protos.TableSchemaDelta) error

// Methods related to retrieving and pushing records for this connector as a source and destination.

// PullRecords pulls records from the source, and returns a RecordsWithTableSchemaDelta.
// This method should be idempotent, and should be able to be called multiple times with the same request.
PullRecords(req *model.PullRecordsRequest) (*model.RecordBatch, error)
PullRecords(req *model.PullRecordsRequest) (*model.RecordsWithTableSchemaDelta, error)

// SyncRecords pushes records to the destination peer and stores it in PeerDB specific tables.
// This method should be idempotent, and should be able to be called multiple times with the same request.
Expand Down
7 changes: 6 additions & 1 deletion flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,12 @@ func (c *EventHubConnector) InitializeTableSchema(req map[string]*protos.TableSc
return nil
}

func (c *EventHubConnector) PullRecords(req *model.PullRecordsRequest) (*model.RecordBatch, error) {
// ReplayTableSchemaDelta is a no-op for EventHub: it logs a warning and
// returns nil without using flowJobName or schemaDelta.
func (c *EventHubConnector) ReplayTableSchemaDelta(
	flowJobName string,
	schemaDelta *protos.TableSchemaDelta,
) error {
	log.Warn("ReplayTableSchemaDelta is a no-op for EventHub flow connector")
	return nil
}

// PullRecords is not implemented for the EventHub connector; any call panics.
func (c *EventHubConnector) PullRecords(
	req *model.PullRecordsRequest,
) (*model.RecordsWithTableSchemaDelta, error) {
	panic("pull records not implemented for event hub")
}

Expand Down
Loading

0 comments on commit 40a454a

Please sign in to comment.