Skip to content

Commit

Permalink
Merge branch 'main' into more-lints
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Jan 23, 2024
2 parents 3acf8bf + 7042307 commit aed4060
Show file tree
Hide file tree
Showing 8 changed files with 6 additions and 15 deletions.
3 changes: 2 additions & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,8 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
}

return &model.SyncResponse{
RelationMessageMapping: <-recordBatch.RelationMessageMapping,
TableSchemaDeltas: tableSchemaDeltas,
RelationMessageMapping: input.RelationMessageMapping,
}, nil
}

Expand All @@ -299,6 +299,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
TableMappings: input.FlowConnectionConfigs.TableMappings,
StagingPath: input.FlowConnectionConfigs.CdcStagingPath,
})
res.RelationMessageMapping = input.RelationMessageMapping
if err != nil {
slog.Warn("failed to push records", slog.Any("error", err))
a.Alerter.LogFlowError(ctx, flowName, err)
Expand Down
1 change: 0 additions & 1 deletion flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ func (s *QRepAvroSyncMethod) SyncRecords(
CurrentSyncBatchID: syncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
TableSchemaDeltas: tableSchemaDeltas,
RelationMessageMapping: <-req.Records.RelationMessageMapping,
}, nil
}

Expand Down
1 change: 0 additions & 1 deletion flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,6 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
NumRecordsSynced: rowsSynced,
TableNameRowsMapping: make(map[string]uint32),
TableSchemaDeltas: req.Records.WaitForSchemaDeltas(req.TableMappings),
RelationMessageMapping: <-req.Records.RelationMessageMapping,
}, nil
}

Expand Down
1 change: 0 additions & 1 deletion flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,6 @@ func (p *PostgresCDCSource) consumeStream(
if cdcRecordsStorage.IsEmpty() {
records.SignalAsEmpty()
}
records.RelationMessageMapping <- p.relationMessageMapping
p.logger.Info(fmt.Sprintf("[finished] PullRecords streamed %d records", cdcRecordsStorage.Len()))
err := cdcRecordsStorage.Close()
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,6 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
CurrentSyncBatchID: syncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
TableSchemaDeltas: tableSchemaDeltas,
RelationMessageMapping: <-req.Records.RelationMessageMapping,
}, nil
}

Expand Down
1 change: 0 additions & 1 deletion flow/connectors/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ func (c *S3Connector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncRes
NumRecordsSynced: int64(numRecords),
TableNameRowsMapping: tableNameRowsMapping,
TableSchemaDeltas: req.Records.WaitForSchemaDeltas(req.TableMappings),
RelationMessageMapping: <-req.Records.RelationMessageMapping,
}, nil
}

Expand Down
1 change: 0 additions & 1 deletion flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,6 @@ func (c *SnowflakeConnector) syncRecordsViaAvro(
CurrentSyncBatchID: syncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
TableSchemaDeltas: tableSchemaDeltas,
RelationMessageMapping: <-req.Records.RelationMessageMapping,
}, nil
}

Expand Down
12 changes: 4 additions & 8 deletions flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,8 +425,6 @@ type CDCRecordStream struct {
records chan Record
// Schema changes from the slot
SchemaDeltas chan *protos.TableSchemaDelta
// Relation message mapping
RelationMessageMapping chan RelationMessageMapping
// Indicates if the last checkpoint has been set.
lastCheckpointSet bool
// lastCheckpointID is the last ID of the commit that corresponds to this batch.
Expand All @@ -440,11 +438,10 @@ func NewCDCRecordStream() *CDCRecordStream {
return &CDCRecordStream{
records: make(chan Record, channelBuffer),
// TODO (kaushik): more than 1024 schema deltas can cause problems!
SchemaDeltas: make(chan *protos.TableSchemaDelta, 1<<10),
emptySignal: make(chan bool, 1),
RelationMessageMapping: make(chan RelationMessageMapping, 1),
lastCheckpointSet: false,
lastCheckpointID: atomic.Int64{},
SchemaDeltas: make(chan *protos.TableSchemaDelta, 1<<10),
emptySignal: make(chan bool, 1),
lastCheckpointSet: false,
lastCheckpointID: atomic.Int64{},
}
}

Expand Down Expand Up @@ -515,7 +512,6 @@ func (r *CDCRecordStream) Close() {
close(r.emptySignal)
close(r.records)
close(r.SchemaDeltas)
close(r.RelationMessageMapping)
r.lastCheckpointSet = true
}

Expand Down

0 comments on commit aed4060

Please sign in to comment.