diff --git a/flow/cmd/peer_data.go b/flow/cmd/peer_data.go index 1896f6143..14c8ba43c 100644 --- a/flow/cmd/peer_data.go +++ b/flow/cmd/peer_data.go @@ -135,9 +135,40 @@ func (h *FlowRequestHandler) GetColumns( } defer peerPool.Close() - rows, err := peerPool.Query(ctx, "SELECT column_name, data_type"+ - " FROM information_schema.columns"+ - " WHERE table_schema = $1 AND table_name = $2;", req.SchemaName, req.TableName) + rows, err := peerPool.Query(ctx, ` + SELECT + cols.column_name, + cols.data_type, + CASE + WHEN constraint_type = 'PRIMARY KEY' THEN true + ELSE false + END AS is_primary_key + FROM + information_schema.columns cols + LEFT JOIN + ( + SELECT + kcu.column_name, + tc.constraint_type + FROM + information_schema.key_column_usage kcu + JOIN + information_schema.table_constraints tc + ON + kcu.constraint_name = tc.constraint_name + AND kcu.constraint_schema = tc.constraint_schema + AND kcu.constraint_name = tc.constraint_name + WHERE + tc.constraint_type = 'PRIMARY KEY' + AND kcu.table_schema = $1 + AND kcu.table_name = $2 + ) AS pk + ON + cols.column_name = pk.column_name + WHERE + cols.table_schema = $3 + AND cols.table_name = $4; + `, req.SchemaName, req.TableName, req.SchemaName, req.TableName) if err != nil { return &protos.TableColumnsResponse{Columns: nil}, err } @@ -147,11 +178,12 @@ func (h *FlowRequestHandler) GetColumns( for rows.Next() { var columnName string var datatype string - err := rows.Scan(&columnName, &datatype) + var isPkey bool + err := rows.Scan(&columnName, &datatype, &isPkey) if err != nil { return &protos.TableColumnsResponse{Columns: nil}, err } - column := fmt.Sprintf("%s:%s", columnName, datatype) + column := fmt.Sprintf("%s:%s:%v", columnName, datatype, isPkey) columns = append(columns, column) } return &protos.TableColumnsResponse{Columns: columns}, nil diff --git a/ui/app/dto/MirrorsDTO.ts b/ui/app/dto/MirrorsDTO.ts index 33fa094c5..f63a93a15 100644 --- a/ui/app/dto/MirrorsDTO.ts +++ b/ui/app/dto/MirrorsDTO.ts @@ -14,6 +14,7 @@ export type CDCConfig = FlowConnectionConfigs; export type MirrorConfig = CDCConfig | QRepConfig; export type MirrorSetter = Dispatch>; export type TableMapRow = { + schema: string; source: string; destination: string; partitionKey: string; diff --git a/ui/app/mirrors/create/cdc.tsx b/ui/app/mirrors/create/cdc/cdc.tsx similarity index 94% rename from ui/app/mirrors/create/cdc.tsx rename to ui/app/mirrors/create/cdc/cdc.tsx index ce7b6b931..58ae9afd5 100644 --- a/ui/app/mirrors/create/cdc.tsx +++ b/ui/app/mirrors/create/cdc/cdc.tsx @@ -8,9 +8,9 @@ import { Switch } from '@/lib/Switch'; import { TextField } from '@/lib/TextField'; import { Dispatch, SetStateAction } from 'react'; import ReactSelect from 'react-select'; -import { InfoPopover } from '../../../components/InfoPopover'; -import { CDCConfig, MirrorSetter, TableMapRow } from '../../dto/MirrorsDTO'; -import { MirrorSetting } from './helpers/common'; +import { InfoPopover } from '../../../../components/InfoPopover'; +import { CDCConfig, MirrorSetter, TableMapRow } from '../../../dto/MirrorsDTO'; +import { MirrorSetting } from '../helpers/common'; import TableMapping from './tablemapping'; interface MirrorConfigProps { @@ -19,8 +19,6 @@ interface MirrorConfigProps { setter: MirrorSetter; rows: TableMapRow[]; setRows: Dispatch>; - schema: string; - setSchema: Dispatch>; } const SyncModeOptions = ['AVRO', 'Copy with Binary'].map((value) => ({ @@ -46,8 +44,6 @@ export default function CDCConfigForm({ setter, rows, setRows, - schema, - setSchema, }: MirrorConfigProps) { const setToDefault = (setting: MirrorSetting) => { const destinationPeerType = mirrorConfig.destination?.type; @@ -81,15 +77,13 @@ export default function CDCConfigForm({ return true; }; - if (mirrorConfig.source != undefined) + if (mirrorConfig.source != undefined && mirrorConfig.destination != undefined) return ( <> {settings.map((setting, id) => { diff --git a/ui/app/mirrors/create/cdc/schemabox.tsx b/ui/app/mirrors/create/cdc/schemabox.tsx new file mode 100644 index 000000000..5a40092ca --- /dev/null +++ b/ui/app/mirrors/create/cdc/schemabox.tsx @@ -0,0 +1,325 @@ +'use client'; +import { TableMapRow } from '@/app/dto/MirrorsDTO'; +import { DBType } from '@/grpc_generated/peers'; +import { Checkbox } from '@/lib/Checkbox'; +import { Icon } from '@/lib/Icon'; +import { Label } from '@/lib/Label'; +import { RowWithCheckbox } from '@/lib/Layout'; +import { SearchField } from '@/lib/SearchField'; +import { TextField } from '@/lib/TextField'; +import { Dispatch, SetStateAction, useCallback, useState } from 'react'; +import { BarLoader } from 'react-spinners/'; +import { fetchColumns, fetchTables } from '../handlers'; +import { expandableStyle, schemaBoxStyle, tableBoxStyle } from './styles'; + +interface SchemaBoxProps { + sourcePeer: string; + schema: string; + rows: TableMapRow[]; + setRows: Dispatch>; + tableColumns: { tableName: string; columns: string[] }[]; + setTableColumns: Dispatch< + SetStateAction<{ tableName: string; columns: string[] }[]> + >; + peerType?: DBType; +} +const SchemaBox = ({ + sourcePeer, + peerType, + schema, + rows, + setRows, + tableColumns, + setTableColumns, +}: SchemaBoxProps) => { + const [tablesLoading, setTablesLoading] = useState(false); + const [columnsLoading, setColumnsLoading] = useState(false); + const [expandedSchemas, setExpandedSchemas] = useState([]); + const [tableQuery, setTableQuery] = useState(''); + + const schemaIsExpanded = useCallback( + (schema: string) => { + return !!expandedSchemas.find((schemaName) => schemaName === schema); + }, + [expandedSchemas] + ); + + const handleAddRow = (source: string) => { + const newRows = [...rows]; + const index = newRows.findIndex((row) => row.source === source); + if (index >= 0) newRows[index] = { ...newRows[index], selected: true }; + setRows(newRows); + addTableColumns(source); + }; + + const handleRemoveRow = (source: string) => { + const newRows = [...rows]; + const index = newRows.findIndex((row) => row.source === source); + if (index >= 0) newRows[index] = { ...newRows[index], selected: false }; + setRows(newRows); + removeTableColumns(source); + }; + + const handleTableSelect = (on: boolean, source: string) => { + on ? handleAddRow(source) : handleRemoveRow(source); + }; + + const updateDestination = (source: string, dest: string) => { + const newRows = [...rows]; + const index = newRows.findIndex((row) => row.source === source); + newRows[index] = { ...newRows[index], destination: dest }; + setRows(newRows); + }; + + const addTableColumns = (table: string) => { + const schemaName = table.split('.')[0]; + const tableName = table.split('.')[1]; + fetchColumns(sourcePeer, schemaName, tableName, setColumnsLoading).then( + (res) => + setTableColumns((prev) => { + return [...prev, { tableName: table, columns: res }]; + }) + ); + }; + + const removeTableColumns = (table: string) => { + setTableColumns((prev) => { + return prev.filter((column) => column.tableName !== table); + }); + }; + + const getTableColumns = (tableName: string) => { + return tableColumns?.find((column) => column.tableName === tableName) + ?.columns; + }; + + const handleColumnExclusion = ( + source: string, + column: string, + include: boolean + ) => { + const currRows = [...rows]; + const rowOfSource = currRows.find((row) => row.source === source); + if (rowOfSource) { + if (include) { + const updatedExclude = rowOfSource.exclude.filter( + (col) => col !== column + ); + rowOfSource.exclude = updatedExclude; + } else { + rowOfSource.exclude.push(column); + } + } + setRows(currRows); + }; + + const handleSelectAll = ( + e: React.MouseEvent + ) => { + const newRows = [...rows]; + for (const row of newRows) { + row.selected = e.currentTarget.checked; + if (e.currentTarget.checked) addTableColumns(row.source); + else removeTableColumns(row.source); + } + setRows(newRows); + }; + + const handleSchemaClick = (schemaName: string) => { + if (!schemaIsExpanded(schemaName)) { + setTablesLoading(true); + setExpandedSchemas((curr) => [...curr, schemaName]); + fetchTables(sourcePeer, schemaName, peerType).then((tableRows) => { + const newRows = [...rows, ...tableRows]; + setRows(newRows); + setTablesLoading(false); + }); + } else { + setExpandedSchemas((curr) => + curr.filter((expandedSchema) => expandedSchema != schemaName) + ); + } + }; + + return ( +
+
+
+
handleSchemaClick(schema)} + > + +

{schema}

+
+
+
+ handleSelectAll(e)} /> + +
+ ) => + setTableQuery(e.target.value) + } + /> +
+
+ {schemaIsExpanded(schema) && ( +
+ {rows.filter((row) => row.schema === schema).length ? ( + rows + .filter( + (row) => + row.schema === schema && + row.source.toLowerCase().includes(tableQuery.toLowerCase()) + ) + .map((row, index) => { + const columns = getTableColumns(row.source); + return ( +
+
+ + {row.source} + + } + action={ + + handleTableSelect(state, row.source) + } + /> + } + /> + +
+

Target Table:

+ + ) => updateDestination(row.source, e.target.value)} + /> +
+
+ {row.selected && ( +
+ + {columns ? ( + columns.map((column, index) => { + const columnName = column.split(':')[0]; + const columnType = column.split(':')[1]; + const isPkey = column.split(':')[2] === 'true'; + return ( + + {columnName}{' '} +

+ {columnType} +

+ + } + action={ + col == columnName + ) + } + onCheckedChange={(state: boolean) => + handleColumnExclusion( + row.source, + columnName, + state + ) + } + /> + } + /> + ); + }) + ) : columnsLoading ? ( + + ) : ( + + )} +
+ )} +
+ ); + }) + ) : tablesLoading ? ( + + ) : ( + + )} +
+ )} +
+
+ ); +}; + +export default SchemaBox; diff --git a/ui/app/mirrors/create/cdc/styles.ts b/ui/app/mirrors/create/cdc/styles.ts new file mode 100644 index 000000000..b60ee4035 --- /dev/null +++ b/ui/app/mirrors/create/cdc/styles.ts @@ -0,0 +1,36 @@ +import { CSSProperties } from 'styled-components'; + +export const expandableStyle = { + fontSize: 14, + display: 'flex', + alignItems: 'center', + justifyContent: 'space-between', + color: 'rgba(0,0,0,0.7)', + cursor: 'pointer', +}; + +export const schemaBoxStyle: CSSProperties = { + width: '100%', + marginTop: '0.5rem', + padding: '0.5rem', + display: 'flex', + flexDirection: 'column', + border: '1px solid #e9ecf2', + borderRadius: '0.8rem', +}; + +export const tableBoxStyle: CSSProperties = { + border: '1px solid #e9ecf2', + borderRadius: '0.5rem', + marginBottom: '0.5rem', + width: '90%', + padding: '0.5rem', +}; + +export const loaderContainer: CSSProperties = { + display: 'flex', + flexDirection: 'column', + alignItems: 'center', + justifyContent: 'center', + height: '100%', +}; diff --git a/ui/app/mirrors/create/cdc/tablemapping.tsx b/ui/app/mirrors/create/cdc/tablemapping.tsx new file mode 100644 index 000000000..ee4d86d58 --- /dev/null +++ b/ui/app/mirrors/create/cdc/tablemapping.tsx @@ -0,0 +1,86 @@ +'use client'; +import { DBType } from '@/grpc_generated/peers'; +import { Label } from '@/lib/Label'; +import { SearchField } from '@/lib/SearchField'; +import { Dispatch, SetStateAction, useEffect, useState } from 'react'; +import { BarLoader } from 'react-spinners/'; +import { TableMapRow } from '../../../dto/MirrorsDTO'; +import { fetchSchemas } from '../handlers'; +import SchemaBox from './schemabox'; +import { loaderContainer } from './styles'; + +interface TableMappingProps { + sourcePeerName: string; + rows: TableMapRow[]; + setRows: Dispatch>; + peerType?: DBType; +} + +const TableMapping = ({ + sourcePeerName, + rows, + setRows, + peerType, +}: TableMappingProps) => { + const [allSchemas, setAllSchemas] = useState(); + const [schemaQuery, setSchemaQuery] = useState(''); + const [tableColumns, setTableColumns] = useState< + { tableName: string; columns: string[] }[] + >([]); + useEffect(() => { + fetchSchemas(sourcePeerName).then((res) => setAllSchemas(res)); + }, [sourcePeerName]); + + return ( +
+ +
+
+ ) => + setSchemaQuery(e.target.value) + } + /> +
+
+
+ {allSchemas ? ( + allSchemas + ?.filter((schema) => { + return schema.toLowerCase().includes(schemaQuery.toLowerCase()); + }) + .map((schema, index) => ( + + )) + ) : ( +
+ +
+ )} +
+
+ ); +}; + +export default TableMapping; diff --git a/ui/app/mirrors/create/columns.tsx b/ui/app/mirrors/create/columns.tsx deleted file mode 100644 index 20dca1a5e..000000000 --- a/ui/app/mirrors/create/columns.tsx +++ /dev/null @@ -1,132 +0,0 @@ -'use client'; -import { Button } from '@/lib/Button'; -import { Dispatch, SetStateAction, useState } from 'react'; -import { PulseLoader } from 'react-spinners'; -import { fetchColumns } from './handlers'; - -interface ColumnsDisplayProps { - setColumns: Dispatch< - SetStateAction< - { - tableName: string; - columns: string[]; - }[] - > - >; - columns?: { - tableName: string; - columns: string[]; - }[]; - peerName: string; - schemaName: string; - tableName: string; -} - -const ColumnsDisplay = (props: ColumnsDisplayProps) => { - const [loading, setLoading] = useState(false); - const addTableColumns = (table: string) => { - // add table to columns - fetchColumns( - props.peerName, - props.schemaName, - props.tableName, - setLoading - ).then((res) => - props.setColumns((prev) => { - return [...prev, { tableName: table, columns: res }]; - }) - ); - }; - - const removeTableColumns = (table: string) => { - // remove table from columns - props.setColumns((prev) => { - return prev.filter((column) => column.tableName !== table); - }); - }; - - const getTableColumns = (tableName: string) => { - // get table columns - return props.columns?.find((column) => column.tableName === tableName) - ?.columns; - }; - return ( -
- - -
- {getTableColumns(props.tableName)?.map((column, id) => { - const columnName = column.split(':')[0]; - const columnType = column.split(':')[1]; - return ( -
-
- {columnName} -
-
- {columnType} -
-
- ); - })} -
-
- ); -}; - -export default ColumnsDisplay; diff --git a/ui/app/mirrors/create/handlers.ts b/ui/app/mirrors/create/handlers.ts index 8e43035e2..10ccc4e0c 100644 --- a/ui/app/mirrors/create/handlers.ts +++ b/ui/app/mirrors/create/handlers.ts @@ -4,7 +4,13 @@ import { USchemasResponse, UTablesResponse, } from '@/app/dto/PeersDTO'; -import { QRepConfig, QRepWriteType } from '@/grpc_generated/flow'; +import { + FlowConnectionConfigs, + QRepConfig, + QRepSyncMode, + QRepWriteType, +} from '@/grpc_generated/flow'; +import { DBType, Peer, dBTypeToJSON } from '@/grpc_generated/peers'; import { Dispatch, SetStateAction } from 'react'; import { CDCConfig, TableMapRow } from '../../dto/MirrorsDTO'; import { @@ -14,8 +20,59 @@ import { tableMappingSchema, } from './schema'; +export const handlePeer = ( + peer: Peer | null, + peerEnd: 'src' | 'dst', + setConfig: (value: SetStateAction) => void +) => { + if (!peer) return; + if (peerEnd === 'dst') { + if (peer.type === DBType.POSTGRES) { + setConfig((curr) => { + return { + ...curr, + cdcSyncMode: QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT, + snapshotSyncMode: QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT, + syncMode: QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT, + }; + }); + } else if ( + peer.type === DBType.SNOWFLAKE || + peer.type === DBType.BIGQUERY + ) { + setConfig((curr) => { + return { + ...curr, + cdcSyncMode: QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO, + snapshotSyncMode: QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO, + syncMode: QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO, + }; + }); + } + setConfig((curr) => ({ + ...curr, + destination: peer, + destinationPeer: peer, + })); + } else { + setConfig((curr) => ({ + ...curr, + source: peer, + sourcePeer: peer, + })); + } +}; + const validateCDCFields = ( - tableMapping: TableMapRow[], + tableMapping: ( + | { + sourceTableIdentifier: string; + destinationTableIdentifier: string; + partitionKey: string; + exclude: string[]; + } + | undefined + )[], setMsg: Dispatch>, config: CDCConfig ): boolean => { @@ -66,7 +123,7 @@ interface TableMapping { const reformattedTableMapping = (tableMapping: TableMapRow[]) => { const mapping = tableMapping .map((row) => { - if (row.selected === true) { + if (row?.selected === true) { return { sourceTableIdentifier: row.source, destinationTableIdentifier: row.destination, @@ -98,10 +155,10 @@ export const handleCreateCDC = async ( setMsg({ ok: false, msg: flowNameErr }); return; } - - const isValid = validateCDCFields(rows, setMsg, config); - if (!isValid) return; const tableNameMapping = reformattedTableMapping(rows); + const isValid = validateCDCFields(tableNameMapping, setMsg, config); + if (!isValid) return; + config['tableMappings'] = tableNameMapping as TableMapping[]; config['flowJobName'] = flowJobName; setLoading(true); @@ -183,28 +240,22 @@ export const handleCreateQRep = async ( setLoading(false); }; -export const fetchSchemas = async ( - peerName: string, - setLoading: Dispatch> -) => { - setLoading(true); +export const fetchSchemas = async (peerName: string) => { const schemasRes: USchemasResponse = await fetch('/api/peers/schemas', { method: 'POST', body: JSON.stringify({ peerName, }), }).then((res) => res.json()); - setLoading(false); return schemasRes.schemas; }; export const fetchTables = async ( peerName: string, schemaName: string, - setLoading: Dispatch> + peerType?: DBType ) => { if (schemaName.length === 0) return []; - setLoading(true); const tablesRes: UTablesResponse = await fetch('/api/peers/tables', { method: 'POST', body: JSON.stringify({ @@ -212,8 +263,29 @@ export const fetchTables = async ( schemaName, }), }).then((res) => res.json()); - setLoading(false); - return tablesRes.tables; + + let tables = []; + const tableNames = tablesRes.tables; + if (tableNames) { + for (const tableName of tableNames) { + // setting defaults: + // for bigquery, tables are not schema-qualified + const dstName = + peerType != undefined && dBTypeToJSON(peerType) == 'BIGQUERY' + ? tableName + : `${schemaName}.${tableName}`; + + tables.push({ + schema: schemaName, + source: `${schemaName}.${tableName}`, + destination: dstName, + partitionKey: '', + exclude: [], + selected: false, + }); + } + } + return tables; }; export const fetchColumns = async ( diff --git a/ui/app/mirrors/create/mirrorcards.tsx b/ui/app/mirrors/create/mirrorcards.tsx new file mode 100644 index 000000000..fd07aa94e --- /dev/null +++ b/ui/app/mirrors/create/mirrorcards.tsx @@ -0,0 +1,90 @@ +'use client'; +import { Label } from '@/lib/Label'; +import { RowWithRadiobutton } from '@/lib/Layout'; +import { RadioButton, RadioButtonGroup } from '@/lib/RadioButtonGroup'; +import Link from 'next/link'; +import { SetStateAction } from 'react'; + +const MirrorCards = ({ + setMirrorType, +}: { + setMirrorType: (value: SetStateAction) => void; +}) => { + const cards = [ + { + title: 'CDC', + description: + 'Change-data Capture or CDC refers to replication of changes on the source table to the target table with initial load. This is recommended.', + link: 'https://docs.peerdb.io/usecases/Real-time%20CDC/overview', + }, + { + title: 'Query Replication', + description: + 'Query Replication allows you to specify a set of rows to be synced via a SELECT query.', + link: 'https://docs.peerdb.io/usecases/Streaming%20Query%20Replication/overview', + }, + { + title: 'XMIN', + description: + 'XMIN mode uses the xmin system column of PostgreSQL as a watermark column for replication.', + link: 'https://docs.peerdb.io/sql/commands/create-mirror#xmin-query-replication', + }, + ]; + return ( + setMirrorType(value)}> +
+ {cards.map((card, index) => { + return ( +
+
+ +
{card.title}
+ + } + action={} + /> + +
+ +
+ ); + })} +
+
+ ); +}; + +export default MirrorCards; diff --git a/ui/app/mirrors/create/page.tsx b/ui/app/mirrors/create/page.tsx index b443cdd76..ba9c45359 100644 --- a/ui/app/mirrors/create/page.tsx +++ b/ui/app/mirrors/create/page.tsx @@ -1,18 +1,13 @@ 'use client'; import { DBTypeToImageMapping } from '@/components/PeerComponent'; import { RequiredIndicator } from '@/components/RequiredIndicator'; -import { QRepConfig, QRepSyncMode } from '@/grpc_generated/flow'; +import { QRepConfig } from '@/grpc_generated/flow'; import { DBType, Peer } from '@/grpc_generated/peers'; import { Button } from '@/lib/Button'; import { ButtonGroup } from '@/lib/ButtonGroup'; import { Label } from '@/lib/Label'; -import { - RowWithRadiobutton, - RowWithSelect, - RowWithTextField, -} from '@/lib/Layout'; +import { RowWithSelect, RowWithTextField } from '@/lib/Layout'; import { Panel } from '@/lib/Panel'; -import { RadioButton, RadioButtonGroup } from '@/lib/RadioButtonGroup'; import { TextField } from '@/lib/TextField'; import { Divider } from '@tremor/react'; import Image from 'next/image'; @@ -22,13 +17,14 @@ import { useEffect, useState } from 'react'; import ReactSelect from 'react-select'; import { InfoPopover } from '../../../components/InfoPopover'; import { CDCConfig, TableMapRow } from '../../dto/MirrorsDTO'; -import CDCConfigForm from './cdc'; -import { handleCreateCDC, handleCreateQRep } from './handlers'; +import CDCConfigForm from './cdc/cdc'; +import { handleCreateCDC, handleCreateQRep, handlePeer } from './handlers'; import { cdcSettings } from './helpers/cdc'; import { blankCDCSetting } from './helpers/common'; import { qrepSettings } from './helpers/qrep'; -import QRepConfigForm from './qrep'; -import QRepQuery from './query'; +import MirrorCards from './mirrorcards'; +import QRepConfigForm from './qrep/qrep'; +import QRepQuery from './qrep/query'; function getPeerValue(peer: Peer) { return peer.name; @@ -63,7 +59,6 @@ export default function CreateMirrors() { const [config, setConfig] = useState(blankCDCSetting); const [peers, setPeers] = useState([]); const [rows, setRows] = useState([]); - const [sourceSchema, setSourceSchema] = useState('public'); const [qrepQuery, setQrepQuery] = useState(`-- Here's a sample template: SELECT * FROM @@ -93,45 +88,6 @@ export default function CreateMirrors() { router.push('/mirrors'); }; - const handlePeer = (peer: Peer | null, peerEnd: 'src' | 'dst') => { - if (!peer) return; - if (peerEnd === 'dst') { - if (peer.type === DBType.POSTGRES) { - setConfig((curr) => { - return { - ...curr, - cdcSyncMode: QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT, - snapshotSyncMode: QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT, - syncMode: QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT, - }; - }); - } else if ( - peer.type === DBType.SNOWFLAKE || - peer.type === DBType.BIGQUERY - ) { - setConfig((curr) => { - return { - ...curr, - cdcSyncMode: QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO, - snapshotSyncMode: QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO, - syncMode: QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO, - }; - }); - } - setConfig((curr) => ({ - ...curr, - destination: peer, - destinationPeer: peer, - })); - } else { - setConfig((curr) => ({ - ...curr, - source: peer, - sourcePeer: peer, - })); - } - }; - return (
@@ -150,135 +106,7 @@ export default function CreateMirrors() { > Mirror type - setMirrorType(value)} - > -
-
-
- -
CDC
- - } - action={} - /> - -
- -
- -
-
- -
- Query Replication -
- - } - action={} - /> - -
- -
- -
- -
XMIN
- - } - action={} - /> - - -
-
-
- + Mirror Name} action={ @@ -291,76 +119,53 @@ export default function CreateMirrors() { /> } /> + {['src', 'dst'].map((peerEnd, index) => { + return ( + + {peerEnd === 'src' ? 'Source Peer' : 'Destination Peer'} + {RequiredIndicator(true)} + + } + action={ +
+ + handlePeer(val, peerEnd as 'src' | 'dst', setConfig) + } + options={ + (peerEnd === 'src' + ? peers.filter((peer) => peer.type == DBType.POSTGRES) + : peers) ?? [] + } + getOptionValue={getPeerValue} + formatOptionLabel={getPeerLabel} + /> + +
+ } + /> + ); + })} - - Source Peer - {RequiredIndicator(true)} - - } - action={ -
- handlePeer(val, 'src')} - options={ - peers.filter((peer) => peer.type == DBType.POSTGRES) ?? [] - } - getOptionValue={getPeerValue} - formatOptionLabel={getPeerLabel} - /> - -
- } - /> - - - Destination Peer - {RequiredIndicator(true)} - - } - action={ -
- handlePeer(val, 'dst')} - options={peers ?? []} - getOptionValue={getPeerValue} - formatOptionLabel={getPeerLabel} - /> - -
- } - /> {mirrorType === 'Query Replication' && ( @@ -394,8 +199,6 @@ export default function CreateMirrors() { setter={setConfig} rows={rows} setRows={setRows} - setSchema={setSourceSchema} - schema={sourceSchema} /> ) : ( >; - schema: string; - setSchema: Dispatch>; - peerType?: DBType; -} - -const TableMapping = ({ - sourcePeerName, - rows, - setRows, - schema, - setSchema, - peerType, -}: TableMappingProps) => { - const [allSchemas, setAllSchemas] = useState(); - const [tableColumns, setTableColumns] = useState< - { tableName: string; columns: string[] }[] - >([]); - const [loading, setLoading] = useState(false); - - const handleAddRow = (source: string) => { - const newRows = [...rows]; - const index = newRows.findIndex((row) => row.source === source); - if (index >= 0) newRows[index] = { ...newRows[index], selected: true }; - setRows(newRows); - }; - - const handleRemoveRow = (source: string) => { - const newRows = [...rows]; - const index = newRows.findIndex((row) => row.source === source); - if (index >= 0) newRows[index] = { ...newRows[index], selected: false }; - setRows(newRows); - }; - - const handleSelectAll = ( - e: React.MouseEvent - ) => { - const newRows = [...rows]; - for (const row of newRows) { - row.selected = e.currentTarget.checked; - } - setRows(newRows); - }; - - const handleSwitch = (on: boolean, source: string) => { - if (on) { - handleAddRow(source); - } else { - handleRemoveRow(source); - } - }; - - const updateDestination = (source: string, dest: string) => { - // find the row with source and update the destination - const newRows = [...rows]; - const index = newRows.findIndex((row) => row.source === source); - newRows[index] = { ...newRows[index], destination: dest }; - setRows(newRows); - }; - - const updatePartitionKey = (source: string, pkey: string) => { - const newRows = [...rows]; - const index = newRows.findIndex((row) => row.source === source); - newRows[index] = { ...newRows[index], partitionKey: pkey }; - setRows(newRows); - }; - - const getTablesOfSchema = useCallback( - (schemaName: string) => { - fetchTables(sourcePeerName, schemaName, setLoading).then((tableNames) => { - if (tableNames) { - const newRows = []; - for (const tableName of tableNames) { - const dstName = - peerType != undefined && dBTypeToJSON(peerType) == 'BIGQUERY' - ? tableName - : `${schemaName}.${tableName}`; - newRows.push({ - source: `${schemaName}.${tableName}`, - destination: dstName, - partitionKey: '', - exclude: [], - selected: false, - }); - } - setRows(newRows); - } - }); - }, - [sourcePeerName, setRows, peerType] - ); - - const [searchQuery, setSearchQuery] = useState(''); - - useEffect(() => { - if (peerType != undefined && dBTypeToJSON(peerType) == 'BIGQUERY') { - setRows((rows) => { - const newRows = [...rows]; - newRows.forEach((_, i) => { - const row = newRows[i]; - newRows[i] = { - ...row, - destination: row.destination?.split('.')[1], - }; - }); - return newRows; - }); - } else { - setRows((rows) => { - const newRows = [...rows]; - newRows.forEach((_, i) => { - const row = newRows[i]; - newRows[i] = { - ...row, - destination: `${schema}.${ - row.destination?.split('.')[1] || row.destination - }`, - }; - }); - return newRows; - }); - } - }, [peerType, setRows, schema]); - - useEffect(() => { - fetchSchemas(sourcePeerName, setLoading).then((res) => setAllSchemas(res)); - setSchema('public'); - getTablesOfSchema('public'); - }, [sourcePeerName, setSchema, getTablesOfSchema]); - - return ( -
- - Source Schema} - action={ - { - if (val) { - setSchema(val.value || ''); - getTablesOfSchema(val.value || ''); - } - }} - defaultInputValue={schema.length > 0 ? schema : 'Loading...'} - isLoading={loading} - options={allSchemas?.map((schemaName) => { - return { value: schemaName, label: schemaName }; - })} - /> - } - /> -
-
- handleSelectAll(e)} /> - -
-
- ) => - setSearchQuery(e.target.value) - } - /> -
-
-
- {rows ? ( - rows - ?.filter((row) => { - return row.source - .toLowerCase() - .includes(searchQuery.toLowerCase()); - }) - .map((row, index) => ( -
-
-
-
- - handleSwitch(state, row.source) - } - /> -
- {row.source} -
-
- {row.selected && ( -
- - Destination Table Name - {RequiredIndicator(true)} -
- } - action={ -
- - ) => - updateDestination(row.source, e.target.value) - } - /> -
- } - /> - - Partition Key -
- } - action={ -
- - ) => - updatePartitionKey(row.source, e.target.value) - } - /> -
- } - /> -
- This is used only if you enable initial load, and - specifies its watermark. -
-
- )} -
- -
-
- )) - ) : ( -
- -
- )} -
- - ); -}; - -export default TableMapping; diff --git a/ui/app/mirrors/types.ts b/ui/app/mirrors/types.ts index 16054b2fa..3fa9f2416 100644 --- a/ui/app/mirrors/types.ts +++ b/ui/app/mirrors/types.ts @@ -4,4 +4,3 @@ import { Dispatch, SetStateAction } from 'react'; export type CDCConfig = FlowConnectionConfigs; export type MirrorConfig = CDCConfig | QRepConfig; export type MirrorSetter = Dispatch>; -export type TableMapRow = { source: string; destination: string };