Skip to content

Commit

Permalink
asserts unique cols for upsert mode
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Oct 27, 2023
1 parent 2db5a5d commit a91bc0d
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 4 deletions.
12 changes: 11 additions & 1 deletion nexus/analyzer/src/qrep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,18 @@ pub fn process_options(
// all options processed have been removed from the map
// so any leftover keys are options that shouldn't be here
if !raw_opts.is_empty() {
anyhow::bail!("Unknown options for QRep mirrors: {:#?}", raw_opts.into_keys().collect::<Vec<&str>>());
anyhow::bail!(
"Unknown options for QRep mirrors: {:#?}",
raw_opts.into_keys().collect::<Vec<&str>>()
);
}

// If mode is upsert, we need unique key columns
if opts.get("mode") == Some(&Value::String(String::from("upsert")))
&& (opts.get("unique_key_columns") == None
|| opts.get("unique_key_columns") == Some(&Value::Array(vec![])))
{
anyhow::bail!("For upsert mode, unique_key_columns must be specified");
}
Ok(opts)
}
12 changes: 11 additions & 1 deletion ui/app/mirrors/create/handlers.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { UCreateMirrorResponse } from '@/app/dto/MirrorsDTO';
import { QRepConfig } from '@/grpc_generated/flow';
import { QRepConfig, QRepWriteType } from '@/grpc_generated/flow';
import { Dispatch, SetStateAction } from 'react';
import { CDCConfig, TableMapRow } from '../../dto/MirrorsDTO';
import { cdcSchema, qrepSchema, tableMappingSchema } from './schema';
Expand Down Expand Up @@ -122,6 +122,16 @@ export const handleCreateQRep = async (
config.initialCopyOnly = false;
}

if (
config.writeMode?.writeType == QRepWriteType.QREP_WRITE_MODE_UPSERT &&
!config.writeMode?.upsertKeyColumns
) {
setMsg({
ok: false,
msg: 'For upsert mode, unique key columns cannot be empty.',
});
return;
}
const isValid = validateQRepFields(query, setMsg, config);
if (!isValid) return;
config.flowJobName = flowJobName;
Expand Down
3 changes: 2 additions & 1 deletion ui/app/mirrors/create/helpers/qrep.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ export const qrepSettings: MirrorSetting[] = [
writeMode: currWriteMode,
};
}),
tips: `Needed when write mode is set to UPSERT. These columns need to be unique and are used for updates.`,
tips: `Comma separated string column names. Needed when write mode is set to UPSERT.
These columns need to be unique and are used for updates.`,
},
{
label: 'Initial Copy Only',
Expand Down
1 change: 0 additions & 1 deletion ui/app/mirrors/create/qrep.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ export default function QRepConfigForm(props: QRepConfigProps) {
? QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO
: QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT;
} else if (setting.label.includes('Write Type')) {
console.log('Handling write type: ' + val);
switch (val) {
case 'Upsert':
stateVal = QRepWriteType.QREP_WRITE_MODE_UPSERT;
Expand Down

0 comments on commit a91bc0d

Please sign in to comment.