From 9abd6b6e061fbc8fc4be5d8a9bfbd1be644b9f04 Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Tue, 19 Mar 2024 01:58:46 +0530 Subject: [PATCH] [snapshot] fallback to full partitions in 0 { @@ -161,7 +177,7 @@ func (s *SnapshotFlowExecution) cloneTable( SourcePeer: sourcePostgres, DestinationPeer: s.config.Destination, Query: query, - WatermarkColumn: partitionCol, + WatermarkColumn: mapping.PartitionKey, WatermarkTable: srcName, InitialCopyOnly: true, DestinationTableIdentifier: dstName, @@ -182,23 +198,36 @@ func (s *SnapshotFlowExecution) cloneTable( func (s *SnapshotFlowExecution) cloneTables( ctx workflow.Context, - slotInfo *protos.SetupReplicationOutput, - maxParallelClones int, + cloneTablesInput *cloneTablesInput, ) error { - s.logger.Info(fmt.Sprintf("cloning tables for slot name %s and snapshotName %s", - slotInfo.SlotName, slotInfo.SnapshotName)) + if cloneTablesInput.snapshotType == SNAPSHOT_TYPE_SLOT { + s.logger.Info(fmt.Sprintf("cloning tables for slot name %s and snapshotName %s", + cloneTablesInput.slotName, cloneTablesInput.snapshotName)) + } else if cloneTablesInput.snapshotType == SNAPSHOT_TYPE_TX { + s.logger.Info("cloning tables in txn snapshot mode with snapshotName " + + cloneTablesInput.snapshotName) + } - boundSelector := concurrency.NewBoundSelector(maxParallelClones) + boundSelector := concurrency.NewBoundSelector(cloneTablesInput.maxParallelClones) + defaultPartitionCol := "ctid" + if !cloneTablesInput.supportsTIDScans { + s.logger.Info("Postgres version too old for TID scans, might use full table partitions!") + defaultPartitionCol = "" + } + + snapshotName := cloneTablesInput.snapshotName for _, v := range s.config.TableMappings { source := v.SourceTableIdentifier destination := v.DestinationTableIdentifier - snapshotName := slotInfo.SnapshotName s.logger.Info(fmt.Sprintf( "Cloning table with source table %s and destination table name %s", source, destination), slog.String("snapshotName", snapshotName), ) + if v.PartitionKey == "" { + v.PartitionKey = defaultPartitionCol + } err := s.cloneTable(ctx, boundSelector, snapshotName, v) if err != nil { s.logger.Error("failed to start clone child workflow: ", err) @@ -227,7 +256,13 @@ func (s *SnapshotFlowExecution) cloneTablesWithSlot( } logger.Info(fmt.Sprintf("cloning %d tables in parallel", numTablesInParallel)) - if err := s.cloneTables(ctx, slotInfo, numTablesInParallel); err != nil { + if err := s.cloneTables(ctx, &cloneTablesInput{ + snapshotType: SNAPSHOT_TYPE_SLOT, + slotName: slotInfo.SlotName, + snapshotName: slotInfo.SnapshotName, + supportsTIDScans: slotInfo.SupportsTidScans, + maxParallelClones: numTablesInParallel, + }); err != nil { return fmt.Errorf("failed to clone tables: %w", err) } @@ -246,7 +281,8 @@ func SnapshotFlowWorkflow( se := &SnapshotFlowExecution{ config: config, tableNameSchemaMapping: tableNameSchemaMapping, - logger: log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName)), + logger: log.With(workflow.GetLogger(ctx), + slog.String(string(shared.FlowNameKey), config.FlowJobName)), } numTablesInParallel := int(max(config.SnapshotNumTablesInParallel, 1)) @@ -299,7 +335,7 @@ func SnapshotFlowWorkflow( ) var sessionError error - var snapshotName string + var txnSnapshotState *activities.TxSnapshotState sessionSelector := workflow.NewNamedSelector(ctx, "ExportSnapshotSetup") sessionSelector.AddFuture(fMaintain, func(f workflow.Future) { // MaintainTx should never exit without an error before this point @@ -307,7 +343,7 @@ func SnapshotFlowWorkflow( }) sessionSelector.AddFuture(fExportSnapshot, func(f workflow.Future) { // Happy path is waiting for this to return without error - sessionError = f.Get(exportCtx, &snapshotName) + sessionError = f.Get(exportCtx, &txnSnapshotState) }) sessionSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) { sessionError = ctx.Err() @@ -317,11 +353,13 @@ func SnapshotFlowWorkflow( return sessionError } - slotInfo := &protos.SetupReplicationOutput{ - SlotName: "peerdb_initial_copy_only", - SnapshotName: snapshotName, - } - if err := se.cloneTables(ctx, slotInfo, int(config.SnapshotNumTablesInParallel)); err != nil { + if err := se.cloneTables(ctx, &cloneTablesInput{ + snapshotType: SNAPSHOT_TYPE_TX, + slotName: "", + snapshotName: txnSnapshotState.SnapshotName, + supportsTIDScans: txnSnapshotState.SupportsTIDScans, + maxParallelClones: numTablesInParallel, + }); err != nil { return fmt.Errorf("failed to clone tables: %w", err) } } else if err := se.cloneTablesWithSlot(ctx, sessionCtx, numTablesInParallel); err != nil { diff --git a/protos/flow.proto b/protos/flow.proto index e872b0fa17..f33b0696db 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -144,6 +144,7 @@ message SetupReplicationInput { message SetupReplicationOutput { string slot_name = 1; string snapshot_name = 2; + bool supports_tid_scans = 3; } message CreateRawTableInput { @@ -394,3 +395,8 @@ message IsQRepPartitionSyncedInput { string partition_id = 2; } +message ExportTxSnapshotOutput { + string snapshot_name = 1; + bool supports_tid_scans = 2; +} +