diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 27eeaf3a11..e778b7c5fa 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -994,7 +994,7 @@ func (c *PostgresConnector) AddTablesToPublication(req *protos.AddTablesToPublic additionalSrcTables = append(additionalSrcTables, additionalTableMapping.SourceTableIdentifier) } - // just check if we have all the tables already in the publication + // just check if we have all the tables already in the publication for custom publications if req.PublicationName != "" { rows, err := c.conn.Query(c.ctx, "SELECT tablename FROM pg_publication_tables WHERE pubname=$1", req.PublicationName) @@ -1011,13 +1011,20 @@ func (c *PostgresConnector) AddTablesToPublication(req *protos.AddTablesToPublic return fmt.Errorf("some additional tables not present in custom publication: %s", strings.Join(notPresentTables, ", ")) } + } else { + for _, additionalSrcTable := range additionalSrcTables { + _, err := c.conn.Exec(c.ctx, fmt.Sprintf("ALTER PUBLICATION %s ADD TABLE %s", + utils.QuoteIdentifier(c.getDefaultPublicationName(req.FlowJobName)), + utils.QuoteIdentifier(additionalSrcTable))) + // don't error out if table is already added to our publication + if err != nil && !strings.Contains(err.Error(), "SQLSTATE 42710") { + return fmt.Errorf("failed to alter publication: %w", err) + } + c.logger.Info("added table to publication", + slog.String("publication", c.getDefaultPublicationName(req.FlowJobName)), + slog.String("table", additionalSrcTable)) + } } - additionalSrcTablesString := strings.Join(additionalSrcTables, ",") - _, err := c.conn.Exec(c.ctx, fmt.Sprintf("ALTER PUBLICATION %s ADD TABLE %s", - c.getDefaultPublicationName(req.FlowJobName), additionalSrcTablesString)) - if err != nil { - return fmt.Errorf("failed to alter publication: %w", err) - } return nil } diff --git a/flow/shared/additional_tables.go b/flow/shared/additional_tables.go index 4fd0a874b2..00dc8efa42 100644 --- a/flow/shared/additional_tables.go +++ b/flow/shared/additional_tables.go @@ -18,8 +18,8 @@ func AdditionalTablesHasOverlap(currentTableMappings []*protos.TableMapping, currentDstTables = append(currentDstTables, currentTableMapping.DestinationTableIdentifier) } for _, additionalTableMapping := range additionalTableMappings { - currentSrcTables = append(currentSrcTables, additionalTableMapping.SourceTableIdentifier) - currentDstTables = append(currentDstTables, additionalTableMapping.DestinationTableIdentifier) + additionalSrcTables = append(additionalSrcTables, additionalTableMapping.SourceTableIdentifier) + additionalDstTables = append(additionalDstTables, additionalTableMapping.DestinationTableIdentifier) } return utils.ArraysHaveOverlap(currentSrcTables, additionalSrcTables) ||