From a91bc0ddd86bad5c997a8f43a2adfa463cd3ca63 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 26 Oct 2023 13:42:33 -0700 Subject: [PATCH] asserts unique cols for upsert mode --- nexus/analyzer/src/qrep.rs | 12 +++++++++++- ui/app/mirrors/create/handlers.ts | 12 +++++++++++- ui/app/mirrors/create/helpers/qrep.ts | 3 ++- ui/app/mirrors/create/qrep.tsx | 1 - 4 files changed, 24 insertions(+), 4 deletions(-) diff --git a/nexus/analyzer/src/qrep.rs b/nexus/analyzer/src/qrep.rs index adf290acfe..6bb2e729fa 100644 --- a/nexus/analyzer/src/qrep.rs +++ b/nexus/analyzer/src/qrep.rs @@ -192,8 +192,18 @@ pub fn process_options( // all options processed have been removed from the map // so any leftover keys are options that shouldn't be here if !raw_opts.is_empty() { - anyhow::bail!("Unknown options for QRep mirrors: {:#?}", raw_opts.into_keys().collect::>()); + anyhow::bail!( + "Unknown options for QRep mirrors: {:#?}", + raw_opts.into_keys().collect::>() + ); } + // If mode is upsert, we need unique key columns + if opts.get("mode") == Some(&Value::String(String::from("upsert"))) + && (opts.get("unique_key_columns") == None + || opts.get("unique_key_columns") == Some(&Value::Array(vec![]))) + { + anyhow::bail!("For upsert mode, unique_key_columns must be specified"); + } Ok(opts) } diff --git a/ui/app/mirrors/create/handlers.ts b/ui/app/mirrors/create/handlers.ts index b21f35ea37..b5f431fe3c 100644 --- a/ui/app/mirrors/create/handlers.ts +++ b/ui/app/mirrors/create/handlers.ts @@ -1,5 +1,5 @@ import { UCreateMirrorResponse } from '@/app/dto/MirrorsDTO'; -import { QRepConfig } from '@/grpc_generated/flow'; +import { QRepConfig, QRepWriteType } from '@/grpc_generated/flow'; import { Dispatch, SetStateAction } from 'react'; import { CDCConfig, TableMapRow } from '../../dto/MirrorsDTO'; import { cdcSchema, qrepSchema, tableMappingSchema } from './schema'; @@ -122,6 +122,16 @@ export const handleCreateQRep = async ( config.initialCopyOnly = false; } + if ( + config.writeMode?.writeType == QRepWriteType.QREP_WRITE_MODE_UPSERT && + !config.writeMode?.upsertKeyColumns + ) { + setMsg({ + ok: false, + msg: 'For upsert mode, unique key columns cannot be empty.', + }); + return; + } const isValid = validateQRepFields(query, setMsg, config); if (!isValid) return; config.flowJobName = flowJobName; diff --git a/ui/app/mirrors/create/helpers/qrep.ts b/ui/app/mirrors/create/helpers/qrep.ts index acf1c0bf11..f0f594bd5f 100644 --- a/ui/app/mirrors/create/helpers/qrep.ts +++ b/ui/app/mirrors/create/helpers/qrep.ts @@ -146,7 +146,8 @@ export const qrepSettings: MirrorSetting[] = [ writeMode: currWriteMode, }; }), - tips: `Needed when write mode is set to UPSERT. These columns need to be unique and are used for updates.`, + tips: `Comma separated string column names. Needed when write mode is set to UPSERT. + These columns need to be unique and are used for updates.`, }, { label: 'Initial Copy Only', diff --git a/ui/app/mirrors/create/qrep.tsx b/ui/app/mirrors/create/qrep.tsx index 139b37e07a..1e289f629d 100644 --- a/ui/app/mirrors/create/qrep.tsx +++ b/ui/app/mirrors/create/qrep.tsx @@ -36,7 +36,6 @@ export default function QRepConfigForm(props: QRepConfigProps) { ? QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO : QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT; } else if (setting.label.includes('Write Type')) { - console.log('Handling write type: ' + val); switch (val) { case 'Upsert': stateVal = QRepWriteType.QREP_WRITE_MODE_UPSERT;