diff --git a/flow/connectors/utils/catalog/env.go b/flow/connectors/utils/catalog/env.go index cb73b947cf..5e2cdc109c 100644 --- a/flow/connectors/utils/catalog/env.go +++ b/flow/connectors/utils/catalog/env.go @@ -23,7 +23,7 @@ func GetCatalogConnectionPoolFromEnv(ctx context.Context) (*pgxpool.Pool, error) poolMutex.Lock() defer poolMutex.Unlock() if pool == nil { - catalogConnectionString := genCatalogConnectionString() + catalogConnectionString := GetCatalogConnectionStringFromEnv() pool, err = pgxpool.New(ctx, catalogConnectionString) if err != nil { return nil, fmt.Errorf("unable to establish connection with catalog: %w", err) @@ -38,12 +38,16 @@ func GetCatalogConnectionPoolFromEnv(ctx context.Context) (*pgxpool.Pool, error) return pool, nil } -func genCatalogConnectionString() string { - return utils.GetPGConnectionString(&protos.PostgresConfig{ +func GetCatalogConnectionStringFromEnv() string { + return utils.GetPGConnectionString(GetCatalogPostgresConfigFromEnv()) +} + +func GetCatalogPostgresConfigFromEnv() *protos.PostgresConfig { + return &protos.PostgresConfig{ Host: peerdbenv.PeerDBCatalogHost(), - Port: peerdbenv.PeerDBCatalogPort(), + Port: uint32(peerdbenv.PeerDBCatalogPort()), User: peerdbenv.PeerDBCatalogUser(), Password: peerdbenv.PeerDBCatalogPassword(), Database: peerdbenv.PeerDBCatalogDatabase(), - }) + } } diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 5d4fa6cadd..cdf7f1d021 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -224,7 +224,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_complete_flow_no_data"), TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, CdcStagingPath: "", } @@ -260,7 +259,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_char_table"), TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, CdcStagingPath: "", } @@ -299,7 +297,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_complete_simple_flow"), TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, CdcStagingPath: "", } @@ -347,7 +344,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_1"), TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, CdcStagingPath: "", } @@ -403,7 +399,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_3"), TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, CdcStagingPath: "", } @@ -464,7 +459,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_4"), TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, CdcStagingPath: "", } @@ -519,7 +513,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_bq_5"), TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, CdcStagingPath: "", } @@ -580,7 +573,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_types_bq"), TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, CdcStagingPath: "", } @@ -664,7 +656,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_NaN_Doubles_BQ() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_nans_bq"), TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, CdcStagingPath: "", } @@ -711,7 +702,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_invalid_geo_bq_avro_cdc"), TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, CdcStagingPath: "", } @@ -792,7 +782,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_multi_table_bq"), TableNameMapping: map[string]string{srcTable1Name: dstTable1Name, srcTable2Name: dstTable2Name}, - PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, CdcStagingPath: "", } @@ -850,7 +839,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix(tableName), TableNameMapping: map[string]string{srcTableName: tableName}, - PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, CdcStagingPath: "", } @@ -936,7 +924,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_flow"), TableNameMapping: map[string]string{srcTableName: tableName}, - PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, CdcStagingPath: "", } @@ -997,7 +984,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast1_flow"), TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, CdcStagingPath: "", } @@ -1059,7 +1045,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast2_flow"), TableNameMapping: map[string]string{srcTableName: tableName}, - PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, CdcStagingPath: "", } @@ -1114,7 +1099,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_peerdb_cols_mirror"), TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, SoftDelete: true, } @@ -1169,7 +1153,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() { srcTable1Name: dstTable1Name, srcTable2Name: fmt.Sprintf("%s.%s", secondDataset, dstTable2Name), }, - PostgresPort: e2e.PostgresPort, Destination: s.bqHelper.Peer, CdcStagingPath: "", } @@ -1240,7 +1223,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() { DestinationTableIdentifier: tableName, }, }, - Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), + Source: e2e.GeneratePostgresPeer(), CdcStagingPath: connectionGen.CdcStagingPath, SoftDelete: true, SoftDeleteColName: "_PEERDB_IS_DELETED", @@ -1320,7 +1303,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() { DestinationTableIdentifier: dstTableName, }, }, - Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), + Source: e2e.GeneratePostgresPeer(), CdcStagingPath: connectionGen.CdcStagingPath, SoftDelete: true, SoftDeleteColName: "_PEERDB_IS_DELETED", @@ -1398,7 +1381,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() { DestinationTableIdentifier: dstName, }, }, - Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), + Source: e2e.GeneratePostgresPeer(), CdcStagingPath: connectionGen.CdcStagingPath, SoftDelete: true, SoftDeleteColName: "_PEERDB_IS_DELETED", @@ -1482,7 +1465,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() { DestinationTableIdentifier: tableName, }, }, - Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), + Source: e2e.GeneratePostgresPeer(), CdcStagingPath: connectionGen.CdcStagingPath, SoftDelete: true, SoftDeleteColName: "_PEERDB_IS_DELETED", diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go index a00d687172..603c6f0b7e 100644 --- a/flow/e2e/congen.go +++ b/flow/e2e/congen.go @@ -11,27 +11,10 @@ import ( "github.com/stretchr/testify/require" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" + "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" "github.com/PeerDB-io/peer-flow/generated/protos" ) -const ( - postgresHost = "localhost" - postgresUser = "postgres" - postgresPassword = "postgres" - postgresDatabase = "postgres" - PostgresPort = 7132 -) - -func GetTestPostgresConf() *protos.PostgresConfig { - return &protos.PostgresConfig{ - Host: postgresHost, - Port: uint32(PostgresPort), - User: postgresUser, - Password: postgresPassword, - Database: postgresDatabase, - } -} - func cleanPostgres(conn *pgx.Conn, suffix string) error { // drop the e2e_test schema with the given suffix if it exists _, err := conn.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS e2e_test_%s CASCADE", suffix)) @@ -123,7 +106,7 @@ func setupPostgresSchema(t *testing.T, conn *pgx.Conn, suffix string) error { func SetupPostgres(t *testing.T, suffix string) (*connpostgres.PostgresConnector, error) { t.Helper() - connector, err := connpostgres.NewPostgresConnector(context.Background(), GetTestPostgresConf()) + connector, err := connpostgres.NewPostgresConnector(context.Background(), utils.GetCatalogPostgresConfigFromEnv()) if err != nil { return nil, fmt.Errorf("failed to create postgres connection: %w", err) } @@ -167,28 +150,19 @@ func TearDownPostgres[T Suite](s T) { } // GeneratePostgresPeer generates a postgres peer config for testing. -func GeneratePostgresPeer(postgresPort int) *protos.Peer { - ret := &protos.Peer{} - ret.Name = "test_postgres_peer" - ret.Type = protos.DBType_POSTGRES - - ret.Config = &protos.Peer_PostgresConfig{ - PostgresConfig: &protos.PostgresConfig{ - Host: "localhost", - Port: uint32(postgresPort), - User: "postgres", - Password: "postgres", - Database: "postgres", +func GeneratePostgresPeer() *protos.Peer { + return &protos.Peer{ + Name: "test_postgres_peer", + Type: protos.DBType_POSTGRES, + Config: &protos.Peer_PostgresConfig{ + PostgresConfig: utils.GetCatalogPostgresConfigFromEnv(), }, } - - return ret } type FlowConnectionGenerationConfig struct { FlowJobName string TableNameMapping map[string]string - PostgresPort int Destination *protos.Peer CdcStagingPath string SoftDelete bool @@ -219,7 +193,7 @@ func (c *FlowConnectionGenerationConfig) GenerateFlowConnectionConfigs() *protos ret := &protos.FlowConnectionConfigs{ FlowJobName: c.FlowJobName, TableMappings: tblMappings, - Source: GeneratePostgresPeer(c.PostgresPort), + Source: GeneratePostgresPeer(), Destination: c.Destination, CdcStagingPath: c.CdcStagingPath, SoftDelete: c.SoftDelete, @@ -250,7 +224,7 @@ func (c *QRepFlowConnectionGenerationConfig) GenerateQRepConfig( ret.WatermarkTable = c.WatermarkTable ret.DestinationTableIdentifier = c.DestinationTableIdentifier - postgresPeer := GeneratePostgresPeer(c.PostgresPort) + postgresPeer := GeneratePostgresPeer() ret.SourcePeer = postgresPeer ret.DestinationPeer = c.Destination diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 0f5cbfde13..c1d4f299bc 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -98,7 +98,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_flow"), TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, Destination: s.peer, } @@ -148,7 +147,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Geospatial_PG() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_geo_flow_pg"), TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, Destination: s.peer, } @@ -194,7 +192,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Types_PG() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_types_pg"), TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, Destination: s.peer, } @@ -261,7 +258,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Enums_PG() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_enum_flow"), TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, Destination: s.peer, } @@ -303,7 +299,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_schema_changes"), TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, Destination: s.peer, } @@ -490,7 +485,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_flow"), TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, Destination: s.peer, } @@ -555,7 +549,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast1_flow"), TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, Destination: s.peer, } @@ -623,7 +616,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast2_flow"), TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, Destination: s.peer, } @@ -683,7 +675,6 @@ func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_peerdb_cols_mirror"), TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, Destination: s.peer, SoftDelete: true, } @@ -748,7 +739,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Basic() { DestinationTableIdentifier: dstTableName, }, }, - Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), + Source: e2e.GeneratePostgresPeer(), CdcStagingPath: connectionGen.CdcStagingPath, SoftDelete: true, SoftDeleteColName: "_PEERDB_IS_DELETED", @@ -834,7 +825,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_IUD_Same_Batch() { DestinationTableIdentifier: dstTableName, }, }, - Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), + Source: e2e.GeneratePostgresPeer(), CdcStagingPath: connectionGen.CdcStagingPath, SoftDelete: true, SoftDeleteColName: "_PEERDB_IS_DELETED", @@ -913,7 +904,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_UD_Same_Batch() { DestinationTableIdentifier: dstTableName, }, }, - Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), + Source: e2e.GeneratePostgresPeer(), CdcStagingPath: connectionGen.CdcStagingPath, SoftDelete: true, SoftDeleteColName: "_PEERDB_IS_DELETED", @@ -997,7 +988,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Insert_After_Delete() { DestinationTableIdentifier: dstTableName, }, }, - Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), + Source: e2e.GeneratePostgresPeer(), CdcStagingPath: connectionGen.CdcStagingPath, SoftDelete: true, SoftDeleteColName: "_PEERDB_IS_DELETED", @@ -1074,7 +1065,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Supported_Mixed_Case_Table() { DestinationTableIdentifier: dstTableName, }, }, - Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), + Source: e2e.GeneratePostgresPeer(), CdcStagingPath: connectionGen.CdcStagingPath, MaxBatchSize: 100, } @@ -1146,7 +1137,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() { DestinationTableIdentifier: dstTable1Name, }, }, - Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), + Source: e2e.GeneratePostgresPeer(), CdcStagingPath: connectionGen.CdcStagingPath, MaxBatchSize: 6, IdleTimeoutSeconds: 7, diff --git a/flow/e2e/postgres/postgres_helper.go b/flow/e2e/postgres/postgres_helper.go deleted file mode 100644 index 6f9b471cc3..0000000000 --- a/flow/e2e/postgres/postgres_helper.go +++ /dev/null @@ -1,15 +0,0 @@ -package e2e_postgres - -import "github.com/PeerDB-io/peer-flow/generated/protos" - -func generatePGPeer(postgresConfig *protos.PostgresConfig) *protos.Peer { - ret := &protos.Peer{} - ret.Name = "test_pg_peer" - ret.Type = protos.DBType_POSTGRES - - ret.Config = &protos.Peer_PostgresConfig{ - PostgresConfig: postgresConfig, - } - - return ret -} diff --git a/flow/e2e/postgres/qrep_flow_pg_test.go b/flow/e2e/postgres/qrep_flow_pg_test.go index c13e02b6e0..b5e745c9bf 100644 --- a/flow/e2e/postgres/qrep_flow_pg_test.go +++ b/flow/e2e/postgres/qrep_flow_pg_test.go @@ -67,7 +67,7 @@ func SetupSuite(t *testing.T) PeerFlowE2ETestSuitePG { return PeerFlowE2ETestSuitePG{ t: t, conn: conn, - peer: generatePGPeer(e2e.GetTestPostgresConf()), + peer: e2e.GeneratePostgresPeer(), suffix: suffix, } } @@ -230,7 +230,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() { query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", s.suffix, srcTable) - postgresPeer := e2e.GeneratePostgresPeer(e2e.PostgresPort) + postgresPeer := e2e.GeneratePostgresPeer() qrepConfig, err := e2e.CreateQRepWorkflowConfig( "test_qrep_flow_avro_pg", @@ -272,7 +272,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Setup_Destination_And_PeerDB_Columns_QRep_P query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", s.suffix, srcTable) - postgresPeer := e2e.GeneratePostgresPeer(e2e.PostgresPort) + postgresPeer := e2e.GeneratePostgresPeer() qrepConfig, err := e2e.CreateQRepWorkflowConfig( "test_qrep_columns_pg", diff --git a/flow/e2e/s3/cdc_s3_test.go b/flow/e2e/s3/cdc_s3_test.go index 247141336b..8a976b8a0e 100644 --- a/flow/e2e/s3/cdc_s3_test.go +++ b/flow/e2e/s3/cdc_s3_test.go @@ -36,7 +36,6 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: flowJobName, TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, Destination: s.s3Helper.GetPeer(), } diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 777a588e93..40dbefaa2e 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -145,7 +145,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix(tableName), TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, } @@ -206,7 +205,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Flow_ReplicaIdentity_Index_No_Pkey() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix(tableName), TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, } @@ -258,7 +256,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix(tableName), TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, } @@ -339,7 +336,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_1"), TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, } @@ -393,7 +389,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_3"), TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, } @@ -452,7 +447,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_4"), TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, } @@ -506,7 +500,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_toast_sf_5"), TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, } @@ -567,7 +560,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_types_sf"), TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, } @@ -654,7 +646,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_multi_table"), TableNameMapping: map[string]string{srcTable1Name: dstTable1Name, srcTable2Name: dstTable2Name}, - PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, } @@ -709,7 +700,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_simple_schema_changes"), TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, } @@ -927,7 +917,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_flow"), TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, } @@ -985,7 +974,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_cpkey_toast1_flow"), TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, } @@ -1048,7 +1036,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix(tableName), TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, } @@ -1118,7 +1105,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { Exclude: []string{"c2"}, }, }, - Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), + Source: e2e.GeneratePostgresPeer(), CdcStagingPath: connectionGen.CdcStagingPath, SyncedAtColName: "_PEERDB_SYNCED_AT", MaxBatchSize: 100, @@ -1193,7 +1180,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { DestinationTableIdentifier: dstTableName, }, }, - Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), + Source: e2e.GeneratePostgresPeer(), CdcStagingPath: connectionGen.CdcStagingPath, SoftDelete: true, SoftDeleteColName: "_PEERDB_IS_DELETED", @@ -1269,7 +1256,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() { DestinationTableIdentifier: dstTableName, }, }, - Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), + Source: e2e.GeneratePostgresPeer(), CdcStagingPath: connectionGen.CdcStagingPath, SoftDelete: true, SoftDeleteColName: "_PEERDB_IS_DELETED", @@ -1348,7 +1335,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { DestinationTableIdentifier: dstTableName, }, }, - Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), + Source: e2e.GeneratePostgresPeer(), CdcStagingPath: connectionGen.CdcStagingPath, SoftDelete: true, SoftDeleteColName: "_PEERDB_IS_DELETED", @@ -1432,7 +1419,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { DestinationTableIdentifier: dstTableName, }, }, - Source: e2e.GeneratePostgresPeer(e2e.PostgresPort), + Source: e2e.GeneratePostgresPeer(), CdcStagingPath: connectionGen.CdcStagingPath, SoftDelete: true, SoftDeleteColName: "_PEERDB_IS_DELETED", @@ -1498,7 +1485,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Supported_Mixed_Case_Table_SF() { connectionGen := e2e.FlowConnectionGenerationConfig{ FlowJobName: s.attachSuffix("test_mixed_case"), TableNameMapping: map[string]string{srcTableName: dstTableName}, - PostgresPort: e2e.PostgresPort, Destination: s.sfHelper.Peer, } diff --git a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go index 2ac659a1b5..2e7572cdb0 100644 --- a/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go +++ b/flow/e2e/sqlserver/qrep_flow_sqlserver_test.go @@ -162,7 +162,7 @@ func (s PeerFlowE2ETestSuiteSQLServer) Test_Complete_QRep_Flow_SqlServer_Append( query := fmt.Sprintf("SELECT * FROM %s.%s WHERE v_from BETWEEN {{.start}} AND {{.end}}", s.sqlsHelper.SchemaName, tblName) - postgresPeer := e2e.GeneratePostgresPeer(e2e.PostgresPort) + postgresPeer := e2e.GeneratePostgresPeer() qrepConfig := &protos.QRepConfig{ FlowJobName: tblName, diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 93136809b2..36a0e4d7c9 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -27,6 +27,7 @@ import ( connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" "github.com/PeerDB-io/peer-flow/connectors/utils" + catalog "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/logger" @@ -51,7 +52,7 @@ type RowSource interface { func RegisterWorkflowsAndActivities(t *testing.T, env *testsuite.TestWorkflowEnvironment) { t.Helper() - conn, err := pgxpool.New(context.Background(), utils.GetPGConnectionString(GetTestPostgresConf())) + conn, err := pgxpool.New(context.Background(), catalog.GetCatalogConnectionStringFromEnv()) if err != nil { t.Fatalf("unable to create catalog connection pool: %v", err) } @@ -429,7 +430,6 @@ func CreateQRepWorkflowConfig( FlowJobName: flowJobName, WatermarkTable: sourceTable, DestinationTableIdentifier: dstTable, - PostgresPort: PostgresPort, Destination: dest, StagingPath: stagingPath, } diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go index 63633fe413..f6128e56a7 100644 --- a/flow/peerdbenv/config.go +++ b/flow/peerdbenv/config.go @@ -62,8 +62,8 @@ func PeerDBCatalogHost() string { } // PEERDB_CATALOG_PORT -func PeerDBCatalogPort() uint32 { - return getEnvUint[uint32]("PEERDB_CATALOG_PORT", 5432) +func PeerDBCatalogPort() uint16 { + return getEnvUint[uint16]("PEERDB_CATALOG_PORT", 5432) } // PEERDB_CATALOG_USER diff --git a/flow/peerdbenv/env.go b/flow/peerdbenv/env.go index 0e1c18b3ca..862ae07eb3 100644 --- a/flow/peerdbenv/env.go +++ b/flow/peerdbenv/env.go @@ -7,18 +7,10 @@ import ( "golang.org/x/exp/constraints" ) -// GetEnv returns the value of the environment variable with the given name -// and a boolean indicating whether the environment variable exists. -func getEnv(name string) (string, bool) { - val, exists := os.LookupEnv(name) - return val, exists -} - // GetEnvInt returns the value of the environment variable with the given name -// or defaultValue if the environment variable is not set or is not a valid -// integer value. +// or defaultValue if the environment variable is not set or is not a valid value. func getEnvInt(name string, defaultValue int) int { - val, ok := getEnv(name) + val, ok := os.LookupEnv(name) if !ok { return defaultValue } @@ -31,11 +23,10 @@ func getEnvInt(name string, defaultValue int) int { return i } -// getEnvUint32 returns the value of the environment variable with the given name -// or defaultValue if the environment variable is not set or is not a valid -// uint32 value. +// getEnvUint returns the value of the environment variable with the given name +// or defaultValue if the environment variable is not set or is not a valid value. func getEnvUint[T constraints.Unsigned](name string, defaultValue T) T { - val, ok := getEnv(name) + val, ok := os.LookupEnv(name) if !ok { return defaultValue } @@ -50,10 +41,9 @@ func getEnvUint[T constraints.Unsigned](name string, defaultValue T) T { } // getEnvBool returns the value of the environment variable with the given name -// or defaultValue if the environment variable is not set or is not a valid -// boolean value. +// or defaultValue if the environment variable is not set or is not a valid value. func getEnvBool(name string, defaultValue bool) bool { - val, ok := getEnv(name) + val, ok := os.LookupEnv(name) if !ok { return defaultValue } @@ -69,7 +59,7 @@ func getEnvBool(name string, defaultValue bool) bool { // GetEnvString returns the value of the environment variable with the given name // or defaultValue if the environment variable is not set. func getEnvString(name string, defaultValue string) string { - val, ok := getEnv(name) + val, ok := os.LookupEnv(name) if !ok { return defaultValue }