Skip to content

Commit

Permalink
Postgres: move version fetching to shared folder (#1576)
Browse files Browse the repository at this point in the history
This is a preceding PR to #1574
  • Loading branch information
Amogh-Bharadwaj authored Apr 5, 2024
1 parent 23dbcae commit f65d666
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 27 deletions.
3 changes: 2 additions & 1 deletion flow/cmd/validate_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors"
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
)

func (h *FlowRequestHandler) ValidatePeer(
Expand Down Expand Up @@ -46,7 +47,7 @@ func (h *FlowRequestHandler) ValidatePeer(
return nil, err
}

if pgversion < connpostgres.POSTGRES_12 {
if pgversion < shared.POSTGRES_12 {
return &protos.ValidatePeerResponse{
Status: protos.ValidatePeerStatus_INVALID,
Message: fmt.Sprintf("Postgres peer %s must be of PG12 or above. Current version: %d",
Expand Down
30 changes: 6 additions & 24 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,6 @@ import (
"github.com/PeerDB-io/peer-flow/shared"
)

type PGVersion int32

const (
POSTGRES_12 PGVersion = 120000
POSTGRES_13 PGVersion = 130000
POSTGRES_15 PGVersion = 150000
)

const (
mirrorJobsTableIdentifier = "peerdb_mirror_jobs"
createMirrorJobsTableSQL = `CREATE TABLE IF NOT EXISTS %s.%s(mirror_job_name TEXT PRIMARY KEY,
Expand Down Expand Up @@ -266,12 +258,12 @@ func getSlotInfo(ctx context.Context, conn *pgx.Conn, slotName string, database
whereClause = "WHERE database=" + QuoteLiteral(database)
}

pgversion, err := majorVersion(ctx, conn)
pgversion, err := shared.GetMajorVersion(ctx, conn)
if err != nil {
return nil, err
}
walStatusSelector := "wal_status"
if pgversion < POSTGRES_13 {
if pgversion < shared.POSTGRES_13 {
walStatusSelector = "'unknown'"
}
rows, err := conn.Query(ctx, fmt.Sprintf(`SELECT slot_name, redo_lsn::Text,restart_lsn::text,%s,
Expand Down Expand Up @@ -348,7 +340,7 @@ func (c *PostgresConnector) createSlotAndPublication(
return fmt.Errorf("error checking Postgres version: %w", err)
}
var pubViaRootString string
if pgversion >= POSTGRES_13 {
if pgversion >= shared.POSTGRES_13 {
pubViaRootString = "WITH(publish_via_partition_root=true)"
}
// Create the publication to help filter changes only for the given tables
Expand Down Expand Up @@ -400,7 +392,7 @@ func (c *PostgresConnector) createSlotAndPublication(
SlotName: res.SlotName,
SnapshotName: res.SnapshotName,
Err: nil,
SupportsTIDScans: pgversion >= POSTGRES_13,
SupportsTIDScans: pgversion >= shared.POSTGRES_13,
}
signal.SlotCreated <- slotDetails
c.logger.Info("Waiting for clone to complete")
Expand Down Expand Up @@ -523,18 +515,8 @@ func (c *PostgresConnector) jobMetadataExists(ctx context.Context, jobName strin
return result.Bool, nil
}

func majorVersion(ctx context.Context, conn *pgx.Conn) (PGVersion, error) {
var version int32
err := conn.QueryRow(ctx, "SELECT current_setting('server_version_num')::INTEGER").Scan(&version)
if err != nil {
return 0, fmt.Errorf("failed to get server version: %w", err)
}

return PGVersion(version), nil
}

func (c *PostgresConnector) MajorVersion(ctx context.Context) (PGVersion, error) {
return majorVersion(ctx, c.conn)
func (c *PostgresConnector) MajorVersion(ctx context.Context) (shared.PGVersion, error) {
return shared.GetMajorVersion(ctx, c.conn)
}

func (c *PostgresConnector) updateSyncMetadata(ctx context.Context, flowJobName string, lastCP int64, syncBatchID int64,
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ func (c *PostgresConnector) NormalizeRecords(ctx context.Context, req *model.Nor
SyncedAtColName: req.SyncedAtColName,
SoftDelete: req.SoftDelete,
},
supportsMerge: pgversion >= POSTGRES_15,
supportsMerge: pgversion >= shared.POSTGRES_15,
metadataSchema: c.metadataSchema,
}
for _, destinationTableName := range destinationTableNames {
Expand Down Expand Up @@ -983,7 +983,7 @@ func (c *PostgresConnector) ExportTxSnapshot(ctx context.Context) (*protos.Expor

return &protos.ExportTxSnapshotOutput{
SnapshotName: snapshotName,
SupportsTidScans: pgversion >= POSTGRES_13,
SupportsTidScans: pgversion >= shared.POSTGRES_13,
}, tx, err
}

Expand Down
18 changes: 18 additions & 0 deletions flow/shared/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
)

type PGVersion int32

const (
POSTGRES_12 PGVersion = 120000
POSTGRES_13 PGVersion = 130000
POSTGRES_15 PGVersion = 150000
)

func IsUniqueError(err error) bool {
var pgerr *pgconn.PgError
return errors.As(err, &pgerr) && pgerr.Code == pgerrcode.UniqueViolation
Expand Down Expand Up @@ -73,3 +81,13 @@ func RegisterHStore(ctx context.Context, conn *pgx.Conn) error {

return nil
}

func GetMajorVersion(ctx context.Context, conn *pgx.Conn) (PGVersion, error) {
var version int32
err := conn.QueryRow(ctx, "SELECT current_setting('server_version_num')::INTEGER").Scan(&version)
if err != nil {
return 0, fmt.Errorf("failed to get server version: %w", err)
}

return PGVersion(version), nil
}

0 comments on commit f65d666

Please sign in to comment.