Skip to content

Commit

Permalink
pause and resume mirror buttons, along with state reflection
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Jan 23, 2024
1 parent 6997d5e commit 6e3e612
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 16 deletions.
4 changes: 3 additions & 1 deletion protos/route.proto
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,9 @@ service FlowService {
rpc ShutdownFlow(ShutdownRequest) returns (ShutdownResponse) {
option (google.api.http) = { post: "/v1/mirrors/drop", body: "*" };
}
rpc FlowStateChange(FlowStateChangeRequest) returns (FlowStateChangeResponse) {}
rpc FlowStateChange(FlowStateChangeRequest) returns (FlowStateChangeResponse) {
option (google.api.http) = { post: "/v1/mirrors/state_change", body: "*" };
}
rpc MirrorStatus(MirrorStatusRequest) returns (MirrorStatusResponse) {
option (google.api.http) = { get: "/v1/mirrors/{flow_job_name}" };
}
Expand Down
2 changes: 1 addition & 1 deletion ui/app/api/mirrors/drop/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export async function POST(request: Request) {
destinationPeer,
removeFlowEntry: true,
};
console.log('/drop/mirror: req:', req);
console.log('/mirrors/drop: req:', req);
try {
const dropStatus: ShutdownResponse = await fetch(
`${flowServiceAddr}/v1/mirrors/drop`,
Expand Down
23 changes: 23 additions & 0 deletions ui/app/api/mirrors/state_change/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { FlowStateChangeResponse } from "@/grpc_generated/route";
import { GetFlowHttpAddressFromEnv } from "@/rpc/http";

export async function POST(request: Request) {
const body = await request.json();
const flowServiceAddr = GetFlowHttpAddressFromEnv();
console.log('/mirrors/state_change: req:', body);
try {
const res: FlowStateChangeResponse = await fetch(
`${flowServiceAddr}/v1/mirrors/state_change`,
{
method: 'POST',
body: JSON.stringify(body),
}
).then((res) => {
return res.json();
});

return new Response(JSON.stringify(res));
} catch (e) {
console.error(e);
}
}
12 changes: 7 additions & 5 deletions ui/app/mirrors/edit/[mirrorId]/cdc.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import TimeLabel from '@/components/TimeComponent';
import {
CDCMirrorStatus,
CloneTableSummary,
MirrorStatusResponse,
SnapshotStatus,
} from '@/grpc_generated/route';
import { Button } from '@/lib/Button';
Expand Down Expand Up @@ -230,13 +231,13 @@ export const SnapshotStatusTable = ({ status }: SnapshotStatusProps) => {
};

type CDCMirrorStatusProps = {
cdc: CDCMirrorStatus;
status: MirrorStatusResponse;
rows: SyncStatusRow[];
createdAt?: Date;
syncStatusChild?: React.ReactNode;
};
export function CDCMirror({
cdc,
status,
rows,
createdAt,
syncStatusChild,
Expand All @@ -249,8 +250,8 @@ export function CDCMirror({
};

let snapshot = <></>;
if (cdc.snapshotStatus) {
snapshot = <SnapshotStatusTable status={cdc.snapshotStatus} />;
if (status.cdcStatus?.snapshotStatus) {
snapshot = <SnapshotStatusTable status={status.cdcStatus?.snapshotStatus} />;
}
useEffect(() => {
setMounted(true);
Expand Down Expand Up @@ -283,7 +284,8 @@ export function CDCMirror({
<CdcDetails
syncs={rows}
createdAt={createdAt}
mirrorConfig={cdc.config}
mirrorConfig={status.cdcStatus?.config!}
mirrorStatus={status.currentFlowState}
/>
</TabPanel>
<TabPanel>{syncStatusChild}</TabPanel>
Expand Down
92 changes: 86 additions & 6 deletions ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ import { SyncStatusRow } from '@/app/dto/MirrorsDTO';
import MirrorInfo from '@/components/MirrorInfo';
import PeerButton from '@/components/PeerComponent';
import TimeLabel from '@/components/TimeComponent';
import { FlowConnectionConfigs } from '@/grpc_generated/flow';
import { FlowConnectionConfigs, FlowStatus } from '@/grpc_generated/flow';
import { dBTypeFromJSON } from '@/grpc_generated/peers';
import { FlowStateChangeRequest } from '@/grpc_generated/route';
import { Icon } from '@/lib/Icon';
import { Label } from '@/lib/Label';
import moment from 'moment';
import Link from 'next/link';
Expand All @@ -13,10 +15,11 @@ import TablePairs from './tablePairs';

type props = {
syncs: SyncStatusRow[];
mirrorConfig: FlowConnectionConfigs | undefined;
mirrorConfig: FlowConnectionConfigs;
createdAt?: Date;
mirrorStatus: FlowStatus;
};
function CdcDetails({ syncs, createdAt, mirrorConfig }: props) {
function CdcDetails({ syncs, createdAt, mirrorConfig, mirrorStatus }: props) {
let lastSyncedAt = moment(
syncs.length > 1
? syncs[1]?.endTime
Expand All @@ -31,7 +34,7 @@ function CdcDetails({ syncs, createdAt, mirrorConfig }: props) {
return acc;
}, 0);

const tablesSynced = mirrorConfig?.tableMappings;
const tablesSynced = mirrorConfig.tableMappings;
return (
<>
<div className='mt-10'>
Expand All @@ -49,11 +52,14 @@ function CdcDetails({ syncs, createdAt, mirrorConfig }: props) {
borderRadius: '1rem',
border: '1px solid rgba(0,0,0,0.1)',
cursor: 'pointer',
display: 'flex',
alignItems: 'center',
}}
>
<Link href={`/mirrors/errors/${mirrorConfig?.flowJobName}`}>
<Label> Active </Label>
<Link href={`/mirrors/errors/${mirrorConfig.flowJobName}`}>
<Label>{formatStatus(mirrorStatus)}</Label>
</Link>
{statusChangeHandle(mirrorConfig, mirrorStatus)}
</div>
</div>
<div className='basis-1/4 md:basis-1/3'>
Expand Down Expand Up @@ -140,4 +146,78 @@ export function numberWithCommas(x: any): string {
return x.toString().replace(/\B(?=(\d{3})+(?!\d))/g, ',');
}

function statusChangeHandle(
mirrorConfig: FlowConnectionConfigs,
mirrorStatus: FlowStatus
) {
// hopefully there's a better way to do this cast
if (mirrorStatus.toString() === FlowStatus[FlowStatus.STATUS_RUNNING]) {
return (
<button
className='IconButton'
aria-label='Pause'
onClick={async () => {
const req: FlowStateChangeRequest = {
flowJobName: mirrorConfig.flowJobName,
sourcePeer: mirrorConfig.source,
destinationPeer: mirrorConfig.destination,
requestedFlowState: FlowStatus.STATUS_PAUSED,
};
await fetch(`/api/mirrors/state_change`, {
method: 'POST',
body: JSON.stringify(req),
});
window.location.reload();
}}
>
<Icon name='pause' />
</button>
);
} else if (mirrorStatus.toString() === FlowStatus[FlowStatus.STATUS_PAUSED]) {
return (
<button
className='IconButton'
aria-label='Play'
onClick={async () => {
const req: FlowStateChangeRequest = {
flowJobName: mirrorConfig.flowJobName,
sourcePeer: mirrorConfig.source,
destinationPeer: mirrorConfig.destination,
requestedFlowState: FlowStatus.STATUS_RUNNING,
};
await fetch(`/api/mirrors/state_change`, {
method: 'POST',
body: JSON.stringify(req),
});
window.location.reload();
}}
>
<Icon name='play_circle' />
</button>
);
} else {
return (
<button
className='IconButton'
aria-label='Pause (disabled)'
disabled={true}
style={{opacity:'50%'}}
>
<Icon name='pause' />
</button>
);
}
}

function formatStatus(mirrorStatus: FlowStatus) {
const mirrorStatusLower = mirrorStatus
.toString()
.split('_')
.at(-1)
?.toLocaleLowerCase()!;
return (
mirrorStatusLower.at(0)?.toLocaleUpperCase() + mirrorStatusLower.slice(1)
);
}

export default CdcDetails;
8 changes: 5 additions & 3 deletions ui/app/mirrors/edit/[mirrorId]/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { redirect } from 'next/navigation';
import { CDCMirror } from './cdc';
import NoMirror from './nomirror';
import SyncStatus from './syncStatus';
import { Icon } from '@/lib/Icon';

type EditMirrorProps = {
params: { mirrorId: string };
Expand All @@ -33,9 +34,10 @@ export default async function EditMirror({
return <div>No mirror status found!</div>;
}

let createdAt = await prisma.flows.findFirst({
let mirrorInfo = await prisma.flows.findFirst({
select: {
created_at: true,
workflow_id: true
},
where: {
name: mirrorId,
Expand Down Expand Up @@ -86,9 +88,9 @@ export default async function EditMirror({
<Header variant='title2'>{mirrorId}</Header>
<CDCMirror
rows={rows}
createdAt={createdAt?.created_at}
createdAt={mirrorInfo?.created_at}
syncStatusChild={syncStatusChild}
cdc={mirrorStatus.cdcStatus}
status={mirrorStatus}
/>
</LayoutMain>
);
Expand Down

0 comments on commit 6e3e612

Please sign in to comment.