diff --git a/web/vtadmin/src/components/routes/workflow/WorkflowDetails.tsx b/web/vtadmin/src/components/routes/workflow/WorkflowDetails.tsx index dc12281d3db..a0c05e89cc9 100644 --- a/web/vtadmin/src/components/routes/workflow/WorkflowDetails.tsx +++ b/web/vtadmin/src/components/routes/workflow/WorkflowDetails.tsx @@ -14,138 +14,178 @@ * limitations under the License. */ -import { orderBy } from "lodash-es"; -import React, { useMemo } from "react"; -import { Link } from "react-router-dom"; +import { orderBy } from 'lodash-es'; +import React, { useMemo } from 'react'; +import { Link } from 'react-router-dom'; -import { useWorkflow, useWorkflows } from "../../../hooks/api"; -import { formatDateTime } from "../../../util/time"; +import { useWorkflow, useWorkflowStatus, useWorkflows } from '../../../hooks/api'; +import { formatDateTime } from '../../../util/time'; import { - formatStreamKey, - getReverseWorkflow, - getStreams, -} from "../../../util/workflows"; -import { DataTable } from "../../dataTable/DataTable"; -import { vtctldata } from "../../../proto/vtadmin"; -import { DataCell } from "../../dataTable/DataCell"; -import { StreamStatePip } from "../../pips/StreamStatePip"; -import { ThrottleThresholdSeconds } from "../Workflows"; + TableCopyState, + formatStreamKey, + getReverseWorkflow, + getStreams, + getTableCopyStates, +} from '../../../util/workflows'; +import { DataTable } from '../../dataTable/DataTable'; +import { vtctldata } from '../../../proto/vtadmin'; +import { DataCell } from '../../dataTable/DataCell'; +import { StreamStatePip } from '../../pips/StreamStatePip'; +import { ThrottleThresholdSeconds } from '../Workflows'; interface Props { - clusterID: string; - keyspace: string; - name: string; + clusterID: string; + keyspace: string; + name: string; } +const LOG_COLUMNS = ['Type', 'State', 'Updated At', 'Message', 'Count']; + +const TABLE_COPY_STATE_COLUMNS = ['Table Name', 'Total Bytes', 'Bytes Copied', 'Total Rows', 'Rows Copied']; + export const WorkflowDetails = ({ clusterID, keyspace, name }: Props) => { - const { data } = useWorkflow({ clusterID, keyspace, name }); - - const { data: workflowsData = [] } = useWorkflows(); - - const streams = useMemo(() => { - const rows = getStreams(data).map((stream) => ({ - key: formatStreamKey(stream), - ...stream, - })); - - return orderBy(rows, "streamKey"); - }, [data]); - - const renderRows = (rows: vtctldata.Workflow.Stream.ILog[]) => { - return rows.map((row) => { - let message = row.message ? `${row.message}` : "-"; - // TODO(@beingnoble03): Investigate how message can be parsed and displayed to JSON in case of "Stream Created" - if (row.type == "Stream Created") { - message = "-"; - } - return ( - - {`${row.type}`} - {`${row.state}`} - {`${formatDateTime( - parseInt(`${row.updated_at?.seconds}`, 10) - )}`} - {message} - {`${row.count}`} - - ); + const { data } = useWorkflow({ clusterID, keyspace, name }); + + const { data: workflowsData = [] } = useWorkflows(); + + const { data: workflowStatus } = useWorkflowStatus({ + clusterID, + keyspace, + name, }); - }; - - const reverseWorkflow = getReverseWorkflow(workflowsData, data); - - return ( -
- {reverseWorkflow && ( -
-

Reverse Workflow

-
- - {reverseWorkflow.workflow?.name} - -
-

- Keyspace
- - {`${reverseWorkflow.keyspace}`} - -

- {reverseWorkflow.workflow?.max_v_replication_lag && ( -

- Max VReplication Lag
- {`${reverseWorkflow.workflow?.max_v_replication_lag}`} -

- )} -
- )} -

Streams

- {streams.map((stream) => { - const href = - stream.tablet && stream.id - ? `/workflow/${clusterID}/${keyspace}/${name}/stream/${stream.tablet.cell}/${stream.tablet.uid}/${stream.id}` - : null; - - var isThrottled = - Number(stream.throttler_status?.time_throttled?.seconds) > - Date.now() / 1000 - ThrottleThresholdSeconds; - const streamState = isThrottled ? "Throttled" : stream.state; - return ( -
-
- {" "} - {`${stream.key}`} -
-

- State
- {streamState} -

- {isThrottled && ( -

- Component Throttled
- {stream.throttler_status?.component_throttled} -

+ + const streams = useMemo(() => { + const rows = getStreams(data).map((stream) => ({ + key: formatStreamKey(stream), + ...stream, + })); + + return orderBy(rows, 'streamKey'); + }, [data]); + + const renderRows = (rows: vtctldata.Workflow.Stream.ILog[]) => { + return rows.map((row) => { + let message = row.message ? `${row.message}` : '-'; + // TODO(@beingnoble03): Investigate how message can be parsed and displayed to JSON in case of "Stream Created" + if (row.type == 'Stream Created') { + message = '-'; + } + return ( + + {`${row.type}`} + {`${row.state}`} + {`${formatDateTime(parseInt(`${row.updated_at?.seconds}`, 10))}`} + {message} + {`${row.count}`} + + ); + }); + }; + + const reverseWorkflow = getReverseWorkflow(workflowsData, data); + + const tableCopyStates = getTableCopyStates(workflowStatus); + + const renderTableCopyStateRows = (tableCopyStates: TableCopyState[]) => { + return tableCopyStates.map((copyState, index) => { + const tableKey = `${copyState.tableName}/${index}`; + return ( + + {`${copyState.tableName}`} + {copyState.bytes_total ? `${copyState.bytes_total}` : `N/A`} + + {copyState.bytes_copied ? `${copyState.bytes_copied}` : `N/A`}{' '} + {copyState.bytes_percentage ? `(${copyState.bytes_percentage}%)` : ``} + + {copyState.rows_total ? `${copyState.rows_total}` : `N/A`} + + {copyState.rows_copied ? `${copyState.rows_copied}` : `N/A`}{' '} + {copyState.rows_percentage ? `(${copyState.rows_percentage}%)` : ``} + + + ); + }); + }; + + return ( +
+ {tableCopyStates && ( +
+

Table Copy State

+ +
)} - {streamState == "Running" && - data?.workflow?.max_v_replication_lag && ( -

- Max VReplication Lag
- {`${data?.workflow?.max_v_replication_lag}`} -

- )} - -
- ); - })} -
- ); +

Streams

+ {streams.map((stream) => { + const href = + stream.tablet && stream.id + ? `/workflow/${clusterID}/${keyspace}/${name}/stream/${stream.tablet.cell}/${stream.tablet.uid}/${stream.id}` + : null; + + var isThrottled = + Number(stream.throttler_status?.time_throttled?.seconds) > + Date.now() / 1000 - ThrottleThresholdSeconds; + const streamState = isThrottled ? 'Throttled' : stream.state; + return ( +
+
+ {`${stream.key}`} +
+

+ State
+ {streamState} +

+ {isThrottled && ( +

+ Component Throttled
+ {stream.throttler_status?.component_throttled} +

+ )} + {streamState == 'Running' && data?.workflow?.max_v_replication_lag && ( +

+ Max VReplication Lag
+ {`${data?.workflow?.max_v_replication_lag}`} +

+ )} + +
+ ); + })} + {reverseWorkflow && ( +
+

Reverse Workflow

+
+ + {reverseWorkflow.workflow?.name} + +
+

+ Keyspace
+ + {`${reverseWorkflow.keyspace}`} + +

+ {reverseWorkflow.workflow?.max_v_replication_lag && ( +

+ Max VReplication Lag
+ {`${reverseWorkflow.workflow?.max_v_replication_lag}`} +

+ )} +
+ )} +
+ ); }; diff --git a/web/vtadmin/src/util/workflows.ts b/web/vtadmin/src/util/workflows.ts index 64c8ebced84..5392ee721c3 100644 --- a/web/vtadmin/src/util/workflows.ts +++ b/web/vtadmin/src/util/workflows.ts @@ -100,21 +100,39 @@ export const getStreamTablets = (workflow: W | null | un * suffix and the source and target keyspace from all `workflows` list. */ export const getReverseWorkflow = ( - workflows: W[], - originalWorkflow: W | undefined | null + workflows: W[], + originalWorkflow: W | undefined | null ): W | undefined => { - if (!originalWorkflow) return; - const originalWorkflowName = originalWorkflow.workflow?.name!; - let reverseWorkflowName = originalWorkflowName.concat("_reverse"); - if (originalWorkflowName.endsWith("_reverse")) { - reverseWorkflowName = originalWorkflowName.split("_reverse")[0]; - } - return workflows.find( - (workflow) => - workflow.workflow?.name === reverseWorkflowName && - workflow.workflow?.source?.keyspace === - originalWorkflow.workflow?.target?.keyspace && - workflow.workflow?.target?.keyspace === - originalWorkflow.workflow?.source?.keyspace - ); + if (!originalWorkflow) return; + const originalWorkflowName = originalWorkflow.workflow?.name!; + let reverseWorkflowName = originalWorkflowName.concat('_reverse'); + if (originalWorkflowName.endsWith('_reverse')) { + reverseWorkflowName = originalWorkflowName.split('_reverse')[0]; + } + return workflows.find( + (workflow) => + workflow.workflow?.name === reverseWorkflowName && + workflow.workflow?.source?.keyspace === originalWorkflow.workflow?.target?.keyspace && + workflow.workflow?.target?.keyspace === originalWorkflow.workflow?.source?.keyspace + ); +}; + +export interface TableCopyState extends vtctldata.WorkflowStatusResponse.ITableCopyState { + tableName: string; +} + +export const getTableCopyStates = ( + workflowStatus: vtctldata.WorkflowStatusResponse | undefined +): TableCopyState[] | undefined => { + if (!workflowStatus) return; + const tableNames = Object.keys(workflowStatus.table_copy_state); + if (!tableNames.length) return; + const tableCopyState: TableCopyState[] = tableNames.map((tableName) => { + const tableState = workflowStatus.table_copy_state[tableName]; + return { + tableName, + ...tableState, + }; + }); + return tableCopyState; };