From a013eeeb463a7f8df94d54aed874c778a5605f84 Mon Sep 17 00:00:00 2001
From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com>
Date: Thu, 25 Jan 2024 00:52:47 +0530
Subject: [PATCH] pause and resume mirror buttons, along with state reflection
(#1133)
not the most aligned button
---
protos/route.proto | 4 +-
ui/app/api/mirrors/drop/route.ts | 2 +-
ui/app/api/mirrors/state_change/route.ts | 23 +++++
ui/app/mirrors/edit/[mirrorId]/cdc.tsx | 15 +--
ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx | 95 +++++++++++++++++--
ui/app/mirrors/edit/[mirrorId]/page.tsx | 7 +-
6 files changed, 129 insertions(+), 17 deletions(-)
create mode 100644 ui/app/api/mirrors/state_change/route.ts
diff --git a/protos/route.proto b/protos/route.proto
index 046a63aca3..48db51e019 100644
--- a/protos/route.proto
+++ b/protos/route.proto
@@ -285,7 +285,9 @@ service FlowService {
rpc ShutdownFlow(ShutdownRequest) returns (ShutdownResponse) {
option (google.api.http) = { post: "/v1/mirrors/drop", body: "*" };
}
- rpc FlowStateChange(FlowStateChangeRequest) returns (FlowStateChangeResponse) {}
+ rpc FlowStateChange(FlowStateChangeRequest) returns (FlowStateChangeResponse) {
+ option (google.api.http) = { post: "/v1/mirrors/state_change", body: "*" };
+ }
rpc MirrorStatus(MirrorStatusRequest) returns (MirrorStatusResponse) {
option (google.api.http) = { get: "/v1/mirrors/{flow_job_name}" };
}
diff --git a/ui/app/api/mirrors/drop/route.ts b/ui/app/api/mirrors/drop/route.ts
index e3be0f7c41..a52e4b078d 100644
--- a/ui/app/api/mirrors/drop/route.ts
+++ b/ui/app/api/mirrors/drop/route.ts
@@ -13,7 +13,7 @@ export async function POST(request: Request) {
destinationPeer,
removeFlowEntry: true,
};
- console.log('/drop/mirror: req:', req);
+ console.log('/mirrors/drop: req:', req);
try {
const dropStatus: ShutdownResponse = await fetch(
`${flowServiceAddr}/v1/mirrors/drop`,
diff --git a/ui/app/api/mirrors/state_change/route.ts b/ui/app/api/mirrors/state_change/route.ts
new file mode 100644
index 0000000000..bcc4dd2a8a
--- /dev/null
+++ b/ui/app/api/mirrors/state_change/route.ts
@@ -0,0 +1,23 @@
+import { FlowStateChangeResponse } from '@/grpc_generated/route';
+import { GetFlowHttpAddressFromEnv } from '@/rpc/http';
+
+export async function POST(request: Request) {
+ const body = await request.json();
+ const flowServiceAddr = GetFlowHttpAddressFromEnv();
+ console.log('/mirrors/state_change: req:', body);
+ try {
+ const res: FlowStateChangeResponse = await fetch(
+ `${flowServiceAddr}/v1/mirrors/state_change`,
+ {
+ method: 'POST',
+ body: JSON.stringify(body),
+ }
+ ).then((res) => {
+ return res.json();
+ });
+
+ return new Response(JSON.stringify(res));
+ } catch (e) {
+ console.error(e);
+ }
+}
diff --git a/ui/app/mirrors/edit/[mirrorId]/cdc.tsx b/ui/app/mirrors/edit/[mirrorId]/cdc.tsx
index e7f7ee0ca1..dd55b5a9bf 100644
--- a/ui/app/mirrors/edit/[mirrorId]/cdc.tsx
+++ b/ui/app/mirrors/edit/[mirrorId]/cdc.tsx
@@ -2,8 +2,8 @@
import { SyncStatusRow } from '@/app/dto/MirrorsDTO';
import TimeLabel from '@/components/TimeComponent';
import {
- CDCMirrorStatus,
CloneTableSummary,
+ MirrorStatusResponse,
SnapshotStatus,
} from '@/grpc_generated/route';
import { Button } from '@/lib/Button';
@@ -230,13 +230,13 @@ export const SnapshotStatusTable = ({ status }: SnapshotStatusProps) => {
};
type CDCMirrorStatusProps = {
- cdc: CDCMirrorStatus;
+ status: MirrorStatusResponse;
rows: SyncStatusRow[];
createdAt?: Date;
syncStatusChild?: React.ReactNode;
};
export function CDCMirror({
- cdc,
+ status,
rows,
createdAt,
syncStatusChild,
@@ -249,8 +249,10 @@ export function CDCMirror({
};
let snapshot = <>>;
- if (cdc.snapshotStatus) {
- snapshot = ;
+ if (status.cdcStatus?.snapshotStatus) {
+ snapshot = (
+
+ );
}
useEffect(() => {
setMounted(true);
@@ -283,7 +285,8 @@ export function CDCMirror({
{syncStatusChild}
diff --git a/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx b/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx
index 24f4c2ac38..b71dd15615 100644
--- a/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx
+++ b/ui/app/mirrors/edit/[mirrorId]/cdcDetails.tsx
@@ -3,8 +3,11 @@ import { SyncStatusRow } from '@/app/dto/MirrorsDTO';
import MirrorInfo from '@/components/MirrorInfo';
import PeerButton from '@/components/PeerComponent';
import TimeLabel from '@/components/TimeComponent';
-import { FlowConnectionConfigs } from '@/grpc_generated/flow';
+import { FlowConnectionConfigs, FlowStatus } from '@/grpc_generated/flow';
import { dBTypeFromJSON } from '@/grpc_generated/peers';
+import { FlowStateChangeRequest } from '@/grpc_generated/route';
+import { Button } from '@/lib/Button';
+import { Icon } from '@/lib/Icon';
import { Label } from '@/lib/Label';
import moment from 'moment';
import Link from 'next/link';
@@ -13,10 +16,11 @@ import TablePairs from './tablePairs';
type props = {
syncs: SyncStatusRow[];
- mirrorConfig: FlowConnectionConfigs | undefined;
+ mirrorConfig: FlowConnectionConfigs;
createdAt?: Date;
+ mirrorStatus: FlowStatus;
};
-function CdcDetails({ syncs, createdAt, mirrorConfig }: props) {
+function CdcDetails({ syncs, createdAt, mirrorConfig, mirrorStatus }: props) {
let lastSyncedAt = moment(
syncs.length > 1
? syncs[1]?.endTime
@@ -31,7 +35,7 @@ function CdcDetails({ syncs, createdAt, mirrorConfig }: props) {
return acc;
}, 0);
- const tablesSynced = mirrorConfig?.tableMappings;
+ const tablesSynced = mirrorConfig.tableMappings;
return (
<>
@@ -49,11 +53,14 @@ function CdcDetails({ syncs, createdAt, mirrorConfig }: props) {
borderRadius: '1rem',
border: '1px solid rgba(0,0,0,0.1)',
cursor: 'pointer',
+ display: 'flex',
+ alignItems: 'center',
}}
>
-
-
+
+
+ {statusChangeHandle(mirrorConfig, mirrorStatus)}
@@ -140,4 +147,80 @@ export function numberWithCommas(x: any): string {
return x.toString().replace(/\B(?=(\d{3})+(?!\d))/g, ',');
}
+function statusChangeHandle(
+ mirrorConfig: FlowConnectionConfigs,
+ mirrorStatus: FlowStatus
+) {
+ // hopefully there's a better way to do this cast
+ if (mirrorStatus.toString() === FlowStatus[FlowStatus.STATUS_RUNNING]) {
+ return (
+
+ );
+ } else if (mirrorStatus.toString() === FlowStatus[FlowStatus.STATUS_PAUSED]) {
+ return (
+
+ );
+ } else {
+ return (
+
+ );
+ }
+}
+
+function formatStatus(mirrorStatus: FlowStatus) {
+ const mirrorStatusLower = mirrorStatus
+ .toString()
+ .split('_')
+ .at(-1)
+ ?.toLocaleLowerCase()!;
+ return (
+ mirrorStatusLower.at(0)?.toLocaleUpperCase() + mirrorStatusLower.slice(1)
+ );
+}
+
export default CdcDetails;
diff --git a/ui/app/mirrors/edit/[mirrorId]/page.tsx b/ui/app/mirrors/edit/[mirrorId]/page.tsx
index d472b849e7..c4c8f75cd8 100644
--- a/ui/app/mirrors/edit/[mirrorId]/page.tsx
+++ b/ui/app/mirrors/edit/[mirrorId]/page.tsx
@@ -33,9 +33,10 @@ export default async function EditMirror({
return
No mirror status found!
;
}
- let createdAt = await prisma.flows.findFirst({
+ let mirrorInfo = await prisma.flows.findFirst({
select: {
created_at: true,
+ workflow_id: true,
},
where: {
name: mirrorId,
@@ -86,9 +87,9 @@ export default async function EditMirror({
);