Skip to content

Commit

Permalink
fixes to handling of custom publications for AddTablesToPublication
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Feb 14, 2024
1 parent aa095a5 commit 40b6375
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 6 deletions.
17 changes: 13 additions & 4 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ func (c *PostgresConnector) getDefaultPublicationName(jobName string) string {

func (c *PostgresConnector) CheckSourceTables(ctx context.Context, tableNames []string, pubName string) error {
if c.conn == nil {
return fmt.Errorf("check tables: conn is nil")
return errors.New("check tables: conn is nil")
}

// Check that we can select from all tables
Expand All @@ -649,11 +649,20 @@ func (c *PostgresConnector) CheckSourceTables(ctx context.Context, tableNames []
}
}

// Check if tables belong to publication
tableStr := strings.Join(tableArr, ",")
if pubName != "" {
// Check if publication exists
err := c.conn.QueryRow(ctx, "SELECT pubname FROM pg_publication WHERE pubname=$1", pubName).Scan(nil)
if err != nil {
if err == pgx.ErrNoRows {
return fmt.Errorf("publication does not exist: %s", pubName)
}
return fmt.Errorf("error while checking for publication existence: %w", err)
}

// Check if tables belong to publication
var pubTableCount int
err := c.conn.QueryRow(ctx, fmt.Sprintf(`
err = c.conn.QueryRow(ctx, fmt.Sprintf(`
with source_table_components (sname, tname) as (values %s)
select COUNT(DISTINCT(schemaname,tablename)) from pg_publication_tables
INNER JOIN source_table_components stc
Expand All @@ -663,7 +672,7 @@ func (c *PostgresConnector) CheckSourceTables(ctx context.Context, tableNames []
}

if pubTableCount != len(tableNames) {
return fmt.Errorf("not all tables belong to publication")
return errors.New("not all tables belong to publication")
}
}

Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -973,7 +973,7 @@ func (c *PostgresConnector) AddTablesToPublication(ctx context.Context, req *pro
// just check if we have all the tables already in the publication for custom publications
if req.PublicationName != "" {
rows, err := c.conn.Query(ctx,
"SELECT tablename FROM pg_publication_tables WHERE pubname=$1", req.PublicationName)
"SELECT schemaname || '.' || tablename FROM pg_publication_tables WHERE pubname=$1", req.PublicationName)
if err != nil {
return fmt.Errorf("failed to check tables in publication: %w", err)
}
Expand All @@ -982,7 +982,7 @@ func (c *PostgresConnector) AddTablesToPublication(ctx context.Context, req *pro
if err != nil {
return fmt.Errorf("failed to check tables in publication: %w", err)
}
notPresentTables := utils.ArrayMinus(tableNames, additionalSrcTables)
notPresentTables := utils.ArrayMinus(additionalSrcTables, tableNames)
if len(notPresentTables) > 0 {
return fmt.Errorf("some additional tables not present in custom publication: %s",
strings.Join(notPresentTables, ", "))
Expand Down

0 comments on commit 40b6375

Please sign in to comment.