Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Plethora of UI Improvements #1470

Merged
merged 6 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 38 additions & 2 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ func (h *FlowRequestHandler) GetTablesInSchema(
CASE
WHEN con.contype = 'p' OR t.relreplident = 'i' OR t.relreplident = 'f' THEN true
ELSE false
END AS can_mirror
END AS can_mirror,
pg_size_pretty(pg_total_relation_size(t.oid)) :: text AS table_size
FROM
pg_class t
LEFT JOIN
Expand All @@ -108,6 +109,7 @@ func (h *FlowRequestHandler) GetTablesInSchema(
can_mirror DESC;
`, req.SchemaName)
if err != nil {
slog.Info("failed to fetch publications", slog.Any("error", err))
return &protos.SchemaTablesResponse{Tables: nil}, err
}

Expand All @@ -116,10 +118,15 @@ func (h *FlowRequestHandler) GetTablesInSchema(
for rows.Next() {
var table pgtype.Text
var hasPkeyOrReplica pgtype.Bool
err := rows.Scan(&table, &hasPkeyOrReplica)
var tableSize pgtype.Text
err := rows.Scan(&table, &hasPkeyOrReplica, &tableSize)
if err != nil {
return &protos.SchemaTablesResponse{Tables: nil}, err
}
var sizeOfTable string
if tableSize.Valid {
sizeOfTable = tableSize.String
}
canMirror := false
if hasPkeyOrReplica.Valid && hasPkeyOrReplica.Bool {
canMirror = true
Expand All @@ -128,8 +135,14 @@ func (h *FlowRequestHandler) GetTablesInSchema(
tables = append(tables, &protos.TableResponse{
TableName: table.String,
CanMirror: canMirror,
TableSize: sizeOfTable,
})
}

if err := rows.Err(); err != nil {
slog.Info("failed to fetch publications", slog.Any("error", err))
return &protos.SchemaTablesResponse{Tables: nil}, err
}
return &protos.SchemaTablesResponse{Tables: tables}, nil
}

Expand Down Expand Up @@ -325,3 +338,26 @@ func (h *FlowRequestHandler) GetStatInfo(
StatData: statInfoRows,
}, nil
}

func (h *FlowRequestHandler) GetPublications(
ctx context.Context,
req *protos.PostgresPeerActivityInfoRequest,
) (*protos.PeerPublicationsResponse, error) {
tunnel, peerConn, err := h.getConnForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.PeerPublicationsResponse{PublicationNames: nil}, err
}
defer tunnel.Close()
defer peerConn.Close(ctx)

rows, err := peerConn.Query(ctx, "select pubname from pg_publication;")
if err != nil {
return &protos.PeerPublicationsResponse{PublicationNames: nil}, err
}

publications, err := pgx.CollectRows[string](rows, pgx.RowTo)
if err != nil {
return &protos.PeerPublicationsResponse{PublicationNames: nil}, err
}
return &protos.PeerPublicationsResponse{PublicationNames: publications}, nil
}
9 changes: 9 additions & 0 deletions protos/route.proto
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ message PeerSchemasResponse {
repeated string schemas = 1;
}

message PeerPublicationsResponse {
repeated string publication_names = 1;
}

message SchemaTablesRequest {
string peer_name = 1;
string schema_name = 2;
Expand All @@ -121,6 +125,7 @@ message SchemaTablesResponse {
message TableResponse {
string table_name = 1;
bool can_mirror = 2;
string table_size = 3;
}

message AllTablesResponse {
Expand Down Expand Up @@ -265,6 +270,10 @@ service FlowService {
option (google.api.http) = { get: "/v1/peers/schemas" };
}

rpc GetPublications(PostgresPeerActivityInfoRequest) returns (PeerPublicationsResponse) {
option (google.api.http) = { get: "/v1/peers/publications" };
}

rpc GetTablesInSchema(SchemaTablesRequest) returns (SchemaTablesResponse) {
option (google.api.http) = { get: "/v1/peers/tables" };
}
Expand Down
22 changes: 22 additions & 0 deletions ui/app/api/peers/publications/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { UPublicationsResponse } from '@/app/dto/PeersDTO';
import { PeerPublicationsResponse } from '@/grpc_generated/route';
import { GetFlowHttpAddressFromEnv } from '@/rpc/http';

export async function POST(request: Request) {
const body = await request.json();
const { peerName } = body;
const flowServiceAddr = GetFlowHttpAddressFromEnv();
try {
const publicationList: PeerPublicationsResponse = await fetch(
`${flowServiceAddr}/v1/peers/publications?peer_name=${peerName}`
).then((res) => {
return res.json();
});
let response: UPublicationsResponse = {
publicationNames: publicationList.publicationNames,
};
return new Response(JSON.stringify(response));
} catch (e) {
console.log(e);
}
}
5 changes: 0 additions & 5 deletions ui/app/api/peers/schemas/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,6 @@ export async function POST(request: Request) {
let response: USchemasResponse = {
schemas: schemaList.schemas,
};
if (schemaList.message === 'no rows in result set') {
response = {
schemas: [],
};
}
return new Response(JSON.stringify(response));
} catch (e) {
console.log(e);
Expand Down
1 change: 1 addition & 0 deletions ui/app/dto/MirrorsDTO.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export type TableMapRow = {
exclude: Set<string>;
selected: boolean;
canMirror: boolean;
tableSize: string;
};

export type SyncStatusRow = {
Expand Down
4 changes: 4 additions & 0 deletions ui/app/dto/PeersDTO.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,7 @@ export type SlotLagPoint = {
updatedAt: string;
slotSize?: string;
};

export type UPublicationsResponse = {
publicationNames: string[];
};
52 changes: 32 additions & 20 deletions ui/app/mirrors/[mirrorId]/edit/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ import { Label } from '@/lib/Label';
import { RowWithTextField } from '@/lib/Layout';
import { ProgressCircle } from '@/lib/ProgressCircle';
import { TextField } from '@/lib/TextField';
import { Callout } from '@tremor/react';
import { useRouter } from 'next/navigation';
import { useCallback, useEffect, useMemo, useState } from 'react';
import { ToastContainer, toast } from 'react-toastify';
import 'react-toastify/dist/ReactToastify.css';
import TableMapping from '../../create/cdc/tablemapping';
import { reformattedTableMapping } from '../../create/handlers';
import { blankCDCSetting } from '../../create/helpers/common';
import * as styles from '../../create/styles';
type EditMirrorProps = {
params: { mirrorId: string };
};
Expand Down Expand Up @@ -120,8 +122,6 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => {

return (
<div>
<Label variant='title3'>Edit {mirrorId}</Label>

<RowWithTextField
key={1}
label={<Label>{'Pull Batch Size'} </Label>}
Expand Down Expand Up @@ -174,6 +174,22 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => {
}
/>

<Label variant='action' as='label' style={{ marginTop: '1rem' }}>
Adding Tables
</Label>
{!isNotPaused && rows.some((row) => row.selected) && (
<Callout
title='Note on adding tables'
color={'gray'}
style={{ marginTop: '1rem' }}
>
CDC will be put on hold until initial load for these added tables have
been completed.
<br></br>
The <b>replication slot will grow</b> during this period.
</Callout>
)}

<TableMapping
sourcePeerName={mirrorState.cdcStatus?.config?.source?.name || ''}
peerType={mirrorState.cdcStatus?.config?.destination?.type}
Expand All @@ -182,19 +198,23 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => {
omitAdditionalTablesMapping={omitAdditionalTablesMapping}
/>

{isNotPaused ? (
<Label>Mirror can only be edited while paused.</Label>
) : (
<Label>Editing mirror will automatically unpause it.</Label>
{isNotPaused && (
<Callout title='' color={'rose'} style={{ marginTop: '1rem' }}>
Mirror can only be edited while paused.
</Callout>
)}
<div style={{ display: 'flex' }}>

<div style={styles.MirrorButtonContainer}>
<Button
style={{
marginTop: '1rem',
marginRight: '1rem',
width: '8%',
height: '2.5rem',
style={styles.MirrorButtonStyle}
onClick={() => {
push(`/mirrors/${mirrorId}`);
}}
>
Back
</Button>
<Button
style={styles.MirrorButtonStyle}
variant='normalSolid'
disabled={loading || isNotPaused}
onClick={sendFlowStateChangeRequest}
Expand All @@ -205,14 +225,6 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => {
'Edit Mirror'
)}
</Button>
<Button
style={{ marginTop: '1rem', width: '8%', height: '2.5rem' }}
onClick={() => {
push(`/mirrors/${mirrorId}`);
}}
>
Back
</Button>
</div>
<ToastContainer />
</div>
Expand Down
29 changes: 26 additions & 3 deletions ui/app/mirrors/create/cdc/cdc.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
import { DBType } from '@/grpc_generated/peers';
import { Button } from '@/lib/Button';
import { Icon } from '@/lib/Icon';
import { Dispatch, SetStateAction, useMemo, useState } from 'react';
import { Dispatch, SetStateAction, useEffect, useMemo, useState } from 'react';
import { CDCConfig, MirrorSetter, TableMapRow } from '../../../dto/MirrorsDTO';
import { fetchPublications } from '../handlers';
import { MirrorSetting } from '../helpers/common';
import CDCField from './fields';
import TableMapping from './tablemapping';
Expand Down Expand Up @@ -36,6 +37,7 @@ export default function CDCConfigForm({
rows,
setRows,
}: MirrorConfigProps) {
const [publications, setPublications] = useState<string[]>();
const [show, setShow] = useState(false);
const handleChange = (val: string | boolean, setting: MirrorSetting) => {
let stateVal: string | boolean = val;
Expand Down Expand Up @@ -64,7 +66,26 @@ export default function CDCConfigForm({
return true;
};

if (mirrorConfig.source != undefined && mirrorConfig.destination != undefined)
const optionsForField = (setting: MirrorSetting) => {
switch (setting.label) {
case 'Publication Name':
return publications;
default:
return [];
}
};

useEffect(() => {
fetchPublications(mirrorConfig.source?.name || '').then((pubs) => {
setPublications(pubs);
});
}, [mirrorConfig.source?.name]);

if (
mirrorConfig.source != undefined &&
mirrorConfig.destination != undefined &&
publications != undefined
)
return (
<>
{normalSettings.map((setting, id) => {
Expand All @@ -74,6 +95,7 @@ export default function CDCConfigForm({
key={id}
handleChange={handleChange}
setting={setting}
options={optionsForField(setting)}
/>
)
);
Expand Down Expand Up @@ -101,9 +123,10 @@ export default function CDCConfigForm({
advancedSettings.map((setting, id) => {
return (
<CDCField
key={id}
key={setting.label}
handleChange={handleChange}
setting={setting}
options={optionsForField(setting)}
/>
);
})}
Expand Down
Loading
Loading