Skip to content

Commit

Permalink
create mirror v1
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Oct 9, 2023
1 parent dd178cc commit 4716b10
Show file tree
Hide file tree
Showing 17 changed files with 698 additions and 89 deletions.
20 changes: 20 additions & 0 deletions ui/app/api/mirrors/cdc/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import {
CreateCDCFlowRequest,
CreateCDCFlowResponse,
} from '@/grpc_generated/route';
import { GetFlowServiceClientFromEnv } from '@/rpc/rpc';

export async function POST(request: Request) {
const body = await request.json();
const { config } = body;
const flowServiceClient = GetFlowServiceClientFromEnv();
const req: CreateCDCFlowRequest = {
connectionConfigs: config,
};
const createStatus: CreateCDCFlowResponse =
await flowServiceClient.createCdcFlow(req);
if (!createStatus.worflowId) {
return new Response('Failed to create CDC mirror');
}
return new Response('created');
}
8 changes: 8 additions & 0 deletions ui/app/api/peers/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
CreatePeerRequest,
CreatePeerResponse,
CreatePeerStatus,
ListPeersRequest,
ValidatePeerRequest,
ValidatePeerResponse,
ValidatePeerStatus,
Expand Down Expand Up @@ -63,3 +64,10 @@ export async function POST(request: Request) {
} else return new Response('status of peer creation is unknown');
} else return new Response('mode of peer creation is unknown');
}

export async function GET(request: Request) {
let flowServiceClient = GetFlowServiceClientFromEnv();
let req: ListPeersRequest = {};
let peers = await flowServiceClient.listPeers(req);
return new Response(JSON.stringify(peers));
}
162 changes: 162 additions & 0 deletions ui/app/mirrors/create/config.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
'use client';
import { QRepSyncMode } from '@/grpc_generated/flow';
import { Peer } from '@/grpc_generated/peers';
import { Label } from '@/lib/Label';
import { RowWithSelect, RowWithSwitch, RowWithTextField } from '@/lib/Layout';
import { Select, SelectItem } from '@/lib/Select';
import { Switch } from '@/lib/Switch';
import { TextField } from '@/lib/TextField';
import { Tooltip } from '@/lib/Tooltip';
import { InfoPopover } from '../../../components/InfoPopover';
import { MirrorConfig, MirrorSetter } from '../types';
import { MirrorSetting } from './helpers/common';

interface MirrorConfigProps {
settings: MirrorSetting[];
mirrorConfig: MirrorConfig;
peers: Peer[];
setter: MirrorSetter;
}

export default function MirrorConfig(props: MirrorConfigProps) {
const handleChange = (val: string | boolean, setting: MirrorSetting) => {
let stateVal: string | boolean | Peer | QRepSyncMode = val;
if (setting.label.includes('Peer')) {
stateVal = props.peers.find((peer) => peer.name === val)!;
} else if (setting.label.includes('Sync Mode')) {
stateVal =
val === 'avro'
? QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO
: QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT;
}
setting.stateHandler(stateVal, props.setter);
};
const paramDisplayCondition = (setting: MirrorSetting) => {
const label = setting.label.toLowerCase();
if (
(label.includes('snapshot') &&
props.mirrorConfig.doInitialCopy !== true) ||
(label.includes('snapshot staging') &&
props.mirrorConfig.snapshotSyncMode?.toString() !== '1') ||
(label.includes('cdc staging') &&
props.mirrorConfig.cdcSyncMode?.toString() !== '1')
) {
return false;
}
return true;
};

return (
<>
{props.settings.map((setting, id) => {
return (
paramDisplayCondition(setting) &&
(setting.type === 'switch' ? (
<RowWithSwitch
key={id}
label={<Label>{setting.label}</Label>}
action={
<div
style={{
display: 'flex',
flexDirection: 'row',
alignItems: 'center',
}}
>
<Switch
onCheckedChange={(state) => handleChange(state, setting)}
/>
{setting.tips && (
<InfoPopover
tips={setting.tips}
link={setting.helpfulLink}
/>
)}
</div>
}
/>
) : setting.type === 'select' ? (
<RowWithSelect
key={id}
label={<Label>{setting.label}</Label>}
action={
<div
style={{
display: 'flex',
flexDirection: 'row',
alignItems: 'center',
}}
>
<Select
placeholder={`Select ${
setting.label.includes('Peer') ? 'a peer' : 'a sync mode'
}`}
onValueChange={(val) => handleChange(val, setting)}
>
{(setting.label.includes('Peer')
? props.peers?.map((peer) => peer.name)
: ['avro', 'sql']
).map((item, id) => {
return (
<SelectItem key={id} value={item.toString()}>
{item.toString()}
</SelectItem>
);
})}
</Select>
{setting.tips && (
<InfoPopover
tips={setting.tips}
link={setting.helpfulLink}
/>
)}
</div>
}
/>
) : (
<RowWithTextField
key={id}
label={
<Label>
{setting.label}
{setting.required && (
<Tooltip
style={{ width: '100%' }}
content={'This is a required field.'}
>
<Label colorName='lowContrast' colorSet='destructive'>
*
</Label>
</Tooltip>
)}
</Label>
}
action={
<div
style={{
display: 'flex',
flexDirection: 'row',
alignItems: 'center',
}}
>
<TextField
variant='simple'
type={setting.type}
defaultValue={setting.default}
onChange={(e) => handleChange(e.target.value, setting)}
/>
{setting.tips && (
<InfoPopover
tips={setting.tips}
link={setting.helpfulLink}
/>
)}
</div>
}
/>
))
);
})}
</>
);
}
78 changes: 78 additions & 0 deletions ui/app/mirrors/create/handlers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import { ListPeersResponse } from '@/grpc_generated/route';
import { AppRouterInstance } from 'next/dist/shared/lib/app-router-context';
import { Dispatch, SetStateAction } from 'react';
import { MirrorConfig, TableMapRow } from '../types';
import { cdcSchema, tableMappingSchema } from './schema';

export const listAllPeers = async () => {
const peers = await fetch('../api/peers').then((res) => res.json());
return peers as ListPeersResponse;
};

const validateFlowFields = (
tableMapping: TableMapRow[],
setMsg: Dispatch<SetStateAction<{ ok: boolean; msg: string }>>,
config: MirrorConfig
): boolean => {
let validationErr: string | undefined;
const tablesValidity = tableMappingSchema.safeParse(tableMapping);
if (!tablesValidity.success) {
validationErr = tablesValidity.error.issues[0].message;
setMsg({ ok: false, msg: validationErr });
return false;
}
const configValidity = cdcSchema.safeParse(config);
if (!configValidity.success) {
validationErr = configValidity.error.issues[0].message;
setMsg({ ok: false, msg: validationErr });
return false;
}
setMsg({ ok: true, msg: '' });
return true;
};

const reformattedTableMapping = (tableMapping: TableMapRow[]) => {
const mapping: { [key: string]: string } = {};
tableMapping.forEach((row) => {
mapping[row.source] = row.destination;
});
return mapping;
};
export const handleCreate = async (
flowJobName: string,
rows: TableMapRow[],
config: MirrorConfig,
setMsg: Dispatch<
SetStateAction<{
ok: boolean;
msg: string;
}>
>,
setLoading: Dispatch<SetStateAction<boolean>>,
router: AppRouterInstance
) => {
if (!flowJobName) {
setMsg({ ok: false, msg: 'Mirror name is required' });
return;
}
const isValid = validateFlowFields(rows, setMsg, config);
if (!isValid) return;
const tableNameMapping = reformattedTableMapping(rows);
config['tableNameMapping'] = tableNameMapping;
config['flowJobName'] = flowJobName;
setLoading(true);
const statusMessage = await fetch('/api/mirrors/cdc', {
method: 'POST',
body: JSON.stringify({
config,
}),
}).then((res) => res.text());
if (statusMessage !== 'created') {
setMsg({ ok: false, msg: statusMessage });
setLoading(false);
return;
}
setMsg({ ok: true, msg: 'CDC Mirror created successfully' });
router.push('/mirrors');
setLoading(false);
};
Loading

0 comments on commit 4716b10

Please sign in to comment.