Skip to content

Commit

Permalink
fixing bugs with rejecting dups + lenient addpub (#1203)
Browse files Browse the repository at this point in the history
1. fixes an issue where duplicate additional tables were not being
rejected properly
2. made adding tables to publication more lenient, if default pub name
we add tables but don't error out if table is already in publication, if
custom name we just check.
  • Loading branch information
heavycrystal authored Feb 5, 2024
1 parent a2c1f69 commit 42cc3a0
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 9 deletions.
21 changes: 14 additions & 7 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -994,7 +994,7 @@ func (c *PostgresConnector) AddTablesToPublication(req *protos.AddTablesToPublic
additionalSrcTables = append(additionalSrcTables, additionalTableMapping.SourceTableIdentifier)
}

// just check if we have all the tables already in the publication
// just check if we have all the tables already in the publication for custom publications
if req.PublicationName != "" {
rows, err := c.conn.Query(c.ctx,
"SELECT tablename FROM pg_publication_tables WHERE pubname=$1", req.PublicationName)
Expand All @@ -1011,13 +1011,20 @@ func (c *PostgresConnector) AddTablesToPublication(req *protos.AddTablesToPublic
return fmt.Errorf("some additional tables not present in custom publication: %s",
strings.Join(notPresentTables, ", "))
}
} else {
for _, additionalSrcTable := range additionalSrcTables {
_, err := c.conn.Exec(c.ctx, fmt.Sprintf("ALTER PUBLICATION %s ADD TABLE %s",
utils.QuoteIdentifier(c.getDefaultPublicationName(req.FlowJobName)),
utils.QuoteIdentifier(additionalSrcTable)))
// don't error out if table is already added to our publication
if err != nil && !strings.Contains(err.Error(), "SQLSTATE 42710") {
return fmt.Errorf("failed to alter publication: %w", err)
}
c.logger.Info("added table to publication",
slog.String("publication", c.getDefaultPublicationName(req.FlowJobName)),
slog.String("table", additionalSrcTable))
}
}

additionalSrcTablesString := strings.Join(additionalSrcTables, ",")
_, err := c.conn.Exec(c.ctx, fmt.Sprintf("ALTER PUBLICATION %s ADD TABLE %s",
c.getDefaultPublicationName(req.FlowJobName), additionalSrcTablesString))
if err != nil {
return fmt.Errorf("failed to alter publication: %w", err)
}
return nil
}
4 changes: 2 additions & 2 deletions flow/shared/additional_tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ func AdditionalTablesHasOverlap(currentTableMappings []*protos.TableMapping,
currentDstTables = append(currentDstTables, currentTableMapping.DestinationTableIdentifier)
}
for _, additionalTableMapping := range additionalTableMappings {
currentSrcTables = append(currentSrcTables, additionalTableMapping.SourceTableIdentifier)
currentDstTables = append(currentDstTables, additionalTableMapping.DestinationTableIdentifier)
additionalSrcTables = append(additionalSrcTables, additionalTableMapping.SourceTableIdentifier)
additionalDstTables = append(additionalDstTables, additionalTableMapping.DestinationTableIdentifier)
}

return utils.ArraysHaveOverlap(currentSrcTables, additionalSrcTables) ||
Expand Down

0 comments on commit 42cc3a0

Please sign in to comment.