From 1834283a9f2b68d51b6194f44c4f00b5d78cf01a Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Mon, 30 Oct 2023 00:14:15 +0530 Subject: [PATCH] fixing tests pt.1 --- flow/connectors/snowflake/qrep.go | 10 +---- .../qrep_avro_consolidate_handler.go | 17 +++++---- flow/connectors/snowflake/qrep_avro_sync.go | 2 +- flow/e2e/snowflake/qrep_flow_sf_test.go | 37 +++++++------------ 4 files changed, 26 insertions(+), 40 deletions(-) diff --git a/flow/connectors/snowflake/qrep.go b/flow/connectors/snowflake/qrep.go index ed9a81542b..db75fc61a4 100644 --- a/flow/connectors/snowflake/qrep.go +++ b/flow/connectors/snowflake/qrep.go @@ -258,7 +258,7 @@ func (c *SnowflakeConnector) ConsolidateQRepPartitions(config *protos.QRepConfig case protos.QRepSyncMode_QREP_SYNC_MODE_MULTI_INSERT: return fmt.Errorf("multi-insert sync mode not supported for snowflake") case protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO: - colInfo, err := c.getColsFromTable(destTable, true) + colInfo, err := c.getNormalizedColsFromTable(destTable) if err != nil { log.WithFields(log.Fields{ "flowName": config.FlowJobName, @@ -288,8 +288,7 @@ func (c *SnowflakeConnector) CleanupQRepFlow(config *protos.QRepConfig) error { return c.dropStage(config.StagingPath, config.FlowJobName) } -func (c *SnowflakeConnector) getColsFromTable(tableName string, - correctForAvro bool) (*model.ColumnInformation, error) { +func (c *SnowflakeConnector) getNormalizedColsFromTable(tableName string) (*model.ColumnInformation, error) { // parse the table name to get the schema and table name components, err := utils.ParseSchemaTable(tableName) if err != nil { @@ -316,11 +315,6 @@ func (c *SnowflakeConnector) getColsFromTable(tableName string, if err := rows.Scan(&colName, &colType); err != nil { return nil, fmt.Errorf("failed to scan row: %w", err) } - // Avro file was written with caseless identifiers being lowercase, as the information is fetched from Postgres - // Snowflake retrieves the column information with caseless identifiers being UPPERCASE - if correctForAvro && strings.ToUpper(colName) == colName { - colName = strings.ToLower(colName) - } columnMap[colName] = colType } var cols []string diff --git a/flow/connectors/snowflake/qrep_avro_consolidate_handler.go b/flow/connectors/snowflake/qrep_avro_consolidate_handler.go index 596cae1c68..7d2ad8a753 100644 --- a/flow/connectors/snowflake/qrep_avro_consolidate_handler.go +++ b/flow/connectors/snowflake/qrep_avro_consolidate_handler.go @@ -41,26 +41,29 @@ func NewSnowflakeAvroConsolidateHandler( func (s *SnowflakeAvroConsolidateHandler) generateCopyTransformation() *CopyInfo { var transformations []string var columnOrder []string - for colName, colType := range s.colInfo.ColumnMap { - if colName == "_PEERDB_IS_DELETED" { + for avroColName, colType := range s.colInfo.ColumnMap { + if avroColName == "_PEERDB_IS_DELETED" { continue } - normalizedColName := snowflakeIdentifierNormalize(colName) + if strings.ToUpper(avroColName) == avroColName { + avroColName = strings.ToLower(avroColName) + } + normalizedColName := snowflakeIdentifierNormalize(avroColName) columnOrder = append(columnOrder, normalizedColName) // Avro files are written with lowercase in mind, so don't normalize it like everything else switch colType { case "GEOGRAPHY": transformations = append(transformations, - fmt.Sprintf("TO_GEOGRAPHY($1:\"%s\"::string, true) AS %s", colName, normalizedColName)) + fmt.Sprintf("TO_GEOGRAPHY($1:\"%s\"::string, true) AS %s", avroColName, normalizedColName)) case "GEOMETRY": transformations = append(transformations, - fmt.Sprintf("TO_GEOMETRY($1:\"%s\"::string, true) AS %s", colName, normalizedColName)) + fmt.Sprintf("TO_GEOMETRY($1:\"%s\"::string, true) AS %s", avroColName, normalizedColName)) case "NUMBER": transformations = append(transformations, - fmt.Sprintf("$1:\"%s\" AS %s", colName, normalizedColName)) + fmt.Sprintf("$1:\"%s\" AS %s", avroColName, normalizedColName)) default: transformations = append(transformations, - fmt.Sprintf("($1:\"%s\")::%s AS %s", colName, colType, normalizedColName)) + fmt.Sprintf("($1:\"%s\")::%s AS %s", avroColName, colType, normalizedColName)) } } transformationSQL := strings.Join(transformations, ",") diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go index 9853b19c48..0e52ffba86 100644 --- a/flow/connectors/snowflake/qrep_avro_sync.go +++ b/flow/connectors/snowflake/qrep_avro_sync.go @@ -78,7 +78,7 @@ func (s *SnowflakeAvroSyncHandler) SyncRecords( "flowName": flowJobName, }).Infof("Created stage %s", stage) - colInfo, err := s.connector.getColsFromTable(s.config.DestinationTableIdentifier, true) + colInfo, err := s.connector.getNormalizedColsFromTable(s.config.DestinationTableIdentifier) if err != nil { return 0, err } diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index cdcfaeca98..5e39aceb12 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -3,6 +3,7 @@ package e2e_snowflake import ( "context" "fmt" + "strings" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" "github.com/PeerDB-io/peer-flow/e2e" @@ -18,18 +19,6 @@ func (s *PeerFlowE2ETestSuiteSF) setupSourceTable(tableName string, rowCount int s.NoError(err) } -func (s *PeerFlowE2ETestSuiteSF) setupSFDestinationTable(dstTable string) { - schema := e2e.GetOwnersSchema() - err := s.sfHelper.CreateTable(dstTable, schema) - - // fail if table creation fails - if err != nil { - s.FailNow("unable to create table on snowflake", err) - } - - fmt.Printf("created table on snowflake: %s.%s. %v\n", s.sfHelper.testSchemaName, dstTable, err) -} - func (s *PeerFlowE2ETestSuiteSF) compareTableContentsSF(tableName string, selector string, caseSensitive bool) { // read rows from source table pgQueryExecutor := connpostgres.NewQRepQueryExecutor(s.pool, context.Background(), "testflow", "testpart") @@ -43,9 +32,9 @@ func (s *PeerFlowE2ETestSuiteSF) compareTableContentsSF(tableName string, select qualifiedTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tableName) var sfSelQuery string if caseSensitive { - sfSelQuery = fmt.Sprintf(`SELECT %s FROM %s ORDER BY "id"`, selector, qualifiedTableName) + sfSelQuery = fmt.Sprintf(`SELECT %s FROM %s ORDER BY "id"`, strings.ToUpper(selector), qualifiedTableName) } else { - sfSelQuery = fmt.Sprintf(`SELECT %s FROM %s ORDER BY id`, selector, qualifiedTableName) + sfSelQuery = fmt.Sprintf(`SELECT %s FROM %s ORDER BY id`, strings.ToUpper(selector), qualifiedTableName) } fmt.Printf("running query on snowflake: %s\n", sfSelQuery) @@ -63,7 +52,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { tblName := "test_qrep_flow_avro_sf" s.setupSourceTable(tblName, numRows) - s.setupSFDestinationTable(tblName) dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) @@ -79,6 +67,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { s.sfHelper.Peer, "", ) + qrepConfig.SetupWatermarkTableOnDestination = true s.NoError(err) e2e.RunQrepFlowWorkflow(env, qrepConfig) @@ -91,7 +80,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() { s.NoError(err) sel := e2e.GetOwnersSelectorString() - s.compareTableContentsSF(tblName, sel, true) + s.compareTableContentsSF(tblName, sel, false) env.AssertExpectations(s.T()) } @@ -104,7 +93,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() tblName := "test_qrep_flow_avro_sf_ups" s.setupSourceTable(tblName, numRows) - s.setupSFDestinationTable(tblName) dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) @@ -124,6 +112,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() WriteType: protos.QRepWriteType_QREP_WRITE_MODE_UPSERT, UpsertKeyColumns: []string{"id"}, } + qrepConfig.SetupWatermarkTableOnDestination = true s.NoError(err) e2e.RunQrepFlowWorkflow(env, qrepConfig) @@ -136,7 +125,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() s.NoError(err) sel := e2e.GetOwnersSelectorString() - s.compareTableContentsSF(tblName, sel, true) + s.compareTableContentsSF(tblName, sel, false) env.AssertExpectations(s.T()) } @@ -149,7 +138,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { tblName := "test_qrep_flow_avro_sf_s3" s.setupSourceTable(tblName, numRows) - s.setupSFDestinationTable(tblName) dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) @@ -165,6 +153,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { s.sfHelper.Peer, "", ) + qrepConfig.SetupWatermarkTableOnDestination = true s.NoError(err) qrepConfig.StagingPath = fmt.Sprintf("s3://peerdb-test-bucket/avro/%s", uuid.New()) @@ -177,7 +166,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { s.NoError(err) sel := e2e.GetOwnersSelectorString() - s.compareTableContentsSF(tblName, sel, true) + s.compareTableContentsSF(tblName, sel, false) env.AssertExpectations(s.T()) } @@ -189,7 +178,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { tblName := "test_qrep_flow_avro_sf_ups_xmin" s.setupSourceTable(tblName, numRows) - s.setupSFDestinationTable(tblName) dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) @@ -210,6 +198,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { UpsertKeyColumns: []string{"id"}, } qrepConfig.WatermarkColumn = "xmin" + qrepConfig.SetupWatermarkTableOnDestination = true s.NoError(err) e2e.RunQrepFlowWorkflow(env, qrepConfig) @@ -221,7 +210,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { s.NoError(err) sel := e2e.GetOwnersSelectorString() - s.compareTableContentsSF(tblName, sel, true) + s.compareTableContentsSF(tblName, sel, false) env.AssertExpectations(s.T()) } @@ -234,7 +223,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration( tblName := "test_qrep_flow_avro_sf_s3_int" s.setupSourceTable(tblName, numRows) - s.setupSFDestinationTable(tblName) dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) @@ -254,6 +242,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration( "", ) s.NoError(err) + qrepConfig.SetupWatermarkTableOnDestination = true qrepConfig.StagingPath = fmt.Sprintf("s3://peerdb-test-bucket/avro/%s", uuid.New()) e2e.RunQrepFlowWorkflow(env, qrepConfig) @@ -265,7 +254,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration( s.NoError(err) sel := e2e.GetOwnersSelectorString() - s.compareTableContentsSF(tblName, sel, true) + s.compareTableContentsSF(tblName, sel, false) env.AssertExpectations(s.T()) }