Skip to content

Commit

Permalink
fixing tests pt.1
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Oct 29, 2023
1 parent d2c930d commit 1834283
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 40 deletions.
10 changes: 2 additions & 8 deletions flow/connectors/snowflake/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func (c *SnowflakeConnector) ConsolidateQRepPartitions(config *protos.QRepConfig
case protos.QRepSyncMode_QREP_SYNC_MODE_MULTI_INSERT:
return fmt.Errorf("multi-insert sync mode not supported for snowflake")
case protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO:
colInfo, err := c.getColsFromTable(destTable, true)
colInfo, err := c.getNormalizedColsFromTable(destTable)
if err != nil {
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
Expand Down Expand Up @@ -288,8 +288,7 @@ func (c *SnowflakeConnector) CleanupQRepFlow(config *protos.QRepConfig) error {
return c.dropStage(config.StagingPath, config.FlowJobName)
}

func (c *SnowflakeConnector) getColsFromTable(tableName string,
correctForAvro bool) (*model.ColumnInformation, error) {
func (c *SnowflakeConnector) getNormalizedColsFromTable(tableName string) (*model.ColumnInformation, error) {
// parse the table name to get the schema and table name
components, err := utils.ParseSchemaTable(tableName)
if err != nil {
Expand All @@ -316,11 +315,6 @@ func (c *SnowflakeConnector) getColsFromTable(tableName string,
if err := rows.Scan(&colName, &colType); err != nil {
return nil, fmt.Errorf("failed to scan row: %w", err)
}
// Avro file was written with caseless identifiers being lowercase, as the information is fetched from Postgres
// Snowflake retrieves the column information with caseless identifiers being UPPERCASE
if correctForAvro && strings.ToUpper(colName) == colName {
colName = strings.ToLower(colName)
}
columnMap[colName] = colType
}
var cols []string
Expand Down
17 changes: 10 additions & 7 deletions flow/connectors/snowflake/qrep_avro_consolidate_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,26 +41,29 @@ func NewSnowflakeAvroConsolidateHandler(
func (s *SnowflakeAvroConsolidateHandler) generateCopyTransformation() *CopyInfo {
var transformations []string
var columnOrder []string
for colName, colType := range s.colInfo.ColumnMap {
if colName == "_PEERDB_IS_DELETED" {
for avroColName, colType := range s.colInfo.ColumnMap {
if avroColName == "_PEERDB_IS_DELETED" {
continue
}
normalizedColName := snowflakeIdentifierNormalize(colName)
if strings.ToUpper(avroColName) == avroColName {
avroColName = strings.ToLower(avroColName)
}
normalizedColName := snowflakeIdentifierNormalize(avroColName)
columnOrder = append(columnOrder, normalizedColName)
// Avro files are written with lowercase in mind, so don't normalize it like everything else
switch colType {
case "GEOGRAPHY":
transformations = append(transformations,
fmt.Sprintf("TO_GEOGRAPHY($1:\"%s\"::string, true) AS %s", colName, normalizedColName))
fmt.Sprintf("TO_GEOGRAPHY($1:\"%s\"::string, true) AS %s", avroColName, normalizedColName))
case "GEOMETRY":
transformations = append(transformations,
fmt.Sprintf("TO_GEOMETRY($1:\"%s\"::string, true) AS %s", colName, normalizedColName))
fmt.Sprintf("TO_GEOMETRY($1:\"%s\"::string, true) AS %s", avroColName, normalizedColName))
case "NUMBER":
transformations = append(transformations,
fmt.Sprintf("$1:\"%s\" AS %s", colName, normalizedColName))
fmt.Sprintf("$1:\"%s\" AS %s", avroColName, normalizedColName))
default:
transformations = append(transformations,
fmt.Sprintf("($1:\"%s\")::%s AS %s", colName, colType, normalizedColName))
fmt.Sprintf("($1:\"%s\")::%s AS %s", avroColName, colType, normalizedColName))
}
}
transformationSQL := strings.Join(transformations, ",")
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/snowflake/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (s *SnowflakeAvroSyncHandler) SyncRecords(
"flowName": flowJobName,
}).Infof("Created stage %s", stage)

colInfo, err := s.connector.getColsFromTable(s.config.DestinationTableIdentifier, true)
colInfo, err := s.connector.getNormalizedColsFromTable(s.config.DestinationTableIdentifier)
if err != nil {
return 0, err
}
Expand Down
37 changes: 13 additions & 24 deletions flow/e2e/snowflake/qrep_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package e2e_snowflake
import (
"context"
"fmt"
"strings"

connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/e2e"
Expand All @@ -18,18 +19,6 @@ func (s *PeerFlowE2ETestSuiteSF) setupSourceTable(tableName string, rowCount int
s.NoError(err)
}

func (s *PeerFlowE2ETestSuiteSF) setupSFDestinationTable(dstTable string) {
schema := e2e.GetOwnersSchema()
err := s.sfHelper.CreateTable(dstTable, schema)

// fail if table creation fails
if err != nil {
s.FailNow("unable to create table on snowflake", err)
}

fmt.Printf("created table on snowflake: %s.%s. %v\n", s.sfHelper.testSchemaName, dstTable, err)
}

func (s *PeerFlowE2ETestSuiteSF) compareTableContentsSF(tableName string, selector string, caseSensitive bool) {
// read rows from source table
pgQueryExecutor := connpostgres.NewQRepQueryExecutor(s.pool, context.Background(), "testflow", "testpart")
Expand All @@ -43,9 +32,9 @@ func (s *PeerFlowE2ETestSuiteSF) compareTableContentsSF(tableName string, select
qualifiedTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tableName)
var sfSelQuery string
if caseSensitive {
sfSelQuery = fmt.Sprintf(`SELECT %s FROM %s ORDER BY "id"`, selector, qualifiedTableName)
sfSelQuery = fmt.Sprintf(`SELECT %s FROM %s ORDER BY "id"`, strings.ToUpper(selector), qualifiedTableName)
} else {
sfSelQuery = fmt.Sprintf(`SELECT %s FROM %s ORDER BY id`, selector, qualifiedTableName)
sfSelQuery = fmt.Sprintf(`SELECT %s FROM %s ORDER BY id`, strings.ToUpper(selector), qualifiedTableName)
}
fmt.Printf("running query on snowflake: %s\n", sfSelQuery)

Expand All @@ -63,7 +52,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() {

tblName := "test_qrep_flow_avro_sf"
s.setupSourceTable(tblName, numRows)
s.setupSFDestinationTable(tblName)

dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName)

Expand All @@ -79,6 +67,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() {
s.sfHelper.Peer,
"",
)
qrepConfig.SetupWatermarkTableOnDestination = true
s.NoError(err)

e2e.RunQrepFlowWorkflow(env, qrepConfig)
Expand All @@ -91,7 +80,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() {
s.NoError(err)

sel := e2e.GetOwnersSelectorString()
s.compareTableContentsSF(tblName, sel, true)
s.compareTableContentsSF(tblName, sel, false)

env.AssertExpectations(s.T())
}
Expand All @@ -104,7 +93,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple()

tblName := "test_qrep_flow_avro_sf_ups"
s.setupSourceTable(tblName, numRows)
s.setupSFDestinationTable(tblName)

dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName)

Expand All @@ -124,6 +112,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple()
WriteType: protos.QRepWriteType_QREP_WRITE_MODE_UPSERT,
UpsertKeyColumns: []string{"id"},
}
qrepConfig.SetupWatermarkTableOnDestination = true
s.NoError(err)

e2e.RunQrepFlowWorkflow(env, qrepConfig)
Expand All @@ -136,7 +125,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple()
s.NoError(err)

sel := e2e.GetOwnersSelectorString()
s.compareTableContentsSF(tblName, sel, true)
s.compareTableContentsSF(tblName, sel, false)

env.AssertExpectations(s.T())
}
Expand All @@ -149,7 +138,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() {

tblName := "test_qrep_flow_avro_sf_s3"
s.setupSourceTable(tblName, numRows)
s.setupSFDestinationTable(tblName)

dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName)

Expand All @@ -165,6 +153,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() {
s.sfHelper.Peer,
"",
)
qrepConfig.SetupWatermarkTableOnDestination = true
s.NoError(err)
qrepConfig.StagingPath = fmt.Sprintf("s3://peerdb-test-bucket/avro/%s", uuid.New())

Expand All @@ -177,7 +166,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() {
s.NoError(err)

sel := e2e.GetOwnersSelectorString()
s.compareTableContentsSF(tblName, sel, true)
s.compareTableContentsSF(tblName, sel, false)

env.AssertExpectations(s.T())
}
Expand All @@ -189,7 +178,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() {

tblName := "test_qrep_flow_avro_sf_ups_xmin"
s.setupSourceTable(tblName, numRows)
s.setupSFDestinationTable(tblName)

dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName)

Expand All @@ -210,6 +198,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() {
UpsertKeyColumns: []string{"id"},
}
qrepConfig.WatermarkColumn = "xmin"
qrepConfig.SetupWatermarkTableOnDestination = true
s.NoError(err)

e2e.RunQrepFlowWorkflow(env, qrepConfig)
Expand All @@ -221,7 +210,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() {
s.NoError(err)

sel := e2e.GetOwnersSelectorString()
s.compareTableContentsSF(tblName, sel, true)
s.compareTableContentsSF(tblName, sel, false)

env.AssertExpectations(s.T())
}
Expand All @@ -234,7 +223,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration(

tblName := "test_qrep_flow_avro_sf_s3_int"
s.setupSourceTable(tblName, numRows)
s.setupSFDestinationTable(tblName)

dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName)

Expand All @@ -254,6 +242,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration(
"",
)
s.NoError(err)
qrepConfig.SetupWatermarkTableOnDestination = true
qrepConfig.StagingPath = fmt.Sprintf("s3://peerdb-test-bucket/avro/%s", uuid.New())

e2e.RunQrepFlowWorkflow(env, qrepConfig)
Expand All @@ -265,7 +254,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration(
s.NoError(err)

sel := e2e.GetOwnersSelectorString()
s.compareTableContentsSF(tblName, sel, true)
s.compareTableContentsSF(tblName, sel, false)

env.AssertExpectations(s.T())
}

0 comments on commit 1834283

Please sign in to comment.