From 1924c1ccf841a149eb9d6fdb415272fa46f47cb8 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 23 Nov 2023 09:50:10 +0530 Subject: [PATCH] select all, table search, pkey disable --- flow/cmd/peer_data.go | 42 ++- ui/app/mirrors/create/cdc/schemabox.tsx | 326 +++++++++++++++++++++ ui/app/mirrors/create/cdc/styles.ts | 5 +- ui/app/mirrors/create/cdc/tablemapping.tsx | 306 ++----------------- ui/app/mirrors/create/handlers.ts | 25 +- ui/app/mirrors/create/schema.ts | 4 +- 6 files changed, 401 insertions(+), 307 deletions(-) create mode 100644 ui/app/mirrors/create/cdc/schemabox.tsx diff --git a/flow/cmd/peer_data.go b/flow/cmd/peer_data.go index 1896f6143c..14c8ba43cc 100644 --- a/flow/cmd/peer_data.go +++ b/flow/cmd/peer_data.go @@ -135,9 +135,40 @@ func (h *FlowRequestHandler) GetColumns( } defer peerPool.Close() - rows, err := peerPool.Query(ctx, "SELECT column_name, data_type"+ - " FROM information_schema.columns"+ - " WHERE table_schema = $1 AND table_name = $2;", req.SchemaName, req.TableName) + rows, err := peerPool.Query(ctx, ` + SELECT + cols.column_name, + cols.data_type, + CASE + WHEN constraint_type = 'PRIMARY KEY' THEN true + ELSE false + END AS is_primary_key + FROM + information_schema.columns cols + LEFT JOIN + ( + SELECT + kcu.column_name, + tc.constraint_type + FROM + information_schema.key_column_usage kcu + JOIN + information_schema.table_constraints tc + ON + kcu.constraint_name = tc.constraint_name + AND kcu.constraint_schema = tc.constraint_schema + AND kcu.constraint_name = tc.constraint_name + WHERE + tc.constraint_type = 'PRIMARY KEY' + AND kcu.table_schema = $1 + AND kcu.table_name = $2 + ) AS pk + ON + cols.column_name = pk.column_name + WHERE + cols.table_schema = $3 + AND cols.table_name = $4; + `, req.SchemaName, req.TableName, req.SchemaName, req.TableName) if err != nil { return &protos.TableColumnsResponse{Columns: nil}, err } @@ -147,11 +178,12 @@ func (h *FlowRequestHandler) GetColumns( for rows.Next() { var columnName string var datatype string - err := rows.Scan(&columnName, &datatype) + var isPkey bool + err := rows.Scan(&columnName, &datatype, &isPkey) if err != nil { return &protos.TableColumnsResponse{Columns: nil}, err } - column := fmt.Sprintf("%s:%s", columnName, datatype) + column := fmt.Sprintf("%s:%s:%v", columnName, datatype, isPkey) columns = append(columns, column) } return &protos.TableColumnsResponse{Columns: columns}, nil diff --git a/ui/app/mirrors/create/cdc/schemabox.tsx b/ui/app/mirrors/create/cdc/schemabox.tsx new file mode 100644 index 0000000000..4815de4bc4 --- /dev/null +++ b/ui/app/mirrors/create/cdc/schemabox.tsx @@ -0,0 +1,326 @@ +'use client'; +import { TableMapRow } from '@/app/dto/MirrorsDTO'; +import { DBType } from '@/grpc_generated/peers'; +import { Checkbox } from '@/lib/Checkbox'; +import { Icon } from '@/lib/Icon'; +import { Label } from '@/lib/Label'; +import { RowWithCheckbox } from '@/lib/Layout'; +import { SearchField } from '@/lib/SearchField'; +import { TextField } from '@/lib/TextField'; +import { Dispatch, SetStateAction, useCallback, useState } from 'react'; +import { BarLoader } from 'react-spinners/'; +import { fetchColumns, fetchTables } from '../handlers'; +import { expandableStyle, schemaBoxStyle, tableBoxStyle } from './styles'; + +interface SchemaBoxProps { + sourcePeer: string; + schema: string; + rows: TableMapRow[]; + setRows: Dispatch>; + tableColumns: { tableName: string; columns: string[] }[]; + setTableColumns: Dispatch< + SetStateAction<{ tableName: string; columns: string[] }[]> + >; + peerType?: DBType; +} +const SchemaBox = ({ + sourcePeer, + peerType, + schema, + rows, + setRows, + tableColumns, + setTableColumns, +}: SchemaBoxProps) => { + const [tablesLoading, setTablesLoading] = useState(false); + const [columnsLoading, setColumnsLoading] = useState(false); + const [expandedSchemas, setExpandedSchemas] = useState([]); + const [tableQuery, setTableQuery] = useState(''); + + const schemaIsExpanded = useCallback( + (schema: string) => { + return !!expandedSchemas.find((schemaName) => schemaName === schema); + }, + [expandedSchemas] + ); + + const handleAddRow = (source: string) => { + const newRows = [...rows]; + const index = newRows.findIndex((row) => row.source === source); + if (index >= 0) newRows[index] = { ...newRows[index], selected: true }; + setRows(newRows); + addTableColumns(source); + }; + + const handleRemoveRow = (source: string) => { + const newRows = [...rows]; + const index = newRows.findIndex((row) => row.source === source); + if (index >= 0) newRows[index] = { ...newRows[index], selected: false }; + setRows(newRows); + removeTableColumns(source); + }; + + const handleTableSelect = (on: boolean, source: string) => { + on ? handleAddRow(source) : handleRemoveRow(source); + }; + + const updateDestination = (source: string, dest: string) => { + const newRows = [...rows]; + const index = newRows.findIndex((row) => row.source === source); + newRows[index] = { ...newRows[index], destination: dest }; + setRows(newRows); + }; + + const addTableColumns = (table: string) => { + const schemaName = table.split('.')[0]; + const tableName = table.split('.')[1]; + fetchColumns(sourcePeer, schemaName, tableName, setColumnsLoading).then( + (res) => + setTableColumns((prev) => { + return [...prev, { tableName: table, columns: res }]; + }) + ); + }; + + const removeTableColumns = (table: string) => { + setTableColumns((prev) => { + return prev.filter((column) => column.tableName !== table); + }); + }; + + const getTableColumns = (tableName: string) => { + return tableColumns?.find((column) => column.tableName === tableName) + ?.columns; + }; + + const handleColumnExclusion = ( + source: string, + column: string, + include: boolean + ) => { + const currRows = [...rows]; + const rowOfSource = currRows.find((row) => row.source === source); + if (rowOfSource) { + if (include) { + const updatedExclude = rowOfSource.exclude.filter( + (col) => col !== column + ); + rowOfSource.exclude = updatedExclude; + } else { + rowOfSource.exclude.push(column); + } + } + setRows(currRows); + }; + + const handleSelectAll = ( + e: React.MouseEvent + ) => { + const newRows = [...rows]; + for (const row of newRows) { + row.selected = e.currentTarget.checked; + if (e.currentTarget.checked) addTableColumns(row.source); + else removeTableColumns(row.source); + } + setRows(newRows); + }; + + const handleSchemaClick = (schemaName: string) => { + if (!schemaIsExpanded(schemaName)) { + setTablesLoading(true); + setExpandedSchemas((curr) => [...curr, schemaName]); + fetchTables(sourcePeer, schemaName, peerType).then((tableRows) => { + const newRows = [...rows, ...tableRows]; + setRows(newRows); + setTablesLoading(false); + }); + } else { + setExpandedSchemas((curr) => + curr.filter((expandedSchema) => expandedSchema != schemaName) + ); + setRows((curr) => curr.filter((row) => row.schema !== schemaName)); + } + }; + + return ( +
+
+
+
handleSchemaClick(schema)} + > + +

{schema}

+
+
+
+ handleSelectAll(e)} /> + +
+ ) => + setTableQuery(e.target.value) + } + /> +
+
+ {schemaIsExpanded(schema) && ( +
+ {rows.filter((row) => row.schema === schema).length ? ( + rows + .filter( + (row) => + row.schema === schema && + row.source.toLowerCase().includes(tableQuery.toLowerCase()) + ) + .map((row, index) => { + const columns = getTableColumns(row.source); + return ( +
+
+ + {row.source} + + } + action={ + + handleTableSelect(state, row.source) + } + /> + } + /> + +
+

Target Table:

+ + ) => updateDestination(row.source, e.target.value)} + /> +
+
+ {row.selected && ( +
+ + {columns ? ( + columns.map((column, index) => { + const columnName = column.split(':')[0]; + const columnType = column.split(':')[1]; + const isPkey = column.split(':')[2] === 'true'; + return ( + + {columnName}{' '} +

+ {columnType} +

+ + } + action={ + col == columnName + ) + } + onCheckedChange={(state: boolean) => + handleColumnExclusion( + row.source, + columnName, + state + ) + } + /> + } + /> + ); + }) + ) : columnsLoading ? ( + + ) : ( + + )} +
+ )} +
+ ); + }) + ) : tablesLoading ? ( + + ) : ( + + )} +
+ )} +
+
+ ); +}; + +export default SchemaBox; diff --git a/ui/app/mirrors/create/cdc/styles.ts b/ui/app/mirrors/create/cdc/styles.ts index 29fd29963b..b60ee4035e 100644 --- a/ui/app/mirrors/create/cdc/styles.ts +++ b/ui/app/mirrors/create/cdc/styles.ts @@ -1,12 +1,11 @@ import { CSSProperties } from 'styled-components'; export const expandableStyle = { - fontSize: 13, - overflow: 'hidden', + fontSize: 14, display: 'flex', alignItems: 'center', + justifyContent: 'space-between', color: 'rgba(0,0,0,0.7)', - textOverflow: 'ellipsis', cursor: 'pointer', }; diff --git a/ui/app/mirrors/create/cdc/tablemapping.tsx b/ui/app/mirrors/create/cdc/tablemapping.tsx index 6cb9759a35..ee4d86d588 100644 --- a/ui/app/mirrors/create/cdc/tablemapping.tsx +++ b/ui/app/mirrors/create/cdc/tablemapping.tsx @@ -1,27 +1,13 @@ 'use client'; import { DBType } from '@/grpc_generated/peers'; -import { Checkbox } from '@/lib/Checkbox'; -import { Icon } from '@/lib/Icon'; import { Label } from '@/lib/Label'; -import { RowWithCheckbox } from '@/lib/Layout'; import { SearchField } from '@/lib/SearchField'; -import { TextField } from '@/lib/TextField'; -import { - Dispatch, - SetStateAction, - useCallback, - useEffect, - useState, -} from 'react'; +import { Dispatch, SetStateAction, useEffect, useState } from 'react'; import { BarLoader } from 'react-spinners/'; import { TableMapRow } from '../../../dto/MirrorsDTO'; -import { fetchColumns, fetchSchemas, fetchTables } from '../handlers'; -import { - expandableStyle, - loaderContainer, - schemaBoxStyle, - tableBoxStyle, -} from './styles'; +import { fetchSchemas } from '../handlers'; +import SchemaBox from './schemabox'; +import { loaderContainer } from './styles'; interface TableMappingProps { sourcePeerName: string; @@ -37,111 +23,12 @@ const TableMapping = ({ peerType, }: TableMappingProps) => { const [allSchemas, setAllSchemas] = useState(); + const [schemaQuery, setSchemaQuery] = useState(''); const [tableColumns, setTableColumns] = useState< { tableName: string; columns: string[] }[] >([]); - const [tablesLoading, setTablesLoading] = useState(false); - const [columnsLoading, setColumnsLoading] = useState(false); - const [searchQuery, setSearchQuery] = useState(''); - const [expandedSchemas, setExpandedSchemas] = useState([]); - - const schemaIsExpanded = useCallback( - (schema: string) => { - return !!expandedSchemas.find((schemaName) => schemaName === schema); - }, - [expandedSchemas] - ); - - const addTableColumns = (table: string) => { - const schemaName = table.split('.')[0]; - const tableName = table.split('.')[1]; - fetchColumns(sourcePeerName, schemaName, tableName, setColumnsLoading).then( - (res) => - setTableColumns((prev) => { - return [...prev, { tableName: table, columns: res }]; - }) - ); - }; - - const removeTableColumns = (table: string) => { - setTableColumns((prev) => { - return prev.filter((column) => column.tableName !== table); - }); - }; - - const getTableColumns = (tableName: string) => { - return tableColumns?.find((column) => column.tableName === tableName) - ?.columns; - }; - - const handleColumnExclusion = ( - source: string, - column: string, - include: boolean - ) => { - const currRows = [...rows]; - const rowOfSource = currRows.find((row) => row.source === source); - if (rowOfSource) { - if (include) { - const updatedExclude = rowOfSource.exclude.filter( - (col) => col !== column - ); - rowOfSource.exclude = updatedExclude; - } else { - rowOfSource.exclude.push(column); - } - } - setRows(currRows); - }; - - const handleAddRow = (source: string) => { - const newRows = [...rows]; - const index = newRows.findIndex((row) => row.source === source); - if (index >= 0) newRows[index] = { ...newRows[index], selected: true }; - setRows(newRows); - addTableColumns(source); - }; - - const handleRemoveRow = (source: string) => { - const newRows = [...rows]; - const index = newRows.findIndex((row) => row.source === source); - if (index >= 0) newRows[index] = { ...newRows[index], selected: false }; - setRows(newRows); - removeTableColumns(source); - }; - - const handleTableSelect = (on: boolean, source: string) => { - on ? handleAddRow(source) : handleRemoveRow(source); - }; - - const updateDestination = (source: string, dest: string) => { - const newRows = [...rows]; - const index = newRows.findIndex((row) => row.source === source); - newRows[index] = { ...newRows[index], destination: dest }; - setRows(newRows); - }; - - const handleSchemaClick = (schemaName: string) => { - if (!schemaIsExpanded(schemaName)) { - setTablesLoading(true); - setExpandedSchemas((curr) => [...curr, schemaName]); - fetchTables(sourcePeerName, schemaName, peerType).then((tableRows) => { - const newRows = [...rows, ...tableRows]; - setRows(newRows); - setTablesLoading(false); - }); - } else { - setExpandedSchemas((curr) => - curr.filter((expandedSchema) => expandedSchema != schemaName) - ); - setRows((curr) => curr.filter((row) => row.schema !== schemaName)); - } - }; - useEffect(() => { - fetchSchemas(sourcePeerName, setTablesLoading).then((res) => - setAllSchemas(res) - ); + fetchSchemas(sourcePeerName).then((res) => setAllSchemas(res)); }, [sourcePeerName]); return ( @@ -160,10 +47,10 @@ const TableMapping = ({
) => - setSearchQuery(e.target.value) + setSchemaQuery(e.target.value) } />
@@ -172,172 +59,19 @@ const TableMapping = ({ {allSchemas ? ( allSchemas ?.filter((schema) => { - return schema.toLowerCase().includes(searchQuery.toLowerCase()); + return schema.toLowerCase().includes(schemaQuery.toLowerCase()); }) .map((schema, index) => ( -
-
-
handleSchemaClick(schema)} - > - -

{schema}

-
- {schemaIsExpanded(schema) && ( -
- {rows.filter((row) => row.schema === schema).length ? ( - rows - .filter((row) => row.schema === schema) - .map((row, index) => { - const columns = getTableColumns(row.source); - return ( -
-
- - {row.source} - - } - action={ - - handleTableSelect(state, row.source) - } - /> - } - /> - -
-

- Target Table: -

- - ) => - updateDestination( - row.source, - e.target.value - ) - } - /> -
-
- {row.selected && ( -
- - {columns ? ( - columns.map((column, index) => { - const columnName = column.split(':')[0]; - const columnType = column.split(':')[1]; - return ( - - {columnName}{' '} -

- {columnType} -

- - } - action={ - col == columnName - ) - } - onCheckedChange={( - state: boolean - ) => - handleColumnExclusion( - row.source, - columnName, - state - ) - } - /> - } - /> - ); - }) - ) : columnsLoading ? ( - - ) : ( - - )} -
- )} -
- ); - }) - ) : tablesLoading ? ( - - ) : ( - - )} -
- )} -
-
+ )) ) : (
diff --git a/ui/app/mirrors/create/handlers.ts b/ui/app/mirrors/create/handlers.ts index b324cab704..10ccc4e0cc 100644 --- a/ui/app/mirrors/create/handlers.ts +++ b/ui/app/mirrors/create/handlers.ts @@ -64,7 +64,15 @@ export const handlePeer = ( }; const validateCDCFields = ( - tableMapping: TableMapRow[], + tableMapping: ( + | { + sourceTableIdentifier: string; + destinationTableIdentifier: string; + partitionKey: string; + exclude: string[]; + } + | undefined + )[], setMsg: Dispatch>, config: CDCConfig ): boolean => { @@ -115,7 +123,7 @@ interface TableMapping { const reformattedTableMapping = (tableMapping: TableMapRow[]) => { const mapping = tableMapping .map((row) => { - if (row.selected === true) { + if (row?.selected === true) { return { sourceTableIdentifier: row.source, destinationTableIdentifier: row.destination, @@ -147,10 +155,10 @@ export const handleCreateCDC = async ( setMsg({ ok: false, msg: flowNameErr }); return; } - - const isValid = validateCDCFields(rows, setMsg, config); - if (!isValid) return; const tableNameMapping = reformattedTableMapping(rows); + const isValid = validateCDCFields(tableNameMapping, setMsg, config); + if (!isValid) return; + config['tableMappings'] = tableNameMapping as TableMapping[]; config['flowJobName'] = flowJobName; setLoading(true); @@ -232,18 +240,13 @@ export const handleCreateQRep = async ( setLoading(false); }; -export const fetchSchemas = async ( - peerName: string, - setLoading: Dispatch> -) => { - setLoading(true); +export const fetchSchemas = async (peerName: string) => { const schemasRes: USchemasResponse = await fetch('/api/peers/schemas', { method: 'POST', body: JSON.stringify({ peerName, }), }).then((res) => res.json()); - setLoading(false); return schemasRes.schemas; }; diff --git a/ui/app/mirrors/create/schema.ts b/ui/app/mirrors/create/schema.ts index 2f1d56ea8a..3400ba459d 100644 --- a/ui/app/mirrors/create/schema.ts +++ b/ui/app/mirrors/create/schema.ts @@ -14,10 +14,10 @@ export const flowNameSchema = z export const tableMappingSchema = z .array( z.object({ - source: z + sourceTableIdentifier: z .string() .min(1, 'source table names, if added, must be non-empty'), - destination: z + destinationTableIdentifier: z .string() .min(1, 'destination table names, if added, must be non-empty'), exclude: z.array(z.string()).optional(),