Skip to content

Commit

Permalink
bunch of UI improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Mar 10, 2024
1 parent 548f657 commit 8db584f
Show file tree
Hide file tree
Showing 16 changed files with 260 additions and 84 deletions.
40 changes: 38 additions & 2 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ func (h *FlowRequestHandler) GetTablesInSchema(
CASE
WHEN con.contype = 'p' OR t.relreplident = 'i' OR t.relreplident = 'f' THEN true
ELSE false
END AS can_mirror
END AS can_mirror,
pg_size_pretty(pg_total_relation_size(quote_ident(n.nspname) || '.' || quote_ident(t.relname))) :: text AS table_size
FROM
pg_class t
LEFT JOIN
Expand All @@ -108,6 +109,7 @@ func (h *FlowRequestHandler) GetTablesInSchema(
can_mirror DESC;
`, req.SchemaName)
if err != nil {
slog.Info("failed to fetch publications", slog.Any("error", err))
return &protos.SchemaTablesResponse{Tables: nil}, err
}

Expand All @@ -116,10 +118,15 @@ func (h *FlowRequestHandler) GetTablesInSchema(
for rows.Next() {
var table pgtype.Text
var hasPkeyOrReplica pgtype.Bool
err := rows.Scan(&table, &hasPkeyOrReplica)
var tableSize pgtype.Text
err := rows.Scan(&table, &hasPkeyOrReplica, &tableSize)
if err != nil {
return &protos.SchemaTablesResponse{Tables: nil}, err
}
var sizeOfTable string
if tableSize.Valid {
sizeOfTable = tableSize.String
}
canMirror := false
if hasPkeyOrReplica.Valid && hasPkeyOrReplica.Bool {
canMirror = true
Expand All @@ -128,8 +135,14 @@ func (h *FlowRequestHandler) GetTablesInSchema(
tables = append(tables, &protos.TableResponse{
TableName: table.String,
CanMirror: canMirror,
TableSize: sizeOfTable,
})
}

if err := rows.Err(); err != nil {
slog.Info("failed to fetch publications", slog.Any("error", err))
return &protos.SchemaTablesResponse{Tables: nil}, err
}
return &protos.SchemaTablesResponse{Tables: tables}, nil
}

Expand Down Expand Up @@ -325,3 +338,26 @@ func (h *FlowRequestHandler) GetStatInfo(
StatData: statInfoRows,
}, nil
}

func (h *FlowRequestHandler) GetPublications(
ctx context.Context,
req *protos.PostgresPeerActivityInfoRequest,
) (*protos.PeerPublicationsResponse, error) {
tunnel, peerConn, err := h.getConnForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.PeerPublicationsResponse{PublicationNames: nil}, err
}
defer tunnel.Close()
defer peerConn.Close(ctx)

rows, err := peerConn.Query(ctx, "select pubname from pg_publication;")
if err != nil {
return &protos.PeerPublicationsResponse{PublicationNames: nil}, err
}

publications, err := pgx.CollectRows[string](rows, pgx.RowTo)
if err != nil {
return &protos.PeerPublicationsResponse{PublicationNames: nil}, err
}
return &protos.PeerPublicationsResponse{PublicationNames: publications}, nil
}
9 changes: 9 additions & 0 deletions protos/route.proto
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ message PeerSchemasResponse {
repeated string schemas = 1;
}

message PeerPublicationsResponse {
repeated string publication_names = 1;
}

message SchemaTablesRequest {
string peer_name = 1;
string schema_name = 2;
Expand All @@ -121,6 +125,7 @@ message SchemaTablesResponse {
message TableResponse {
string table_name = 1;
bool can_mirror = 2;
string table_size = 3;
}

message AllTablesResponse {
Expand Down Expand Up @@ -265,6 +270,10 @@ service FlowService {
option (google.api.http) = { get: "/v1/peers/schemas" };
}

rpc GetPublications(PostgresPeerActivityInfoRequest) returns (PeerPublicationsResponse) {
option (google.api.http) = { get: "/v1/peers/publications" };
}

rpc GetTablesInSchema(SchemaTablesRequest) returns (SchemaTablesResponse) {
option (google.api.http) = { get: "/v1/peers/tables" };
}
Expand Down
22 changes: 22 additions & 0 deletions ui/app/api/peers/publications/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { UPublicationsResponse } from '@/app/dto/PeersDTO';
import { PeerPublicationsResponse } from '@/grpc_generated/route';
import { GetFlowHttpAddressFromEnv } from '@/rpc/http';

export async function POST(request: Request) {
const body = await request.json();
const { peerName } = body;
const flowServiceAddr = GetFlowHttpAddressFromEnv();
try {
const publicationList: PeerPublicationsResponse = await fetch(
`${flowServiceAddr}/v1/peers/publications?peer_name=${peerName}`
).then((res) => {
return res.json();
});
let response: UPublicationsResponse = {
publicationNames: publicationList.publicationNames,
};
return new Response(JSON.stringify(response));
} catch (e) {
console.log(e);
}
}
5 changes: 0 additions & 5 deletions ui/app/api/peers/schemas/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,6 @@ export async function POST(request: Request) {
let response: USchemasResponse = {
schemas: schemaList.schemas,
};
if (schemaList.message === 'no rows in result set') {
response = {
schemas: [],
};
}
return new Response(JSON.stringify(response));
} catch (e) {
console.log(e);
Expand Down
1 change: 1 addition & 0 deletions ui/app/dto/MirrorsDTO.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export type TableMapRow = {
exclude: Set<string>;
selected: boolean;
canMirror: boolean;
tableSize: string;
};

export type SyncStatusRow = {
Expand Down
4 changes: 4 additions & 0 deletions ui/app/dto/PeersDTO.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,7 @@ export type SlotLagPoint = {
updatedAt: string;
slotSize?: string;
};

export type UPublicationsResponse = {
publicationNames: string[];
};
52 changes: 32 additions & 20 deletions ui/app/mirrors/[mirrorId]/edit/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ import { Label } from '@/lib/Label';
import { RowWithTextField } from '@/lib/Layout';
import { ProgressCircle } from '@/lib/ProgressCircle';
import { TextField } from '@/lib/TextField';
import { Callout } from '@tremor/react';
import { useRouter } from 'next/navigation';
import { useCallback, useEffect, useMemo, useState } from 'react';
import { ToastContainer, toast } from 'react-toastify';
import 'react-toastify/dist/ReactToastify.css';
import TableMapping from '../../create/cdc/tablemapping';
import { reformattedTableMapping } from '../../create/handlers';
import { blankCDCSetting } from '../../create/helpers/common';
import * as styles from '../../create/styles';
type EditMirrorProps = {
params: { mirrorId: string };
};
Expand Down Expand Up @@ -120,8 +122,6 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => {

return (
<div>
<Label variant='title3'>Edit {mirrorId}</Label>

<RowWithTextField
key={1}
label={<Label>{'Pull Batch Size'} </Label>}
Expand Down Expand Up @@ -174,6 +174,22 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => {
}
/>

<Label variant='action' as='label' style={{ marginTop: '1rem' }}>
Adding Tables
</Label>
{!isNotPaused && rows.some((row) => row.selected === true) && (
<Callout
title='Note on adding tables'
color={'gray'}
style={{ marginTop: '1rem' }}
>
CDC will be put on hold until initial load for these added tables have
been completed.
<br></br>
The <b>replication slot will grow</b> during this period.
</Callout>
)}

<TableMapping
sourcePeerName={mirrorState.cdcStatus?.config?.source?.name || ''}
peerType={mirrorState.cdcStatus?.config?.destination?.type}
Expand All @@ -182,19 +198,23 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => {
omitAdditionalTablesMapping={omitAdditionalTablesMapping}
/>

{isNotPaused ? (
<Label>Mirror can only be edited while paused.</Label>
) : (
<Label>Editing mirror will automatically unpause it.</Label>
{isNotPaused && (
<Callout title='' color={'rose'} style={{ marginTop: '1rem' }}>
Mirror can only be edited while paused.
</Callout>
)}
<div style={{ display: 'flex' }}>

<div style={styles.MirrorButtonContainer}>
<Button
style={{
marginTop: '1rem',
marginRight: '1rem',
width: '8%',
height: '2.5rem',
style={styles.MirrorButtonStyle}
onClick={() => {
push(`/mirrors/${mirrorId}`);
}}
>
Back
</Button>
<Button
style={styles.MirrorButtonStyle}
variant='normalSolid'
disabled={loading || isNotPaused}
onClick={sendFlowStateChangeRequest}
Expand All @@ -205,14 +225,6 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => {
'Edit Mirror'
)}
</Button>
<Button
style={{ marginTop: '1rem', width: '8%', height: '2.5rem' }}
onClick={() => {
push(`/mirrors/${mirrorId}`);
}}
>
Back
</Button>
</div>
<ToastContainer />
</div>
Expand Down
29 changes: 26 additions & 3 deletions ui/app/mirrors/create/cdc/cdc.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
import { DBType } from '@/grpc_generated/peers';
import { Button } from '@/lib/Button';
import { Icon } from '@/lib/Icon';
import { Dispatch, SetStateAction, useMemo, useState } from 'react';
import { Dispatch, SetStateAction, useEffect, useMemo, useState } from 'react';
import { CDCConfig, MirrorSetter, TableMapRow } from '../../../dto/MirrorsDTO';
import { fetchPublications } from '../handlers';
import { MirrorSetting } from '../helpers/common';
import CDCField from './fields';
import TableMapping from './tablemapping';
Expand Down Expand Up @@ -36,6 +37,7 @@ export default function CDCConfigForm({
rows,
setRows,
}: MirrorConfigProps) {
const [publications, setPublications] = useState<string[]>();
const [show, setShow] = useState(false);
const handleChange = (val: string | boolean, setting: MirrorSetting) => {
let stateVal: string | boolean = val;
Expand Down Expand Up @@ -64,7 +66,26 @@ export default function CDCConfigForm({
return true;
};

if (mirrorConfig.source != undefined && mirrorConfig.destination != undefined)
const optionsForField = (setting: MirrorSetting) => {
switch (setting.label) {
case 'Publication Name':
return publications;
default:
return [];
}
};

useEffect(() => {
fetchPublications(mirrorConfig.source?.name || '').then((pubs) => {
setPublications(pubs);
});
}, [mirrorConfig.source?.name]);

if (
mirrorConfig.source != undefined &&
mirrorConfig.destination != undefined &&
publications != undefined
)
return (
<>
{normalSettings.map((setting, id) => {
Expand All @@ -74,6 +95,7 @@ export default function CDCConfigForm({
key={id}
handleChange={handleChange}
setting={setting}
options={optionsForField(setting)}
/>
)
);
Expand Down Expand Up @@ -101,9 +123,10 @@ export default function CDCConfigForm({
advancedSettings.map((setting, id) => {
return (
<CDCField
key={id}
key={setting.label}
handleChange={handleChange}
setting={setting}
options={optionsForField(setting)}
/>
);
})}
Expand Down
48 changes: 45 additions & 3 deletions ui/app/mirrors/create/cdc/fields.tsx
Original file line number Diff line number Diff line change
@@ -1,21 +1,29 @@
'use client';
import SelectTheme from '@/app/styles/select';
import { RequiredIndicator } from '@/components/RequiredIndicator';
import { Label } from '@/lib/Label';
import { RowWithSwitch, RowWithTextField } from '@/lib/Layout';
import { RowWithSelect, RowWithSwitch, RowWithTextField } from '@/lib/Layout';
import { Switch } from '@/lib/Switch';
import { TextField } from '@/lib/TextField';
import ReactSelect from 'react-select';
import { InfoPopover } from '../../../../components/InfoPopover';
import { MirrorSetting } from '../helpers/common';

interface FieldProps {
setting: MirrorSetting;
handleChange: (val: string | boolean, setting: MirrorSetting) => void;
options?: string[];
}

const CDCField = ({ setting, handleChange }: FieldProps) => {
const CDCField = ({ setting, handleChange, options }: FieldProps) => {
return setting.type === 'switch' ? (
<RowWithSwitch
label={<Label>{setting.label}</Label>}
label={
<Label>
{setting.label}
{RequiredIndicator(setting.required)}
</Label>
}
action={
<div
style={{
Expand All @@ -34,6 +42,40 @@ const CDCField = ({ setting, handleChange }: FieldProps) => {
</div>
}
/>
) : setting.type === 'select' ? (
<RowWithSelect
label={
<Label>
{setting.label}
{RequiredIndicator(setting.required)}
</Label>
}
action={
<div
style={{
display: 'flex',
flexDirection: 'row',
alignItems: 'center',
}}
>
<div style={{ width: '100%' }}>
<ReactSelect
placeholder={`Select a publication`}
onChange={(val, action) =>
val && handleChange(val.option, setting)
}
options={options?.map((option) => ({ option, label: option }))}
getOptionLabel={(option) => option.label}
getOptionValue={(option) => option.option}
theme={SelectTheme}
/>
</div>
{setting.tips && (
<InfoPopover tips={setting.tips} link={setting.helpfulLink} />
)}
</div>
}
/>
) : (
<RowWithTextField
label={
Expand Down
Loading

0 comments on commit 8db584f

Please sign in to comment.