From e75b052a8d22caf9f9ae5e20397cd36b44194e8e Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Sun, 15 Oct 2023 22:40:39 +0530 Subject: [PATCH] lint and s3 test fix --- .../postgres/qrep_query_executor.go | 3 +- flow/connectors/postgres/qvalue_convert.go | 7 +++-- flow/e2e/bigquery/qrep_flow_bq_test.go | 28 ------------------ flow/e2e/s3/s3_helper.go | 13 +++++---- flow/e2e/snowflake/qrep_flow_sf_test.go | 29 ------------------- flow/workflows/cdc_flow.go | 1 - 6 files changed, 14 insertions(+), 67 deletions(-) diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go index 1dcf906e7e..a099277754 100644 --- a/flow/connectors/postgres/qrep_query_executor.go +++ b/flow/connectors/postgres/qrep_query_executor.go @@ -99,7 +99,8 @@ func (qe *QRepQueryExecutor) fieldDescriptionsToSchema(fds []pgconn.FieldDescrip ctype := postgresOIDToQValueKind(fd.DataTypeOID, qe.connStr) if ctype == qvalue.QValueKindInvalid { var typeName string - err := qe.pool.QueryRow(qe.ctx, "SELECT typname FROM pg_type WHERE oid = $1", fd.DataTypeOID).Scan(&typeName) + err := qe.pool.QueryRow(qe.ctx, "SELECT typname FROM pg_type WHERE oid = $1", + fd.DataTypeOID).Scan(&typeName) if err != nil { ctype = qvalue.QValueKindInvalid } else { diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index 98b498362f..d620336f00 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -351,9 +351,10 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) ( } val = &qvalue.QValue{Kind: qvalue.QValueKindHStore, Value: hstoreVal} case qvalue.QValueKindPoint: - x_coord := value.(pgtype.Point).P.X - y_coord := value.(pgtype.Point).P.Y - val = &qvalue.QValue{Kind: qvalue.QValueKindPoint, Value: fmt.Sprintf("POINT(%f %f)", x_coord, y_coord)} + xCoord := value.(pgtype.Point).P.X + yCoord := value.(pgtype.Point).P.Y + val = &qvalue.QValue{Kind: qvalue.QValueKindPoint, + Value: fmt.Sprintf("POINT(%f %f)", xCoord, yCoord)} default: // log.Warnf("unhandled QValueKind => %v, parsing as string", qvalueKind) textVal, ok := value.(string) diff --git a/flow/e2e/bigquery/qrep_flow_bq_test.go b/flow/e2e/bigquery/qrep_flow_bq_test.go index fb0fc3b876..5e6374cc1a 100644 --- a/flow/e2e/bigquery/qrep_flow_bq_test.go +++ b/flow/e2e/bigquery/qrep_flow_bq_test.go @@ -3,8 +3,6 @@ package e2e_bigquery import ( "context" "fmt" - "sort" - "strings" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" "github.com/PeerDB-io/peer-flow/e2e" @@ -29,32 +27,6 @@ func (s *PeerFlowE2ETestSuiteBQ) setupBQDestinationTable(dstTable string) { fmt.Printf("created table on bigquery: %s.%s. 
%v\n", s.bqHelper.Config.DatasetId, dstTable, err) } -func (s *PeerFlowE2ETestSuiteBQ) compareTableSchemasBQ(tableName string) { - // read rows from source table - pgQueryExecutor := connpostgres.NewQRepQueryExecutor(s.pool, context.Background(), "testflow", "testpart") - pgQueryExecutor.SetTestEnv(true) - - pgRows, err := pgQueryExecutor.ExecuteAndProcessQuery( - fmt.Sprintf("SELECT * FROM e2e_test_%s.%s ORDER BY id", bigquerySuffix, tableName), - ) - s.NoError(err) - sort.Slice(pgRows.Schema.Fields, func(i int, j int) bool { - return strings.Compare(pgRows.Schema.Fields[i].Name, pgRows.Schema.Fields[j].Name) == -1 - }) - - // read rows from destination table - qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, tableName) - bqRows, err := s.bqHelper.ExecuteAndProcessQuery( - fmt.Sprintf("SELECT * FROM %s ORDER BY id", qualifiedTableName), - ) - s.NoError(err) - sort.Slice(bqRows.Schema.Fields, func(i int, j int) bool { - return strings.Compare(bqRows.Schema.Fields[i].Name, bqRows.Schema.Fields[j].Name) == -1 - }) - - s.True(pgRows.Schema.EqualNames(bqRows.Schema), "schemas from source and destination tables are not equal") -} - func (s *PeerFlowE2ETestSuiteBQ) compareTableContentsBQ(tableName string, colsString string) { // read rows from source table pgQueryExecutor := connpostgres.NewQRepQueryExecutor(s.pool, context.Background(), "testflow", "testpart") diff --git a/flow/e2e/s3/s3_helper.go b/flow/e2e/s3/s3_helper.go index 5d40d9a0e0..7ea629ad2b 100644 --- a/flow/e2e/s3/s3_helper.go +++ b/flow/e2e/s3/s3_helper.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "os" + "time" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/e2e" @@ -15,14 +16,14 @@ import ( ) const ( - peerName string = "test_s3_peer" - prefixName string = "test-s3" + peerName string = "test_s3_peer" ) type S3TestHelper struct { client *s3.S3 s3Config *protos.S3Config bucketName string + prefix string } func NewS3TestHelper(switchToGCS bool) (*S3TestHelper, error) { @@ -51,10 +52,11 @@ func NewS3TestHelper(switchToGCS bool) (*S3TestHelper, error) { if err != nil { return nil, err } + prefix := fmt.Sprintf("peerdb_test/%d", time.Now().UnixNano()) return &S3TestHelper{ client, &protos.S3Config{ - Url: fmt.Sprintf("s3://%s/%s", bucketName, prefixName), + Url: fmt.Sprintf("s3://%s/%s", bucketName, prefix), AccessKeyId: &config.AccessKeyID, SecretAccessKey: &config.SecretAccessKey, Region: &config.Region, @@ -68,6 +70,7 @@ func NewS3TestHelper(switchToGCS bool) (*S3TestHelper, error) { }, }, bucketName, + prefix, }, nil } @@ -89,7 +92,7 @@ func (h *S3TestHelper) ListAllFiles( ) ([]*s3.Object, error) { Bucket := h.bucketName - Prefix := fmt.Sprintf("%s/%s/", prefixName, jobName) + Prefix := fmt.Sprintf("%s/%s/", h.prefix, jobName) files, err := h.client.ListObjects(&s3.ListObjectsInput{ Bucket: &Bucket, Prefix: &Prefix, @@ -105,7 +108,7 @@ func (h *S3TestHelper) ListAllFiles( // Delete all generated objects during the test func (h *S3TestHelper) CleanUp() error { Bucket := h.bucketName - Prefix := prefixName + Prefix := h.prefix files, err := h.client.ListObjects(&s3.ListObjectsInput{ Bucket: &Bucket, Prefix: &Prefix, diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index 01d2532e51..8da1df3c5f 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -3,8 +3,6 @@ package e2e_snowflake import ( "context" "fmt" - "sort" - "strings" connpostgres 
"github.com/PeerDB-io/peer-flow/connectors/postgres" "github.com/PeerDB-io/peer-flow/e2e" @@ -32,33 +30,6 @@ func (s *PeerFlowE2ETestSuiteSF) setupSFDestinationTable(dstTable string) { fmt.Printf("created table on snowflake: %s.%s. %v\n", s.sfHelper.testSchemaName, dstTable, err) } -func (s *PeerFlowE2ETestSuiteSF) compareTableSchemasSF(tableName string) { - // read rows from source table - pgQueryExecutor := connpostgres.NewQRepQueryExecutor(s.pool, context.Background(), "testflow", "testpart") - pgQueryExecutor.SetTestEnv(true) - pgRows, err := pgQueryExecutor.ExecuteAndProcessQuery( - fmt.Sprintf("SELECT * FROM e2e_test_%s.%s LIMIT 0", snowflakeSuffix, tableName), - ) - require.NoError(s.T(), err) - sort.Slice(pgRows.Schema.Fields, func(i int, j int) bool { - return strings.Compare(pgRows.Schema.Fields[i].Name, pgRows.Schema.Fields[j].Name) == -1 - }) - - // read rows from destination table - qualifiedTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tableName) - // excluding soft-delete column during schema conversion - sfSelQuery := fmt.Sprintf(`SELECT * EXCLUDE _PEERDB_IS_DELETED FROM %s LIMIT 0`, qualifiedTableName) - fmt.Printf("running query on snowflake: %s\n", sfSelQuery) - - sfRows, err := s.sfHelper.ExecuteAndProcessQuery(sfSelQuery) - require.NoError(s.T(), err) - sort.Slice(sfRows.Schema.Fields, func(i int, j int) bool { - return strings.Compare(sfRows.Schema.Fields[i].Name, sfRows.Schema.Fields[j].Name) == -1 - }) - - s.True(pgRows.Schema.EqualNames(sfRows.Schema), "schemas from source and destination tables are not equal") -} - func (s *PeerFlowE2ETestSuiteSF) compareTableContentsSF(tableName string, selector string, caseSensitive bool) { // read rows from source table pgQueryExecutor := connpostgres.NewQRepQueryExecutor(s.pool, context.Background(), "testflow", "testpart") diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index fb47ff8ffd..8029a10408 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -336,7 +336,6 @@ func CDCFlowWorkflowWithConfig( cfg.TableNameSchemaMapping[modifiedDstTables[i]] = getModifiedSchemaRes.TableNameSchemaMapping[modifiedSrcTables[i]] } - } }