diff --git a/flow/activities/snapshot_activity.go b/flow/activities/snapshot_activity.go index 9a9338fc6..ab3b39068 100644 --- a/flow/activities/snapshot_activity.go +++ b/flow/activities/snapshot_activity.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/jackc/pgerrcode" "github.com/jackc/pgx/v5/pgxpool" "go.temporal.io/sdk/activity" @@ -82,7 +83,16 @@ func (a *SnapshotActivity) SetupReplication( logger.Info("slot created", slog.String("SlotName", slotInfo.SlotName)) } - // TODO if record already exists, need to remove slot + var slotName string + if err := a.CatalogPool.QueryRow(ctx, "select slot_name from snapshot_names").Scan(&slotName); err == nil && slotName != "" { + if err := conn.ExecuteCommand( + ctx, + "select pg_drop_replication_slot($1)", + slotName, + ); err != nil && !shared.IsSQLStateError(err, pgerrcode.UndefinedObject) { + return fmt.Errorf("Failed to drop slot from previous run: %w", err) + } + } if _, err := a.CatalogPool.Exec(ctx, `insert into snapshot_names (flow_name, slot_name, snapshot_name, supports_tid_scan) values ($1, $2, $3, $4) on conflict (flow_name) do update set slot_name = $2, snapshot_name = $3, supports_tid_scan = $4`, diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index 0d163f173..c164b5a62 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -668,8 +668,8 @@ func (c *PostgresConnector) checkIfTableExistsWithTx( return result.Bool, nil } -func (c *PostgresConnector) ExecuteCommand(ctx context.Context, command string) error { - _, err := c.conn.Exec(ctx, command) +func (c *PostgresConnector) ExecuteCommand(ctx context.Context, command string, args ...any) error { + _, err := c.conn.Exec(ctx, command, args...) return err }