Skip to content

Commit

Permalink
postgres connector: ignore unique constraint violations from IF NOT E…
Browse files Browse the repository at this point in the history
…XISTS

Followup from #905 now that I've seen parallel testing of pg connector running into this
  • Loading branch information
serprex committed Dec 30, 2023
1 parent 321818b commit f0a68cd
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 12 deletions.
12 changes: 2 additions & 10 deletions flow/connectors/external_metadata/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,13 @@ package connmetadata

import (
"context"
"errors"
"fmt"
"log/slog"

"github.com/PeerDB-io/peer-flow/connectors/utils"
cc "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
)
Expand All @@ -20,11 +17,6 @@ const (
lastSyncStateTableName = "last_sync_state"
)

func isUniqueError(err error) bool {
var pgerr *pgconn.PgError
return errors.As(err, &pgerr) && pgerr.Code == pgerrcode.UniqueViolation
}

type PostgresMetadataStore struct {
ctx context.Context
config *protos.PostgresConfig
Expand Down Expand Up @@ -114,7 +106,7 @@ func (p *PostgresMetadataStore) SetupMetadata() error {

// create the schema
_, err = tx.Exec(p.ctx, "CREATE SCHEMA IF NOT EXISTS "+p.schemaName)
if err != nil && !isUniqueError(err) {
if err != nil && !utils.IsUniqueError(err) {
p.logger.Error("failed to create schema", slog.Any("error", err))
return err
}
Expand All @@ -128,7 +120,7 @@ func (p *PostgresMetadataStore) SetupMetadata() error {
sync_batch_id BIGINT NOT NULL
)
`)
if err != nil && !isUniqueError(err) {
if err != nil && !utils.IsUniqueError(err) {
p.logger.Error("failed to create last sync state table", slog.Any("error", err))
return err
}
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ func (c *PostgresConnector) createSlotAndPublication(

func (c *PostgresConnector) createMetadataSchema(createSchemaTx pgx.Tx) error {
_, err := createSchemaTx.Exec(c.ctx, fmt.Sprintf(createSchemaSQL, c.metadataSchema))
if err != nil {
if err != nil && !utils.IsUniqueError(err) {
return fmt.Errorf("error while creating internal schema: %w", err)
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (c *PostgresConnector) SetupMetadataTables() error {
}
_, err = createMetadataTablesTx.Exec(c.ctx, fmt.Sprintf(createMirrorJobsTableSQL,
c.metadataSchema, mirrorJobsTableIdentifier))
if err != nil {
if err != nil && !utils.IsUniqueError(err) {
return fmt.Errorf("error creating table %s: %w", mirrorJobsTableIdentifier, err)
}

Expand Down
8 changes: 8 additions & 0 deletions flow/connectors/utils/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,22 @@ package utils

import (
"context"
"errors"
"fmt"
"net/url"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
)

func IsUniqueError(err error) bool {
var pgerr *pgconn.PgError
return errors.As(err, &pgerr) && pgerr.Code == pgerrcode.UniqueViolation
}

func GetPGConnectionString(pgConfig *protos.PostgresConfig) string {
passwordEscaped := url.QueryEscape(pgConfig.Password)
// for a url like postgres://user:password@host:port/dbname
Expand Down

0 comments on commit f0a68cd

Please sign in to comment.