diff --git a/protos/route.proto b/protos/route.proto index 046a63aca3..48db51e019 100644 --- a/protos/route.proto +++ b/protos/route.proto @@ -285,7 +285,9 @@ service FlowService { rpc ShutdownFlow(ShutdownRequest) returns (ShutdownResponse) { option (google.api.http) = { post: "/v1/mirrors/drop", body: "*" }; } - rpc FlowStateChange(FlowStateChangeRequest) returns (FlowStateChangeResponse) {} + rpc FlowStateChange(FlowStateChangeRequest) returns (FlowStateChangeResponse) { + option (google.api.http) = { post: "/v1/mirrors/state_change", body: "*" }; + } rpc MirrorStatus(MirrorStatusRequest) returns (MirrorStatusResponse) { option (google.api.http) = { get: "/v1/mirrors/{flow_job_name}" }; } diff --git a/ui/app/api/mirrors/drop/route.ts b/ui/app/api/mirrors/drop/route.ts index e3be0f7c41..a52e4b078d 100644 --- a/ui/app/api/mirrors/drop/route.ts +++ b/ui/app/api/mirrors/drop/route.ts @@ -13,7 +13,7 @@ export async function POST(request: Request) { destinationPeer, removeFlowEntry: true, }; - console.log('/drop/mirror: req:', req); + console.log('/mirrors/drop: req:', req); try { const dropStatus: ShutdownResponse = await fetch( `${flowServiceAddr}/v1/mirrors/drop`, diff --git a/ui/app/api/mirrors/state_change/route.ts b/ui/app/api/mirrors/state_change/route.ts new file mode 100644 index 0000000000..5113c44bd1 --- /dev/null +++ b/ui/app/api/mirrors/state_change/route.ts @@ -0,0 +1,23 @@ +import { FlowStateChangeResponse } from "@/grpc_generated/route"; +import { GetFlowHttpAddressFromEnv } from "@/rpc/http"; + +export async function POST(request: Request) { + const body = await request.json(); + const flowServiceAddr = GetFlowHttpAddressFromEnv(); + console.log('/mirrors/state_change: req:', body); + try { + const res: FlowStateChangeResponse = await fetch( + `${flowServiceAddr}/v1/mirrors/state_change`, + { + method: 'POST', + body: JSON.stringify(body), + } + ).then((res) => { + return res.json(); + }); + + return new Response(JSON.stringify(res)); + } catch (e) { + console.error(e); + } + } diff --git a/ui/app/mirrors/edit/[mirrorId]/cdc.tsx b/ui/app/mirrors/edit/[mirrorId]/cdc.tsx index e7f7ee0ca1..a45dfaecc6 100644 --- a/ui/app/mirrors/edit/[mirrorId]/cdc.tsx +++ b/ui/app/mirrors/edit/[mirrorId]/cdc.tsx @@ -4,6 +4,7 @@ import TimeLabel from '@/components/TimeComponent'; import { CDCMirrorStatus, CloneTableSummary, + MirrorStatusResponse, SnapshotStatus, } from '@/grpc_generated/route'; import { Button } from '@/lib/Button'; @@ -230,13 +231,13 @@ export const SnapshotStatusTable = ({ status }: SnapshotStatusProps) => { }; type CDCMirrorStatusProps = { - cdc: CDCMirrorStatus; + status: MirrorStatusResponse; rows: SyncStatusRow[]; createdAt?: Date; syncStatusChild?: React.ReactNode; }; export function CDCMirror({ - cdc, + status, rows, createdAt, syncStatusChild, @@ -249,8 +250,8 @@ export function CDCMirror({ }; let snapshot = <>; - if (cdc.snapshotStatus) { - snapshot = ; + if (status.cdcStatus?.snapshotStatus) { + snapshot = ; } useEffect(() => { setMounted(true); @@ -283,7 +284,8 @@ export function CDCMirror({ {syncStatusChild} diff --git a/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx b/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx index 24f4c2ac38..809289d10a 100644 --- a/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx +++ b/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx @@ -3,8 +3,10 @@ import { SyncStatusRow } from '@/app/dto/MirrorsDTO'; import MirrorInfo from '@/components/MirrorInfo'; import PeerButton from '@/components/PeerComponent'; import TimeLabel from '@/components/TimeComponent'; -import { FlowConnectionConfigs } from '@/grpc_generated/flow'; +import { FlowConnectionConfigs, FlowStatus } from '@/grpc_generated/flow'; import { dBTypeFromJSON } from '@/grpc_generated/peers'; +import { FlowStateChangeRequest } from '@/grpc_generated/route'; +import { Icon } from '@/lib/Icon'; import { Label } from '@/lib/Label'; import moment from 'moment'; import Link from 'next/link'; @@ -13,10 +15,11 @@ import TablePairs from './tablePairs'; type props = { syncs: SyncStatusRow[]; - mirrorConfig: FlowConnectionConfigs | undefined; + mirrorConfig: FlowConnectionConfigs; createdAt?: Date; + mirrorStatus: FlowStatus; }; -function CdcDetails({ syncs, createdAt, mirrorConfig }: props) { +function CdcDetails({ syncs, createdAt, mirrorConfig, mirrorStatus }: props) { let lastSyncedAt = moment( syncs.length > 1 ? syncs[1]?.endTime @@ -31,7 +34,7 @@ function CdcDetails({ syncs, createdAt, mirrorConfig }: props) { return acc; }, 0); - const tablesSynced = mirrorConfig?.tableMappings; + const tablesSynced = mirrorConfig.tableMappings; return ( <>
@@ -49,11 +52,14 @@ function CdcDetails({ syncs, createdAt, mirrorConfig }: props) { borderRadius: '1rem', border: '1px solid rgba(0,0,0,0.1)', cursor: 'pointer', + display: 'flex', + alignItems: 'center', }} > - - + + + {statusChangeHandle(mirrorConfig, mirrorStatus)}
@@ -140,4 +146,78 @@ export function numberWithCommas(x: any): string { return x.toString().replace(/\B(?=(\d{3})+(?!\d))/g, ','); } +function statusChangeHandle( + mirrorConfig: FlowConnectionConfigs, + mirrorStatus: FlowStatus +) { + // hopefully there's a better way to do this cast + if (mirrorStatus.toString() === FlowStatus[FlowStatus.STATUS_RUNNING]) { + return ( + + ); + } else if (mirrorStatus.toString() === FlowStatus[FlowStatus.STATUS_PAUSED]) { + return ( + + ); + } else { + return ( + + ); + } +} + +function formatStatus(mirrorStatus: FlowStatus) { + const mirrorStatusLower = mirrorStatus + .toString() + .split('_') + .at(-1) + ?.toLocaleLowerCase()!; + return ( + mirrorStatusLower.at(0)?.toLocaleUpperCase() + mirrorStatusLower.slice(1) + ); +} + export default CdcDetails; diff --git a/ui/app/mirrors/edit/[mirrorId]/page.tsx b/ui/app/mirrors/edit/[mirrorId]/page.tsx index d472b849e7..259fbc8e7f 100644 --- a/ui/app/mirrors/edit/[mirrorId]/page.tsx +++ b/ui/app/mirrors/edit/[mirrorId]/page.tsx @@ -8,6 +8,7 @@ import { redirect } from 'next/navigation'; import { CDCMirror } from './cdc'; import NoMirror from './nomirror'; import SyncStatus from './syncStatus'; +import { Icon } from '@/lib/Icon'; type EditMirrorProps = { params: { mirrorId: string }; @@ -33,9 +34,10 @@ export default async function EditMirror({ return
No mirror status found!
; } - let createdAt = await prisma.flows.findFirst({ + let mirrorInfo = await prisma.flows.findFirst({ select: { created_at: true, + workflow_id: true }, where: { name: mirrorId, @@ -86,9 +88,9 @@ export default async function EditMirror({
{mirrorId}
);