Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CDC stream #590

Merged
merged 7 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 69 additions & 50 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ import (
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/metrics"
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/jackc/pglogrepl"
log "github.com/sirupsen/logrus"
"go.temporal.io/sdk/activity"
"golang.org/x/sync/errgroup"
)

// CheckConnectionResult is the result of a CheckConnection call.
Expand Down Expand Up @@ -165,11 +165,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
ctx = context.WithValue(ctx, shared.EnableMetricsKey, a.EnableMetrics)
ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogMirrorMonitor)

srcConn, err := connectors.GetCDCPullConnector(ctx, conn.Source)
if err != nil {
return nil, fmt.Errorf("failed to get source connector: %w", err)
}
defer connectors.CloseConnector(srcConn)
dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination)
if err != nil {
return nil, fmt.Errorf("failed to get destination connector: %w", err)
Expand All @@ -196,28 +191,40 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,

idleTimeout := utils.GetEnvInt("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", 10)

recordBatch := model.NewCDCRecordStream()

startTime := time.Now()
recordsWithTableSchemaDelta, err := srcConn.PullRecords(&model.PullRecordsRequest{
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
SrcTableIDNameMapping: input.FlowConnectionConfigs.SrcTableIdNameMapping,
TableNameMapping: tblNameMapping,
LastSyncState: input.LastSyncState,
MaxBatchSize: uint32(input.SyncFlowOptions.BatchSize),
IdleTimeout: time.Duration(idleTimeout) * time.Second,
TableNameSchemaMapping: input.FlowConnectionConfigs.TableNameSchemaMapping,
OverridePublicationName: input.FlowConnectionConfigs.PublicationName,
OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName,
RelationMessageMapping: input.RelationMessageMapping,
})

errGroup, errCtx := errgroup.WithContext(ctx)
srcConn, err := connectors.GetCDCPullConnector(errCtx, conn.Source)
if err != nil {
return nil, fmt.Errorf("failed to pull records: %w", err)
return nil, fmt.Errorf("failed to get source connector: %w", err)
}
recordBatch := recordsWithTableSchemaDelta.RecordBatch
defer connectors.CloseConnector(srcConn)

// start a goroutine to pull records from the source
errGroup.Go(func() error {
return srcConn.PullRecords(&model.PullRecordsRequest{
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
SrcTableIDNameMapping: input.FlowConnectionConfigs.SrcTableIdNameMapping,
TableNameMapping: tblNameMapping,
LastSyncState: input.LastSyncState,
MaxBatchSize: uint32(input.SyncFlowOptions.BatchSize),
IdleTimeout: time.Duration(idleTimeout) * time.Second,
TableNameSchemaMapping: input.FlowConnectionConfigs.TableNameSchemaMapping,
OverridePublicationName: input.FlowConnectionConfigs.PublicationName,
OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName,
RelationMessageMapping: input.RelationMessageMapping,
RecordStream: recordBatch,
})
})

pullRecordWithCount := fmt.Sprintf("pulled %d records", len(recordBatch.Records))
activity.RecordHeartbeat(ctx, pullRecordWithCount)
hasRecords := !recordBatch.WaitAndCheckEmpty()
log.WithFields(log.Fields{
"flowName": input.FlowConnectionConfigs.FlowJobName,
}).Infof("the current sync flow has records: %v", hasRecords)

if a.CatalogMirrorMonitor.IsActive() && len(recordBatch.Records) > 0 {
if a.CatalogMirrorMonitor.IsActive() && hasRecords {
syncBatchID, err := dstConn.GetLastSyncBatchID(input.FlowConnectionConfigs.FlowJobName)
if err != nil && conn.Destination.Type != protos.DBType_EVENTHUB {
return nil, err
Expand All @@ -226,34 +233,28 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
err = a.CatalogMirrorMonitor.AddCDCBatchForFlow(ctx, input.FlowConnectionConfigs.FlowJobName,
monitoring.CDCBatchInfo{
BatchID: syncBatchID + 1,
RowsInBatch: uint32(len(recordBatch.Records)),
BatchStartLSN: pglogrepl.LSN(recordBatch.FirstCheckPointID),
BatchEndlSN: pglogrepl.LSN(recordBatch.LastCheckPointID),
RowsInBatch: 0,
BatchStartLSN: pglogrepl.LSN(recordBatch.GetFirstCheckpoint()),
BatchEndlSN: 0,
StartTime: startTime,
})
if err != nil {
return nil, err
}
}

pullDuration := time.Since(startTime)
numRecords := len(recordBatch.Records)
log.WithFields(log.Fields{
"flowName": input.FlowConnectionConfigs.FlowJobName,
}).Infof("pulled %d records in %d seconds\n", numRecords, int(pullDuration.Seconds()))
activity.RecordHeartbeat(ctx, fmt.Sprintf("pulled %d records", numRecords))
if !hasRecords {
// wait for the pull goroutine to finish
err = errGroup.Wait()
if err != nil {
return nil, fmt.Errorf("failed to pull records: %w", err)
}

if numRecords == 0 {
log.WithFields(log.Fields{
"flowName": input.FlowConnectionConfigs.FlowJobName,
}).Info("no records to push")
metrics.LogSyncMetrics(ctx, input.FlowConnectionConfigs.FlowJobName, 0, 1)
metrics.LogNormalizeMetrics(ctx, input.FlowConnectionConfigs.FlowJobName, 0, 1, 0)
metrics.LogCDCRawThroughputMetrics(ctx, input.FlowConnectionConfigs.FlowJobName, 0)
return &model.SyncResponse{
RelationMessageMapping: recordsWithTableSchemaDelta.RelationMessageMapping,
TableSchemaDeltas: recordsWithTableSchemaDelta.TableSchemaDeltas,
}, nil
log.WithFields(log.Fields{"flowName": input.FlowConnectionConfigs.FlowJobName}).Info("no records to push")
syncResponse := &model.SyncResponse{}
syncResponse.RelationMessageMapping = <-recordBatch.RelationMessageMapping
syncResponse.TableSchemaDeltas = recordBatch.WaitForSchemaDeltas()
return syncResponse, nil
}

shutdown := utils.HeartbeatRoutine(ctx, 10*time.Second, func() string {
Expand All @@ -279,14 +280,35 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
return nil, fmt.Errorf("failed to push records: %w", err)
}

err = errGroup.Wait()
if err != nil {
return nil, fmt.Errorf("failed to pull records: %w", err)
}

numRecords := res.NumRecordsSynced
syncDuration := time.Since(syncStartTime)
log.WithFields(log.Fields{
"flowName": input.FlowConnectionConfigs.FlowJobName,
}).Infof("pushed %d records in %d seconds\n", numRecords, int(syncDuration.Seconds()))

lastCheckpoint, err := recordBatch.GetLastCheckpoint()
if err != nil {
return nil, fmt.Errorf("failed to get last checkpoint: %w", err)
}

err = a.CatalogMirrorMonitor.UpdateNumRowsAndEndLSNForCDCBatch(
ctx,
input.FlowConnectionConfigs.FlowJobName,
res.CurrentSyncBatchID,
uint32(numRecords),
pglogrepl.LSN(lastCheckpoint),
)
if err != nil {
return nil, err
}

err = a.CatalogMirrorMonitor.
UpdateLatestLSNAtTargetForCDCFlow(ctx, input.FlowConnectionConfigs.FlowJobName,
pglogrepl.LSN(recordBatch.LastCheckPointID))
UpdateLatestLSNAtTargetForCDCFlow(ctx, input.FlowConnectionConfigs.FlowJobName, pglogrepl.LSN(lastCheckpoint))
if err != nil {
return nil, err
}
Expand All @@ -300,15 +322,12 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
if err != nil {
return nil, err
}
res.TableSchemaDeltas = recordsWithTableSchemaDelta.TableSchemaDeltas
res.RelationMessageMapping = recordsWithTableSchemaDelta.RelationMessageMapping
res.TableSchemaDeltas = recordBatch.WaitForSchemaDeltas()
res.RelationMessageMapping = <-recordBatch.RelationMessageMapping

pushedRecordsWithCount := fmt.Sprintf("pushed %d records", numRecords)
activity.RecordHeartbeat(ctx, pushedRecordsWithCount)

metrics.LogCDCRawThroughputMetrics(ctx, input.FlowConnectionConfigs.FlowJobName,
float64(numRecords)/(pullDuration.Seconds()+syncDuration.Seconds()))

return res, nil
}

Expand Down
49 changes: 20 additions & 29 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"cloud.google.com/go/bigquery"
"cloud.google.com/go/storage"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/metrics"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
Expand Down Expand Up @@ -432,17 +431,9 @@ func (r StagingBQRecord) Save() (map[string]bigquery.Value, string, error) {
// currently only supports inserts,updates and deletes
// more record types will be added in the future.
func (c *BigQueryConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
if len(req.Records.Records) == 0 {
return &model.SyncResponse{
FirstSyncedCheckPointID: 0,
LastSyncedCheckPointID: 0,
NumRecordsSynced: 0,
}, nil
}

rawTableName := c.getRawTableName(req.FlowJobName)

log.Printf("pushing %d records to %s.%s", len(req.Records.Records), c.datasetID, rawTableName)
log.Printf("pushing records to %s.%s...", c.datasetID, rawTableName)

// generate a sequential number for the last synced batch
// this sequence will be used to keep track of records that are normalized
Expand Down Expand Up @@ -486,9 +477,9 @@ func (c *BigQueryConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest,
tableNameRowsMapping := make(map[string]uint32)
first := true
var firstCP int64 = 0
lastCP := req.Records.LastCheckPointID

// loop over req.Records
for _, record := range req.Records.Records {
for record := range req.Records.GetRecords() {
switch r := record.(type) {
case *model.InsertRecord:
// create the 3 required fields
Expand Down Expand Up @@ -586,14 +577,6 @@ func (c *BigQueryConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest,
}

numRecords := len(records)
if numRecords == 0 {
return &model.SyncResponse{
FirstSyncedCheckPointID: 0,
LastSyncedCheckPointID: 0,
NumRecordsSynced: 0,
}, nil
}

// insert the records into the staging table
stagingInserter := stagingTable.Inserter()
stagingInserter.IgnoreUnknownValues = true
Expand All @@ -613,6 +596,11 @@ func (c *BigQueryConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest,
}
}

lastCP, err := req.Records.GetLastCheckpoint()
if err != nil {
return nil, fmt.Errorf("failed to get last checkpoint: %v", err)
}

// we have to do the following things in a transaction
// 1. append the records in the staging table to the raw table.
// 2. execute the update metadata query to store the last committed watermark.
Expand All @@ -629,13 +617,11 @@ func (c *BigQueryConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest,
stmts = append(stmts, appendStmt)
stmts = append(stmts, updateMetadataStmt)
stmts = append(stmts, "COMMIT TRANSACTION;")
startTime := time.Now()
_, err = c.client.Query(strings.Join(stmts, "\n")).Read(c.ctx)
if err != nil {
return nil, fmt.Errorf("failed to execute statements in a transaction: %v", err)
}

metrics.LogSyncMetrics(c.ctx, req.FlowJobName, int64(numRecords), time.Since(startTime))
log.Printf("pushed %d records to %s.%s", numRecords, c.datasetID, rawTableName)

return &model.SyncResponse{
Expand All @@ -647,13 +633,15 @@ func (c *BigQueryConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest,
}, nil
}

func (c *BigQueryConnector) syncRecordsViaAvro(req *model.SyncRecordsRequest,
rawTableName string, syncBatchID int64) (*model.SyncResponse, error) {
func (c *BigQueryConnector) syncRecordsViaAvro(
req *model.SyncRecordsRequest,
rawTableName string,
syncBatchID int64,
) (*model.SyncResponse, error) {
tableNameRowsMapping := make(map[string]uint32)
first := true
var firstCP int64 = 0
lastCP := req.Records.LastCheckPointID
recordStream := model.NewQRecordStream(len(req.Records.Records))
recordStream := model.NewQRecordStream(1 << 20)
err := recordStream.SetSchema(&model.QRecordSchema{
Fields: []*model.QField{
{
Expand Down Expand Up @@ -713,7 +701,7 @@ func (c *BigQueryConnector) syncRecordsViaAvro(req *model.SyncRecordsRequest,
}

// loop over req.Records
for _, record := range req.Records.Records {
for record := range req.Records.GetRecords() {
var entries [10]qvalue.QValue
switch r := record.(type) {
case *model.InsertRecord:
Expand Down Expand Up @@ -843,21 +831,24 @@ func (c *BigQueryConnector) syncRecordsViaAvro(req *model.SyncRecordsRequest,
}
}

startTime := time.Now()
close(recordStream.Records)
avroSync := NewQRepAvroSyncMethod(c, req.StagingPath)
rawTableMetadata, err := c.client.Dataset(c.datasetID).Table(rawTableName).Metadata(c.ctx)
if err != nil {
return nil, fmt.Errorf("failed to get metadata of destination table: %v", err)
}

lastCP, err := req.Records.GetLastCheckpoint()
if err != nil {
return nil, fmt.Errorf("failed to get last checkpoint: %v", err)
}

numRecords, err := avroSync.SyncRecords(rawTableName, req.FlowJobName,
lastCP, rawTableMetadata, syncBatchID, recordStream)
if err != nil {
return nil, fmt.Errorf("failed to sync records via avro : %v", err)
}

metrics.LogSyncMetrics(c.ctx, req.FlowJobName, int64(numRecords), time.Since(startTime))
log.Printf("pushed %d records to %s.%s", numRecords, c.datasetID, rawTableName)

return &model.SyncResponse{
Expand Down
4 changes: 0 additions & 4 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"cloud.google.com/go/bigquery"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/metrics"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
Expand Down Expand Up @@ -150,13 +149,10 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
stmts = append(stmts, insertMetadataStmt)
stmts = append(stmts, "COMMIT TRANSACTION;")
// Execute the statements in a transaction
syncRecordsStartTime := time.Now()
_, err = bqClient.Query(strings.Join(stmts, "\n")).Read(s.connector.ctx)
if err != nil {
return -1, fmt.Errorf("failed to execute statements in a transaction: %v", err)
}
metrics.LogQRepSyncMetrics(s.connector.ctx, flowJobName,
int64(numRecords), time.Since(syncRecordsStartTime))

// drop the staging table
if err := bqClient.Dataset(datasetID).Table(stagingTable).Delete(s.connector.ctx); err != nil {
Expand Down
3 changes: 0 additions & 3 deletions flow/connectors/bigquery/qrep_sync_method.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

"cloud.google.com/go/bigquery"
"github.com/PeerDB-io/peer-flow/connectors/utils/metrics"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -106,8 +105,6 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
if err != nil {
return -1, fmt.Errorf("failed to insert records into staging table: %v", err)
}
metrics.LogQRepSyncMetrics(s.connector.ctx, flowJobName, int64(len(valueSaverRecords)),
time.Since(startTime))

// Copy the records into the destination table in a transaction.
// append all the statements to one list
Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"

log "github.com/sirupsen/logrus"

connbigquery "github.com/PeerDB-io/peer-flow/connectors/bigquery"
Expand Down Expand Up @@ -37,7 +38,7 @@ type CDCPullConnector interface {

// PullRecords pulls records from the source, and returns a RecordBatch.
// This method should be idempotent, and should be able to be called multiple times with the same request.
PullRecords(req *model.PullRecordsRequest) (*model.RecordsWithTableSchemaDelta, error)
PullRecords(req *model.PullRecordsRequest) error

// PullFlowCleanup drops both the Postgres publication and replication slot, as a part of DROP MIRROR
PullFlowCleanup(jobName string) error
Expand Down
Loading