Skip to content

Commit

Permalink
Merge branch 'main' into temporal-sdk-126
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Mar 12, 2024
2 parents d53ab27 + 2d3790a commit 40c4340
Show file tree
Hide file tree
Showing 30 changed files with 453 additions and 164 deletions.
40 changes: 38 additions & 2 deletions flow/cmd/peer_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ func (h *FlowRequestHandler) GetTablesInSchema(
CASE
WHEN con.contype = 'p' OR t.relreplident = 'i' OR t.relreplident = 'f' THEN true
ELSE false
END AS can_mirror
END AS can_mirror,
pg_size_pretty(pg_total_relation_size(t.oid)) :: text AS table_size
FROM
pg_class t
LEFT JOIN
Expand All @@ -108,6 +109,7 @@ func (h *FlowRequestHandler) GetTablesInSchema(
can_mirror DESC;
`, req.SchemaName)
if err != nil {
slog.Info("failed to fetch publications", slog.Any("error", err))
return &protos.SchemaTablesResponse{Tables: nil}, err
}

Expand All @@ -116,10 +118,15 @@ func (h *FlowRequestHandler) GetTablesInSchema(
for rows.Next() {
var table pgtype.Text
var hasPkeyOrReplica pgtype.Bool
err := rows.Scan(&table, &hasPkeyOrReplica)
var tableSize pgtype.Text
err := rows.Scan(&table, &hasPkeyOrReplica, &tableSize)
if err != nil {
return &protos.SchemaTablesResponse{Tables: nil}, err
}
var sizeOfTable string
if tableSize.Valid {
sizeOfTable = tableSize.String
}
canMirror := false
if hasPkeyOrReplica.Valid && hasPkeyOrReplica.Bool {
canMirror = true
Expand All @@ -128,8 +135,14 @@ func (h *FlowRequestHandler) GetTablesInSchema(
tables = append(tables, &protos.TableResponse{
TableName: table.String,
CanMirror: canMirror,
TableSize: sizeOfTable,
})
}

if err := rows.Err(); err != nil {
slog.Info("failed to fetch publications", slog.Any("error", err))
return &protos.SchemaTablesResponse{Tables: nil}, err
}
return &protos.SchemaTablesResponse{Tables: tables}, nil
}

Expand Down Expand Up @@ -325,3 +338,26 @@ func (h *FlowRequestHandler) GetStatInfo(
StatData: statInfoRows,
}, nil
}

func (h *FlowRequestHandler) GetPublications(
ctx context.Context,
req *protos.PostgresPeerActivityInfoRequest,
) (*protos.PeerPublicationsResponse, error) {
tunnel, peerConn, err := h.getConnForPGPeer(ctx, req.PeerName)
if err != nil {
return &protos.PeerPublicationsResponse{PublicationNames: nil}, err
}
defer tunnel.Close()
defer peerConn.Close(ctx)

rows, err := peerConn.Query(ctx, "select pubname from pg_publication;")
if err != nil {
return &protos.PeerPublicationsResponse{PublicationNames: nil}, err
}

publications, err := pgx.CollectRows[string](rows, pgx.RowTo)
if err != nil {
return &protos.PeerPublicationsResponse{PublicationNames: nil}, err
}
return &protos.PeerPublicationsResponse{PublicationNames: publications}, nil
}
9 changes: 9 additions & 0 deletions protos/route.proto
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ message PeerSchemasResponse {
repeated string schemas = 1;
}

message PeerPublicationsResponse {
repeated string publication_names = 1;
}

message SchemaTablesRequest {
string peer_name = 1;
string schema_name = 2;
Expand All @@ -121,6 +125,7 @@ message SchemaTablesResponse {
message TableResponse {
string table_name = 1;
bool can_mirror = 2;
string table_size = 3;
}

message AllTablesResponse {
Expand Down Expand Up @@ -265,6 +270,10 @@ service FlowService {
option (google.api.http) = { get: "/v1/peers/schemas" };
}

rpc GetPublications(PostgresPeerActivityInfoRequest) returns (PeerPublicationsResponse) {
option (google.api.http) = { get: "/v1/peers/publications" };
}

rpc GetTablesInSchema(SchemaTablesRequest) returns (SchemaTablesResponse) {
option (google.api.http) = { get: "/v1/peers/tables" };
}
Expand Down
22 changes: 22 additions & 0 deletions ui/app/api/peers/publications/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { UPublicationsResponse } from '@/app/dto/PeersDTO';
import { PeerPublicationsResponse } from '@/grpc_generated/route';
import { GetFlowHttpAddressFromEnv } from '@/rpc/http';

export async function POST(request: Request) {
const body = await request.json();
const { peerName } = body;
const flowServiceAddr = GetFlowHttpAddressFromEnv();
try {
const publicationList: PeerPublicationsResponse = await fetch(
`${flowServiceAddr}/v1/peers/publications?peer_name=${peerName}`
).then((res) => {
return res.json();
});
let response: UPublicationsResponse = {
publicationNames: publicationList.publicationNames,
};
return new Response(JSON.stringify(response));
} catch (e) {
console.log(e);
}
}
5 changes: 0 additions & 5 deletions ui/app/api/peers/schemas/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,6 @@ export async function POST(request: Request) {
let response: USchemasResponse = {
schemas: schemaList.schemas,
};
if (schemaList.message === 'no rows in result set') {
response = {
schemas: [],
};
}
return new Response(JSON.stringify(response));
} catch (e) {
console.log(e);
Expand Down
40 changes: 16 additions & 24 deletions ui/app/api/peers/slots/[name]/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,30 +30,22 @@ export async function GET(
break;
}

const lagPoints = await prisma.peer_slot_size.findMany({
select: {
updated_at: true,
slot_size: true,
},
where: {
slot_name: context.params.name,
updated_at: {
gte: new Date(Date.now() - forThePastThisMuchTime),
},
},
});
const lagPoints = await prisma.$queryRaw<
{ updated_at: Date; slot_size: bigint }[]
>`
select updated_at, slot_size
from peerdb_stats.peer_slot_size
where slot_size is not null
and slot_name = ${context.params.name}
and updated_at > ${new Date(Date.now() - forThePastThisMuchTime)}
order by random()
limit 720
`;

// convert slot_size to string
const stringedLagPoints: SlotLagPoint[] = lagPoints.map((lagPoint) => {
return {
// human readable
updatedAt:
lagPoint.updated_at.toDateString() +
' ' +
lagPoint.updated_at.toLocaleTimeString(),
slotSize: lagPoint.slot_size?.toString(),
};
});
const slotLagPoints: SlotLagPoint[] = lagPoints.map((lagPoint) => ({
updatedAt: +lagPoint.updated_at,
slotSize: Number(lagPoint.slot_size) / 1000,
}));

return NextResponse.json(stringedLagPoints);
return NextResponse.json(slotLagPoints);
}
1 change: 1 addition & 0 deletions ui/app/dto/MirrorsDTO.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export type TableMapRow = {
exclude: Set<string>;
selected: boolean;
canMirror: boolean;
tableSize: string;
};

export type SyncStatusRow = {
Expand Down
8 changes: 6 additions & 2 deletions ui/app/dto/PeersDTO.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ export type CatalogPeer = {
export type PeerSetter = React.Dispatch<React.SetStateAction<PeerConfig>>;

export type SlotLagPoint = {
updatedAt: string;
slotSize?: string;
updatedAt: number;
slotSize: number;
};

export type UPublicationsResponse = {
publicationNames: string[];
};
52 changes: 32 additions & 20 deletions ui/app/mirrors/[mirrorId]/edit/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ import { Label } from '@/lib/Label';
import { RowWithTextField } from '@/lib/Layout';
import { ProgressCircle } from '@/lib/ProgressCircle';
import { TextField } from '@/lib/TextField';
import { Callout } from '@tremor/react';
import { useRouter } from 'next/navigation';
import { useCallback, useEffect, useMemo, useState } from 'react';
import { ToastContainer, toast } from 'react-toastify';
import 'react-toastify/dist/ReactToastify.css';
import TableMapping from '../../create/cdc/tablemapping';
import { reformattedTableMapping } from '../../create/handlers';
import { blankCDCSetting } from '../../create/helpers/common';
import * as styles from '../../create/styles';
type EditMirrorProps = {
params: { mirrorId: string };
};
Expand Down Expand Up @@ -120,8 +122,6 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => {

return (
<div>
<Label variant='title3'>Edit {mirrorId}</Label>

<RowWithTextField
key={1}
label={<Label>{'Pull Batch Size'} </Label>}
Expand Down Expand Up @@ -174,6 +174,22 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => {
}
/>

<Label variant='action' as='label' style={{ marginTop: '1rem' }}>
Adding Tables
</Label>
{!isNotPaused && rows.some((row) => row.selected) && (
<Callout
title='Note on adding tables'
color={'gray'}
style={{ marginTop: '1rem' }}
>
CDC will be put on hold until initial load for these added tables have
been completed.
<br></br>
The <b>replication slot will grow</b> during this period.
</Callout>
)}

<TableMapping
sourcePeerName={mirrorState.cdcStatus?.config?.source?.name || ''}
peerType={mirrorState.cdcStatus?.config?.destination?.type}
Expand All @@ -182,19 +198,23 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => {
omitAdditionalTablesMapping={omitAdditionalTablesMapping}
/>

{isNotPaused ? (
<Label>Mirror can only be edited while paused.</Label>
) : (
<Label>Editing mirror will automatically unpause it.</Label>
{isNotPaused && (
<Callout title='' color={'rose'} style={{ marginTop: '1rem' }}>
Mirror can only be edited while paused.
</Callout>
)}
<div style={{ display: 'flex' }}>

<div style={styles.MirrorButtonContainer}>
<Button
style={{
marginTop: '1rem',
marginRight: '1rem',
width: '8%',
height: '2.5rem',
style={styles.MirrorButtonStyle}
onClick={() => {
push(`/mirrors/${mirrorId}`);
}}
>
Back
</Button>
<Button
style={styles.MirrorButtonStyle}
variant='normalSolid'
disabled={loading || isNotPaused}
onClick={sendFlowStateChangeRequest}
Expand All @@ -205,14 +225,6 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => {
'Edit Mirror'
)}
</Button>
<Button
style={{ marginTop: '1rem', width: '8%', height: '2.5rem' }}
onClick={() => {
push(`/mirrors/${mirrorId}`);
}}
>
Back
</Button>
</div>
<ToastContainer />
</div>
Expand Down
29 changes: 26 additions & 3 deletions ui/app/mirrors/create/cdc/cdc.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
import { DBType } from '@/grpc_generated/peers';
import { Button } from '@/lib/Button';
import { Icon } from '@/lib/Icon';
import { Dispatch, SetStateAction, useMemo, useState } from 'react';
import { Dispatch, SetStateAction, useEffect, useMemo, useState } from 'react';
import { CDCConfig, MirrorSetter, TableMapRow } from '../../../dto/MirrorsDTO';
import { fetchPublications } from '../handlers';
import { MirrorSetting } from '../helpers/common';
import CDCField from './fields';
import TableMapping from './tablemapping';
Expand Down Expand Up @@ -36,6 +37,7 @@ export default function CDCConfigForm({
rows,
setRows,
}: MirrorConfigProps) {
const [publications, setPublications] = useState<string[]>();
const [show, setShow] = useState(false);
const handleChange = (val: string | boolean, setting: MirrorSetting) => {
let stateVal: string | boolean = val;
Expand Down Expand Up @@ -64,7 +66,26 @@ export default function CDCConfigForm({
return true;
};

if (mirrorConfig.source != undefined && mirrorConfig.destination != undefined)
const optionsForField = (setting: MirrorSetting) => {
switch (setting.label) {
case 'Publication Name':
return publications;
default:
return [];
}
};

useEffect(() => {
fetchPublications(mirrorConfig.source?.name || '').then((pubs) => {
setPublications(pubs);
});
}, [mirrorConfig.source?.name]);

if (
mirrorConfig.source != undefined &&
mirrorConfig.destination != undefined &&
publications != undefined
)
return (
<>
{normalSettings.map((setting, id) => {
Expand All @@ -74,6 +95,7 @@ export default function CDCConfigForm({
key={id}
handleChange={handleChange}
setting={setting}
options={optionsForField(setting)}
/>
)
);
Expand Down Expand Up @@ -101,9 +123,10 @@ export default function CDCConfigForm({
advancedSettings.map((setting, id) => {
return (
<CDCField
key={id}
key={setting.label}
handleChange={handleChange}
setting={setting}
options={optionsForField(setting)}
/>
);
})}
Expand Down
Loading

0 comments on commit 40c4340

Please sign in to comment.