Skip to content

Commit

Permalink
CDC stream
Browse files Browse the repository at this point in the history
- this is not ready to land
- tests have been deleted, need to re-add them
  • Loading branch information
iskakaushik committed Oct 29, 2023
1 parent 38402a1 commit f55b3b8
Show file tree
Hide file tree
Showing 19 changed files with 369 additions and 1,360 deletions.
84 changes: 40 additions & 44 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ import (
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/metrics"
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/jackc/pglogrepl"
log "github.com/sirupsen/logrus"
"go.temporal.io/sdk/activity"
"golang.org/x/sync/errgroup"
)

// CheckConnectionResult is the result of a CheckConnection call.
Expand Down Expand Up @@ -196,37 +196,42 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,

idleTimeout := utils.GetEnvInt("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", 10)

recordBatch := model.NewCDCRecordStream()

startTime := time.Now()
recordsWithTableSchemaDelta, err := srcConn.PullRecords(&model.PullRecordsRequest{
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
SrcTableIDNameMapping: input.FlowConnectionConfigs.SrcTableIdNameMapping,
TableNameMapping: tblNameMapping,
LastSyncState: input.LastSyncState,
MaxBatchSize: uint32(input.SyncFlowOptions.BatchSize),
IdleTimeout: time.Duration(idleTimeout) * time.Second,
TableNameSchemaMapping: input.FlowConnectionConfigs.TableNameSchemaMapping,
OverridePublicationName: input.FlowConnectionConfigs.PublicationName,
OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName,
RelationMessageMapping: input.RelationMessageMapping,

errGroup, ctx := errgroup.WithContext(ctx)

// start a goroutine to pull records from the source
errGroup.Go(func() error {
return srcConn.PullRecords(&model.PullRecordsRequest{
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
SrcTableIDNameMapping: input.FlowConnectionConfigs.SrcTableIdNameMapping,
TableNameMapping: tblNameMapping,
LastSyncState: input.LastSyncState,
MaxBatchSize: uint32(input.SyncFlowOptions.BatchSize),
IdleTimeout: time.Duration(idleTimeout) * time.Second,
TableNameSchemaMapping: input.FlowConnectionConfigs.TableNameSchemaMapping,
OverridePublicationName: input.FlowConnectionConfigs.PublicationName,
OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName,
RelationMessageMapping: input.RelationMessageMapping,
RecordStream: recordBatch,
})
})
if err != nil {
return nil, fmt.Errorf("failed to pull records: %w", err)
}
recordBatch := recordsWithTableSchemaDelta.RecordBatch

pullRecordWithCount := fmt.Sprintf("pulled %d records", len(recordBatch.Records))
activity.RecordHeartbeat(ctx, pullRecordWithCount)
hasRecords := recordBatch.WaitAndCheckEmpty()

if a.CatalogMirrorMonitor.IsActive() && len(recordBatch.Records) > 0 {
if a.CatalogMirrorMonitor.IsActive() && hasRecords {
syncBatchID, err := dstConn.GetLastSyncBatchID(input.FlowConnectionConfigs.FlowJobName)
if err != nil && conn.Destination.Type != protos.DBType_EVENTHUB {
return nil, err
}

err = a.CatalogMirrorMonitor.AddCDCBatchForFlow(ctx, input.FlowConnectionConfigs.FlowJobName,
monitoring.CDCBatchInfo{
BatchID: syncBatchID + 1,
RowsInBatch: uint32(len(recordBatch.Records)),
BatchID: syncBatchID + 1,
// TODO(kaushik): update RowsInBatch with the actual record count once the push has completed.
RowsInBatch: 0,
BatchStartLSN: pglogrepl.LSN(recordBatch.FirstCheckPointID),
BatchEndlSN: pglogrepl.LSN(recordBatch.LastCheckPointID),
StartTime: startTime,
Expand All @@ -236,24 +241,12 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
}
}

pullDuration := time.Since(startTime)
numRecords := len(recordBatch.Records)
log.WithFields(log.Fields{
"flowName": input.FlowConnectionConfigs.FlowJobName,
}).Infof("pulled %d records in %d seconds\n", numRecords, int(pullDuration.Seconds()))
activity.RecordHeartbeat(ctx, fmt.Sprintf("pulled %d records", numRecords))

if numRecords == 0 {
log.WithFields(log.Fields{
"flowName": input.FlowConnectionConfigs.FlowJobName,
}).Info("no records to push")
metrics.LogSyncMetrics(ctx, input.FlowConnectionConfigs.FlowJobName, 0, 1)
metrics.LogNormalizeMetrics(ctx, input.FlowConnectionConfigs.FlowJobName, 0, 1, 0)
metrics.LogCDCRawThroughputMetrics(ctx, input.FlowConnectionConfigs.FlowJobName, 0)
return &model.SyncResponse{
RelationMessageMapping: recordsWithTableSchemaDelta.RelationMessageMapping,
TableSchemaDeltas: recordsWithTableSchemaDelta.TableSchemaDeltas,
}, nil
if !hasRecords {
log.WithFields(log.Fields{"flowName": input.FlowConnectionConfigs.FlowJobName}).Info("no records to push")
syncResponse := &model.SyncResponse{}
syncResponse.RelationMessageMapping = <-recordBatch.RelationMessageMapping
syncResponse.TableSchemaDeltas = recordBatch.WaitForSchemaDeltas()
return syncResponse, nil
}

shutdown := utils.HeartbeatRoutine(ctx, 10*time.Second, func() string {
Expand All @@ -279,6 +272,12 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
return nil, fmt.Errorf("failed to push records: %w", err)
}

err = errGroup.Wait()
if err != nil {
return nil, fmt.Errorf("failed to pull records: %w", err)
}

numRecords := res.NumRecordsSynced
syncDuration := time.Since(syncStartTime)
log.WithFields(log.Fields{
"flowName": input.FlowConnectionConfigs.FlowJobName,
Expand All @@ -300,15 +299,12 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
if err != nil {
return nil, err
}
res.TableSchemaDeltas = recordsWithTableSchemaDelta.TableSchemaDeltas
res.RelationMessageMapping = recordsWithTableSchemaDelta.RelationMessageMapping
res.TableSchemaDeltas = recordBatch.WaitForSchemaDeltas()
res.RelationMessageMapping = <-recordBatch.RelationMessageMapping

pushedRecordsWithCount := fmt.Sprintf("pushed %d records", numRecords)
activity.RecordHeartbeat(ctx, pushedRecordsWithCount)

metrics.LogCDCRawThroughputMetrics(ctx, input.FlowConnectionConfigs.FlowJobName,
float64(numRecords)/(pullDuration.Seconds()+syncDuration.Seconds()))

return res, nil
}

Expand Down
28 changes: 9 additions & 19 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"cloud.google.com/go/bigquery"
"cloud.google.com/go/storage"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/metrics"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
Expand Down Expand Up @@ -432,17 +431,9 @@ func (r StagingBQRecord) Save() (map[string]bigquery.Value, string, error) {
// currently only supports inserts, updates and deletes
// more record types will be added in the future.
func (c *BigQueryConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
if len(req.Records.Records) == 0 {
return &model.SyncResponse{
FirstSyncedCheckPointID: 0,
LastSyncedCheckPointID: 0,
NumRecordsSynced: 0,
}, nil
}

rawTableName := c.getRawTableName(req.FlowJobName)

log.Printf("pushing %d records to %s.%s", len(req.Records.Records), c.datasetID, rawTableName)
log.Printf("pushing records to %s.%s...", c.datasetID, rawTableName)

// generate a sequential number for the last synced batch
// this sequence will be used to keep track of records that are normalized
Expand Down Expand Up @@ -488,7 +479,7 @@ func (c *BigQueryConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest,
var firstCP int64 = 0
lastCP := req.Records.LastCheckPointID
// loop over req.Records
for _, record := range req.Records.Records {
for record := range req.Records.GetRecords() {
switch r := record.(type) {
case *model.InsertRecord:
// create the 3 required fields
Expand Down Expand Up @@ -629,13 +620,11 @@ func (c *BigQueryConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest,
stmts = append(stmts, appendStmt)
stmts = append(stmts, updateMetadataStmt)
stmts = append(stmts, "COMMIT TRANSACTION;")
startTime := time.Now()
_, err = c.client.Query(strings.Join(stmts, "\n")).Read(c.ctx)
if err != nil {
return nil, fmt.Errorf("failed to execute statements in a transaction: %v", err)
}

metrics.LogSyncMetrics(c.ctx, req.FlowJobName, int64(numRecords), time.Since(startTime))
log.Printf("pushed %d records to %s.%s", numRecords, c.datasetID, rawTableName)

return &model.SyncResponse{
Expand All @@ -647,13 +636,16 @@ func (c *BigQueryConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest,
}, nil
}

func (c *BigQueryConnector) syncRecordsViaAvro(req *model.SyncRecordsRequest,
rawTableName string, syncBatchID int64) (*model.SyncResponse, error) {
func (c *BigQueryConnector) syncRecordsViaAvro(
req *model.SyncRecordsRequest,
rawTableName string,
syncBatchID int64,
) (*model.SyncResponse, error) {
tableNameRowsMapping := make(map[string]uint32)
first := true
var firstCP int64 = 0
lastCP := req.Records.LastCheckPointID
recordStream := model.NewQRecordStream(len(req.Records.Records))
recordStream := model.NewQRecordStream(1 << 20)
err := recordStream.SetSchema(&model.QRecordSchema{
Fields: []*model.QField{
{
Expand Down Expand Up @@ -713,7 +705,7 @@ func (c *BigQueryConnector) syncRecordsViaAvro(req *model.SyncRecordsRequest,
}

// loop over req.Records
for _, record := range req.Records.Records {
for record := range req.Records.GetRecords() {
var entries [10]qvalue.QValue
switch r := record.(type) {
case *model.InsertRecord:
Expand Down Expand Up @@ -843,7 +835,6 @@ func (c *BigQueryConnector) syncRecordsViaAvro(req *model.SyncRecordsRequest,
}
}

startTime := time.Now()
close(recordStream.Records)
avroSync := NewQRepAvroSyncMethod(c, req.StagingPath)
rawTableMetadata, err := c.client.Dataset(c.datasetID).Table(rawTableName).Metadata(c.ctx)
Expand All @@ -857,7 +848,6 @@ func (c *BigQueryConnector) syncRecordsViaAvro(req *model.SyncRecordsRequest,
return nil, fmt.Errorf("failed to sync records via avro : %v", err)
}

metrics.LogSyncMetrics(c.ctx, req.FlowJobName, int64(numRecords), time.Since(startTime))
log.Printf("pushed %d records to %s.%s", numRecords, c.datasetID, rawTableName)

return &model.SyncResponse{
Expand Down
4 changes: 0 additions & 4 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"cloud.google.com/go/bigquery"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/metrics"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
Expand Down Expand Up @@ -150,13 +149,10 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
stmts = append(stmts, insertMetadataStmt)
stmts = append(stmts, "COMMIT TRANSACTION;")
// Execute the statements in a transaction
syncRecordsStartTime := time.Now()
_, err = bqClient.Query(strings.Join(stmts, "\n")).Read(s.connector.ctx)
if err != nil {
return -1, fmt.Errorf("failed to execute statements in a transaction: %v", err)
}
metrics.LogQRepSyncMetrics(s.connector.ctx, flowJobName,
int64(numRecords), time.Since(syncRecordsStartTime))

// drop the staging table
if err := bqClient.Dataset(datasetID).Table(stagingTable).Delete(s.connector.ctx); err != nil {
Expand Down
3 changes: 0 additions & 3 deletions flow/connectors/bigquery/qrep_sync_method.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

"cloud.google.com/go/bigquery"
"github.com/PeerDB-io/peer-flow/connectors/utils/metrics"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -106,8 +105,6 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
if err != nil {
return -1, fmt.Errorf("failed to insert records into staging table: %v", err)
}
metrics.LogQRepSyncMetrics(s.connector.ctx, flowJobName, int64(len(valueSaverRecords)),
time.Since(startTime))

// Copy the records into the destination table in a transaction.
// append all the statements to one list
Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"

log "github.com/sirupsen/logrus"

connbigquery "github.com/PeerDB-io/peer-flow/connectors/bigquery"
Expand Down Expand Up @@ -37,7 +38,7 @@ type CDCPullConnector interface {

// PullRecords pulls records from the source and delivers them through the RecordStream supplied on the request; it returns only an error.
// This method should be idempotent, and should be able to be called multiple times with the same request.
PullRecords(req *model.PullRecordsRequest) (*model.RecordsWithTableSchemaDelta, error)
PullRecords(req *model.PullRecordsRequest) error

// PullFlowCleanup drops both the Postgres publication and replication slot, as a part of DROP MIRROR
PullFlowCleanup(jobName string) error
Expand Down
Loading

0 comments on commit f55b3b8

Please sign in to comment.