Skip to content

Commit

Permalink
Postgres: Fix pull cleanup (#1876)
Browse files Browse the repository at this point in the history
Noticed an issue where drop publication if exists fails with something
along the lines of being unable to perform drop publication in a read
only transaction

Manually checking the publication existence first should bypass the
issue. Need to do some functional testing

---------

Co-authored-by: Philip Dubé <[email protected]>
  • Loading branch information
Amogh-Bharadwaj and serprex authored Jun 27, 2024
1 parent dd8de22 commit ff0327e
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 6 deletions.
2 changes: 2 additions & 0 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,11 +436,13 @@ func (h *FlowRequestHandler) GetPublications(

rows, err := peerConn.Query(ctx, "select pubname from pg_publication;")
if err != nil {
slog.Info("failed to fetch publications", slog.Any("error", err))
return &protos.PeerPublicationsResponse{PublicationNames: nil}, err
}

publications, err := pgx.CollectRows[string](rows, pgx.RowTo)
if err != nil {
slog.Info("failed to fetch publications", slog.Any("error", err))
return &protos.PeerPublicationsResponse{PublicationNames: nil}, err
}
return &protos.PeerPublicationsResponse{PublicationNames: publications}, nil
Expand Down
22 changes: 16 additions & 6 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -1055,18 +1055,28 @@ func (c *PostgresConnector) SetupReplication(ctx context.Context, signal SlotSig
func (c *PostgresConnector) PullFlowCleanup(ctx context.Context, jobName string) error {
// Slotname would be the job name prefixed with "peerflow_slot_"
slotName := "peerflow_slot_" + jobName
_, err := c.conn.Exec(ctx, `SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots
WHERE slot_name=$1`, slotName)
if err != nil {
return fmt.Errorf("error dropping replication slot: %w", err)
}

publicationName := c.getDefaultPublicationName(jobName)

_, err := c.conn.Exec(ctx, "DROP PUBLICATION IF EXISTS "+publicationName)
// check if publication exists manually,
// as drop publication if exists requires permissions
// for a publication which we did not create via peerdb user
var publicationExists bool
err = c.conn.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM pg_publication WHERE pubname=$1)", publicationName).Scan(&publicationExists)
if err != nil {
return fmt.Errorf("error dropping publication: %w", err)
return fmt.Errorf("error checking if publication exists: %w", err)
}

_, err = c.conn.Exec(ctx, `SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots
WHERE slot_name=$1`, slotName)
if err != nil {
return fmt.Errorf("error dropping replication slot: %w", err)
if publicationExists {
_, err = c.conn.Exec(ctx, "DROP PUBLICATION IF EXISTS "+publicationName)
if err != nil {
return fmt.Errorf("error dropping publication: %w", err)
}
}

return nil
Expand Down

0 comments on commit ff0327e

Please sign in to comment.