From 07ee601e86be9fc79701e207f53f68212b070e7b Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Mon, 18 Dec 2023 21:46:32 +0530 Subject: [PATCH 1/6] qrep for pg->pg append --- flow/connectors/postgres/qrep.go | 3 ++- flow/connectors/postgres/qrep_sync_method.go | 27 ++++++++++++++++---- flow/workflows/qrep_flow.go | 1 + flow/workflows/snapshot_flow.go | 1 + 4 files changed, 26 insertions(+), 6 deletions(-) diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index be8daa903d..ce114b702b 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -471,7 +471,8 @@ func (c *PostgresConnector) SyncQRepRecords( stagingTableSync := &QRepStagingTableSync{connector: c} return stagingTableSync.SyncQRepRecords( - config.FlowJobName, dstTable, partition, stream, config.WriteMode) + config.FlowJobName, dstTable, partition, stream, + config.WriteMode, config.SyncedAtColName) } // SetupQRepMetadataTables function for postgres connector diff --git a/flow/connectors/postgres/qrep_sync_method.go b/flow/connectors/postgres/qrep_sync_method.go index a54769e3d8..d11f917e05 100644 --- a/flow/connectors/postgres/qrep_sync_method.go +++ b/flow/connectors/postgres/qrep_sync_method.go @@ -35,6 +35,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords( partition *protos.QRepPartition, stream *model.QRecordStream, writeMode *protos.QRepWriteMode, + syncedAtCol string, ) (int, error) { syncLog := slog.Group("sync-qrep-log", slog.String(string(shared.FlowNameKey), flowJobName), @@ -81,6 +82,19 @@ func (s *QRepStagingTableSync) SyncQRepRecords( if err != nil { return -1, fmt.Errorf("failed to copy records into destination table: %v", err) } + + if syncedAtCol != "" { + updateSyncedAtStmt := fmt.Sprintf( + `UPDATE %s SET "%s" = CURRENT_TIMESTAMP WHERE "%s" IS NULL;`, + pgx.Identifier{dstTableName.Schema, dstTableName.Table}.Sanitize(), + syncedAtCol, + syncedAtCol, + ) + _, err = tx.Exec(context.Background(), updateSyncedAtStmt) + if err != nil { + return -1, fmt.Errorf("failed to update synced_at column: %v", err) + } + } } else { // Step 2.1: Create a temp staging table stagingTableName := fmt.Sprintf("_peerdb_staging_%s", shared.RandomString(8)) @@ -88,7 +102,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords( dstTableIdentifier := pgx.Identifier{dstTableName.Schema, dstTableName.Table} createStagingTableStmt := fmt.Sprintf( - "CREATE UNLOGGED TABLE %s (LIKE %s);", + "CREATE UNLOGGED TABLE %s (LIKE %s INCLUDING INDEXES);", stagingTableIdentifier.Sanitize(), dstTableIdentifier.Sanitize(), ) @@ -128,16 +142,19 @@ func (s *QRepStagingTableSync) SyncQRepRecords( } selectStrArray = append(selectStrArray, fmt.Sprintf(`"%s"`, col)) } - + setClauseArray = append(setClauseArray, + fmt.Sprintf(`"%s" = CURRENT_TIMESTAMP`, syncedAtCol)) setClause := strings.Join(setClauseArray, ",") - selectStr := strings.Join(selectStrArray, ",") + selectStrColsSQL := strings.Join(append(selectStrArray, + fmt.Sprintf(`"%s"`, syncedAtCol)), ",") + selectStrValuesSQL := strings.Join(append(selectStrArray, `CURRENT_TIMESTAMP`), ",") // Step 2.3: Perform the upsert operation, ON CONFLICT UPDATE upsertStmt := fmt.Sprintf( "INSERT INTO %s (%s) SELECT %s FROM %s ON CONFLICT (%s) DO UPDATE SET %s;", dstTableIdentifier.Sanitize(), - selectStr, - selectStr, + selectStrColsSQL, + selectStrValuesSQL, stagingTableIdentifier.Sanitize(), strings.Join(writeMode.UpsertKeyColumns, ", "), setClause, diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 2373427c8e..644f61a611 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -125,6 +125,7 @@ func (q *QRepFlowExecution) SetupWatermarkTableOnDestination(ctx workflow.Contex TableNameSchemaMapping: map[string]*protos.TableSchema{ q.config.DestinationTableIdentifier: tblSchemaOutput.TableNameSchemaMapping[q.config.WatermarkTable], }, + SyncedAtColName: q.config.SyncedAtColName, } future = workflow.ExecuteActivity(ctx, flowable.CreateNormalizedTable, setupConfig) diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index 527fde5720..bc8448b4b7 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -176,6 +176,7 @@ func (s *SnapshotFlowExecution) cloneTable( NumRowsPerPartition: numRowsPerPartition, MaxParallelWorkers: numWorkers, StagingPath: s.config.SnapshotStagingPath, + SyncedAtColName: s.config.SyncedAtColName, WriteMode: &protos.QRepWriteMode{ WriteType: protos.QRepWriteType_QREP_WRITE_MODE_APPEND, }, From 97f734d29a5755210c6b354c462e7b9da2635d39 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Tue, 19 Dec 2023 21:18:01 +0530 Subject: [PATCH 2/6] pg and bq qrep --- flow/cmd/handler.go | 7 ++ flow/connectors/bigquery/qrep.go | 2 +- flow/connectors/bigquery/qrep_avro_sync.go | 17 +++-- flow/e2e/bigquery/peer_flow_bq_test.go | 19 +++-- flow/e2e/bigquery/qrep_flow_bq_test.go | 39 +++++++++- flow/e2e/postgres/qrep_flow_pg_test.go | 82 +++++++++++++++++++++- flow/e2e/s3/qrep_flow_s3_test.go | 6 +- flow/e2e/snowflake/qrep_flow_sf_test.go | 12 +++- flow/e2e/test_utils.go | 6 +- 9 files changed, 172 insertions(+), 18 deletions(-) diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 9efdc59fb5..7b03c9de67 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -268,6 +268,13 @@ func (h *FlowRequestHandler) CreateQRepFlow( } else { workflowFn = peerflow.QRepFlowWorkflow } + + if req.QrepConfig.SyncedAtColName == "" { + cfg.SyncedAtColName = "_PEERDB_SYNCED_AT" + } else { + // make them all uppercase + cfg.SyncedAtColName = strings.ToUpper(req.QrepConfig.SyncedAtColName) + } _, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn, cfg, state) if err != nil { slog.Error("unable to start QRepFlow workflow", diff --git a/flow/connectors/bigquery/qrep.go b/flow/connectors/bigquery/qrep.go index a353d432eb..6dd1f3149b 100644 --- a/flow/connectors/bigquery/qrep.go +++ b/flow/connectors/bigquery/qrep.go @@ -46,7 +46,7 @@ func (c *BigQueryConnector) SyncQRepRecords( partition.PartitionId, destTable)) avroSync := &QRepAvroSyncMethod{connector: c, gcsBucket: config.StagingPath} - return avroSync.SyncQRepRecords(config.FlowJobName, destTable, partition, tblMetadata, stream) + return avroSync.SyncQRepRecords(config.FlowJobName, destTable, partition, tblMetadata, stream, config.SyncedAtColName) } func (c *BigQueryConnector) replayTableSchemaDeltasQRep(config *protos.QRepConfig, partition *protos.QRepPartition, diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index 9bb01157fe..882f11a8c2 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -48,7 +48,7 @@ func (s *QRepAvroSyncMethod) SyncRecords( flowJobName, dstTableName, syncBatchID), ) // You will need to define your Avro schema as a string - avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata) + avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata, "") if err != nil { return 0, fmt.Errorf("failed to define Avro schema: %w", err) } @@ -107,6 +107,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords( partition *protos.QRepPartition, dstTableMetadata *bigquery.TableMetadata, stream *model.QRecordStream, + syncedAtCol string, ) (int, error) { startTime := time.Now() flowLog := slog.Group("sync_metadata", @@ -115,7 +116,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords( slog.String("destinationTable", dstTableName), ) // You will need to define your Avro schema as a string - avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata) + avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata, syncedAtCol) if err != nil { return 0, fmt.Errorf("failed to define Avro schema: %w", err) } @@ -137,9 +138,13 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords( // Start a transaction stmts := []string{"BEGIN TRANSACTION;"} + selector := "*" + if syncedAtCol != "" { // PeerDB column + selector = "*, CURRENT_TIMESTAMP" + } // Insert the records from the staging table into the destination table - insertStmt := fmt.Sprintf("INSERT INTO `%s.%s` SELECT * FROM `%s.%s`;", - datasetID, dstTableName, datasetID, stagingTable) + insertStmt := fmt.Sprintf("INSERT INTO `%s.%s` SELECT %s FROM `%s.%s`;", + datasetID, dstTableName, selector, datasetID, stagingTable) stmts = append(stmts, insertStmt) @@ -181,11 +186,15 @@ type AvroSchema struct { func DefineAvroSchema(dstTableName string, dstTableMetadata *bigquery.TableMetadata, + syncedAtCol string, ) (*model.QRecordAvroSchemaDefinition, error) { avroFields := []AvroField{} nullableFields := make(map[string]struct{}) for _, bqField := range dstTableMetadata.Schema { + if bqField.Name == syncedAtCol { + continue + } avroType, err := GetAvroType(bqField) if err != nil { return nil, err diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 30e203aeba..b28577f4d3 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -52,10 +52,14 @@ func (s PeerFlowE2ETestSuiteBQ) attachSuffix(input string) string { return fmt.Sprintf("%s_%s", input, s.bqSuffix) } -func (s *PeerFlowE2ETestSuiteBQ) checkPeerdbColumns(dstQualified string, rowID int8) error { +func (s *PeerFlowE2ETestSuiteBQ) checkPeerdbColumns(dstQualified string, softDelete bool) error { qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, dstQualified) - query := fmt.Sprintf("SELECT `_PEERDB_IS_DELETED`,`_PEERDB_SYNCED_AT` FROM %s WHERE id = %d", - qualifiedTableName, rowID) + selector := "`_PEERDB_SYNCED_AT`" + if softDelete { + selector += ", `_PEERDB_IS_DELETED`" + } + query := fmt.Sprintf("SELECT %s FROM %s", + selector, qualifiedTableName) recordBatch, err := s.bqHelper.ExecuteAndProcessQuery(query) if err != nil { @@ -63,6 +67,7 @@ func (s *PeerFlowE2ETestSuiteBQ) checkPeerdbColumns(dstQualified string, rowID i } recordCount := 0 + for _, record := range recordBatch.Records { for _, entry := range record.Entries { if entry.Kind == qvalue.QValueKindBoolean { @@ -78,12 +83,14 @@ func (s *PeerFlowE2ETestSuiteBQ) checkPeerdbColumns(dstQualified string, rowID i if !ok { return fmt.Errorf("peerdb column failed: _PEERDB_SYNCED_AT is not valid") } + recordCount += 1 } } } - if recordCount != 2 { - return fmt.Errorf("peerdb column failed: _PEERDB_IS_DELETED or _PEERDB_SYNCED_AT not present") + + if recordCount == 0 { + return fmt.Errorf("peerdb column check failed: no records found") } return nil @@ -1191,7 +1198,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() { // allow only continue as new error require.Contains(s.t, err.Error(), "continue as new") - err = s.checkPeerdbColumns(dstTableName, 1) + err = s.checkPeerdbColumns(dstTableName, true) require.NoError(s.t, err) env.AssertExpectations(s.t) diff --git a/flow/e2e/bigquery/qrep_flow_bq_test.go b/flow/e2e/bigquery/qrep_flow_bq_test.go index f520014b04..299b0d1c9d 100644 --- a/flow/e2e/bigquery/qrep_flow_bq_test.go +++ b/flow/e2e/bigquery/qrep_flow_bq_test.go @@ -10,7 +10,7 @@ import ( ) func (s PeerFlowE2ETestSuiteBQ) setupSourceTable(tableName string, rowCount int) { - err := e2e.CreateSourceTableQRep(s.pool, s.bqSuffix, tableName) + err := e2e.CreateTableForQRep(s.pool, s.bqSuffix, tableName) require.NoError(s.t, err) err = e2e.PopulateSourceTable(s.pool, s.bqSuffix, tableName, rowCount) require.NoError(s.t, err) @@ -64,6 +64,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { tblName, query, s.bqHelper.Peer, + "", + false, "") require.NoError(s.t, err) e2e.RunQrepFlowWorkflow(env, qrepConfig) @@ -78,3 +80,38 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { env.AssertExpectations(s.t) } + +func (s PeerFlowE2ETestSuiteBQ) Test_Columns_QRep_BQ() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env, s.t) + + numRows := 10 + + tblName := "test_columns_bq_qrep" + s.setupSourceTable(tblName, numRows) + + query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", + s.bqSuffix, tblName) + + qrepConfig, err := e2e.CreateQRepWorkflowConfig("test_qrep_flow_avro", + fmt.Sprintf("e2e_test_%s.%s", s.bqSuffix, tblName), + tblName, + query, + s.bqHelper.Peer, + "", + true, + "_PEERDB_SYNCED_AT") + require.NoError(s.t, err) + e2e.RunQrepFlowWorkflow(env, qrepConfig) + + // Verify workflow completes without error + s.True(env.IsWorkflowCompleted()) + + err = env.GetWorkflowError() + require.NoError(s.t, err) + + err = s.checkPeerdbColumns(tblName, false) + require.NoError(s.t, err) + + env.AssertExpectations(s.t) +} diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index 192863e397..afe986eae8 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -10,6 +10,7 @@ import ( connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" "github.com/joho/godotenv" "github.com/stretchr/testify/suite" @@ -67,7 +68,7 @@ func (s *PeerFlowE2ETestSuitePG) TearDownSuite() { } func (s *PeerFlowE2ETestSuitePG) setupSourceTable(tableName string, rowCount int) { - err := e2e.CreateSourceTableQRep(s.pool, postgresSuffix, tableName) + err := e2e.CreateTableForQRep(s.pool, postgresSuffix, tableName) s.NoError(err) err = e2e.PopulateSourceTable(s.pool, postgresSuffix, tableName, rowCount) s.NoError(err) @@ -134,6 +135,30 @@ func (s *PeerFlowE2ETestSuitePG) compareQuery(srcSchemaQualified, dstSchemaQuali return nil } +func (s *PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error { + query := fmt.Sprintf(`SELECT "_PEERDB_SYNCED_AT" FROM %s`, dstSchemaQualified) + + rows, _ := s.pool.Query(context.Background(), query) + + defer rows.Close() + for rows.Next() { + var syncedAt pgtype.Timestamp + err := rows.Scan(&syncedAt) + if err != nil { + return err + } + + if !syncedAt.Valid { + return fmt.Errorf("synced_at is not valid") + } + } + + if rows.Err() != nil { + return rows.Err() + } + return nil +} + func (s *PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env, s.T()) @@ -146,8 +171,8 @@ func (s *PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { //nolint:gosec dstTable := "test_qrep_flow_avro_pg_2" - // the name is misleading, but this is the destination table - err := e2e.CreateSourceTableQRep(s.pool, postgresSuffix, dstTable) + + err := e2e.CreateTableForQRep(s.pool, postgresSuffix, dstTable) s.NoError(err) srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", postgresSuffix, srcTable) @@ -165,6 +190,8 @@ func (s *PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { query, postgresPeer, "", + true, + "", ) s.NoError(err) @@ -183,3 +210,52 @@ func (s *PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { env.AssertExpectations(s.T()) } + +func (s *PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_PG() { + env := s.NewTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env, s.T()) + + numRows := 10 + + //nolint:gosec + srcTable := "test_qrep_columns_pg_1" + s.setupSourceTable(srcTable, numRows) + + //nolint:gosec + dstTable := "test_qrep_columns_pg_2" + + srcSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", postgresSuffix, srcTable) + dstSchemaQualified := fmt.Sprintf("%s_%s.%s", "e2e_test", postgresSuffix, dstTable) + + query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", + postgresSuffix, srcTable) + + postgresPeer := e2e.GeneratePostgresPeer(e2e.PostgresPort) + + qrepConfig, err := e2e.CreateQRepWorkflowConfig( + "test_qrep_columns_pg", + srcSchemaQualified, + dstSchemaQualified, + query, + postgresPeer, + "", + true, + "_PEERDB_SYNCED_AT", + ) + s.NoError(err) + + e2e.RunQrepFlowWorkflow(env, qrepConfig) + + // Verify workflow completes without error + s.True(env.IsWorkflowCompleted()) + + err = env.GetWorkflowError() + s.NoError(err) + + err = s.checkSyncedAt(dstSchemaQualified) + if err != nil { + s.FailNow(err.Error()) + } + + env.AssertExpectations(s.T()) +} diff --git a/flow/e2e/s3/qrep_flow_s3_test.go b/flow/e2e/s3/qrep_flow_s3_test.go index 62523e1adf..fda57ced09 100644 --- a/flow/e2e/s3/qrep_flow_s3_test.go +++ b/flow/e2e/s3/qrep_flow_s3_test.go @@ -30,7 +30,7 @@ func TestPeerFlowE2ETestSuiteS3(t *testing.T) { } func (s *PeerFlowE2ETestSuiteS3) setupSourceTable(tableName string, rowCount int) { - err := e2e.CreateSourceTableQRep(s.pool, s3Suffix, tableName) + err := e2e.CreateTableForQRep(s.pool, s3Suffix, tableName) s.NoError(err) err = e2e.PopulateSourceTable(s.pool, s3Suffix, tableName, rowCount) s.NoError(err) @@ -106,6 +106,8 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() { query, s.s3Helper.GetPeer(), "stage", + false, + "", ) s.NoError(err) qrepConfig.StagingPath = s.s3Helper.s3Config.Url @@ -152,6 +154,8 @@ func (s *PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() { query, s.s3Helper.GetPeer(), "stage", + false, + "", ) s.NoError(err) qrepConfig.StagingPath = s.s3Helper.s3Config.Url diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index 49ed3614b9..a45134aa6c 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -13,7 +13,7 @@ import ( ) func (s PeerFlowE2ETestSuiteSF) setupSourceTable(tableName string, numRows int) { - err := e2e.CreateSourceTableQRep(s.pool, s.pgSuffix, tableName) + err := e2e.CreateTableForQRep(s.pool, s.pgSuffix, tableName) require.NoError(s.t, err) err = e2e.PopulateSourceTable(s.pool, s.pgSuffix, tableName, numRows) require.NoError(s.t, err) @@ -77,6 +77,8 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { query, s.sfHelper.Peer, "", + false, + "", ) require.NoError(s.t, err) @@ -116,6 +118,8 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() query, s.sfHelper.Peer, "", + false, + "", ) qrepConfig.WriteMode = &protos.QRepWriteMode{ WriteType: protos.QRepWriteType_QREP_WRITE_MODE_UPSERT, @@ -159,6 +163,8 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { query, s.sfHelper.Peer, "", + false, + "", ) require.NoError(s.t, err) qrepConfig.StagingPath = fmt.Sprintf("s3://peerdb-test-bucket/avro/%s", uuid.New()) @@ -199,6 +205,8 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { query, s.sfHelper.Peer, "", + false, + "", ) qrepConfig.WriteMode = &protos.QRepWriteMode{ WriteType: protos.QRepWriteType_QREP_WRITE_MODE_UPSERT, @@ -247,6 +255,8 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration() sfPeer, "", + false, + "", ) require.NoError(s.t, err) qrepConfig.StagingPath = fmt.Sprintf("s3://peerdb-test-bucket/avro/%s", uuid.New()) diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 7c12240580..0aa1c12242 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -123,7 +123,7 @@ func NormalizeFlowCountQuery(env *testsuite.TestWorkflowEnvironment, } } -func CreateSourceTableQRep(pool *pgxpool.Pool, suffix string, tableName string) error { +func CreateTableForQRep(pool *pgxpool.Pool, suffix string, tableName string) error { tblFields := []string{ "id UUID NOT NULL PRIMARY KEY", "card_id UUID", @@ -287,6 +287,8 @@ func CreateQRepWorkflowConfig( query string, dest *protos.Peer, stagingPath string, + setupDst bool, + syncedAtCol string, ) (*protos.QRepConfig, error) { connectionGen := QRepFlowConnectionGenerationConfig{ FlowJobName: flowJobName, @@ -304,6 +306,8 @@ func CreateQRepWorkflowConfig( return nil, err } qrepConfig.InitialCopyOnly = true + qrepConfig.SyncedAtColName = syncedAtCol + qrepConfig.SetupWatermarkTableOnDestination = setupDst return qrepConfig, nil } From 8583bcaa6e5c914ade6a5659583e41f71ed6cf98 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Tue, 19 Dec 2023 23:42:54 +0530 Subject: [PATCH 3/6] synced at for sf qrep --- flow/connectors/snowflake/qrep.go | 4 +- flow/connectors/snowflake/qrep_avro_sync.go | 7 +++- flow/e2e/bigquery/qrep_flow_bq_test.go | 2 +- flow/e2e/snowflake/qrep_flow_sf_test.go | 45 +++++++++++++++++++++ flow/e2e/snowflake/snowflake_helper.go | 23 +++++++++++ 5 files changed, 77 insertions(+), 4 deletions(-) diff --git a/flow/connectors/snowflake/qrep.go b/flow/connectors/snowflake/qrep.go index b099a54ff4..98d20b63ff 100644 --- a/flow/connectors/snowflake/qrep.go +++ b/flow/connectors/snowflake/qrep.go @@ -249,7 +249,7 @@ func (c *SnowflakeConnector) createExternalStage(stageName string, config *proto } func (c *SnowflakeConnector) ConsolidateQRepPartitions(config *protos.QRepConfig) error { - c.logger.Error("Consolidating partitions") + c.logger.Info("Consolidating partitions") destTable := config.DestinationTableIdentifier stageName := c.getStageNameForJob(config.FlowJobName) @@ -272,7 +272,7 @@ func (c *SnowflakeConnector) ConsolidateQRepPartitions(config *protos.QRepConfig // CleanupQRepFlow function for snowflake connector func (c *SnowflakeConnector) CleanupQRepFlow(config *protos.QRepConfig) error { - c.logger.Error("Cleaning up flow job") + c.logger.Info("Cleaning up flow job") return c.dropStage(config.StagingPath, config.FlowJobName) } diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go index eb83b554b1..7184898ae3 100644 --- a/flow/connectors/snowflake/qrep_avro_sync.go +++ b/flow/connectors/snowflake/qrep_avro_sync.go @@ -300,6 +300,7 @@ func (s *SnowflakeAvroSyncMethod) putFileToStage(avroFile *avro.AvroFile, stage func (c *SnowflakeConnector) GetCopyTransformation( dstTableName string, + syncedAtCol string, ) (*CopyInfo, error) { colInfo, colsErr := c.getColsFromTable(dstTableName) if colsErr != nil { @@ -310,6 +311,10 @@ func (c *SnowflakeConnector) GetCopyTransformation( columnOrder := make([]string, 0, len(colInfo.ColumnMap)) for colName, colType := range colInfo.ColumnMap { columnOrder = append(columnOrder, fmt.Sprintf("\"%s\"", colName)) + if colName == syncedAtCol { + transformations = append(transformations, fmt.Sprintf("CURRENT_TIMESTAMP AS \"%s\"", colName)) + continue + } switch colType { case "GEOGRAPHY": transformations = append(transformations, @@ -354,7 +359,7 @@ func CopyStageToDestination( } } - copyTransformation, err := connector.GetCopyTransformation(dstTableName) + copyTransformation, err := connector.GetCopyTransformation(dstTableName, config.SyncedAtColName) if err != nil { return fmt.Errorf("failed to get copy transformation: %w", err) } diff --git a/flow/e2e/bigquery/qrep_flow_bq_test.go b/flow/e2e/bigquery/qrep_flow_bq_test.go index 299b0d1c9d..ca74a412c3 100644 --- a/flow/e2e/bigquery/qrep_flow_bq_test.go +++ b/flow/e2e/bigquery/qrep_flow_bq_test.go @@ -81,7 +81,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() { env.AssertExpectations(s.t) } -func (s PeerFlowE2ETestSuiteBQ) Test_Columns_QRep_BQ() { +func (s PeerFlowE2ETestSuiteBQ) Test_PeerDB_Columns_QRep_BQ() { env := e2e.NewTemporalTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env, s.t) diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index a45134aa6c..3ac7fee713 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -274,3 +274,48 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration() env.AssertExpectations(s.t) } + +func (s PeerFlowE2ETestSuiteSF) Test_PeerDB_Columns_QRep_SF() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env, s.t) + + numRows := 10 + + tblName := "test_qrep_columns_sf" + s.setupSourceTable(tblName, numRows) + + dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) + + query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", + s.pgSuffix, tblName) + + qrepConfig, err := e2e.CreateQRepWorkflowConfig( + "test_columns_qrep_sf", + fmt.Sprintf("e2e_test_%s.%s", s.pgSuffix, tblName), + dstSchemaQualified, + query, + s.sfHelper.Peer, + "", + true, + "_PEERDB_SYNCED_AT", + ) + qrepConfig.WriteMode = &protos.QRepWriteMode{ + WriteType: protos.QRepWriteType_QREP_WRITE_MODE_UPSERT, + UpsertKeyColumns: []string{"id"}, + } + require.NoError(s.t, err) + + e2e.RunQrepFlowWorkflow(env, qrepConfig) + + // Verify workflow completes without error + s.True(env.IsWorkflowCompleted()) + + err = env.GetWorkflowError() + require.NoError(s.t, err) + + err = s.sfHelper.checkSyncedAt(fmt.Sprintf(`SELECT "_PEERDB_SYNCED_AT" FROM %s.%s`, + s.sfHelper.testSchemaName, tblName)) + require.NoError(s.t, err) + + env.AssertExpectations(s.t) +} diff --git a/flow/e2e/snowflake/snowflake_helper.go b/flow/e2e/snowflake/snowflake_helper.go index 38fefeddc0..0401d34f58 100644 --- a/flow/e2e/snowflake/snowflake_helper.go +++ b/flow/e2e/snowflake/snowflake_helper.go @@ -6,6 +6,7 @@ import ( "fmt" "math/big" "os" + "time" connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" "github.com/PeerDB-io/peer-flow/e2e" @@ -175,3 +176,25 @@ func (s *SnowflakeTestHelper) RunIntQuery(query string) (int, error) { return 0, fmt.Errorf("failed to execute query: %s, returned value of type %s", query, rec.Entries[0].Kind) } } + +// runs a query that returns an int result +func (s *SnowflakeTestHelper) checkSyncedAt(query string) error { + recordBatch, err := s.testClient.ExecuteAndProcessQuery(query) + if err != nil { + return err + } + + for _, record := range recordBatch.Records { + for _, entry := range record.Entries { + if entry.Kind != qvalue.QValueKindTimestamp { + return fmt.Errorf("synced_at column check failed: _PEERDB_SYNCED_AT is not timestamp") + } + _, ok := entry.Value.(time.Time) + if !ok { + return fmt.Errorf("synced_at column failed: _PEERDB_SYNCED_AT is not valid") + } + } + } + + return nil +} From b3667a83e727a22e6dd8be8821b70710c2337537 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Tue, 19 Dec 2023 23:57:27 +0530 Subject: [PATCH 4/6] lint --- flow/connectors/bigquery/qrep.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flow/connectors/bigquery/qrep.go b/flow/connectors/bigquery/qrep.go index 6dd1f3149b..bf1c603d43 100644 --- a/flow/connectors/bigquery/qrep.go +++ b/flow/connectors/bigquery/qrep.go @@ -46,7 +46,8 @@ func (c *BigQueryConnector) SyncQRepRecords( partition.PartitionId, destTable)) avroSync := &QRepAvroSyncMethod{connector: c, gcsBucket: config.StagingPath} - return avroSync.SyncQRepRecords(config.FlowJobName, destTable, partition, tblMetadata, stream, config.SyncedAtColName) + return avroSync.SyncQRepRecords(config.FlowJobName, destTable, partition, + tblMetadata, stream, config.SyncedAtColName) } func (c *BigQueryConnector) replayTableSchemaDeltasQRep(config *protos.QRepConfig, partition *protos.QRepPartition, From 791ad751da26ba349afff42003a3f8bce19f472e Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 20 Dec 2023 00:12:16 +0530 Subject: [PATCH 5/6] better code for pg upsert --- flow/connectors/postgres/qrep_sync_method.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/flow/connectors/postgres/qrep_sync_method.go b/flow/connectors/postgres/qrep_sync_method.go index d11f917e05..6725032411 100644 --- a/flow/connectors/postgres/qrep_sync_method.go +++ b/flow/connectors/postgres/qrep_sync_method.go @@ -102,7 +102,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords( dstTableIdentifier := pgx.Identifier{dstTableName.Schema, dstTableName.Table} createStagingTableStmt := fmt.Sprintf( - "CREATE UNLOGGED TABLE %s (LIKE %s INCLUDING INDEXES);", + "CREATE UNLOGGED TABLE %s (LIKE %s);", stagingTableIdentifier.Sanitize(), dstTableIdentifier.Sanitize(), ) @@ -145,16 +145,15 @@ func (s *QRepStagingTableSync) SyncQRepRecords( setClauseArray = append(setClauseArray, fmt.Sprintf(`"%s" = CURRENT_TIMESTAMP`, syncedAtCol)) setClause := strings.Join(setClauseArray, ",") - selectStrColsSQL := strings.Join(append(selectStrArray, - fmt.Sprintf(`"%s"`, syncedAtCol)), ",") - selectStrValuesSQL := strings.Join(append(selectStrArray, `CURRENT_TIMESTAMP`), ",") + selectSQL := strings.Join(selectStrArray, ",") // Step 2.3: Perform the upsert operation, ON CONFLICT UPDATE upsertStmt := fmt.Sprintf( - "INSERT INTO %s (%s) SELECT %s FROM %s ON CONFLICT (%s) DO UPDATE SET %s;", + `INSERT INTO %s (%s, "%s") SELECT %s, CURRENT_TIMESTAMP FROM %s ON CONFLICT (%s) DO UPDATE SET %s;`, dstTableIdentifier.Sanitize(), - selectStrColsSQL, - selectStrValuesSQL, + selectSQL, + syncedAtCol, + selectSQL, stagingTableIdentifier.Sanitize(), strings.Join(writeMode.UpsertKeyColumns, ", "), setClause, From ac5cb981344ae152b55fbeb116f2d4fb517ec230 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 20 Dec 2023 00:22:31 +0530 Subject: [PATCH 6/6] lint: return value --- flow/e2e/postgres/qrep_flow_pg_test.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index afe986eae8..1c86c973b9 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -153,10 +153,7 @@ func (s *PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error } } - if rows.Err() != nil { - return rows.Err() - } - return nil + return rows.Err() } func (s *PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() {