Skip to content

Commit

Permalink
Merge branch 'main' into eliminate-qrep-batch
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Oct 23, 2023
2 parents fab9a74 + b1be612 commit d01be4c
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 23 deletions.
4 changes: 3 additions & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,14 +193,16 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
tblNameMapping[v.SourceTableIdentifier] = v.DestinationTableIdentifier
}

idleTimeout := utils.GetEnvInt("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", 10)

startTime := time.Now()
recordsWithTableSchemaDelta, err := srcConn.PullRecords(&model.PullRecordsRequest{
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
SrcTableIDNameMapping: input.FlowConnectionConfigs.SrcTableIdNameMapping,
TableNameMapping: tblNameMapping,
LastSyncState: input.LastSyncState,
MaxBatchSize: uint32(input.SyncFlowOptions.BatchSize),
IdleTimeout: 10 * time.Second,
IdleTimeout: time.Duration(idleTimeout) * time.Second,
TableNameSchemaMapping: input.FlowConnectionConfigs.TableNameSchemaMapping,
OverridePublicationName: input.FlowConnectionConfigs.PublicationName,
OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName,
Expand Down
9 changes: 8 additions & 1 deletion flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ func (p *PostgresCDCSource) consumeStream(
consumedXLogPos = clientXLogPos - 1
}

var standByLastLogged time.Time

for {
if time.Now().After(nextStandbyMessageDeadline) ||
(len(records.Records) >= int(req.MaxBatchSize)) {
Expand All @@ -167,7 +169,12 @@ func (p *PostgresCDCSource) consumeStream(

numRowsProcessedMessage := fmt.Sprintf("processed %d rows", len(records.Records))
utils.RecordHeartbeatWithRecover(p.ctx, numRowsProcessedMessage)
log.Infof("Sent Standby status message. %s", numRowsProcessedMessage)

if time.Since(standByLastLogged) > 10*time.Second {
log.Infof("Sent Standby status message. %s", numRowsProcessedMessage)
standByLastLogged = time.Now()
}

nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)

if !p.commitLock && (len(records.Records) >= int(req.MaxBatchSize)) {
Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,8 @@ func (c *PostgresConnector) SyncQRepRecords(
switch syncMode {
case protos.QRepSyncMode_QREP_SYNC_MODE_MULTI_INSERT:
stagingTableSync := &QRepStagingTableSync{connector: c}
return stagingTableSync.SyncQRepRecords(config.FlowJobName, dstTable, partition, stream)
return stagingTableSync.SyncQRepRecords(
config.FlowJobName, dstTable, partition, stream, config.WriteMode)
case protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO:
return 0, fmt.Errorf("[postgres] SyncQRepRecords not implemented for storage avro sync mode")
default:
Expand Down
104 changes: 94 additions & 10 deletions flow/connectors/postgres/qrep_sync_method.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package connpostgres
import (
"context"
"fmt"
"strings"
"time"

"github.com/PeerDB-io/peer-flow/connectors/utils/metrics"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
util "github.com/PeerDB-io/peer-flow/utils"
"github.com/jackc/pgx/v5"
log "github.com/sirupsen/logrus"
"google.golang.org/protobuf/encoding/protojson"
Expand All @@ -31,6 +33,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
dstTableName *SchemaTable,
partition *protos.QRepPartition,
stream *model.QRecordStream,
writeMode *protos.QRepWriteMode,
) (int, error) {
partitionID := partition.PartitionId
startTime := time.Now()
Expand Down Expand Up @@ -66,19 +69,100 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
// Step 2: Insert records into the destination table.
copySource := model.NewQRecordBatchCopyFromSource(stream)

// Perform the COPY FROM operation
syncRecordsStartTime := time.Now()
syncedRows, err := tx.CopyFrom(
context.Background(),
pgx.Identifier{dstTableName.Schema, dstTableName.Table},
schema.GetColumnNames(),
copySource,
)
var numRowsSynced int64

if err != nil {
return -1, fmt.Errorf("failed to copy records into destination table: %v", err)
if writeMode == nil ||
writeMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_APPEND {
// Perform the COPY FROM operation
numRowsSynced, err = tx.CopyFrom(
context.Background(),
pgx.Identifier{dstTableName.Schema, dstTableName.Table},
schema.GetColumnNames(),
copySource,
)
if err != nil {
return -1, fmt.Errorf("failed to copy records into destination table: %v", err)
}
} else {
// Step 2.1: Create a temp staging table
stagingTableName := fmt.Sprintf("_peerdb_staging_%s", util.RandomString(8))
stagingTableIdentifier := pgx.Identifier{dstTableName.Schema, stagingTableName}
dstTableIdentifier := pgx.Identifier{dstTableName.Schema, dstTableName.Table}

createStagingTableStmt := fmt.Sprintf(
"CREATE UNLOGGED TABLE %s (LIKE %s);",
stagingTableIdentifier.Sanitize(),
dstTableIdentifier.Sanitize(),
)

log.Infof("Creating staging table %s - '%s'", stagingTableName, createStagingTableStmt)
_, err = tx.Exec(context.Background(), createStagingTableStmt)

if err != nil {
return -1, fmt.Errorf("failed to create staging table: %v", err)
}

// Step 2.2: Insert records into the staging table
numRowsSynced, err = tx.CopyFrom(
context.Background(),
stagingTableIdentifier,
schema.GetColumnNames(),
copySource,
)
if err != nil {
return -1, fmt.Errorf("failed to copy records into staging table: %v", err)
}

// construct the SET clause for the upsert operation
upsertMatchColsList := writeMode.UpsertKeyColumns
upsertMatchCols := make(map[string]bool)
for _, col := range upsertMatchColsList {
upsertMatchCols[col] = true
}

setClause := ""
for _, col := range schema.GetColumnNames() {
_, ok := upsertMatchCols[col]
if !ok {
setClause += fmt.Sprintf("%s = EXCLUDED.%s,", col, col)
}
}

setClause = strings.TrimSuffix(setClause, ",")
selectStr := strings.Join(schema.GetColumnNames(), ", ")

// Step 2.3: Perform the upsert operation, ON CONFLICT UPDATE
upsertStmt := fmt.Sprintf(
"INSERT INTO %s (%s) SELECT %s FROM %s ON CONFLICT (%s) DO UPDATE SET %s;",
dstTableIdentifier.Sanitize(),
selectStr,
selectStr,
stagingTableIdentifier.Sanitize(),
strings.Join(writeMode.UpsertKeyColumns, ", "),
setClause,
)
log.Infof("Performing upsert operation: %s", upsertStmt)
res, err := tx.Exec(context.Background(), upsertStmt)
if err != nil {
return -1, fmt.Errorf("failed to perform upsert operation: %v", err)
}

numRowsSynced = res.RowsAffected()

// Step 2.4: Drop the staging table
dropStagingTableStmt := fmt.Sprintf(
"DROP TABLE %s;",
stagingTableIdentifier.Sanitize(),
)
log.Infof("Dropping staging table %s", stagingTableName)
_, err = tx.Exec(context.Background(), dropStagingTableStmt)
if err != nil {
return -1, fmt.Errorf("failed to drop staging table: %v", err)
}
}
metrics.LogQRepSyncMetrics(s.connector.ctx, flowJobName, syncedRows, time.Since(syncRecordsStartTime))

metrics.LogQRepSyncMetrics(s.connector.ctx, flowJobName, numRowsSynced, time.Since(syncRecordsStartTime))

// marshal the partition to json using protojson
pbytes, err := protojson.Marshal(partition)
Expand Down
17 changes: 9 additions & 8 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,14 @@ func (c *SnowflakeConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.
}
syncBatchID = syncBatchID + 1

var res *model.SyncResponse
if req.SyncMode == protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO {
res, err = c.syncRecordsViaAvro(req, rawTableIdentifier, syncBatchID)
if err != nil {
return nil, err
}
}

// transaction for SyncRecords
syncRecordsTx, err := c.database.BeginTx(c.ctx, nil)
if err != nil {
Expand All @@ -510,13 +518,7 @@ func (c *SnowflakeConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.
}
}()

var res *model.SyncResponse
if req.SyncMode == protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO {
res, err = c.syncRecordsViaAvro(req, rawTableIdentifier, syncBatchID)
if err != nil {
return nil, err
}
} else if req.SyncMode == protos.QRepSyncMode_QREP_SYNC_MODE_MULTI_INSERT {
if req.SyncMode == protos.QRepSyncMode_QREP_SYNC_MODE_MULTI_INSERT {
res, err = c.syncRecordsViaSQL(req, rawTableIdentifier, syncBatchID, syncRecordsTx)
if err != nil {
return nil, err
Expand All @@ -539,7 +541,6 @@ func (c *SnowflakeConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.

func (c *SnowflakeConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest, rawTableIdentifier string,
syncBatchID int64, syncRecordsTx *sql.Tx) (*model.SyncResponse, error) {

records := make([]snowflakeRawRecord, 0)
tableNameRowsMapping := make(map[string]uint32)

Expand Down
17 changes: 17 additions & 0 deletions flow/connectors/utils/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,20 @@ func GetEnvBool(name string, defaultValue bool) bool {

return b
}

// GetEnvInt returns the value of the environment variable with the given name
// or defaultValue if the environment variable is not set or is not a valid
// integer value.
func GetEnvInt(name string, defaultValue int) int {
val, ok := GetEnv(name)
if !ok {
return defaultValue
}

i, err := strconv.Atoi(val)
if err != nil {
return defaultValue
}

return i
}
10 changes: 10 additions & 0 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,16 @@ func (s *CDCFlowState) TruncateProgress() {
if len(s.NormalizeFlowStatuses) > 10 {
s.NormalizeFlowStatuses = s.NormalizeFlowStatuses[len(s.NormalizeFlowStatuses)-10:]
}

if s.SyncFlowErrors != nil {
fmt.Println("SyncFlowErrors: ", s.SyncFlowErrors)
s.SyncFlowErrors = nil
}

if s.NormalizeFlowErrors != nil {
fmt.Println("NormalizeFlowErrors: ", s.NormalizeFlowErrors)
s.NormalizeFlowErrors = nil
}
}

func (s *CDCFlowState) SendWALHeartbeat(ctx workflow.Context, cfg *protos.FlowConnectionConfigs) error {
Expand Down
4 changes: 2 additions & 2 deletions flow/workflows/sync_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@ func (s *SyncFlowExecution) executeSyncFlow(
}

startFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 24 * time.Hour,
StartToCloseTimeout: 72 * time.Hour,
// TODO: activity needs to call heartbeat.
// see https://github.com/PeerDB-io/nexus/issues/216
HeartbeatTimeout: 30 * time.Second,
HeartbeatTimeout: 24 * time.Hour,
})

// execute StartFlow on the peers to start the flow
Expand Down

0 comments on commit d01be4c

Please sign in to comment.