diff --git a/ui/app/mirrors/create/handlers.ts b/ui/app/mirrors/create/handlers.ts index 1535b40731..b0f186f0e9 100644 --- a/ui/app/mirrors/create/handlers.ts +++ b/ui/app/mirrors/create/handlers.ts @@ -174,7 +174,6 @@ const quotedWatermarkTable = (watermarkTable: string): string => { export const handleCreateQRep = async ( flowJobName: string, - query: string, config: QRepConfig, notify: (msg: string) => void, setLoading: Dispatch>, @@ -193,7 +192,6 @@ export const handleCreateQRep = async ( config.query = `SELECT * FROM ${quotedWatermarkTable( config.watermarkTable )}`; - query = config.query; config.initialCopyOnly = false; } @@ -205,14 +203,28 @@ export const handleCreateQRep = async ( notify('For upsert mode, unique key columns cannot be empty.'); return; } - const fieldErr = validateQRepFields(query, config); - if (fieldErr) { - notify(fieldErr); - return; + + if (config.sourcePeer?.snowflakeConfig) { + if (config.query.includes('')) { + notify('Please fill in the query correctly'); + return; + } + if (config.watermarkTable == '') { + notify('Please fill in the source table'); + return; + } + if (config.destinationTableIdentifier == '') { + notify('Please fill in the destination table'); + return; + } + } else { + const fieldErr = validateQRepFields(config.query, config); + if (fieldErr) { + notify(fieldErr); + return; + } } config.flowJobName = flowJobName; - config.query = query; - setLoading(true); const statusMessage: UCreateMirrorResponse = await fetch( '/api/mirrors/qrep', diff --git a/ui/app/mirrors/create/helpers/common.ts b/ui/app/mirrors/create/helpers/common.ts index 5d1b8d9a93..856217509e 100644 --- a/ui/app/mirrors/create/helpers/common.ts +++ b/ui/app/mirrors/create/helpers/common.ts @@ -39,18 +39,32 @@ export const blankCDCSetting: FlowConnectionConfigs = { export const blankQRepSetting = { destinationTableIdentifier: '', - query: '', + query: `-- Here's a sample template: + SELECT * FROM + WHERE + BETWEEN {{.start}} AND {{.end}}`, watermarkTable: '', watermarkColumn: '', initialCopyOnly: false, - syncMode: 0, - batchSizeInt: 0, - batchDurationSeconds: 0, maxParallelWorkers: 4, waitBetweenBatchesSeconds: 30, writeMode: undefined, stagingPath: '', numRowsPerPartition: 100000, setupWatermarkTableOnDestination: false, - dstTableFullResync: false, +}; + +export const blankSnowflakeQRepSetting = { + destinationTableIdentifier: '', + query: `-- Here's a sample template you can fill in: + SELECT * FROM `, + watermarkTable: '', + watermarkColumn: '', + maxParallelWorkers: 4, + waitBetweenBatchesSeconds: 30, + writeMode: undefined, + stagingPath: '', + numRowsPerPartition: 100000, + setupWatermarkTableOnDestination: false, + initialCopyOnly: true, }; diff --git a/ui/app/mirrors/create/helpers/qrep.ts b/ui/app/mirrors/create/helpers/qrep.ts index eb1260404f..e18c795bc3 100644 --- a/ui/app/mirrors/create/helpers/qrep.ts +++ b/ui/app/mirrors/create/helpers/qrep.ts @@ -137,14 +137,54 @@ export const qrepSettings: MirrorSetting[] = [ default: 30, type: 'number', }, - // { - // label: 'Resync Destination Table', - // stateHandler: (value, setter) => - // setter((curr: QRepConfig) => ({ - // ...curr, - // dstTableFullResync:value as boolean - // })), - // tips: 'Perform a resync of the provided destination table', - // type: 'switch', - // }, +]; + +export const snowflakeQRepSettings: MirrorSetting[] = [ + { + label: 'Table', + stateHandler: (value, setter) => + setter((curr: QRepConfig) => ({ + ...curr, + watermarkTable: (value as string) || '', + })), + type: 'text', + tips: 'The source table of the replication and the table to which the watermark column belongs.', + required: true, + }, + { + label: 'Create Destination Table', + stateHandler: (value, setter) => + setter((curr: QRepConfig) => ({ + ...curr, + setupWatermarkTableOnDestination: (value as boolean) || false, + })), + tips: 'Specify if you want to create the watermark table on the destination as-is, can be used for some queries.', + type: 'switch', + }, + { + label: 'Destination Table Name', + stateHandler: (value, setter) => + setter((curr: QRepConfig) => ({ + ...curr, + destinationTableIdentifier: value as string, + })), + tips: 'Name of the destination. For any destination peer apart from BigQuery, this must be schema-qualified. Example: public.users', + required: true, + }, + { + label: 'Write Type', + stateHandler: (value, setter) => + setter((curr: QRepConfig) => { + let currWriteMode = curr.writeMode || { writeType: undefined }; + currWriteMode.writeType = value as QRepWriteType; + return { + ...curr, + writeMode: currWriteMode, + }; + }), + tips: `Specify whether you want the write mode to be via APPEND, UPSERT or OVERWRITE. + Append mode is for insert-only workloads. Upsert mode is append mode but also supports updates. + Overwrite mode overwrites the destination table data every sync.`, + type: 'select', + }, ]; diff --git a/ui/app/mirrors/create/page.tsx b/ui/app/mirrors/create/page.tsx index 5411995d86..d0ee0a7e19 100644 --- a/ui/app/mirrors/create/page.tsx +++ b/ui/app/mirrors/create/page.tsx @@ -31,7 +31,7 @@ import { blankCDCSetting } from './helpers/common'; import { qrepSettings } from './helpers/qrep'; import MirrorCards from './mirrorcards'; import QRepConfigForm from './qrep/qrep'; -import QRepQuery from './qrep/query'; +import SnowflakeQRepForm from './qrep/snowflakeQrep'; import * as styles from './styles'; function getPeerValue(peer: Peer) { @@ -75,11 +75,6 @@ export default function CreateMirrors() { const [config, setConfig] = useState(blankCDCSetting); const [peers, setPeers] = useState([]); const [rows, setRows] = useState([]); - const [qrepQuery, setQrepQuery] = - useState(`-- Here's a sample template: - SELECT * FROM - WHERE - BETWEEN {{.start}} AND {{.end}}`); useEffect(() => { fetch('/api/peers', { cache: 'no-store' }) @@ -191,10 +186,6 @@ export default function CreateMirrors() { - {mirrorType === 'Query Replication' && ( - - )} - {mirrorType && ( } + action={ +
+ + handleChange(state, setting) + } + /> + {setting.tips && ( + + )} +
+ } + /> + ) : setting.type === 'select' ? ( + + {setting.label} + {RequiredIndicator(setting.required)} + + } + action={ +
+
+ + val && handleChange(val.value, setting) + } + options={WriteModes} + /> +
+ + {setting.tips && ( + + )} +
+ } + /> + ) : ( + + {setting.label} + {setting.required && ( + + + + )} + + } + action={ +
+ ) => + handleChange(e.target.value, setting) + } + /> + {setting.tips && ( + + )} +
+ } + /> + )) + ); + }) + ) : ( + + )} + + ); +}