Skip to content

Commit

Permalink
CDCRecordStream: remove RelationMessageMapping (#1129)
Browse files Browse the repository at this point in the history
RelationMessageMapping of input is mutated,
thus it can be added to sync response from input at end
  • Loading branch information
serprex authored Jan 23, 2024
1 parent ddca548 commit 7042307
Show file tree
Hide file tree
Showing 8 changed files with 6 additions and 15 deletions.
3 changes: 2 additions & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,8 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
}

return &model.SyncResponse{
RelationMessageMapping: <-recordBatch.RelationMessageMapping,
TableSchemaDeltas: tableSchemaDeltas,
RelationMessageMapping: input.RelationMessageMapping,
}, nil
}

Expand All @@ -298,6 +298,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
TableMappings: input.FlowConnectionConfigs.TableMappings,
StagingPath: input.FlowConnectionConfigs.CdcStagingPath,
})
res.RelationMessageMapping = input.RelationMessageMapping
if err != nil {
slog.Warn("failed to push records", slog.Any("error", err))
a.Alerter.LogFlowError(ctx, flowName, err)
Expand Down
1 change: 0 additions & 1 deletion flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ func (s *QRepAvroSyncMethod) SyncRecords(
CurrentSyncBatchID: syncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
TableSchemaDeltas: tableSchemaDeltas,
RelationMessageMapping: <-req.Records.RelationMessageMapping,
}, nil
}

Expand Down
1 change: 0 additions & 1 deletion flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,6 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
NumRecordsSynced: rowsSynced,
TableNameRowsMapping: make(map[string]uint32),
TableSchemaDeltas: req.Records.WaitForSchemaDeltas(req.TableMappings),
RelationMessageMapping: <-req.Records.RelationMessageMapping,
}, nil
}

Expand Down
1 change: 0 additions & 1 deletion flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,6 @@ func (p *PostgresCDCSource) consumeStream(
if cdcRecordsStorage.IsEmpty() {
records.SignalAsEmpty()
}
records.RelationMessageMapping <- p.relationMessageMapping
p.logger.Info(fmt.Sprintf("[finished] PullRecords streamed %d records", cdcRecordsStorage.Len()))
err := cdcRecordsStorage.Close()
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,6 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
CurrentSyncBatchID: syncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
TableSchemaDeltas: tableSchemaDeltas,
RelationMessageMapping: <-req.Records.RelationMessageMapping,
}, nil
}

Expand Down
1 change: 0 additions & 1 deletion flow/connectors/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,6 @@ func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncRes
NumRecordsSynced: int64(numRecords),
TableNameRowsMapping: tableNameRowsMapping,
TableSchemaDeltas: req.Records.WaitForSchemaDeltas(req.TableMappings),
RelationMessageMapping: <-req.Records.RelationMessageMapping,
}, nil
}

Expand Down
1 change: 0 additions & 1 deletion flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,6 @@ func (c *SnowflakeConnector) syncRecordsViaAvro(
CurrentSyncBatchID: syncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
TableSchemaDeltas: tableSchemaDeltas,
RelationMessageMapping: <-req.Records.RelationMessageMapping,
}, nil
}

Expand Down
12 changes: 4 additions & 8 deletions flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,8 +425,6 @@ type CDCRecordStream struct {
records chan Record
// Schema changes from the slot
SchemaDeltas chan *protos.TableSchemaDelta
// Relation message mapping
RelationMessageMapping chan RelationMessageMapping
// Indicates if the last checkpoint has been set.
lastCheckpointSet bool
// lastCheckPointID is the last ID of the commit that corresponds to this batch.
Expand All @@ -440,11 +438,10 @@ func NewCDCRecordStream() *CDCRecordStream {
return &CDCRecordStream{
records: make(chan Record, channelBuffer),
// TODO (kaushik): more than 1024 schema deltas can cause problems!
SchemaDeltas: make(chan *protos.TableSchemaDelta, 1<<10),
emptySignal: make(chan bool, 1),
RelationMessageMapping: make(chan RelationMessageMapping, 1),
lastCheckpointSet: false,
lastCheckPointID: atomic.Int64{},
SchemaDeltas: make(chan *protos.TableSchemaDelta, 1<<10),
emptySignal: make(chan bool, 1),
lastCheckpointSet: false,
lastCheckPointID: atomic.Int64{},
}
}

Expand Down Expand Up @@ -515,7 +512,6 @@ func (r *CDCRecordStream) Close() {
close(r.emptySignal)
close(r.records)
close(r.SchemaDeltas)
close(r.RelationMessageMapping)
r.lastCheckpointSet = true
}

Expand Down

0 comments on commit 7042307

Please sign in to comment.