Skip to content

Commit

Permalink
Refactor snapshot flow for slot based snapshotting (#974)
Browse files Browse the repository at this point in the history
This lays the ground work for cases where we want a consistent view
without creating a slot by exposing a transaction snapshot
  • Loading branch information
iskakaushik authored Jan 3, 2024
1 parent 7165db7 commit 4bc7879
Showing 1 changed file with 23 additions and 15 deletions.
38 changes: 23 additions & 15 deletions flow/workflows/snapshot_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,27 @@ func (s *SnapshotFlowExecution) cloneTables(
return nil
}

func (s *SnapshotFlowExecution) cloneTablesWithSlot(
ctx workflow.Context,
sessionCtx workflow.Context,
numTablesInParallel int,
) error {
logger := s.logger
slotInfo, err := s.setupReplication(sessionCtx)
if err != nil {
return fmt.Errorf("failed to setup replication: %w", err)
}

logger.Info("cloning tables in parallel: ", numTablesInParallel)
s.cloneTables(ctx, slotInfo, numTablesInParallel)

if err := s.closeSlotKeepAlive(sessionCtx); err != nil {
return fmt.Errorf("failed to close slot keep alive: %w", err)
}

return nil
}

func SnapshotFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionConfigs) error {
logger := workflow.GetLogger(ctx)

Expand Down Expand Up @@ -271,21 +292,8 @@ func SnapshotFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionCon
}
defer workflow.CompleteSession(sessionCtx)

slotInfo, err := se.setupReplication(sessionCtx)
if err != nil {
return fmt.Errorf("failed to setup replication: %w", err)
}

if slotInfo == nil {
logger.Info("no slot info returned, skipping qrep workflow")
return nil
}

logger.Info("cloning tables in parallel: ", numTablesInParallel)
se.cloneTables(ctx, slotInfo, numTablesInParallel)

if err := se.closeSlotKeepAlive(sessionCtx); err != nil {
return fmt.Errorf("failed to close slot keep alive: %w", err)
if err := se.cloneTablesWithSlot(ctx, sessionCtx, numTablesInParallel); err != nil {
return fmt.Errorf("failed to clone slots and create replication slot: %w", err)
}

return nil
Expand Down

0 comments on commit 4bc7879

Please sign in to comment.