Skip to content

Commit

Permalink
Validate mirror: remove no-pub create check (#1307)
Browse files Browse the repository at this point in the history
1. There isn't a universal way to check for superuser across managed postgres offerings, making permission checking infeasible
2. Not ideal to have the risk of having a stray publication lying around if we go with the create-drop dummy publication route
  • Loading branch information
Amogh-Bharadwaj authored Feb 15, 2024
1 parent d82f3a8 commit 790d305
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 48 deletions.
16 changes: 5 additions & 11 deletions flow/cmd/validate_mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"errors"
"fmt"
"log/slog"

Expand All @@ -17,12 +18,12 @@ func (h *FlowRequestHandler) ValidateCDCMirror(
slog.Error("/validatecdc connection configs is nil")
return &protos.ValidateCDCMirrorResponse{
Ok: false,
}, fmt.Errorf("connection configs is nil")
}, errors.New("connection configs is nil")
}
sourcePeerConfig := req.ConnectionConfigs.Source.GetPostgresConfig()
if sourcePeerConfig == nil {
slog.Error("/validatecdc source peer config is nil", slog.Any("peer", req.ConnectionConfigs.Source))
return nil, fmt.Errorf("source peer config is nil")
return nil, errors.New("source peer config is nil")
}

pgPeer, err := connpostgres.NewPostgresConnector(ctx, sourcePeerConfig)
Expand Down Expand Up @@ -63,15 +64,8 @@ func (h *FlowRequestHandler) ValidateCDCMirror(
}

pubName := req.ConnectionConfigs.PublicationName
if pubName == "" {
pubErr := pgPeer.CheckPublicationPermission(ctx, sourceTables)
if pubErr != nil {
return &protos.ValidateCDCMirrorResponse{
Ok: false,
}, fmt.Errorf("failed to check publication permission: %v", pubErr)
}
} else {
err = pgPeer.CheckSourceTables(ctx, sourceTables, req.ConnectionConfigs.PublicationName)
if pubName != "" {
err = pgPeer.CheckSourceTables(ctx, sourceTables, pubName)
if err != nil {
return &protos.ValidateCDCMirrorResponse{
Ok: false,
Expand Down
37 changes: 0 additions & 37 deletions flow/connectors/postgres/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,43 +111,6 @@ func (c *PostgresConnector) CheckReplicationPermissions(ctx context.Context, use
return nil
}

func (c *PostgresConnector) CheckPublicationPermission(ctx context.Context, tableNames []*utils.SchemaTable) error {
var hasSuper bool
var canCreateDatabase bool
queryErr := c.conn.QueryRow(ctx, fmt.Sprintf(`
SELECT
rolsuper,
has_database_privilege(rolname, current_database(), 'CREATE') AS can_create_database
FROM pg_roles
WHERE rolname = %s;
`, QuoteLiteral(c.config.User))).Scan(&hasSuper, &canCreateDatabase)
if queryErr != nil {
return fmt.Errorf("error while checking user privileges: %w", queryErr)
}

if !hasSuper && !canCreateDatabase {
return errors.New("user does not have superuser or create database privileges")
}

if !hasSuper {
// for each table, check if the user is an owner
for _, table := range tableNames {
var owner string
err := c.conn.QueryRow(ctx, fmt.Sprintf("SELECT tableowner FROM pg_tables WHERE schemaname=%s AND tablename=%s",
QuoteLiteral(table.Schema), QuoteLiteral(table.Table))).Scan(&owner)
if err != nil {
return fmt.Errorf("error while checking table owner: %w", err)
}

if owner != c.config.User {
return fmt.Errorf("user %s is not the owner of table %s", c.config.User, table.String())
}
}
}

return nil
}

func (c *PostgresConnector) CheckReplicationConnectivity(ctx context.Context) error {
// Check if we can create a replication connection
conn, err := c.CreateReplConn(ctx)
Expand Down

0 comments on commit 790d305

Please sign in to comment.