diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go
index 6bd71e3ba8..1fddb5cb93 100644
--- a/flow/connectors/bigquery/bigquery.go
+++ b/flow/connectors/bigquery/bigquery.go
@@ -512,7 +512,7 @@ func (c *BigQueryConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest,
 				matchData:             "",
 				batchID:               syncBatchID,
 				stagingBatchID:        stagingBatchID,
-				unchangedToastColumns: utils.KeysToString(r.UnchangedToastColumns),
+				unchangedToastColumns: "",
 			})
 			tableNameRowsMapping[r.DestinationTableName] += 1
 		case *model.UpdateRecord:
@@ -571,7 +571,7 @@ func (c *BigQueryConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest,
 				matchData:             itemsJSON,
 				batchID:               syncBatchID,
 				stagingBatchID:        stagingBatchID,
-				unchangedToastColumns: utils.KeysToString(r.UnchangedToastColumns),
+				unchangedToastColumns: "",
 			})
 			tableNameRowsMapping[r.DestinationTableName] += 1
 
@@ -741,7 +741,7 @@ func (c *BigQueryConnector) syncRecordsViaAvro(req *model.SyncRecordsRequest,
 			}
 			entries[9] = qvalue.QValue{
 				Kind:  qvalue.QValueKindString,
-				Value: utils.KeysToString(r.UnchangedToastColumns),
+				Value: "",
 			}
 
 			tableNameRowsMapping[r.DestinationTableName] += 1
@@ -802,7 +802,7 @@ func (c *BigQueryConnector) syncRecordsViaAvro(req *model.SyncRecordsRequest,
 			}
 			entries[9] = qvalue.QValue{
 				Kind:  qvalue.QValueKindString,
-				Value: utils.KeysToString(r.UnchangedToastColumns),
+				Value: "",
 			}
 
 			tableNameRowsMapping[r.DestinationTableName] += 1
@@ -1035,7 +1035,7 @@ func (c *BigQueryConnector) getUpdateMetadataStmt(jobName string, lastSyncedChec
 
 	// create the job in the metadata table
 	jobStatement := fmt.Sprintf(
-		"INSERT INTO %s.%s (mirror_job_name, offset,sync_batch_id) VALUES ('%s',%d,%d);",
+		"INSERT INTO %s.%s (mirror_job_name,offset,sync_batch_id) VALUES ('%s',%d,%d);",
 		c.datasetID, MirrorJobsTable, jobName, lastSyncedCheckpointID, batchID)
 	if hasJob {
 		jobStatement = fmt.Sprintf(
@@ -1288,14 +1288,12 @@ func (m *MergeStmtGenerator) generateDeDupedCTE() string {
 			) _peerdb_ranked
 			WHERE _peerdb_rank = 1
 	) SELECT * FROM _peerdb_de_duplicated_data_res`
-	pkey := m.NormalizedTableSchema.PrimaryKeyColumns[0]
-	return fmt.Sprintf(cte, pkey)
+	pkeyColsStr := fmt.Sprintf("(CONCAT(%s))", strings.Join(m.NormalizedTableSchema.PrimaryKeyColumns, ", ' ', "))
+	return fmt.Sprintf(cte, pkeyColsStr)
 }
 
 // generateMergeStmt generates a merge statement.
 func (m *MergeStmtGenerator) generateMergeStmt(tempTable string) string {
-	pkey := m.NormalizedTableSchema.PrimaryKeyColumns[0]
-
 	// comma separated list of column names
 	backtickColNames := make([]string, 0)
 	pureColNames := make([]string, 0)
@@ -1305,18 +1303,26 @@ func (m *MergeStmtGenerator) generateMergeStmt(tempTable string) string {
 	}
 	csep := strings.Join(backtickColNames, ", ")
 
-	udateStatementsforToastCols := m.generateUpdateStatement(pureColNames, m.UnchangedToastColumns)
-	updateStringToastCols := strings.Join(udateStatementsforToastCols, " ")
+	updateStatementsforToastCols := m.generateUpdateStatements(pureColNames, m.UnchangedToastColumns)
+	updateStringToastCols := strings.Join(updateStatementsforToastCols, " ")
+
+	pkeySelectSQLArray := make([]string, 0, len(m.NormalizedTableSchema.PrimaryKeyColumns))
+	for _, pkeyColName := range m.NormalizedTableSchema.PrimaryKeyColumns {
+		pkeySelectSQLArray = append(pkeySelectSQLArray, fmt.Sprintf("_peerdb_target.%s = _peerdb_deduped.%s",
+			pkeyColName, pkeyColName))
+	}
+	// _peerdb_target.<pkey1> = _peerdb_deduped.<pkey1> AND _peerdb_target.<pkey2> = _peerdb_deduped.<pkey2> ...
+	pkeySelectSQL := strings.Join(pkeySelectSQLArray, " AND ")
 
 	return fmt.Sprintf(`
 	MERGE %s.%s _peerdb_target USING %s _peerdb_deduped
-	ON _peerdb_target.%s = _peerdb_deduped.%s
+	ON %s
 		WHEN NOT MATCHED and (_peerdb_deduped._peerdb_record_type != 2) THEN
 			INSERT (%s) VALUES (%s)
 		%s
 		WHEN MATCHED AND (_peerdb_deduped._peerdb_record_type = 2) THEN
 	DELETE;
-	`, m.Dataset, m.NormalizedTable, tempTable, pkey, pkey, csep, csep, updateStringToastCols)
+	`, m.Dataset, m.NormalizedTable, tempTable, pkeySelectSQL, csep, csep, updateStringToastCols)
 }
 
 /*
@@ -1334,7 +1340,7 @@ and updating the other columns (not the unchanged toast columns)
 6. Repeat steps 1-5 for each unique unchanged toast column group.
 7. Return the list of generated update statements.
 */
-func (m *MergeStmtGenerator) generateUpdateStatement(allCols []string, unchangedToastCols []string) []string {
+func (m *MergeStmtGenerator) generateUpdateStatements(allCols []string, unchangedToastCols []string) []string {
 	updateStmts := make([]string, 0)
 
 	for _, cols := range unchangedToastCols {
diff --git a/flow/connectors/bigquery/merge_stmt_generator_test.go b/flow/connectors/bigquery/merge_stmt_generator_test.go
index 7320639680..3d8892d4c5 100644
--- a/flow/connectors/bigquery/merge_stmt_generator_test.go
+++ b/flow/connectors/bigquery/merge_stmt_generator_test.go
@@ -30,7 +30,7 @@ func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {
 		" `col2` = _peerdb_deduped.col2",
 	}
 
-	result := m.generateUpdateStatement(allCols, unchangedToastCols)
+	result := m.generateUpdateStatements(allCols, unchangedToastCols)
 
 	for i := range expected {
 		expected[i] = removeSpacesTabsNewlines(expected[i])
@@ -56,7 +56,7 @@ func TestGenerateUpdateStatement_NoUnchangedToastCols(t *testing.T) {
 		" `col3` = _peerdb_deduped.col3",
 	}
 
-	result := m.generateUpdateStatement(allCols, unchangedToastCols)
+	result := m.generateUpdateStatements(allCols, unchangedToastCols)
 
 	for i := range expected {
 		expected[i] = removeSpacesTabsNewlines(expected[i])
diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go
index ae2b0aba4e..8e44b40153 100644
--- a/flow/connectors/postgres/cdc.go
+++ b/flow/connectors/postgres/cdc.go
@@ -239,17 +239,14 @@ func (p *PostgresCDCSource) consumeStream(
 			// tableName here is destination tableName.
 			// should be ideally sourceTableName as we are in PullRecords.
 			// will change in future
-			pkeyColsMerged := make([]string, 0)
-			for _, pkeyCol := range req.TableNameSchemaMapping[tableName].PrimaryKeyColumns {
-				pkeyColVal, err := rec.GetItems().GetValueByColName(pkeyCol)
-				if err != nil {
-					return nil, fmt.Errorf("error getting pkey column value: %w", err)
-				}
-				pkeyColsMerged = append(pkeyColsMerged, fmt.Sprintf("%v", pkeyColVal.Value))
+			compositePKeyString, err := p.compositePKeyToString(req, rec)
+			if err != nil {
+				return nil, err
 			}
+
 			tablePkeyVal := model.TableWithPkey{
 				TableName:  tableName,
-				PkeyColVal: strings.Join(pkeyColsMerged, " "),
+				PkeyColVal: compositePKeyString,
 			}
 			_, ok := records.TablePKeyLastSeen[tablePkeyVal]
 			if !ok {
@@ -266,18 +263,14 @@ func (p *PostgresCDCSource) consumeStream(
 				records.TablePKeyLastSeen[tablePkeyVal] = len(records.Records) - 1
 			}
 		case *model.InsertRecord:
-			pkeyColsMerged := make([]string, 0)
-			for _, pkeyCol := range req.TableNameSchemaMapping[tableName].PrimaryKeyColumns {
-				pkeyColVal, err := rec.GetItems().GetValueByColName(pkeyCol)
-				if err != nil {
-					return nil, fmt.Errorf("error getting pkey column value: %w", err)
-				}
-				pkeyColsMerged = append(pkeyColsMerged, fmt.Sprintf("%v", pkeyColVal))
+			compositePKeyString, err := p.compositePKeyToString(req, rec)
+			if err != nil {
+				return nil, err
 			}
 
 			tablePkeyVal := model.TableWithPkey{
 				TableName:  tableName,
-				PkeyColVal: strings.Join(pkeyColsMerged, " "),
+				PkeyColVal: compositePKeyString,
 			}
 			records.Records = append(records.Records, rec)
 			// all columns will be set in insert record, so add it to the map
@@ -365,17 +358,16 @@ func (p *PostgresCDCSource) processInsertMessage(
 	}
 
 	// create empty map of string to interface{}
-	items, unchangedToastColumns, err := p.convertTupleToMap(msg.Tuple, rel)
+	items, _, err := p.convertTupleToMap(msg.Tuple, rel)
 	if err != nil {
 		return nil, fmt.Errorf("error converting tuple to map: %w", err)
 	}
 
 	return &model.InsertRecord{
-		CheckPointID:          int64(lsn),
-		Items:                 items,
-		DestinationTableName:  p.TableNameMapping[tableName],
-		SourceTableName:       tableName,
-		UnchangedToastColumns: unchangedToastColumns,
+		CheckPointID:         int64(lsn),
+		Items:                items,
+		DestinationTableName: p.TableNameMapping[tableName],
+		SourceTableName:      tableName,
 	}, nil
 }
 
@@ -437,17 +429,16 @@ func (p *PostgresCDCSource) processDeleteMessage(
 	}
 
 	// create empty map of string to interface{}
-	items, unchangedToastColumns, err := p.convertTupleToMap(msg.OldTuple, rel)
+	items, _, err := p.convertTupleToMap(msg.OldTuple, rel)
 	if err != nil {
 		return nil, fmt.Errorf("error converting tuple to map: %w", err)
 	}
 
 	return &model.DeleteRecord{
-		CheckPointID:          int64(lsn),
-		Items:                 items,
-		DestinationTableName:  p.TableNameMapping[tableName],
-		SourceTableName:       tableName,
-		UnchangedToastColumns: unchangedToastColumns,
+		CheckPointID:         int64(lsn),
+		Items:                items,
+		DestinationTableName: p.TableNameMapping[tableName],
+		SourceTableName:      tableName,
 	}, nil
 }
 
@@ -461,15 +452,15 @@ It takes a tuple and a relation message as input and returns
 func (p *PostgresCDCSource) convertTupleToMap(
 	tuple *pglogrepl.TupleData,
 	rel *protos.RelationMessage,
-) (*model.RecordItems, map[string]bool, error) {
+) (*model.RecordItems, map[string]struct{}, error) {
 	// if the tuple is nil, return an empty map
 	if tuple == nil {
-		return model.NewRecordItems(), make(map[string]bool), nil
+		return model.NewRecordItems(), make(map[string]struct{}), nil
 	}
 
 	// create empty map of string to interface{}
 	items := model.NewRecordItems()
-	unchangedToastColumns := make(map[string]bool)
+	unchangedToastColumns := make(map[string]struct{})
 
 	for idx, col := range tuple.Columns {
 		colName := rel.Columns[idx].Name
@@ -491,7 +482,7 @@ func (p *PostgresCDCSource) convertTupleToMap(
 			}
 			items.AddColumn(colName, data)
 		case 'u': // unchanged toast
-			unchangedToastColumns[colName] = true
+			unchangedToastColumns[colName] = struct{}{}
 		default:
 			return nil, nil, fmt.Errorf("unknown column data type: %s", string(col.DataType))
 		}
@@ -599,3 +590,18 @@ func (p *PostgresCDCSource) processRelationMessage(
 		CheckPointID: int64(lsn),
 	}, nil
 }
+
+func (p *PostgresCDCSource) compositePKeyToString(req *model.PullRecordsRequest, rec model.Record) (string, error) {
+	tableName := rec.GetTableName()
+	pkeyColsMerged := make([]string, 0)
+
+	for _, pkeyCol := range req.TableNameSchemaMapping[tableName].PrimaryKeyColumns {
+		pkeyColVal, err := rec.GetItems().GetValueByColName(pkeyCol)
+		if err != nil {
+			return "", fmt.Errorf("error getting pkey column value: %w", err)
+		}
+		pkeyColsMerged = append(pkeyColsMerged, fmt.Sprintf("%v", pkeyColVal.Value))
+	}
+
+	return strings.Join(pkeyColsMerged, " "), nil
+}
diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go
index 43d00baf05..12d6395d29 100644
--- a/flow/connectors/postgres/postgres.go
+++ b/flow/connectors/postgres/postgres.go
@@ -286,7 +286,7 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
 				0,
 				"{}",
 				syncBatchID,
-				utils.KeysToString(typedRecord.UnchangedToastColumns),
+				"",
 			})
 			tableNameRowsMapping[typedRecord.DestinationTableName] += 1
 		case *model.UpdateRecord:
@@ -324,7 +324,7 @@ func (c *PostgresConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
 				2,
 				itemsJSON,
 				syncBatchID,
-				utils.KeysToString(typedRecord.UnchangedToastColumns),
+				"",
 			})
 			tableNameRowsMapping[typedRecord.DestinationTableName] += 1
 		default:
diff --git a/flow/connectors/postgres/postgres_cdc_test.go b/flow/connectors/postgres/postgres_cdc_test.go
index a1de703ce7..2ef5609359 100644
--- a/flow/connectors/postgres/postgres_cdc_test.go
+++ b/flow/connectors/postgres/postgres_cdc_test.go
@@ -235,9 +235,9 @@ func (suite *PostgresCDCTestSuite) validateMutatedToastRecords(records []model.R
 	suite.Equal(qvalue.QValueKindString, v.Kind)
 	suite.Equal(65536, len(v.Value.(string)))
 	suite.Equal(3, len(updateRecord.UnchangedToastColumns))
-	suite.True(updateRecord.UnchangedToastColumns["lz4_t"])
-	suite.True(updateRecord.UnchangedToastColumns["n_b"])
-	suite.True(updateRecord.UnchangedToastColumns["lz4_b"])
+	suite.Contains(updateRecord.UnchangedToastColumns, "lz4_t")
+	suite.Contains(updateRecord.UnchangedToastColumns, "n_b")
+	suite.Contains(updateRecord.UnchangedToastColumns, "lz4_b")
 	suite.IsType(&model.UpdateRecord{}, records[1])
 	updateRecord = records[1].(*model.UpdateRecord)
 	suite.Equal(srcTableName, updateRecord.SourceTableName)
@@ -258,7 +258,7 @@ func (suite *PostgresCDCTestSuite) validateMutatedToastRecords(records []model.R
 	suite.Equal(qvalue.QValueKindBytes, v.Kind)
 	suite.Equal(65536, len(v.Value.([]byte)))
 	suite.Equal(1, len(updateRecord.UnchangedToastColumns))
-	suite.True(updateRecord.UnchangedToastColumns["n_t"])
+	suite.Contains(updateRecord.UnchangedToastColumns, "n_t")
 	// Test case for records[2]
 	suite.IsType(&model.UpdateRecord{}, records[2])
 	updateRecord = records[2].(*model.UpdateRecord)
diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go
index e5b93d75be..b6b8918cb6 100644
--- a/flow/connectors/snowflake/snowflake.go
+++ b/flow/connectors/snowflake/snowflake.go
@@ -565,7 +565,7 @@ func (c *SnowflakeConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest, ra
 				recordType:            0,
 				matchData:             "",
 				batchID:               syncBatchID,
-				unchangedToastColumns: utils.KeysToString(typedRecord.UnchangedToastColumns),
+				unchangedToastColumns: "",
 			})
 			tableNameRowsMapping[typedRecord.DestinationTableName] += 1
 		case *model.UpdateRecord:
@@ -605,7 +605,7 @@ func (c *SnowflakeConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest, ra
 				recordType:            2,
 				matchData:             itemsJSON,
 				batchID:               syncBatchID,
-				unchangedToastColumns: utils.KeysToString(typedRecord.UnchangedToastColumns),
+				unchangedToastColumns: "",
 			})
 			tableNameRowsMapping[typedRecord.DestinationTableName] += 1
 		default:
@@ -995,7 +995,7 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement(
 			pkeyColName, pkeyColName))
 	}
 	// TARGET.<pkey1> = SOURCE.<pkey1> AND TARGET.<pkey2> = SOURCE.<pkey2> ...
-	pkeyColStr := strings.Join(pkeySelectSQLArray, " AND ")
+	pkeySelectSQL := strings.Join(pkeySelectSQLArray, " AND ")
 
 	deletePart := "DELETE"
 	if softDelete {
@@ -1005,7 +1005,7 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement(
 	mergeStatement := fmt.Sprintf(mergeStatementSQL, destinationTableIdentifier, toVariantColumnName,
 		rawTableIdentifier, normalizeBatchID, syncBatchID, flattenedCastsSQL,
 		fmt.Sprintf("(%s)", strings.Join(normalizedTableSchema.PrimaryKeyColumns, ",")),
-		pkeyColStr, insertColumnsSQL, insertValuesSQL, updateStringToastCols, deletePart)
+		pkeySelectSQL, insertColumnsSQL, insertValuesSQL, updateStringToastCols, deletePart)
 
 	result, err := normalizeRecordsTx.ExecContext(c.ctx, mergeStatement, destinationTableIdentifier)
 	if err != nil {
diff --git a/flow/connectors/utils/map.go b/flow/connectors/utils/map.go
index 829da94bfb..769ef225a6 100644
--- a/flow/connectors/utils/map.go
+++ b/flow/connectors/utils/map.go
@@ -2,7 +2,11 @@ package utils
 
 import "strings"
 
-func KeysToString(m map[string]bool) string {
+func KeysToString(m map[string]struct{}) string {
+	if m == nil {
+		return ""
+	}
+
 	var keys []string
 	for k := range m {
 		keys = append(keys, k)
diff --git a/flow/connectors/utils/stream.go b/flow/connectors/utils/stream.go
index 422106ea5c..c16020578a 100644
--- a/flow/connectors/utils/stream.go
+++ b/flow/connectors/utils/stream.go
@@ -90,7 +90,7 @@ func RecordsToRawTableStream(req model.RecordsToStreamRequest) (*model.RecordsTo
 			}
 			entries[7] = qvalue.QValue{
 				Kind:  qvalue.QValueKindString,
-				Value: KeysToString(typedRecord.UnchangedToastColumns),
+				Value: "",
 			}
 			req.TableMapping[typedRecord.DestinationTableName] += 1
 		case *model.UpdateRecord:
@@ -149,11 +149,11 @@ func RecordsToRawTableStream(req model.RecordsToStreamRequest) (*model.RecordsTo
 			}
 			entries[7] = qvalue.QValue{
 				Kind:  qvalue.QValueKindString,
-				Value: KeysToString(typedRecord.UnchangedToastColumns),
+				Value: "",
 			}
 			req.TableMapping[typedRecord.DestinationTableName] += 1
 		default:
-			return nil, fmt.Errorf("record type %T not supported in Snowflake flow connector", typedRecord)
+			return nil, fmt.Errorf("record type %T not supported", typedRecord)
 		}
 
 		if first {
diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go
index 75e1d35153..400ecc3d01 100644
--- a/flow/e2e/bigquery/peer_flow_bq_test.go
+++ b/flow/e2e/bigquery/peer_flow_bq_test.go
@@ -1024,3 +1024,232 @@ func (s *PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
 
 	env.AssertExpectations(s.T())
 }
+
+func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() {
+	env := s.NewTestWorkflowEnvironment()
+	e2e.RegisterWorkflowsAndActivities(env)
+
+	srcTableName := s.attachSchemaSuffix("test_simple_cpkey")
+	dstTableName := "test_simple_cpkey"
+
+	_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
+		CREATE TABLE IF NOT EXISTS %s (
+			id INT GENERATED ALWAYS AS IDENTITY,
+			c1 INT GENERATED BY DEFAULT AS IDENTITY,
+			c2 INT,
+			t TEXT,
+			PRIMARY KEY(id,t)
+		);
+	`, srcTableName))
+	s.NoError(err)
+
+	connectionGen := e2e.FlowConnectionGenerationConfig{
+		FlowJobName:      s.attachSuffix("test_cpkey_flow"),
+		TableNameMapping: map[string]string{srcTableName: dstTableName},
+		PostgresPort:     e2e.PostgresPort,
+		Destination:      s.bqHelper.Peer,
+	}
+
+	flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
+	s.NoError(err)
+
+	limits := peerflow.CDCFlowLimits{
+		TotalSyncFlows: 4,
+		MaxBatchSize:   100,
+	}
+
+	// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
+	// and then insert, update and delete rows in the table.
+	go func() {
+		e2e.SetupCDCFlowStatusQuery(env, connectionGen)
+		// insert 10 rows into the source table
+		for i := 0; i < 10; i++ {
+			testValue := fmt.Sprintf("test_value_%d", i)
+			_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
+				INSERT INTO %s(c2,t) VALUES ($1,$2)
+			`, srcTableName), i, testValue)
+			s.NoError(err)
+		}
+		fmt.Println("Inserted 10 rows into the source table")
+
+		// verify we got our 10 rows
+		e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
+		s.compareTableContentsBQ(dstTableName, "id,c1,c2,t")
+
+		_, err := s.pool.Exec(context.Background(),
+			fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1)
+		s.NoError(err)
+		_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0)
+		s.NoError(err)
+	}()
+
+	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
+
+	// Verify workflow completes without error
+	s.True(env.IsWorkflowCompleted())
+	err = env.GetWorkflowError()
+
+	// allow only continue as new error
+	s.Error(err)
+	s.Contains(err.Error(), "continue as new")
+
+	s.compareTableContentsBQ(dstTableName, "id,c1,c2,t")
+
+	env.AssertExpectations(s.T())
+}
+
+func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() {
+	env := s.NewTestWorkflowEnvironment()
+	e2e.RegisterWorkflowsAndActivities(env)
+
+	srcTableName := s.attachSchemaSuffix("test_cpkey_toast1")
+	dstTableName := "test_cpkey_toast1"
+
+	_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
+		CREATE TABLE IF NOT EXISTS %s (
+			id INT GENERATED ALWAYS AS IDENTITY,
+			c1 INT GENERATED BY DEFAULT AS IDENTITY,
+			c2 INT,
+			t TEXT,
+			t2 TEXT,
+			PRIMARY KEY(id,t)
+		);CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$
+		SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz',
+		round(random() * 30)::integer, 1), '') FROM generate_series(1, $1);
+		$$ language sql;
+	`, srcTableName))
+	s.NoError(err)
+
+	connectionGen := e2e.FlowConnectionGenerationConfig{
+		FlowJobName:      s.attachSuffix("test_cpkey_toast1_flow"),
+		TableNameMapping: map[string]string{srcTableName: dstTableName},
+		PostgresPort:     e2e.PostgresPort,
+		Destination:      s.bqHelper.Peer,
+	}
+
+	flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
+	s.NoError(err)
+
+	limits := peerflow.CDCFlowLimits{
+		TotalSyncFlows: 2,
+		MaxBatchSize:   100,
+	}
+
+	// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
+	// and then insert, update and delete rows in the table.
+	go func() {
+		e2e.SetupCDCFlowStatusQuery(env, connectionGen)
+		rowsTx, err := s.pool.Begin(context.Background())
+		s.NoError(err)
+
+		// insert 10 rows into the source table
+		for i := 0; i < 10; i++ {
+			testValue := fmt.Sprintf("test_value_%d", i)
+			_, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`
+				INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000))
+			`, srcTableName), i, testValue)
+			s.NoError(err)
+		}
+		fmt.Println("Inserted 10 rows into the source table")
+
+		_, err = rowsTx.Exec(context.Background(),
+			fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1)
+		s.NoError(err)
+		_, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0)
+		s.NoError(err)
+
+		err = rowsTx.Commit(context.Background())
+		s.NoError(err)
+	}()
+
+	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
+
+	// Verify workflow completes without error
+	s.True(env.IsWorkflowCompleted())
+	err = env.GetWorkflowError()
+
+	// allow only continue as new error
+	s.Error(err)
+	s.Contains(err.Error(), "continue as new")
+
+	// verify our updates and delete happened
+	s.compareTableContentsBQ(dstTableName, "id,c1,c2,t,t2")
+
+	env.AssertExpectations(s.T())
+}
+
+func (s *PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() {
+	env := s.NewTestWorkflowEnvironment()
+	e2e.RegisterWorkflowsAndActivities(env)
+
+	srcTableName := s.attachSchemaSuffix("test_cpkey_toast2")
+	dstTableName := "test_cpkey_toast2"
+
+	_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
+		CREATE TABLE IF NOT EXISTS %s (
+			id INT GENERATED ALWAYS AS IDENTITY,
+			c1 INT GENERATED BY DEFAULT AS IDENTITY,
+			c2 INT,
+			t TEXT,
+			t2 TEXT,
+			PRIMARY KEY(id,t)
+		);CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$
+		SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz',
+		round(random() * 30)::integer, 1), '') FROM generate_series(1, $1);
+		$$ language sql;
+	`, srcTableName))
+	s.NoError(err)
+
+	connectionGen := e2e.FlowConnectionGenerationConfig{
+		FlowJobName:      s.attachSuffix("test_cpkey_toast2_flow"),
+		TableNameMapping: map[string]string{srcTableName: dstTableName},
+		PostgresPort:     e2e.PostgresPort,
+		Destination:      s.bqHelper.Peer,
+	}
+
+	flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
+	s.NoError(err)
+
+	limits := peerflow.CDCFlowLimits{
+		TotalSyncFlows: 4,
+		MaxBatchSize:   100,
+	}
+
+	// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
+	// and then insert, update and delete rows in the table.
+	go func() {
+		e2e.SetupCDCFlowStatusQuery(env, connectionGen)
+
+		// insert 10 rows into the source table
+		for i := 0; i < 10; i++ {
+			testValue := fmt.Sprintf("test_value_%d", i)
+			_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
+				INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000))
+			`, srcTableName), i, testValue)
+			s.NoError(err)
+		}
+		fmt.Println("Inserted 10 rows into the source table")
+
+		e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
+		_, err = s.pool.Exec(context.Background(),
+			fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1)
+		s.NoError(err)
+		_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0)
+		s.NoError(err)
+	}()
+
+	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
+
+	// Verify workflow completes without error
+	s.True(env.IsWorkflowCompleted())
+	err = env.GetWorkflowError()
+
+	// allow only continue as new error
+	s.Error(err)
+	s.Contains(err.Error(), "continue as new")
+
+	// verify our updates and delete happened
+	s.compareTableContentsBQ(dstTableName, "id,c1,c2,t,t2")
+
+	env.AssertExpectations(s.T())
+}
diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go
index 14cfef5690..45666f0369 100644
--- a/flow/e2e/postgres/peer_flow_pg_test.go
+++ b/flow/e2e/postgres/peer_flow_pg_test.go
@@ -397,3 +397,80 @@ func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() {
 
 	env.AssertExpectations(s.T())
 }
+
+func (s *PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() {
+	env := s.NewTestWorkflowEnvironment()
+	e2e.RegisterWorkflowsAndActivities(env)
+
+	srcTableName := s.attachSchemaSuffix("test_cpkey_toast2")
+	dstTableName := s.attachSchemaSuffix("test_cpkey_toast2_dst")
+
+	_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
+		CREATE TABLE IF NOT EXISTS %s (
+			id INT GENERATED ALWAYS AS IDENTITY,
+			c1 INT GENERATED BY DEFAULT AS IDENTITY,
+			c2 INT,
+			t TEXT,
+			t2 TEXT,
+			PRIMARY KEY(id,t)
+		);CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$
+		SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz',
+		round(random() * 30)::integer, 1), '') FROM generate_series(1, $1);
+		$$ language sql;
+	`, srcTableName))
+	s.NoError(err)
+
+	connectionGen := e2e.FlowConnectionGenerationConfig{
+		FlowJobName:      s.attachSuffix("test_cpkey_toast2_flow"),
+		TableNameMapping: map[string]string{srcTableName: dstTableName},
+		PostgresPort:     e2e.PostgresPort,
+		Destination:      s.peer,
+	}
+
+	flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
+	s.NoError(err)
+
+	limits := peerflow.CDCFlowLimits{
+		TotalSyncFlows: 4,
+		MaxBatchSize:   100,
+	}
+
+	// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
+	// and then insert, update and delete rows in the table.
+	go func() {
+		e2e.SetupCDCFlowStatusQuery(env, connectionGen)
+
+		// insert 10 rows into the source table
+		for i := 0; i < 10; i++ {
+			testValue := fmt.Sprintf("test_value_%d", i)
+			_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`
+				INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000))
+			`, srcTableName), i, testValue)
+			s.NoError(err)
+		}
+		fmt.Println("Inserted 10 rows into the source table")
+
+		e2e.NormalizeFlowCountQuery(env, connectionGen, 2)
+		_, err = s.pool.Exec(context.Background(),
+			fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1)
+		s.NoError(err)
+		_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0)
+		s.NoError(err)
+	}()
+
+	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
+
+	// Verify workflow completes without error
+	s.True(env.IsWorkflowCompleted())
+	err = env.GetWorkflowError()
+
+	// allow only continue as new error
+	s.Error(err)
+	s.Contains(err.Error(), "continue as new")
+
+	// verify our updates and delete happened
+	err = s.comparePGTables(srcTableName, dstTableName, "id,c1,c2,t,t2")
+	s.NoError(err)
+
+	env.AssertExpectations(s.T())
+}
diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go
index 5ef1970f72..f30e078245 100644
--- a/flow/e2e/snowflake/peer_flow_sf_test.go
+++ b/flow/e2e/snowflake/peer_flow_sf_test.go
@@ -1045,8 +1045,8 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() {
 	env := s.NewTestWorkflowEnvironment()
 	e2e.RegisterWorkflowsAndActivities(env)
 
-	srcTableName := s.attachSchemaSuffix("test_cpkey_toast_1")
-	dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_cpkey_toast_1")
+	srcTableName := s.attachSchemaSuffix("test_cpkey_toast1")
+	dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_cpkey_toast1")
 
 	_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
 		CREATE TABLE IF NOT EXISTS %s (
@@ -1073,6 +1073,86 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() {
 	flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
 	s.NoError(err)
 
+	limits := peerflow.CDCFlowLimits{
+		TotalSyncFlows: 2,
+		MaxBatchSize:   100,
+	}
+
+	// in a separate goroutine, wait for PeerFlowStatusQuery to finish setup
+	// and then insert, update and delete rows in the table.
+	go func() {
+		e2e.SetupCDCFlowStatusQuery(env, connectionGen)
+		rowsTx, err := s.pool.Begin(context.Background())
+		s.NoError(err)
+
+		// insert 10 rows into the source table
+		for i := 0; i < 10; i++ {
+			testValue := fmt.Sprintf("test_value_%d", i)
+			_, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`
+				INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000))
+			`, srcTableName), i, testValue)
+			s.NoError(err)
+		}
+		fmt.Println("Inserted 10 rows into the source table")
+
+		_, err = rowsTx.Exec(context.Background(),
+			fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1)
+		s.NoError(err)
+		_, err = rowsTx.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0)
+		s.NoError(err)
+
+		err = rowsTx.Commit(context.Background())
+		s.NoError(err)
+	}()
+
+	env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
+
+	// Verify workflow completes without error
+	s.True(env.IsWorkflowCompleted())
+	err = env.GetWorkflowError()
+
+	// allow only continue as new error
+	s.Error(err)
+	s.Contains(err.Error(), "continue as new")
+
+	// verify our updates and delete happened
+	s.compareTableContentsSF("test_cpkey_toast1", "id,c1,c2,t,t2", false)
+
+	env.AssertExpectations(s.T())
+}
+
+func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() {
+	env := s.NewTestWorkflowEnvironment()
+	e2e.RegisterWorkflowsAndActivities(env)
+
+	srcTableName := s.attachSchemaSuffix("test_cpkey_toast2")
+	dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_cpkey_toast2")
+
+	_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
+		CREATE TABLE IF NOT EXISTS %s (
+			id INT GENERATED ALWAYS AS IDENTITY,
+			c1 INT GENERATED BY DEFAULT AS IDENTITY,
+			c2 INT,
+			t TEXT,
+			t2 TEXT,
+			PRIMARY KEY(id,t)
+		);CREATE OR REPLACE FUNCTION random_string( int ) RETURNS TEXT as $$
+		SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz',
+		round(random() * 30)::integer, 1), '') FROM generate_series(1, $1);
+		$$ language sql;
+	`, srcTableName))
+	s.NoError(err)
+
+	connectionGen := e2e.FlowConnectionGenerationConfig{
+		FlowJobName:      s.attachSuffix("test_cpkey_toast2_flow"),
+		TableNameMapping: map[string]string{srcTableName: dstTableName},
+		PostgresPort:     e2e.PostgresPort,
+		Destination:      s.sfHelper.Peer,
+	}
+
+	flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs()
+	s.NoError(err)
+
 	limits := peerflow.CDCFlowLimits{
 		TotalSyncFlows: 4,
 		MaxBatchSize:   100,
 	}
@@ -1112,7 +1192,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() {
 	s.Contains(err.Error(), "continue as new")
 
 	// verify our updates and delete happened
-	s.compareTableContentsSF("test_cpkey_toast_1", "id,c1,c2,t,t2", false)
+	s.compareTableContentsSF("test_cpkey_toast2", "id,c1,c2,t,t2", false)
 
 	env.AssertExpectations(s.T())
 }
diff --git a/flow/model/model.go b/flow/model/model.go
index 88cc7b7956..08aa786f16 100644
--- a/flow/model/model.go
+++ b/flow/model/model.go
@@ -208,8 +208,6 @@ type InsertRecord struct {
 	CommitID int64
 	// Items is a map of column name to value.
 	Items *RecordItems
-	// unchanged toast columns
-	UnchangedToastColumns map[string]bool
 }
 
 // Implement Record interface for InsertRecord.
@@ -237,7 +235,7 @@ type UpdateRecord struct {
 	// NewItems is a map of column name to value.
 	NewItems *RecordItems
 	// unchanged toast columns
-	UnchangedToastColumns map[string]bool
+	UnchangedToastColumns map[string]struct{}
 }
 
 // Implement Record interface for UpdateRecord.
@@ -263,8 +261,6 @@ type DeleteRecord struct {
 	CheckPointID int64
 	// Items is a map of column name to value.
 	Items *RecordItems
-	// unchanged toast columns
-	UnchangedToastColumns map[string]bool
 }
 
 // Implement Record interface for DeleteRecord.
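
The two techniques the patch leans on can be seen in isolation below: flattening the values of a composite primary key into a single string so it can serve as one map key (the role compositePKeyToString plays on the Postgres CDC side), and expanding the key columns into a multi-column ON clause for the destination MERGE (the role the pkeySelectSQLArray loops play in the BigQuery and Snowflake connectors). This is a minimal standalone sketch under those assumptions, not the connector code itself; the function names and the map-based row here are illustrative only.

package main

import (
	"fmt"
	"strings"
)

// pkeyKey mirrors the idea behind compositePKeyToString: join the value of every
// primary key column, in schema order, into one string so a composite key can be
// used as a single map key for de-duplication.
func pkeyKey(pkeyCols []string, row map[string]interface{}) string {
	parts := make([]string, 0, len(pkeyCols))
	for _, col := range pkeyCols {
		parts = append(parts, fmt.Sprintf("%v", row[col]))
	}
	return strings.Join(parts, " ")
}

// mergeOnClause mirrors the pkeySelectSQLArray loops: one equality predicate per
// key column, joined with AND, instead of assuming a single-column primary key.
func mergeOnClause(pkeyCols []string) string {
	preds := make([]string, 0, len(pkeyCols))
	for _, col := range pkeyCols {
		preds = append(preds, fmt.Sprintf("_peerdb_target.%s = _peerdb_deduped.%s", col, col))
	}
	return strings.Join(preds, " AND ")
}

func main() {
	pkeyCols := []string{"id", "t"}
	row := map[string]interface{}{"id": 7, "t": "test_value_7", "c2": 42}

	fmt.Println(pkeyKey(pkeyCols, row))  // 7 test_value_7
	fmt.Println(mergeOnClause(pkeyCols)) // _peerdb_target.id = _peerdb_deduped.id AND _peerdb_target.t = _peerdb_deduped.t
}

With pkeyCols = {"id", "t"} this matches the e2e tables above, where PRIMARY KEY(id,t) previously would have been matched on id alone.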
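The patch also changes UnchangedToastColumns from map[string]bool to map[string]struct{}, i.e. a plain set, and makes utils.KeysToString tolerate a nil map. A brief sketch of that idiom, again as an illustration rather than the connector code (the comma separator and the sorting step are choices made here for a stable demo, not taken from the source):

package main

import (
	"fmt"
	"sort"
	"strings"
)

// keysToString joins the members of a set into a comma-separated string and,
// like the patched utils.KeysToString, treats a nil set as empty.
func keysToString(m map[string]struct{}) string {
	if m == nil {
		return ""
	}
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys) // map iteration order is random; sort only for a deterministic result
	return strings.Join(keys, ",")
}

func main() {
	unchangedToast := map[string]struct{}{}
	unchangedToast["lz4_t"] = struct{}{} // membership is all that matters; values carry no data
	unchangedToast["n_b"] = struct{}{}

	_, isUnchanged := unchangedToast["lz4_t"]
	fmt.Println(isUnchanged)                  // true
	fmt.Println(keysToString(unchangedToast)) // lz4_t,n_b
	fmt.Println(keysToString(nil))            // (empty string)
}

The struct{} value occupies no storage, and membership checks read as the suite.Contains assertions in the updated postgres_cdc_test.go.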