Skip to content

Commit

Permalink
[snapshot] fallback to full partitions in <PG12, cleanup (#1499)
Browse files Browse the repository at this point in the history
Separate slot and txn snapshots more cleanly: split their state and make things
clearer.
TID scans are not supported in PG12, so fall back to full table partitions
for now; support for custom partition keys might be added later.
  • Loading branch information
heavycrystal authored Mar 18, 2024
1 parent bb87b15 commit 9abd6b6
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 80 deletions.
6 changes: 0 additions & 6 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,6 @@ type CheckConnectionResult struct {
NeedsSetupMetadataTables bool
}

type SlotSnapshotSignal struct {
signal connpostgres.SlotSignal
snapshotName string
connector connectors.CDCPullConnector
}

type FlowableActivity struct {
CatalogPool *pgxpool.Pool
Alerter *alerting.Alerter
Expand Down
70 changes: 44 additions & 26 deletions flow/activities/snapshot_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,33 @@ import (
"github.com/PeerDB-io/peer-flow/shared"
)

// SlotSnapshotState is the per-flow state tied to a replication-slot snapshot.
// SetupReplication stores one entry per flow job name; CloseSlotKeepAlive closes
// signal.CloneComplete, closes the connector and deletes the entry.
type SlotSnapshotState struct {
// snapshot name reported in the slot creation result
snapshotName string
// slot signal; its CloneComplete channel is closed by CloseSlotKeepAlive
signal connpostgres.SlotSignal
// connector stored by SetupReplication; closed by CloseSlotKeepAlive
connector connectors.CDCPullConnector
}

// TxSnapshotState describes a snapshot exported from a transaction.
// MaintainTx records one entry per session ID for as long as it runs, and
// WaitForExportSnapshot returns a copy of that entry once it is present.
type TxSnapshotState struct {
// snapshot name returned by the connector's ExportTxSnapshot
SnapshotName string
// whether TID scans are supported, as reported by ExportTxSnapshot
SupportsTIDScans bool
}

type SnapshotActivity struct {
SnapshotConnectionsMutex sync.Mutex
SnapshotConnections map[string]SlotSnapshotSignal
Alerter *alerting.Alerter
SnapshotStatesMutex sync.Mutex
SlotSnapshotStates map[string]SlotSnapshotState
TxSnapshotStates map[string]TxSnapshotState
Alerter *alerting.Alerter
}

// closes the slot signal and its connector, then removes the flow's slot snapshot state
func (a *SnapshotActivity) CloseSlotKeepAlive(ctx context.Context, flowJobName string) error {
a.SnapshotConnectionsMutex.Lock()
defer a.SnapshotConnectionsMutex.Unlock()
a.SnapshotStatesMutex.Lock()
defer a.SnapshotStatesMutex.Unlock()

if s, ok := a.SnapshotConnections[flowJobName]; ok {
if s, ok := a.SlotSnapshotStates[flowJobName]; ok {
close(s.signal.CloneComplete)
connectors.CloseConnector(ctx, s.connector)
delete(a.SnapshotConnections, flowJobName)
delete(a.SlotSnapshotStates, flowJobName)
}
a.Alerter.LogFlowEvent(ctx, flowJobName, "Ended Snapshot Flow Job")

Expand Down Expand Up @@ -94,18 +106,19 @@ func (a *SnapshotActivity) SetupReplication(
return nil, fmt.Errorf("slot error: %w", slotInfo.Err)
}

a.SnapshotConnectionsMutex.Lock()
defer a.SnapshotConnectionsMutex.Unlock()
a.SnapshotStatesMutex.Lock()
defer a.SnapshotStatesMutex.Unlock()

a.SnapshotConnections[config.FlowJobName] = SlotSnapshotSignal{
a.SlotSnapshotStates[config.FlowJobName] = SlotSnapshotState{
signal: slotSignal,
snapshotName: slotInfo.SnapshotName,
connector: conn,
}

return &protos.SetupReplicationOutput{
SlotName: slotInfo.SlotName,
SnapshotName: slotInfo.SnapshotName,
SlotName: slotInfo.SlotName,
SnapshotName: slotInfo.SnapshotName,
SupportsTidScans: slotInfo.SupportsTIDScans,
}, nil
}

Expand All @@ -116,49 +129,54 @@ func (a *SnapshotActivity) MaintainTx(ctx context.Context, sessionID string, pee
}
defer connectors.CloseConnector(ctx, conn)

snapshotName, tx, err := conn.ExportSnapshot(ctx)
exportSnapshotOutput, tx, err := conn.ExportTxSnapshot(ctx)
if err != nil {
return err
}

sss := SlotSnapshotSignal{snapshotName: snapshotName}
a.SnapshotConnectionsMutex.Lock()
a.SnapshotConnections[sessionID] = sss
a.SnapshotConnectionsMutex.Unlock()
a.SnapshotStatesMutex.Lock()
a.TxSnapshotStates[sessionID] = TxSnapshotState{
SnapshotName: exportSnapshotOutput.SnapshotName,
SupportsTIDScans: exportSnapshotOutput.SupportsTidScans,
}
a.SnapshotStatesMutex.Unlock()

logger := activity.GetLogger(ctx)
start := time.Now()
for {
msg := fmt.Sprintf("maintaining export snapshot transaction %s", time.Since(start).Round(time.Second))
logger.Info(msg)
// this function relies on context cancellation to exit
// context is not explicitly cancelled, but workflow exit triggers an implicit cancel
// from activity.RecordHeartbeat
activity.RecordHeartbeat(ctx, msg)
if ctx.Err() != nil {
a.SnapshotConnectionsMutex.Lock()
delete(a.SnapshotConnections, sessionID)
a.SnapshotConnectionsMutex.Unlock()
a.SnapshotStatesMutex.Lock()
delete(a.TxSnapshotStates, sessionID)
a.SnapshotStatesMutex.Unlock()
return conn.FinishExport(tx)
}
time.Sleep(time.Minute)
}
}

func (a *SnapshotActivity) WaitForExportSnapshot(ctx context.Context, sessionID string) (string, error) {
func (a *SnapshotActivity) WaitForExportSnapshot(ctx context.Context, sessionID string) (*TxSnapshotState, error) {
logger := activity.GetLogger(ctx)
attempt := 0
for {
a.SnapshotConnectionsMutex.Lock()
sss, ok := a.SnapshotConnections[sessionID]
a.SnapshotConnectionsMutex.Unlock()
a.SnapshotStatesMutex.Lock()
tsc, ok := a.TxSnapshotStates[sessionID]
a.SnapshotStatesMutex.Unlock()
if ok {
return sss.snapshotName, nil
return &tsc, nil
}
activity.RecordHeartbeat(ctx, "wait another second for snapshot export")
attempt += 1
if attempt > 2 {
logger.Info("waiting on snapshot export", slog.Int("attempt", attempt))
}
if err := ctx.Err(); err != nil {
return "", err
return nil, err
}
time.Sleep(time.Second)
}
Expand Down
6 changes: 4 additions & 2 deletions flow/cmd/snapshot_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,11 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) (client.Client, worker.Work
})

w.RegisterWorkflow(peerflow.SnapshotFlowWorkflow)
// mutex is explicitly left uninitialized: the zero value of sync.Mutex is an unlocked mutex, ready to use
w.RegisterActivity(&activities.SnapshotActivity{
SnapshotConnections: make(map[string]activities.SlotSnapshotSignal),
Alerter: alerting.NewAlerter(context.Background(), conn),
SlotSnapshotStates: make(map[string]activities.SlotSnapshotState),
TxSnapshotStates: make(map[string]activities.TxSnapshotState),
Alerter: alerting.NewAlerter(context.Background(), conn),
})

return c, w, nil
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type CDCPullConnector interface {

// For InitialSnapshotOnly correctness without replication slot
// `any` is for returning transaction if necessary
ExportSnapshot(context.Context) (string, any, error)
ExportTxSnapshot(context.Context) (*protos.ExportTxSnapshotOutput, any, error)

// `any` from ExportTxSnapshot passed here when done, allowing transaction to commit
FinishExport(any) error
Expand Down
23 changes: 15 additions & 8 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,12 +369,12 @@ func (c *PostgresConnector) createSlotAndPublication(

c.logger.Warn(fmt.Sprintf("Creating replication slot '%s'", slot))

_, err = conn.Exec(ctx, "SET idle_in_transaction_session_timeout = 0")
_, err = conn.Exec(ctx, "SET LOCAL idle_in_transaction_session_timeout=0")
if err != nil {
return fmt.Errorf("[slot] error setting idle_in_transaction_session_timeout: %w", err)
}

_, err = conn.Exec(ctx, "SET lock_timeout = 0")
_, err = conn.Exec(ctx, "SET LOCAL lock_timeout=0")
if err != nil {
return fmt.Errorf("[slot] error setting lock_timeout: %w", err)
}
Expand All @@ -388,11 +388,17 @@ func (c *PostgresConnector) createSlotAndPublication(
return fmt.Errorf("[slot] error creating replication slot: %w", err)
}

ok, _, err := c.MajorVersionCheck(ctx, POSTGRES_13)
if err != nil {
return fmt.Errorf("[slot] error getting PG version: %w", err)
}

c.logger.Info(fmt.Sprintf("Created replication slot '%s'", slot))
slotDetails := SlotCreationResult{
SlotName: res.SlotName,
SnapshotName: res.SnapshotName,
Err: nil,
SlotName: res.SlotName,
SnapshotName: res.SnapshotName,
Err: nil,
SupportsTIDScans: ok,
}
signal.SlotCreated <- slotDetails
c.logger.Info("Waiting for clone to complete")
Expand All @@ -405,9 +411,10 @@ func (c *PostgresConnector) createSlotAndPublication(
e = errors.New("slot already exists")
}
slotDetails := SlotCreationResult{
SlotName: slot,
SnapshotName: "",
Err: e,
SlotName: slot,
SnapshotName: "",
Err: e,
SupportsTIDScans: false,
}
signal.SlotCreated <- slotDetails
}
Expand Down
39 changes: 29 additions & 10 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -941,30 +941,49 @@ func (c *PostgresConnector) EnsurePullability(
return &protos.EnsurePullabilityBatchOutput{TableIdentifierMapping: tableIdentifierMapping}, nil
}

func (c *PostgresConnector) ExportSnapshot(ctx context.Context) (string, any, error) {
func (c *PostgresConnector) ExportTxSnapshot(ctx context.Context) (*protos.ExportTxSnapshotOutput, any, error) {
var snapshotName string
tx, err := c.conn.Begin(ctx)
if err != nil {
return "", nil, err
return nil, nil, err
}
txNeedsRollback := true
defer func() {
if txNeedsRollback {
rollbackCtx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancelFunc()
err := tx.Rollback(rollbackCtx)
if err != pgx.ErrTxClosed {
c.logger.Error("error while rolling back transaction for snapshot export")
}
}
}()

_, err = tx.Exec(ctx, "SET LOCAL idle_in_transaction_session_timeout=0")
if err != nil {
return nil, nil, fmt.Errorf("[export-snapshot] error setting idle_in_transaction_session_timeout: %w", err)
}

_, err = tx.Exec(ctx, "SET idle_in_transaction_session_timeout = 0")
_, err = tx.Exec(ctx, "SET LOCAL lock_timeout=0")
if err != nil {
return "", nil, fmt.Errorf("[export-snapshot] error setting idle_in_transaction_session_timeout: %w", err)
return nil, nil, fmt.Errorf("[export-snapshot] error setting lock_timeout: %w", err)
}

_, err = tx.Exec(ctx, "SET lock_timeout = 0")
ok, _, err := c.MajorVersionCheck(ctx, POSTGRES_13)
if err != nil {
return "", nil, fmt.Errorf("[export-snapshot] error setting lock_timeout: %w", err)
return nil, nil, fmt.Errorf("[export-snapshot] error getting PG version: %w", err)
}

err = tx.QueryRow(ctx, "select pg_export_snapshot()").Scan(&snapshotName)
err = tx.QueryRow(ctx, "SELECT pg_export_snapshot()").Scan(&snapshotName)
if err != nil {
_ = tx.Rollback(ctx)
return "", nil, err
return nil, nil, err
}
txNeedsRollback = false

return snapshotName, tx, err
return &protos.ExportTxSnapshotOutput{
SnapshotName: snapshotName,
SupportsTidScans: ok,
}, tx, err
}

func (c *PostgresConnector) FinishExport(tx any) error {
Expand Down
7 changes: 4 additions & 3 deletions flow/connectors/postgres/slot_signal.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package connpostgres

type SlotCreationResult struct {
SlotName string
SnapshotName string
Err error
SlotName string
SnapshotName string
SupportsTIDScans bool
Err error
}

// This struct contains two signals.
Expand Down
Loading

0 comments on commit 9abd6b6

Please sign in to comment.