Skip to content

Commit

Permalink
uses pgerrcode
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Jan 8, 2024
1 parent d24f162 commit b899ea5
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 7 deletions.
6 changes: 5 additions & 1 deletion flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package e2e_bigquery

import (
"context"
"errors"
"fmt"
"log/slog"
"strings"
Expand All @@ -16,6 +17,8 @@ import (
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/joho/godotenv"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -707,8 +710,9 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {
srcTableName := s.attachSchemaSuffix("test_types_bq")
dstTableName := "test_types_bq"
createMoodEnum := "CREATE TYPE mood AS ENUM ('happy', 'sad', 'angry');"
var pgErr *pgconn.PgError
_, enumErr := s.pool.Exec(context.Background(), createMoodEnum)
if enumErr != nil && !strings.Contains(enumErr.Error(), "already exists") {
if errors.As(enumErr, &pgErr) && pgErr.Code != pgerrcode.DuplicateObject {
require.NoError(s.t, enumErr)
}
_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
Expand Down
7 changes: 5 additions & 2 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ package e2e_postgres

import (
"context"
"errors"
"fmt"
"strings"
"sync"

"github.com/PeerDB-io/peer-flow/e2e"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model/qvalue"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgtype"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -112,8 +114,9 @@ func (s PeerFlowE2ETestSuitePG) Test_Enums_PG() {
srcTableName := s.attachSchemaSuffix("test_enum_flow")
dstTableName := s.attachSchemaSuffix("test_enum_flow_dst")
createMoodEnum := "CREATE TYPE mood AS ENUM ('happy', 'sad', 'angry');"
var pgErr *pgconn.PgError
_, enumErr := s.pool.Exec(context.Background(), createMoodEnum)
if enumErr != nil && !strings.Contains(enumErr.Error(), "already exists") {
if errors.As(enumErr, &pgErr) && pgErr.Code != pgerrcode.DuplicateObject {
require.NoError(s.t, enumErr)
}
_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
Expand Down
6 changes: 5 additions & 1 deletion flow/e2e/snowflake/peer_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package e2e_snowflake

import (
"context"
"errors"
"fmt"
"log/slog"
"strings"
Expand All @@ -17,6 +18,8 @@ import (
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/joho/godotenv"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -690,8 +693,9 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() {
srcTableName := s.attachSchemaSuffix("test_types_sf")
dstTableName := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, "test_types_sf")
createMoodEnum := "CREATE TYPE mood AS ENUM ('happy', 'sad', 'angry');"
var pgErr *pgconn.PgError
_, enumErr := s.pool.Exec(context.Background(), createMoodEnum)
if enumErr != nil && !strings.Contains(enumErr.Error(), "already exists") {
if errors.As(enumErr, &pgErr) && pgErr.Code != pgerrcode.DuplicateObject {
require.NoError(s.t, enumErr)
}
_, err := s.pool.Exec(context.Background(), fmt.Sprintf(`
Expand Down
10 changes: 7 additions & 3 deletions flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package e2e
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"os"
Expand All @@ -24,6 +25,8 @@ import (
"github.com/PeerDB-io/peer-flow/shared/alerting"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
"github.com/google/uuid"
"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/stretchr/testify/require"
"go.temporal.io/sdk/testsuite"
Expand Down Expand Up @@ -185,7 +188,7 @@ func NormalizeFlowCountQuery(env *testsuite.TestWorkflowEnvironment,
}

func CreateTableForQRep(pool *pgxpool.Pool, suffix string, tableName string) error {
createEnum := "CREATE TYPE mood AS ENUM ('happy', 'sad', 'angry');"
createMoodEnum := "CREATE TYPE mood AS ENUM ('happy', 'sad', 'angry');"

tblFields := []string{
"id UUID NOT NULL PRIMARY KEY",
Expand Down Expand Up @@ -236,8 +239,9 @@ func CreateTableForQRep(pool *pgxpool.Pool, suffix string, tableName string) err
"geography_polygon geography(polygon)")
}
tblFieldStr := strings.Join(tblFields, ",")
_, enumErr := pool.Exec(context.Background(), createEnum)
if enumErr != nil && !strings.Contains(enumErr.Error(), "already exists") {
var pgErr *pgconn.PgError
_, enumErr := pool.Exec(context.Background(), createMoodEnum)
if errors.As(enumErr, &pgErr) && pgErr.Code != pgerrcode.DuplicateObject {
return enumErr
}
_, err := pool.Exec(context.Background(), fmt.Sprintf(`
Expand Down

0 comments on commit b899ea5

Please sign in to comment.