Commit
Change CDCRecordStream TableSchemaChanges from channel to slice
Simpler alternative to #1134
serprex committed Jan 23, 2024
1 parent cd6086c commit 7d1d9a2
Showing 8 changed files with 29 additions and 46 deletions.
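
In outline: schema deltas previously traveled through a buffered channel (capped at 1<<10 entries, per the TODO removed below) and were drained by WaitForSchemaDeltas; they are now appended to a slice as the CDC stream detects them, and consumers read CDCRecordStream.SchemaDeltas directly. A minimal sketch of the before/after shape, with a simplified delta type standing in for the protos (not the real PeerDB API):

    package main

    import "fmt"

    // TableSchemaDelta is a simplified stand-in for protos.TableSchemaDelta.
    type TableSchemaDelta struct {
        SrcTableName string
    }

    // Before: a fixed-capacity channel that the sync side had to drain after
    // the stream closed; more than 1024 deltas could block the producer.
    type channelStream struct {
        schemaDeltas chan *TableSchemaDelta // make(chan *TableSchemaDelta, 1<<10)
    }

    // After: a plain slice appended to at detection time; no capacity limit
    // and no close/drain handshake.
    type sliceStream struct {
        schemaDeltas []*TableSchemaDelta
    }

    func (s *sliceStream) addSchemaDelta(d *TableSchemaDelta) {
        s.schemaDeltas = append(s.schemaDeltas, d)
    }

    func main() {
        s := &sliceStream{}
        s.addSchemaDelta(&TableSchemaDelta{SrcTableName: "public.users"})
        fmt.Println(len(s.schemaDeltas)) // 1
    }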
flow/activities/flowable.go (2 additions, 3 deletions)
@@ -261,15 +261,14 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
return nil, fmt.Errorf("failed in pull records when: %w", err)
}
slog.InfoContext(ctx, "no records to push")
tableSchemaDeltas := recordBatch.WaitForSchemaDeltas(input.FlowConnectionConfigs.TableMappings)

err := dstConn.ReplayTableSchemaDeltas(flowName, tableSchemaDeltas)
err := dstConn.ReplayTableSchemaDeltas(flowName, recordBatch.SchemaDeltas)
if err != nil {
return nil, fmt.Errorf("failed to sync schema: %w", err)
}

return &model.SyncResponse{
TableSchemaDeltas: tableSchemaDeltas,
TableSchemaDeltas: recordBatch.SchemaDeltas,
RelationMessageMapping: input.RelationMessageMapping,
}, nil
}
flow/connectors/bigquery/qrep_avro_sync.go (2 additions, 3 deletions)
@@ -85,8 +85,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
         req.FlowJobName, rawTableName, syncBatchID),
     )

-    tableSchemaDeltas := req.Records.WaitForSchemaDeltas(req.TableMappings)
-    err = s.connector.ReplayTableSchemaDeltas(req.FlowJobName, tableSchemaDeltas)
+    err = s.connector.ReplayTableSchemaDeltas(req.FlowJobName, req.Records.SchemaDeltas)
     if err != nil {
         return nil, fmt.Errorf("failed to sync schema changes: %w", err)
     }
@@ -123,7 +122,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
         NumRecordsSynced:     int64(numRecords),
         CurrentSyncBatchID:   syncBatchID,
         TableNameRowsMapping: tableNameRowsMapping,
-        TableSchemaDeltas:    tableSchemaDeltas,
+        TableSchemaDeltas:    req.Records.SchemaDeltas,
     }, nil
 }

flow/connectors/eventhub/eventhub.go (1 addition, 1 deletion)
@@ -255,7 +255,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
         LastSyncedCheckpointID: lastCheckpoint,
         NumRecordsSynced:       int64(numRecords),
         TableNameRowsMapping:   make(map[string]uint32),
-        TableSchemaDeltas:      req.Records.WaitForSchemaDeltas(req.TableMappings),
+        TableSchemaDeltas:      req.Records.SchemaDeltas,
     }, nil
 }

flow/connectors/postgres/cdc.go (1 addition, 1 deletion)
@@ -507,7 +507,7 @@ func (p *PostgresCDCSource) consumeStream(
             if len(tableSchemaDelta.AddedColumns) > 0 {
                 p.logger.Info(fmt.Sprintf("Detected schema change for table %s, addedColumns: %v",
                     tableSchemaDelta.SrcTableName, tableSchemaDelta.AddedColumns))
-                records.SchemaDeltas <- tableSchemaDelta
+                records.AddSchemaDelta(req.TableNameMapping, tableSchemaDelta)
             }
         }
     }
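
Note the call site now passes req.TableNameMapping, so exclusion filtering happens once per delta at detection time via a map lookup keyed on the source table name, instead of scanning []*protos.TableMapping when the deltas are drained. A hedged sketch of that lookup pattern, with NameAndExclude simplified (assuming its Exclude field behaves like a set):

    package main

    import "fmt"

    // Simplified stand-ins; the real types live in flow/model and the protos.
    type NameAndExclude struct {
        Name    string
        Exclude map[string]struct{}
    }

    type TableSchemaDelta struct {
        SrcTableName string
        DstTableName string
    }

    func main() {
        tableNameMapping := map[string]NameAndExclude{
            "public.users": {Name: "users_dst", Exclude: map[string]struct{}{"ssn": {}}},
        }
        delta := &TableSchemaDelta{SrcTableName: "public.users", DstTableName: "users_dst"}

        // One O(1) lookup per delta replaces the old per-delta scan over the
        // table mappings in WaitForSchemaDeltas.
        if tm, ok := tableNameMapping[delta.SrcTableName]; ok {
            fmt.Printf("destination %s excludes %d column(s)\n", tm.Name, len(tm.Exclude))
        }
    }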
flow/connectors/postgres/postgres.go (2 additions, 3 deletions)
@@ -350,8 +350,7 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
         }
     }

-    tableSchemaDeltas := req.Records.WaitForSchemaDeltas(req.TableMappings)
-    err := c.ReplayTableSchemaDeltas(req.FlowJobName, tableSchemaDeltas)
+    err := c.ReplayTableSchemaDeltas(req.FlowJobName, req.Records.SchemaDeltas)
     if err != nil {
         return nil, fmt.Errorf("failed to sync schema changes: %w", err)
     }
@@ -412,7 +411,7 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
         NumRecordsSynced:     int64(len(records)),
         CurrentSyncBatchID:   req.SyncBatchID,
         TableNameRowsMapping: tableNameRowsMapping,
-        TableSchemaDeltas:    tableSchemaDeltas,
+        TableSchemaDeltas:    req.Records.SchemaDeltas,
     }, nil
 }

flow/connectors/s3/s3.go (1 addition, 1 deletion)
@@ -224,7 +224,7 @@ func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncRes
         LastSyncedCheckpointID: lastCheckpoint,
         NumRecordsSynced:       int64(numRecords),
         TableNameRowsMapping:   tableNameRowsMapping,
-        TableSchemaDeltas:      req.Records.WaitForSchemaDeltas(req.TableMappings),
+        TableSchemaDeltas:      req.Records.SchemaDeltas,
     }, nil
 }

flow/connectors/snowflake/snowflake.go (2 additions, 3 deletions)
@@ -547,8 +547,7 @@ func (c *SnowflakeConnector) syncRecordsViaAvro(
         return nil, err
     }

-    tableSchemaDeltas := req.Records.WaitForSchemaDeltas(req.TableMappings)
-    err = c.ReplayTableSchemaDeltas(req.FlowJobName, tableSchemaDeltas)
+    err = c.ReplayTableSchemaDeltas(req.FlowJobName, req.Records.SchemaDeltas)
     if err != nil {
         return nil, fmt.Errorf("failed to sync schema changes: %w", err)
     }
@@ -563,7 +562,7 @@ func (c *SnowflakeConnector) syncRecordsViaAvro(
         NumRecordsSynced:     int64(numRecords),
         CurrentSyncBatchID:   syncBatchID,
         TableNameRowsMapping: tableNameRowsMapping,
-        TableSchemaDeltas:    tableSchemaDeltas,
+        TableSchemaDeltas:    req.Records.SchemaDeltas,
     }, nil
 }

flow/model/model.go (18 additions, 31 deletions)
@@ -6,7 +6,6 @@ import (
"fmt"
"math"
"math/big"
"slices"
"sync/atomic"
"time"

@@ -425,7 +424,7 @@ type CDCRecordStream struct {
     // Records are a list of json objects.
     records chan Record
     // Schema changes from the slot
-    SchemaDeltas chan *protos.TableSchemaDelta
+    SchemaDeltas []*protos.TableSchemaDelta
     // Indicates if the last checkpoint has been set.
     lastCheckpointSet bool
     // lastCheckpointID is the last ID of the commit that corresponds to this batch.
@@ -437,9 +436,8 @@ type CDCRecordStream struct {
 func NewCDCRecordStream() *CDCRecordStream {
     channelBuffer := peerdbenv.PeerDBCDCChannelBufferSize()
     return &CDCRecordStream{
-        records: make(chan Record, channelBuffer),
-        // TODO (kaushik): more than 1024 schema deltas can cause problems!
-        SchemaDeltas: make(chan *protos.TableSchemaDelta, 1<<10),
+        records:      make(chan Record, channelBuffer),
+        SchemaDeltas: make([]*protos.TableSchemaDelta, 0),
         emptySignal:  make(chan bool, 1),
         lastCheckpointSet: false,
         lastCheckpointID:  atomic.Int64{},
@@ -479,40 +477,29 @@ func (r *CDCRecordStream) WaitAndCheckEmpty() bool {
     return isEmpty
 }

-func (r *CDCRecordStream) WaitForSchemaDeltas(tableMappings []*protos.TableMapping) []*protos.TableSchemaDelta {
-    schemaDeltas := make([]*protos.TableSchemaDelta, 0)
-schemaLoop:
-    for delta := range r.SchemaDeltas {
-        for _, tm := range tableMappings {
-            if delta.SrcTableName == tm.SourceTableIdentifier && delta.DstTableName == tm.DestinationTableIdentifier {
-                if len(tm.Exclude) == 0 {
-                    break
-                }
-                added := make([]*protos.DeltaAddedColumn, 0, len(delta.AddedColumns))
-                for _, column := range delta.AddedColumns {
-                    if !slices.Contains(tm.Exclude, column.ColumnName) {
-                        added = append(added, column)
-                    }
-                }
-                if len(added) != 0 {
-                    schemaDeltas = append(schemaDeltas, &protos.TableSchemaDelta{
-                        SrcTableName: delta.SrcTableName,
-                        DstTableName: delta.DstTableName,
-                        AddedColumns: added,
-                    })
-                }
-                continue schemaLoop
-            }
-        }
-        schemaDeltas = append(schemaDeltas, delta)
-    }
-    return schemaDeltas
+func (r *CDCRecordStream) AddSchemaDelta(tableNameMapping map[string]NameAndExclude, delta *protos.TableSchemaDelta) {
+    if tm, ok := tableNameMapping[delta.SrcTableName]; ok && len(tm.Exclude) != 0 {
+        added := make([]*protos.DeltaAddedColumn, 0, len(delta.AddedColumns))
+        for _, column := range delta.AddedColumns {
+            if _, has := tm.Exclude[column.ColumnName]; !has {
+                added = append(added, column)
+            }
+        }
+        if len(added) != 0 {
+            r.SchemaDeltas = append(r.SchemaDeltas, &protos.TableSchemaDelta{
+                SrcTableName: delta.SrcTableName,
+                DstTableName: delta.DstTableName,
+                AddedColumns: added,
+            })
+        }
+    } else {
+        r.SchemaDeltas = append(r.SchemaDeltas, delta)
+    }
 }

 func (r *CDCRecordStream) Close() {
     close(r.emptySignal)
     close(r.records)
-    close(r.SchemaDeltas)
     r.lastCheckpointSet = true
 }

