From 2701825bb20c6649c7c2bd918f1d7a6679855db7 Mon Sep 17 00:00:00 2001 From: Chao-Heng Lee Date: Tue, 9 May 2023 19:25:52 +0800 Subject: [PATCH 01/11] serialize Broker info in ClusterInfo. --- .../java/org/astraea/common/ByteUtils.java | 144 ++++++++++++++++-- .../common/generated/ClusterInfo.proto | 25 ++- .../org/astraea/common/ByteUtilsTest.java | 23 ++- 3 files changed, 180 insertions(+), 12 deletions(-) diff --git a/common/src/main/java/org/astraea/common/ByteUtils.java b/common/src/main/java/org/astraea/common/ByteUtils.java index b7db5b56ee..9fb73bb563 100644 --- a/common/src/main/java/org/astraea/common/ByteUtils.java +++ b/common/src/main/java/org/astraea/common/ByteUtils.java @@ -21,11 +21,13 @@ import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; import java.nio.charset.StandardCharsets; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; +import org.astraea.common.admin.Broker; import org.astraea.common.admin.ClusterInfo; import org.astraea.common.admin.Config; import org.astraea.common.admin.NodeInfo; @@ -185,14 +187,60 @@ public static byte[] toBytes(BeanObject value) { public static byte[] toBytes(ClusterInfo value) { return ClusterInfoOuterClass.ClusterInfo.newBuilder() .setClusterId(value.clusterId()) - .addAllNodeInfo( - value.nodes().stream() + .addAllBroker( + value.brokers().stream() .map( - nodeInfo -> - ClusterInfoOuterClass.ClusterInfo.NodeInfo.newBuilder() - .setId(nodeInfo.id()) - .setHost(nodeInfo.host()) - .setPort(nodeInfo.port()) + broker -> + ClusterInfoOuterClass.ClusterInfo.Broker.newBuilder() + .setId(broker.id()) + .setHost(broker.host()) + .setPort(broker.port()) + .setIsController(broker.isController()) + .putAllConfig(broker.config().raw()) + .addAllDatafolder( + broker.dataFolders().stream() + .map( + dataFolder -> + ClusterInfoOuterClass.ClusterInfo.Broker.Datafolder + .newBuilder() + .setPath(dataFolder.path()) + .putAllPartitionSizes( + dataFolder.partitionSizes().entrySet().stream() + .collect( + Collectors.toMap( + entry -> entry.getKey().toString(), + Map.Entry::getValue))) + .putAllOrphanPartitionSizes( + dataFolder + .orphanPartitionSizes() + .entrySet() + .stream() + .collect( + Collectors.toMap( + entry -> entry.getKey().toString(), + Map.Entry::getValue))) + .build()) + .collect(Collectors.toList())) + .addAllTopicPartitions( + broker.topicPartitions().stream() + .map( + tp -> + ClusterInfoOuterClass.ClusterInfo.Broker.TopicPartition + .newBuilder() + .setPartition(tp.partition()) + .setTopic(tp.topic()) + .build()) + .collect(Collectors.toList())) + .addAllTopicPartitionLeaders( + broker.topicPartitionLeaders().stream() + .map( + tp -> + ClusterInfoOuterClass.ClusterInfo.Broker.TopicPartition + .newBuilder() + .setPartition(tp.partition()) + .setTopic(tp.topic()) + .build()) + .collect(Collectors.toList())) .build()) .collect(Collectors.toList())) .addAllTopic( @@ -326,9 +374,87 @@ public static ClusterInfo readClusterInfo(byte[] bytes) { var outerClusterInfo = ClusterInfoOuterClass.ClusterInfo.parseFrom(bytes); return ClusterInfo.of( outerClusterInfo.getClusterId(), - outerClusterInfo.getNodeInfoList().stream() + outerClusterInfo.getBrokerList().stream() .map( - nodeInfo -> NodeInfo.of(nodeInfo.getId(), nodeInfo.getHost(), nodeInfo.getPort())) + broker -> + new Broker() { + @Override + public boolean isController() { + return broker.getIsController(); + } + + @Override + public Config config() { + return Config.of(broker.getConfigMap()); + } + + @Override + public List dataFolders() { + return broker.getDatafolderList().stream() + .map( + datafolder -> + new DataFolder() { + @Override + public String path() { + return datafolder.getPath(); + } + + @Override + public Map partitionSizes() { + return datafolder + .getPartitionSizesMap() + .entrySet() + .stream() + .collect( + Collectors.toMap( + entry -> TopicPartition.of(entry.getKey()), + Map.Entry::getValue)); + } + + @Override + public Map orphanPartitionSizes() { + return datafolder + .getOrphanPartitionSizesMap() + .entrySet() + .stream() + .collect( + Collectors.toMap( + entry -> TopicPartition.of(entry.getKey()), + Map.Entry::getValue)); + } + }) + .collect(Collectors.toList()); + } + + @Override + public Set topicPartitions() { + return broker.getTopicPartitionsList().stream() + .map(tp -> TopicPartition.of(tp.getTopic(), tp.getPartition())) + .collect(Collectors.toSet()); + } + + @Override + public Set topicPartitionLeaders() { + return broker.getTopicPartitionLeadersList().stream() + .map(tp -> TopicPartition.of(tp.getTopic(), tp.getPartition())) + .collect(Collectors.toSet()); + } + + @Override + public String host() { + return broker.getHost(); + } + + @Override + public int port() { + return broker.getPort(); + } + + @Override + public int id() { + return broker.getId(); + } + }) .collect(Collectors.toList()), outerClusterInfo.getTopicList().stream() .map( diff --git a/common/src/main/proto/org/astraea/common/generated/ClusterInfo.proto b/common/src/main/proto/org/astraea/common/generated/ClusterInfo.proto index a9a6db12f4..7c60a989c1 100644 --- a/common/src/main/proto/org/astraea/common/generated/ClusterInfo.proto +++ b/common/src/main/proto/org/astraea/common/generated/ClusterInfo.proto @@ -5,13 +5,36 @@ package org.astraea.common.generated; message ClusterInfo{ string clusterId = 1; - repeated NodeInfo nodeInfo = 2; message NodeInfo { int32 id = 1; string host = 2; int32 port = 3; } + repeated Broker broker = 2; + message Broker { + int32 id = 1; + string host = 2; + int32 port = 3; + bool isController = 4; + map config = 5; + + repeated Datafolder datafolder = 6; + message Datafolder { + string path = 1; + map partitionSizes = 2; + map orphanPartitionSizes = 3; + } + + repeated TopicPartition topicPartitions = 7; + message TopicPartition { + int32 partition = 1; + string topic = 2; + } + + repeated TopicPartition topicPartitionLeaders = 8; + } + repeated Topic topic = 3; message Topic { string name = 1; diff --git a/common/src/test/java/org/astraea/common/ByteUtilsTest.java b/common/src/test/java/org/astraea/common/ByteUtilsTest.java index 7cdef95b3c..14723a091c 100644 --- a/common/src/test/java/org/astraea/common/ByteUtilsTest.java +++ b/common/src/test/java/org/astraea/common/ByteUtilsTest.java @@ -94,9 +94,28 @@ void testReadAndToBytesClusterInfo() { var bytes = ByteUtils.toBytes(clusterInfo); Assertions.assertDoesNotThrow(() -> ByteUtils.readClusterInfo(bytes)); var deserializedClusterInfo = ByteUtils.readClusterInfo(bytes); - Assertions.assertEquals(clusterInfo.clusterId(), deserializedClusterInfo.clusterId()); - Assertions.assertTrue(clusterInfo.nodes().containsAll(deserializedClusterInfo.nodes())); + for (var i = 0; i < clusterInfo.brokers().size(); ++i) { + var broker = clusterInfo.brokers().get(i); + var deserializedBroker = deserializedClusterInfo.brokers().get(i); + Assertions.assertEquals(broker.id(), deserializedBroker.id()); + Assertions.assertEquals(broker.host(), deserializedBroker.host()); + Assertions.assertEquals(broker.port(), deserializedBroker.port()); + Assertions.assertEquals(broker.isController(), deserializedBroker.isController()); + Assertions.assertEquals(broker.config().raw(), deserializedBroker.config().raw()); + Assertions.assertEquals(broker.topicPartitions(), deserializedBroker.topicPartitions()); + Assertions.assertEquals( + broker.topicPartitionLeaders(), deserializedBroker.topicPartitionLeaders()); + for (var j = 0; j < broker.dataFolders().size(); ++j) { + var dataFolder = broker.dataFolders().get(j); + var deserializedDataFolder = deserializedBroker.dataFolders().get(j); + Assertions.assertEquals(dataFolder.path(), deserializedDataFolder.path()); + Assertions.assertEquals( + dataFolder.partitionSizes(), deserializedDataFolder.partitionSizes()); + Assertions.assertEquals( + dataFolder.orphanPartitionSizes(), deserializedDataFolder.orphanPartitionSizes()); + } + } Assertions.assertEquals(clusterInfo.topics(), deserializedClusterInfo.topics()); Assertions.assertEquals(clusterInfo.replicas(), deserializedClusterInfo.replicas()); } From 5facee3eb3375c2ee61e29bb9f9c2e6a6cf7070d Mon Sep 17 00:00:00 2001 From: Chao-Heng Lee Date: Wed, 10 May 2023 16:28:45 +0800 Subject: [PATCH 02/11] add Broker.BrokerImpl, Broker.DataFolder.DataFolderImpl and Config.equals() for comparison. --- .../java/org/astraea/common/ByteUtils.java | 128 +++++++----------- .../java/org/astraea/common/admin/Broker.java | 85 ++++-------- .../java/org/astraea/common/admin/Config.java | 8 ++ .../common/generated/ClusterInfo.proto | 36 ++--- .../org/astraea/common/ByteUtilsTest.java | 22 +-- 5 files changed, 101 insertions(+), 178 deletions(-) diff --git a/common/src/main/java/org/astraea/common/ByteUtils.java b/common/src/main/java/org/astraea/common/ByteUtils.java index 9fb73bb563..ae4adaced9 100644 --- a/common/src/main/java/org/astraea/common/ByteUtils.java +++ b/common/src/main/java/org/astraea/common/ByteUtils.java @@ -21,7 +21,6 @@ import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; import java.nio.charset.StandardCharsets; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -265,7 +264,7 @@ public static byte[] toBytes(ClusterInfo value) { .setTopic(replica.topic()) .setPartition(replica.partition()) .setNodeInfo( - ClusterInfoOuterClass.ClusterInfo.NodeInfo.newBuilder() + ClusterInfoOuterClass.ClusterInfo.Replica.NodeInfo.newBuilder() .setId(replica.nodeInfo().id()) .setHost(replica.nodeInfo().host()) .setPort(replica.nodeInfo().port()) @@ -376,85 +375,52 @@ public static ClusterInfo readClusterInfo(byte[] bytes) { outerClusterInfo.getClusterId(), outerClusterInfo.getBrokerList().stream() .map( - broker -> - new Broker() { - @Override - public boolean isController() { - return broker.getIsController(); - } - - @Override - public Config config() { - return Config.of(broker.getConfigMap()); - } - - @Override - public List dataFolders() { - return broker.getDatafolderList().stream() - .map( - datafolder -> - new DataFolder() { - @Override - public String path() { - return datafolder.getPath(); - } - - @Override - public Map partitionSizes() { - return datafolder - .getPartitionSizesMap() - .entrySet() - .stream() - .collect( - Collectors.toMap( - entry -> TopicPartition.of(entry.getKey()), - Map.Entry::getValue)); - } - - @Override - public Map orphanPartitionSizes() { - return datafolder - .getOrphanPartitionSizesMap() - .entrySet() - .stream() - .collect( - Collectors.toMap( - entry -> TopicPartition.of(entry.getKey()), - Map.Entry::getValue)); - } - }) - .collect(Collectors.toList()); - } - - @Override - public Set topicPartitions() { - return broker.getTopicPartitionsList().stream() - .map(tp -> TopicPartition.of(tp.getTopic(), tp.getPartition())) - .collect(Collectors.toSet()); - } - - @Override - public Set topicPartitionLeaders() { - return broker.getTopicPartitionLeadersList().stream() - .map(tp -> TopicPartition.of(tp.getTopic(), tp.getPartition())) - .collect(Collectors.toSet()); - } - - @Override - public String host() { - return broker.getHost(); - } - - @Override - public int port() { - return broker.getPort(); - } - - @Override - public int id() { - return broker.getId(); - } - }) + broker -> { + var host = broker.getHost(); + var port = broker.getPort(); + var id = broker.getId(); + var isController = broker.getIsController(); + var config = Config.of(broker.getConfigMap()); + var dataFolders = + broker.getDatafolderList().stream() + .map( + datafolder -> { + var path = datafolder.getPath(); + var partitionSizes = + datafolder.getPartitionSizesMap().entrySet().stream() + .collect( + Collectors.toMap( + entry -> TopicPartition.of(entry.getKey()), + Map.Entry::getValue)); + var orphanPartitionSizes = + datafolder.getOrphanPartitionSizesMap().entrySet().stream() + .collect( + Collectors.toMap( + entry -> TopicPartition.of(entry.getKey()), + Map.Entry::getValue)); + return (Broker.DataFolder) + new Broker.DataFolder.DataFolderImpl( + path, partitionSizes, orphanPartitionSizes); + }) + .toList(); + var topicPartitions = + broker.getTopicPartitionsList().stream() + .map(tp -> TopicPartition.of(tp.getTopic(), tp.getPartition())) + .collect(Collectors.toSet()); + var topicPartitionLeaders = + broker.getTopicPartitionLeadersList().stream() + .map(tp -> TopicPartition.of(tp.getTopic(), tp.getPartition())) + .collect(Collectors.toSet()); + return new Broker.BrokerImpl( + host, + port, + id, + isController, + config, + dataFolders, + topicPartitions, + topicPartitionLeaders); + }) .collect(Collectors.toList()), outerClusterInfo.getTopicList().stream() .map( diff --git a/common/src/main/java/org/astraea/common/admin/Broker.java b/common/src/main/java/org/astraea/common/admin/Broker.java index c4ea2d763a..e42802cab6 100644 --- a/common/src/main/java/org/astraea/common/admin/Broker.java +++ b/common/src/main/java/org/astraea/common/admin/Broker.java @@ -61,23 +61,7 @@ static Broker of( .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); return (DataFolder) - new DataFolder() { - - @Override - public String path() { - return path; - } - - @Override - public Map partitionSizes() { - return partitionSizes; - } - - @Override - public Map orphanPartitionSizes() { - return orphanPartitionSizes; - } - }; + new DataFolder.DataFolderImpl(path, partitionSizes, orphanPartitionSizes); }) .collect(Collectors.toList()); var topicPartitionLeaders = @@ -88,47 +72,15 @@ public Map orphanPartitionSizes() { .filter(p -> p.leader() != null && p.leader().id() == nodeInfo.id()) .map(p -> TopicPartition.of(topic.name(), p.partition()))) .collect(Collectors.toUnmodifiableSet()); - return new Broker() { - @Override - public String host() { - return nodeInfo.host(); - } - - @Override - public int port() { - return nodeInfo.port(); - } - - @Override - public int id() { - return nodeInfo.id(); - } - - @Override - public boolean isController() { - return isController; - } - - @Override - public Config config() { - return config; - } - - @Override - public List dataFolders() { - return folders; - } - - @Override - public Set topicPartitions() { - return partitionsFromTopicDesc; - } - - @Override - public Set topicPartitionLeaders() { - return topicPartitionLeaders; - } - }; + return new BrokerImpl( + nodeInfo.host(), + nodeInfo.port(), + nodeInfo.id(), + isController, + config, + folders, + partitionsFromTopicDesc, + topicPartitionLeaders); } boolean isController(); @@ -166,5 +118,22 @@ interface DataFolder { * @return topic partition located by this node but not traced by cluster */ Map orphanPartitionSizes(); + + record DataFolderImpl( + String path, + Map partitionSizes, + Map orphanPartitionSizes) + implements DataFolder {} } + + record BrokerImpl( + String host, + int port, + int id, + boolean isController, + Config config, + List dataFolders, + Set topicPartitions, + Set topicPartitionLeaders) + implements Broker {} } diff --git a/common/src/main/java/org/astraea/common/admin/Config.java b/common/src/main/java/org/astraea/common/admin/Config.java index cee4f20da5..88b049ff3d 100644 --- a/common/src/main/java/org/astraea/common/admin/Config.java +++ b/common/src/main/java/org/astraea/common/admin/Config.java @@ -35,6 +35,14 @@ public Map raw() { public Optional value(String key) { return Optional.ofNullable(configs.get(key)); } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + var objConfig = (Config) obj; + return raw().equals(objConfig.raw()); + } }; } diff --git a/common/src/main/proto/org/astraea/common/generated/ClusterInfo.proto b/common/src/main/proto/org/astraea/common/generated/ClusterInfo.proto index 7c60a989c1..4d7fbcae34 100644 --- a/common/src/main/proto/org/astraea/common/generated/ClusterInfo.proto +++ b/common/src/main/proto/org/astraea/common/generated/ClusterInfo.proto @@ -3,39 +3,29 @@ syntax = "proto3"; package org.astraea.common.generated; message ClusterInfo{ - string clusterId = 1; - - message NodeInfo { - int32 id = 1; - string host = 2; - int32 port = 3; - } - repeated Broker broker = 2; message Broker { - int32 id = 1; - string host = 2; - int32 port = 3; - bool isController = 4; - map config = 5; - - repeated Datafolder datafolder = 6; message Datafolder { string path = 1; map partitionSizes = 2; map orphanPartitionSizes = 3; } - repeated TopicPartition topicPartitions = 7; message TopicPartition { int32 partition = 1; string topic = 2; } + int32 id = 1; + string host = 2; + int32 port = 3; + bool isController = 4; + map config = 5; + repeated Datafolder datafolder = 6; + repeated TopicPartition topicPartitions = 7; repeated TopicPartition topicPartitionLeaders = 8; } - repeated Topic topic = 3; message Topic { string name = 1; map config = 2; @@ -43,8 +33,13 @@ message ClusterInfo{ repeated int32 partition = 4; } - repeated Replica replica = 4; message Replica{ + message NodeInfo { + int32 id = 1; + string host = 2; + int32 port = 3; + } + string topic = 1; int32 partition = 2; NodeInfo nodeInfo = 3; @@ -57,4 +52,9 @@ message ClusterInfo{ bool isPreferredLeader = 10; string path = 11; } + + string clusterId = 1; + repeated Broker broker = 2; + repeated Topic topic = 3; + repeated Replica replica = 4; } \ No newline at end of file diff --git a/common/src/test/java/org/astraea/common/ByteUtilsTest.java b/common/src/test/java/org/astraea/common/ByteUtilsTest.java index 14723a091c..42ec89a7d6 100644 --- a/common/src/test/java/org/astraea/common/ByteUtilsTest.java +++ b/common/src/test/java/org/astraea/common/ByteUtilsTest.java @@ -95,27 +95,7 @@ void testReadAndToBytesClusterInfo() { Assertions.assertDoesNotThrow(() -> ByteUtils.readClusterInfo(bytes)); var deserializedClusterInfo = ByteUtils.readClusterInfo(bytes); Assertions.assertEquals(clusterInfo.clusterId(), deserializedClusterInfo.clusterId()); - for (var i = 0; i < clusterInfo.brokers().size(); ++i) { - var broker = clusterInfo.brokers().get(i); - var deserializedBroker = deserializedClusterInfo.brokers().get(i); - Assertions.assertEquals(broker.id(), deserializedBroker.id()); - Assertions.assertEquals(broker.host(), deserializedBroker.host()); - Assertions.assertEquals(broker.port(), deserializedBroker.port()); - Assertions.assertEquals(broker.isController(), deserializedBroker.isController()); - Assertions.assertEquals(broker.config().raw(), deserializedBroker.config().raw()); - Assertions.assertEquals(broker.topicPartitions(), deserializedBroker.topicPartitions()); - Assertions.assertEquals( - broker.topicPartitionLeaders(), deserializedBroker.topicPartitionLeaders()); - for (var j = 0; j < broker.dataFolders().size(); ++j) { - var dataFolder = broker.dataFolders().get(j); - var deserializedDataFolder = deserializedBroker.dataFolders().get(j); - Assertions.assertEquals(dataFolder.path(), deserializedDataFolder.path()); - Assertions.assertEquals( - dataFolder.partitionSizes(), deserializedDataFolder.partitionSizes()); - Assertions.assertEquals( - dataFolder.orphanPartitionSizes(), deserializedDataFolder.orphanPartitionSizes()); - } - } + Assertions.assertEquals(clusterInfo.nodes(), deserializedClusterInfo.nodes()); Assertions.assertEquals(clusterInfo.topics(), deserializedClusterInfo.topics()); Assertions.assertEquals(clusterInfo.replicas(), deserializedClusterInfo.replicas()); } From 17b88cd0071b44e0cf6dc493d898b1896823798f Mon Sep 17 00:00:00 2001 From: Chao-Heng Lee Date: Wed, 10 May 2023 17:49:51 +0800 Subject: [PATCH 03/11] apply toList --- .../java/org/astraea/common/ByteUtils.java | 35 ++++++++++--------- .../java/org/astraea/common/admin/Broker.java | 2 +- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/common/src/main/java/org/astraea/common/ByteUtils.java b/common/src/main/java/org/astraea/common/ByteUtils.java index ae4adaced9..5f6d4df27e 100644 --- a/common/src/main/java/org/astraea/common/ByteUtils.java +++ b/common/src/main/java/org/astraea/common/ByteUtils.java @@ -219,7 +219,7 @@ public static byte[] toBytes(ClusterInfo value) { entry -> entry.getKey().toString(), Map.Entry::getValue))) .build()) - .collect(Collectors.toList())) + .toList()) .addAllTopicPartitions( broker.topicPartitions().stream() .map( @@ -229,7 +229,7 @@ public static byte[] toBytes(ClusterInfo value) { .setPartition(tp.partition()) .setTopic(tp.topic()) .build()) - .collect(Collectors.toList())) + .toList()) .addAllTopicPartitionLeaders( broker.topicPartitionLeaders().stream() .map( @@ -239,9 +239,9 @@ public static byte[] toBytes(ClusterInfo value) { .setPartition(tp.partition()) .setTopic(tp.topic()) .build()) - .collect(Collectors.toList())) + .toList()) .build()) - .collect(Collectors.toList())) + .toList()) .addAllTopic( value.topics().values().stream() .map( @@ -255,7 +255,7 @@ public static byte[] toBytes(ClusterInfo value) { .map(TopicPartition::partition) .collect(Collectors.toList())) .build()) - .collect(Collectors.toList())) + .toList()) .addAllReplica( value.replicas().stream() .map( @@ -278,7 +278,7 @@ public static byte[] toBytes(ClusterInfo value) { .setIsPreferredLeader(replica.isPreferredLeader()) .setPath(replica.path()) .build()) - .collect(Collectors.toList())) + .toList()) .build() .toByteArray(); } @@ -411,17 +411,18 @@ public static ClusterInfo readClusterInfo(byte[] bytes) { broker.getTopicPartitionLeadersList().stream() .map(tp -> TopicPartition.of(tp.getTopic(), tp.getPartition())) .collect(Collectors.toSet()); - return new Broker.BrokerImpl( - host, - port, - id, - isController, - config, - dataFolders, - topicPartitions, - topicPartitionLeaders); + return (NodeInfo) + new Broker.BrokerImpl( + host, + port, + id, + isController, + config, + dataFolders, + topicPartitions, + topicPartitionLeaders); }) - .collect(Collectors.toList()), + .toList(), outerClusterInfo.getTopicList().stream() .map( protoTopic -> @@ -469,7 +470,7 @@ public Set topicPartitions() { .isPreferredLeader(replica.getIsPreferredLeader()) .path(replica.getPath()) .build()) - .collect(Collectors.toList())); + .toList()); } catch (InvalidProtocolBufferException ex) { throw new SerializationException(ex); } diff --git a/common/src/main/java/org/astraea/common/admin/Broker.java b/common/src/main/java/org/astraea/common/admin/Broker.java index e42802cab6..0fed14b4f5 100644 --- a/common/src/main/java/org/astraea/common/admin/Broker.java +++ b/common/src/main/java/org/astraea/common/admin/Broker.java @@ -63,7 +63,7 @@ static Broker of( return (DataFolder) new DataFolder.DataFolderImpl(path, partitionSizes, orphanPartitionSizes); }) - .collect(Collectors.toList()); + .toList(); var topicPartitionLeaders = topics.stream() .flatMap( From 44833504e281df1f035cde5f7e86268e18a8416e Mon Sep 17 00:00:00 2001 From: Chao-Heng Lee Date: Thu, 11 May 2023 15:38:46 +0800 Subject: [PATCH 04/11] Separate ClusterInfo proto file. --- .../java/org/astraea/common/ByteUtils.java | 24 ++++---- .../common/generated/ClusterInfo.proto | 60 ------------------- .../common/generated/admin/Broker.proto | 22 +++++++ .../common/generated/admin/ClusterInfo.proto | 14 +++++ .../common/generated/admin/NodeInfo.proto | 9 +++ .../common/generated/admin/Replica.proto | 19 ++++++ .../common/generated/admin/Topic.proto | 10 ++++ .../generated/admin/TopicPartition.proto | 8 +++ 8 files changed, 95 insertions(+), 71 deletions(-) delete mode 100644 common/src/main/proto/org/astraea/common/generated/ClusterInfo.proto create mode 100644 common/src/main/proto/org/astraea/common/generated/admin/Broker.proto create mode 100644 common/src/main/proto/org/astraea/common/generated/admin/ClusterInfo.proto create mode 100644 common/src/main/proto/org/astraea/common/generated/admin/NodeInfo.proto create mode 100644 common/src/main/proto/org/astraea/common/generated/admin/Replica.proto create mode 100644 common/src/main/proto/org/astraea/common/generated/admin/Topic.proto create mode 100644 common/src/main/proto/org/astraea/common/generated/admin/TopicPartition.proto diff --git a/common/src/main/java/org/astraea/common/ByteUtils.java b/common/src/main/java/org/astraea/common/ByteUtils.java index 5f6d4df27e..ae89ec9a85 100644 --- a/common/src/main/java/org/astraea/common/ByteUtils.java +++ b/common/src/main/java/org/astraea/common/ByteUtils.java @@ -34,8 +34,13 @@ import org.astraea.common.admin.Topic; import org.astraea.common.admin.TopicPartition; import org.astraea.common.generated.BeanObjectOuterClass; -import org.astraea.common.generated.ClusterInfoOuterClass; import org.astraea.common.generated.PrimitiveOuterClass; +import org.astraea.common.generated.admin.BrokerOuterClass; +import org.astraea.common.generated.admin.ClusterInfoOuterClass; +import org.astraea.common.generated.admin.NodeInfoOuterClass; +import org.astraea.common.generated.admin.ReplicaOuterClass; +import org.astraea.common.generated.admin.TopicOuterClass; +import org.astraea.common.generated.admin.TopicPartitionOuterClass; import org.astraea.common.metrics.BeanObject; public final class ByteUtils { @@ -190,7 +195,7 @@ public static byte[] toBytes(ClusterInfo value) { value.brokers().stream() .map( broker -> - ClusterInfoOuterClass.ClusterInfo.Broker.newBuilder() + BrokerOuterClass.Broker.newBuilder() .setId(broker.id()) .setHost(broker.host()) .setPort(broker.port()) @@ -200,8 +205,7 @@ public static byte[] toBytes(ClusterInfo value) { broker.dataFolders().stream() .map( dataFolder -> - ClusterInfoOuterClass.ClusterInfo.Broker.Datafolder - .newBuilder() + BrokerOuterClass.Broker.Datafolder.newBuilder() .setPath(dataFolder.path()) .putAllPartitionSizes( dataFolder.partitionSizes().entrySet().stream() @@ -224,8 +228,7 @@ public static byte[] toBytes(ClusterInfo value) { broker.topicPartitions().stream() .map( tp -> - ClusterInfoOuterClass.ClusterInfo.Broker.TopicPartition - .newBuilder() + TopicPartitionOuterClass.TopicPartition.newBuilder() .setPartition(tp.partition()) .setTopic(tp.topic()) .build()) @@ -234,8 +237,7 @@ public static byte[] toBytes(ClusterInfo value) { broker.topicPartitionLeaders().stream() .map( tp -> - ClusterInfoOuterClass.ClusterInfo.Broker.TopicPartition - .newBuilder() + TopicPartitionOuterClass.TopicPartition.newBuilder() .setPartition(tp.partition()) .setTopic(tp.topic()) .build()) @@ -246,7 +248,7 @@ public static byte[] toBytes(ClusterInfo value) { value.topics().values().stream() .map( topicClass -> - ClusterInfoOuterClass.ClusterInfo.Topic.newBuilder() + TopicOuterClass.Topic.newBuilder() .setName(topicClass.name()) .putAllConfig(topicClass.config().raw()) .setInternal(topicClass.internal()) @@ -260,11 +262,11 @@ public static byte[] toBytes(ClusterInfo value) { value.replicas().stream() .map( replica -> - ClusterInfoOuterClass.ClusterInfo.Replica.newBuilder() + ReplicaOuterClass.Replica.newBuilder() .setTopic(replica.topic()) .setPartition(replica.partition()) .setNodeInfo( - ClusterInfoOuterClass.ClusterInfo.Replica.NodeInfo.newBuilder() + NodeInfoOuterClass.NodeInfo.newBuilder() .setId(replica.nodeInfo().id()) .setHost(replica.nodeInfo().host()) .setPort(replica.nodeInfo().port()) diff --git a/common/src/main/proto/org/astraea/common/generated/ClusterInfo.proto b/common/src/main/proto/org/astraea/common/generated/ClusterInfo.proto deleted file mode 100644 index 4d7fbcae34..0000000000 --- a/common/src/main/proto/org/astraea/common/generated/ClusterInfo.proto +++ /dev/null @@ -1,60 +0,0 @@ -syntax = "proto3"; - -package org.astraea.common.generated; - -message ClusterInfo{ - - message Broker { - message Datafolder { - string path = 1; - map partitionSizes = 2; - map orphanPartitionSizes = 3; - } - - message TopicPartition { - int32 partition = 1; - string topic = 2; - } - - int32 id = 1; - string host = 2; - int32 port = 3; - bool isController = 4; - map config = 5; - repeated Datafolder datafolder = 6; - repeated TopicPartition topicPartitions = 7; - repeated TopicPartition topicPartitionLeaders = 8; - } - - message Topic { - string name = 1; - map config = 2; - bool internal = 3; - repeated int32 partition = 4; - } - - message Replica{ - message NodeInfo { - int32 id = 1; - string host = 2; - int32 port = 3; - } - - string topic = 1; - int32 partition = 2; - NodeInfo nodeInfo = 3; - int64 lag = 4; - int64 size = 5; - bool isLeader = 6; - bool isSync = 7; - bool isFuture = 8; - bool isOffline = 9; - bool isPreferredLeader = 10; - string path = 11; - } - - string clusterId = 1; - repeated Broker broker = 2; - repeated Topic topic = 3; - repeated Replica replica = 4; -} \ No newline at end of file diff --git a/common/src/main/proto/org/astraea/common/generated/admin/Broker.proto b/common/src/main/proto/org/astraea/common/generated/admin/Broker.proto new file mode 100644 index 0000000000..1f67d7ee63 --- /dev/null +++ b/common/src/main/proto/org/astraea/common/generated/admin/Broker.proto @@ -0,0 +1,22 @@ +syntax = "proto3"; + +package org.astraea.common.generated.admin; + +import "org/astraea/common/generated/admin/TopicPartition.proto"; + +message Broker { + int32 id = 1; + string host = 2; + int32 port = 3; + bool isController = 4; + map config = 5; + repeated Datafolder datafolder = 6; + repeated TopicPartition topicPartitions = 7; + repeated TopicPartition topicPartitionLeaders = 8; + + message Datafolder { + string path = 1; + map partitionSizes = 2; + map orphanPartitionSizes = 3; + } +} \ No newline at end of file diff --git a/common/src/main/proto/org/astraea/common/generated/admin/ClusterInfo.proto b/common/src/main/proto/org/astraea/common/generated/admin/ClusterInfo.proto new file mode 100644 index 0000000000..ec452e0dce --- /dev/null +++ b/common/src/main/proto/org/astraea/common/generated/admin/ClusterInfo.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + +package org.astraea.common.generated.admin; + +import "org/astraea/common/generated/admin/Broker.proto"; +import "org/astraea/common/generated/admin/Topic.proto"; +import "org/astraea/common/generated/admin/Replica.proto"; + +message ClusterInfo{ + string clusterId = 1; + repeated Broker broker = 2; + repeated Topic topic = 3; + repeated Replica replica = 4; +} \ No newline at end of file diff --git a/common/src/main/proto/org/astraea/common/generated/admin/NodeInfo.proto b/common/src/main/proto/org/astraea/common/generated/admin/NodeInfo.proto new file mode 100644 index 0000000000..057219d597 --- /dev/null +++ b/common/src/main/proto/org/astraea/common/generated/admin/NodeInfo.proto @@ -0,0 +1,9 @@ +syntax = "proto3"; + +package org.astraea.common.generated.admin; + +message NodeInfo { + int32 id = 1; + string host = 2; + int32 port = 3; +} \ No newline at end of file diff --git a/common/src/main/proto/org/astraea/common/generated/admin/Replica.proto b/common/src/main/proto/org/astraea/common/generated/admin/Replica.proto new file mode 100644 index 0000000000..f257bed64b --- /dev/null +++ b/common/src/main/proto/org/astraea/common/generated/admin/Replica.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; + +package org.astraea.common.generated.admin; + +import "org/astraea/common/generated/admin/NodeInfo.proto"; + +message Replica{ + string topic = 1; + int32 partition = 2; + NodeInfo nodeInfo = 3; + int64 lag = 4; + int64 size = 5; + bool isLeader = 6; + bool isSync = 7; + bool isFuture = 8; + bool isOffline = 9; + bool isPreferredLeader = 10; + string path = 11; +} \ No newline at end of file diff --git a/common/src/main/proto/org/astraea/common/generated/admin/Topic.proto b/common/src/main/proto/org/astraea/common/generated/admin/Topic.proto new file mode 100644 index 0000000000..5721d5d8f3 --- /dev/null +++ b/common/src/main/proto/org/astraea/common/generated/admin/Topic.proto @@ -0,0 +1,10 @@ +syntax = "proto3"; + +package org.astraea.common.generated.admin; + +message Topic { + string name = 1; + map config = 2; + bool internal = 3; + repeated int32 partition = 4; +} \ No newline at end of file diff --git a/common/src/main/proto/org/astraea/common/generated/admin/TopicPartition.proto b/common/src/main/proto/org/astraea/common/generated/admin/TopicPartition.proto new file mode 100644 index 0000000000..44f446c88d --- /dev/null +++ b/common/src/main/proto/org/astraea/common/generated/admin/TopicPartition.proto @@ -0,0 +1,8 @@ +syntax = "proto3"; + +package org.astraea.common.generated.admin; + +message TopicPartition { + int32 partition = 1; + string topic = 2; +} \ No newline at end of file From 4d7499d31e7d6b0c21ee6097d192d4d936adfe45 Mon Sep 17 00:00:00 2001 From: Chao-Heng Lee Date: Fri, 12 May 2023 19:58:03 +0800 Subject: [PATCH 05/11] Make config into record. --- .../java/org/astraea/common/ByteUtils.java | 4 +-- .../java/org/astraea/common/admin/Broker.java | 2 +- .../java/org/astraea/common/admin/Config.java | 31 +++---------------- .../java/org/astraea/common/admin/Topic.java | 2 +- 4 files changed, 9 insertions(+), 30 deletions(-) diff --git a/common/src/main/java/org/astraea/common/ByteUtils.java b/common/src/main/java/org/astraea/common/ByteUtils.java index ae89ec9a85..4f4c0b1fbd 100644 --- a/common/src/main/java/org/astraea/common/ByteUtils.java +++ b/common/src/main/java/org/astraea/common/ByteUtils.java @@ -382,7 +382,7 @@ public static ClusterInfo readClusterInfo(byte[] bytes) { var port = broker.getPort(); var id = broker.getId(); var isController = broker.getIsController(); - var config = Config.of(broker.getConfigMap()); + var config = new Config(broker.getConfigMap()); var dataFolders = broker.getDatafolderList().stream() .map( @@ -436,7 +436,7 @@ public String name() { @Override public Config config() { - return Config.of(protoTopic.getConfigMap()); + return new Config(protoTopic.getConfigMap()); } @Override diff --git a/common/src/main/java/org/astraea/common/admin/Broker.java b/common/src/main/java/org/astraea/common/admin/Broker.java index 0fed14b4f5..3a2acdd112 100644 --- a/common/src/main/java/org/astraea/common/admin/Broker.java +++ b/common/src/main/java/org/astraea/common/admin/Broker.java @@ -31,7 +31,7 @@ static Broker of( Map configs, Map dirs, Collection topics) { - var config = Config.of(configs); + var config = new Config(configs); var partitionsFromTopicDesc = topics.stream() .flatMap( diff --git a/common/src/main/java/org/astraea/common/admin/Config.java b/common/src/main/java/org/astraea/common/admin/Config.java index 88b049ff3d..a57443f3fa 100644 --- a/common/src/main/java/org/astraea/common/admin/Config.java +++ b/common/src/main/java/org/astraea/common/admin/Config.java @@ -20,36 +20,15 @@ import java.util.Optional; /** this interface used to represent the resource (topic or broker) configuration. */ -public interface Config { +public record Config(Map raw) { - Config EMPTY = Config.of(Map.of()); + static Config EMPTY = new Config(Map.of()); - static Config of(Map configs) { - return new Config() { - @Override - public Map raw() { - return configs; - } - - @Override - public Optional value(String key) { - return Optional.ofNullable(configs.get(key)); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) return true; - if (obj == null || getClass() != obj.getClass()) return false; - var objConfig = (Config) obj; - return raw().equals(objConfig.raw()); - } - }; - } - - Map raw(); /** * @param key config key * @return the value associated to input key. otherwise, empty */ - Optional value(String key); + public Optional value(String key) { + return Optional.ofNullable(raw.get(key)); + } } diff --git a/common/src/main/java/org/astraea/common/admin/Topic.java b/common/src/main/java/org/astraea/common/admin/Topic.java index 4d9d6232e5..f7e7b8ff19 100644 --- a/common/src/main/java/org/astraea/common/admin/Topic.java +++ b/common/src/main/java/org/astraea/common/admin/Topic.java @@ -27,7 +27,7 @@ static Topic of( org.apache.kafka.clients.admin.TopicDescription topicDescription, Map kafkaConfig) { - var config = Config.of(kafkaConfig); + var config = new Config(kafkaConfig); var topicPartitions = topicDescription.partitions().stream() .map(p -> TopicPartition.of(name, p.partition())) From 6f4658f1ea714ff1be97c642d9e0da63531111a2 Mon Sep 17 00:00:00 2001 From: Chao-Heng Lee Date: Fri, 12 May 2023 23:18:16 +0800 Subject: [PATCH 06/11] Make Broker into record. --- .../java/org/astraea/common/ByteUtils.java | 9 +- .../java/org/astraea/common/admin/Broker.java | 81 ++++--------- .../common/admin/ClusterInfoBuilder.java | 108 +++--------------- .../common/admin/ClusterInfoBuilderTest.java | 22 ---- .../common/balancer/FakeClusterInfo.java | 75 +++--------- 5 files changed, 55 insertions(+), 240 deletions(-) diff --git a/common/src/main/java/org/astraea/common/ByteUtils.java b/common/src/main/java/org/astraea/common/ByteUtils.java index 4f4c0b1fbd..b1b93eda35 100644 --- a/common/src/main/java/org/astraea/common/ByteUtils.java +++ b/common/src/main/java/org/astraea/common/ByteUtils.java @@ -400,9 +400,8 @@ public static ClusterInfo readClusterInfo(byte[] bytes) { Collectors.toMap( entry -> TopicPartition.of(entry.getKey()), Map.Entry::getValue)); - return (Broker.DataFolder) - new Broker.DataFolder.DataFolderImpl( - path, partitionSizes, orphanPartitionSizes); + return new Broker.DataFolder( + path, partitionSizes, orphanPartitionSizes); }) .toList(); var topicPartitions = @@ -414,10 +413,10 @@ public static ClusterInfo readClusterInfo(byte[] bytes) { .map(tp -> TopicPartition.of(tp.getTopic(), tp.getPartition())) .collect(Collectors.toSet()); return (NodeInfo) - new Broker.BrokerImpl( + new Broker( + id, host, port, - id, isController, config, dataFolders, diff --git a/common/src/main/java/org/astraea/common/admin/Broker.java b/common/src/main/java/org/astraea/common/admin/Broker.java index 3a2acdd112..2eac660269 100644 --- a/common/src/main/java/org/astraea/common/admin/Broker.java +++ b/common/src/main/java/org/astraea/common/admin/Broker.java @@ -23,7 +23,19 @@ import java.util.stream.Collectors; import org.apache.kafka.common.requests.DescribeLogDirsResponse; -public interface Broker extends NodeInfo { +public record Broker( + int id, + String host, + int port, + boolean isController, + // config used by this node + Config config, + // the disk folder used to stored data by this node + List dataFolders, + Set topicPartitions, + // partition leaders hosted by this broker + Set topicPartitionLeaders) + implements NodeInfo { static Broker of( boolean isController, @@ -60,8 +72,7 @@ static Broker of( tpAndSize -> !partitionsFromTopicDesc.contains(tpAndSize.getKey())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - return (DataFolder) - new DataFolder.DataFolderImpl(path, partitionSizes, orphanPartitionSizes); + return new DataFolder(path, partitionSizes, orphanPartitionSizes); }) .toList(); var topicPartitionLeaders = @@ -72,10 +83,10 @@ static Broker of( .filter(p -> p.leader() != null && p.leader().id() == nodeInfo.id()) .map(p -> TopicPartition.of(topic.name(), p.partition()))) .collect(Collectors.toUnmodifiableSet()); - return new BrokerImpl( + return new Broker( + nodeInfo.id(), nodeInfo.host(), nodeInfo.port(), - nodeInfo.id(), isController, config, folders, @@ -83,57 +94,11 @@ static Broker of( topicPartitionLeaders); } - boolean isController(); - - /** - * @return config used by this node - */ - Config config(); - - /** - * @return the disk folder used to stored data by this node - */ - List dataFolders(); - - Set topicPartitions(); - - /** - * @return partition leaders hosted by this broker - */ - Set topicPartitionLeaders(); - - interface DataFolder { - - /** - * @return the path on the local disk - */ - String path(); - - /** - * @return topic partition hosed by this node and size of files - */ - Map partitionSizes(); - - /** - * @return topic partition located by this node but not traced by cluster - */ - Map orphanPartitionSizes(); - - record DataFolderImpl( - String path, - Map partitionSizes, - Map orphanPartitionSizes) - implements DataFolder {} - } - - record BrokerImpl( - String host, - int port, - int id, - boolean isController, - Config config, - List dataFolders, - Set topicPartitions, - Set topicPartitionLeaders) - implements Broker {} + public record DataFolder( + // the path on the local disk + String path, + // topic partition hosed by this node and size of files + Map partitionSizes, + // topic partition located by this node but not traced by cluster + Map orphanPartitionSizes) {} } diff --git a/common/src/main/java/org/astraea/common/admin/ClusterInfoBuilder.java b/common/src/main/java/org/astraea/common/admin/ClusterInfoBuilder.java index c06919747a..a57da43e73 100644 --- a/common/src/main/java/org/astraea/common/admin/ClusterInfoBuilder.java +++ b/common/src/main/java/org/astraea/common/admin/ClusterInfoBuilder.java @@ -88,7 +88,7 @@ public ClusterInfoBuilder addNode(Set brokerIds) { + " but another broker with this id already existed"); }); return Stream.concat(nodes.stream(), brokerIds.stream().map(ClusterInfoBuilder::fakeNode)) - .collect(Collectors.toUnmodifiableList()); + .toList(); }); } @@ -108,17 +108,18 @@ public ClusterInfoBuilder addFolders(Map> folders) { .map( node -> { if (folders.containsKey(node.id())) - return FakeBroker.of( + return fakeBroker( node.id(), node.host(), node.port(), Stream.concat( ((Broker) node).dataFolders().stream(), - folders.get(node.id()).stream().map(FakeDataFolder::of)) - .collect(Collectors.toUnmodifiableList())); + folders.get(node.id()).stream() + .map(ClusterInfoBuilder::fakeDataFolder)) + .toList()); else return node; }) - .collect(Collectors.toUnmodifiableList()); + .toList(); }); } @@ -217,8 +218,7 @@ public ClusterInfoBuilder addTopic( })) .map(mapper); - return Stream.concat(replicas.stream(), newTopic) - .collect(Collectors.toUnmodifiableList()); + return Stream.concat(replicas.stream(), newTopic).toList(); }); } @@ -229,9 +229,7 @@ public ClusterInfoBuilder addTopic( * @return this. */ public ClusterInfoBuilder mapLog(Function mapper) { - return applyReplicas( - (nodes, replicas) -> - replicas.stream().map(mapper).collect(Collectors.toUnmodifiableList())); + return applyReplicas((nodes, replicas) -> replicas.stream().map(mapper).toList()); } /** @@ -264,7 +262,7 @@ public ClusterInfoBuilder reassignReplica( return r; } }) - .collect(Collectors.toUnmodifiableList()); + .toList(); if (!matched.get()) throw new IllegalArgumentException("No such replica: " + replica); return collect; }); @@ -297,7 +295,7 @@ public ClusterInfoBuilder setPreferredLeader(TopicPartitionReplica replica) { return r; } }) - .collect(Collectors.toUnmodifiableList()); + .toList(); if (!matched.get()) throw new IllegalArgumentException("No such replica: " + replica); return collect; @@ -325,90 +323,14 @@ private static Broker fakeNode(int brokerId) { var port = new Random(brokerId).nextInt(65535) + 1; var folders = List.of(); - return FakeBroker.of(brokerId, host, port, folders); + return fakeBroker(brokerId, host, port, folders); } - interface FakeBroker extends Broker { - - static FakeBroker of(int id, String host, int port, List folders) { - var hashCode = Objects.hash(id, host, port); - return new FakeBroker() { - @Override - public List dataFolders() { - return folders; - } - - @Override - public String host() { - return host; - } - - @Override - public int port() { - return port; - } - - @Override - public int id() { - return id; - } - - @Override - public String toString() { - return "FakeNodeInfo{" + "host=" + host() + ", id=" + id() + ", port=" + port() + '}'; - } - - @Override - public int hashCode() { - return hashCode; - } - - @Override - public boolean equals(Object other) { - if (other instanceof NodeInfo) { - var node = (NodeInfo) other; - return id() == node.id() && port() == node.port() && host().equals(node.host()); - } - return false; - } - }; - } - - @Override - default boolean isController() { - throw new UnsupportedOperationException(); - } - - @Override - default Config config() { - throw new UnsupportedOperationException(); - } - - @Override - default Set topicPartitions() { - throw new UnsupportedOperationException(); - } - - @Override - default Set topicPartitionLeaders() { - throw new UnsupportedOperationException(); - } + static Broker fakeBroker(int Id, String host, int port, List dataFolders) { + return new Broker(Id, host, port, false, new Config(Map.of()), dataFolders, Set.of(), Set.of()); } - interface FakeDataFolder extends Broker.DataFolder { - - static FakeDataFolder of(String path) { - return () -> path; - } - - @Override - default Map partitionSizes() { - throw new UnsupportedOperationException(); - } - - @Override - default Map orphanPartitionSizes() { - throw new UnsupportedOperationException(); - } + private static Broker.DataFolder fakeDataFolder(String path) { + return new Broker.DataFolder(path, Map.of(), Map.of()); } } diff --git a/common/src/test/java/org/astraea/common/admin/ClusterInfoBuilderTest.java b/common/src/test/java/org/astraea/common/admin/ClusterInfoBuilderTest.java index b7d2851632..36d931b109 100644 --- a/common/src/test/java/org/astraea/common/admin/ClusterInfoBuilderTest.java +++ b/common/src/test/java/org/astraea/common/admin/ClusterInfoBuilderTest.java @@ -22,10 +22,7 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.CsvSource; class ClusterInfoBuilderTest { @@ -322,23 +319,4 @@ void setPreferredLeader() { ClusterInfo.builder().setPreferredLeader(TopicPartitionReplica.of("no", 0, 0)).build(), "No such replica"); } - - @DisplayName("FakeBroker can interact with normal NodeInfo properly") - @ParameterizedTest - @CsvSource( - value = { - " 1, host1, 1000", - " 20, host2, 2000", - "300, host3, 3000", - }) - void testFakeBrokerInteraction(int id, String host, int port) { - var node0 = ClusterInfoBuilder.FakeBroker.of(id, host, port, List.of()); - var node1 = NodeInfo.of(id, host, port); - var node2 = NodeInfo.of(id + 1, host, port); - - Assertions.assertEquals(node0.hashCode(), node1.hashCode()); - Assertions.assertNotEquals(node0.hashCode(), node2.hashCode()); - Assertions.assertEquals(node0, node1); - Assertions.assertNotEquals(node0, node2); - } } diff --git a/common/src/test/java/org/astraea/common/balancer/FakeClusterInfo.java b/common/src/test/java/org/astraea/common/balancer/FakeClusterInfo.java index 3857c4d145..2313c9c49d 100644 --- a/common/src/test/java/org/astraea/common/balancer/FakeClusterInfo.java +++ b/common/src/test/java/org/astraea/common/balancer/FakeClusterInfo.java @@ -79,67 +79,18 @@ public static ClusterInfo of( .mapToObj(nodeId -> NodeInfo.of(nodeId, "host" + nodeId, 9092)) .map( node -> - new Broker() { - @Override - public boolean isController() { - throw new UnsupportedOperationException(); - } - - @Override - public Config config() { - throw new UnsupportedOperationException(); - } - - @Override - public List dataFolders() { - return dataDirectories.stream() - .map( - x -> - new DataFolder() { - @Override - public String path() { - return x; - } - - @Override - public Map partitionSizes() { - throw new UnsupportedOperationException(); - } - - @Override - public Map orphanPartitionSizes() { - throw new UnsupportedOperationException(); - } - }) - .collect(Collectors.toUnmodifiableList()); - } - - @Override - public Set topicPartitions() { - throw new UnsupportedOperationException(); - } - - @Override - public Set topicPartitionLeaders() { - throw new UnsupportedOperationException(); - } - - @Override - public String host() { - return node.host(); - } - - @Override - public int port() { - return node.port(); - } - - @Override - public int id() { - return node.id(); - } - }) - .collect(Collectors.toUnmodifiableList()); + new Broker( + node.id(), + node.host(), + node.port(), + false, + new Config(Map.of()), + dataDirectories.stream() + .map(path -> new Broker.DataFolder(path, Map.of(), Map.of())) + .toList(), + Set.of(), + Set.of())) + .toList(); final var dataDirectoryList = List.copyOf(dataDirectories); final var topics = topicNameGenerator.apply(topicCount); final var replicas = @@ -168,7 +119,7 @@ public int id() { dataDirectoryList.get( tp.partition() % dataDirectories.size())) .build())) - .collect(Collectors.toUnmodifiableList()); + .toList(); return ClusterInfo.of("fake", List.copyOf(nodes), Map.of(), replicas); } From 85984f0c759601d920b52be39136b9af4f70265a Mon Sep 17 00:00:00 2001 From: Chao-Heng Lee Date: Wed, 24 May 2023 15:09:09 +0800 Subject: [PATCH 07/11] update with main branch. --- .../java/org/astraea/common/ByteUtils.java | 130 +++++++++++++++--- .../common/admin/ClusterInfoBuilder.java | 15 +- .../common/generated/admin/NodeInfo.proto | 9 -- .../common/generated/admin/Replica.proto | 19 +-- .../org/astraea/common/ByteUtilsTest.java | 4 +- .../common/balancer/FakeClusterInfo.java | 2 +- 6 files changed, 136 insertions(+), 43 deletions(-) delete mode 100644 common/src/main/proto/org/astraea/common/generated/admin/NodeInfo.proto diff --git a/common/src/main/java/org/astraea/common/ByteUtils.java b/common/src/main/java/org/astraea/common/ByteUtils.java index 3860182b23..da47c70554 100644 --- a/common/src/main/java/org/astraea/common/ByteUtils.java +++ b/common/src/main/java/org/astraea/common/ByteUtils.java @@ -37,7 +37,6 @@ import org.astraea.common.generated.PrimitiveOuterClass; import org.astraea.common.generated.admin.BrokerOuterClass; import org.astraea.common.generated.admin.ClusterInfoOuterClass; -import org.astraea.common.generated.admin.NodeInfoOuterClass; import org.astraea.common.generated.admin.ReplicaOuterClass; import org.astraea.common.generated.admin.TopicOuterClass; import org.astraea.common.generated.admin.TopicPartitionOuterClass; @@ -269,15 +268,72 @@ public static byte[] toBytes(ClusterInfo value) { ReplicaOuterClass.Replica.newBuilder() .setTopic(replica.topic()) .setPartition(replica.partition()) - .setNodeInfo( - NodeInfoOuterClass.ClusterInfo.NodeInfo.newBuilder() + .setBroker( + BrokerOuterClass.Broker.newBuilder() .setId(replica.broker().id()) .setHost(replica.broker().host()) .setPort(replica.broker().port()) + .setIsController(replica.broker().isController()) + .putAllConfig(replica.broker().config().raw()) + .addAllDatafolder( + replica.broker().dataFolders().stream() + .map( + dataFolder -> + BrokerOuterClass.Broker.Datafolder.newBuilder() + .setPath(dataFolder.path()) + .putAllPartitionSizes( + dataFolder + .partitionSizes() + .entrySet() + .stream() + .collect( + Collectors.toMap( + entry -> + entry + .getKey() + .toString(), + Map.Entry::getValue))) + .putAllOrphanPartitionSizes( + dataFolder + .orphanPartitionSizes() + .entrySet() + .stream() + .collect( + Collectors.toMap( + entry -> + entry + .getKey() + .toString(), + Map.Entry::getValue))) + .build()) + .toList()) + .addAllTopicPartitions( + replica.broker().topicPartitions().stream() + .map( + tp -> + TopicPartitionOuterClass.TopicPartition + .newBuilder() + .setPartition(tp.partition()) + .setTopic(tp.topic()) + .build()) + .toList()) + .addAllTopicPartitionLeaders( + replica.broker().topicPartitionLeaders().stream() + .map( + tp -> + TopicPartitionOuterClass.TopicPartition + .newBuilder() + .setPartition(tp.partition()) + .setTopic(tp.topic()) + .build()) + .toList()) .build()) .setLag(replica.lag()) .setSize(replica.size()) + .setInternal(replica.internal()) .setIsLeader(replica.isLeader()) + .setIsAdding(replica.isAdding()) + .setIsRemoving(replica.isRemoving()) .setIsSync(replica.isSync()) .setIsFuture(replica.isFuture()) .setIsOffline(replica.isOffline()) @@ -418,16 +474,15 @@ public static ClusterInfo readClusterInfo(byte[] bytes) { broker.getTopicPartitionLeadersList().stream() .map(tp -> TopicPartition.of(tp.getTopic(), tp.getPartition())) .collect(Collectors.toSet()); - return (NodeInfo) - new Broker( - id, - host, - port, - isController, - config, - dataFolders, - topicPartitions, - topicPartitionLeaders); + return new Broker( + id, + host, + port, + isController, + config, + dataFolders, + topicPartitions, + topicPartitionLeaders); }) .toList(), outerClusterInfo.getTopicList().stream() @@ -464,13 +519,54 @@ public Set topicPartitions() { .topic(replica.getTopic()) .partition(replica.getPartition()) .broker( - Broker.of( - replica.getNodeInfo().getId(), - replica.getNodeInfo().getHost(), - replica.getNodeInfo().getPort())) + new Broker( + replica.getBroker().getId(), + replica.getBroker().getHost(), + replica.getBroker().getPort(), + replica.getBroker().getIsController(), + new Config(replica.getBroker().getConfigMap()), + replica.getBroker().getDatafolderList().stream() + .map( + datafolder -> { + var path = datafolder.getPath(); + var partitionSizes = + datafolder + .getPartitionSizesMap() + .entrySet() + .stream() + .collect( + Collectors.toMap( + entry -> + TopicPartition.of(entry.getKey()), + Map.Entry::getValue)); + var orphanPartitionSizes = + datafolder + .getOrphanPartitionSizesMap() + .entrySet() + .stream() + .collect( + Collectors.toMap( + entry -> + TopicPartition.of(entry.getKey()), + Map.Entry::getValue)); + return new Broker.DataFolder( + path, partitionSizes, orphanPartitionSizes); + }) + .toList(), + replica.getBroker().getTopicPartitionsList().stream() + .map( + tp -> TopicPartition.of(tp.getTopic(), tp.getPartition())) + .collect(Collectors.toSet()), + replica.getBroker().getTopicPartitionLeadersList().stream() + .map( + tp -> TopicPartition.of(tp.getTopic(), tp.getPartition())) + .collect(Collectors.toSet()))) .lag(replica.getLag()) .size(replica.getSize()) + .internal(replica.getInternal()) .isLeader(replica.getIsLeader()) + .isAdding(replica.getIsAdding()) + .isRemoving(replica.getIsRemoving()) .isSync(replica.getIsSync()) .isFuture(replica.getIsFuture()) .isOffline(replica.getIsOffline()) diff --git a/common/src/main/java/org/astraea/common/admin/ClusterInfoBuilder.java b/common/src/main/java/org/astraea/common/admin/ClusterInfoBuilder.java index 12496a24ba..2661d46071 100644 --- a/common/src/main/java/org/astraea/common/admin/ClusterInfoBuilder.java +++ b/common/src/main/java/org/astraea/common/admin/ClusterInfoBuilder.java @@ -89,7 +89,7 @@ public ClusterInfoBuilder addNode(Set brokerIds) { + " but another broker with this id already existed"); }); return Stream.concat(nodes.stream(), brokerIds.stream().map(ClusterInfoBuilder::fakeNode)) - .toList(); + .collect(Collectors.toUnmodifiableList()); }); } @@ -132,7 +132,7 @@ public ClusterInfoBuilder addFolders(Map> folders) { .toList()); else return node; }) - .toList(); + .collect(Collectors.toUnmodifiableList()); }); } @@ -227,7 +227,8 @@ public ClusterInfoBuilder addTopic( })) .map(mapper); - return Stream.concat(replicas.stream(), newTopic).toList(); + return Stream.concat(replicas.stream(), newTopic) + .collect(Collectors.toUnmodifiableList()); }); } @@ -238,7 +239,9 @@ public ClusterInfoBuilder addTopic( * @return this. */ public ClusterInfoBuilder mapLog(Function mapper) { - return applyReplicas((nodes, replicas) -> replicas.stream().map(mapper).toList()); + return applyReplicas( + (nodes, replicas) -> + replicas.stream().map(mapper).collect(Collectors.toUnmodifiableList())); } /** @@ -271,7 +274,7 @@ public ClusterInfoBuilder reassignReplica( return r; } }) - .toList(); + .collect(Collectors.toUnmodifiableList()); if (!matched.get()) throw new IllegalArgumentException("No such replica: " + replica); return collect; }); @@ -304,7 +307,7 @@ public ClusterInfoBuilder setPreferredLeader(TopicPartitionReplica replica) { return r; } }) - .toList(); + .collect(Collectors.toUnmodifiableList()); if (!matched.get()) throw new IllegalArgumentException("No such replica: " + replica); return collect; diff --git a/common/src/main/proto/org/astraea/common/generated/admin/NodeInfo.proto b/common/src/main/proto/org/astraea/common/generated/admin/NodeInfo.proto deleted file mode 100644 index 057219d597..0000000000 --- a/common/src/main/proto/org/astraea/common/generated/admin/NodeInfo.proto +++ /dev/null @@ -1,9 +0,0 @@ -syntax = "proto3"; - -package org.astraea.common.generated.admin; - -message NodeInfo { - int32 id = 1; - string host = 2; - int32 port = 3; -} \ No newline at end of file diff --git a/common/src/main/proto/org/astraea/common/generated/admin/Replica.proto b/common/src/main/proto/org/astraea/common/generated/admin/Replica.proto index f257bed64b..2e0700aa9a 100644 --- a/common/src/main/proto/org/astraea/common/generated/admin/Replica.proto +++ b/common/src/main/proto/org/astraea/common/generated/admin/Replica.proto @@ -2,18 +2,21 @@ syntax = "proto3"; package org.astraea.common.generated.admin; -import "org/astraea/common/generated/admin/NodeInfo.proto"; +import "org/astraea/common/generated/admin/Broker.proto"; message Replica{ string topic = 1; int32 partition = 2; - NodeInfo nodeInfo = 3; + Broker broker = 3; int64 lag = 4; int64 size = 5; - bool isLeader = 6; - bool isSync = 7; - bool isFuture = 8; - bool isOffline = 9; - bool isPreferredLeader = 10; - string path = 11; + bool internal = 6; + bool isLeader = 7; + bool isAdding = 8; + bool isRemoving = 9; + bool isSync = 10; + bool isFuture = 11; + bool isOffline = 12; + bool isPreferredLeader = 13; + string path = 14; } \ No newline at end of file diff --git a/common/src/test/java/org/astraea/common/ByteUtilsTest.java b/common/src/test/java/org/astraea/common/ByteUtilsTest.java index 42ec89a7d6..03e7a0cc76 100644 --- a/common/src/test/java/org/astraea/common/ByteUtilsTest.java +++ b/common/src/test/java/org/astraea/common/ByteUtilsTest.java @@ -95,7 +95,7 @@ void testReadAndToBytesClusterInfo() { Assertions.assertDoesNotThrow(() -> ByteUtils.readClusterInfo(bytes)); var deserializedClusterInfo = ByteUtils.readClusterInfo(bytes); Assertions.assertEquals(clusterInfo.clusterId(), deserializedClusterInfo.clusterId()); - Assertions.assertEquals(clusterInfo.nodes(), deserializedClusterInfo.nodes()); + Assertions.assertEquals(clusterInfo.brokers(), deserializedClusterInfo.brokers()); Assertions.assertEquals(clusterInfo.topics(), deserializedClusterInfo.topics()); Assertions.assertEquals(clusterInfo.replicas(), deserializedClusterInfo.replicas()); } @@ -109,7 +109,7 @@ void testReadAndToBytesEmptyClusterInfo() { var deserializedClusterInfo = ByteUtils.readClusterInfo(serializedInfo); Assertions.assertEquals(clusterInfo.clusterId(), deserializedClusterInfo.clusterId()); - Assertions.assertEquals(clusterInfo.nodes(), deserializedClusterInfo.nodes()); + Assertions.assertEquals(clusterInfo.brokers(), deserializedClusterInfo.brokers()); Assertions.assertEquals(clusterInfo.topics(), deserializedClusterInfo.topics()); Assertions.assertEquals(clusterInfo.replicas(), deserializedClusterInfo.replicas()); } diff --git a/common/src/test/java/org/astraea/common/balancer/FakeClusterInfo.java b/common/src/test/java/org/astraea/common/balancer/FakeClusterInfo.java index 46ad23a42c..8331e68f5d 100644 --- a/common/src/test/java/org/astraea/common/balancer/FakeClusterInfo.java +++ b/common/src/test/java/org/astraea/common/balancer/FakeClusterInfo.java @@ -118,7 +118,7 @@ public static ClusterInfo of( dataDirectoryList.get( tp.partition() % dataDirectories.size())) .build())) - .toList(); + .collect(Collectors.toUnmodifiableList()); return ClusterInfo.of("fake", List.copyOf(nodes), Map.of(), replicas); } From 7f978f59c677312f18de6478f823e611162c588b Mon Sep 17 00:00:00 2001 From: Chao-Heng Lee Date: Wed, 24 May 2023 16:22:39 +0800 Subject: [PATCH 08/11] fix proto file format. --- .../proto/org/astraea/common/generated/admin/ClusterInfo.proto | 2 +- .../main/proto/org/astraea/common/generated/admin/Replica.proto | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/common/src/main/proto/org/astraea/common/generated/admin/ClusterInfo.proto b/common/src/main/proto/org/astraea/common/generated/admin/ClusterInfo.proto index ec452e0dce..260f65ec64 100644 --- a/common/src/main/proto/org/astraea/common/generated/admin/ClusterInfo.proto +++ b/common/src/main/proto/org/astraea/common/generated/admin/ClusterInfo.proto @@ -6,7 +6,7 @@ import "org/astraea/common/generated/admin/Broker.proto"; import "org/astraea/common/generated/admin/Topic.proto"; import "org/astraea/common/generated/admin/Replica.proto"; -message ClusterInfo{ +message ClusterInfo { string clusterId = 1; repeated Broker broker = 2; repeated Topic topic = 3; diff --git a/common/src/main/proto/org/astraea/common/generated/admin/Replica.proto b/common/src/main/proto/org/astraea/common/generated/admin/Replica.proto index 2e0700aa9a..3e81de9013 100644 --- a/common/src/main/proto/org/astraea/common/generated/admin/Replica.proto +++ b/common/src/main/proto/org/astraea/common/generated/admin/Replica.proto @@ -4,7 +4,7 @@ package org.astraea.common.generated.admin; import "org/astraea/common/generated/admin/Broker.proto"; -message Replica{ +message Replica { string topic = 1; int32 partition = 2; Broker broker = 3; From 7d2991ab47f3fb407bb49404187cb6bdf311d556 Mon Sep 17 00:00:00 2001 From: Chao-Heng Lee Date: Thu, 25 May 2023 12:32:48 +0800 Subject: [PATCH 09/11] fix typo. reconstruct the code. --- .../java/org/astraea/common/ByteUtils.java | 441 +++++++----------- .../common/generated/admin/Broker.proto | 4 +- 2 files changed, 161 insertions(+), 284 deletions(-) diff --git a/common/src/main/java/org/astraea/common/ByteUtils.java b/common/src/main/java/org/astraea/common/ByteUtils.java index da47c70554..83f636ace3 100644 --- a/common/src/main/java/org/astraea/common/ByteUtils.java +++ b/common/src/main/java/org/astraea/common/ByteUtils.java @@ -194,153 +194,9 @@ public static byte[] toBytes(BeanObject value) { public static byte[] toBytes(ClusterInfo value) { return ClusterInfoOuterClass.ClusterInfo.newBuilder() .setClusterId(value.clusterId()) - .addAllBroker( - value.brokers().stream() - .map( - broker -> - BrokerOuterClass.Broker.newBuilder() - .setId(broker.id()) - .setHost(broker.host()) - .setPort(broker.port()) - .setIsController(broker.isController()) - .putAllConfig(broker.config().raw()) - .addAllDatafolder( - broker.dataFolders().stream() - .map( - dataFolder -> - BrokerOuterClass.Broker.Datafolder.newBuilder() - .setPath(dataFolder.path()) - .putAllPartitionSizes( - dataFolder.partitionSizes().entrySet().stream() - .collect( - Collectors.toMap( - entry -> entry.getKey().toString(), - Map.Entry::getValue))) - .putAllOrphanPartitionSizes( - dataFolder - .orphanPartitionSizes() - .entrySet() - .stream() - .collect( - Collectors.toMap( - entry -> entry.getKey().toString(), - Map.Entry::getValue))) - .build()) - .toList()) - .addAllTopicPartitions( - broker.topicPartitions().stream() - .map( - tp -> - TopicPartitionOuterClass.TopicPartition.newBuilder() - .setPartition(tp.partition()) - .setTopic(tp.topic()) - .build()) - .toList()) - .addAllTopicPartitionLeaders( - broker.topicPartitionLeaders().stream() - .map( - tp -> - TopicPartitionOuterClass.TopicPartition.newBuilder() - .setPartition(tp.partition()) - .setTopic(tp.topic()) - .build()) - .toList()) - .build()) - .toList()) - .addAllTopic( - value.topics().values().stream() - .map( - topicClass -> - TopicOuterClass.Topic.newBuilder() - .setName(topicClass.name()) - .putAllConfig(topicClass.config().raw()) - .setInternal(topicClass.internal()) - .addAllPartition( - topicClass.topicPartitions().stream() - .map(TopicPartition::partition) - .collect(Collectors.toList())) - .build()) - .toList()) - .addAllReplica( - value.replicas().stream() - .map( - replica -> - ReplicaOuterClass.Replica.newBuilder() - .setTopic(replica.topic()) - .setPartition(replica.partition()) - .setBroker( - BrokerOuterClass.Broker.newBuilder() - .setId(replica.broker().id()) - .setHost(replica.broker().host()) - .setPort(replica.broker().port()) - .setIsController(replica.broker().isController()) - .putAllConfig(replica.broker().config().raw()) - .addAllDatafolder( - replica.broker().dataFolders().stream() - .map( - dataFolder -> - BrokerOuterClass.Broker.Datafolder.newBuilder() - .setPath(dataFolder.path()) - .putAllPartitionSizes( - dataFolder - .partitionSizes() - .entrySet() - .stream() - .collect( - Collectors.toMap( - entry -> - entry - .getKey() - .toString(), - Map.Entry::getValue))) - .putAllOrphanPartitionSizes( - dataFolder - .orphanPartitionSizes() - .entrySet() - .stream() - .collect( - Collectors.toMap( - entry -> - entry - .getKey() - .toString(), - Map.Entry::getValue))) - .build()) - .toList()) - .addAllTopicPartitions( - replica.broker().topicPartitions().stream() - .map( - tp -> - TopicPartitionOuterClass.TopicPartition - .newBuilder() - .setPartition(tp.partition()) - .setTopic(tp.topic()) - .build()) - .toList()) - .addAllTopicPartitionLeaders( - replica.broker().topicPartitionLeaders().stream() - .map( - tp -> - TopicPartitionOuterClass.TopicPartition - .newBuilder() - .setPartition(tp.partition()) - .setTopic(tp.topic()) - .build()) - .toList()) - .build()) - .setLag(replica.lag()) - .setSize(replica.size()) - .setInternal(replica.internal()) - .setIsLeader(replica.isLeader()) - .setIsAdding(replica.isAdding()) - .setIsRemoving(replica.isRemoving()) - .setIsSync(replica.isSync()) - .setIsFuture(replica.isFuture()) - .setIsOffline(replica.isOffline()) - .setIsPreferredLeader(replica.isPreferredLeader()) - .setPath(replica.path()) - .build()) - .toList()) + .addAllBroker(value.brokers().stream().map(ByteUtils::toOuterClass).toList()) + .addAllTopic(value.topics().values().stream().map(ByteUtils::toOuterClass).toList()) + .addAllReplica(value.replicas().stream().map(ByteUtils::toOuterClass).toList()) .build() .toByteArray(); } @@ -437,148 +293,169 @@ public static ClusterInfo readClusterInfo(byte[] bytes) { var outerClusterInfo = ClusterInfoOuterClass.ClusterInfo.parseFrom(bytes); return ClusterInfo.of( outerClusterInfo.getClusterId(), - outerClusterInfo.getBrokerList().stream() - .map( - broker -> { - var host = broker.getHost(); - var port = broker.getPort(); - var id = broker.getId(); - var isController = broker.getIsController(); - var config = new Config(broker.getConfigMap()); - var dataFolders = - broker.getDatafolderList().stream() - .map( - datafolder -> { - var path = datafolder.getPath(); - var partitionSizes = - datafolder.getPartitionSizesMap().entrySet().stream() - .collect( - Collectors.toMap( - entry -> TopicPartition.of(entry.getKey()), - Map.Entry::getValue)); - var orphanPartitionSizes = - datafolder.getOrphanPartitionSizesMap().entrySet().stream() - .collect( - Collectors.toMap( - entry -> TopicPartition.of(entry.getKey()), - Map.Entry::getValue)); - return new Broker.DataFolder( - path, partitionSizes, orphanPartitionSizes); - }) - .toList(); - var topicPartitions = - broker.getTopicPartitionsList().stream() - .map(tp -> TopicPartition.of(tp.getTopic(), tp.getPartition())) - .collect(Collectors.toSet()); - var topicPartitionLeaders = - broker.getTopicPartitionLeadersList().stream() - .map(tp -> TopicPartition.of(tp.getTopic(), tp.getPartition())) - .collect(Collectors.toSet()); - return new Broker( - id, - host, - port, - isController, - config, - dataFolders, - topicPartitions, - topicPartitionLeaders); - }) - .toList(), + outerClusterInfo.getBrokerList().stream().map(ByteUtils::toBroker).toList(), outerClusterInfo.getTopicList().stream() - .map( - protoTopic -> - new Topic() { - @Override - public String name() { - return protoTopic.getName(); - } - - @Override - public Config config() { - return new Config(protoTopic.getConfigMap()); - } - - @Override - public boolean internal() { - return protoTopic.getInternal(); - } - - @Override - public Set topicPartitions() { - return protoTopic.getPartitionList().stream() - .map(tp -> TopicPartition.of(protoTopic.getName(), tp)) - .collect(Collectors.toSet()); - } - }) + .map(ByteUtils::toTopic) .collect(Collectors.toMap(Topic::name, Function.identity())), - outerClusterInfo.getReplicaList().stream() - .map( - replica -> - Replica.builder() - .topic(replica.getTopic()) - .partition(replica.getPartition()) - .broker( - new Broker( - replica.getBroker().getId(), - replica.getBroker().getHost(), - replica.getBroker().getPort(), - replica.getBroker().getIsController(), - new Config(replica.getBroker().getConfigMap()), - replica.getBroker().getDatafolderList().stream() - .map( - datafolder -> { - var path = datafolder.getPath(); - var partitionSizes = - datafolder - .getPartitionSizesMap() - .entrySet() - .stream() - .collect( - Collectors.toMap( - entry -> - TopicPartition.of(entry.getKey()), - Map.Entry::getValue)); - var orphanPartitionSizes = - datafolder - .getOrphanPartitionSizesMap() - .entrySet() - .stream() - .collect( - Collectors.toMap( - entry -> - TopicPartition.of(entry.getKey()), - Map.Entry::getValue)); - return new Broker.DataFolder( - path, partitionSizes, orphanPartitionSizes); - }) - .toList(), - replica.getBroker().getTopicPartitionsList().stream() - .map( - tp -> TopicPartition.of(tp.getTopic(), tp.getPartition())) - .collect(Collectors.toSet()), - replica.getBroker().getTopicPartitionLeadersList().stream() - .map( - tp -> TopicPartition.of(tp.getTopic(), tp.getPartition())) - .collect(Collectors.toSet()))) - .lag(replica.getLag()) - .size(replica.getSize()) - .internal(replica.getInternal()) - .isLeader(replica.getIsLeader()) - .isAdding(replica.getIsAdding()) - .isRemoving(replica.getIsRemoving()) - .isSync(replica.getIsSync()) - .isFuture(replica.getIsFuture()) - .isOffline(replica.getIsOffline()) - .isPreferredLeader(replica.getIsPreferredLeader()) - .path(replica.getPath()) - .build()) - .toList()); + outerClusterInfo.getReplicaList().stream().map(ByteUtils::toReplica).toList()); } catch (InvalidProtocolBufferException ex) { throw new SerializationException(ex); } } + // ---------------------------Serialize To ProtoBuf Outer Class------------------------------- // + + private static BrokerOuterClass.Broker.DataFolder toOuterClass(Broker.DataFolder dataFolder) { + return BrokerOuterClass.Broker.DataFolder.newBuilder() + .setPath(dataFolder.path()) + .putAllPartitionSizes( + dataFolder.partitionSizes().entrySet().stream() + .collect(Collectors.toMap(entry -> entry.getKey().toString(), Map.Entry::getValue))) + .putAllOrphanPartitionSizes( + dataFolder.orphanPartitionSizes().entrySet().stream() + .collect(Collectors.toMap(entry -> entry.getKey().toString(), Map.Entry::getValue))) + .build(); + } + + private static BrokerOuterClass.Broker toOuterClass(Broker broker) { + return BrokerOuterClass.Broker.newBuilder() + .setId(broker.id()) + .setHost(broker.host()) + .setPort(broker.port()) + .setIsController(broker.isController()) + .putAllConfig(broker.config().raw()) + .addAllDataFolder(broker.dataFolders().stream().map(ByteUtils::toOuterClass).toList()) + .addAllTopicPartitions( + broker.topicPartitions().stream() + .map( + tp -> + TopicPartitionOuterClass.TopicPartition.newBuilder() + .setPartition(tp.partition()) + .setTopic(tp.topic()) + .build()) + .toList()) + .addAllTopicPartitionLeaders( + broker.topicPartitionLeaders().stream() + .map( + tp -> + TopicPartitionOuterClass.TopicPartition.newBuilder() + .setPartition(tp.partition()) + .setTopic(tp.topic()) + .build()) + .toList()) + .build(); + } + + private static TopicOuterClass.Topic toOuterClass(Topic topic) { + return TopicOuterClass.Topic.newBuilder() + .setName(topic.name()) + .putAllConfig(topic.config().raw()) + .setInternal(topic.internal()) + .addAllPartition( + topic.topicPartitions().stream() + .map(TopicPartition::partition) + .collect(Collectors.toList())) + .build(); + } + + private static ReplicaOuterClass.Replica toOuterClass(Replica replica) { + return ReplicaOuterClass.Replica.newBuilder() + .setTopic(replica.topic()) + .setPartition(replica.partition()) + .setBroker(toOuterClass(replica.broker())) + .setLag(replica.lag()) + .setSize(replica.size()) + .setInternal(replica.internal()) + .setIsLeader(replica.isLeader()) + .setIsAdding(replica.isAdding()) + .setIsRemoving(replica.isRemoving()) + .setIsSync(replica.isSync()) + .setIsFuture(replica.isFuture()) + .setIsOffline(replica.isOffline()) + .setIsPreferredLeader(replica.isPreferredLeader()) + .setPath(replica.path()) + .build(); + } + + // -------------------------Deserialize From ProtoBuf Outer Class----------------------------- // + + private static Broker.DataFolder toDataFolder(BrokerOuterClass.Broker.DataFolder dataFolder) { + var path = dataFolder.getPath(); + var partitionSizes = + dataFolder.getPartitionSizesMap().entrySet().stream() + .collect( + Collectors.toMap(entry -> TopicPartition.of(entry.getKey()), Map.Entry::getValue)); + var orphanPartitionSizes = + dataFolder.getOrphanPartitionSizesMap().entrySet().stream() + .collect( + Collectors.toMap(entry -> TopicPartition.of(entry.getKey()), Map.Entry::getValue)); + return new Broker.DataFolder(path, partitionSizes, orphanPartitionSizes); + } + + private static Broker toBroker(BrokerOuterClass.Broker broker) { + var host = broker.getHost(); + var port = broker.getPort(); + var id = broker.getId(); + var isController = broker.getIsController(); + var config = new Config(broker.getConfigMap()); + var dataFolders = broker.getDataFolderList().stream().map(ByteUtils::toDataFolder).toList(); + var topicPartitions = + broker.getTopicPartitionsList().stream() + .map(tp -> TopicPartition.of(tp.getTopic(), tp.getPartition())) + .collect(Collectors.toSet()); + var topicPartitionLeaders = + broker.getTopicPartitionLeadersList().stream() + .map(tp -> TopicPartition.of(tp.getTopic(), tp.getPartition())) + .collect(Collectors.toSet()); + return new Broker( + id, host, port, isController, config, dataFolders, topicPartitions, topicPartitionLeaders); + } + + private static Topic toTopic(TopicOuterClass.Topic topic) { + return new Topic() { + @Override + public String name() { + return topic.getName(); + } + + @Override + public Config config() { + return new Config(topic.getConfigMap()); + } + + @Override + public boolean internal() { + return topic.getInternal(); + } + + @Override + public Set topicPartitions() { + return topic.getPartitionList().stream() + .map(tp -> TopicPartition.of(topic.getName(), tp)) + .collect(Collectors.toSet()); + } + }; + } + + private static Replica toReplica(ReplicaOuterClass.Replica replica) { + return Replica.builder() + .topic(replica.getTopic()) + .partition(replica.getPartition()) + .broker(toBroker(replica.getBroker())) + .lag(replica.getLag()) + .size(replica.getSize()) + .internal(replica.getInternal()) + .isLeader(replica.getIsLeader()) + .isAdding(replica.getIsAdding()) + .isRemoving(replica.getIsRemoving()) + .isSync(replica.getIsSync()) + .isFuture(replica.getIsFuture()) + .isOffline(replica.getIsOffline()) + .isPreferredLeader(replica.getIsPreferredLeader()) + .path(replica.getPath()) + .build(); + } + // --------------------------------ProtoBuf Primitive----------------------------------------- // /** diff --git a/common/src/main/proto/org/astraea/common/generated/admin/Broker.proto b/common/src/main/proto/org/astraea/common/generated/admin/Broker.proto index 1f67d7ee63..cf463e193c 100644 --- a/common/src/main/proto/org/astraea/common/generated/admin/Broker.proto +++ b/common/src/main/proto/org/astraea/common/generated/admin/Broker.proto @@ -10,11 +10,11 @@ message Broker { int32 port = 3; bool isController = 4; map config = 5; - repeated Datafolder datafolder = 6; + repeated DataFolder dataFolder = 6; repeated TopicPartition topicPartitions = 7; repeated TopicPartition topicPartitionLeaders = 8; - message Datafolder { + message DataFolder { string path = 1; map partitionSizes = 2; map orphanPartitionSizes = 3; From 1488f06803d1301d27ff8ab5aa7be359df4b49c2 Mon Sep 17 00:00:00 2001 From: Chao-Heng Lee Date: Thu, 25 May 2023 12:47:52 +0800 Subject: [PATCH 10/11] add toTopicPartition and toOuterClass. --- .../java/org/astraea/common/ByteUtils.java | 35 +++++++++---------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/common/src/main/java/org/astraea/common/ByteUtils.java b/common/src/main/java/org/astraea/common/ByteUtils.java index 83f636ace3..b807967d32 100644 --- a/common/src/main/java/org/astraea/common/ByteUtils.java +++ b/common/src/main/java/org/astraea/common/ByteUtils.java @@ -317,6 +317,14 @@ private static BrokerOuterClass.Broker.DataFolder toOuterClass(Broker.DataFolder .build(); } + private static TopicPartitionOuterClass.TopicPartition toOuterClass( + TopicPartition topicPartition) { + return TopicPartitionOuterClass.TopicPartition.newBuilder() + .setPartition(topicPartition.partition()) + .setTopic(topicPartition.topic()) + .build(); + } + private static BrokerOuterClass.Broker toOuterClass(Broker broker) { return BrokerOuterClass.Broker.newBuilder() .setId(broker.id()) @@ -326,23 +334,9 @@ private static BrokerOuterClass.Broker toOuterClass(Broker broker) { .putAllConfig(broker.config().raw()) .addAllDataFolder(broker.dataFolders().stream().map(ByteUtils::toOuterClass).toList()) .addAllTopicPartitions( - broker.topicPartitions().stream() - .map( - tp -> - TopicPartitionOuterClass.TopicPartition.newBuilder() - .setPartition(tp.partition()) - .setTopic(tp.topic()) - .build()) - .toList()) + broker.topicPartitions().stream().map(ByteUtils::toOuterClass).toList()) .addAllTopicPartitionLeaders( - broker.topicPartitionLeaders().stream() - .map( - tp -> - TopicPartitionOuterClass.TopicPartition.newBuilder() - .setPartition(tp.partition()) - .setTopic(tp.topic()) - .build()) - .toList()) + broker.topicPartitionLeaders().stream().map(ByteUtils::toOuterClass).toList()) .build(); } @@ -392,6 +386,11 @@ private static Broker.DataFolder toDataFolder(BrokerOuterClass.Broker.DataFolder return new Broker.DataFolder(path, partitionSizes, orphanPartitionSizes); } + private static TopicPartition toTopicPartition( + TopicPartitionOuterClass.TopicPartition topicPartition) { + return TopicPartition.of(topicPartition.getTopic(), topicPartition.getPartition()); + } + private static Broker toBroker(BrokerOuterClass.Broker broker) { var host = broker.getHost(); var port = broker.getPort(); @@ -401,11 +400,11 @@ private static Broker toBroker(BrokerOuterClass.Broker broker) { var dataFolders = broker.getDataFolderList().stream().map(ByteUtils::toDataFolder).toList(); var topicPartitions = broker.getTopicPartitionsList().stream() - .map(tp -> TopicPartition.of(tp.getTopic(), tp.getPartition())) + .map(ByteUtils::toTopicPartition) .collect(Collectors.toSet()); var topicPartitionLeaders = broker.getTopicPartitionLeadersList().stream() - .map(tp -> TopicPartition.of(tp.getTopic(), tp.getPartition())) + .map(ByteUtils::toTopicPartition) .collect(Collectors.toSet()); return new Broker( id, host, port, isController, config, dataFolders, topicPartitions, topicPartitionLeaders); From 7d5382baf444ce5649f8fa9165ac36906d5164aa Mon Sep 17 00:00:00 2001 From: Chao-Heng Lee Date: Sat, 27 May 2023 15:05:10 +0800 Subject: [PATCH 11/11] fix conflict. --- .../java/org/astraea/common/ByteUtils.java | 50 ++++++++----------- .../common/generated/admin/Replica.proto | 10 ++-- .../common/generated/admin/Topic.proto | 2 +- 3 files changed, 27 insertions(+), 35 deletions(-) diff --git a/common/src/main/java/org/astraea/common/ByteUtils.java b/common/src/main/java/org/astraea/common/ByteUtils.java index 8847752452..ad30ffa53d 100644 --- a/common/src/main/java/org/astraea/common/ByteUtils.java +++ b/common/src/main/java/org/astraea/common/ByteUtils.java @@ -345,10 +345,7 @@ private static TopicOuterClass.Topic toOuterClass(Topic topic) { .setName(topic.name()) .putAllConfig(topic.config().raw()) .setInternal(topic.internal()) - .addAllPartition( - topic.topicPartitions().stream() - .map(TopicPartition::partition) - .collect(Collectors.toList())) + .addAllPartitionIds(topic.partitionIds()) .build(); } @@ -359,7 +356,7 @@ private static ReplicaOuterClass.Replica toOuterClass(Replica replica) { .setBroker(toOuterClass(replica.broker())) .setLag(replica.lag()) .setSize(replica.size()) - .setInternal(replica.internal()) + .setIsInternal(replica.isInternal()) .setIsLeader(replica.isLeader()) .setIsAdding(replica.isAdding()) .setIsRemoving(replica.isRemoving()) @@ -374,48 +371,43 @@ private static ReplicaOuterClass.Replica toOuterClass(Replica replica) { // -------------------------Deserialize From ProtoBuf Outer Class----------------------------- // private static Broker.DataFolder toDataFolder(BrokerOuterClass.Broker.DataFolder dataFolder) { - var path = dataFolder.getPath(); - var partitionSizes = + return new Broker.DataFolder( + dataFolder.getPath(), dataFolder.getPartitionSizesMap().entrySet().stream() .collect( - Collectors.toMap(entry -> TopicPartition.of(entry.getKey()), Map.Entry::getValue)); - var orphanPartitionSizes = + Collectors.toMap(entry -> TopicPartition.of(entry.getKey()), Map.Entry::getValue)), dataFolder.getOrphanPartitionSizesMap().entrySet().stream() .collect( - Collectors.toMap(entry -> TopicPartition.of(entry.getKey()), Map.Entry::getValue)); - return new Broker.DataFolder(path, partitionSizes, orphanPartitionSizes); + Collectors.toMap(entry -> TopicPartition.of(entry.getKey()), Map.Entry::getValue))); } private static TopicPartition toTopicPartition( TopicPartitionOuterClass.TopicPartition topicPartition) { - return TopicPartition.of(topicPartition.getTopic(), topicPartition.getPartition()); + return new TopicPartition(topicPartition.getTopic(), topicPartition.getPartition()); } private static Broker toBroker(BrokerOuterClass.Broker broker) { - var host = broker.getHost(); - var port = broker.getPort(); - var id = broker.getId(); - var isController = broker.getIsController(); - var config = new Config(broker.getConfigMap()); - var dataFolders = broker.getDataFolderList().stream().map(ByteUtils::toDataFolder).toList(); - var topicPartitions = + return new Broker( + broker.getId(), + broker.getHost(), + broker.getPort(), + broker.getIsController(), + new Config(broker.getConfigMap()), + broker.getDataFolderList().stream().map(ByteUtils::toDataFolder).toList(), broker.getTopicPartitionsList().stream() .map(ByteUtils::toTopicPartition) - .collect(Collectors.toSet()); - var topicPartitionLeaders = + .collect(Collectors.toSet()), broker.getTopicPartitionLeadersList().stream() .map(ByteUtils::toTopicPartition) - .collect(Collectors.toSet()); - return new Broker( - id, host, port, isController, config, dataFolders, topicPartitions, topicPartitionLeaders); + .collect(Collectors.toSet())); } private static Topic toTopic(TopicOuterClass.Topic topic) { return new Topic( - topic.getName(), - new Config(topic.getConfigMap()), - topic.getInternal(), - Set.copyOf(topic.getPartitionList())); + topic.getName(), + new Config(topic.getConfigMap()), + topic.getInternal(), + Set.copyOf(topic.getPartitionIdsList())); } private static Replica toReplica(ReplicaOuterClass.Replica replica) { @@ -425,7 +417,7 @@ private static Replica toReplica(ReplicaOuterClass.Replica replica) { .broker(toBroker(replica.getBroker())) .lag(replica.getLag()) .size(replica.getSize()) - .internal(replica.getInternal()) + .isInternal(replica.getIsInternal()) .isLeader(replica.getIsLeader()) .isAdding(replica.getIsAdding()) .isRemoving(replica.getIsRemoving()) diff --git a/common/src/main/proto/org/astraea/common/generated/admin/Replica.proto b/common/src/main/proto/org/astraea/common/generated/admin/Replica.proto index 3e81de9013..598ade270e 100644 --- a/common/src/main/proto/org/astraea/common/generated/admin/Replica.proto +++ b/common/src/main/proto/org/astraea/common/generated/admin/Replica.proto @@ -8,15 +8,15 @@ message Replica { string topic = 1; int32 partition = 2; Broker broker = 3; - int64 lag = 4; - int64 size = 5; - bool internal = 6; bool isLeader = 7; + bool isSync = 10; + bool isOffline = 12; bool isAdding = 8; bool isRemoving = 9; - bool isSync = 10; bool isFuture = 11; - bool isOffline = 12; bool isPreferredLeader = 13; + int64 lag = 4; + int64 size = 5; string path = 14; + bool isInternal = 6; } \ No newline at end of file diff --git a/common/src/main/proto/org/astraea/common/generated/admin/Topic.proto b/common/src/main/proto/org/astraea/common/generated/admin/Topic.proto index 5721d5d8f3..cb46f4b9d7 100644 --- a/common/src/main/proto/org/astraea/common/generated/admin/Topic.proto +++ b/common/src/main/proto/org/astraea/common/generated/admin/Topic.proto @@ -6,5 +6,5 @@ message Topic { string name = 1; map config = 2; bool internal = 3; - repeated int32 partition = 4; + repeated int32 partitionIds = 4; } \ No newline at end of file