Skip to content

Commit

Permalink
e2e: always use environment to access catalog (#1389)
Browse files Browse the repository at this point in the history
not hardcoded `postgrest:postgres@localhost/postgres:7132`
  • Loading branch information
serprex authored Feb 27, 2024
1 parent 865d42a commit caf2a0c
Show file tree
Hide file tree
Showing 12 changed files with 50 additions and 138 deletions.
14 changes: 9 additions & 5 deletions flow/connectors/utils/catalog/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func GetCatalogConnectionPoolFromEnv(ctx context.Context) (*pgxpool.Pool, error)
poolMutex.Lock()
defer poolMutex.Unlock()
if pool == nil {
catalogConnectionString := genCatalogConnectionString()
catalogConnectionString := GetCatalogConnectionStringFromEnv()
pool, err = pgxpool.New(ctx, catalogConnectionString)
if err != nil {
return nil, fmt.Errorf("unable to establish connection with catalog: %w", err)
Expand All @@ -38,12 +38,16 @@ func GetCatalogConnectionPoolFromEnv(ctx context.Context) (*pgxpool.Pool, error)
return pool, nil
}

func genCatalogConnectionString() string {
return utils.GetPGConnectionString(&protos.PostgresConfig{
func GetCatalogConnectionStringFromEnv() string {
return utils.GetPGConnectionString(GetCatalogPostgresConfigFromEnv())
}

func GetCatalogPostgresConfigFromEnv() *protos.PostgresConfig {
return &protos.PostgresConfig{
Host: peerdbenv.PeerDBCatalogHost(),
Port: peerdbenv.PeerDBCatalogPort(),
Port: uint32(peerdbenv.PeerDBCatalogPort()),
User: peerdbenv.PeerDBCatalogUser(),
Password: peerdbenv.PeerDBCatalogPassword(),
Database: peerdbenv.PeerDBCatalogDatabase(),
})
}
}
25 changes: 4 additions & 21 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() {
connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_complete_flow_no_data"),
TableNameMapping: map[string]string{srcTableName: dstTableName},
PostgresPort: e2e.PostgresPort,
Destination: s.bqHelper.Peer,
CdcStagingPath: "",
}
Expand Down Expand Up @@ -260,7 +259,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() {
connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_char_table"),
TableNameMapping: map[string]string{srcTableName: dstTableName},
PostgresPort: e2e.PostgresPort,
Destination: s.bqHelper.Peer,
CdcStagingPath: "",
}
Expand Down Expand Up @@ -299,7 +297,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() {
connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_complete_simple_flow"),
TableNameMapping: map[string]string{srcTableName: dstTableName},
PostgresPort: e2e.PostgresPort,
Destination: s.bqHelper.Peer,
CdcStagingPath: "",
}
Expand Down Expand Up @@ -347,7 +344,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() {
connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_toast_bq_1"),
TableNameMapping: map[string]string{srcTableName: dstTableName},
PostgresPort: e2e.PostgresPort,
Destination: s.bqHelper.Peer,
CdcStagingPath: "",
}
Expand Down Expand Up @@ -403,7 +399,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() {
connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_toast_bq_3"),
TableNameMapping: map[string]string{srcTableName: dstTableName},
PostgresPort: e2e.PostgresPort,
Destination: s.bqHelper.Peer,
CdcStagingPath: "",
}
Expand Down Expand Up @@ -464,7 +459,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() {
connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_toast_bq_4"),
TableNameMapping: map[string]string{srcTableName: dstTableName},
PostgresPort: e2e.PostgresPort,
Destination: s.bqHelper.Peer,
CdcStagingPath: "",
}
Expand Down Expand Up @@ -519,7 +513,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() {
connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_toast_bq_5"),
TableNameMapping: map[string]string{srcTableName: dstTableName},
PostgresPort: e2e.PostgresPort,
Destination: s.bqHelper.Peer,
CdcStagingPath: "",
}
Expand Down Expand Up @@ -580,7 +573,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {
connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_types_bq"),
TableNameMapping: map[string]string{srcTableName: dstTableName},
PostgresPort: e2e.PostgresPort,
Destination: s.bqHelper.Peer,
CdcStagingPath: "",
}
Expand Down Expand Up @@ -664,7 +656,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_NaN_Doubles_BQ() {
connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_nans_bq"),
TableNameMapping: map[string]string{srcTableName: dstTableName},
PostgresPort: e2e.PostgresPort,
Destination: s.bqHelper.Peer,
CdcStagingPath: "",
}
Expand Down Expand Up @@ -711,7 +702,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() {
connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_invalid_geo_bq_avro_cdc"),
TableNameMapping: map[string]string{srcTableName: dstTableName},
PostgresPort: e2e.PostgresPort,
Destination: s.bqHelper.Peer,
CdcStagingPath: "",
}
Expand Down Expand Up @@ -792,7 +782,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() {
connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_multi_table_bq"),
TableNameMapping: map[string]string{srcTable1Name: dstTable1Name, srcTable2Name: dstTable2Name},
PostgresPort: e2e.PostgresPort,
Destination: s.bqHelper.Peer,
CdcStagingPath: "",
}
Expand Down Expand Up @@ -850,7 +839,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix(tableName),
TableNameMapping: map[string]string{srcTableName: tableName},
PostgresPort: e2e.PostgresPort,
Destination: s.bqHelper.Peer,
CdcStagingPath: "",
}
Expand Down Expand Up @@ -936,7 +924,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() {
connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_cpkey_flow"),
TableNameMapping: map[string]string{srcTableName: tableName},
PostgresPort: e2e.PostgresPort,
Destination: s.bqHelper.Peer,
CdcStagingPath: "",
}
Expand Down Expand Up @@ -997,7 +984,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() {
connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_cpkey_toast1_flow"),
TableNameMapping: map[string]string{srcTableName: dstTableName},
PostgresPort: e2e.PostgresPort,
Destination: s.bqHelper.Peer,
CdcStagingPath: "",
}
Expand Down Expand Up @@ -1059,7 +1045,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() {
connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_cpkey_toast2_flow"),
TableNameMapping: map[string]string{srcTableName: tableName},
PostgresPort: e2e.PostgresPort,
Destination: s.bqHelper.Peer,
CdcStagingPath: "",
}
Expand Down Expand Up @@ -1114,7 +1099,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() {
connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("test_peerdb_cols_mirror"),
TableNameMapping: map[string]string{srcTableName: dstTableName},
PostgresPort: e2e.PostgresPort,
Destination: s.bqHelper.Peer,
SoftDelete: true,
}
Expand Down Expand Up @@ -1169,7 +1153,6 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() {
srcTable1Name: dstTable1Name,
srcTable2Name: fmt.Sprintf("%s.%s", secondDataset, dstTable2Name),
},
PostgresPort: e2e.PostgresPort,
Destination: s.bqHelper.Peer,
CdcStagingPath: "",
}
Expand Down Expand Up @@ -1240,7 +1223,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() {
DestinationTableIdentifier: tableName,
},
},
Source: e2e.GeneratePostgresPeer(e2e.PostgresPort),
Source: e2e.GeneratePostgresPeer(),
CdcStagingPath: connectionGen.CdcStagingPath,
SoftDelete: true,
SoftDeleteColName: "_PEERDB_IS_DELETED",
Expand Down Expand Up @@ -1320,7 +1303,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() {
DestinationTableIdentifier: dstTableName,
},
},
Source: e2e.GeneratePostgresPeer(e2e.PostgresPort),
Source: e2e.GeneratePostgresPeer(),
CdcStagingPath: connectionGen.CdcStagingPath,
SoftDelete: true,
SoftDeleteColName: "_PEERDB_IS_DELETED",
Expand Down Expand Up @@ -1398,7 +1381,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() {
DestinationTableIdentifier: dstName,
},
},
Source: e2e.GeneratePostgresPeer(e2e.PostgresPort),
Source: e2e.GeneratePostgresPeer(),
CdcStagingPath: connectionGen.CdcStagingPath,
SoftDelete: true,
SoftDeleteColName: "_PEERDB_IS_DELETED",
Expand Down Expand Up @@ -1482,7 +1465,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() {
DestinationTableIdentifier: tableName,
},
},
Source: e2e.GeneratePostgresPeer(e2e.PostgresPort),
Source: e2e.GeneratePostgresPeer(),
CdcStagingPath: connectionGen.CdcStagingPath,
SoftDelete: true,
SoftDeleteColName: "_PEERDB_IS_DELETED",
Expand Down
46 changes: 10 additions & 36 deletions flow/e2e/congen.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,10 @@ import (
"github.com/stretchr/testify/require"

connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/generated/protos"
)

const (
postgresHost = "localhost"
postgresUser = "postgres"
postgresPassword = "postgres"
postgresDatabase = "postgres"
PostgresPort = 7132
)

func GetTestPostgresConf() *protos.PostgresConfig {
return &protos.PostgresConfig{
Host: postgresHost,
Port: uint32(PostgresPort),
User: postgresUser,
Password: postgresPassword,
Database: postgresDatabase,
}
}

func cleanPostgres(conn *pgx.Conn, suffix string) error {
// drop the e2e_test schema with the given suffix if it exists
_, err := conn.Exec(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS e2e_test_%s CASCADE", suffix))
Expand Down Expand Up @@ -123,7 +106,7 @@ func setupPostgresSchema(t *testing.T, conn *pgx.Conn, suffix string) error {
func SetupPostgres(t *testing.T, suffix string) (*connpostgres.PostgresConnector, error) {
t.Helper()

connector, err := connpostgres.NewPostgresConnector(context.Background(), GetTestPostgresConf())
connector, err := connpostgres.NewPostgresConnector(context.Background(), utils.GetCatalogPostgresConfigFromEnv())
if err != nil {
return nil, fmt.Errorf("failed to create postgres connection: %w", err)
}
Expand Down Expand Up @@ -167,28 +150,19 @@ func TearDownPostgres[T Suite](s T) {
}

// GeneratePostgresPeer generates a postgres peer config for testing.
func GeneratePostgresPeer(postgresPort int) *protos.Peer {
ret := &protos.Peer{}
ret.Name = "test_postgres_peer"
ret.Type = protos.DBType_POSTGRES

ret.Config = &protos.Peer_PostgresConfig{
PostgresConfig: &protos.PostgresConfig{
Host: "localhost",
Port: uint32(postgresPort),
User: "postgres",
Password: "postgres",
Database: "postgres",
func GeneratePostgresPeer() *protos.Peer {
return &protos.Peer{
Name: "test_postgres_peer",
Type: protos.DBType_POSTGRES,
Config: &protos.Peer_PostgresConfig{
PostgresConfig: utils.GetCatalogPostgresConfigFromEnv(),
},
}

return ret
}

type FlowConnectionGenerationConfig struct {
FlowJobName string
TableNameMapping map[string]string
PostgresPort int
Destination *protos.Peer
CdcStagingPath string
SoftDelete bool
Expand Down Expand Up @@ -219,7 +193,7 @@ func (c *FlowConnectionGenerationConfig) GenerateFlowConnectionConfigs() *protos
ret := &protos.FlowConnectionConfigs{
FlowJobName: c.FlowJobName,
TableMappings: tblMappings,
Source: GeneratePostgresPeer(c.PostgresPort),
Source: GeneratePostgresPeer(),
Destination: c.Destination,
CdcStagingPath: c.CdcStagingPath,
SoftDelete: c.SoftDelete,
Expand Down Expand Up @@ -250,7 +224,7 @@ func (c *QRepFlowConnectionGenerationConfig) GenerateQRepConfig(
ret.WatermarkTable = c.WatermarkTable
ret.DestinationTableIdentifier = c.DestinationTableIdentifier

postgresPeer := GeneratePostgresPeer(c.PostgresPort)
postgresPeer := GeneratePostgresPeer()
ret.SourcePeer = postgresPeer

ret.DestinationPeer = c.Destination
Expand Down
Loading

0 comments on commit caf2a0c

Please sign in to comment.