diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index e63802a7fd..ca195f4c02 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -840,37 +840,21 @@ func (c *PostgresConnector) SetupReplication(ctx context.Context, signal SlotSig func (c *PostgresConnector) PullFlowCleanup(ctx context.Context, jobName string) error { // Slotname would be the job name prefixed with "peerflow_slot_" - slotName := "peerflow_slot_%s" + jobName + slotName := "peerflow_slot_" + jobName publicationName := c.getDefaultPublicationName(jobName) - pullFlowCleanupTx, err := c.conn.Begin(ctx) - if err != nil { - return fmt.Errorf("error starting transaction for flow cleanup: %w", err) - } - defer func() { - deferErr := pullFlowCleanupTx.Rollback(ctx) - if deferErr != pgx.ErrTxClosed && deferErr != nil { - c.logger.Error("error rolling back transaction for flow cleanup", slog.Any("error", err)) - } - }() - - _, err = pullFlowCleanupTx.Exec(ctx, "DROP PUBLICATION IF EXISTS "+publicationName) + _, err := c.conn.Exec(ctx, "DROP PUBLICATION IF EXISTS "+publicationName) if err != nil { return fmt.Errorf("error dropping publication: %w", err) } - _, err = pullFlowCleanupTx.Exec(ctx, `SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots + _, err = c.conn.Exec(ctx, `SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name=$1`, slotName) if err != nil { return fmt.Errorf("error dropping replication slot: %w", err) } - err = pullFlowCleanupTx.Commit(ctx) - if err != nil { - return fmt.Errorf("error committing transaction for flow cleanup: %w", err) - } - return nil }