diff --git a/flow/cmd/validate_mirror.go b/flow/cmd/validate_mirror.go index 199b9552ab..84527fbe94 100644 --- a/flow/cmd/validate_mirror.go +++ b/flow/cmd/validate_mirror.go @@ -2,6 +2,7 @@ package main import ( "context" + "errors" "fmt" "log/slog" @@ -17,12 +18,12 @@ func (h *FlowRequestHandler) ValidateCDCMirror( slog.Error("/validatecdc connection configs is nil") return &protos.ValidateCDCMirrorResponse{ Ok: false, - }, fmt.Errorf("connection configs is nil") + }, errors.New("connection configs is nil") } sourcePeerConfig := req.ConnectionConfigs.Source.GetPostgresConfig() if sourcePeerConfig == nil { slog.Error("/validatecdc source peer config is nil", slog.Any("peer", req.ConnectionConfigs.Source)) - return nil, fmt.Errorf("source peer config is nil") + return nil, errors.New("source peer config is nil") } pgPeer, err := connpostgres.NewPostgresConnector(ctx, sourcePeerConfig) @@ -63,15 +64,8 @@ func (h *FlowRequestHandler) ValidateCDCMirror( } pubName := req.ConnectionConfigs.PublicationName - if pubName == "" { - pubErr := pgPeer.CheckPublicationPermission(ctx, sourceTables) - if pubErr != nil { - return &protos.ValidateCDCMirrorResponse{ - Ok: false, - }, fmt.Errorf("failed to check publication permission: %v", pubErr) - } - } else { - err = pgPeer.CheckSourceTables(ctx, sourceTables, req.ConnectionConfigs.PublicationName) + if pubName != "" { + err = pgPeer.CheckSourceTables(ctx, sourceTables, pubName) if err != nil { return &protos.ValidateCDCMirrorResponse{ Ok: false, diff --git a/flow/connectors/postgres/validate.go b/flow/connectors/postgres/validate.go index c4b84085dd..4928e7c6d5 100644 --- a/flow/connectors/postgres/validate.go +++ b/flow/connectors/postgres/validate.go @@ -111,43 +111,6 @@ func (c *PostgresConnector) CheckReplicationPermissions(ctx context.Context, use return nil } -func (c *PostgresConnector) CheckPublicationPermission(ctx context.Context, tableNames []*utils.SchemaTable) error { - var hasSuper bool - var canCreateDatabase bool - queryErr := c.conn.QueryRow(ctx, fmt.Sprintf(` - SELECT - rolsuper, - has_database_privilege(rolname, current_database(), 'CREATE') AS can_create_database - FROM pg_roles - WHERE rolname = %s; - `, QuoteLiteral(c.config.User))).Scan(&hasSuper, &canCreateDatabase) - if queryErr != nil { - return fmt.Errorf("error while checking user privileges: %w", queryErr) - } - - if !hasSuper && !canCreateDatabase { - return errors.New("user does not have superuser or create database privileges") - } - - if !hasSuper { - // for each table, check if the user is an owner - for _, table := range tableNames { - var owner string - err := c.conn.QueryRow(ctx, fmt.Sprintf("SELECT tableowner FROM pg_tables WHERE schemaname=%s AND tablename=%s", - QuoteLiteral(table.Schema), QuoteLiteral(table.Table))).Scan(&owner) - if err != nil { - return fmt.Errorf("error while checking table owner: %w", err) - } - - if owner != c.config.User { - return fmt.Errorf("user %s is not the owner of table %s", c.config.User, table.String()) - } - } - } - - return nil -} - func (c *PostgresConnector) CheckReplicationConnectivity(ctx context.Context) error { // Check if we can create a replication connection conn, err := c.CreateReplConn(ctx)