From c64d77c2ce2662f7e3aa80dcf5b04d749dff7610 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Mon, 18 Mar 2024 23:33:40 +0530 Subject: [PATCH 1/4] UI: Redirect to peers page by default, add buttons there (#1498) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Now users do not land on the Home page. The landing route is the peers page (`/peers`), where if there are no peers, the below look holds: Screenshot 2024-03-18 at 10 09 31 PM --- ui/app/peers/page.tsx | 68 +++++++++++++++++++++++++++++++++---- ui/app/peers/peersTable.tsx | 3 +- ui/next.config.js | 9 +++++ 3 files changed, 71 insertions(+), 9 deletions(-) diff --git a/ui/app/peers/page.tsx b/ui/app/peers/page.tsx index d7812e54c0..48394acde7 100644 --- a/ui/app/peers/page.tsx +++ b/ui/app/peers/page.tsx @@ -15,7 +15,8 @@ import useSWR from 'swr'; import { fetcher } from '../utils/swr'; export default function Peers() { - const { data: peers, error, isLoading } = useSWR('/api/peers', fetcher); + const peers: any[] = []; + const { data, error, isLoading } = useSWR('/api/peers', fetcher); return ( @@ -39,6 +40,10 @@ export default function Peers() { > Peers + {isLoading && ( @@ -46,12 +51,61 @@ export default function Peers() { )} - {!isLoading && ( - peer)} - /> - )} + {!isLoading && + (peers && peers.length == 0 ? ( +
+ + + +
+ ) : ( + peer)} /> + ))}
); diff --git a/ui/app/peers/peersTable.tsx b/ui/app/peers/peersTable.tsx index a49cbd2e53..a592d232f3 100644 --- a/ui/app/peers/peersTable.tsx +++ b/ui/app/peers/peersTable.tsx @@ -33,7 +33,7 @@ function PeerRow({ peer }: { peer: Peer }) { ); } -function PeersTable({ title, peers }: { title: string; peers: Peer[] }) { +function PeersTable({ peers }: { peers: Peer[] }) { const [searchQuery, setSearchQuery] = useState(''); const [filteredType, setFilteredType] = useState( undefined @@ -65,7 +65,6 @@ function PeersTable({ title, peers }: { title: string; peers: Peer[] }) { return ( {title}} toolbar={{ left: <>, right: ( diff --git a/ui/next.config.js b/ui/next.config.js index a42a10c727..5212d4780c 100644 --- a/ui/next.config.js +++ b/ui/next.config.js @@ -3,6 +3,15 @@ const nextConfig = { compiler: { styledComponents: true, }, + async redirects() { + return [ + { + source: '/', + destination: '/peers', + permanent: false, + }, + ]; + }, reactStrictMode: true, swcMinify: true, output: 'standalone', From 7eae988704ebc8b510975e505967983269130995 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Mon, 18 Mar 2024 23:35:33 +0530 Subject: [PATCH 2/4] Revert "UI: Redirect to peers page by default, add buttons there" (#1500) Reverts PeerDB-io/peerdb#1498 --- ui/app/peers/page.tsx | 68 ++++--------------------------------- ui/app/peers/peersTable.tsx | 3 +- ui/next.config.js | 9 ----- 3 files changed, 9 insertions(+), 71 deletions(-) diff --git a/ui/app/peers/page.tsx b/ui/app/peers/page.tsx index 48394acde7..d7812e54c0 100644 --- a/ui/app/peers/page.tsx +++ b/ui/app/peers/page.tsx @@ -15,8 +15,7 @@ import useSWR from 'swr'; import { fetcher } from '../utils/swr'; export default function Peers() { - const peers: any[] = []; - const { data, error, isLoading } = useSWR('/api/peers', fetcher); + const { data: peers, error, isLoading } = useSWR('/api/peers', fetcher); return ( @@ -40,10 +39,6 @@ export default function Peers() { > Peers - {isLoading && ( @@ -51,61 +46,12 @@ export default function Peers() { )} - {!isLoading && - (peers && peers.length == 0 ? ( -
- - - -
- ) : ( - peer)} /> - ))} + {!isLoading && ( + peer)} + /> + )}
); diff --git a/ui/app/peers/peersTable.tsx b/ui/app/peers/peersTable.tsx index a592d232f3..a49cbd2e53 100644 --- a/ui/app/peers/peersTable.tsx +++ b/ui/app/peers/peersTable.tsx @@ -33,7 +33,7 @@ function PeerRow({ peer }: { peer: Peer }) { ); } -function PeersTable({ peers }: { peers: Peer[] }) { +function PeersTable({ title, peers }: { title: string; peers: Peer[] }) { const [searchQuery, setSearchQuery] = useState(''); const [filteredType, setFilteredType] = useState( undefined @@ -65,6 +65,7 @@ function PeersTable({ peers }: { peers: Peer[] }) { return (
{title}} toolbar={{ left: <>, right: ( diff --git a/ui/next.config.js b/ui/next.config.js index 5212d4780c..a42a10c727 100644 --- a/ui/next.config.js +++ b/ui/next.config.js @@ -3,15 +3,6 @@ const nextConfig = { compiler: { styledComponents: true, }, - async redirects() { - return [ - { - source: '/', - destination: '/peers', - permanent: false, - }, - ]; - }, reactStrictMode: true, swcMinify: true, output: 'standalone', From bb87b15e37b368fc242a98bb95923264487fea6b Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Tue, 19 Mar 2024 00:26:00 +0530 Subject: [PATCH 3/4] UI: New default page and peer buttons (#1501) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Screenshot 2024-03-18 at 10 09 31 PM --- ui/app/peers/page.tsx | 65 +++++++++++++++++++++++++++++++++---- ui/app/peers/peersTable.tsx | 3 +- ui/next.config.js | 9 +++++ 3 files changed, 69 insertions(+), 8 deletions(-) diff --git a/ui/app/peers/page.tsx b/ui/app/peers/page.tsx index d7812e54c0..2170712950 100644 --- a/ui/app/peers/page.tsx +++ b/ui/app/peers/page.tsx @@ -39,6 +39,10 @@ export default function Peers() { > Peers + {isLoading && ( @@ -46,12 +50,61 @@ export default function Peers() { )} - {!isLoading && ( - peer)} - /> - )} + {!isLoading && + (peers && peers.length == 0 ? ( +
+ + + +
+ ) : ( + peer)} /> + ))}
); diff --git a/ui/app/peers/peersTable.tsx b/ui/app/peers/peersTable.tsx index a49cbd2e53..a592d232f3 100644 --- a/ui/app/peers/peersTable.tsx +++ b/ui/app/peers/peersTable.tsx @@ -33,7 +33,7 @@ function PeerRow({ peer }: { peer: Peer }) { ); } -function PeersTable({ title, peers }: { title: string; peers: Peer[] }) { +function PeersTable({ peers }: { peers: Peer[] }) { const [searchQuery, setSearchQuery] = useState(''); const [filteredType, setFilteredType] = useState( undefined @@ -65,7 +65,6 @@ function PeersTable({ title, peers }: { title: string; peers: Peer[] }) { return (
{title}} toolbar={{ left: <>, right: ( diff --git a/ui/next.config.js b/ui/next.config.js index a42a10c727..5212d4780c 100644 --- a/ui/next.config.js +++ b/ui/next.config.js @@ -3,6 +3,15 @@ const nextConfig = { compiler: { styledComponents: true, }, + async redirects() { + return [ + { + source: '/', + destination: '/peers', + permanent: false, + }, + ]; + }, reactStrictMode: true, swcMinify: true, output: 'standalone', From 9abd6b6e061fbc8fc4be5d8a9bfbd1be644b9f04 Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Tue, 19 Mar 2024 01:58:46 +0530 Subject: [PATCH 4/4] [snapshot] fallback to full partitions in 0 { @@ -161,7 +177,7 @@ func (s *SnapshotFlowExecution) cloneTable( SourcePeer: sourcePostgres, DestinationPeer: s.config.Destination, Query: query, - WatermarkColumn: partitionCol, + WatermarkColumn: mapping.PartitionKey, WatermarkTable: srcName, InitialCopyOnly: true, DestinationTableIdentifier: dstName, @@ -182,23 +198,36 @@ func (s *SnapshotFlowExecution) cloneTable( func (s *SnapshotFlowExecution) cloneTables( ctx workflow.Context, - slotInfo *protos.SetupReplicationOutput, - maxParallelClones int, + cloneTablesInput *cloneTablesInput, ) error { - s.logger.Info(fmt.Sprintf("cloning tables for slot name %s and snapshotName %s", - slotInfo.SlotName, slotInfo.SnapshotName)) + if cloneTablesInput.snapshotType == SNAPSHOT_TYPE_SLOT { + s.logger.Info(fmt.Sprintf("cloning tables for slot name %s and snapshotName %s", + cloneTablesInput.slotName, cloneTablesInput.snapshotName)) + } else if cloneTablesInput.snapshotType == SNAPSHOT_TYPE_TX { + s.logger.Info("cloning tables in txn snapshot mode with snapshotName " + + cloneTablesInput.snapshotName) + } - boundSelector := concurrency.NewBoundSelector(maxParallelClones) + boundSelector := concurrency.NewBoundSelector(cloneTablesInput.maxParallelClones) + defaultPartitionCol := "ctid" + if !cloneTablesInput.supportsTIDScans { + s.logger.Info("Postgres version too old for TID scans, might use full table partitions!") + defaultPartitionCol = "" + } + + snapshotName := cloneTablesInput.snapshotName for _, v := range s.config.TableMappings { source := v.SourceTableIdentifier destination := v.DestinationTableIdentifier - snapshotName := slotInfo.SnapshotName s.logger.Info(fmt.Sprintf( "Cloning table with source table %s and destination table name %s", source, destination), slog.String("snapshotName", snapshotName), ) + if v.PartitionKey == "" { + v.PartitionKey = defaultPartitionCol + } err := s.cloneTable(ctx, boundSelector, snapshotName, v) if err != nil { s.logger.Error("failed to start clone child workflow: ", err) @@ -227,7 +256,13 @@ func (s *SnapshotFlowExecution) cloneTablesWithSlot( } logger.Info(fmt.Sprintf("cloning %d tables in parallel", numTablesInParallel)) - if err := s.cloneTables(ctx, slotInfo, numTablesInParallel); err != nil { + if err := s.cloneTables(ctx, &cloneTablesInput{ + snapshotType: SNAPSHOT_TYPE_SLOT, + slotName: slotInfo.SlotName, + snapshotName: slotInfo.SnapshotName, + supportsTIDScans: slotInfo.SupportsTidScans, + maxParallelClones: numTablesInParallel, + }); err != nil { return fmt.Errorf("failed to clone tables: %w", err) } @@ -246,7 +281,8 @@ func SnapshotFlowWorkflow( se := &SnapshotFlowExecution{ config: config, tableNameSchemaMapping: tableNameSchemaMapping, - logger: log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName)), + logger: log.With(workflow.GetLogger(ctx), + slog.String(string(shared.FlowNameKey), config.FlowJobName)), } numTablesInParallel := int(max(config.SnapshotNumTablesInParallel, 1)) @@ -299,7 +335,7 @@ func SnapshotFlowWorkflow( ) var sessionError error - var snapshotName string + var txnSnapshotState *activities.TxSnapshotState sessionSelector := workflow.NewNamedSelector(ctx, "ExportSnapshotSetup") sessionSelector.AddFuture(fMaintain, func(f workflow.Future) { // MaintainTx should never exit without an error before this point @@ -307,7 +343,7 @@ func SnapshotFlowWorkflow( }) sessionSelector.AddFuture(fExportSnapshot, func(f workflow.Future) { // Happy path is waiting for this to return without error - sessionError = f.Get(exportCtx, &snapshotName) + sessionError = f.Get(exportCtx, &txnSnapshotState) }) sessionSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) { sessionError = ctx.Err() @@ -317,11 +353,13 @@ func SnapshotFlowWorkflow( return sessionError } - slotInfo := &protos.SetupReplicationOutput{ - SlotName: "peerdb_initial_copy_only", - SnapshotName: snapshotName, - } - if err := se.cloneTables(ctx, slotInfo, int(config.SnapshotNumTablesInParallel)); err != nil { + if err := se.cloneTables(ctx, &cloneTablesInput{ + snapshotType: SNAPSHOT_TYPE_TX, + slotName: "", + snapshotName: txnSnapshotState.SnapshotName, + supportsTIDScans: txnSnapshotState.SupportsTIDScans, + maxParallelClones: numTablesInParallel, + }); err != nil { return fmt.Errorf("failed to clone tables: %w", err) } } else if err := se.cloneTablesWithSlot(ctx, sessionCtx, numTablesInParallel); err != nil { diff --git a/protos/flow.proto b/protos/flow.proto index e872b0fa17..f33b0696db 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -144,6 +144,7 @@ message SetupReplicationInput { message SetupReplicationOutput { string slot_name = 1; string snapshot_name = 2; + bool supports_tid_scans = 3; } message CreateRawTableInput { @@ -394,3 +395,8 @@ message IsQRepPartitionSyncedInput { string partition_id = 2; } +message ExportTxSnapshotOutput { + string snapshot_name = 1; + bool supports_tid_scans = 2; +} +