From 1da3546f95951f473a3ac56b42db6574b22c3814 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 3 Jan 2024 11:39:41 -0500 Subject: [PATCH] Refactor snapshot flow for slot based snapshotting This lays the ground work for cases where we want a consistent view without creating a slot by exposing a transaction snapshot --- flow/workflows/snapshot_flow.go | 38 ++++++++++++++++++++------------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index 87b339d46..addea5266 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -222,6 +222,27 @@ func (s *SnapshotFlowExecution) cloneTables( return nil } +func (s *SnapshotFlowExecution) cloneTablesWithSlot( + ctx workflow.Context, + sessionCtx workflow.Context, + numTablesInParallel int, +) error { + logger := s.logger + slotInfo, err := s.setupReplication(sessionCtx) + if err != nil { + return fmt.Errorf("failed to setup replication: %w", err) + } + + logger.Info("cloning tables in parallel: ", numTablesInParallel) + s.cloneTables(ctx, slotInfo, numTablesInParallel) + + if err := s.closeSlotKeepAlive(sessionCtx); err != nil { + return fmt.Errorf("failed to close slot keep alive: %w", err) + } + + return nil +} + func SnapshotFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionConfigs) error { logger := workflow.GetLogger(ctx) @@ -271,21 +292,8 @@ func SnapshotFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionCon } defer workflow.CompleteSession(sessionCtx) - slotInfo, err := se.setupReplication(sessionCtx) - if err != nil { - return fmt.Errorf("failed to setup replication: %w", err) - } - - if slotInfo == nil { - logger.Info("no slot info returned, skipping qrep workflow") - return nil - } - - logger.Info("cloning tables in parallel: ", numTablesInParallel) - se.cloneTables(ctx, slotInfo, numTablesInParallel) - - if err := se.closeSlotKeepAlive(sessionCtx); err != nil { - return fmt.Errorf("failed to close slot keep alive: %w", err) + if err := se.cloneTablesWithSlot(ctx, sessionCtx, numTablesInParallel); err != nil { + return fmt.Errorf("failed to clone slots and create replication slot: %w", err) } return nil