lint and s3 test fix
Amogh-Bharadwaj committed Oct 15, 2023
1 parent b00e32f commit e75b052
Showing 6 changed files with 14 additions and 67 deletions.
3 changes: 2 additions & 1 deletion flow/connectors/postgres/qrep_query_executor.go
@@ -99,7 +99,8 @@ func (qe *QRepQueryExecutor) fieldDescriptionsToSchema(fds []pgconn.FieldDescrip
 		ctype := postgresOIDToQValueKind(fd.DataTypeOID, qe.connStr)
 		if ctype == qvalue.QValueKindInvalid {
 			var typeName string
-			err := qe.pool.QueryRow(qe.ctx, "SELECT typname FROM pg_type WHERE oid = $1", fd.DataTypeOID).Scan(&typeName)
+			err := qe.pool.QueryRow(qe.ctx, "SELECT typname FROM pg_type WHERE oid = $1",
+				fd.DataTypeOID).Scan(&typeName)
 			if err != nil {
 				ctype = qvalue.QValueKindInvalid
 			} else {
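A note on what the wrapped call does: when postgresOIDToQValueKind cannot map an OID, the executor falls back to asking Postgres itself via the pg_type catalog. A standalone sketch of that lookup, assuming pgx v5 (the repo's pgx version may differ) and a hypothetical DSN; OID 1700 is numeric:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/jackc/pgx/v5/pgxpool"
)

func main() {
	ctx := context.Background()
	// Hypothetical DSN; point it at any reachable Postgres.
	pool, err := pgxpool.New(ctx, "postgres://postgres@localhost:5432/postgres")
	if err != nil {
		log.Fatal(err)
	}
	defer pool.Close()

	// Same catalog fallback as the executor: resolve an OID that has
	// no built-in mapping to its type name.
	var typeName string
	err = pool.QueryRow(ctx, "SELECT typname FROM pg_type WHERE oid = $1", 1700).Scan(&typeName)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(typeName) // numeric
}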
7 changes: 4 additions & 3 deletions flow/connectors/postgres/qvalue_convert.go
@@ -351,9 +351,10 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) (
 		}
 		val = &qvalue.QValue{Kind: qvalue.QValueKindHStore, Value: hstoreVal}
 	case qvalue.QValueKindPoint:
-		x_coord := value.(pgtype.Point).P.X
-		y_coord := value.(pgtype.Point).P.Y
-		val = &qvalue.QValue{Kind: qvalue.QValueKindPoint, Value: fmt.Sprintf("POINT(%f %f)", x_coord, y_coord)}
+		xCoord := value.(pgtype.Point).P.X
+		yCoord := value.(pgtype.Point).P.Y
+		val = &qvalue.QValue{Kind: qvalue.QValueKindPoint,
+			Value: fmt.Sprintf("POINT(%f %f)", xCoord, yCoord)}
 	default:
 		// log.Warnf("unhandled QValueKind => %v, parsing as string", qvalueKind)
 		textVal, ok := value.(string)
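Behavior is unchanged by the lint rename and re-wrap above: the point is still serialized as WKT text via %f, which in Go prints six decimal places by default. A quick illustration of the resulting value:

package main

import "fmt"

func main() {
	// Same formatting as the converter: %f defaults to six decimal
	// places, so the point (1, 2) serializes as shown below.
	xCoord, yCoord := 1.0, 2.0
	fmt.Printf("POINT(%f %f)\n", xCoord, yCoord) // POINT(1.000000 2.000000)
}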
28 changes: 0 additions & 28 deletions flow/e2e/bigquery/qrep_flow_bq_test.go
@@ -3,8 +3,6 @@ package e2e_bigquery
 import (
 	"context"
 	"fmt"
-	"sort"
-	"strings"
 
 	connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
 	"github.com/PeerDB-io/peer-flow/e2e"
@@ -29,32 +27,6 @@ func (s *PeerFlowE2ETestSuiteBQ) setupBQDestinationTable(dstTable string) {
 	fmt.Printf("created table on bigquery: %s.%s. %v\n", s.bqHelper.Config.DatasetId, dstTable, err)
 }
 
-func (s *PeerFlowE2ETestSuiteBQ) compareTableSchemasBQ(tableName string) {
-	// read rows from source table
-	pgQueryExecutor := connpostgres.NewQRepQueryExecutor(s.pool, context.Background(), "testflow", "testpart")
-	pgQueryExecutor.SetTestEnv(true)
-
-	pgRows, err := pgQueryExecutor.ExecuteAndProcessQuery(
-		fmt.Sprintf("SELECT * FROM e2e_test_%s.%s ORDER BY id", bigquerySuffix, tableName),
-	)
-	s.NoError(err)
-	sort.Slice(pgRows.Schema.Fields, func(i int, j int) bool {
-		return strings.Compare(pgRows.Schema.Fields[i].Name, pgRows.Schema.Fields[j].Name) == -1
-	})
-
-	// read rows from destination table
-	qualifiedTableName := fmt.Sprintf("`%s.%s`", s.bqHelper.Config.DatasetId, tableName)
-	bqRows, err := s.bqHelper.ExecuteAndProcessQuery(
-		fmt.Sprintf("SELECT * FROM %s ORDER BY id", qualifiedTableName),
-	)
-	s.NoError(err)
-	sort.Slice(bqRows.Schema.Fields, func(i int, j int) bool {
-		return strings.Compare(bqRows.Schema.Fields[i].Name, bqRows.Schema.Fields[j].Name) == -1
-	})
-
-	s.True(pgRows.Schema.EqualNames(bqRows.Schema), "schemas from source and destination tables are not equal")
-}
-
 func (s *PeerFlowE2ETestSuiteBQ) compareTableContentsBQ(tableName string, colsString string) {
 	// read rows from source table
 	pgQueryExecutor := connpostgres.NewQRepQueryExecutor(s.pool, context.Background(), "testflow", "testpart")
13 changes: 8 additions & 5 deletions flow/e2e/s3/s3_helper.go
@@ -5,6 +5,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"os"
+	"time"
 
 	"github.com/PeerDB-io/peer-flow/connectors/utils"
 	"github.com/PeerDB-io/peer-flow/e2e"
@@ -15,14 +16,14 @@ import (
 )
 
 const (
-	peerName   string = "test_s3_peer"
-	prefixName string = "test-s3"
+	peerName string = "test_s3_peer"
 )
 
 type S3TestHelper struct {
 	client     *s3.S3
 	s3Config   *protos.S3Config
 	bucketName string
+	prefix     string
 }
 
 func NewS3TestHelper(switchToGCS bool) (*S3TestHelper, error) {
@@ -51,10 +52,11 @@ func NewS3TestHelper(switchToGCS bool) (*S3TestHelper, error) {
 	if err != nil {
 		return nil, err
 	}
+	prefix := fmt.Sprintf("peerdb_test/%d", time.Now().UnixNano())
 	return &S3TestHelper{
 		client,
 		&protos.S3Config{
-			Url:             fmt.Sprintf("s3://%s/%s", bucketName, prefixName),
+			Url:             fmt.Sprintf("s3://%s/%s", bucketName, prefix),
 			AccessKeyId:     &config.AccessKeyID,
 			SecretAccessKey: &config.SecretAccessKey,
 			Region:          &config.Region,
@@ -68,6 +70,7 @@ func NewS3TestHelper(switchToGCS bool) (*S3TestHelper, error) {
 			},
 		},
 		bucketName,
+		prefix,
 	}, nil
 }
 
@@ -89,7 +92,7 @@ func (h *S3TestHelper) ListAllFiles(
 ) ([]*s3.Object, error) {
 
 	Bucket := h.bucketName
-	Prefix := fmt.Sprintf("%s/%s/", prefixName, jobName)
+	Prefix := fmt.Sprintf("%s/%s/", h.prefix, jobName)
 	files, err := h.client.ListObjects(&s3.ListObjectsInput{
 		Bucket: &Bucket,
 		Prefix: &Prefix,
@@ -105,7 +108,7 @@ func (h *S3TestHelper) ListAllFiles(
 // Delete all generated objects during the test
 func (h *S3TestHelper) CleanUp() error {
 	Bucket := h.bucketName
-	Prefix := prefixName
+	Prefix := h.prefix
 	files, err := h.client.ListObjects(&s3.ListObjectsInput{
 		Bucket: &Bucket,
 		Prefix: &Prefix,
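The substantive part of the s3 test fix is isolation: the helper replaces the shared test-s3 prefix constant with a per-instance prefix derived from the current time, so ListAllFiles and CleanUp only ever see objects written by the current run. A minimal sketch of the idea, with hypothetical bucket and job names:

package main

import (
	"fmt"
	"time"
)

// uniqueTestPrefix mirrors the helper's approach: a nanosecond
// timestamp makes each run's key prefix distinct, so concurrent or
// repeated test runs cannot list or delete each other's objects.
func uniqueTestPrefix() string {
	return fmt.Sprintf("peerdb_test/%d", time.Now().UnixNano())
}

func main() {
	prefix := uniqueTestPrefix()
	// Hypothetical bucket/job names; layout is <prefix>/<jobName>/<file>.
	fmt.Printf("s3://my-test-bucket/%s/qrep-job/part-0.avro\n", prefix)
}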
29 changes: 0 additions & 29 deletions flow/e2e/snowflake/qrep_flow_sf_test.go
@@ -3,8 +3,6 @@ package e2e_snowflake
 import (
 	"context"
 	"fmt"
-	"sort"
-	"strings"
 
 	connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
 	"github.com/PeerDB-io/peer-flow/e2e"
@@ -32,33 +30,6 @@ func (s *PeerFlowE2ETestSuiteSF) setupSFDestinationTable(dstTable string) {
 	fmt.Printf("created table on snowflake: %s.%s. %v\n", s.sfHelper.testSchemaName, dstTable, err)
 }
 
-func (s *PeerFlowE2ETestSuiteSF) compareTableSchemasSF(tableName string) {
-	// read rows from source table
-	pgQueryExecutor := connpostgres.NewQRepQueryExecutor(s.pool, context.Background(), "testflow", "testpart")
-	pgQueryExecutor.SetTestEnv(true)
-	pgRows, err := pgQueryExecutor.ExecuteAndProcessQuery(
-		fmt.Sprintf("SELECT * FROM e2e_test_%s.%s LIMIT 0", snowflakeSuffix, tableName),
-	)
-	require.NoError(s.T(), err)
-	sort.Slice(pgRows.Schema.Fields, func(i int, j int) bool {
-		return strings.Compare(pgRows.Schema.Fields[i].Name, pgRows.Schema.Fields[j].Name) == -1
-	})
-
-	// read rows from destination table
-	qualifiedTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tableName)
-	// excluding soft-delete column during schema conversion
-	sfSelQuery := fmt.Sprintf(`SELECT * EXCLUDE _PEERDB_IS_DELETED FROM %s LIMIT 0`, qualifiedTableName)
-	fmt.Printf("running query on snowflake: %s\n", sfSelQuery)
-
-	sfRows, err := s.sfHelper.ExecuteAndProcessQuery(sfSelQuery)
-	require.NoError(s.T(), err)
-	sort.Slice(sfRows.Schema.Fields, func(i int, j int) bool {
-		return strings.Compare(sfRows.Schema.Fields[i].Name, sfRows.Schema.Fields[j].Name) == -1
-	})
-
-	s.True(pgRows.Schema.EqualNames(sfRows.Schema), "schemas from source and destination tables are not equal")
-}
-
 func (s *PeerFlowE2ETestSuiteSF) compareTableContentsSF(tableName string, selector string, caseSensitive bool) {
 	// read rows from source table
 	pgQueryExecutor := connpostgres.NewQRepQueryExecutor(s.pool, context.Background(), "testflow", "testpart")
1 change: 0 additions & 1 deletion flow/workflows/cdc_flow.go
@@ -336,7 +336,6 @@ func CDCFlowWorkflowWithConfig(
 				cfg.TableNameSchemaMapping[modifiedDstTables[i]] =
 					getModifiedSchemaRes.TableNameSchemaMapping[modifiedSrcTables[i]]
 			}
-
 		}
 	}
 
