diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index dbbe2aa6c..b51741aff 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -32,20 +32,22 @@ const ( createRawTableBatchIDIndexSQL = "CREATE INDEX IF NOT EXISTS %s_batchid_idx ON %s.%s(_peerdb_batch_id)" createRawTableDstTableIndexSQL = "CREATE INDEX IF NOT EXISTS %s_dst_table_idx ON %s.%s(_peerdb_destination_table_name)" - getLastOffsetSQL = "SELECT lsn_offset FROM %s.%s WHERE mirror_job_name=$1" - setLastOffsetSQL = "UPDATE %s.%s SET lsn_offset=GREATEST(lsn_offset, $1) WHERE mirror_job_name=$2" - getLastSyncBatchID_SQL = "SELECT sync_batch_id FROM %s.%s WHERE mirror_job_name=$1" - getLastNormalizeBatchID_SQL = "SELECT normalize_batch_id FROM %s.%s WHERE mirror_job_name=$1" - createNormalizedTableSQL = "CREATE TABLE IF NOT EXISTS %s(%s)" + getLastOffsetSQL = "SELECT lsn_offset FROM %s.%s WHERE mirror_job_name=$1" + setLastOffsetSQL = "UPDATE %s.%s SET lsn_offset=GREATEST(lsn_offset, $1) WHERE mirror_job_name=$2" + getLastSyncBatchID_SQL = "SELECT sync_batch_id FROM %s.%s WHERE mirror_job_name=$1" + getLastSyncAndNormalizeBatchID_SQL = "SELECT sync_batch_id,normalize_batch_id FROM %s.%s WHERE mirror_job_name=$1" + createNormalizedTableSQL = "CREATE TABLE IF NOT EXISTS %s(%s)" insertJobMetadataSQL = "INSERT INTO %s.%s VALUES ($1,$2,$3,$4)" checkIfJobMetadataExistsSQL = "SELECT COUNT(1)::TEXT::BOOL FROM %s.%s WHERE mirror_job_name=$1" updateMetadataForSyncRecordsSQL = "UPDATE %s.%s SET lsn_offset=GREATEST(lsn_offset, $1), sync_batch_id=$2 WHERE mirror_job_name=$3" updateMetadataForNormalizeRecordsSQL = "UPDATE %s.%s SET normalize_batch_id=$1 WHERE mirror_job_name=$2" + getDistinctDestinationTableNamesSQL = `SELECT DISTINCT _peerdb_destination_table_name FROM %s.%s WHERE + _peerdb_batch_id>$1 AND _peerdb_batch_id<=$2` getTableNameToUnchangedToastColsSQL = `SELECT _peerdb_destination_table_name, ARRAY_AGG(DISTINCT _peerdb_unchanged_toast_columns) FROM %s.%s WHERE - _peerdb_batch_id>$1 AND _peerdb_batch_id<=$2 GROUP BY _peerdb_destination_table_name` + _peerdb_batch_id>$1 AND _peerdb_batch_id<=$2 AND _peerdb_record_type!=2 GROUP BY _peerdb_destination_table_name` srcTableName = "src" mergeStatementSQL = `WITH src_rank AS ( SELECT _peerdb_data,_peerdb_record_type,_peerdb_unchanged_toast_columns, @@ -428,46 +430,40 @@ func generateCreateTableSQLForNormalizedTable( } func (c *PostgresConnector) GetLastSyncBatchID(jobName string) (int64, error) { - rows, err := c.pool.Query(c.ctx, fmt.Sprintf( + var result pgtype.Int8 + err := c.pool.QueryRow(c.ctx, fmt.Sprintf( getLastSyncBatchID_SQL, c.metadataSchema, mirrorJobsTableIdentifier, - ), jobName) - if err != nil { - return 0, fmt.Errorf("error querying Postgres peer for last syncBatchId: %w", err) - } - defer rows.Close() - - var result pgtype.Int8 - if !rows.Next() { - c.logger.Info("No row found, returning 0") - return 0, nil - } - err = rows.Scan(&result) + ), jobName).Scan(&result) if err != nil { + if err == pgx.ErrNoRows { + c.logger.Info("No row found, returning 0") + return 0, nil + } return 0, fmt.Errorf("error while reading result row: %w", err) } return result.Int64, nil } -func (c *PostgresConnector) getLastNormalizeBatchID(jobName string) (int64, error) { - rows, err := c.pool.Query(c.ctx, fmt.Sprintf(getLastNormalizeBatchID_SQL, c.metadataSchema, - mirrorJobsTableIdentifier), jobName) - if err != nil { - return 0, fmt.Errorf("error querying Postgres peer for last normalizeBatchId: %w", err) - } - defer rows.Close() - - var result pgtype.Int8 - if !rows.Next() { - c.logger.Info("No row found returning 0") - return 0, nil - } - err = rows.Scan(&result) +func (c *PostgresConnector) GetLastSyncAndNormalizeBatchID(jobName string) (*model.SyncAndNormalizeBatchID, error) { + var syncResult, normalizeResult pgtype.Int8 + err := c.pool.QueryRow(c.ctx, fmt.Sprintf( + getLastSyncAndNormalizeBatchID_SQL, + c.metadataSchema, + mirrorJobsTableIdentifier, + ), jobName).Scan(&syncResult, &normalizeResult) if err != nil { - return 0, fmt.Errorf("error while reading result row: %w", err) + if err == pgx.ErrNoRows { + c.logger.Info("No row found, returning 0") + return &model.SyncAndNormalizeBatchID{}, nil + } + return nil, fmt.Errorf("error while reading result row: %w", err) } - return result.Int64, nil + return &model.SyncAndNormalizeBatchID{ + SyncBatchID: syncResult.Int64, + NormalizeBatchID: normalizeResult.Int64, + }, nil } func (c *PostgresConnector) jobMetadataExists(jobName string) (bool, error) { @@ -549,6 +545,30 @@ func (c *PostgresConnector) updateNormalizeMetadata(flowJobName string, normaliz return nil } +func (c *PostgresConnector) getDistinctTableNamesInBatch(flowJobName string, syncBatchID int64, + normalizeBatchID int64, +) ([]string, error) { + rawTableIdentifier := getRawTableIdentifier(flowJobName) + + rows, err := c.pool.Query(c.ctx, fmt.Sprintf(getDistinctDestinationTableNamesSQL, c.metadataSchema, + rawTableIdentifier), normalizeBatchID, syncBatchID) + if err != nil { + return nil, fmt.Errorf("error while retrieving table names for normalization: %w", err) + } + defer rows.Close() + + var result pgtype.Text + destinationTableNames := make([]string, 0) + for rows.Next() { + err = rows.Scan(&result) + if err != nil { + return nil, fmt.Errorf("failed to read row: %w", err) + } + destinationTableNames = append(destinationTableNames, result.String) + } + return destinationTableNames, nil +} + func (c *PostgresConnector) getTableNametoUnchangedCols(flowJobName string, syncBatchID int64, normalizeBatchID int64, ) (map[string][]string, error) { @@ -768,7 +788,7 @@ func (c *PostgresConnector) generateUpdateStatement(allCols []string, ssep := strings.Join(tmpArray, ",") updateStmt := fmt.Sprintf(`WHEN MATCHED AND - src._peerdb_record_type=1 AND _peerdb_unchanged_toast_columns='%s' + src._peerdb_record_type!=2 AND _peerdb_unchanged_toast_columns='%s' THEN UPDATE SET %s `, cols, ssep) updateStmts = append(updateStmts, updateStmt) @@ -780,7 +800,7 @@ func (c *PostgresConnector) generateUpdateStatement(allCols []string, fmt.Sprintf(`"%s" = TRUE`, peerdbCols.SoftDeleteColName)) ssep := strings.Join(tmpArray, ", ") updateStmt := fmt.Sprintf(`WHEN MATCHED AND - src._peerdb_record_type = 2 AND _peerdb_unchanged_toast_columns='%s' + src._peerdb_record_type=2 AND _peerdb_unchanged_toast_columns='%s' THEN UPDATE SET %s `, cols, ssep) updateStmts = append(updateStmts, updateStmt) } diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index a3f991d74..d869b2215 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -404,30 +404,41 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S func (c *PostgresConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) { rawTableIdentifier := getRawTableIdentifier(req.FlowJobName) - syncBatchID, err := c.GetLastSyncBatchID(req.FlowJobName) + + jobMetadataExists, err := c.jobMetadataExists(req.FlowJobName) if err != nil { return nil, err } - normalizeBatchID, err := c.getLastNormalizeBatchID(req.FlowJobName) - if err != nil { - return nil, err + // no SyncFlow has run, chill until more records are loaded. + if !jobMetadataExists { + c.logger.Info("no metadata found for mirror") + return &model.NormalizeResponse{ + Done: false, + }, nil } - jobMetadataExists, err := c.jobMetadataExists(req.FlowJobName) + + batchIDs, err := c.GetLastSyncAndNormalizeBatchID(req.FlowJobName) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to get batch for the current mirror: %v", err) } - // normalize has caught up with sync or no SyncFlow has run, chill until more records are loaded. - if normalizeBatchID >= syncBatchID || !jobMetadataExists { + // normalize has caught up with sync, chill until more records are loaded. + if batchIDs.NormalizeBatchID >= batchIDs.SyncBatchID { c.logger.Info(fmt.Sprintf("no records to normalize: syncBatchID %d, normalizeBatchID %d", - syncBatchID, normalizeBatchID)) + batchIDs.SyncBatchID, batchIDs.NormalizeBatchID)) return &model.NormalizeResponse{ Done: false, - StartBatchID: normalizeBatchID, - EndBatchID: syncBatchID, + StartBatchID: batchIDs.NormalizeBatchID, + EndBatchID: batchIDs.SyncBatchID, }, nil } - unchangedToastColsMap, err := c.getTableNametoUnchangedCols(req.FlowJobName, syncBatchID, normalizeBatchID) + destinationTableNames, err := c.getDistinctTableNamesInBatch( + req.FlowJobName, batchIDs.SyncBatchID, batchIDs.NormalizeBatchID) + if err != nil { + return nil, err + } + unchangedToastColsMap, err := c.getTableNametoUnchangedCols(req.FlowJobName, + batchIDs.SyncBatchID, batchIDs.NormalizeBatchID) if err != nil { return nil, err } @@ -449,16 +460,17 @@ func (c *PostgresConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) } mergeStatementsBatch := &pgx.Batch{} totalRowsAffected := 0 - for destinationTableName, unchangedToastCols := range unchangedToastColsMap { + for _, destinationTableName := range destinationTableNames { peerdbCols := protos.PeerDBColumns{ SoftDeleteColName: req.SoftDeleteColName, SyncedAtColName: req.SyncedAtColName, SoftDelete: req.SoftDelete, } - normalizeStatements := c.generateNormalizeStatements(destinationTableName, unchangedToastCols, + normalizeStatements := c.generateNormalizeStatements(destinationTableName, unchangedToastColsMap[destinationTableName], rawTableIdentifier, supportsMerge, &peerdbCols) + fmt.Println(normalizeStatements) for _, normalizeStatement := range normalizeStatements { - mergeStatementsBatch.Queue(normalizeStatement, normalizeBatchID, syncBatchID, destinationTableName).Exec( + mergeStatementsBatch.Queue(normalizeStatement, batchIDs.NormalizeBatchID, batchIDs.SyncBatchID, destinationTableName).Exec( func(ct pgconn.CommandTag) error { totalRowsAffected += int(ct.RowsAffected()) return nil @@ -475,7 +487,7 @@ func (c *PostgresConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) c.logger.Info(fmt.Sprintf("normalized %d records", totalRowsAffected)) // updating metadata with new normalizeBatchID - err = c.updateNormalizeMetadata(req.FlowJobName, syncBatchID, normalizeRecordsTx) + err = c.updateNormalizeMetadata(req.FlowJobName, batchIDs.SyncBatchID, normalizeRecordsTx) if err != nil { return nil, err } @@ -487,8 +499,8 @@ func (c *PostgresConnector) NormalizeRecords(req *model.NormalizeRecordsRequest) return &model.NormalizeResponse{ Done: true, - StartBatchID: normalizeBatchID + 1, - EndBatchID: syncBatchID, + StartBatchID: batchIDs.NormalizeBatchID + 1, + EndBatchID: batchIDs.SyncBatchID, }, nil } diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 8c34bc7c8..43f3a0a6c 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -3,6 +3,7 @@ package e2e_postgres import ( "context" "fmt" + "sync" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -558,3 +559,343 @@ func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { require.NoError(s.t, checkErr) env.AssertExpectations(s.t) } + +func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Basic() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(s.t, env) + + cmpTableName := s.attachSchemaSuffix("test_softdel") + srcTableName := fmt.Sprintf("%s_src", cmpTableName) + dstTableName := s.attachSchemaSuffix("test_softdel_dst") + + _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + c1 INT, + c2 INT, + t TEXT + ); + `, srcTableName)) + require.NoError(s.t, err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix("test_softdel"), + } + + config := &protos.FlowConnectionConfigs{ + FlowJobName: connectionGen.FlowJobName, + Destination: s.peer, + TableMappings: []*protos.TableMapping{ + { + SourceTableIdentifier: srcTableName, + DestinationTableIdentifier: dstTableName, + }, + }, + Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), + CdcStagingPath: connectionGen.CdcStagingPath, + SoftDelete: true, + SoftDeleteColName: "_PEERDB_IS_DELETED", + SyncedAtColName: "_PEERDB_SYNCED_AT", + } + + limits := peerflow.CDCFlowLimits{ + ExitAfterRecords: 3, + MaxBatchSize: 100, + } + + wg := sync.WaitGroup{} + wg.Add(1) + + // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup + // and then insert, update and delete rows in the table. + go func() { + e2e.SetupCDCFlowStatusQuery(env, connectionGen) + + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) + require.NoError(s.t, err) + e2e.NormalizeFlowCountQuery(env, connectionGen, 1) + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName)) + require.NoError(s.t, err) + e2e.NormalizeFlowCountQuery(env, connectionGen, 2) + // since we delete stuff, create another table to compare with + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName)) + require.NoError(s.t, err) + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + DELETE FROM %s WHERE id=1`, srcTableName)) + require.NoError(s.t, err) + + wg.Done() + }() + + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + require.True(s.t, env.IsWorkflowCompleted()) + err = env.GetWorkflowError() + require.Contains(s.t, err.Error(), "continue as new") + + wg.Wait() + + // verify our updates and delete happened + err = s.comparePGTables(cmpTableName, dstTableName, "id,c1,c2,t") + require.NoError(s.t, err) + + softDeleteQuery := fmt.Sprintf(` + SELECT COUNT(*) FROM %s WHERE "_PEERDB_IS_DELETED"=TRUE`, + dstTableName) + numRows, err := s.countRowsInQuery(softDeleteQuery) + require.NoError(s.t, err) + require.Equal(s.t, int64(1), numRows) +} + +func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_IUD_Same_Batch() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(s.t, env) + + cmpTableName := s.attachSchemaSuffix("test_softdel_iud") + srcTableName := fmt.Sprintf("%s_src", cmpTableName) + dstTableName := s.attachSchemaSuffix("test_softdel_iud_dst") + + _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + c1 INT, + c2 INT, + t TEXT + ); + `, srcTableName)) + require.NoError(s.t, err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix("test_softdel_iud"), + } + + config := &protos.FlowConnectionConfigs{ + FlowJobName: connectionGen.FlowJobName, + Destination: s.peer, + TableMappings: []*protos.TableMapping{ + { + SourceTableIdentifier: srcTableName, + DestinationTableIdentifier: dstTableName, + }, + }, + Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), + CdcStagingPath: connectionGen.CdcStagingPath, + SoftDelete: true, + SoftDeleteColName: "_PEERDB_IS_DELETED", + SyncedAtColName: "_PEERDB_SYNCED_AT", + } + + limits := peerflow.CDCFlowLimits{ + ExitAfterRecords: 3, + MaxBatchSize: 100, + } + + // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup + // and then insert, update and delete rows in the table. + go func() { + e2e.SetupCDCFlowStatusQuery(env, connectionGen) + + insertTx, err := s.pool.Begin(context.Background()) + require.NoError(s.t, err) + + _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) + require.NoError(s.t, err) + _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` + UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName)) + require.NoError(s.t, err) + // since we delete stuff, create another table to compare with + _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName)) + require.NoError(s.t, err) + _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` + DELETE FROM %s WHERE id=1`, srcTableName)) + require.NoError(s.t, err) + + require.NoError(s.t, insertTx.Commit(context.Background())) + }() + + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + require.True(s.t, env.IsWorkflowCompleted()) + err = env.GetWorkflowError() + require.Contains(s.t, err.Error(), "continue as new") + + // verify our updates and delete happened + err = s.comparePGTables(cmpTableName, dstTableName, "id,c1,c2,t") + require.NoError(s.t, err) + + softDeleteQuery := fmt.Sprintf(` + SELECT COUNT(*) FROM %s WHERE "_PEERDB_IS_DELETED"=TRUE`, + dstTableName) + numRows, err := s.countRowsInQuery(softDeleteQuery) + require.NoError(s.t, err) + require.Equal(s.t, int64(1), numRows) +} + +func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_UD_Same_Batch() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(s.t, env) + + cmpTableName := s.attachSchemaSuffix("test_softdel_ud") + srcTableName := fmt.Sprintf("%s_src", cmpTableName) + dstTableName := s.attachSchemaSuffix("test_softdel_ud_dst") + + _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + c1 INT, + c2 INT, + t TEXT + ); + `, srcTableName)) + require.NoError(s.t, err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix("test_softdel_ud"), + } + + config := &protos.FlowConnectionConfigs{ + FlowJobName: connectionGen.FlowJobName, + Destination: s.peer, + TableMappings: []*protos.TableMapping{ + { + SourceTableIdentifier: srcTableName, + DestinationTableIdentifier: dstTableName, + }, + }, + Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), + CdcStagingPath: connectionGen.CdcStagingPath, + SoftDelete: true, + SoftDeleteColName: "_PEERDB_IS_DELETED", + SyncedAtColName: "_PEERDB_SYNCED_AT", + } + + limits := peerflow.CDCFlowLimits{ + ExitAfterRecords: 4, + MaxBatchSize: 100, + } + + // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup + // and then insert, update and delete rows in the table. + go func() { + e2e.SetupCDCFlowStatusQuery(env, connectionGen) + + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) + require.NoError(s.t, err) + e2e.NormalizeFlowCountQuery(env, connectionGen, 1) + + insertTx, err := s.pool.Begin(context.Background()) + require.NoError(s.t, err) + _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` + UPDATE %s SET t=random_string(10000) WHERE id=1`, srcTableName)) + require.NoError(s.t, err) + _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` + UPDATE %s SET c1=c1+4 WHERE id=1`, srcTableName)) + require.NoError(s.t, err) + // since we delete stuff, create another table to compare with + _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE %s AS SELECT * FROM %s`, cmpTableName, srcTableName)) + require.NoError(s.t, err) + _, err = insertTx.Exec(context.Background(), fmt.Sprintf(` + DELETE FROM %s WHERE id=1`, srcTableName)) + require.NoError(s.t, err) + + require.NoError(s.t, insertTx.Commit(context.Background())) + }() + + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + require.True(s.t, env.IsWorkflowCompleted()) + err = env.GetWorkflowError() + require.Contains(s.t, err.Error(), "continue as new") + + // verify our updates and delete happened + err = s.comparePGTables(cmpTableName, dstTableName, "id,c1,c2,t") + require.NoError(s.t, err) + + softDeleteQuery := fmt.Sprintf(` + SELECT COUNT(*) FROM %s WHERE "_PEERDB_IS_DELETED"=TRUE`, + dstTableName) + numRows, err := s.countRowsInQuery(softDeleteQuery) + require.NoError(s.t, err) + require.Equal(s.t, int64(1), numRows) +} + +func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Insert_After_Delete() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(s.t, env) + + srcTableName := s.attachSchemaSuffix("test_softdel_iad") + dstTableName := s.attachSchemaSuffix("test_softdel_iad_dst") + + _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, + c1 INT, + c2 INT, + t TEXT + ); + `, srcTableName)) + require.NoError(s.t, err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix("test_softdel_iad"), + } + + config := &protos.FlowConnectionConfigs{ + FlowJobName: connectionGen.FlowJobName, + Destination: s.peer, + TableMappings: []*protos.TableMapping{ + { + SourceTableIdentifier: srcTableName, + DestinationTableIdentifier: dstTableName, + }, + }, + Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), + CdcStagingPath: connectionGen.CdcStagingPath, + SoftDelete: true, + SoftDeleteColName: "_PEERDB_IS_DELETED", + SyncedAtColName: "_PEERDB_SYNCED_AT", + } + + limits := peerflow.CDCFlowLimits{ + ExitAfterRecords: 3, + MaxBatchSize: 100, + } + + // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup + // and then insert and delete rows in the table. + go func() { + e2e.SetupCDCFlowStatusQuery(env, connectionGen) + + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s(c1,c2,t) VALUES (1,2,random_string(9000))`, srcTableName)) + require.NoError(s.t, err) + e2e.NormalizeFlowCountQuery(env, connectionGen, 1) + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + DELETE FROM %s WHERE id=1`, srcTableName)) + require.NoError(s.t, err) + e2e.NormalizeFlowCountQuery(env, connectionGen, 2) + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s(id,c1,c2,t) VALUES (1,3,4,random_string(10000))`, srcTableName)) + require.NoError(s.t, err) + }() + + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + require.True(s.t, env.IsWorkflowCompleted()) + err = env.GetWorkflowError() + require.Contains(s.t, err.Error(), "continue as new") + + // verify our updates and delete happened + err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t") + require.NoError(s.t, err) + + softDeleteQuery := fmt.Sprintf(` + SELECT COUNT(*) FROM %s WHERE "_PEERDB_IS_DELETED"=TRUE`, + dstTableName) + numRows, err := s.countRowsInQuery(softDeleteQuery) + require.NoError(s.t, err) + require.Equal(s.t, int64(0), numRows) +} diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index af6431fcd..064c28d16 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -165,6 +165,12 @@ func (s PeerFlowE2ETestSuitePG) checkSyncedAt(dstSchemaQualified string) error { return rows.Err() } +func (s PeerFlowE2ETestSuitePG) countRowsInQuery(query string) (int64, error) { + var count pgtype.Int8 + err := s.pool.QueryRow(context.Background(), query).Scan(&count) + return count.Int64, err +} + func (s PeerFlowE2ETestSuitePG) TestSimpleSlotCreation() { setupTx, err := s.pool.Begin(context.Background()) require.NoError(s.t, err) diff --git a/flow/shared/alerting/alerting.go b/flow/shared/alerting/alerting.go index 50608087d..516716ca2 100644 --- a/flow/shared/alerting/alerting.go +++ b/flow/shared/alerting/alerting.go @@ -76,7 +76,7 @@ func (a *Alerter) AlertIf(ctx context.Context, alertKey string, alertMessage str var createdTimestamp time.Time err = row.Scan(&createdTimestamp) if err != nil && err != pgx.ErrNoRows { - a.logger.Warn("failed to send alert: %v", err) + a.logger.Warn("failed to send alert: ", slog.String("err", err.Error())) return }