From db7cdfe51325e2163569dd3484237d4236ef0345 Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Wed, 14 Feb 2024 20:36:55 +0530 Subject: [PATCH] fixes to handling of custom publications for AddTablesToPublication (#1289) 1. need to fetch tables schema-qualified for comparison 2. order of ArrayMinus was wrong 3. also add validation for custom publication existing --- flow/connectors/postgres/client.go | 17 +++++++++++++---- flow/connectors/postgres/postgres.go | 4 ++-- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index 72a5691fe5..563e0c6824 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -629,7 +629,7 @@ func (c *PostgresConnector) getDefaultPublicationName(jobName string) string { func (c *PostgresConnector) CheckSourceTables(ctx context.Context, tableNames []string, pubName string) error { if c.conn == nil { - return fmt.Errorf("check tables: conn is nil") + return errors.New("check tables: conn is nil") } // Check that we can select from all tables @@ -649,11 +649,20 @@ func (c *PostgresConnector) CheckSourceTables(ctx context.Context, tableNames [] } } - // Check if tables belong to publication tableStr := strings.Join(tableArr, ",") if pubName != "" { + // Check if publication exists + err := c.conn.QueryRow(ctx, "SELECT pubname FROM pg_publication WHERE pubname=$1", pubName).Scan(nil) + if err != nil { + if err == pgx.ErrNoRows { + return fmt.Errorf("publication does not exist: %s", pubName) + } + return fmt.Errorf("error while checking for publication existence: %w", err) + } + + // Check if tables belong to publication var pubTableCount int - err := c.conn.QueryRow(ctx, fmt.Sprintf(` + err = c.conn.QueryRow(ctx, fmt.Sprintf(` with source_table_components (sname, tname) as (values %s) select COUNT(DISTINCT(schemaname,tablename)) from pg_publication_tables INNER JOIN source_table_components stc @@ -663,7 +672,7 @@ func (c *PostgresConnector) CheckSourceTables(ctx context.Context, tableNames [] } if pubTableCount != len(tableNames) { - return fmt.Errorf("not all tables belong to publication") + return errors.New("not all tables belong to publication") } } diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index f41e015672..d81401511b 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -973,7 +973,7 @@ func (c *PostgresConnector) AddTablesToPublication(ctx context.Context, req *pro // just check if we have all the tables already in the publication for custom publications if req.PublicationName != "" { rows, err := c.conn.Query(ctx, - "SELECT tablename FROM pg_publication_tables WHERE pubname=$1", req.PublicationName) + "SELECT schemaname || '.' || tablename FROM pg_publication_tables WHERE pubname=$1", req.PublicationName) if err != nil { return fmt.Errorf("failed to check tables in publication: %w", err) } @@ -982,7 +982,7 @@ func (c *PostgresConnector) AddTablesToPublication(ctx context.Context, req *pro if err != nil { return fmt.Errorf("failed to check tables in publication: %w", err) } - notPresentTables := utils.ArrayMinus(tableNames, additionalSrcTables) + notPresentTables := utils.ArrayMinus(additionalSrcTables, tableNames) if len(notPresentTables) > 0 { return fmt.Errorf("some additional tables not present in custom publication: %s", strings.Join(notPresentTables, ", "))