Skip to content

Commit

Permalink
rebase oopsie
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Feb 19, 2024
1 parent 97a23c7 commit 4ee863a
Showing 1 changed file with 0 additions and 103 deletions.
103 changes: 0 additions & 103 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"regexp"
"strconv"
"strings"

"github.com/jackc/pglogrepl"
Expand Down Expand Up @@ -626,105 +625,3 @@ func (c *PostgresConnector) getCurrentLSN(ctx context.Context) (pglogrepl.LSN, e
func (c *PostgresConnector) getDefaultPublicationName(jobName string) string {
return "peerflow_pub_" + jobName
}

func (c *PostgresConnector) CheckSourceTables(ctx context.Context, tableNames []string, pubName string) error {
if c.conn == nil {
return errors.New("check tables: conn is nil")
}

// Check that we can select from all tables
tableArr := make([]string, 0, len(tableNames))
for _, table := range tableNames {
var row pgx.Row
schemaName, tableName, found := strings.Cut(table, ".")
if !found {
return fmt.Errorf("invalid source table identifier: %s", table)
}

tableArr = append(tableArr, fmt.Sprintf(`(%s::text, %s::text)`, QuoteLiteral(schemaName), QuoteLiteral(tableName)))
err := c.conn.QueryRow(ctx,
fmt.Sprintf("SELECT * FROM %s.%s LIMIT 0;", QuoteIdentifier(schemaName), QuoteIdentifier(tableName))).Scan(&row)
if err != nil && err != pgx.ErrNoRows {
return err
}
}

tableStr := strings.Join(tableArr, ",")
if pubName != "" {
// Check if publication exists
err := c.conn.QueryRow(ctx, "SELECT pubname FROM pg_publication WHERE pubname=$1", pubName).Scan(nil)
if err != nil {
if err == pgx.ErrNoRows {
return fmt.Errorf("publication does not exist: %s", pubName)
}
return fmt.Errorf("error while checking for publication existence: %w", err)
}

// Check if tables belong to publication
var pubTableCount int
err = c.conn.QueryRow(ctx, fmt.Sprintf(`
with source_table_components (sname, tname) as (values %s)
select COUNT(DISTINCT(schemaname,tablename)) from pg_publication_tables
INNER JOIN source_table_components stc
ON schemaname=stc.sname and tablename=stc.tname where pubname=$1;`, tableStr), pubName).Scan(&pubTableCount)
if err != nil {
return err
}

if pubTableCount != len(tableNames) {
return errors.New("not all tables belong to publication")
}
}

return nil
}

func (c *PostgresConnector) CheckReplicationPermissions(ctx context.Context, username string) error {
if c.conn == nil {
return errors.New("check replication permissions: conn is nil")
}

var replicationRes bool
err := c.conn.QueryRow(ctx, "SELECT rolreplication FROM pg_roles WHERE rolname = $1", username).Scan(&replicationRes)
if err != nil {
return err
}

if !replicationRes {
// RDS case: check pg_settings for rds.logical_replication
var setting string
err := c.conn.QueryRow(ctx, "SELECT setting FROM pg_settings WHERE name = 'rds.logical_replication'").Scan(&setting)
if err != nil || setting != "on" {
return errors.New("postgres user does not have replication role")
}
}

// check wal_level
var walLevel string
err = c.conn.QueryRow(ctx, "SHOW wal_level").Scan(&walLevel)
if err != nil {
return err
}

if walLevel != "logical" {
return fmt.Errorf("wal_level is not logical")
}

// max_wal_senders must be at least 2
var maxWalSendersRes string
err = c.conn.QueryRow(ctx, "SHOW max_wal_senders").Scan(&maxWalSendersRes)
if err != nil {
return err
}

maxWalSenders, err := strconv.Atoi(maxWalSendersRes)
if err != nil {
return err
}

if maxWalSenders < 2 {
return fmt.Errorf("max_wal_senders must be at least 2")
}

return nil
}

0 comments on commit 4ee863a

Please sign in to comment.