Skip to content

Commit

Permalink
remove sync mode fields from create mirror
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Nov 27, 2023
1 parent b7b6a8d commit fa75e46
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 152 deletions.
73 changes: 5 additions & 68 deletions ui/app/mirrors/create/cdc/cdc.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@ import { RequiredIndicator } from '@/components/RequiredIndicator';
import { QRepSyncMode } from '@/grpc_generated/flow';
import { DBType } from '@/grpc_generated/peers';
import { Label } from '@/lib/Label';
import { RowWithSelect, RowWithSwitch, RowWithTextField } from '@/lib/Layout';
import { RowWithSwitch, RowWithTextField } from '@/lib/Layout';
import { Switch } from '@/lib/Switch';
import { TextField } from '@/lib/TextField';
import { Dispatch, SetStateAction } from 'react';
import ReactSelect from 'react-select';
import { InfoPopover } from '../../../../components/InfoPopover';
import { CDCConfig, MirrorSetter, TableMapRow } from '../../../dto/MirrorsDTO';
import { MirrorSetting } from '../helpers/common';
Expand All @@ -21,17 +20,13 @@ interface MirrorConfigProps {
setRows: Dispatch<SetStateAction<TableMapRow[]>>;
}

const SyncModeOptions = ['AVRO', 'Copy with Binary'].map((value) => ({
label: value,
value,
}));

export const defaultSyncMode = (dtype: DBType | undefined) => {
switch (dtype) {
case DBType.POSTGRES:
return 'Copy with Binary';
case DBType.SNOWFLAKE:
case DBType.BIGQUERY:
case DBType.S3:
return 'AVRO';
default:
return 'Copy with Binary';
Expand All @@ -45,32 +40,17 @@ export default function CDCConfigForm({
rows,
setRows,
}: MirrorConfigProps) {
const setToDefault = (setting: MirrorSetting) => {
const destinationPeerType = mirrorConfig.destination?.type;
return (
setting.label.includes('Sync') &&
(destinationPeerType === DBType.POSTGRES ||
destinationPeerType === DBType.SNOWFLAKE)
);
};
const handleChange = (val: string | boolean, setting: MirrorSetting) => {
let stateVal: string | boolean | QRepSyncMode = val;
if (setting.label.includes('Sync Mode')) {
stateVal =
val === 'AVRO'
? QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO
: QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT;
}
setting.stateHandler(stateVal, setter);
};

const paramDisplayCondition = (setting: MirrorSetting) => {
const label = setting.label.toLowerCase();
if (
(label.includes('snapshot') && mirrorConfig.doInitialCopy !== true) ||
(label.includes('snapshot staging') &&
mirrorConfig.snapshotSyncMode?.toString() !== '1') ||
(label.includes('cdc staging') &&
mirrorConfig.cdcSyncMode?.toString() !== '1')
(label.includes('staging path') &&
defaultSyncMode(mirrorConfig.destination?.type) !== 'AVRO')
) {
return false;
}
Expand Down Expand Up @@ -115,49 +95,6 @@ export default function CDCConfigForm({
</div>
}
/>
) : setting.type === 'select' ? (
<RowWithSelect
key={id}
label={
<Label>
{setting.label}
{RequiredIndicator(setting.required)}
</Label>
}
action={
<div
style={{
display: 'flex',
flexDirection: 'row',
alignItems: 'center',
}}
>
<ReactSelect
placeholder='Select a sync mode'
onChange={(val, action) =>
val && handleChange(val.value, setting)
}
isDisabled={setToDefault(setting)}
value={
setToDefault(setting)
? SyncModeOptions.find(
(opt) =>
opt.value ===
defaultSyncMode(mirrorConfig.destination?.type)
)
: undefined
}
options={SyncModeOptions}
/>
{setting.tips && (
<InfoPopover
tips={setting.tips}
link={setting.helpfulLink}
/>
)}
</div>
}
/>
) : (
<RowWithTextField
key={id}
Expand Down
16 changes: 16 additions & 0 deletions ui/app/mirrors/create/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,21 @@ export const handleCreateCDC = async (
setMsg({ ok: false, msg: flowNameErr });
return;
}

const tableNameMapping = reformattedTableMapping(rows);
const isValid = validateCDCFields(tableNameMapping, setMsg, config);
if (!isValid) return;

config['tableMappings'] = tableNameMapping as TableMapping[];
config['flowJobName'] = flowJobName;

if (config.destination?.type == DBType.POSTGRES) {
config.cdcSyncMode = QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT;
config.snapshotSyncMode = QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT;
} else {
config.cdcSyncMode = QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO;
config.snapshotSyncMode = QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO;
}
setLoading(true);
const statusMessage: UCreateMirrorResponse = await fetch('/api/mirrors/cdc', {
method: 'POST',
Expand Down Expand Up @@ -220,6 +229,13 @@ export const handleCreateQRep = async (
if (!isValid) return;
config.flowJobName = flowJobName;
config.query = query;

if (config.destinationPeer?.type == DBType.POSTGRES) {
config.syncMode = QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT;
} else {
config.syncMode = QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO;
}

setLoading(true);
const statusMessage: UCreateMirrorResponse = await fetch(
'/api/mirrors/qrep',
Expand Down
29 changes: 2 additions & 27 deletions ui/app/mirrors/create/helpers/cdc.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { QRepSyncMode } from '@/grpc_generated/flow';
import { CDCConfig } from '../../../dto/MirrorsDTO';
import { MirrorSetting } from './common';
export const cdcSettings: MirrorSetting[] = [
Expand Down Expand Up @@ -63,38 +62,14 @@ export const cdcSettings: MirrorSetting[] = [
default: '4',
type: 'number',
},
{
label: 'Snapshot Sync Mode',
stateHandler: (value, setter) =>
setter((curr: CDCConfig) => ({
...curr,
snapshotSyncMode:
(value as QRepSyncMode) || QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT,
})),
tips: 'Specify whether you want the sync mode for initial load to be via SQL or by staging AVRO files. The default mode is SQL.',
default: 'SQL',
type: 'select',
},
{
label: 'CDC Sync Mode',
stateHandler: (value, setter) =>
setter((curr: CDCConfig) => ({
...curr,
cdcSyncMode:
(value as QRepSyncMode) || QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT,
})),
tips: 'Specify whether you want the sync mode for CDC to be via SQL or by staging AVRO files. The default mode is SQL.',
default: 'SQL',
type: 'select',
},
{
label: 'Snapshot Staging Path',
stateHandler: (value, setter) =>
setter((curr: CDCConfig) => ({
...curr,
snapshotStagingPath: value as string | '',
})),
tips: 'You can specify staging path if you have set the Snapshot sync mode as AVRO. For Snowflake as destination peer, this must be either empty or an S3 bucket URL.',
tips: 'You can specify staging path for Snapshot sync mode AVRO. For Snowflake as destination peer, this must be either empty or an S3 bucket URL. For BigQuery, this must be either empty or an existing GCS bucket name. In both cases, if empty, the local filesystem will be used.',
},
{
label: 'CDC Staging Path',
Expand All @@ -103,7 +78,7 @@ export const cdcSettings: MirrorSetting[] = [
...curr,
cdcStagingPath: (value as string) || '',
})),
tips: 'You can specify staging path if you have set the CDC sync mode as AVRO. For Snowflake as destination peer, this must be either empty or an S3 bucket url',
tips: 'You can specify staging path for CDC sync mode AVRO. For Snowflake as destination peer, this must be either empty or an S3 bucket URL. For BigQuery, this must be either empty or an existing GCS bucket name. In both cases, if empty, the local filesystem will be used.',
},
{
label: 'Soft Delete',
Expand Down
17 changes: 1 addition & 16 deletions ui/app/mirrors/create/helpers/qrep.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import {
QRepConfig,
QRepSyncMode,
QRepWriteMode,
QRepWriteType,
} from '@/grpc_generated/flow';
Expand Down Expand Up @@ -71,28 +70,14 @@ export const qrepSettings: MirrorSetting[] = [
default: '4',
type: 'number',
},
{
label: 'Sync Mode',
stateHandler: (value, setter) =>
setter((curr: QRepConfig) => ({
...curr,
syncMode:
(value as QRepSyncMode) || QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT,
})),
tips: 'Specify whether you want the sync mode to be via SQL or by staging AVRO files. The default mode is SQL.',
default: 'SQL',
type: 'select',
},

{
label: 'Staging Path',
stateHandler: (value, setter) =>
setter((curr: QRepConfig) => ({
...curr,
stagingPath: (value as string) || '',
})),
tips: `You can specify staging path if you have set the sync mode as AVRO. For Snowflake as destination peer.
If this starts with gs:// then it will be written to GCS.
tips: `You can specify staging path for sync mode AVRO. For Snowflake as destination peer:
If this starts with s3:// then it will be written to S3.
If nothing is specified then it will be written to local disk.`,
},
Expand Down
43 changes: 7 additions & 36 deletions ui/app/mirrors/create/qrep/qrep.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -51,24 +51,11 @@ export default function QRepConfigForm({
{ value: string; label: string }[]
>([]);
const [loading, setLoading] = useState(false);
const setToDefault = (setting: MirrorSetting) => {
const destinationPeerType = mirrorConfig.destinationPeer?.type;
return (
setting.label.includes('Sync') &&
(destinationPeerType === DBType.POSTGRES ||
destinationPeerType === DBType.SNOWFLAKE)
);
};

const handleChange = (val: string | boolean, setting: MirrorSetting) => {
let stateVal: string | boolean | QRepSyncMode | QRepWriteType | string[] =
val;
if (setting.label.includes('Sync Mode')) {
stateVal =
val === 'AVRO'
? QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO
: QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT;
} else if (setting.label.includes('Write Type')) {
if (setting.label.includes('Write Type')) {
switch (val) {
case 'Upsert':
stateVal = QRepWriteType.QREP_WRITE_MODE_UPSERT;
Expand All @@ -86,14 +73,15 @@ export default function QRepConfigForm({
}
setting.stateHandler(stateVal, setter);
};

const paramDisplayCondition = (setting: MirrorSetting) => {
const label = setting.label.toLowerCase();
if (
(label.includes('upsert') &&
mirrorConfig.writeMode?.writeType !=
QRepWriteType.QREP_WRITE_MODE_UPSERT) ||
(label.includes('staging') &&
mirrorConfig.syncMode?.toString() !== '1') ||
defaultSyncMode(mirrorConfig.destinationPeer?.type) !== 'AVRO') ||
(label.includes('watermark column') && xmin) ||
(label.includes('initial copy') && xmin)
) {
Expand Down Expand Up @@ -200,30 +188,13 @@ export default function QRepConfigForm({
}}
>
<div style={{ width: '100%' }}>
{setting.label.includes('Sync') ||
setting.label.includes('Write') ? (
{setting.label.includes('Write') ? (
<ReactSelect
placeholder='Select a mode'
onChange={(val, action) =>
placeholder='Select a write mode'
onChange={(val) =>
val && handleChange(val.value, setting)
}
isDisabled={setToDefault(setting)}
defaultValue={
setToDefault(setting)
? ((mode) =>
SyncModes.find((opt) => opt.value === mode) ||
WriteModes.find((opt) => opt.value === mode))(
defaultSyncMode(
mirrorConfig.destinationPeer?.type
)
)
: undefined
}
options={
setting.label.includes('Sync')
? SyncModes
: WriteModes
}
options={WriteModes}
/>
) : (
<ReactSelect
Expand Down
11 changes: 6 additions & 5 deletions ui/components/InfoPopover.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ export const InfoPopover = ({

<Popover.Portal>
<Popover.Content
style={{ position: 'absolute' }}
style={{ backgroundColor: '#fff' }}
className='PopoverContent'
side='right'
sideOffset={5}
>
<div
Expand All @@ -31,9 +30,11 @@ export const InfoPopover = ({
minWidth: '15rem',
}}
>
<p className='Text' style={{ fontSize: 13 }}>
{tips}
</p>
{tips.split('.').map((sentence, index) => (
<p className='Text' style={{ fontSize: 13 }} key={index}>
{sentence.trim()}
</p>
))}

{link && (
<a
Expand Down

0 comments on commit fa75e46

Please sign in to comment.