From 29d47700db6d719669d2311adee2225633c8411f Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Thu, 28 Dec 2023 12:29:52 +0530 Subject: [PATCH 1/5] partial mixed case support for SF --- flow/connectors/snowflake/client.go | 16 ++++++ flow/connectors/snowflake/qrep.go | 11 +++-- flow/connectors/snowflake/qrep_avro_sync.go | 46 +++++++++--------- flow/connectors/snowflake/snowflake.go | 53 +++++++++++--------- flow/connectors/utils/identifiers.go | 21 ++++++++ flow/e2e/snowflake/peer_flow_sf_test.go | 34 ++++++------- flow/e2e/snowflake/qrep_flow_sf_test.go | 54 ++++++++------------- flow/e2e/test_utils.go | 12 +++-- 8 files changed, 144 insertions(+), 103 deletions(-) diff --git a/flow/connectors/snowflake/client.go b/flow/connectors/snowflake/client.go index 0e2be32717..089965d573 100644 --- a/flow/connectors/snowflake/client.go +++ b/flow/connectors/snowflake/client.go @@ -3,6 +3,7 @@ package connsnowflake import ( "context" "fmt" + "strings" "time" "github.com/jmoiron/sqlx" @@ -84,3 +85,18 @@ func (c *SnowflakeConnector) getTableCounts(tables []string) (int64, error) { } return totalRecords, nil } + +func SnowflakeIdentifierNormalize(identifier string) string { + // https://www.alberton.info/dbms_identifiers_and_case_sensitivity.html + // Snowflake follows the SQL standard, but Postgres does the opposite. + // Ergo, we suffer. + if utils.IsLower(identifier) { + return fmt.Sprintf(`"%s"`, strings.ToUpper(identifier)) + } + return fmt.Sprintf(`"%s"`, identifier) +} + +func snowflakeSchemaTableNormalize(schemaTable *utils.SchemaTable) string { + return fmt.Sprintf(`%s.%s`, SnowflakeIdentifierNormalize(schemaTable.Schema), + SnowflakeIdentifierNormalize(schemaTable.Table)) +} diff --git a/flow/connectors/snowflake/qrep.go b/flow/connectors/snowflake/qrep.go index 1d81392d71..ab460f01ea 100644 --- a/flow/connectors/snowflake/qrep.go +++ b/flow/connectors/snowflake/qrep.go @@ -77,12 +77,17 @@ func (c *SnowflakeConnector) createMetadataInsertStatement( } func (c *SnowflakeConnector) getTableSchema(tableName string) ([]*sql.ColumnType, error) { + schematable, err := utils.ParseSchemaTable(tableName) + if err != nil { + return nil, fmt.Errorf("failed to parse table '%s'", tableName) + } + //nolint:gosec queryString := fmt.Sprintf(` SELECT * FROM %s LIMIT 0 - `, tableName) + `, snowflakeSchemaTableNormalize(schematable)) rows, err := c.database.QueryContext(c.ctx, queryString) if err != nil { @@ -296,10 +301,10 @@ func (c *SnowflakeConnector) getColsFromTable(tableName string) (*model.ColumnIn } defer rows.Close() + var colName pgtype.Text + var colType pgtype.Text columnMap := map[string]string{} for rows.Next() { - var colName pgtype.Text - var colType pgtype.Text if err := rows.Scan(&colName, &colType); err != nil { return nil, fmt.Errorf("failed to scan row: %w", err) } diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go index 30834c2554..86ceed4b5d 100644 --- a/flow/connectors/snowflake/qrep_avro_sync.go +++ b/flow/connectors/snowflake/qrep_avro_sync.go @@ -302,30 +302,36 @@ func (c *SnowflakeConnector) GetCopyTransformation( ) (*CopyInfo, error) { colInfo, colsErr := c.getColsFromTable(dstTableName) if colsErr != nil { - return nil, fmt.Errorf("failed to get columns from destination table: %w", colsErr) + return nil, fmt.Errorf("failed to get columns from destination table: %w", colsErr) } transformations := make([]string, 0, len(colInfo.ColumnMap)) columnOrder := make([]string, 0, len(colInfo.ColumnMap)) - for colName, colType := range colInfo.ColumnMap { - columnOrder = append(columnOrder, fmt.Sprintf("\"%s\"", colName)) - if colName == syncedAtCol { - transformations = append(transformations, fmt.Sprintf("CURRENT_TIMESTAMP AS \"%s\"", colName)) + for avroColName, colType := range colInfo.ColumnMap { + normalizedColName := SnowflakeIdentifierNormalize(avroColName) + columnOrder = append(columnOrder, normalizedColName) + if avroColName == syncedAtCol { + transformations = append(transformations, fmt.Sprintf("CURRENT_TIMESTAMP AS %s", normalizedColName)) continue } + + if utils.IsUpper(avroColName) { + avroColName = strings.ToLower(avroColName) + } + // Avro files are written with lowercase in mind, so don't normalize it like everything else switch colType { case "GEOGRAPHY": transformations = append(transformations, - fmt.Sprintf("TO_GEOGRAPHY($1:\"%s\"::string, true) AS \"%s\"", strings.ToLower(colName), colName)) + fmt.Sprintf("TO_GEOGRAPHY($1:\"%s\"::string, true) AS %s", avroColName, normalizedColName)) case "GEOMETRY": transformations = append(transformations, - fmt.Sprintf("TO_GEOMETRY($1:\"%s\"::string, true) AS \"%s\"", strings.ToLower(colName), colName)) + fmt.Sprintf("TO_GEOMETRY($1:\"%s\"::string, true) AS %s", avroColName, normalizedColName)) case "NUMBER": transformations = append(transformations, - fmt.Sprintf("$1:\"%s\" AS \"%s\"", strings.ToLower(colName), colName)) + fmt.Sprintf("$1:\"%s\" AS %s", avroColName, normalizedColName)) default: transformations = append(transformations, - fmt.Sprintf("($1:\"%s\")::%s AS \"%s\"", strings.ToLower(colName), colType, colName)) + fmt.Sprintf("($1:\"%s\")::%s AS %s", avroColName, colType, normalizedColName)) } } transformationSQL := strings.Join(transformations, ",") @@ -361,14 +367,12 @@ func CopyStageToDestination( if err != nil { return fmt.Errorf("failed to get copy transformation: %w", err) } - switch appendMode { - case true: + if appendMode { err := writeHandler.HandleAppendMode(copyTransformation) if err != nil { return fmt.Errorf("failed to handle append mode: %w", err) } - - case false: + } else { upsertKeyCols := config.WriteMode.UpsertKeyColumns err := writeHandler.HandleUpsertMode(allCols, upsertKeyCols, config.WatermarkColumn, config.FlowJobName, copyTransformation) @@ -428,9 +432,11 @@ func NewSnowflakeAvroWriteHandler( func (s *SnowflakeAvroWriteHandler) HandleAppendMode( copyInfo *CopyInfo, ) error { + parsedDstTable, _ := utils.ParseSchemaTable(s.dstTableName) //nolint:gosec copyCmd := fmt.Sprintf("COPY INTO %s(%s) FROM (SELECT %s FROM @%s) %s", - s.dstTableName, copyInfo.columnsSQL, copyInfo.transformationSQL, s.stage, strings.Join(s.copyOpts, ",")) + snowflakeSchemaTableNormalize(parsedDstTable), copyInfo.columnsSQL, + copyInfo.transformationSQL, s.stage, strings.Join(s.copyOpts, ",")) s.connector.logger.Info("running copy command: " + copyCmd) _, err := s.connector.database.ExecContext(s.connector.ctx, copyCmd) if err != nil { @@ -441,13 +447,12 @@ func (s *SnowflakeAvroWriteHandler) HandleAppendMode( return nil } -func GenerateMergeCommand( +func generateUpsertMergeCommand( allCols []string, upsertKeyCols []string, - watermarkCol string, tempTableName string, dstTable string, -) (string, error) { +) string { // all cols are acquired from snowflake schema, so let us try to make upsert key cols match the case // and also the watermark col, then the quoting should be fine caseMatchedCols := map[string]string{} @@ -495,7 +500,7 @@ func GenerateMergeCommand( `, dstTable, selectCmd, upsertKeyClause, updateSetClause, insertColumnsClause, insertValuesClause) - return mergeCmd, nil + return mergeCmd } // HandleUpsertMode handles the upsert mode @@ -530,10 +535,7 @@ func (s *SnowflakeAvroWriteHandler) HandleUpsertMode( } s.connector.logger.Info("copied file from stage " + s.stage + " to temp table " + tempTableName) - mergeCmd, err := GenerateMergeCommand(allCols, upsertKeyCols, watermarkCol, tempTableName, s.dstTableName) - if err != nil { - return fmt.Errorf("failed to generate merge command: %w", err) - } + mergeCmd := generateUpsertMergeCommand(allCols, upsertKeyCols, tempTableName, s.dstTableName) startTime := time.Now() rows, err := s.connector.database.ExecContext(s.connector.ctx, mergeCmd) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index d480aea7cc..164d4b6527 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -435,7 +435,7 @@ func (c *SnowflakeConnector) SetupNormalizedTables( } normalizedTableCreateSQL := generateCreateTableSQLForNormalizedTable( - tableIdentifier, tableSchema, req.SoftDeleteColName, req.SyncedAtColName) + normalizedSchemaTable, tableSchema, req.SoftDeleteColName, req.SyncedAtColName) _, err = c.database.ExecContext(c.ctx, normalizedTableCreateSQL) if err != nil { return nil, fmt.Errorf("[sf] error while creating normalized table: %w", err) @@ -562,8 +562,8 @@ func (c *SnowflakeConnector) syncRecordsViaAvro( qrepConfig := &protos.QRepConfig{ StagingPath: "", FlowJobName: req.FlowJobName, - DestinationTableIdentifier: fmt.Sprintf("%s.%s", c.metadataSchema, - rawTableIdentifier), + DestinationTableIdentifier: strings.ToLower(fmt.Sprintf("%s.%s", c.metadataSchema, + rawTableIdentifier)), } avroSyncer := NewSnowflakeAvroSyncMethod(qrepConfig, c) destinationTableSchema, err := c.getTableSchema(qrepConfig.DestinationTableIdentifier) @@ -759,28 +759,28 @@ func (c *SnowflakeConnector) checkIfTableExists(schemaIdentifier string, tableId } func generateCreateTableSQLForNormalizedTable( - sourceTableIdentifier string, + dstSchemaTable *utils.SchemaTable, sourceTableSchema *protos.TableSchema, softDeleteColName string, syncedAtColName string, ) string { createTableSQLArray := make([]string, 0, len(sourceTableSchema.Columns)+2) for columnName, genericColumnType := range sourceTableSchema.Columns { - columnNameUpper := strings.ToUpper(columnName) + normalizedColName := SnowflakeIdentifierNormalize(columnName) sfColType, err := qValueKindToSnowflakeType(qvalue.QValueKind(genericColumnType)) if err != nil { slog.Warn(fmt.Sprintf("failed to convert column type %s to snowflake type", genericColumnType), slog.Any("error", err)) continue } - createTableSQLArray = append(createTableSQLArray, fmt.Sprintf(`"%s" %s,`, columnNameUpper, sfColType)) + createTableSQLArray = append(createTableSQLArray, fmt.Sprintf(`%s %s,`, normalizedColName, sfColType)) } // add a _peerdb_is_deleted column to the normalized table // this is boolean default false, and is used to mark records as deleted if softDeleteColName != "" { createTableSQLArray = append(createTableSQLArray, - fmt.Sprintf(`"%s" BOOLEAN DEFAULT FALSE,`, softDeleteColName)) + fmt.Sprintf(`%s BOOLEAN DEFAULT FALSE,`, softDeleteColName)) } // add a _peerdb_synced column to the normalized table @@ -788,21 +788,21 @@ func generateCreateTableSQLForNormalizedTable( // default value is the current timestamp (snowflake) if syncedAtColName != "" { createTableSQLArray = append(createTableSQLArray, - fmt.Sprintf(`"%s" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,`, syncedAtColName)) + fmt.Sprintf(`%s TIMESTAMP DEFAULT CURRENT_TIMESTAMP,`, syncedAtColName)) } // add composite primary key to the table if len(sourceTableSchema.PrimaryKeyColumns) > 0 { - primaryKeyColsUpperQuoted := make([]string, 0, len(sourceTableSchema.PrimaryKeyColumns)) + normalizedPrimaryKeyCols := make([]string, 0, len(sourceTableSchema.PrimaryKeyColumns)) for _, primaryKeyCol := range sourceTableSchema.PrimaryKeyColumns { - primaryKeyColsUpperQuoted = append(primaryKeyColsUpperQuoted, - fmt.Sprintf(`"%s"`, strings.ToUpper(primaryKeyCol))) + normalizedPrimaryKeyCols = append(normalizedPrimaryKeyCols, + SnowflakeIdentifierNormalize(primaryKeyCol)) } createTableSQLArray = append(createTableSQLArray, fmt.Sprintf("PRIMARY KEY(%s),", - strings.TrimSuffix(strings.Join(primaryKeyColsUpperQuoted, ","), ","))) + strings.TrimSuffix(strings.Join(normalizedPrimaryKeyCols, ","), ","))) } - return fmt.Sprintf(createNormalizedTableSQL, sourceTableIdentifier, + return fmt.Sprintf(createNormalizedTableSQL, snowflakeSchemaTableNormalize(dstSchemaTable), strings.TrimSuffix(strings.Join(createTableSQLArray, ""), ",")) } @@ -821,6 +821,10 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement( normalizeReq *model.NormalizeRecordsRequest, ) (int64, error) { normalizedTableSchema := c.tableSchemaMapping[destinationTableIdentifier] + parsedDstTable, err := utils.ParseSchemaTable(destinationTableIdentifier) + if err != nil { + return 0, fmt.Errorf("unable to parse destination table '%s'", parsedDstTable) + } columnNames := maps.Keys(normalizedTableSchema.Columns) flattenedCastsSQLArray := make([]string, 0, len(normalizedTableSchema.Columns)) @@ -832,7 +836,7 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement( genericColumnType, err) } - targetColumnName := fmt.Sprintf(`"%s"`, strings.ToUpper(columnName)) + targetColumnName := SnowflakeIdentifierNormalize(columnName) switch qvalue.QValueKind(genericColumnType) { case qvalue.QValueKindBytes, qvalue.QValueKindBit: flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("BASE64_DECODE_BINARY(%s:\"%s\") "+ @@ -865,7 +869,7 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement( quotedUpperColNames := make([]string, 0, len(columnNames)) for _, columnName := range columnNames { - quotedUpperColNames = append(quotedUpperColNames, fmt.Sprintf(`"%s"`, strings.ToUpper(columnName))) + quotedUpperColNames = append(quotedUpperColNames, SnowflakeIdentifierNormalize(columnName)) } // append synced_at column quotedUpperColNames = append(quotedUpperColNames, @@ -876,8 +880,8 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement( insertValuesSQLArray := make([]string, 0, len(columnNames)) for _, columnName := range columnNames { - quotedUpperColumnName := fmt.Sprintf(`"%s"`, strings.ToUpper(columnName)) - insertValuesSQLArray = append(insertValuesSQLArray, fmt.Sprintf("SOURCE.%s", quotedUpperColumnName)) + normalizedColName := SnowflakeIdentifierNormalize(columnName) + insertValuesSQLArray = append(insertValuesSQLArray, fmt.Sprintf("SOURCE.%s", normalizedColName)) } // fill in synced_at column insertValuesSQLArray = append(insertValuesSQLArray, "CURRENT_TIMESTAMP") @@ -899,10 +903,13 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement( } updateStringToastCols := strings.Join(updateStatementsforToastCols, " ") + normalizedpkeyColsArray := make([]string, 0, len(normalizedTableSchema.PrimaryKeyColumns)) pkeySelectSQLArray := make([]string, 0, len(normalizedTableSchema.PrimaryKeyColumns)) for _, pkeyColName := range normalizedTableSchema.PrimaryKeyColumns { + normalizedPkeyColName := SnowflakeIdentifierNormalize(pkeyColName) + normalizedpkeyColsArray = append(normalizedpkeyColsArray, normalizedPkeyColName) pkeySelectSQLArray = append(pkeySelectSQLArray, fmt.Sprintf("TARGET.%s = SOURCE.%s", - pkeyColName, pkeyColName)) + normalizedPkeyColName, normalizedPkeyColName)) } // TARGET. = SOURCE. AND TARGET. = SOURCE. ... pkeySelectSQL := strings.Join(pkeySelectSQLArray, " AND ") @@ -916,9 +923,9 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement( } } - mergeStatement := fmt.Sprintf(mergeStatementSQL, destinationTableIdentifier, toVariantColumnName, - rawTableIdentifier, normalizeBatchID, syncBatchID, flattenedCastsSQL, - fmt.Sprintf("(%s)", strings.Join(normalizedTableSchema.PrimaryKeyColumns, ",")), + mergeStatement := fmt.Sprintf(mergeStatementSQL, snowflakeSchemaTableNormalize(parsedDstTable), + toVariantColumnName, rawTableIdentifier, normalizeBatchID, syncBatchID, flattenedCastsSQL, + fmt.Sprintf("(%s)", strings.Join(normalizedpkeyColsArray, ",")), pkeySelectSQL, insertColumnsSQL, insertValuesSQL, updateStringToastCols, deletePart) startTime := time.Now() @@ -1045,8 +1052,8 @@ func (c *SnowflakeConnector) generateUpdateStatements( otherCols := utils.ArrayMinus(allCols, unchangedColsArray) tmpArray := make([]string, 0, len(otherCols)+2) for _, colName := range otherCols { - quotedUpperColName := fmt.Sprintf(`"%s"`, strings.ToUpper(colName)) - tmpArray = append(tmpArray, fmt.Sprintf("%s = SOURCE.%s", quotedUpperColName, quotedUpperColName)) + normalizedColName := SnowflakeIdentifierNormalize(colName) + tmpArray = append(tmpArray, fmt.Sprintf("%s = SOURCE.%s", normalizedColName, normalizedColName)) } // set the synced at column to the current timestamp diff --git a/flow/connectors/utils/identifiers.go b/flow/connectors/utils/identifiers.go index 2ae919488d..5318605a93 100644 --- a/flow/connectors/utils/identifiers.go +++ b/flow/connectors/utils/identifiers.go @@ -3,6 +3,7 @@ package utils import ( "fmt" "strings" + "unicode" ) func QuoteIdentifier(identifier string) string { @@ -28,3 +29,23 @@ func ParseSchemaTable(tableName string) (*SchemaTable, error) { return &SchemaTable{schema, table}, nil } + +// I think these only work with ASCII? +func IsUpper(s string) bool { + for _, r := range s { + if !unicode.IsUpper(r) && unicode.IsLetter(r) { + return false + } + } + return true +} + +// I think these only work with ASCII? +func IsLower(s string) bool { + for _, r := range s { + if !unicode.IsLower(r) && unicode.IsLetter(r) { + return false + } + } + return true +} diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 2f02723d5d..d7e3e4315b 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -399,7 +399,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() { // allow only continue as new error require.Contains(s.t, err.Error(), "continue as new") - s.compareTableContentsSF("test_toast_sf_1", `id,t1,t2,k`, false) + s.compareTableContentsSF("test_toast_sf_1", `id,t1,t2,k`) env.AssertExpectations(s.t) } @@ -466,7 +466,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Nochanges_SF() { // allow only continue as new error require.Contains(s.t, err.Error(), "continue as new") - s.compareTableContentsSF("test_toast_sf_2", `id,t1,t2,k`, false) + s.compareTableContentsSF("test_toast_sf_2", `id,t1,t2,k`) env.AssertExpectations(s.t) wg.Wait() @@ -541,7 +541,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { // allow only continue as new error require.Contains(s.t, err.Error(), "continue as new") - s.compareTableContentsSF("test_toast_sf_3", `id,t1,t2,k`, false) + s.compareTableContentsSF("test_toast_sf_3", `id,t1,t2,k`) env.AssertExpectations(s.t) } @@ -607,7 +607,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { // allow only continue as new error require.Contains(s.t, err.Error(), "continue as new") - s.compareTableContentsSF("test_toast_sf_4", `id,t1,k`, false) + s.compareTableContentsSF("test_toast_sf_4", `id,t1,k`) env.AssertExpectations(s.t) } @@ -673,7 +673,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { // allow only continue as new error require.Contains(s.t, err.Error(), "continue as new") - s.compareTableContentsSF("test_toast_sf_5", `id,t1,t2,k`, false) + s.compareTableContentsSF("test_toast_sf_5", `id,t1,t2,k`) env.AssertExpectations(s.t) } @@ -871,7 +871,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { }) require.NoError(s.t, err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) - s.compareTableContentsSF("test_simple_schema_changes", "id,c1", false) + s.compareTableContentsSF("test_simple_schema_changes", "id,c1") // alter source table, add column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -900,7 +900,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { }) require.NoError(s.t, err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) - s.compareTableContentsSF("test_simple_schema_changes", "id,c1,c2", false) + s.compareTableContentsSF("test_simple_schema_changes", "id,c1,c2") // alter source table, add column c3, drop column c2 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -930,7 +930,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { }) require.NoError(s.t, err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) - s.compareTableContentsSF("test_simple_schema_changes", "id,c1,c3", false) + s.compareTableContentsSF("test_simple_schema_changes", "id,c1,c3") // alter source table, drop column c3 and insert another row. _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` @@ -960,7 +960,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { }) require.NoError(s.t, err) s.Equal(expectedTableSchema, output.TableNameSchemaMapping[dstTableName]) - s.compareTableContentsSF("test_simple_schema_changes", "id,c1", false) + s.compareTableContentsSF("test_simple_schema_changes", "id,c1") }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) @@ -1024,7 +1024,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { // verify we got our 10 rows e2e.NormalizeFlowCountQuery(env, connectionGen, 2) - s.compareTableContentsSF("test_simple_cpkey", "id,c1,c2,t", false) + s.compareTableContentsSF("test_simple_cpkey", "id,c1,c2,t") _, err := s.pool.Exec(context.Background(), fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1) @@ -1043,7 +1043,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { require.Contains(s.t, err.Error(), "continue as new") // verify our updates and delete happened - s.compareTableContentsSF("test_simple_cpkey", "id,c1,c2,t", false) + s.compareTableContentsSF("test_simple_cpkey", "id,c1,c2,t") env.AssertExpectations(s.t) } @@ -1119,7 +1119,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { require.Contains(s.t, err.Error(), "continue as new") // verify our updates and delete happened - s.compareTableContentsSF("test_cpkey_toast1", "id,c1,c2,t,t2", false) + s.compareTableContentsSF("test_cpkey_toast1", "id,c1,c2,t,t2") env.AssertExpectations(s.t) } @@ -1191,7 +1191,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { require.Contains(s.t, err.Error(), "continue as new") // verify our updates and delete happened - s.compareTableContentsSF("test_cpkey_toast2", "id,c1,c2,t,t2", false) + s.compareTableContentsSF("test_cpkey_toast2", "id,c1,c2,t,t2") env.AssertExpectations(s.t) } @@ -1357,7 +1357,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { wg.Wait() // verify our updates and delete happened - s.compareTableContentsSF("test_softdel", "id,c1,c2,t", false) + s.compareTableContentsSF("test_softdel", "id,c1,c2,t") newerSyncedAtQuery := fmt.Sprintf(` SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName) @@ -1440,7 +1440,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() { require.Contains(s.t, err.Error(), "continue as new") // verify our updates and delete happened - s.compareTableContentsSF("test_softdel_iud", "id,c1,c2,t", false) + s.compareTableContentsSF("test_softdel_iud", "id,c1,c2,t") newerSyncedAtQuery := fmt.Sprintf(` SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName) @@ -1527,7 +1527,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { require.Contains(s.t, err.Error(), "continue as new") // verify our updates and delete happened - s.compareTableContentsSF("test_softdel_ud", "id,c1,c2,t", false) + s.compareTableContentsSF("test_softdel_ud", "id,c1,c2,t") newerSyncedAtQuery := fmt.Sprintf(` SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName) @@ -1602,7 +1602,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { require.Contains(s.t, err.Error(), "continue as new") // verify our updates and delete happened - s.compareTableContentsSF("test_softdel_iad", "id,c1,c2,t", false) + s.compareTableContentsSF("test_softdel_iad", "id,c1,c2,t") newerSyncedAtQuery := fmt.Sprintf(` SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED = TRUE`, dstTableName) diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index 4499f7f437..0c5b88a0d6 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -3,7 +3,6 @@ package e2e_snowflake import ( "context" "fmt" - "log/slog" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" "github.com/PeerDB-io/peer-flow/e2e" @@ -20,34 +19,22 @@ func (s PeerFlowE2ETestSuiteSF) setupSourceTable(tableName string, numRows int) require.NoError(s.t, err) } -func (s PeerFlowE2ETestSuiteSF) setupSFDestinationTable(dstTable string) { - schema := e2e.GetOwnersSchema() - err := s.sfHelper.CreateTable(dstTable, schema) - // fail if table creation fails - if err != nil { - require.FailNow(s.t, "unable to create table on snowflake", err) - } - - slog.Info(fmt.Sprintf("created table on snowflake: %s.%s.", s.sfHelper.testSchemaName, dstTable)) +func (s PeerFlowE2ETestSuiteSF) compareTableContentsSF(tableName, selector string) { + s.compareTableContentsWithDiffSelectorsSF(tableName, selector, selector) } -func (s PeerFlowE2ETestSuiteSF) compareTableContentsSF(tableName string, selector string, caseSensitive bool) { +func (s PeerFlowE2ETestSuiteSF) compareTableContentsWithDiffSelectorsSF(tableName, pgSelector, sfSelector string) { // read rows from source table pgQueryExecutor := connpostgres.NewQRepQueryExecutor(s.pool, context.Background(), "testflow", "testpart") pgQueryExecutor.SetTestEnv(true) pgRows, err := pgQueryExecutor.ExecuteAndProcessQuery( - fmt.Sprintf("SELECT %s FROM e2e_test_%s.%s ORDER BY id", selector, s.pgSuffix, tableName), + fmt.Sprintf("SELECT %s FROM e2e_test_%s.%s ORDER BY id", pgSelector, s.pgSuffix, tableName), ) require.NoError(s.t, err) // read rows from destination table qualifiedTableName := fmt.Sprintf("%s.%s.%s", s.sfHelper.testDatabaseName, s.sfHelper.testSchemaName, tableName) - var sfSelQuery string - if caseSensitive { - sfSelQuery = fmt.Sprintf(`SELECT %s FROM %s ORDER BY "id"`, selector, qualifiedTableName) - } else { - sfSelQuery = fmt.Sprintf(`SELECT %s FROM %s ORDER BY id`, selector, qualifiedTableName) - } + sfSelQuery := fmt.Sprintf(`SELECT %s FROM %s ORDER BY id`, sfSelector, qualifiedTableName) fmt.Printf("running query on snowflake: %s\n", sfSelQuery) sfRows, err := s.sfHelper.ExecuteAndProcessQuery(sfSelQuery) @@ -64,7 +51,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { tblName := "test_qrep_flow_avro_sf" s.setupSourceTable(tblName, numRows) - s.setupSFDestinationTable(tblName) dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) @@ -81,6 +67,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { false, "", ) + qrepConfig.SetupWatermarkTableOnDestination = true require.NoError(s.t, err) e2e.RunQrepFlowWorkflow(env, qrepConfig) @@ -91,8 +78,8 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { err = env.GetWorkflowError() require.NoError(s.t, err) - sel := e2e.GetOwnersSelectorString() - s.compareTableContentsSF(tblName, sel, true) + sel := e2e.GetOwnersSelectorStringsSF() + s.compareTableContentsWithDiffSelectorsSF(tblName, sel[0], sel[1]) env.AssertExpectations(s.t) } @@ -105,7 +92,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() tblName := "test_qrep_flow_avro_sf_ups" s.setupSourceTable(tblName, numRows) - s.setupSFDestinationTable(tblName) dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) @@ -126,6 +112,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() WriteType: protos.QRepWriteType_QREP_WRITE_MODE_UPSERT, UpsertKeyColumns: []string{"id"}, } + qrepConfig.SetupWatermarkTableOnDestination = true require.NoError(s.t, err) e2e.RunQrepFlowWorkflow(env, qrepConfig) @@ -136,8 +123,8 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() err = env.GetWorkflowError() require.NoError(s.t, err) - sel := e2e.GetOwnersSelectorString() - s.compareTableContentsSF(tblName, sel, true) + sel := e2e.GetOwnersSelectorStringsSF() + s.compareTableContentsWithDiffSelectorsSF(tblName, sel[0], sel[1]) env.AssertExpectations(s.t) } @@ -150,7 +137,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { tblName := "test_qrep_flow_avro_sf_s3" s.setupSourceTable(tblName, numRows) - s.setupSFDestinationTable(tblName) dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) @@ -169,6 +155,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { ) require.NoError(s.t, err) qrepConfig.StagingPath = fmt.Sprintf("s3://peerdb-test-bucket/avro/%s", uuid.New()) + qrepConfig.SetupWatermarkTableOnDestination = true e2e.RunQrepFlowWorkflow(env, qrepConfig) @@ -178,8 +165,8 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { err = env.GetWorkflowError() require.NoError(s.t, err) - sel := e2e.GetOwnersSelectorString() - s.compareTableContentsSF(tblName, sel, true) + sel := e2e.GetOwnersSelectorStringsSF() + s.compareTableContentsWithDiffSelectorsSF(tblName, sel[0], sel[1]) env.AssertExpectations(s.t) } @@ -192,7 +179,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { tblName := "test_qrep_flow_avro_sf_ups_xmin" s.setupSourceTable(tblName, numRows) - s.setupSFDestinationTable(tblName) dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) @@ -214,6 +200,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { UpsertKeyColumns: []string{"id"}, } qrepConfig.WatermarkColumn = "xmin" + qrepConfig.SetupWatermarkTableOnDestination = true require.NoError(s.t, err) e2e.RunXminFlowWorkflow(env, qrepConfig) @@ -224,8 +211,8 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { err = env.GetWorkflowError() require.NoError(s.t, err) - sel := e2e.GetOwnersSelectorString() - s.compareTableContentsSF(tblName, sel, true) + sel := e2e.GetOwnersSelectorStringsSF() + s.compareTableContentsWithDiffSelectorsSF(tblName, sel[0], sel[1]) env.AssertExpectations(s.t) } @@ -238,7 +225,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration() tblName := "test_qrep_flow_avro_sf_s3_int" s.setupSourceTable(tblName, numRows) - s.setupSFDestinationTable(tblName) dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) @@ -261,6 +247,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration() ) require.NoError(s.t, err) qrepConfig.StagingPath = fmt.Sprintf("s3://peerdb-test-bucket/avro/%s", uuid.New()) + qrepConfig.SetupWatermarkTableOnDestination = true e2e.RunQrepFlowWorkflow(env, qrepConfig) @@ -270,8 +257,8 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration() err = env.GetWorkflowError() require.NoError(s.t, err) - sel := e2e.GetOwnersSelectorString() - s.compareTableContentsSF(tblName, sel, true) + sel := e2e.GetOwnersSelectorStringsSF() + s.compareTableContentsWithDiffSelectorsSF(tblName, sel[0], sel[1]) env.AssertExpectations(s.t) } @@ -304,6 +291,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_PeerDB_Columns_QRep_SF() { WriteType: protos.QRepWriteType_QREP_WRITE_MODE_UPSERT, UpsertKeyColumns: []string{"id"}, } + qrepConfig.SetupWatermarkTableOnDestination = true require.NoError(s.t, err) e2e.RunQrepFlowWorkflow(env, qrepConfig) diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index beb4ae04b9..11be8c6ee7 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -11,6 +11,7 @@ import ( "time" "github.com/PeerDB-io/peer-flow/activities" + connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/logger" @@ -360,14 +361,15 @@ func GetOwnersSchema() *model.QRecordSchema { } } -func GetOwnersSelectorString() string { +func GetOwnersSelectorStringsSF() [2]string { schema := GetOwnersSchema() - fields := make([]string, 0, len(schema.Fields)) + pgFields := make([]string, 0, len(schema.Fields)) + sfFields := make([]string, 0, len(schema.Fields)) for _, field := range schema.Fields { - // append quoted field name - fields = append(fields, fmt.Sprintf(`"%s"`, field.Name)) + pgFields = append(pgFields, fmt.Sprintf(`"%s"`, field.Name)) + sfFields = append(sfFields, connsnowflake.SnowflakeIdentifierNormalize(field.Name)) } - return strings.Join(fields, ",") + return [2]string{strings.Join(pgFields, ","), strings.Join(sfFields, ",")} } func NewTemporalTestWorkflowEnvironment() *testsuite.TestWorkflowEnvironment { From 8a304af709be1b98349e27292fd3dc2f56ca2fbd Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Thu, 28 Dec 2023 20:18:34 +0530 Subject: [PATCH 2/5] added partial mixed case test for SF --- flow/e2e/snowflake/peer_flow_sf_test.go | 90 ++++++++++++++++++++++--- 1 file changed, 82 insertions(+), 8 deletions(-) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index d7e3e4315b..c9215ef509 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -35,22 +35,21 @@ type PeerFlowE2ETestSuiteSF struct { func TestPeerFlowE2ETestSuiteSF(t *testing.T) { e2eshared.GotSuite(t, SetupSuite, func(s PeerFlowE2ETestSuiteSF) { - err := e2e.TearDownPostgres(s.pool, s.pgSuffix) - if err != nil { - slog.Error("failed to tear down Postgres", slog.Any("error", err)) - s.FailNow() - } + // err := e2e.TearDownPostgres(s.pool, s.pgSuffix) + // if err != nil { + // slog.Error("failed to tear down Postgres", slog.Any("error", err)) + // s.FailNow() + // } if s.sfHelper != nil { - err = s.sfHelper.Cleanup() + err := s.sfHelper.Cleanup() if err != nil { slog.Error("failed to tear down Snowflake", slog.Any("error", err)) s.FailNow() } } - err = s.connector.Close() - + err := s.connector.Close() if err != nil { slog.Error("failed to close Snowflake connector", slog.Any("error", err)) s.FailNow() @@ -1610,3 +1609,78 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { require.NoError(s.t, err) s.Equal(0, numNewRows) } + +func (s PeerFlowE2ETestSuiteSF) Test_Supported_Mixed_Case_Table_SF() { + env := e2e.NewTemporalTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(s.t, env) + + srcTableName := s.attachSchemaSuffix("testMixedCase") + dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_mixed_case") + + _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS e2e_test_%s."%s" ( + "pulseArmor" SERIAL PRIMARY KEY, + "highGold" TEXT NOT NULL, + "eVe" TEXT NOT NULL + ); + `, s.pgSuffix, "testMixedCase")) + require.NoError(s.t, err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix("test_mixed_case"), + TableNameMapping: map[string]string{srcTableName: dstTableName}, + PostgresPort: e2e.PostgresPort, + Destination: s.sfHelper.Peer, + } + + flowConnConfig, err := connectionGen.GenerateFlowConnectionConfigs() + require.NoError(s.t, err) + + limits := peerflow.CDCFlowLimits{ + ExitAfterRecords: 20, + MaxBatchSize: 100, + } + + // in a separate goroutine, wait for PeerFlowStatusQuery to finish setup + // and then insert 20 rows into the source table + go func() { + e2e.SetupCDCFlowStatusQuery(env, connectionGen) + // insert 20 rows into the source table + for i := 0; i < 20; i++ { + testKey := fmt.Sprintf("test_key_%d", i) + testValue := fmt.Sprintf("test_value_%d", i) + _, err = s.pool.Exec(context.Background(), fmt.Sprintf(` + INSERT INTO e2e_test_%s."%s"("highGold","eVe") VALUES ($1, $2) + `, s.pgSuffix, "testMixedCase"), testKey, testValue) + require.NoError(s.t, err) + } + fmt.Println("Inserted 20 rows into the source table") + }() + + env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + + // Verify workflow completes without error + s.True(env.IsWorkflowCompleted()) + err = env.GetWorkflowError() + + // allow only continue as new error + require.Contains(s.t, err.Error(), "continue as new") + + count, err := s.sfHelper.CountRows("test_mixed_case") + require.NoError(s.t, err) + s.Equal(20, count) + + // check the number of rows where _PEERDB_SYNCED_AT is newer than 5 mins ago + // it should match the count. + newerSyncedAtQuery := fmt.Sprintf(` + SELECT COUNT(*) FROM %s WHERE _PEERDB_SYNCED_AT > CURRENT_TIMESTAMP() - INTERVAL '30 MINUTE' + `, dstTableName) + numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery) + require.NoError(s.t, err) + s.Equal(20, numNewRows) + + // TODO: verify that the data is correctly synced to the destination table + // on the Snowflake side + + env.AssertExpectations(s.t) +} From 93ae0f535f21db7a97cb3c9b380f37b2a6aee736 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Thu, 28 Dec 2023 20:50:50 +0530 Subject: [PATCH 3/5] oops uncommenting --- flow/e2e/snowflake/peer_flow_sf_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index c9215ef509..b4af8b8549 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -35,21 +35,21 @@ type PeerFlowE2ETestSuiteSF struct { func TestPeerFlowE2ETestSuiteSF(t *testing.T) { e2eshared.GotSuite(t, SetupSuite, func(s PeerFlowE2ETestSuiteSF) { - // err := e2e.TearDownPostgres(s.pool, s.pgSuffix) - // if err != nil { - // slog.Error("failed to tear down Postgres", slog.Any("error", err)) - // s.FailNow() - // } + err := e2e.TearDownPostgres(s.pool, s.pgSuffix) + if err != nil { + slog.Error("failed to tear down Postgres", slog.Any("error", err)) + s.FailNow() + } if s.sfHelper != nil { - err := s.sfHelper.Cleanup() + err = s.sfHelper.Cleanup() if err != nil { slog.Error("failed to tear down Snowflake", slog.Any("error", err)) s.FailNow() } } - err := s.connector.Close() + err = s.connector.Close() if err != nil { slog.Error("failed to close Snowflake connector", slog.Any("error", err)) s.FailNow() From 22823bd6e902552cdd295729d69b4af0fe4e1085 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Thu, 28 Dec 2023 22:03:00 +0530 Subject: [PATCH 4/5] mixedCase is not good --- flow/e2e/snowflake/peer_flow_sf_test.go | 36 +++++++++---------------- flow/e2e/snowflake/qrep_flow_sf_test.go | 27 ++++++++++++------- 2 files changed, 30 insertions(+), 33 deletions(-) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index b4af8b8549..bb0729aabe 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -41,13 +41,13 @@ func TestPeerFlowE2ETestSuiteSF(t *testing.T) { s.FailNow() } - if s.sfHelper != nil { - err = s.sfHelper.Cleanup() - if err != nil { - slog.Error("failed to tear down Snowflake", slog.Any("error", err)) - s.FailNow() - } - } + // if s.sfHelper != nil { + // err = s.sfHelper.Cleanup() + // if err != nil { + // slog.Error("failed to tear down Snowflake", slog.Any("error", err)) + // s.FailNow() + // } + // } err = s.connector.Close() if err != nil { @@ -1615,13 +1615,14 @@ func (s PeerFlowE2ETestSuiteSF) Test_Supported_Mixed_Case_Table_SF() { e2e.RegisterWorkflowsAndActivities(s.t, env) srcTableName := s.attachSchemaSuffix("testMixedCase") - dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_mixed_case") + dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "testMixedCase") _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS e2e_test_%s."%s" ( "pulseArmor" SERIAL PRIMARY KEY, "highGold" TEXT NOT NULL, - "eVe" TEXT NOT NULL + "eVe" TEXT NOT NULL, + id SERIAL ); `, s.pgSuffix, "testMixedCase")) require.NoError(s.t, err) @@ -1666,21 +1667,8 @@ func (s PeerFlowE2ETestSuiteSF) Test_Supported_Mixed_Case_Table_SF() { // allow only continue as new error require.Contains(s.t, err.Error(), "continue as new") - count, err := s.sfHelper.CountRows("test_mixed_case") - require.NoError(s.t, err) - s.Equal(20, count) - - // check the number of rows where _PEERDB_SYNCED_AT is newer than 5 mins ago - // it should match the count. - newerSyncedAtQuery := fmt.Sprintf(` - SELECT COUNT(*) FROM %s WHERE _PEERDB_SYNCED_AT > CURRENT_TIMESTAMP() - INTERVAL '30 MINUTE' - `, dstTableName) - numNewRows, err := s.sfHelper.RunIntQuery(newerSyncedAtQuery) - require.NoError(s.t, err) - s.Equal(20, numNewRows) - - // TODO: verify that the data is correctly synced to the destination table - // on the Snowflake side + s.compareTableContentsWithDiffSelectorsSF("testMixedCase", `"pulseArmor","highGold","eVe",id`, + `"pulseArmor","highGold","eVe",id`, true) env.AssertExpectations(s.t) } diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index 0c5b88a0d6..a1fe767874 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -20,20 +20,29 @@ func (s PeerFlowE2ETestSuiteSF) setupSourceTable(tableName string, numRows int) } func (s PeerFlowE2ETestSuiteSF) compareTableContentsSF(tableName, selector string) { - s.compareTableContentsWithDiffSelectorsSF(tableName, selector, selector) + s.compareTableContentsWithDiffSelectorsSF(tableName, selector, selector, false) } -func (s PeerFlowE2ETestSuiteSF) compareTableContentsWithDiffSelectorsSF(tableName, pgSelector, sfSelector string) { +func (s PeerFlowE2ETestSuiteSF) compareTableContentsWithDiffSelectorsSF(tableName, pgSelector, sfSelector string, + tableCaseSensitive bool, +) { // read rows from source table pgQueryExecutor := connpostgres.NewQRepQueryExecutor(s.pool, context.Background(), "testflow", "testpart") pgQueryExecutor.SetTestEnv(true) pgRows, err := pgQueryExecutor.ExecuteAndProcessQuery( - fmt.Sprintf("SELECT %s FROM e2e_test_%s.%s ORDER BY id", pgSelector, s.pgSuffix, tableName), + fmt.Sprintf(`SELECT %s FROM e2e_test_%s."%s" ORDER BY id`, pgSelector, s.pgSuffix, tableName), ) require.NoError(s.t, err) // read rows from destination table - qualifiedTableName := fmt.Sprintf("%s.%s.%s", s.sfHelper.testDatabaseName, s.sfHelper.testSchemaName, tableName) + + var qualifiedTableName string + if tableCaseSensitive { + qualifiedTableName = fmt.Sprintf(`%s.%s."%s"`, s.sfHelper.testDatabaseName, s.sfHelper.testSchemaName, tableName) + } else { + qualifiedTableName = fmt.Sprintf(`%s.%s.%s`, s.sfHelper.testDatabaseName, s.sfHelper.testSchemaName, tableName) + } + sfSelQuery := fmt.Sprintf(`SELECT %s FROM %s ORDER BY id`, sfSelector, qualifiedTableName) fmt.Printf("running query on snowflake: %s\n", sfSelQuery) @@ -79,7 +88,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { require.NoError(s.t, err) sel := e2e.GetOwnersSelectorStringsSF() - s.compareTableContentsWithDiffSelectorsSF(tblName, sel[0], sel[1]) + s.compareTableContentsWithDiffSelectorsSF(tblName, sel[0], sel[1], false) env.AssertExpectations(s.t) } @@ -124,7 +133,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() require.NoError(s.t, err) sel := e2e.GetOwnersSelectorStringsSF() - s.compareTableContentsWithDiffSelectorsSF(tblName, sel[0], sel[1]) + s.compareTableContentsWithDiffSelectorsSF(tblName, sel[0], sel[1], false) env.AssertExpectations(s.t) } @@ -166,7 +175,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { require.NoError(s.t, err) sel := e2e.GetOwnersSelectorStringsSF() - s.compareTableContentsWithDiffSelectorsSF(tblName, sel[0], sel[1]) + s.compareTableContentsWithDiffSelectorsSF(tblName, sel[0], sel[1], false) env.AssertExpectations(s.t) } @@ -212,7 +221,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { require.NoError(s.t, err) sel := e2e.GetOwnersSelectorStringsSF() - s.compareTableContentsWithDiffSelectorsSF(tblName, sel[0], sel[1]) + s.compareTableContentsWithDiffSelectorsSF(tblName, sel[0], sel[1], false) env.AssertExpectations(s.t) } @@ -258,7 +267,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration() require.NoError(s.t, err) sel := e2e.GetOwnersSelectorStringsSF() - s.compareTableContentsWithDiffSelectorsSF(tblName, sel[0], sel[1]) + s.compareTableContentsWithDiffSelectorsSF(tblName, sel[0], sel[1], false) env.AssertExpectations(s.t) } From 1f1dd4274091064c62bb842e16255e779c681e69 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Thu, 28 Dec 2023 22:06:34 +0530 Subject: [PATCH 5/5] uncommenting oops pt.2 --- flow/e2e/snowflake/peer_flow_sf_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index bb0729aabe..fc7e134784 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -41,13 +41,13 @@ func TestPeerFlowE2ETestSuiteSF(t *testing.T) { s.FailNow() } - // if s.sfHelper != nil { - // err = s.sfHelper.Cleanup() - // if err != nil { - // slog.Error("failed to tear down Snowflake", slog.Any("error", err)) - // s.FailNow() - // } - // } + if s.sfHelper != nil { + err = s.sfHelper.Cleanup() + if err != nil { + slog.Error("failed to tear down Snowflake", slog.Any("error", err)) + s.FailNow() + } + } err = s.connector.Close() if err != nil {