Skip to content

Commit

Permalink
feat: Add table copy state from workflow status in web UI
Browse files Browse the repository at this point in the history
Signed-off-by: Noble Mittal <[email protected]>
  • Loading branch information
beingnoble03 committed Aug 22, 2024
1 parent 99ef5af commit 3449de3
Show file tree
Hide file tree
Showing 2 changed files with 199 additions and 141 deletions.
290 changes: 165 additions & 125 deletions web/vtadmin/src/components/routes/workflow/WorkflowDetails.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -14,138 +14,178 @@
* limitations under the License.
*/

import { orderBy } from "lodash-es";
import React, { useMemo } from "react";
import { Link } from "react-router-dom";
import { orderBy } from 'lodash-es';
import React, { useMemo } from 'react';
import { Link } from 'react-router-dom';

import { useWorkflow, useWorkflows } from "../../../hooks/api";
import { formatDateTime } from "../../../util/time";
import { useWorkflow, useWorkflowStatus, useWorkflows } from '../../../hooks/api';
import { formatDateTime } from '../../../util/time';
import {
formatStreamKey,
getReverseWorkflow,
getStreams,
} from "../../../util/workflows";
import { DataTable } from "../../dataTable/DataTable";
import { vtctldata } from "../../../proto/vtadmin";
import { DataCell } from "../../dataTable/DataCell";
import { StreamStatePip } from "../../pips/StreamStatePip";
import { ThrottleThresholdSeconds } from "../Workflows";
TableCopyState,
formatStreamKey,
getReverseWorkflow,
getStreams,
getTableCopyStates,
} from '../../../util/workflows';
import { DataTable } from '../../dataTable/DataTable';
import { vtctldata } from '../../../proto/vtadmin';
import { DataCell } from '../../dataTable/DataCell';
import { StreamStatePip } from '../../pips/StreamStatePip';
import { ThrottleThresholdSeconds } from '../Workflows';

interface Props {
clusterID: string;
keyspace: string;
name: string;
clusterID: string;
keyspace: string;
name: string;
}

const LOG_COLUMNS = ['Type', 'State', 'Updated At', 'Message', 'Count'];

const TABLE_COPY_STATE_COLUMNS = ['Table Name', 'Total Bytes', 'Bytes Copied', 'Total Rows', 'Rows Copied'];

export const WorkflowDetails = ({ clusterID, keyspace, name }: Props) => {
const { data } = useWorkflow({ clusterID, keyspace, name });

const { data: workflowsData = [] } = useWorkflows();

const streams = useMemo(() => {
const rows = getStreams(data).map((stream) => ({
key: formatStreamKey(stream),
...stream,
}));

return orderBy(rows, "streamKey");
}, [data]);

const renderRows = (rows: vtctldata.Workflow.Stream.ILog[]) => {
return rows.map((row) => {
let message = row.message ? `${row.message}` : "-";
// TODO(@beingnoble03): Investigate how message can be parsed and displayed to JSON in case of "Stream Created"
if (row.type == "Stream Created") {
message = "-";
}
return (
<tr key={`${row.id}`}>
<DataCell>{`${row.type}`}</DataCell>
<DataCell>{`${row.state}`}</DataCell>
<DataCell>{`${formatDateTime(
parseInt(`${row.updated_at?.seconds}`, 10)
)}`}</DataCell>
<DataCell>{message}</DataCell>
<DataCell>{`${row.count}`}</DataCell>
</tr>
);
const { data } = useWorkflow({ clusterID, keyspace, name });

const { data: workflowsData = [] } = useWorkflows();

const { data: workflowStatus } = useWorkflowStatus({
clusterID,
keyspace,
name,
});
};

const reverseWorkflow = getReverseWorkflow(workflowsData, data);

return (
<div className="mt-12 mb-16">
{reverseWorkflow && (
<div>
<h3 className="my-8">Reverse Workflow</h3>
<div className="font-bold text-lg">
<Link
to={`/workflow/${reverseWorkflow.cluster?.id}/${reverseWorkflow.keyspace}/${reverseWorkflow.workflow?.name}`}
>
{reverseWorkflow.workflow?.name}
</Link>
</div>
<p className="text-base">
<strong>Keyspace</strong> <br />
<Link
to={`/keyspace/${reverseWorkflow.cluster?.id}/${reverseWorkflow.keyspace}`}
>
{`${reverseWorkflow.keyspace}`}
</Link>
</p>
{reverseWorkflow.workflow?.max_v_replication_lag && (
<p className="text-base">
<strong>Max VReplication Lag</strong> <br />
{`${reverseWorkflow.workflow?.max_v_replication_lag}`}
</p>
)}
</div>
)}
<h3 className="my-8">Streams</h3>
{streams.map((stream) => {
const href =
stream.tablet && stream.id
? `/workflow/${clusterID}/${keyspace}/${name}/stream/${stream.tablet.cell}/${stream.tablet.uid}/${stream.id}`
: null;

var isThrottled =
Number(stream.throttler_status?.time_throttled?.seconds) >
Date.now() / 1000 - ThrottleThresholdSeconds;
const streamState = isThrottled ? "Throttled" : stream.state;
return (
<div className="my-8">
<div className="text-lg font-bold">
<StreamStatePip state={streamState} />{" "}
<Link to={href}>{`${stream.key}`}</Link>
</div>
<p className="text-base">
<strong>State</strong> <br />
{streamState}
</p>
{isThrottled && (
<p className="text-base">
<strong>Component Throttled</strong> <br />
{stream.throttler_status?.component_throttled}
</p>

const streams = useMemo(() => {
const rows = getStreams(data).map((stream) => ({
key: formatStreamKey(stream),
...stream,
}));

return orderBy(rows, 'streamKey');
}, [data]);

const renderRows = (rows: vtctldata.Workflow.Stream.ILog[]) => {
return rows.map((row) => {
let message = row.message ? `${row.message}` : '-';
// TODO(@beingnoble03): Investigate how message can be parsed and displayed to JSON in case of "Stream Created"
if (row.type == 'Stream Created') {

Check warning on line 70 in web/vtadmin/src/components/routes/workflow/WorkflowDetails.tsx

View workflow job for this annotation

GitHub Actions / lint

Expected '===' and instead saw '=='

Check warning on line 70 in web/vtadmin/src/components/routes/workflow/WorkflowDetails.tsx

View workflow job for this annotation

GitHub Actions / build

Expected '===' and instead saw '=='
message = '-';
}
return (
<tr key={`${row.id}`}>
<DataCell>{`${row.type}`}</DataCell>
<DataCell>{`${row.state}`}</DataCell>
<DataCell>{`${formatDateTime(parseInt(`${row.updated_at?.seconds}`, 10))}`}</DataCell>
<DataCell>{message}</DataCell>
<DataCell>{`${row.count}`}</DataCell>
</tr>
);
});
};

const reverseWorkflow = getReverseWorkflow(workflowsData, data);

const tableCopyStates = getTableCopyStates(workflowStatus);

const renderTableCopyStateRows = (tableCopyStates: TableCopyState[]) => {
return tableCopyStates.map((copyState, index) => {
const tableKey = `${copyState.tableName}/${index}`;
return (
<tr key={tableKey}>
<DataCell>{`${copyState.tableName}`}</DataCell>
<DataCell>{copyState.bytes_total ? `${copyState.bytes_total}` : `N/A`}</DataCell>
<DataCell>
{copyState.bytes_copied ? `${copyState.bytes_copied}` : `N/A`}{' '}
{copyState.bytes_percentage ? `(${copyState.bytes_percentage}%)` : ``}
</DataCell>
<DataCell>{copyState.rows_total ? `${copyState.rows_total}` : `N/A`}</DataCell>
<DataCell>
{copyState.rows_copied ? `${copyState.rows_copied}` : `N/A`}{' '}
{copyState.rows_percentage ? `(${copyState.rows_percentage}%)` : ``}
</DataCell>
</tr>
);
});
};

return (
<div className="mt-12 mb-16">
{tableCopyStates && (
<div>
<h3 className="my-8">Table Copy State</h3>
<DataTable
columns={TABLE_COPY_STATE_COLUMNS}
data={tableCopyStates}
renderRows={renderTableCopyStateRows}
pageSize={1000}
/>
</div>
)}
{streamState == "Running" &&
data?.workflow?.max_v_replication_lag && (
<p className="text-base">
<strong>Max VReplication Lag</strong> <br />
{`${data?.workflow?.max_v_replication_lag}`}
</p>
)}
<DataTable
columns={["Type", "State", "Updated At", "Message", "Count"]}
data={stream.logs?.reverse()!}
renderRows={renderRows}
pageSize={1000}
title="Recent Logs"
/>
</div>
);
})}
</div>
);
<h3 className="my-8">Streams</h3>
{streams.map((stream) => {
const href =
stream.tablet && stream.id
? `/workflow/${clusterID}/${keyspace}/${name}/stream/${stream.tablet.cell}/${stream.tablet.uid}/${stream.id}`
: null;

var isThrottled =
Number(stream.throttler_status?.time_throttled?.seconds) >
Date.now() / 1000 - ThrottleThresholdSeconds;
const streamState = isThrottled ? 'Throttled' : stream.state;
return (
<div className="my-8">
<div className="text-lg font-bold">
<StreamStatePip state={streamState} /> <Link to={href}>{`${stream.key}`}</Link>
</div>
<p className="text-base">
<strong>State</strong> <br />
{streamState}
</p>
{isThrottled && (
<p className="text-base">
<strong>Component Throttled</strong> <br />
{stream.throttler_status?.component_throttled}
</p>
)}
{streamState == 'Running' && data?.workflow?.max_v_replication_lag && (

Check warning on line 149 in web/vtadmin/src/components/routes/workflow/WorkflowDetails.tsx

View workflow job for this annotation

GitHub Actions / lint

Expected '===' and instead saw '=='

Check warning on line 149 in web/vtadmin/src/components/routes/workflow/WorkflowDetails.tsx

View workflow job for this annotation

GitHub Actions / build

Expected '===' and instead saw '=='
<p className="text-base">
<strong>Max VReplication Lag</strong> <br />
{`${data?.workflow?.max_v_replication_lag}`}
</p>
)}
<DataTable
columns={LOG_COLUMNS}
data={stream.logs?.reverse()!}
renderRows={renderRows}
pageSize={1000}
title="Recent Logs"
/>
</div>
);
})}
{reverseWorkflow && (
<div>
<h3 className="my-8">Reverse Workflow</h3>
<div className="font-bold text-lg">
<Link
to={`/workflow/${reverseWorkflow.cluster?.id}/${reverseWorkflow.keyspace}/${reverseWorkflow.workflow?.name}`}
>
{reverseWorkflow.workflow?.name}
</Link>
</div>
<p className="text-base">
<strong>Keyspace</strong> <br />
<Link to={`/keyspace/${reverseWorkflow.cluster?.id}/${reverseWorkflow.keyspace}`}>
{`${reverseWorkflow.keyspace}`}
</Link>
</p>
{reverseWorkflow.workflow?.max_v_replication_lag && (
<p className="text-base">
<strong>Max VReplication Lag</strong> <br />
{`${reverseWorkflow.workflow?.max_v_replication_lag}`}
</p>
)}
</div>
)}
</div>
);
};
50 changes: 34 additions & 16 deletions web/vtadmin/src/util/workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,21 +100,39 @@ export const getStreamTablets = <W extends pb.IWorkflow>(workflow: W | null | un
* suffix and the source and target keyspace from all `workflows` list.
*/
export const getReverseWorkflow = <W extends pb.Workflow>(
workflows: W[],
originalWorkflow: W | undefined | null
workflows: W[],
originalWorkflow: W | undefined | null
): W | undefined => {
if (!originalWorkflow) return;
const originalWorkflowName = originalWorkflow.workflow?.name!;
let reverseWorkflowName = originalWorkflowName.concat("_reverse");
if (originalWorkflowName.endsWith("_reverse")) {
reverseWorkflowName = originalWorkflowName.split("_reverse")[0];
}
return workflows.find(
(workflow) =>
workflow.workflow?.name === reverseWorkflowName &&
workflow.workflow?.source?.keyspace ===
originalWorkflow.workflow?.target?.keyspace &&
workflow.workflow?.target?.keyspace ===
originalWorkflow.workflow?.source?.keyspace
);
if (!originalWorkflow) return;
const originalWorkflowName = originalWorkflow.workflow?.name!;
let reverseWorkflowName = originalWorkflowName.concat('_reverse');
if (originalWorkflowName.endsWith('_reverse')) {
reverseWorkflowName = originalWorkflowName.split('_reverse')[0];
}
return workflows.find(
(workflow) =>
workflow.workflow?.name === reverseWorkflowName &&
workflow.workflow?.source?.keyspace === originalWorkflow.workflow?.target?.keyspace &&
workflow.workflow?.target?.keyspace === originalWorkflow.workflow?.source?.keyspace
);
};

export interface TableCopyState extends vtctldata.WorkflowStatusResponse.ITableCopyState {
tableName: string;
}

export const getTableCopyStates = (
workflowStatus: vtctldata.WorkflowStatusResponse | undefined
): TableCopyState[] | undefined => {
if (!workflowStatus) return;
const tableNames = Object.keys(workflowStatus.table_copy_state);
if (!tableNames.length) return;
const tableCopyState: TableCopyState[] = tableNames.map((tableName) => {
const tableState = workflowStatus.table_copy_state[tableName];
return {
tableName,
...tableState,
};
});
return tableCopyState;
};

0 comments on commit 3449de3

Please sign in to comment.