Commit

Merge remote-tracking branch 'origin/cdc-stream' into customer-mirage
iskakaushik committed Oct 30, 2023
2 parents 62069c8 + ecea49b commit 89c4f7a
Showing 20 changed files with 524 additions and 1,408 deletions.
119 changes: 69 additions & 50 deletions flow/activities/flowable.go
@@ -11,14 +11,14 @@ import (
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/metrics"
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/jackc/pglogrepl"
log "github.com/sirupsen/logrus"
"go.temporal.io/sdk/activity"
"golang.org/x/sync/errgroup"
)

// CheckConnectionResult is the result of a CheckConnection call.
@@ -165,11 +165,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
ctx = context.WithValue(ctx, shared.EnableMetricsKey, a.EnableMetrics)
ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogMirrorMonitor)

srcConn, err := connectors.GetCDCPullConnector(ctx, conn.Source)
if err != nil {
return nil, fmt.Errorf("failed to get source connector: %w", err)
}
defer connectors.CloseConnector(srcConn)
dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination)
if err != nil {
return nil, fmt.Errorf("failed to get destination connector: %w", err)
@@ -196,28 +191,40 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,

idleTimeout := utils.GetEnvInt("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", 10)

recordBatch := model.NewCDCRecordStream()

startTime := time.Now()
recordsWithTableSchemaDelta, err := srcConn.PullRecords(&model.PullRecordsRequest{
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
SrcTableIDNameMapping: input.FlowConnectionConfigs.SrcTableIdNameMapping,
TableNameMapping: tblNameMapping,
LastSyncState: input.LastSyncState,
MaxBatchSize: uint32(input.SyncFlowOptions.BatchSize),
IdleTimeout: time.Duration(idleTimeout) * time.Second,
TableNameSchemaMapping: input.FlowConnectionConfigs.TableNameSchemaMapping,
OverridePublicationName: input.FlowConnectionConfigs.PublicationName,
OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName,
RelationMessageMapping: input.RelationMessageMapping,
})

errGroup, errCtx := errgroup.WithContext(ctx)
srcConn, err := connectors.GetCDCPullConnector(errCtx, conn.Source)
if err != nil {
return nil, fmt.Errorf("failed to pull records: %w", err)
return nil, fmt.Errorf("failed to get source connector: %w", err)
}
recordBatch := recordsWithTableSchemaDelta.RecordBatch
defer connectors.CloseConnector(srcConn)

// start a goroutine to pull records from the source
errGroup.Go(func() error {
return srcConn.PullRecords(&model.PullRecordsRequest{
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
SrcTableIDNameMapping: input.FlowConnectionConfigs.SrcTableIdNameMapping,
TableNameMapping: tblNameMapping,
LastSyncState: input.LastSyncState,
MaxBatchSize: uint32(input.SyncFlowOptions.BatchSize),
IdleTimeout: time.Duration(idleTimeout) * time.Second,
TableNameSchemaMapping: input.FlowConnectionConfigs.TableNameSchemaMapping,
OverridePublicationName: input.FlowConnectionConfigs.PublicationName,
OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName,
RelationMessageMapping: input.RelationMessageMapping,
RecordStream: recordBatch,
})
})

pullRecordWithCount := fmt.Sprintf("pulled %d records", len(recordBatch.Records))
activity.RecordHeartbeat(ctx, pullRecordWithCount)
hasRecords := !recordBatch.WaitAndCheckEmpty()
log.WithFields(log.Fields{
"flowName": input.FlowConnectionConfigs.FlowJobName,
}).Infof("the current sync flow has records: %v", hasRecords)

if a.CatalogMirrorMonitor.IsActive() && len(recordBatch.Records) > 0 {
if a.CatalogMirrorMonitor.IsActive() && hasRecords {
syncBatchID, err := dstConn.GetLastSyncBatchID(input.FlowConnectionConfigs.FlowJobName)
if err != nil && conn.Destination.Type != protos.DBType_EVENTHUB {
return nil, err
@@ -226,34 +233,28 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
err = a.CatalogMirrorMonitor.AddCDCBatchForFlow(ctx, input.FlowConnectionConfigs.FlowJobName,
monitoring.CDCBatchInfo{
BatchID: syncBatchID + 1,
RowsInBatch: uint32(len(recordBatch.Records)),
BatchStartLSN: pglogrepl.LSN(recordBatch.FirstCheckPointID),
BatchEndlSN: pglogrepl.LSN(recordBatch.LastCheckPointID),
RowsInBatch: 0,
BatchStartLSN: pglogrepl.LSN(recordBatch.GetFirstCheckpoint()),
BatchEndlSN: 0,
StartTime: startTime,
})
if err != nil {
return nil, err
}
}

pullDuration := time.Since(startTime)
numRecords := len(recordBatch.Records)
log.WithFields(log.Fields{
"flowName": input.FlowConnectionConfigs.FlowJobName,
}).Infof("pulled %d records in %d seconds\n", numRecords, int(pullDuration.Seconds()))
activity.RecordHeartbeat(ctx, fmt.Sprintf("pulled %d records", numRecords))
if !hasRecords {
// wait for the pull goroutine to finish
err = errGroup.Wait()
if err != nil {
return nil, fmt.Errorf("failed to pull records: %w", err)
}

if numRecords == 0 {
log.WithFields(log.Fields{
"flowName": input.FlowConnectionConfigs.FlowJobName,
}).Info("no records to push")
metrics.LogSyncMetrics(ctx, input.FlowConnectionConfigs.FlowJobName, 0, 1)
metrics.LogNormalizeMetrics(ctx, input.FlowConnectionConfigs.FlowJobName, 0, 1, 0)
metrics.LogCDCRawThroughputMetrics(ctx, input.FlowConnectionConfigs.FlowJobName, 0)
return &model.SyncResponse{
RelationMessageMapping: recordsWithTableSchemaDelta.RelationMessageMapping,
TableSchemaDeltas: recordsWithTableSchemaDelta.TableSchemaDeltas,
}, nil
log.WithFields(log.Fields{"flowName": input.FlowConnectionConfigs.FlowJobName}).Info("no records to push")
syncResponse := &model.SyncResponse{}
syncResponse.RelationMessageMapping = <-recordBatch.RelationMessageMapping
syncResponse.TableSchemaDeltas = recordBatch.WaitForSchemaDeltas()
return syncResponse, nil
}

shutdown := utils.HeartbeatRoutine(ctx, 10*time.Second, func() string {
@@ -279,14 +280,35 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
return nil, fmt.Errorf("failed to push records: %w", err)
}

err = errGroup.Wait()
if err != nil {
return nil, fmt.Errorf("failed to pull records: %w", err)
}

numRecords := res.NumRecordsSynced
syncDuration := time.Since(syncStartTime)
log.WithFields(log.Fields{
"flowName": input.FlowConnectionConfigs.FlowJobName,
}).Infof("pushed %d records in %d seconds\n", numRecords, int(syncDuration.Seconds()))

lastCheckpoint, err := recordBatch.GetLastCheckpoint()
if err != nil {
return nil, fmt.Errorf("failed to get last checkpoint: %w", err)
}

err = a.CatalogMirrorMonitor.UpdateNumRowsAndEndLSNForCDCBatch(
ctx,
input.FlowConnectionConfigs.FlowJobName,
res.CurrentSyncBatchID,
uint32(numRecords),
pglogrepl.LSN(lastCheckpoint),
)
if err != nil {
return nil, err
}

err = a.CatalogMirrorMonitor.
UpdateLatestLSNAtTargetForCDCFlow(ctx, input.FlowConnectionConfigs.FlowJobName,
pglogrepl.LSN(recordBatch.LastCheckPointID))
UpdateLatestLSNAtTargetForCDCFlow(ctx, input.FlowConnectionConfigs.FlowJobName, pglogrepl.LSN(lastCheckpoint))
if err != nil {
return nil, err
}
@@ -300,15 +322,12 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
if err != nil {
return nil, err
}
res.TableSchemaDeltas = recordsWithTableSchemaDelta.TableSchemaDeltas
res.RelationMessageMapping = recordsWithTableSchemaDelta.RelationMessageMapping
res.TableSchemaDeltas = recordBatch.WaitForSchemaDeltas()
res.RelationMessageMapping = <-recordBatch.RelationMessageMapping

pushedRecordsWithCount := fmt.Sprintf("pushed %d records", numRecords)
activity.RecordHeartbeat(ctx, pushedRecordsWithCount)

metrics.LogCDCRawThroughputMetrics(ctx, input.FlowConnectionConfigs.FlowJobName,
float64(numRecords)/(pullDuration.Seconds()+syncDuration.Seconds()))

return res, nil
}

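The flowable.go hunks above restructure StartFlow around a streaming model: instead of materializing a full batch up front, a new errgroup goroutine pulls records into a channel-backed CDCRecordStream while the sync path consumes it, and errGroup.Wait() surfaces pull errors once the consumer is done. A minimal, self-contained sketch of that producer/consumer shape (stand-in types only, not the real model or connectors packages):

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// recordStream stands in for model.CDCRecordStream: a buffered channel of records.
type recordStream struct {
	records chan string
}

// pullRecords plays the role of srcConn.PullRecords: it produces records into
// the stream and closes it when the batch is complete.
func pullRecords(ctx context.Context, stream *recordStream) error {
	defer close(stream.records)
	for i := 0; i < 3; i++ {
		select {
		case stream.records <- fmt.Sprintf("record-%d", i):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

// syncRecords plays the role of the destination sync: it drains the stream.
func syncRecords(stream *recordStream) int {
	n := 0
	for range stream.records {
		n++ // push the record to the destination here
	}
	return n
}

func main() {
	stream := &recordStream{records: make(chan string, 8)}

	errGroup, errCtx := errgroup.WithContext(context.Background())
	errGroup.Go(func() error {
		return pullRecords(errCtx, stream)
	})

	synced := syncRecords(stream) // consumes while the puller is still producing

	// surface pull errors only after the consumer has drained the stream,
	// mirroring the errGroup.Wait() calls added in StartFlow
	if err := errGroup.Wait(); err != nil {
		fmt.Printf("failed to pull records: %v\n", err)
		return
	}
	fmt.Printf("pushed %d records\n", synced)
}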
50 changes: 20 additions & 30 deletions flow/connectors/bigquery/bigquery.go
@@ -13,7 +13,6 @@ import (
"cloud.google.com/go/bigquery"
"cloud.google.com/go/storage"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/metrics"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
@@ -432,17 +431,9 @@ func (r StagingBQRecord) Save() (map[string]bigquery.Value, string, error) {
// currently only supports inserts,updates and deletes
// more record types will be added in the future.
func (c *BigQueryConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
if len(req.Records.Records) == 0 {
return &model.SyncResponse{
FirstSyncedCheckPointID: 0,
LastSyncedCheckPointID: 0,
NumRecordsSynced: 0,
}, nil
}

rawTableName := c.getRawTableName(req.FlowJobName)

log.Printf("pushing %d records to %s.%s", len(req.Records.Records), c.datasetID, rawTableName)
log.Printf("pushing records to %s.%s...", c.datasetID, rawTableName)

// generate a sequential number for the last synced batch
// this sequence will be used to keep track of records that are normalized
@@ -486,9 +477,9 @@ func (c *BigQueryConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest,
tableNameRowsMapping := make(map[string]uint32)
first := true
var firstCP int64 = 0
lastCP := req.Records.LastCheckPointID

// loop over req.Records
for _, record := range req.Records.Records {
for record := range req.Records.GetRecords() {
switch r := record.(type) {
case *model.InsertRecord:
// create the 3 required fields
@@ -586,14 +577,6 @@ func (c *BigQueryConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest,
}

numRecords := len(records)
if numRecords == 0 {
return &model.SyncResponse{
FirstSyncedCheckPointID: 0,
LastSyncedCheckPointID: 0,
NumRecordsSynced: 0,
}, nil
}

// insert the records into the staging table
stagingInserter := stagingTable.Inserter()
stagingInserter.IgnoreUnknownValues = true
@@ -613,6 +596,11 @@ func (c *BigQueryConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest,
}
}

lastCP, err := req.Records.GetLastCheckpoint()
if err != nil {
return nil, fmt.Errorf("failed to get last checkpoint: %v", err)
}

// we have to do the following things in a transaction
// 1. append the records in the staging table to the raw table.
// 2. execute the update metadata query to store the last committed watermark.
@@ -629,13 +617,11 @@ func (c *BigQueryConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest,
stmts = append(stmts, appendStmt)
stmts = append(stmts, updateMetadataStmt)
stmts = append(stmts, "COMMIT TRANSACTION;")
startTime := time.Now()
_, err = c.client.Query(strings.Join(stmts, "\n")).Read(c.ctx)
if err != nil {
return nil, fmt.Errorf("failed to execute statements in a transaction: %v", err)
}

metrics.LogSyncMetrics(c.ctx, req.FlowJobName, int64(numRecords), time.Since(startTime))
log.Printf("pushed %d records to %s.%s", numRecords, c.datasetID, rawTableName)

return &model.SyncResponse{
@@ -647,13 +633,15 @@ func (c *BigQueryConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest,
}, nil
}

func (c *BigQueryConnector) syncRecordsViaAvro(req *model.SyncRecordsRequest,
rawTableName string, syncBatchID int64) (*model.SyncResponse, error) {
func (c *BigQueryConnector) syncRecordsViaAvro(
req *model.SyncRecordsRequest,
rawTableName string,
syncBatchID int64,
) (*model.SyncResponse, error) {
tableNameRowsMapping := make(map[string]uint32)
first := true
var firstCP int64 = 0
lastCP := req.Records.LastCheckPointID
recordStream := model.NewQRecordStream(len(req.Records.Records))
recordStream := model.NewQRecordStream(1 << 20)
err := recordStream.SetSchema(&model.QRecordSchema{
Fields: []*model.QField{
{
@@ -713,7 +701,7 @@ func (c *BigQueryConnector) syncRecordsViaAvro(req *model.SyncRecordsRequest,
}

// loop over req.Records
for _, record := range req.Records.Records {
for record := range req.Records.GetRecords() {
var entries [10]qvalue.QValue
switch r := record.(type) {
case *model.InsertRecord:
@@ -843,21 +831,23 @@ func (c *BigQueryConnector) syncRecordsViaAvro(req *model.SyncRecordsRequest,
}
}

startTime := time.Now()
close(recordStream.Records)
avroSync := NewQRepAvroSyncMethod(c, req.StagingPath)
rawTableMetadata, err := c.client.Dataset(c.datasetID).Table(rawTableName).Metadata(c.ctx)
if err != nil {
return nil, fmt.Errorf("failed to get metadata of destination table: %v", err)
}

lastCP, err := req.Records.GetLastCheckpoint()
if err != nil {
return nil, fmt.Errorf("failed to get last checkpoint: %v", err)
}

numRecords, err := avroSync.SyncRecords(rawTableName, req.FlowJobName,
lastCP, rawTableMetadata, syncBatchID, recordStream)
if err != nil {
return nil, fmt.Errorf("failed to sync records via avro : %v", err)
}

metrics.LogSyncMetrics(c.ctx, req.FlowJobName, int64(numRecords), time.Since(startTime))
log.Printf("pushed %d records to %s.%s", numRecords, c.datasetID, rawTableName)

return &model.SyncResponse{
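With the slice-based batch gone, the BigQuery connector now ranges over req.Records.GetRecords() and reads the batch-end LSN via req.Records.GetLastCheckpoint() only after the loop, because in a streaming model the final checkpoint is not known until the producer has finished. A rough sketch of why that accessor carries an error in its signature (hypothetical stand-in types, not the actual CDCRecordStream):

package main

import (
	"errors"
	"fmt"
)

// cdcStream is a hypothetical stand-in for the checkpoint-tracking part of
// model.CDCRecordStream; field and method names here are illustrative only.
type cdcStream struct {
	records chan int64 // each record carries its checkpoint (LSN)
	lastLSN int64
	closed  bool
}

// push records the highest LSN seen so far while forwarding the record.
func (s *cdcStream) push(lsn int64) {
	if lsn > s.lastLSN {
		s.lastLSN = lsn
	}
	s.records <- lsn
}

// finish marks the batch complete; closing the channel publishes lastLSN and
// closed to the consumer that is ranging over records.
func (s *cdcStream) finish() {
	s.closed = true
	close(s.records)
}

// GetLastCheckpoint mirrors the (value, error) shape used in the diff: the
// batch-end LSN is only final once the producer has finished the stream.
func (s *cdcStream) GetLastCheckpoint() (int64, error) {
	if !s.closed {
		return 0, errors.New("last checkpoint requested before the stream was finished")
	}
	return s.lastLSN, nil
}

func main() {
	s := &cdcStream{records: make(chan int64, 4)}
	go func() {
		for _, lsn := range []int64{101, 102, 105} {
			s.push(lsn)
		}
		s.finish()
	}()

	for range s.records {
		// buffer or stage each record for the destination here
	}

	lastCP, err := s.GetLastCheckpoint() // safe: the range loop exits only after finish()
	if err != nil {
		panic(err)
	}
	fmt.Println("batch end LSN:", lastCP)
}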
4 changes: 0 additions & 4 deletions flow/connectors/bigquery/qrep_avro_sync.go
@@ -10,7 +10,6 @@ import (

"cloud.google.com/go/bigquery"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/metrics"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
@@ -150,13 +149,10 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
stmts = append(stmts, insertMetadataStmt)
stmts = append(stmts, "COMMIT TRANSACTION;")
// Execute the statements in a transaction
syncRecordsStartTime := time.Now()
_, err = bqClient.Query(strings.Join(stmts, "\n")).Read(s.connector.ctx)
if err != nil {
return -1, fmt.Errorf("failed to execute statements in a transaction: %v", err)
}
metrics.LogQRepSyncMetrics(s.connector.ctx, flowJobName,
int64(numRecords), time.Since(syncRecordsStartTime))

// drop the staging table
if err := bqClient.Dataset(datasetID).Table(stagingTable).Delete(s.connector.ctx); err != nil {
3 changes: 0 additions & 3 deletions flow/connectors/bigquery/qrep_sync_method.go
@@ -7,7 +7,6 @@ import (
"time"

"cloud.google.com/go/bigquery"
"github.com/PeerDB-io/peer-flow/connectors/utils/metrics"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
log "github.com/sirupsen/logrus"
@@ -106,8 +105,6 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
if err != nil {
return -1, fmt.Errorf("failed to insert records into staging table: %v", err)
}
metrics.LogQRepSyncMetrics(s.connector.ctx, flowJobName, int64(len(valueSaverRecords)),
time.Since(startTime))

// Copy the records into the destination table in a transaction.
// append all the statements to one list
3 changes: 2 additions & 1 deletion flow/connectors/core.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"

log "github.com/sirupsen/logrus"

connbigquery "github.com/PeerDB-io/peer-flow/connectors/bigquery"
@@ -37,7 +38,7 @@ type CDCPullConnector interface {

// PullRecords pulls records from the source, and returns a RecordBatch.
// This method should be idempotent, and should be able to be called multiple times with the same request.
PullRecords(req *model.PullRecordsRequest) (*model.RecordsWithTableSchemaDelta, error)
PullRecords(req *model.PullRecordsRequest) error

// PullFlowCleanup drops both the Postgres publication and replication slot, as a part of DROP MIRROR
PullFlowCleanup(jobName string) error
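The interface change above means a CDCPullConnector implementation now reports only an error and delivers records through the stream carried on the request, rather than returning a RecordsWithTableSchemaDelta. A hedged sketch of what the new shape implies for an implementer, using stand-in types in place of the real model package:

package main

import "fmt"

// Stand-in types; the real PullRecordsRequest and CDCRecordStream live in the
// model package and carry many more fields than shown here.
type record struct {
	Table string
	LSN   int64
}

type pullRecordsRequest struct {
	FlowJobName  string
	MaxBatchSize uint32
	RecordStream chan record
}

// cdcPullConnector mirrors the new interface shape: PullRecords reports only
// an error and delivers records through the stream on the request.
type cdcPullConnector interface {
	PullRecords(req *pullRecordsRequest) error
}

type fakeSourceConnector struct{}

func (c *fakeSourceConnector) PullRecords(req *pullRecordsRequest) error {
	// signal the consumer that this batch is complete, even on early return
	defer close(req.RecordStream)
	for i := uint32(0); i < req.MaxBatchSize; i++ {
		req.RecordStream <- record{Table: "public.users", LSN: int64(1000 + i)}
	}
	return nil
}

func main() {
	var src cdcPullConnector = &fakeSourceConnector{}
	req := &pullRecordsRequest{
		FlowJobName:  "demo-flow",
		MaxBatchSize: 3,
		RecordStream: make(chan record, 3), // buffered so this sketch can run single-threaded
	}

	if err := src.PullRecords(req); err != nil {
		fmt.Printf("failed to pull records: %v\n", err)
		return
	}
	for r := range req.RecordStream {
		fmt.Printf("synced %s @ LSN %d\n", r.Table, r.LSN)
	}
}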
(remaining 15 changed files not shown)
