diff --git a/flow/connectors/bigquery/merge_statement_generator.go b/flow/connectors/bigquery/merge_statement_generator.go
index 02a1a2fb9a..149825c2cf 100644
--- a/flow/connectors/bigquery/merge_statement_generator.go
+++ b/flow/connectors/bigquery/merge_statement_generator.go
@@ -137,21 +137,15 @@ func (m *mergeStmtGenerator) generateMergeStmt(tempTable string, peerdbCols *pro
 		backtickColNames = append(backtickColNames, fmt.Sprintf("`%s`", colName))
 		pureColNames = append(pureColNames, colName)
 	}
 
-	// append synced_at column
-	colsArray := append(backtickColNames,
-		fmt.Sprintf("`%s`", strings.ToUpper(peerdbCols.SyncedAtColName)),
-	)
-	valuesArray := append(backtickColNames, "CURRENT_TIMESTAMP")
-	insertColumnsSQL := strings.Join(colsArray, ", ")
-	// fill in synced_at column
-	insertValuesSQL := strings.Join(valuesArray, ", ")
+	csep := strings.Join(backtickColNames, ", ")
+	insertColumnsSQL := csep + fmt.Sprintf(", `%s`", peerdbCols.SyncedAtColName)
+	insertValuesSQL := csep + ", CURRENT_TIMESTAMP"
 	updateStatementsforToastCols := m.generateUpdateStatements(pureColNames, m.UnchangedToastColumns, peerdbCols)
 
 	if m.peerdbCols.SoftDelete {
-		softDeleteInsertColumnsSQL := strings.Join(append(colsArray,
-			m.peerdbCols.SoftDeleteColName), ",")
-		softDeleteInsertValuesSQL := strings.Join(append(valuesArray, "TRUE"), ",")
+		softDeleteInsertColumnsSQL := insertColumnsSQL + fmt.Sprintf(", `%s`", peerdbCols.SoftDeleteColName)
+		softDeleteInsertValuesSQL := insertValuesSQL + ", TRUE"
 
 		updateStatementsforToastCols = append(updateStatementsforToastCols,
 			fmt.Sprintf("WHEN NOT MATCHED AND (_peerdb_deduped._PEERDB_RECORD_TYPE = 2) THEN INSERT (%s) VALUES(%s)",
diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go
index a1f97e7cec..0a79810530 100644
--- a/flow/connectors/postgres/client.go
+++ b/flow/connectors/postgres/client.go
@@ -536,7 +536,8 @@ func (c *PostgresConnector) getTableNametoUnchangedCols(flowJobName string, sync
 
 func (c *PostgresConnector) generateNormalizeStatements(destinationTableIdentifier string,
 	unchangedToastColumns []string, rawTableIdentifier string, supportsMerge bool,
-	peerdbCols *protos.PeerDBColumns) []string {
+	peerdbCols *protos.PeerDBColumns,
+) []string {
 	if supportsMerge {
 		return []string{c.generateMergeStatement(destinationTableIdentifier, unchangedToastColumns,
 			rawTableIdentifier, peerdbCols)}
@@ -547,7 +548,8 @@ func (c *PostgresConnector) generateNormalizeStatements(destinationTableIdentifi
 }
 
 func (c *PostgresConnector) generateFallbackStatements(destinationTableIdentifier string,
-	rawTableIdentifier string, peerdbCols *protos.PeerDBColumns) []string {
+	rawTableIdentifier string, peerdbCols *protos.PeerDBColumns,
+) []string {
 	normalizedTableSchema := c.tableSchemaMapping[destinationTableIdentifier]
 	columnNames := make([]string, 0, len(normalizedTableSchema.Columns))
 	flattenedCastsSQLArray := make([]string, 0, len(normalizedTableSchema.Columns))
@@ -642,7 +644,6 @@ func (c *PostgresConnector) generateMergeStatement(
 	flattenedCastsSQL := strings.TrimSuffix(strings.Join(flattenedCastsSQLArray, ","), ",")
 	insertValuesSQLArray := make([]string, 0, len(columnNames))
 	for _, columnName := range columnNames {
-		fmt.Println("MERGE columnName: ", columnName)
 		insertValuesSQLArray = append(insertValuesSQLArray, fmt.Sprintf("src.%s", columnName))
 	}
 
@@ -693,7 +694,8 @@ func (c *PostgresConnector) generateMergeStatement(
 }
 
 func (c *PostgresConnector) generateUpdateStatement(allCols []string,
-	unchangedToastColsLists []string, peerdbCols *protos.PeerDBColumns) []string {
+	unchangedToastColsLists []string, peerdbCols *protos.PeerDBColumns,
+) []string {
 	updateStmts := make([]string, 0, len(unchangedToastColsLists))
 
 	for _, cols := range unchangedToastColsLists {
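Note on the merge_statement_generator.go change above: the old code built colsArray and valuesArray with two append calls on the same backtickColNames slice, and when that slice had spare capacity the second append could overwrite the element the first one added, since both results share the backing array. Building plain strings sidesteps that aliasing hazard. A minimal standalone sketch of the new construction, using hypothetical column names (the real generator derives them from the normalized table schema):

    package main

    import (
        "fmt"
        "strings"
    )

    func main() {
        // Hypothetical destination columns, already backtick-quoted as in the generator.
        backtickColNames := []string{"`id`", "`key`", "`value`"}

        csep := strings.Join(backtickColNames, ", ")
        insertColumnsSQL := csep + fmt.Sprintf(", `%s`", "_PEERDB_SYNCED_AT")
        insertValuesSQL := csep + ", CURRENT_TIMESTAMP"

        // With soft delete enabled, each list simply grows by one more entry.
        softDeleteInsertColumnsSQL := insertColumnsSQL + fmt.Sprintf(", `%s`", "_PEERDB_IS_DELETED")
        softDeleteInsertValuesSQL := insertValuesSQL + ", TRUE"

        fmt.Printf("INSERT (%s) VALUES (%s)\n", insertColumnsSQL, insertValuesSQL)
        fmt.Printf("INSERT (%s) VALUES (%s)\n", softDeleteInsertColumnsSQL, softDeleteInsertValuesSQL)
    }

This prints INSERT (`id`, `key`, `value`, `_PEERDB_SYNCED_AT`) VALUES (`id`, `key`, `value`, CURRENT_TIMESTAMP) for the plain case, matching what the generator now embeds in the MERGE statement. One behavioral difference worth flagging: the old code upper-cased the synced-at column via strings.ToUpper, while the new code uses peerdbCols.SyncedAtColName verbatim; that only matters if a caller ever passes a non-uppercase name.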
diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go
index de3ddae7e5..0473f26945 100644
--- a/flow/e2e/bigquery/peer_flow_bq_test.go
+++ b/flow/e2e/bigquery/peer_flow_bq_test.go
@@ -9,6 +9,7 @@ import (
 	"time"
 
 	"github.com/PeerDB-io/peer-flow/e2e"
+	"github.com/PeerDB-io/peer-flow/model/qvalue"
 	"github.com/PeerDB-io/peer-flow/shared"
 	peerflow "github.com/PeerDB-io/peer-flow/workflows"
 	"github.com/jackc/pgx/v5/pgxpool"
@@ -51,6 +52,42 @@ func (s PeerFlowE2ETestSuiteBQ) attachSuffix(input string) string {
 	return fmt.Sprintf("%s_%s", input, s.bqSuffix)
 }
 
+func (s PeerFlowE2ETestSuiteBQ) checkPeerdbColumns(dstSchemaQualified string, rowID int8) error {
+	query := fmt.Sprintf("SELECT `_PEERDB_IS_DELETED`,`_PEERDB_SYNCED_AT` FROM %s WHERE id = %d",
+		dstSchemaQualified, rowID)
+
+	recordBatch, err := s.bqHelper.ExecuteAndProcessQuery(query)
+	if err != nil {
+		return err
+	}
+
+	recordCount := 0
+	for _, record := range recordBatch.Records {
+		for _, entry := range record.Entries {
+			if entry.Kind == qvalue.QValueKindBoolean {
+				isDeleteVal, ok := entry.Value.(bool)
+				if !(ok && isDeleteVal) {
+					return fmt.Errorf("peerdb column failed: _PEERDB_IS_DELETED is not true")
+				}
+				recordCount += 1
+			}
+
+			if entry.Kind == qvalue.QValueKindTimestamp {
+				_, ok := entry.Value.(time.Time)
+				if !ok {
+					return fmt.Errorf("peerdb column failed: _PEERDB_SYNCED_AT is not valid")
+				}
+				recordCount += 1
+			}
+		}
+	}
+	if recordCount != 2 {
+		return fmt.Errorf("peerdb column failed: _PEERDB_IS_DELETED or _PEERDB_SYNCED_AT not present")
+	}
+
+	return nil
+}
+
 // setupBigQuery sets up the bigquery connection.
 func setupBigQuery(t *testing.T) *BigQueryTestHelper {
 	bqHelper, err := NewBigQueryTestHelper()
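The BigQuery checker above matches entries by QValue kind rather than by column name, which is unambiguous only because the query projects exactly one BOOL and one TIMESTAMP column. A dependency-free sketch of the same counting logic (the entry type here is a stand-in for qvalue's, for illustration only):

    package main

    import (
        "fmt"
        "time"
    )

    // entry mimics the kind/value shape of a qvalue entry; illustration only.
    type entry struct {
        kind  string
        value interface{}
    }

    // checkRow mirrors checkPeerdbColumns: both PeerDB columns must be present,
    // _PEERDB_IS_DELETED must be true, and _PEERDB_SYNCED_AT must be a timestamp.
    func checkRow(entries []entry) error {
        count := 0
        for _, e := range entries {
            switch e.kind {
            case "bool":
                if deleted, ok := e.value.(bool); !ok || !deleted {
                    return fmt.Errorf("_PEERDB_IS_DELETED is not true")
                }
                count++
            case "timestamp":
                if _, ok := e.value.(time.Time); !ok {
                    return fmt.Errorf("_PEERDB_SYNCED_AT is not valid")
                }
                count++
            }
        }
        if count != 2 {
            return fmt.Errorf("expected 2 PeerDB column entries, saw %d", count)
        }
        return nil
    }

    func main() {
        fmt.Println(checkRow([]entry{{"bool", true}, {"timestamp", time.Now()}})) // <nil>
    }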
@@ -419,7 +456,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Nochanges_BQ() {
 
 	// allow only continue as new error
 	require.Contains(s.t, err.Error(), "continue as new")
-	s.compareTableContentsBQ(dstTableName, "id,t1,t2,k")
+	s.compareTableContentsBQ(dstTableName, "id,t1,t2,k,_PEERDB_IS_DELETED,_PEERDB_SYNCED_AT")
 	env.AssertExpectations(s.t)
 	<-done
 }
@@ -1095,3 +1132,68 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() {
 
 	env.AssertExpectations(s.t)
 }
+
+func (s PeerFlowE2ETestSuiteBQ) Test_PeerDB_Columns() {
+	env := e2e.NewTemporalTestWorkflowEnvironment()
+	e2e.RegisterWorkflowsAndActivities(env, s.t)
+
+	srcTableName := s.attachSchemaSuffix("test_peerdb_cols")
+	dstTableName := s.attachSchemaSuffix("test_peerdb_cols_dst")
+
+	_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
+		CREATE TABLE IF NOT EXISTS %s (
+			id SERIAL PRIMARY KEY,
+			key TEXT NOT NULL,
+			value TEXT NOT NULL
+		);
+	`, srcTableName))
+	require.NoError(s.t, err)
+
+	connectionGen := e2e.FlowConnectionGenerationConfig{
+		FlowJobName:      s.attachSuffix("test_peerdb_cols_mirror"),
+		TableNameMapping: map[string]string{srcTableName: dstTableName},
+		PostgresPort:     e2e.PostgresPort,
+		Destination:      s.bqHelper.Peer,
+	}
+
+	flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
+	require.NoError(s.t, err)
+
+	limits := peerflow.CDCFlowLimits{
+		ExitAfterRecords: 2,
+		MaxBatchSize:     100,
+	}
+
+	go func() {
+		e2e.SetupCDCFlowStatusQuery(env, connectionGen)
+		// insert 1 row into the source table
+		testKey := fmt.Sprintf("test_key_%d", 1)
+		testValue := fmt.Sprintf("test_value_%d", 1)
+		_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
+			INSERT INTO %s(key, value) VALUES ($1, $2)
+		`, srcTableName), testKey, testValue)
+		require.NoError(s.t, err)
+
+		// delete that row
+		_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
+			DELETE FROM %s WHERE id=1
+		`, srcTableName))
+		require.NoError(s.t, err)
+		fmt.Println("Inserted and deleted a row for peerdb column check")
+	}()
+
+	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
+
+	// Verify workflow completes without error
+	require.True(s.t, env.IsWorkflowCompleted())
+	err = env.GetWorkflowError()
+
+	// allow only continue as new error
+	require.Error(s.t, err)
+	require.Contains(s.t, err.Error(), "continue as new")
+
+	err = s.checkPeerdbColumns(dstTableName, 1)
+	require.NoError(s.t, err)
+
+	env.AssertExpectations(s.t)
+}
diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go
index ac28879f45..b71ad726a9 100644
--- a/flow/e2e/congen.go
+++ b/flow/e2e/congen.go
@@ -201,6 +201,7 @@ func (c *FlowConnectionGenerationConfig) GenerateFlowConnectionConfigs() (*proto
 	ret.Source = GeneratePostgresPeer(c.PostgresPort)
 	ret.Destination = c.Destination
 	ret.CdcStagingPath = c.CdcStagingPath
+	ret.SoftDelete = true
 	ret.SoftDeleteColName = "_PEERDB_IS_DELETED"
 	ret.SyncedAtColName = "_PEERDB_SYNCED_AT"
 	return ret, nil
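The one-line congen.go change is what activates the feature for every e2e mirror generated through this helper: SoftDelete previously stayed at its zero value (false), so the soft-delete column name set below it presumably had no effect. A sketch of the three fields as the generator now populates them (a local stand-in struct; the real type is the protos-generated flow config):

    package main

    import "fmt"

    // Stand-in for the generated flow config; only the fields the
    // congen.go change touches are shown here.
    type flowConnectionConfigs struct {
        SoftDelete        bool
        SoftDeleteColName string
        SyncedAtColName   string
    }

    func main() {
        cfg := flowConnectionConfigs{
            SoftDelete:        true, // newly set; was false by default
            SoftDeleteColName: "_PEERDB_IS_DELETED",
            SyncedAtColName:   "_PEERDB_SYNCED_AT",
        }
        fmt.Printf("%+v\n", cfg)
    }

This also explains the updated column list in Test_Toast_Nochanges_BQ above: with soft delete on for all generated mirrors, the destination table carries _PEERDB_IS_DELETED alongside _PEERDB_SYNCED_AT, so the comparison selects both.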
diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go
index 2720891fb6..f805fc9eba 100644
--- a/flow/e2e/postgres/peer_flow_pg_test.go
+++ b/flow/e2e/postgres/peer_flow_pg_test.go
@@ -8,6 +8,7 @@ import (
 	"github.com/PeerDB-io/peer-flow/generated/protos"
 	"github.com/PeerDB-io/peer-flow/model/qvalue"
 	peerflow "github.com/PeerDB-io/peer-flow/workflows"
+	"github.com/jackc/pgx/v5/pgtype"
 )
 
 func (s *PeerFlowE2ETestSuitePG) attachSchemaSuffix(tableName string) string {
@@ -18,6 +19,27 @@ func (s *PeerFlowE2ETestSuitePG) attachSuffix(input string) string {
 	return fmt.Sprintf("%s_%s", input, postgresSuffix)
 }
 
+func (s *PeerFlowE2ETestSuitePG) checkPeerdbColumns(dstSchemaQualified string, rowID int8) error {
+	query := fmt.Sprintf(`SELECT "_PEERDB_IS_DELETED","_PEERDB_SYNCED_AT" FROM %s WHERE id = %d`,
+		dstSchemaQualified, rowID)
+	var isDeleted pgtype.Bool
+	var syncedAt pgtype.Timestamp
+	err := s.pool.QueryRow(context.Background(), query).Scan(&isDeleted, &syncedAt)
+	if err != nil {
+		return fmt.Errorf("failed to query row: %w", err)
+	}
+
+	if !isDeleted.Bool {
+		return fmt.Errorf("isDeleted is not true")
+	}
+
+	if !syncedAt.Valid {
+		return fmt.Errorf("syncedAt is not valid")
+	}
+
+	return nil
+}
+
 func (s *PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() {
 	env := s.NewTestWorkflowEnvironment()
 	e2e.RegisterWorkflowsAndActivities(env, s.T())
@@ -474,3 +496,66 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() {
 
 	env.AssertExpectations(s.T())
 }
+
+func (s *PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() {
+	env := s.NewTestWorkflowEnvironment()
+	e2e.RegisterWorkflowsAndActivities(env, s.T())
+
+	srcTableName := s.attachSchemaSuffix("test_peerdb_cols")
+	dstTableName := s.attachSchemaSuffix("test_peerdb_cols_dst")
+
+	_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
+		CREATE TABLE IF NOT EXISTS %s (
+			id SERIAL PRIMARY KEY,
+			key TEXT NOT NULL,
+			value TEXT NOT NULL
+		);
+	`, srcTableName))
+	s.NoError(err)
+
+	connectionGen := e2e.FlowConnectionGenerationConfig{
+		FlowJobName:      s.attachSuffix("test_peerdb_cols_mirror"),
+		TableNameMapping: map[string]string{srcTableName: dstTableName},
+		PostgresPort:     e2e.PostgresPort,
+		Destination:      s.peer,
+	}
+
+	flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
+	s.NoError(err)
+
+	limits := peerflow.CDCFlowLimits{
+		ExitAfterRecords: 2,
+		MaxBatchSize:     100,
+	}
+
+	go func() {
+		e2e.SetupCDCFlowStatusQuery(env, connectionGen)
+		// insert 1 row into the source table
+		testKey := fmt.Sprintf("test_key_%d", 1)
+		testValue := fmt.Sprintf("test_value_%d", 1)
+		_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
+			INSERT INTO %s(key, value) VALUES ($1, $2)
+		`, srcTableName), testKey, testValue)
+		s.NoError(err)
+
+		// delete that row
+		_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
+			DELETE FROM %s WHERE id=1
+		`, srcTableName))
+		s.NoError(err)
+		fmt.Println("Inserted and deleted a row for peerdb column check")
+	}()
+
+	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
+
+	// Verify workflow completes without error
+	s.True(env.IsWorkflowCompleted())
+
+	err = env.GetWorkflowError()
+	// allow only continue as new error
+	s.Error(err)
+	s.Contains(err.Error(), "continue as new")
+	checkErr := s.checkPeerdbColumns(dstTableName, 1)
+	s.NoError(checkErr)
+	env.AssertExpectations(s.T())
+}
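For spot-checking a destination row outside the test suite, the same projection the PG checker uses can be run with a few lines of pgx; the DSN and table name below are placeholders, not values from this change:

    package main

    import (
        "context"
        "fmt"
        "log"

        "github.com/jackc/pgx/v5"
        "github.com/jackc/pgx/v5/pgtype"
    )

    func main() {
        ctx := context.Background()
        // Placeholder DSN; point it at the e2e destination database.
        conn, err := pgx.Connect(ctx, "postgres://postgres:postgres@localhost:5432/postgres")
        if err != nil {
            log.Fatal(err)
        }
        defer conn.Close(ctx)

        var isDeleted pgtype.Bool
        var syncedAt pgtype.Timestamp
        // Quoted identifiers: the PeerDB columns are created uppercase.
        err = conn.QueryRow(ctx,
            `SELECT "_PEERDB_IS_DELETED","_PEERDB_SYNCED_AT" FROM some_schema.test_peerdb_cols_dst WHERE id = 1`,
        ).Scan(&isDeleted, &syncedAt)
        if err != nil {
            log.Fatal(err)
        }
        fmt.Printf("deleted=%v synced_at set=%v\n", isDeleted.Bool, syncedAt.Valid)
    }

A row that was inserted and then deleted through the mirror should print deleted=true synced_at set=true, which is exactly what checkPeerdbColumns asserts.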