diff --git a/ui/app/mirrors/create/cdc.tsx b/ui/app/mirrors/create/cdc.tsx index 45595c7fa..226167b2d 100644 --- a/ui/app/mirrors/create/cdc.tsx +++ b/ui/app/mirrors/create/cdc.tsx @@ -45,29 +45,8 @@ export default function CDCConfigForm(props: MirrorConfigProps) { ); }; const handleChange = (val: string | boolean, setting: MirrorSetting) => { - let stateVal: string | boolean | Peer | QRepSyncMode = val; - if (setting.label.includes('Peer')) { - stateVal = props.peers.find((peer) => peer.name === val)!; - if (setting.label === 'Destination Peer') { - if (stateVal.type === DBType.POSTGRES) { - props.setter((curr) => { - return { - ...curr, - cdcSyncMode: QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT, - snapshotSyncMode: QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT, - }; - }); - } else if (stateVal.type === DBType.SNOWFLAKE) { - props.setter((curr) => { - return { - ...curr, - cdcSyncMode: QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO, - snapshotSyncMode: QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO, - }; - }); - } - } - } else if (setting.label.includes('Sync Mode')) { + let stateVal: string | boolean | QRepSyncMode = val; + if (setting.label.includes('Sync Mode')) { stateVal = val === 'AVRO' ? QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO @@ -92,55 +71,6 @@ export default function CDCConfigForm(props: MirrorConfigProps) { return ( <> - - Source Peer - {RequiredIndicator(true)} - - } - action={ -
- - -
- } - /> {props.mirrorConfig.source && ( - {(setting.label.includes('Peer') - ? (props.peers ?? []).map((peer) => peer.name) - : ['AVRO', 'Copy with Binary'] - ).map((item, id) => { + {['AVRO', 'Copy with Binary'].map((item, id) => { return ( {item.toString()} diff --git a/ui/app/mirrors/create/helpers/cdc.ts b/ui/app/mirrors/create/helpers/cdc.ts index b1b03e8bd..af79f83c5 100644 --- a/ui/app/mirrors/create/helpers/cdc.ts +++ b/ui/app/mirrors/create/helpers/cdc.ts @@ -1,16 +1,7 @@ import { QRepSyncMode } from '@/grpc_generated/flow'; -import { Peer } from '@/grpc_generated/peers'; import { CDCConfig } from '../../../dto/MirrorsDTO'; import { MirrorSetting } from './common'; export const cdcSettings: MirrorSetting[] = [ - { - label: 'Destination Peer', - stateHandler: (value, setter) => - setter((curr: CDCConfig) => ({ ...curr, destination: value as Peer })), - tips: 'The peer to which data will be replicated.', - type: 'select', - required: true, - }, { label: 'Initial Copy', stateHandler: (value, setter) => @@ -55,10 +46,10 @@ export const cdcSettings: MirrorSetting[] = [ stateHandler: (value, setter) => setter((curr: CDCConfig) => ({ ...curr, - snapshotMaxParallelWorkers: parseInt(value as string, 10) || 8, + snapshotMaxParallelWorkers: parseInt(value as string, 10) || 1, })), tips: 'PeerDB spins up parallel threads for each partition. This setting controls the number of partitions to sync in parallel. The default value is 8.', - default: '8', + default: '1', type: 'number', }, { diff --git a/ui/app/mirrors/create/helpers/common.ts b/ui/app/mirrors/create/helpers/common.ts index 01fb3bd5d..87e3358a3 100644 --- a/ui/app/mirrors/create/helpers/common.ts +++ b/ui/app/mirrors/create/helpers/common.ts @@ -32,7 +32,7 @@ export const blankCDCSetting: FlowConnectionConfigs = { doInitialCopy: false, publicationName: '', snapshotNumRowsPerPartition: 500000, - snapshotMaxParallelWorkers: 8, + snapshotMaxParallelWorkers: 1, snapshotNumTablesInParallel: 1, snapshotSyncMode: 0, cdcSyncMode: 0, @@ -57,7 +57,7 @@ export const blankQRepSetting: QRepConfig = { syncMode: 0, batchSizeInt: 0, batchDurationSeconds: 0, - maxParallelWorkers: 8, + maxParallelWorkers: 1, waitBetweenBatchesSeconds: 30, writeMode: undefined, stagingPath: '', diff --git a/ui/app/mirrors/create/helpers/qrep.ts b/ui/app/mirrors/create/helpers/qrep.ts index f0f594bd5..9d518cd98 100644 --- a/ui/app/mirrors/create/helpers/qrep.ts +++ b/ui/app/mirrors/create/helpers/qrep.ts @@ -4,30 +4,8 @@ import { QRepWriteMode, QRepWriteType, } from '@/grpc_generated/flow'; -import { Peer } from '@/grpc_generated/peers'; import { MirrorSetting } from './common'; export const qrepSettings: MirrorSetting[] = [ - { - label: 'Source Peer', - stateHandler: (value, setter) => - setter((curr: QRepConfig) => ({ ...curr, sourcePeer: value as Peer })), - tips: 'The peer from which we will be replicating data. Ensure the prerequisites for this peer are met.', - helpfulLink: - 'https://docs.peerdb.io/usecases/Real-time%20CDC/postgres-to-snowflake#prerequisites', - type: 'select', - required: true, - }, - { - label: 'Destination Peer', - stateHandler: (value, setter) => - setter((curr: QRepConfig) => ({ - ...curr, - destinationPeer: value as Peer, - })), - tips: 'The peer to which data will be replicated.', - type: 'select', - required: true, - }, { label: 'Table', stateHandler: (value, setter) => @@ -84,10 +62,10 @@ export const qrepSettings: MirrorSetting[] = [ stateHandler: (value, setter) => setter((curr: QRepConfig) => ({ ...curr, - maxParallelWorkers: parseInt(value as string, 10) || 8, + maxParallelWorkers: parseInt(value as string, 10) || 1, })), tips: 'PeerDB spins up parallel threads for each partition. This setting controls the number of partitions to sync in parallel. The default value is 8.', - default: '8', + default: '1', type: 'number', }, { diff --git a/ui/app/mirrors/create/page.tsx b/ui/app/mirrors/create/page.tsx index a58c54c37..94c9a7478 100644 --- a/ui/app/mirrors/create/page.tsx +++ b/ui/app/mirrors/create/page.tsx @@ -1,22 +1,31 @@ 'use client'; -import { QRepConfig } from '@/grpc_generated/flow'; -import { Peer } from '@/grpc_generated/peers'; +import { DBTypeToImageMapping } from '@/components/PeerComponent'; +import { RequiredIndicator } from '@/components/RequiredIndicator'; +import { QRepConfig, QRepSyncMode } from '@/grpc_generated/flow'; +import { DBType, Peer } from '@/grpc_generated/peers'; import { Button } from '@/lib/Button'; import { ButtonGroup } from '@/lib/ButtonGroup'; import { Label } from '@/lib/Label'; -import { RowWithRadiobutton, RowWithTextField } from '@/lib/Layout'; +import { + RowWithRadiobutton, + RowWithSelect, + RowWithTextField, +} from '@/lib/Layout'; import { Panel } from '@/lib/Panel'; import { RadioButton, RadioButtonGroup } from '@/lib/RadioButtonGroup'; +import { Select, SelectItem } from '@/lib/Select'; import { TextField } from '@/lib/TextField'; import { Divider } from '@tremor/react'; +import Image from 'next/image'; import Link from 'next/link'; import { useRouter } from 'next/navigation'; import { useEffect, useState } from 'react'; +import { InfoPopover } from '../../../components/InfoPopover'; import { CDCConfig, TableMapRow } from '../../dto/MirrorsDTO'; import CDCConfigForm from './cdc'; import { handleCreateCDC, handleCreateQRep } from './handlers'; import { cdcSettings } from './helpers/cdc'; -import { blankCDCSetting, blankQRepSetting } from './helpers/common'; +import { blankCDCSetting } from './helpers/common'; import { qrepSettings } from './helpers/qrep'; import QRepConfigForm from './qrep'; import QRepQuery from './query'; @@ -24,7 +33,7 @@ import QRepQuery from './query'; export default function CreateMirrors() { const router = useRouter(); const [mirrorName, setMirrorName] = useState(''); - const [mirrorType, setMirrorType] = useState('CDC'); + const [mirrorType, setMirrorType] = useState(''); const [formMessage, setFormMessage] = useState<{ ok: boolean; msg: string }>({ ok: true, msg: '', @@ -34,7 +43,11 @@ export default function CreateMirrors() { const [peers, setPeers] = useState([]); const [rows, setRows] = useState([]); const [sourceSchema, setSourceSchema] = useState('public'); - const [qrepQuery, setQrepQuery] = useState(''); + const [qrepQuery, setQrepQuery] = + useState(`-- Here's a sample template: + SELECT * FROM + WHERE + BETWEEN {{.start}} AND {{.end}}`); useEffect(() => { fetch('/api/peers') @@ -44,7 +57,6 @@ export default function CreateMirrors() { }); if (mirrorType === 'Query Replication' || mirrorType === 'XMIN') { - setConfig(blankQRepSetting); if (mirrorType === 'XMIN') { setConfig((curr) => { return { ...curr, setupWatermarkTableOnDestination: true }; @@ -53,13 +65,51 @@ export default function CreateMirrors() { setConfig((curr) => { return { ...curr, setupWatermarkTableOnDestination: false }; }); - } else setConfig(blankCDCSetting); + } }, [mirrorType]); let listMirrorsPage = () => { router.push('/mirrors'); }; + const handlePeer = (val: string, peerEnd: 'src' | 'dst') => { + const stateVal = peers.find((peer) => peer.name === val)!; + console.log('/handelPeer: val:', stateVal); + if (peerEnd === 'dst') { + if (stateVal.type === DBType.POSTGRES) { + setConfig((curr) => { + return { + ...curr, + cdcSyncMode: QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT, + snapshotSyncMode: QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT, + syncMode: QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT, + }; + }); + } else if (stateVal.type === DBType.SNOWFLAKE) { + setConfig((curr) => { + return { + ...curr, + cdcSyncMode: QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO, + snapshotSyncMode: QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO, + syncMode: QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO, + }; + }); + } + setConfig((curr) => ({ + ...curr, + destination: stateVal, + destinationPeer: stateVal, + })); + } else { + setConfig((curr) => ({ + ...curr, + source: stateVal, + sourcePeer: stateVal, + })); + } + console.log('/handelPeer: val:', config); + }; + return (
@@ -78,7 +128,9 @@ export default function CreateMirrors() { > Mirror type - setMirrorType(value)}> + setMirrorType(value)} + >
@@ -111,8 +162,8 @@ export default function CreateMirrors() {
@@ -131,12 +182,11 @@ export default function CreateMirrors() { width: '35%', marginLeft: '0.5rem', marginRight: '0.5rem', - height: '20vh', + height: '22vh', display: 'flex', flexDirection: 'column', justifyContent: 'space-between', boxShadow: '2px 2px 4px rgba(0, 0, 0, 0.1)', - backgroundColor: 'ghostwhite', borderRadius: '1rem', }} > @@ -171,12 +221,11 @@ export default function CreateMirrors() { style={{ padding: '1rem', width: '35%', - height: '20vh', + height: '22vh', display: 'flex', flexDirection: 'column', justifyContent: 'space-between', boxShadow: '2px 2px 4px rgba(0, 0, 0, 0.1)', - backgroundColor: 'ghostwhite', borderRadius: '1rem', }} > @@ -217,13 +266,113 @@ export default function CreateMirrors() { /> } /> + + + Source Peer + {RequiredIndicator(true)} + + } + action={ +
+ + +
+ } + /> + + + Destination Peer + {RequiredIndicator(true)} + + } + action={ +
+ + +
+ } + /> {mirrorType === 'Query Replication' && ( )} - + {mirrorType && } {!loading && formMessage.msg.length > 0 && (
); diff --git a/ui/app/mirrors/create/qrep.tsx b/ui/app/mirrors/create/qrep.tsx index ed8677bcb..58806a994 100644 --- a/ui/app/mirrors/create/qrep.tsx +++ b/ui/app/mirrors/create/qrep.tsx @@ -31,33 +31,9 @@ export default function QRepConfigForm(props: QRepConfigProps) { }; const handleChange = (val: string | boolean, setting: MirrorSetting) => { - let stateVal: - | string - | boolean - | Peer - | QRepSyncMode - | QRepWriteType - | string[] = val; - if (setting.label.includes('Peer')) { - stateVal = props.peers.find((peer) => peer.name === val)!; - if (setting.label === 'Destination Peer') { - if (stateVal.type === DBType.POSTGRES) { - props.setter((curr) => { - return { - ...curr, - syncMode: QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT, - }; - }); - } else if (stateVal.type === DBType.SNOWFLAKE) { - props.setter((curr) => { - return { - ...curr, - syncMode: QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO, - }; - }); - } - } - } else if (setting.label.includes('Sync Mode')) { + let stateVal: string | boolean | QRepSyncMode | QRepWriteType | string[] = + val; + if (setting.label.includes('Sync Mode')) { stateVal = val === 'AVRO' ? QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO @@ -150,9 +126,7 @@ export default function QRepConfigForm(props: QRepConfigProps) { }} >