Skip to content

Commit

Permalink
Refactor snapshot flow for slot based snapshotting
Browse files Browse the repository at this point in the history
This lays the ground work for cases where we want a consistent view
without creating a slot by exposing a transaction snapshot
  • Loading branch information
iskakaushik committed Jan 3, 2024
1 parent 7165db7 commit 1da3546
Showing 1 changed file with 23 additions and 15 deletions.
38 changes: 23 additions & 15 deletions flow/workflows/snapshot_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,27 @@ func (s *SnapshotFlowExecution) cloneTables(
return nil
}

func (s *SnapshotFlowExecution) cloneTablesWithSlot(
ctx workflow.Context,
sessionCtx workflow.Context,
numTablesInParallel int,
) error {
logger := s.logger
slotInfo, err := s.setupReplication(sessionCtx)
if err != nil {
return fmt.Errorf("failed to setup replication: %w", err)
}

logger.Info("cloning tables in parallel: ", numTablesInParallel)
s.cloneTables(ctx, slotInfo, numTablesInParallel)

if err := s.closeSlotKeepAlive(sessionCtx); err != nil {
return fmt.Errorf("failed to close slot keep alive: %w", err)
}

return nil
}

func SnapshotFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionConfigs) error {
logger := workflow.GetLogger(ctx)

Expand Down Expand Up @@ -271,21 +292,8 @@ func SnapshotFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionCon
}
defer workflow.CompleteSession(sessionCtx)

slotInfo, err := se.setupReplication(sessionCtx)
if err != nil {
return fmt.Errorf("failed to setup replication: %w", err)
}

if slotInfo == nil {
logger.Info("no slot info returned, skipping qrep workflow")
return nil
}

logger.Info("cloning tables in parallel: ", numTablesInParallel)
se.cloneTables(ctx, slotInfo, numTablesInParallel)

if err := se.closeSlotKeepAlive(sessionCtx); err != nil {
return fmt.Errorf("failed to close slot keep alive: %w", err)
if err := se.cloneTablesWithSlot(ctx, sessionCtx, numTablesInParallel); err != nil {
return fmt.Errorf("failed to clone slots and create replication slot: %w", err)
}

return nil
Expand Down

0 comments on commit 1da3546

Please sign in to comment.