Skip to content

Commit

Permalink
patch create qrep UI
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Feb 27, 2024
1 parent ae4b630 commit cccae23
Show file tree
Hide file tree
Showing 7 changed files with 320 additions and 51 deletions.
28 changes: 20 additions & 8 deletions ui/app/mirrors/create/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ const quotedWatermarkTable = (watermarkTable: string): string => {

export const handleCreateQRep = async (
flowJobName: string,
query: string,
config: QRepConfig,
notify: (msg: string) => void,
setLoading: Dispatch<SetStateAction<boolean>>,
Expand All @@ -193,7 +192,6 @@ export const handleCreateQRep = async (
config.query = `SELECT * FROM ${quotedWatermarkTable(
config.watermarkTable
)}`;
query = config.query;
config.initialCopyOnly = false;
}

Expand All @@ -205,14 +203,28 @@ export const handleCreateQRep = async (
notify('For upsert mode, unique key columns cannot be empty.');
return;
}
const fieldErr = validateQRepFields(query, config);
if (fieldErr) {
notify(fieldErr);
return;

if (config.sourcePeer?.snowflakeConfig) {
if (config.query.includes('<table_name>')) {
notify('Please fill in the query correctly');
return;
}
if (config.watermarkTable == '') {
notify('Please fill in the source table');
return;
}
if (config.destinationTableIdentifier == '') {
notify('Please fill in the destination table');
return;
}
} else {
const fieldErr = validateQRepFields(config.query, config);
if (fieldErr) {
notify(fieldErr);
return;
}
}
config.flowJobName = flowJobName;
config.query = query;

setLoading(true);
const statusMessage: UCreateMirrorResponse = await fetch(
'/api/mirrors/qrep',
Expand Down
24 changes: 19 additions & 5 deletions ui/app/mirrors/create/helpers/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,32 @@ export const blankCDCSetting: FlowConnectionConfigs = {

export const blankQRepSetting = {
destinationTableIdentifier: '',
query: '',
query: `-- Here's a sample template:
SELECT * FROM <table_name>
WHERE <watermark_column>
BETWEEN {{.start}} AND {{.end}}`,
watermarkTable: '',
watermarkColumn: '',
initialCopyOnly: false,
syncMode: 0,
batchSizeInt: 0,
batchDurationSeconds: 0,
maxParallelWorkers: 4,
waitBetweenBatchesSeconds: 30,
writeMode: undefined,
stagingPath: '',
numRowsPerPartition: 100000,
setupWatermarkTableOnDestination: false,
dstTableFullResync: false,
};

export const blankSnowflakeQRepSetting = {
destinationTableIdentifier: '',
query: `-- Here's a sample template you can fill in:
SELECT * FROM <table_name>`,
watermarkTable: '',
watermarkColumn: '',
maxParallelWorkers: 4,
waitBetweenBatchesSeconds: 30,
writeMode: undefined,
stagingPath: '',
numRowsPerPartition: 100000,
setupWatermarkTableOnDestination: false,
initialCopyOnly: true,
};
60 changes: 50 additions & 10 deletions ui/app/mirrors/create/helpers/qrep.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,54 @@ export const qrepSettings: MirrorSetting[] = [
default: 30,
type: 'number',
},
// {
// label: 'Resync Destination Table',
// stateHandler: (value, setter) =>
// setter((curr: QRepConfig) => ({
// ...curr,
// dstTableFullResync:value as boolean
// })),
// tips: 'Perform a resync of the provided destination table',
// type: 'switch',
// },
];

export const snowflakeQRepSettings: MirrorSetting[] = [
{
label: 'Table',
stateHandler: (value, setter) =>
setter((curr: QRepConfig) => ({
...curr,
watermarkTable: (value as string) || '',
})),
type: 'text',
tips: 'The source table of the replication and the table to which the watermark column belongs.',
required: true,
},
{
label: 'Create Destination Table',
stateHandler: (value, setter) =>
setter((curr: QRepConfig) => ({
...curr,
setupWatermarkTableOnDestination: (value as boolean) || false,
})),
tips: 'Specify if you want to create the watermark table on the destination as-is, can be used for some queries.',
type: 'switch',
},
{
label: 'Destination Table Name',
stateHandler: (value, setter) =>
setter((curr: QRepConfig) => ({
...curr,
destinationTableIdentifier: value as string,
})),
tips: 'Name of the destination. For any destination peer apart from BigQuery, this must be schema-qualified. Example: public.users',
required: true,
},
{
label: 'Write Type',
stateHandler: (value, setter) =>
setter((curr: QRepConfig) => {
let currWriteMode = curr.writeMode || { writeType: undefined };
currWriteMode.writeType = value as QRepWriteType;
return {
...curr,
writeMode: currWriteMode,
};
}),
tips: `Specify whether you want the write mode to be via APPEND, UPSERT or OVERWRITE.
Append mode is for insert-only workloads. Upsert mode is append mode but also supports updates.
Overwrite mode overwrites the destination table data every sync.`,
type: 'select',
},
];
17 changes: 6 additions & 11 deletions ui/app/mirrors/create/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import { blankCDCSetting } from './helpers/common';
import { qrepSettings } from './helpers/qrep';
import MirrorCards from './mirrorcards';
import QRepConfigForm from './qrep/qrep';
import QRepQuery from './qrep/query';
import SnowflakeQRepForm from './qrep/snowflakeQrep';
import * as styles from './styles';

function getPeerValue(peer: Peer) {
Expand Down Expand Up @@ -75,11 +75,6 @@ export default function CreateMirrors() {
const [config, setConfig] = useState<CDCConfig | QRepConfig>(blankCDCSetting);
const [peers, setPeers] = useState<Peer[]>([]);
const [rows, setRows] = useState<TableMapRow[]>([]);
const [qrepQuery, setQrepQuery] =
useState<string>(`-- Here's a sample template:
SELECT * FROM <table_name>
WHERE <watermark_column>
BETWEEN {{.start}} AND {{.end}}`);

useEffect(() => {
fetch('/api/peers', { cache: 'no-store' })
Expand Down Expand Up @@ -191,10 +186,6 @@ export default function CreateMirrors() {

<Divider style={{ marginTop: '1rem', marginBottom: '1rem' }} />

{mirrorType === 'Query Replication' && (
<QRepQuery query={qrepQuery} setter={setQrepQuery} />
)}

{mirrorType && (
<Label
as='label'
Expand All @@ -215,6 +206,11 @@ export default function CreateMirrors() {
rows={rows}
setRows={setRows}
/>
) : (config as QRepConfig).sourcePeer?.type == DBType.SNOWFLAKE ? (
<SnowflakeQRepForm
mirrorConfig={config as QRepConfig}
setter={setConfig}
/>
) : (
<QRepConfigForm
settings={qrepSettings}
Expand Down Expand Up @@ -267,7 +263,6 @@ export default function CreateMirrors() {
)
: handleCreateQRep(
mirrorName,
qrepQuery,
config as QRepConfig,
notifyErr,
setCreating,
Expand Down
17 changes: 10 additions & 7 deletions ui/app/mirrors/create/qrep/qrep.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { MirrorSetter } from '../../../dto/MirrorsDTO';
import { defaultSyncMode } from '../cdc/cdc';
import { fetchAllTables, fetchColumns } from '../handlers';
import { MirrorSetting, blankQRepSetting } from '../helpers/common';
import QRepQuery from './query';
import UpsertColsDisplay from './upsertcols';

interface QRepConfigProps {
Expand All @@ -23,13 +24,6 @@ interface QRepConfigProps {
xmin?: boolean;
}

interface QRepConfigProps {
settings: MirrorSetting[];
mirrorConfig: QRepConfig;
setter: MirrorSetter;
xmin?: boolean;
}

const WriteModes = ['Append', 'Upsert', 'Overwrite'].map((value) => ({
label: value,
value,
Expand Down Expand Up @@ -140,6 +134,15 @@ export default function QRepConfigForm({
}, [setter]);
return (
<>
<QRepQuery
query={mirrorConfig.query}
setter={(val: string) => {
setter((curr) => ({
...curr,
query: val,
}));
}}
/>
{mirrorConfig.sourcePeer?.name ? (
settings.map((setting, id) => {
return (
Expand Down
25 changes: 15 additions & 10 deletions ui/app/mirrors/create/qrep/query.tsx
Original file line number Diff line number Diff line change
@@ -1,25 +1,30 @@
import { Label } from '@/lib/Label';
import Editor from '@monaco-editor/react';
import { Dispatch, SetStateAction } from 'react';
const options = {
readOnly: false,
minimap: { enabled: false },
fontSize: 14,
};

interface QueryProps {
setter: Dispatch<SetStateAction<string>>;
setter: (val: string) => void;
query: string;
}
const QRepQuery = (props: QueryProps) => {
return (
<Editor
options={options}
height='10vh'
value={props.query}
defaultLanguage='pgsql'
defaultValue='-- Query for QRep Mirror'
onChange={(value) => props.setter(value as string)}
/>
<>
<Label as='label' style={{ marginTop: '1rem', marginBottom: '1rem' }}>
Replication Query
</Label>
<Editor
options={options}
height='10vh'
value={props.query}
defaultLanguage='pgsql'
defaultValue='-- Query for QRep Mirror'
onChange={(value) => props.setter(value as string)}
/>
</>
);
};

Expand Down
Loading

0 comments on commit cccae23

Please sign in to comment.