diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index 832274fd7f..3342e2d008 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -193,6 +193,8 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
 		tblNameMapping[v.SourceTableIdentifier] = v.DestinationTableIdentifier
 	}
 
+	idleTimeout := utils.GetEnvInt("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", 10)
+
 	startTime := time.Now()
 	recordsWithTableSchemaDelta, err := srcConn.PullRecords(&model.PullRecordsRequest{
 		FlowJobName:                 input.FlowConnectionConfigs.FlowJobName,
@@ -200,7 +202,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
 		TableNameMapping:            tblNameMapping,
 		LastSyncState:               input.LastSyncState,
 		MaxBatchSize:                uint32(input.SyncFlowOptions.BatchSize),
-		IdleTimeout:                 10 * time.Second,
+		IdleTimeout:                 time.Duration(idleTimeout) * time.Second,
 		TableNameSchemaMapping:      input.FlowConnectionConfigs.TableNameSchemaMapping,
 		OverridePublicationName:     input.FlowConnectionConfigs.PublicationName,
 		OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName,
diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go
index 9bfff285ae..142f353146 100644
--- a/flow/connectors/postgres/cdc.go
+++ b/flow/connectors/postgres/cdc.go
@@ -154,6 +154,8 @@ func (p *PostgresCDCSource) consumeStream(
 		consumedXLogPos = clientXLogPos - 1
 	}
 
+	var standByLastLogged time.Time
+
 	for {
 		if time.Now().After(nextStandbyMessageDeadline) ||
 			(len(records.Records) >= int(req.MaxBatchSize)) {
@@ -167,7 +169,12 @@ func (p *PostgresCDCSource) consumeStream(
 
 			numRowsProcessedMessage := fmt.Sprintf("processed %d rows", len(records.Records))
 			utils.RecordHeartbeatWithRecover(p.ctx, numRowsProcessedMessage)
-			log.Infof("Sent Standby status message. %s", numRowsProcessedMessage)
+
+			if time.Since(standByLastLogged) > 10*time.Second {
+				log.Infof("Sent Standby status message. %s", numRowsProcessedMessage)
+				standByLastLogged = time.Now()
+			}
+
 			nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)
 
 			if !p.commitLock && (len(records.Records) >= int(req.MaxBatchSize)) {
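The cdc.go hunk above replaces a per-iteration log line with time-based throttling: the standby status message is now emitted at most once every 10 seconds, keyed off `standByLastLogged`. A minimal self-contained sketch of the same pattern (all names here are illustrative, not PeerDB code):

```go
package main

import (
	"log"
	"time"
)

// throttledLogger emits a repeating message at most once per interval,
// mirroring the standByLastLogged pattern in cdc.go above.
type throttledLogger struct {
	last     time.Time
	interval time.Duration
}

func (t *throttledLogger) logf(format string, args ...interface{}) {
	// time.Since of the zero time is enormous, so the first call always logs.
	if time.Since(t.last) > t.interval {
		log.Printf(format, args...)
		t.last = time.Now()
	}
}

func main() {
	tl := &throttledLogger{interval: 10 * time.Second}
	for i := 0; i < 1000000; i++ {
		tl.logf("processed %d rows", i) // at most one line per 10 seconds
	}
}
```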
%s", numRowsProcessedMessage) + standByLastLogged = time.Now() + } + nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) if !p.commitLock && (len(records.Records) >= int(req.MaxBatchSize)) { diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index e240d2b7d6..485f1bbf62 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -494,7 +494,8 @@ func (c *PostgresConnector) SyncQRepRecords( switch syncMode { case protos.QRepSyncMode_QREP_SYNC_MODE_MULTI_INSERT: stagingTableSync := &QRepStagingTableSync{connector: c} - return stagingTableSync.SyncQRepRecords(config.FlowJobName, dstTable, partition, stream) + return stagingTableSync.SyncQRepRecords( + config.FlowJobName, dstTable, partition, stream, config.WriteMode) case protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO: return 0, fmt.Errorf("[postgres] SyncQRepRecords not implemented for storage avro sync mode") default: diff --git a/flow/connectors/postgres/qrep_sync_method.go b/flow/connectors/postgres/qrep_sync_method.go index 3f6047d9f6..09264cbaf3 100644 --- a/flow/connectors/postgres/qrep_sync_method.go +++ b/flow/connectors/postgres/qrep_sync_method.go @@ -3,11 +3,13 @@ package connpostgres import ( "context" "fmt" + "strings" "time" "github.com/PeerDB-io/peer-flow/connectors/utils/metrics" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" + util "github.com/PeerDB-io/peer-flow/utils" "github.com/jackc/pgx/v5" log "github.com/sirupsen/logrus" "google.golang.org/protobuf/encoding/protojson" @@ -31,6 +33,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords( dstTableName *SchemaTable, partition *protos.QRepPartition, stream *model.QRecordStream, + writeMode *protos.QRepWriteMode, ) (int, error) { partitionID := partition.PartitionId startTime := time.Now() @@ -66,19 +69,100 @@ func (s *QRepStagingTableSync) SyncQRepRecords( // Step 2: Insert records into the destination table. 
diff --git a/flow/connectors/postgres/qrep_sync_method.go b/flow/connectors/postgres/qrep_sync_method.go
index 3f6047d9f6..09264cbaf3 100644
--- a/flow/connectors/postgres/qrep_sync_method.go
+++ b/flow/connectors/postgres/qrep_sync_method.go
@@ -3,11 +3,13 @@ package connpostgres
 import (
 	"context"
 	"fmt"
+	"strings"
 	"time"
 
 	"github.com/PeerDB-io/peer-flow/connectors/utils/metrics"
 	"github.com/PeerDB-io/peer-flow/generated/protos"
 	"github.com/PeerDB-io/peer-flow/model"
+	util "github.com/PeerDB-io/peer-flow/utils"
 	"github.com/jackc/pgx/v5"
 	log "github.com/sirupsen/logrus"
 	"google.golang.org/protobuf/encoding/protojson"
@@ -31,6 +33,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
 	dstTableName *SchemaTable,
 	partition *protos.QRepPartition,
 	stream *model.QRecordStream,
+	writeMode *protos.QRepWriteMode,
 ) (int, error) {
 	partitionID := partition.PartitionId
 	startTime := time.Now()
@@ -66,19 +69,100 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
 	// Step 2: Insert records into the destination table.
 	copySource := model.NewQRecordBatchCopyFromSource(stream)
 
-	// Perform the COPY FROM operation
 	syncRecordsStartTime := time.Now()
-	syncedRows, err := tx.CopyFrom(
-		context.Background(),
-		pgx.Identifier{dstTableName.Schema, dstTableName.Table},
-		schema.GetColumnNames(),
-		copySource,
-	)
+	var numRowsSynced int64
 
-	if err != nil {
-		return -1, fmt.Errorf("failed to copy records into destination table: %v", err)
+	if writeMode == nil ||
+		writeMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_APPEND {
+		// Perform the COPY FROM operation
+		numRowsSynced, err = tx.CopyFrom(
+			context.Background(),
+			pgx.Identifier{dstTableName.Schema, dstTableName.Table},
+			schema.GetColumnNames(),
+			copySource,
+		)
+		if err != nil {
+			return -1, fmt.Errorf("failed to copy records into destination table: %v", err)
+		}
+	} else {
+		// Step 2.1: Create a temp staging table
+		stagingTableName := fmt.Sprintf("_peerdb_staging_%s", util.RandomString(8))
+		stagingTableIdentifier := pgx.Identifier{dstTableName.Schema, stagingTableName}
+		dstTableIdentifier := pgx.Identifier{dstTableName.Schema, dstTableName.Table}
+
+		createStagingTableStmt := fmt.Sprintf(
+			"CREATE UNLOGGED TABLE %s (LIKE %s);",
+			stagingTableIdentifier.Sanitize(),
+			dstTableIdentifier.Sanitize(),
+		)
+
+		log.Infof("Creating staging table %s - '%s'", stagingTableName, createStagingTableStmt)
+		_, err = tx.Exec(context.Background(), createStagingTableStmt)
+
+		if err != nil {
+			return -1, fmt.Errorf("failed to create staging table: %v", err)
+		}
+
+		// Step 2.2: Insert records into the staging table
+		numRowsSynced, err = tx.CopyFrom(
+			context.Background(),
+			stagingTableIdentifier,
+			schema.GetColumnNames(),
+			copySource,
+		)
+		if err != nil {
+			return -1, fmt.Errorf("failed to copy records into staging table: %v", err)
+		}
+
+		// construct the SET clause for the upsert operation
+		upsertMatchColsList := writeMode.UpsertKeyColumns
+		upsertMatchCols := make(map[string]bool)
+		for _, col := range upsertMatchColsList {
+			upsertMatchCols[col] = true
+		}
+
+		setClause := ""
+		for _, col := range schema.GetColumnNames() {
+			_, ok := upsertMatchCols[col]
+			if !ok {
+				setClause += fmt.Sprintf("%s = EXCLUDED.%s,", col, col)
+			}
+		}
+
+		setClause = strings.TrimSuffix(setClause, ",")
+		selectStr := strings.Join(schema.GetColumnNames(), ", ")
+
+		// Step 2.3: Perform the upsert operation, ON CONFLICT UPDATE
+		upsertStmt := fmt.Sprintf(
+			"INSERT INTO %s (%s) SELECT %s FROM %s ON CONFLICT (%s) DO UPDATE SET %s;",
+			dstTableIdentifier.Sanitize(),
+			selectStr,
+			selectStr,
+			stagingTableIdentifier.Sanitize(),
+			strings.Join(writeMode.UpsertKeyColumns, ", "),
+			setClause,
+		)
+		log.Infof("Performing upsert operation: %s", upsertStmt)
+		res, err := tx.Exec(context.Background(), upsertStmt)
+		if err != nil {
+			return -1, fmt.Errorf("failed to perform upsert operation: %v", err)
+		}
+
+		numRowsSynced = res.RowsAffected()
+
+		// Step 2.4: Drop the staging table
+		dropStagingTableStmt := fmt.Sprintf(
+			"DROP TABLE %s;",
+			stagingTableIdentifier.Sanitize(),
+		)
+		log.Infof("Dropping staging table %s", stagingTableName)
+		_, err = tx.Exec(context.Background(), dropStagingTableStmt)
+		if err != nil {
+			return -1, fmt.Errorf("failed to drop staging table: %v", err)
+		}
 	}
-	metrics.LogQRepSyncMetrics(s.connector.ctx, flowJobName, syncedRows, time.Since(syncRecordsStartTime))
+
+	metrics.LogQRepSyncMetrics(s.connector.ctx, flowJobName, numRowsSynced, time.Since(syncRecordsStartTime))
 
 	// marshal the partition to json using protojson
 	pbytes, err := protojson.Marshal(partition)
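To make the generated statement concrete, here is a self-contained trace of the string building in Step 2.3 for a hypothetical destination table public.users(id, name, updated_at) with UpsertKeyColumns = ["id"]; the table, columns, and staging suffix are invented for illustration, and the SET clause is joined with a slice instead of TrimSuffix, which differs only in whitespace:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	columns := []string{"id", "name", "updated_at"}
	upsertKeyCols := map[string]bool{"id": true}

	// Non-key columns become "col = EXCLUDED.col" assignments, as in Step 2.3.
	var sets []string
	for _, col := range columns {
		if !upsertKeyCols[col] {
			sets = append(sets, fmt.Sprintf("%s = EXCLUDED.%s", col, col))
		}
	}

	selectStr := strings.Join(columns, ", ")
	upsertStmt := fmt.Sprintf(
		"INSERT INTO %s (%s) SELECT %s FROM %s ON CONFLICT (%s) DO UPDATE SET %s;",
		`"public"."users"`, // dstTableIdentifier.Sanitize()
		selectStr,
		selectStr,
		`"public"."_peerdb_staging_AbCdEfGh"`, // stagingTableIdentifier.Sanitize()
		"id",
		strings.Join(sets, ", "),
	)
	fmt.Println(upsertStmt)
	// Output (wrapped here for readability):
	// INSERT INTO "public"."users" (id, name, updated_at)
	//   SELECT id, name, updated_at FROM "public"."_peerdb_staging_AbCdEfGh"
	//   ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, updated_at = EXCLUDED.updated_at;
}
```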
diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go
index 6a05bf9832..6658b9e5cb 100644
--- a/flow/connectors/snowflake/snowflake.go
+++ b/flow/connectors/snowflake/snowflake.go
@@ -494,6 +494,14 @@ func (c *SnowflakeConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.
 	}
 	syncBatchID = syncBatchID + 1
 
+	var res *model.SyncResponse
+	if req.SyncMode == protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO {
+		res, err = c.syncRecordsViaAvro(req, rawTableIdentifier, syncBatchID)
+		if err != nil {
+			return nil, err
+		}
+	}
+
 	// transaction for SyncRecords
 	syncRecordsTx, err := c.database.BeginTx(c.ctx, nil)
 	if err != nil {
@@ -510,13 +518,7 @@ func (c *SnowflakeConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.
 		}
 	}()
 
-	var res *model.SyncResponse
-	if req.SyncMode == protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO {
-		res, err = c.syncRecordsViaAvro(req, rawTableIdentifier, syncBatchID)
-		if err != nil {
-			return nil, err
-		}
-	} else if req.SyncMode == protos.QRepSyncMode_QREP_SYNC_MODE_MULTI_INSERT {
+	if req.SyncMode == protos.QRepSyncMode_QREP_SYNC_MODE_MULTI_INSERT {
 		res, err = c.syncRecordsViaSQL(req, rawTableIdentifier, syncBatchID, syncRecordsTx)
 		if err != nil {
 			return nil, err
@@ -539,7 +541,6 @@ func (c *SnowflakeConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.
 
 func (c *SnowflakeConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest, rawTableIdentifier string,
 	syncBatchID int64, syncRecordsTx *sql.Tx) (*model.SyncResponse, error) {
-
 	records := make([]snowflakeRawRecord, 0)
 	tableNameRowsMapping := make(map[string]uint32)
 
diff --git a/flow/connectors/utils/env.go b/flow/connectors/utils/env.go
index d3c1acfff5..2911e3d8ef 100644
--- a/flow/connectors/utils/env.go
+++ b/flow/connectors/utils/env.go
@@ -28,3 +28,20 @@ func GetEnvBool(name string, defaultValue bool) bool {
 
 	return b
 }
+
+// GetEnvInt returns the value of the environment variable with the given name
+// or defaultValue if the environment variable is not set or is not a valid
+// integer value.
+func GetEnvInt(name string, defaultValue int) int {
+	val, ok := GetEnv(name)
+	if !ok {
+		return defaultValue
+	}
+
+	i, err := strconv.Atoi(val)
+	if err != nil {
+		return defaultValue
+	}
+
+	return i
+}
diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go
index 8029a10408..064bb4c1b6 100644
--- a/flow/workflows/cdc_flow.go
+++ b/flow/workflows/cdc_flow.go
@@ -102,6 +102,16 @@ func (s *CDCFlowState) TruncateProgress() {
 	if len(s.NormalizeFlowStatuses) > 10 {
 		s.NormalizeFlowStatuses = s.NormalizeFlowStatuses[len(s.NormalizeFlowStatuses)-10:]
 	}
+
+	if s.SyncFlowErrors != nil {
+		fmt.Println("SyncFlowErrors: ", s.SyncFlowErrors)
+		s.SyncFlowErrors = nil
+	}
+
+	if s.NormalizeFlowErrors != nil {
+		fmt.Println("NormalizeFlowErrors: ", s.NormalizeFlowErrors)
+		s.NormalizeFlowErrors = nil
+	}
 }
 
 func (s *CDCFlowState) SendWALHeartbeat(ctx workflow.Context, cfg *protos.FlowConnectionConfigs) error {
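Stepping back to the env.go hunk: the new GetEnvInt helper follows the shape of the existing GetEnvBool, and its fallback behavior is what the PEERDB_CDC_IDLE_TIMEOUT_SECONDS lookup in flowable.go relies on. A quick sketch; the import path is inferred from env.go's location under flow/connectors/utils:

```go
package main

import (
	"fmt"
	"os"

	"github.com/PeerDB-io/peer-flow/connectors/utils"
)

func main() {
	// Unset: the default wins.
	fmt.Println(utils.GetEnvInt("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", 10)) // 10

	// Valid integer: the parsed value wins.
	os.Setenv("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", "60")
	fmt.Println(utils.GetEnvInt("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", 10)) // 60

	// Malformed value: falls back to the default instead of failing.
	os.Setenv("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", "soon")
	fmt.Println(utils.GetEnvInt("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", 10)) // 10
}
```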
diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go
index d63c57d1ed..3969067664 100644
--- a/flow/workflows/sync_flow.go
+++ b/flow/workflows/sync_flow.go
@@ -82,10 +82,10 @@ func (s *SyncFlowExecution) executeSyncFlow(
 	}
 
 	startFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
-		StartToCloseTimeout: 24 * time.Hour,
+		StartToCloseTimeout: 72 * time.Hour,
 		// TODO: activity needs to call heartbeat.
 		// see https://github.com/PeerDB-io/nexus/issues/216
-		HeartbeatTimeout: 30 * time.Second,
+		HeartbeatTimeout: 24 * time.Hour,
 	})
 
 	// execute StartFlow on the peers to start the flow
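The widened timeouts paper over the missing heartbeat that the TODO references: with HeartbeatTimeout raised to 24 hours, a dead worker can go unnoticed for up to a day, and the timeout can shrink again once the activity heartbeats reliably. A sketch of that pattern with the Temporal Go SDK; syncBatches is a stand-in for the real activity, not PeerDB code:

```go
package activities

import (
	"context"
	"fmt"

	"go.temporal.io/sdk/activity"
)

// syncBatches sketches a long-running activity that records a heartbeat
// after every batch so the Temporal server can detect a dead worker long
// before StartToCloseTimeout expires.
func syncBatches(ctx context.Context, numBatches int) error {
	for i := 0; i < numBatches; i++ {
		// ... pull and push one batch ...

		// Heartbeat details are optional; they are visible when the
		// workflow execution is described.
		activity.RecordHeartbeat(ctx, fmt.Sprintf("finished batch %d", i))

		// Heartbeating also delivers cancellation; check between batches.
		if ctx.Err() != nil {
			return ctx.Err()
		}
	}
	return nil
}
```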