diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index c73e2f9a62..c8074484ad 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -699,9 +700,10 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { createMoodEnum := "CREATE TYPE mood AS ENUM ('happy', 'sad', 'angry');" var pgErr *pgconn.PgError _, enumErr := s.pool.Exec(context.Background(), createMoodEnum) - if errors.As(enumErr, &pgErr) && pgErr.Code != pgerrcode.DuplicateObject { + if errors.As(enumErr, &pgErr) && pgErr.Code != pgerrcode.DuplicateObject && !utils.IsUniqueError(pgErr) { require.NoError(s.t, enumErr) } + _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s (id serial PRIMARY KEY,c1 BIGINT,c2 BIT,c3 VARBIT,c4 BOOLEAN, c6 BYTEA,c7 CHARACTER,c8 varchar,c9 CIDR,c11 DATE,c12 FLOAT,c13 DOUBLE PRECISION, diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 476417b638..130c860c4c 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -6,6 +6,7 @@ import ( "fmt" "sync" + "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" @@ -114,7 +115,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Enums_PG() { createMoodEnum := "CREATE TYPE mood AS ENUM ('happy', 'sad', 'angry');" var pgErr *pgconn.PgError _, enumErr := s.pool.Exec(context.Background(), createMoodEnum) - if errors.As(enumErr, &pgErr) && pgErr.Code != pgerrcode.DuplicateObject { + if errors.As(enumErr, &pgErr) && pgErr.Code != pgerrcode.DuplicateObject && !utils.IsUniqueError(enumErr) { require.NoError(s.t, enumErr) } _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 58adc47e65..7ffa8763c9 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -11,6 +11,7 @@ import ( "time" connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" + "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -684,7 +685,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { createMoodEnum := "CREATE TYPE mood AS ENUM ('happy', 'sad', 'angry');" var pgErr *pgconn.PgError _, enumErr := s.pool.Exec(context.Background(), createMoodEnum) - if errors.As(enumErr, &pgErr) && pgErr.Code != pgerrcode.DuplicateObject { + if errors.As(enumErr, &pgErr) && pgErr.Code != pgerrcode.DuplicateObject && !utils.IsUniqueError(pgErr) { require.NoError(s.t, enumErr) } _, err := s.pool.Exec(context.Background(), fmt.Sprintf(` diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index eb48367f12..e043e97d54 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -15,6 +15,7 @@ import ( "github.com/PeerDB-io/peer-flow/activities" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" + conn_utils "github.com/PeerDB-io/peer-flow/connectors/utils" utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -237,7 +238,7 @@ func CreateTableForQRep(pool *pgxpool.Pool, suffix string, tableName string) err tblFieldStr := strings.Join(tblFields, ",") var pgErr *pgconn.PgError _, enumErr := pool.Exec(context.Background(), createMoodEnum) - if errors.As(enumErr, &pgErr) && pgErr.Code != pgerrcode.DuplicateObject { + if errors.As(enumErr, &pgErr) && pgErr.Code != pgerrcode.DuplicateObject && !conn_utils.IsUniqueError(pgErr) { return enumErr } _, err := pool.Exec(context.Background(), fmt.Sprintf(`