From 7d1d9a2db1add651ca2dbfd80aa0e81a92d3cf37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Tue, 23 Jan 2024 19:20:04 +0000 Subject: [PATCH] Change CDCRecordStream TableSchemaChanges from channel to slice Simpler alternative to #1134 --- flow/activities/flowable.go | 5 +-- flow/connectors/bigquery/qrep_avro_sync.go | 5 +-- flow/connectors/eventhub/eventhub.go | 2 +- flow/connectors/postgres/cdc.go | 2 +- flow/connectors/postgres/postgres.go | 5 +-- flow/connectors/s3/s3.go | 2 +- flow/connectors/snowflake/snowflake.go | 5 +-- flow/model/model.go | 49 ++++++++-------------- 8 files changed, 29 insertions(+), 46 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index efda2433bc..8dd7ee3124 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -261,15 +261,14 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, return nil, fmt.Errorf("failed in pull records when: %w", err) } slog.InfoContext(ctx, "no records to push") - tableSchemaDeltas := recordBatch.WaitForSchemaDeltas(input.FlowConnectionConfigs.TableMappings) - err := dstConn.ReplayTableSchemaDeltas(flowName, tableSchemaDeltas) + err := dstConn.ReplayTableSchemaDeltas(flowName, recordBatch.SchemaDeltas) if err != nil { return nil, fmt.Errorf("failed to sync schema: %w", err) } return &model.SyncResponse{ - TableSchemaDeltas: tableSchemaDeltas, + TableSchemaDeltas: recordBatch.SchemaDeltas, RelationMessageMapping: input.RelationMessageMapping, }, nil } diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index 62759e32a3..fa4ec2edf4 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -85,8 +85,7 @@ func (s *QRepAvroSyncMethod) SyncRecords( req.FlowJobName, rawTableName, syncBatchID), ) - tableSchemaDeltas := req.Records.WaitForSchemaDeltas(req.TableMappings) - err = s.connector.ReplayTableSchemaDeltas(req.FlowJobName, tableSchemaDeltas) + err = s.connector.ReplayTableSchemaDeltas(req.FlowJobName, req.Records.SchemaDeltas) if err != nil { return nil, fmt.Errorf("failed to sync schema changes: %w", err) } @@ -123,7 +122,7 @@ func (s *QRepAvroSyncMethod) SyncRecords( NumRecordsSynced: int64(numRecords), CurrentSyncBatchID: syncBatchID, TableNameRowsMapping: tableNameRowsMapping, - TableSchemaDeltas: tableSchemaDeltas, + TableSchemaDeltas: req.Records.SchemaDeltas, }, nil } diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 9eab9ae1b2..f7fb99fb3b 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -255,7 +255,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S LastSyncedCheckpointID: lastCheckpoint, NumRecordsSynced: int64(numRecords), TableNameRowsMapping: make(map[string]uint32), - TableSchemaDeltas: req.Records.WaitForSchemaDeltas(req.TableMappings), + TableSchemaDeltas: req.Records.SchemaDeltas, }, nil } diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 7959e5ff67..8029b4ae60 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -507,7 +507,7 @@ func (p *PostgresCDCSource) consumeStream( if len(tableSchemaDelta.AddedColumns) > 0 { p.logger.Info(fmt.Sprintf("Detected schema change for table %s, addedColumns: %v", tableSchemaDelta.SrcTableName, tableSchemaDelta.AddedColumns)) - records.SchemaDeltas <- tableSchemaDelta + records.AddSchemaDelta(req.TableNameMapping, tableSchemaDelta) } } } diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 7fdd6b032d..6bc8aae212 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -350,8 +350,7 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S } } - tableSchemaDeltas := req.Records.WaitForSchemaDeltas(req.TableMappings) - err := c.ReplayTableSchemaDeltas(req.FlowJobName, tableSchemaDeltas) + err := c.ReplayTableSchemaDeltas(req.FlowJobName, req.Records.SchemaDeltas) if err != nil { return nil, fmt.Errorf("failed to sync schema changes: %w", err) } @@ -412,7 +411,7 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S NumRecordsSynced: int64(len(records)), CurrentSyncBatchID: req.SyncBatchID, TableNameRowsMapping: tableNameRowsMapping, - TableSchemaDeltas: tableSchemaDeltas, + TableSchemaDeltas: req.Records.SchemaDeltas, }, nil } diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index db96a927fb..19e8cbafe4 100644 --- a/flow/connectors/s3/s3.go +++ b/flow/connectors/s3/s3.go @@ -224,7 +224,7 @@ func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncRes LastSyncedCheckpointID: lastCheckpoint, NumRecordsSynced: int64(numRecords), TableNameRowsMapping: tableNameRowsMapping, - TableSchemaDeltas: req.Records.WaitForSchemaDeltas(req.TableMappings), + TableSchemaDeltas: req.Records.SchemaDeltas, }, nil } diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index b96d6d9360..3a8a64f409 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -547,8 +547,7 @@ func (c *SnowflakeConnector) syncRecordsViaAvro( return nil, err } - tableSchemaDeltas := req.Records.WaitForSchemaDeltas(req.TableMappings) - err = c.ReplayTableSchemaDeltas(req.FlowJobName, tableSchemaDeltas) + err = c.ReplayTableSchemaDeltas(req.FlowJobName, req.Records.SchemaDeltas) if err != nil { return nil, fmt.Errorf("failed to sync schema changes: %w", err) } @@ -563,7 +562,7 @@ func (c *SnowflakeConnector) syncRecordsViaAvro( NumRecordsSynced: int64(numRecords), CurrentSyncBatchID: syncBatchID, TableNameRowsMapping: tableNameRowsMapping, - TableSchemaDeltas: tableSchemaDeltas, + TableSchemaDeltas: req.Records.SchemaDeltas, }, nil } diff --git a/flow/model/model.go b/flow/model/model.go index 09800251bf..a01a6b29e0 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -6,7 +6,6 @@ import ( "fmt" "math" "math/big" - "slices" "sync/atomic" "time" @@ -425,7 +424,7 @@ type CDCRecordStream struct { // Records are a list of json objects. records chan Record // Schema changes from the slot - SchemaDeltas chan *protos.TableSchemaDelta + SchemaDeltas []*protos.TableSchemaDelta // Indicates if the last checkpoint has been set. lastCheckpointSet bool // lastCheckpointID is the last ID of the commit that corresponds to this batch. @@ -437,9 +436,8 @@ type CDCRecordStream struct { func NewCDCRecordStream() *CDCRecordStream { channelBuffer := peerdbenv.PeerDBCDCChannelBufferSize() return &CDCRecordStream{ - records: make(chan Record, channelBuffer), - // TODO (kaushik): more than 1024 schema deltas can cause problems! - SchemaDeltas: make(chan *protos.TableSchemaDelta, 1<<10), + records: make(chan Record, channelBuffer), + SchemaDeltas: make([]*protos.TableSchemaDelta, 0), emptySignal: make(chan bool, 1), lastCheckpointSet: false, lastCheckpointID: atomic.Int64{}, @@ -479,40 +477,29 @@ func (r *CDCRecordStream) WaitAndCheckEmpty() bool { return isEmpty } -func (r *CDCRecordStream) WaitForSchemaDeltas(tableMappings []*protos.TableMapping) []*protos.TableSchemaDelta { - schemaDeltas := make([]*protos.TableSchemaDelta, 0) -schemaLoop: - for delta := range r.SchemaDeltas { - for _, tm := range tableMappings { - if delta.SrcTableName == tm.SourceTableIdentifier && delta.DstTableName == tm.DestinationTableIdentifier { - if len(tm.Exclude) == 0 { - break - } - added := make([]*protos.DeltaAddedColumn, 0, len(delta.AddedColumns)) - for _, column := range delta.AddedColumns { - if !slices.Contains(tm.Exclude, column.ColumnName) { - added = append(added, column) - } - } - if len(added) != 0 { - schemaDeltas = append(schemaDeltas, &protos.TableSchemaDelta{ - SrcTableName: delta.SrcTableName, - DstTableName: delta.DstTableName, - AddedColumns: added, - }) - } - continue schemaLoop +func (r *CDCRecordStream) AddSchemaDelta(tableNameMapping map[string]NameAndExclude, delta *protos.TableSchemaDelta) { + if tm, ok := tableNameMapping[delta.SrcTableName]; ok && len(tm.Exclude) != 0 { + added := make([]*protos.DeltaAddedColumn, 0, len(delta.AddedColumns)) + for _, column := range delta.AddedColumns { + if _, has := tm.Exclude[column.ColumnName]; !has { + added = append(added, column) } } - schemaDeltas = append(schemaDeltas, delta) + if len(added) != 0 { + r.SchemaDeltas = append(r.SchemaDeltas, &protos.TableSchemaDelta{ + SrcTableName: delta.SrcTableName, + DstTableName: delta.DstTableName, + AddedColumns: added, + }) + } + } else { + r.SchemaDeltas = append(r.SchemaDeltas, delta) } - return schemaDeltas } func (r *CDCRecordStream) Close() { close(r.emptySignal) close(r.records) - close(r.SchemaDeltas) r.lastCheckpointSet = true }