diff --git a/orion-commons/src/main/java/com/pinterest/orion/common/NodeInfo.java b/orion-commons/src/main/java/com/pinterest/orion/common/NodeInfo.java index 5ef76a7e..628458bd 100644 --- a/orion-commons/src/main/java/com/pinterest/orion/common/NodeInfo.java +++ b/orion-commons/src/main/java/com/pinterest/orion/common/NodeInfo.java @@ -16,7 +16,9 @@ package com.pinterest.orion.common; import java.io.Serializable; +import java.util.HashSet; import java.util.Map; +import java.util.Set; public class NodeInfo implements Serializable { @@ -33,6 +35,32 @@ public class NodeInfo implements Serializable { private Map serviceInfo; private Map agentSettings; private Map environment; + private Set brokersets = new HashSet<>(); + private Map brokerStatus; + /** + * @param brokerStatus + */ + public void setBrokerStatus(Map brokerStatus) { + this.brokerStatus = brokerStatus; + } + /** + * @return the brokerStatus + */ + public Map getBrokerStatus() { + return brokerStatus; + } + /** + * @param brokersets + */ + public void setBrokersets(Set brokersets) { + this.brokersets = brokersets; + } + /** + * @return the brokersets + */ + public Set getBrokersets() { + return brokersets; + } /** * @return the timestamp */ diff --git a/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/kafka/BrokersetStateSensor.java b/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/kafka/BrokersetStateSensor.java new file mode 100644 index 00000000..173e36cd --- /dev/null +++ b/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/kafka/BrokersetStateSensor.java @@ -0,0 +1,113 @@ +package com.pinterest.orion.core.automation.sensor.kafka; + +import com.pinterest.orion.core.Attribute; +import com.pinterest.orion.core.Node; +import com.pinterest.orion.core.kafka.Brokerset; +import com.pinterest.orion.core.kafka.BrokersetState; +import com.pinterest.orion.core.kafka.KafkaCluster; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.logging.Logger; + +public class BrokersetStateSensor extends KafkaSensor { + private static final Logger logger = Logger.getLogger(BrokersetStateSensor.class.getName()); + public static final String ATTR_BROKERSET_STATE_KEY = "brokersetState"; + + /** + * KafkaClusterInfoSensor loads the brokerset information from the config files. + * This sensor generates brokerset state information based on the brokerset information. + * @param cluster Kafka cluster to sense. + * @throws Exception If there is an error sensing the cluster. + */ + @Override + public void sense(KafkaCluster cluster) throws Exception { + // Brokerset information and broker information are loaded from other sensors. + // If they are not ready, skip this sensor. + if (!cluster.containsAttribute(KafkaClusterInfoSensor.ATTR_BROKERSET_KEY)) { + // No brokerset information available. Skip. + return; + } + Attribute brokersetMapAttr = cluster.getAttribute(KafkaClusterInfoSensor.ATTR_BROKERSET_KEY); + if (brokersetMapAttr == null) { + // No brokerset information available. Skip. + return; + } + if (cluster.getNodeMap() == null || cluster.getNodeMap().isEmpty()) { + // No node information available. Skip. + return; + } + // Brokerset info from config files + Map brokersetMap = brokersetMapAttr.getValue(); + // Brokerset state map. + Map brokersetStateMap = new HashMap<>(); + // BrokerId to brokerset alias set map + Map> brokerToBrokersetsMap = new HashMap<>(); + for (String nodeId : cluster.getNodeMap().keySet()) { + brokerToBrokersetsMap.put(nodeId, new HashSet<>()); + } + // Get broker ids for each brokerset. + for (Brokerset brokerset : brokersetMap.values()) { + String brokersetAlias = brokerset.getBrokersetAlias(); + BrokersetState brokersetState = new BrokersetState(brokersetAlias); + Set brokerIds = new HashSet<>(); + List brokersetRanges = brokerset.getEntries(); + if (brokersetRanges == null || brokersetRanges.isEmpty()) { + logger.warning(String.format("Brokerset %s in cluster %s has no brokerset range.", + brokersetAlias, cluster.getName())); + continue; + } + boolean invalidBrokerset = false; + for (Brokerset.BrokersetRange brokersetRange : brokersetRanges) { + int start = brokersetRange.getStartBrokerIdx(); + int end = brokersetRange.getEndBrokerIdx(); + for (int i = start; i <= end; i++) { + String nodeId = Integer.toString(i); + if (cluster.getNodeMap().containsKey(nodeId)) { + Node node = cluster.getNodeMap().get(nodeId); + brokerIds.add(node.getCurrentNodeInfo().getNodeId()); + brokerToBrokersetsMap.get(nodeId).add(brokersetAlias); + } else { + invalidBrokerset = true; + } + } + brokersetState.addBrokerRange(Arrays.asList(start, end)); + } + brokersetState.setBrokerIds(new ArrayList<>(brokerIds)); + brokersetStateMap.put(brokersetAlias, brokersetState); + if (invalidBrokerset) { + handleInvalidBrokerset(brokersetAlias, cluster.getName()); + } + } + // TODO: Add other brokerset status information (ex. usage data). + cluster.setAttribute(ATTR_BROKERSET_STATE_KEY, brokersetStateMap); + // Update node info with brokerset information. + for (Node node : cluster.getNodeMap().values()) { + if (node != null && node.getCurrentNodeInfo() != null) { + String nodeId = node.getCurrentNodeInfo().getNodeId(); + node.getCurrentNodeInfo().setBrokersets(brokerToBrokersetsMap.get(nodeId)); + } + } + } + + @Override + public String getName() { + return "BrokersetStateSensor"; + } + + /** + * Handle invalid brokerset. + * Invalid brokerset means that some brokers recorded in brokerset config files are not found in the cluster. + * @param brokersetAlias Brokerset alias. + * @param clusterId Cluster id. + */ + protected void handleInvalidBrokerset(String brokersetAlias, String clusterId) { + logger.warning(String.format("Brokerset %s in cluster %s has invalid brokers.", + brokersetAlias, clusterId)); + } +} diff --git a/orion-server/src/main/java/com/pinterest/orion/core/kafka/BrokersetState.java b/orion-server/src/main/java/com/pinterest/orion/core/kafka/BrokersetState.java new file mode 100644 index 00000000..d1bcbd10 --- /dev/null +++ b/orion-server/src/main/java/com/pinterest/orion/core/kafka/BrokersetState.java @@ -0,0 +1,111 @@ +package com.pinterest.orion.core.kafka; + + +import java.util.ArrayList; +import java.util.List; + +public class BrokersetState { + /** + * The brokersetAlias is the alias of the brokerset. + */ + private String brokersetAlias; + /** + * The brokersetRanges are the ranges of brokerset that are in the brokerset. + * The brokersetRanges are obtained from the brokerset configuration file. + */ + private List> brokersetRanges = new ArrayList<>(); + /** + * The brokerIds are the ids of running broker that are in the brokerset. + * The brokerIds are obtained from the cluster state. + */ + private List brokerIds = new ArrayList<>(); + /** + * The constructor of BrokersetState. + * @param brokersetAlias + */ + public BrokersetState(String brokersetAlias) { + this.brokersetAlias = brokersetAlias; + } + /** + * The constructor of BrokersetState. + * @param brokersetAlias + * @param brokersetRanges + * @param brokerIds + */ + public BrokersetState(String brokersetAlias, List> brokersetRanges, List brokerIds) { + this.brokersetAlias = brokersetAlias; + this.brokersetRanges = brokersetRanges; + this.brokerIds = brokerIds; + } + /** + * The method is used to get the size of brokerset. + * The size of brokerset is the number of brokers in the brokerset. + * It may not be equal to the sum of brokersetRanges if some brokers are not running. + * @return the size of brokerset. + */ + public int getSize() { + return brokerIds.size(); + } + /** + * The method is used to get the alias of brokerset. + * @return the alias of brokerset. + */ + public String getBrokersetAlias() { + return brokersetAlias; + } + /** + * The method is used to set the alias of brokerset. + * @param brokersetAlias + */ + public void setBrokersetAlias(String brokersetAlias) { + this.brokersetAlias = brokersetAlias; + } + /** + * The method is used to get the ranges of brokerset. + * @return the ranges of brokerset. + */ + public List> getBrokersetRanges() { + return brokersetRanges; + } + /** + * The method is used to add the range of brokerset. + * Each range is a list of two integers. + * @param brokerRange + * @throws IllegalArgumentException + */ + public void addBrokerRange(List brokerRange) throws IllegalArgumentException { + if (brokerRange.size() != 2) { + throw new IllegalArgumentException( + "The size of brokerRange should be 2 for brokerset " + brokersetAlias + "."); + } + brokersetRanges.add(brokerRange); + } + /** + * The method is used to set the ranges of brokerset. + * @param brokersetRanges + * @throws IllegalArgumentException + */ + public void setBrokersetRanges(List> brokersetRanges) throws IllegalArgumentException { + for (List brokerRange : brokersetRanges) { + if (brokerRange.size() != 2) { + throw new IllegalArgumentException( + "The size of brokerRange should be 2 for brokerset " + brokersetAlias + "."); + } + } + this.brokersetRanges = brokersetRanges; + } + /** + * The method is used to get the ids of brokers in the brokerset. + * @return brokerIds + */ + public List getBrokerIds() { + return brokerIds; + } + /** + * The method is used to set the ids of brokers in the brokerset. + * @param brokerIds + */ + public void setBrokerIds(List brokerIds) { + this.brokerIds = brokerIds; + } +} diff --git a/orion-server/src/main/resources/webapp/src/basic-components/Kafka/BrokersetEntry.js b/orion-server/src/main/resources/webapp/src/basic-components/Kafka/BrokersetEntry.js new file mode 100644 index 00000000..00b8f1a2 --- /dev/null +++ b/orion-server/src/main/resources/webapp/src/basic-components/Kafka/BrokersetEntry.js @@ -0,0 +1,179 @@ +import React from "react"; +import {Tab, Tabs, Grid, Box, Link, Typography, Chip} from "@material-ui/core"; +import { Link as RouterLink, Redirect, Route, Switch } from "react-router-dom"; +import PropsTable from "../Commons/PropsTable"; + +const routes = [ + { + subpath: "assignments", + component: PropsTable, + label: "Assignments", + getData: getAssignmentData, + getColumns: getAssignmentColumns, + }, + { + subpath: "brokers", + component: PropsTable, + label: "Brokers", + getData: getBrokerData, + getColumns: getBrokerColumns, + }, + { + subpath: "stats", + component: PropsTable, + label: "Stats", + getData: getStatsData, + getColumns: getStatsColumns, + } +]; + +function getStatsData(clusterId, rawData) { + let brokersetStats = []; + let brokersetData = rawData.brokersetData; + brokersetStats.push({ key: "Broker Count", value: brokersetData.size}); + return brokersetStats; +} + +function getStatsColumns() { + return [ + { title: "Key", field: "key" }, + { title: "Value", field: "value" }, + ]; +} + +function brokerToLink(broker, clusterId) { + return ( + + {broker} + + ); +} + +function getBrokerData(clusterId, rawData) { + let brokersetData = rawData.brokersetData; + let brokers = []; + for (let brokerId of brokersetData.brokerIds) { + brokers.push({ broker: brokerToLink(brokerId, clusterId) }); + } + return brokers; +} + +function getBrokerColumns() { + return [ + { title: "Broker", field: "broker" }, + ]; +} + +function getAssignmentData(clusterId, rawData) { + let brokersetData = rawData.brokersetData; + let assignments = []; + for (let range of brokersetData.brokersetRanges) { + assignments.push({ + startBrokerIdx: range[0], + endBrokerIdx: range[1], + size: range[1] - range[0] + 1 + }); + } + return assignments; +} + +function getAssignmentColumns() { + return [ + { title: "Start Broker Id", field: "startBrokerIdx" }, + { title: "End Broker Id", field: "endBrokerIdx" }, + { title: "Size", field: "size" } + ]; +} + +function BrokersetNavTabs(props) { + return ( + + {routes.map((route, idx) => ( + + ))} + + ); +} + +function getBrokersetInfoHeader(rawData, clusterId) { + let brokersetData = rawData.brokersetData; + let brokersetAlias = brokersetData.brokersetAlias; + let brokerCount = brokersetData.size; + return ( + + + + + {brokersetAlias} + + + + + + + + + + + ); +} + +export default function BrokersetEntry(props) { + let rowData = props.rowData; + let clusterId = props.clusterId; + return ( +
+ {getBrokersetInfoHeader(rowData, clusterId)} + + + + + + + + + + {routes.map((route, idx) => { + return ( + + {} + + ); + })} + + + +
+ ) +} diff --git a/orion-server/src/main/resources/webapp/src/basic-components/Kafka/Brokersets.js b/orion-server/src/main/resources/webapp/src/basic-components/Kafka/Brokersets.js index 73a607f4..1ea7aa51 100644 --- a/orion-server/src/main/resources/webapp/src/basic-components/Kafka/Brokersets.js +++ b/orion-server/src/main/resources/webapp/src/basic-components/Kafka/Brokersets.js @@ -13,46 +13,118 @@ * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ -import React from "react"; -import BrokersetPanel from "./BrokersetPanel"; +import React, {Suspense, useState} from "react"; import MaterialTable from "material-table"; -import { Box } from "@material-ui/core"; -import CreateTopicPanel from "./CreateTopicPanel"; +import {Backdrop, Box, Fade, Modal} from "@material-ui/core"; +import {useHistory, useRouteMatch} from "react-router-dom"; +import BrokersetEntry from "./BrokersetEntry"; +import {makeStyles} from "@material-ui/core/styles"; +const modalStyles = makeStyles(theme => ({ + modal: { + display: "flex", + alignItems: "center", + justifyContent: "center" + }, + paper: { + backgroundColor: theme.palette.background.paper, + border: "2px solid #000", + maxHeight: "500px", + boxShadow: theme.shadows[5], + padding: theme.spacing(2, 4, 3) + } +})); export default function Brokersets(props) { - let rows = []; - if (props.cluster.attributes.brokerset) { - rows = Object.values(props.cluster.attributes.brokerset); - } + let brokersets = []; + if (props.cluster.attributes.brokersetState) { + brokersets = Object.values(props.cluster.attributes.brokersetState); + } + let columns = [ + { title: "Name", field: "brokersetAlias" }, + { title: "Broker Count", field: "brokerCount" } + ] + let clusterId = props.cluster.clusterId; + let brokersetToRowValuesMap = {}; + for (let brokerset of brokersets) { + let brokersetAlias = brokerset.brokersetAlias; + brokersetToRowValuesMap[brokersetAlias] = { + "brokersetAlias": brokersetAlias, + "clusterId": clusterId, + "brokerCount": brokerset.size, + "brokersetData": brokerset + } + } - let columns = [ - { title: "Alias", field: "brokersetAlias" }, - { title: "Brokers", field: "brokers" } - ]; - let data = rows.map(row => ({ - brokersetAlias: row.brokersetAlias, - brokers: row.size, - raw: row - })); - return ( - - { - return ( -
-
- -
-
- ); - }} - columns={columns} - data={data} - /> -
- ); + let data = Object.values(brokersetToRowValuesMap); + + const classes = modalStyles(); + const history = useHistory(); + let match = useRouteMatch("/clusters/:clusterId/service/brokersets/:brokersetAlias?"); + + const [selectedRow, setSelectedRow] = useState(); + const [openModal, setOpenModal] = useState(false); + const [openDetailsModal, setOpenDetailsModal] = useState(false); + const handleOpen = () => { + setOpenModal(true); + }; + const handleClose = () => { + setOpenModal(false); + }; + const handleDetailsClose = () => { + setOpenDetailsModal(false); + setSelectedRow(); + history.push("/clusters/" + clusterId + "/service/brokersets") + } + + if (selectedRow != null && !openDetailsModal) { + setOpenDetailsModal(true); + } + + return ( + + + +
+ {selectedRow && ( + Loading...
}> + + + )} + +
+
+ { + history.push("/clusters/" + clusterId + "/service/brokersets/" + rowData.brokersetAlias); + setSelectedRow(rowData); + setOpenDetailsModal(true); + }} + columns={columns} + data={data} + /> +
+ ) } diff --git a/orion-server/src/main/resources/webapp/src/basic-components/Kafka/KafkaNode.js b/orion-server/src/main/resources/webapp/src/basic-components/Kafka/KafkaNode.js index 3c97e659..bbafb494 100644 --- a/orion-server/src/main/resources/webapp/src/basic-components/Kafka/KafkaNode.js +++ b/orion-server/src/main/resources/webapp/src/basic-components/Kafka/KafkaNode.js @@ -16,7 +16,6 @@ import React from "react"; import { Tab, Tabs, Grid, Typography, Box, Chip, Link } from "@material-ui/core"; import { Link as RouterLink, Redirect, Route, Switch } from "react-router-dom"; -import { makeStyles } from "@material-ui/core/styles"; import PropsTable from "../Commons/PropsTable"; const routes = [ @@ -40,6 +39,20 @@ const routes = [ label: "Broker Environment", getData: getBrokerEnvironmentData, getColumns: getBrokerEnvironmentColumns, + }, + { + subpath: "brokersets", + component: PropsTable, + label: "Brokersets", + getData: getBrokersetData, + getColumns: getBrokersetColumns, + }, + { + subpath: "brokerstats", + component: PropsTable, + label: "Broker Stats", + getData: getBrokerStatsData, + getColumns: getBrokerStatsColumns, } ]; @@ -133,6 +146,14 @@ function getNodeInfoHeader(node, nodeId, brokerId, hostname) { label={node.currentNodeInfo.rack} /> + + + @@ -202,6 +223,61 @@ function getTopicPartitionsColumns() { ]); } +function brokersetToLink(brokerset, clusterId) { + return ( + + {brokerset} + + ); +} + +function getBrokersetData(cluster, node) { + let clusterId = node.currentNodeInfo.clusterId; + let brokersets = node.currentNodeInfo.brokersets; + let brokersetRows = []; + for (let brokersetAlias of brokersets) { + const brokersetAliasSplit = brokersetAlias.split("_"); + let brokersetType = brokersetAliasSplit[0]; + let brokerCount = -1; + let partitionCount = -1; + if (brokersetType === "Capacity" || brokersetType === "Static") { + brokerCount = parseInt(brokersetAliasSplit[1].replace("B", "")); + partitionCount = parseInt(brokersetAliasSplit[2].replace("P", "")); + } + brokersetRows.push({ + brokersetAlias: {brokersetToLink(brokersetAlias, clusterId)}, + type: brokersetType, + brokerCount: brokerCount, + partitionCount: partitionCount + }); + } + return brokersetRows; +} + +function getBrokersetColumns() { + return ([ + { title: "Brokerset Name", field: "brokersetAlias" }, + { title: "Type", field: "type" }, + { title: "Broker Count", field: "brokerCount" }, + { title: "Partition Count", field: "partitionCount" } + ]); +} + +function getBrokerStatsData(cluster, node) { + let brokerStatsData = []; + return brokerStatsData; +} + +function getBrokerStatsColumns() { + return ([ + { title: "Key", field: "key" }, + { title: "Value", field: "value" } + ]); +} + function getBrokerEnvironmentData(cluster, node) { let environmentConfigRows = []; if (node && node.agentNodeInfo) {