Skip to content

Commit

Permalink
drop replication slot when overwriting slot_name in snapshot_names
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Oct 22, 2024
1 parent a850f6c commit 0242dc2
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 3 deletions.
12 changes: 11 additions & 1 deletion flow/activities/snapshot_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"
"time"

"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v5/pgxpool"
"go.temporal.io/sdk/activity"

Expand Down Expand Up @@ -82,7 +83,16 @@ func (a *SnapshotActivity) SetupReplication(
logger.Info("slot created", slog.String("SlotName", slotInfo.SlotName))
}

// TODO if record already exists, need to remove slot
var slotName string
if err := a.CatalogPool.QueryRow(ctx, "select slot_name from snapshot_names").Scan(&slotName); err == nil && slotName != "" {
if err := conn.ExecuteCommand(
ctx,
"select pg_drop_replication_slot($1)",
slotName,
); err != nil && !shared.IsSQLStateError(err, pgerrcode.UndefinedObject) {
return fmt.Errorf("Failed to drop slot from previous run: %w", err)

Check failure on line 93 in flow/activities/snapshot_activity.go

View workflow job for this annotation

GitHub Actions / lint

ST1005: error strings should not be capitalized (stylecheck)
}
}
if _, err := a.CatalogPool.Exec(ctx,
`insert into snapshot_names (flow_name, slot_name, snapshot_name, supports_tid_scan) values ($1, $2, $3, $4)
on conflict (flow_name) do update set slot_name = $2, snapshot_name = $3, supports_tid_scan = $4`,
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,8 +668,8 @@ func (c *PostgresConnector) checkIfTableExistsWithTx(
return result.Bool, nil
}

func (c *PostgresConnector) ExecuteCommand(ctx context.Context, command string) error {
_, err := c.conn.Exec(ctx, command)
func (c *PostgresConnector) ExecuteCommand(ctx context.Context, command string, args ...any) error {
_, err := c.conn.Exec(ctx, command, args...)
return err
}

Expand Down

0 comments on commit 0242dc2

Please sign in to comment.