Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pause and resume mirror buttons, along with state reflection #1133

Merged
merged 7 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion protos/route.proto
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,9 @@ service FlowService {
rpc ShutdownFlow(ShutdownRequest) returns (ShutdownResponse) {
option (google.api.http) = { post: "/v1/mirrors/drop", body: "*" };
}
rpc FlowStateChange(FlowStateChangeRequest) returns (FlowStateChangeResponse) {}
rpc FlowStateChange(FlowStateChangeRequest) returns (FlowStateChangeResponse) {
option (google.api.http) = { post: "/v1/mirrors/state_change", body: "*" };
}
rpc MirrorStatus(MirrorStatusRequest) returns (MirrorStatusResponse) {
option (google.api.http) = { get: "/v1/mirrors/{flow_job_name}" };
}
Expand Down
2 changes: 1 addition & 1 deletion ui/app/api/mirrors/drop/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export async function POST(request: Request) {
destinationPeer,
removeFlowEntry: true,
};
console.log('/drop/mirror: req:', req);
console.log('/mirrors/drop: req:', req);
try {
const dropStatus: ShutdownResponse = await fetch(
`${flowServiceAddr}/v1/mirrors/drop`,
Expand Down
23 changes: 23 additions & 0 deletions ui/app/api/mirrors/state_change/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { FlowStateChangeResponse } from '@/grpc_generated/route';
import { GetFlowHttpAddressFromEnv } from '@/rpc/http';

export async function POST(request: Request) {
const body = await request.json();
const flowServiceAddr = GetFlowHttpAddressFromEnv();
console.log('/mirrors/state_change: req:', body);
try {
const res: FlowStateChangeResponse = await fetch(
`${flowServiceAddr}/v1/mirrors/state_change`,
{
method: 'POST',
body: JSON.stringify(body),
}
).then((res) => {
return res.json();
});

return new Response(JSON.stringify(res));
} catch (e) {
console.error(e);
}
}
15 changes: 9 additions & 6 deletions ui/app/mirrors/edit/[mirrorId]/cdc.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
import { SyncStatusRow } from '@/app/dto/MirrorsDTO';
import TimeLabel from '@/components/TimeComponent';
import {
CDCMirrorStatus,
CloneTableSummary,
MirrorStatusResponse,
SnapshotStatus,
} from '@/grpc_generated/route';
import { Button } from '@/lib/Button';
Expand Down Expand Up @@ -230,13 +230,13 @@ export const SnapshotStatusTable = ({ status }: SnapshotStatusProps) => {
};

type CDCMirrorStatusProps = {
cdc: CDCMirrorStatus;
status: MirrorStatusResponse;
rows: SyncStatusRow[];
createdAt?: Date;
syncStatusChild?: React.ReactNode;
};
export function CDCMirror({
cdc,
status,
rows,
createdAt,
syncStatusChild,
Expand All @@ -249,8 +249,10 @@ export function CDCMirror({
};

let snapshot = <></>;
if (cdc.snapshotStatus) {
snapshot = <SnapshotStatusTable status={cdc.snapshotStatus} />;
if (status.cdcStatus?.snapshotStatus) {
snapshot = (
<SnapshotStatusTable status={status.cdcStatus?.snapshotStatus} />
);
}
useEffect(() => {
setMounted(true);
Expand Down Expand Up @@ -283,7 +285,8 @@ export function CDCMirror({
<CdcDetails
syncs={rows}
createdAt={createdAt}
mirrorConfig={cdc.config}
mirrorConfig={status.cdcStatus?.config!}
mirrorStatus={status.currentFlowState}
/>
</TabPanel>
<TabPanel>{syncStatusChild}</TabPanel>
Expand Down
95 changes: 89 additions & 6 deletions ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ import { SyncStatusRow } from '@/app/dto/MirrorsDTO';
import MirrorInfo from '@/components/MirrorInfo';
import PeerButton from '@/components/PeerComponent';
import TimeLabel from '@/components/TimeComponent';
import { FlowConnectionConfigs } from '@/grpc_generated/flow';
import { FlowConnectionConfigs, FlowStatus } from '@/grpc_generated/flow';
import { dBTypeFromJSON } from '@/grpc_generated/peers';
import { FlowStateChangeRequest } from '@/grpc_generated/route';
import { Button } from '@/lib/Button';
import { Icon } from '@/lib/Icon';
import { Label } from '@/lib/Label';
import moment from 'moment';
import Link from 'next/link';
Expand All @@ -13,10 +16,11 @@ import TablePairs from './tablePairs';

type props = {
syncs: SyncStatusRow[];
mirrorConfig: FlowConnectionConfigs | undefined;
mirrorConfig: FlowConnectionConfigs;
createdAt?: Date;
mirrorStatus: FlowStatus;
};
function CdcDetails({ syncs, createdAt, mirrorConfig }: props) {
function CdcDetails({ syncs, createdAt, mirrorConfig, mirrorStatus }: props) {
let lastSyncedAt = moment(
syncs.length > 1
? syncs[1]?.endTime
Expand All @@ -31,7 +35,7 @@ function CdcDetails({ syncs, createdAt, mirrorConfig }: props) {
return acc;
}, 0);

const tablesSynced = mirrorConfig?.tableMappings;
const tablesSynced = mirrorConfig.tableMappings;
return (
<>
<div className='mt-10'>
Expand All @@ -49,11 +53,14 @@ function CdcDetails({ syncs, createdAt, mirrorConfig }: props) {
borderRadius: '1rem',
border: '1px solid rgba(0,0,0,0.1)',
cursor: 'pointer',
display: 'flex',
alignItems: 'center',
}}
>
<Link href={`/mirrors/errors/${mirrorConfig?.flowJobName}`}>
<Label> Active </Label>
<Link href={`/mirrors/errors/${mirrorConfig.flowJobName}`}>
<Label>{formatStatus(mirrorStatus)}</Label>
</Link>
{statusChangeHandle(mirrorConfig, mirrorStatus)}
</div>
</div>
<div className='basis-1/4 md:basis-1/3'>
Expand Down Expand Up @@ -140,4 +147,80 @@ export function numberWithCommas(x: any): string {
return x.toString().replace(/\B(?=(\d{3})+(?!\d))/g, ',');
}

function statusChangeHandle(
mirrorConfig: FlowConnectionConfigs,
mirrorStatus: FlowStatus
) {
// hopefully there's a better way to do this cast
if (mirrorStatus.toString() === FlowStatus[FlowStatus.STATUS_RUNNING]) {
return (
<Button
className='IconButton'
aria-label='Pause'
onClick={async () => {
heavycrystal marked this conversation as resolved.
Show resolved Hide resolved
const req: FlowStateChangeRequest = {
flowJobName: mirrorConfig.flowJobName,
sourcePeer: mirrorConfig.source,
destinationPeer: mirrorConfig.destination,
requestedFlowState: FlowStatus.STATUS_PAUSED,
};
await fetch(`/api/mirrors/state_change`, {
method: 'POST',
body: JSON.stringify(req),
heavycrystal marked this conversation as resolved.
Show resolved Hide resolved
cache: 'no-store',
});
window.location.reload();
}}
>
<Icon name='pause' />
</Button>
);
} else if (mirrorStatus.toString() === FlowStatus[FlowStatus.STATUS_PAUSED]) {
return (
<Button
className='IconButton'
aria-label='Play'
onClick={async () => {
const req: FlowStateChangeRequest = {
flowJobName: mirrorConfig.flowJobName,
sourcePeer: mirrorConfig.source,
destinationPeer: mirrorConfig.destination,
requestedFlowState: FlowStatus.STATUS_RUNNING,
};
await fetch(`/api/mirrors/state_change`, {
method: 'POST',
body: JSON.stringify(req),
cache: 'no-store',
});
window.location.reload();
}}
>
<Icon name='play_circle' />
</Button>
);
} else {
return (
<Button
className='IconButton'
aria-label='Pause (disabled)'
disabled={true}
style={{ opacity: '50%' }}
>
<Icon name='pause' />
</Button>
);
}
}

function formatStatus(mirrorStatus: FlowStatus) {
const mirrorStatusLower = mirrorStatus
.toString()
.split('_')
.at(-1)
?.toLocaleLowerCase()!;
return (
mirrorStatusLower.at(0)?.toLocaleUpperCase() + mirrorStatusLower.slice(1)
);
}

export default CdcDetails;
7 changes: 4 additions & 3 deletions ui/app/mirrors/edit/[mirrorId]/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ export default async function EditMirror({
return <div>No mirror status found!</div>;
}

let createdAt = await prisma.flows.findFirst({
let mirrorInfo = await prisma.flows.findFirst({
select: {
created_at: true,
workflow_id: true,
},
where: {
name: mirrorId,
Expand Down Expand Up @@ -86,9 +87,9 @@ export default async function EditMirror({
<Header variant='title2'>{mirrorId}</Header>
<CDCMirror
rows={rows}
createdAt={createdAt?.created_at}
createdAt={mirrorInfo?.created_at}
syncStatusChild={syncStatusChild}
cdc={mirrorStatus.cdcStatus}
status={mirrorStatus}
/>
</LayoutMain>
);
Expand Down
Loading