diff --git a/common/src/main/java/org/astraea/common/ByteUtils.java b/common/src/main/java/org/astraea/common/ByteUtils.java index cf0301f3a4..ad30ffa53d 100644 --- a/common/src/main/java/org/astraea/common/ByteUtils.java +++ b/common/src/main/java/org/astraea/common/ByteUtils.java @@ -34,8 +34,12 @@ import org.astraea.common.admin.Topic; import org.astraea.common.admin.TopicPartition; import org.astraea.common.generated.BeanObjectOuterClass; -import org.astraea.common.generated.ClusterInfoOuterClass; import org.astraea.common.generated.PrimitiveOuterClass; +import org.astraea.common.generated.admin.BrokerOuterClass; +import org.astraea.common.generated.admin.ClusterInfoOuterClass; +import org.astraea.common.generated.admin.ReplicaOuterClass; +import org.astraea.common.generated.admin.TopicOuterClass; +import org.astraea.common.generated.admin.TopicPartitionOuterClass; import org.astraea.common.metrics.BeanObject; public final class ByteUtils { @@ -186,58 +190,13 @@ public static byte[] toBytes(BeanObject value) { return beanBuilder.build().toByteArray(); } - // TODO: Due to the change of NodeInfo to Broker. This and the test should be updated. /** Serialize ClusterInfo by protocol buffer. */ public static byte[] toBytes(ClusterInfo value) { return ClusterInfoOuterClass.ClusterInfo.newBuilder() .setClusterId(value.clusterId()) - .addAllNodeInfo( - value.brokers().stream() - .map( - nodeInfo -> - ClusterInfoOuterClass.ClusterInfo.NodeInfo.newBuilder() - .setId(nodeInfo.id()) - .setHost(nodeInfo.host()) - .setPort(nodeInfo.port()) - .build()) - .collect(Collectors.toList())) - .addAllTopic( - value.topics().values().stream() - .map( - topicClass -> - ClusterInfoOuterClass.ClusterInfo.Topic.newBuilder() - .setName(topicClass.name()) - .putAllConfig(topicClass.config().raw()) - .setInternal(topicClass.internal()) - .addAllPartition( - topicClass.topicPartitions().stream() - .map(TopicPartition::partition) - .collect(Collectors.toList())) - .build()) - .collect(Collectors.toList())) - .addAllReplica( - value.replicas().stream() - .map( - replica -> - ClusterInfoOuterClass.ClusterInfo.Replica.newBuilder() - .setTopic(replica.topic()) - .setPartition(replica.partition()) - .setNodeInfo( - ClusterInfoOuterClass.ClusterInfo.NodeInfo.newBuilder() - .setId(replica.broker().id()) - .setHost(replica.broker().host()) - .setPort(replica.broker().port()) - .build()) - .setLag(replica.lag()) - .setSize(replica.size()) - .setIsLeader(replica.isLeader()) - .setIsSync(replica.isSync()) - .setIsFuture(replica.isFuture()) - .setIsOffline(replica.isOffline()) - .setIsPreferredLeader(replica.isPreferredLeader()) - .setPath(replica.path()) - .build()) - .collect(Collectors.toList())) + .addAllBroker(value.brokers().stream().map(ByteUtils::toOuterClass).toList()) + .addAllTopic(value.topics().values().stream().map(ByteUtils::toOuterClass).toList()) + .addAllReplica(value.replicas().stream().map(ByteUtils::toOuterClass).toList()) .build() .toByteArray(); } @@ -328,51 +287,148 @@ public static BeanObject readBeanObject(byte[] bytes) throws SerializationExcept } } - // TODO: Due to the change of NodeInfo to Broker. This and the test should be updated. /** Deserialize to ClusterInfo with protocol buffer */ public static ClusterInfo readClusterInfo(byte[] bytes) { try { var outerClusterInfo = ClusterInfoOuterClass.ClusterInfo.parseFrom(bytes); return ClusterInfo.of( outerClusterInfo.getClusterId(), - outerClusterInfo.getNodeInfoList().stream() - .map(nodeInfo -> Broker.of(nodeInfo.getId(), nodeInfo.getHost(), nodeInfo.getPort())) - .collect(Collectors.toList()), + outerClusterInfo.getBrokerList().stream().map(ByteUtils::toBroker).toList(), outerClusterInfo.getTopicList().stream() - .map( - protoTopic -> - new Topic( - protoTopic.getName(), - new Config(protoTopic.getConfigMap()), - protoTopic.getInternal(), - Set.copyOf(protoTopic.getPartitionList()))) + .map(ByteUtils::toTopic) .collect(Collectors.toMap(Topic::name, Function.identity())), - outerClusterInfo.getReplicaList().stream() - .map( - replica -> - Replica.builder() - .topic(replica.getTopic()) - .partition(replica.getPartition()) - .broker( - Broker.of( - replica.getNodeInfo().getId(), - replica.getNodeInfo().getHost(), - replica.getNodeInfo().getPort())) - .lag(replica.getLag()) - .size(replica.getSize()) - .isLeader(replica.getIsLeader()) - .isSync(replica.getIsSync()) - .isFuture(replica.getIsFuture()) - .isOffline(replica.getIsOffline()) - .isPreferredLeader(replica.getIsPreferredLeader()) - .path(replica.getPath()) - .build()) - .collect(Collectors.toList())); + outerClusterInfo.getReplicaList().stream().map(ByteUtils::toReplica).toList()); } catch (InvalidProtocolBufferException ex) { throw new SerializationException(ex); } } + // ---------------------------Serialize To ProtoBuf Outer Class------------------------------- // + + private static BrokerOuterClass.Broker.DataFolder toOuterClass(Broker.DataFolder dataFolder) { + return BrokerOuterClass.Broker.DataFolder.newBuilder() + .setPath(dataFolder.path()) + .putAllPartitionSizes( + dataFolder.partitionSizes().entrySet().stream() + .collect(Collectors.toMap(entry -> entry.getKey().toString(), Map.Entry::getValue))) + .putAllOrphanPartitionSizes( + dataFolder.orphanPartitionSizes().entrySet().stream() + .collect(Collectors.toMap(entry -> entry.getKey().toString(), Map.Entry::getValue))) + .build(); + } + + private static TopicPartitionOuterClass.TopicPartition toOuterClass( + TopicPartition topicPartition) { + return TopicPartitionOuterClass.TopicPartition.newBuilder() + .setPartition(topicPartition.partition()) + .setTopic(topicPartition.topic()) + .build(); + } + + private static BrokerOuterClass.Broker toOuterClass(Broker broker) { + return BrokerOuterClass.Broker.newBuilder() + .setId(broker.id()) + .setHost(broker.host()) + .setPort(broker.port()) + .setIsController(broker.isController()) + .putAllConfig(broker.config().raw()) + .addAllDataFolder(broker.dataFolders().stream().map(ByteUtils::toOuterClass).toList()) + .addAllTopicPartitions( + broker.topicPartitions().stream().map(ByteUtils::toOuterClass).toList()) + .addAllTopicPartitionLeaders( + broker.topicPartitionLeaders().stream().map(ByteUtils::toOuterClass).toList()) + .build(); + } + + private static TopicOuterClass.Topic toOuterClass(Topic topic) { + return TopicOuterClass.Topic.newBuilder() + .setName(topic.name()) + .putAllConfig(topic.config().raw()) + .setInternal(topic.internal()) + .addAllPartitionIds(topic.partitionIds()) + .build(); + } + + private static ReplicaOuterClass.Replica toOuterClass(Replica replica) { + return ReplicaOuterClass.Replica.newBuilder() + .setTopic(replica.topic()) + .setPartition(replica.partition()) + .setBroker(toOuterClass(replica.broker())) + .setLag(replica.lag()) + .setSize(replica.size()) + .setIsInternal(replica.isInternal()) + .setIsLeader(replica.isLeader()) + .setIsAdding(replica.isAdding()) + .setIsRemoving(replica.isRemoving()) + .setIsSync(replica.isSync()) + .setIsFuture(replica.isFuture()) + .setIsOffline(replica.isOffline()) + .setIsPreferredLeader(replica.isPreferredLeader()) + .setPath(replica.path()) + .build(); + } + + // -------------------------Deserialize From ProtoBuf Outer Class----------------------------- // + + private static Broker.DataFolder toDataFolder(BrokerOuterClass.Broker.DataFolder dataFolder) { + return new Broker.DataFolder( + dataFolder.getPath(), + dataFolder.getPartitionSizesMap().entrySet().stream() + .collect( + Collectors.toMap(entry -> TopicPartition.of(entry.getKey()), Map.Entry::getValue)), + dataFolder.getOrphanPartitionSizesMap().entrySet().stream() + .collect( + Collectors.toMap(entry -> TopicPartition.of(entry.getKey()), Map.Entry::getValue))); + } + + private static TopicPartition toTopicPartition( + TopicPartitionOuterClass.TopicPartition topicPartition) { + return new TopicPartition(topicPartition.getTopic(), topicPartition.getPartition()); + } + + private static Broker toBroker(BrokerOuterClass.Broker broker) { + return new Broker( + broker.getId(), + broker.getHost(), + broker.getPort(), + broker.getIsController(), + new Config(broker.getConfigMap()), + broker.getDataFolderList().stream().map(ByteUtils::toDataFolder).toList(), + broker.getTopicPartitionsList().stream() + .map(ByteUtils::toTopicPartition) + .collect(Collectors.toSet()), + broker.getTopicPartitionLeadersList().stream() + .map(ByteUtils::toTopicPartition) + .collect(Collectors.toSet())); + } + + private static Topic toTopic(TopicOuterClass.Topic topic) { + return new Topic( + topic.getName(), + new Config(topic.getConfigMap()), + topic.getInternal(), + Set.copyOf(topic.getPartitionIdsList())); + } + + private static Replica toReplica(ReplicaOuterClass.Replica replica) { + return Replica.builder() + .topic(replica.getTopic()) + .partition(replica.getPartition()) + .broker(toBroker(replica.getBroker())) + .lag(replica.getLag()) + .size(replica.getSize()) + .isInternal(replica.getIsInternal()) + .isLeader(replica.getIsLeader()) + .isAdding(replica.getIsAdding()) + .isRemoving(replica.getIsRemoving()) + .isSync(replica.getIsSync()) + .isFuture(replica.getIsFuture()) + .isOffline(replica.getIsOffline()) + .isPreferredLeader(replica.getIsPreferredLeader()) + .path(replica.getPath()) + .build(); + } + // --------------------------------ProtoBuf Primitive----------------------------------------- // /** diff --git a/common/src/main/proto/org/astraea/common/generated/ClusterInfo.proto b/common/src/main/proto/org/astraea/common/generated/ClusterInfo.proto deleted file mode 100644 index a9a6db12f4..0000000000 --- a/common/src/main/proto/org/astraea/common/generated/ClusterInfo.proto +++ /dev/null @@ -1,37 +0,0 @@ -syntax = "proto3"; - -package org.astraea.common.generated; - -message ClusterInfo{ - string clusterId = 1; - - repeated NodeInfo nodeInfo = 2; - message NodeInfo { - int32 id = 1; - string host = 2; - int32 port = 3; - } - - repeated Topic topic = 3; - message Topic { - string name = 1; - map config = 2; - bool internal = 3; - repeated int32 partition = 4; - } - - repeated Replica replica = 4; - message Replica{ - string topic = 1; - int32 partition = 2; - NodeInfo nodeInfo = 3; - int64 lag = 4; - int64 size = 5; - bool isLeader = 6; - bool isSync = 7; - bool isFuture = 8; - bool isOffline = 9; - bool isPreferredLeader = 10; - string path = 11; - } -} \ No newline at end of file diff --git a/common/src/main/proto/org/astraea/common/generated/admin/Broker.proto b/common/src/main/proto/org/astraea/common/generated/admin/Broker.proto new file mode 100644 index 0000000000..cf463e193c --- /dev/null +++ b/common/src/main/proto/org/astraea/common/generated/admin/Broker.proto @@ -0,0 +1,22 @@ +syntax = "proto3"; + +package org.astraea.common.generated.admin; + +import "org/astraea/common/generated/admin/TopicPartition.proto"; + +message Broker { + int32 id = 1; + string host = 2; + int32 port = 3; + bool isController = 4; + map config = 5; + repeated DataFolder dataFolder = 6; + repeated TopicPartition topicPartitions = 7; + repeated TopicPartition topicPartitionLeaders = 8; + + message DataFolder { + string path = 1; + map partitionSizes = 2; + map orphanPartitionSizes = 3; + } +} \ No newline at end of file diff --git a/common/src/main/proto/org/astraea/common/generated/admin/ClusterInfo.proto b/common/src/main/proto/org/astraea/common/generated/admin/ClusterInfo.proto new file mode 100644 index 0000000000..260f65ec64 --- /dev/null +++ b/common/src/main/proto/org/astraea/common/generated/admin/ClusterInfo.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + +package org.astraea.common.generated.admin; + +import "org/astraea/common/generated/admin/Broker.proto"; +import "org/astraea/common/generated/admin/Topic.proto"; +import "org/astraea/common/generated/admin/Replica.proto"; + +message ClusterInfo { + string clusterId = 1; + repeated Broker broker = 2; + repeated Topic topic = 3; + repeated Replica replica = 4; +} \ No newline at end of file diff --git a/common/src/main/proto/org/astraea/common/generated/admin/Replica.proto b/common/src/main/proto/org/astraea/common/generated/admin/Replica.proto new file mode 100644 index 0000000000..598ade270e --- /dev/null +++ b/common/src/main/proto/org/astraea/common/generated/admin/Replica.proto @@ -0,0 +1,22 @@ +syntax = "proto3"; + +package org.astraea.common.generated.admin; + +import "org/astraea/common/generated/admin/Broker.proto"; + +message Replica { + string topic = 1; + int32 partition = 2; + Broker broker = 3; + bool isLeader = 7; + bool isSync = 10; + bool isOffline = 12; + bool isAdding = 8; + bool isRemoving = 9; + bool isFuture = 11; + bool isPreferredLeader = 13; + int64 lag = 4; + int64 size = 5; + string path = 14; + bool isInternal = 6; +} \ No newline at end of file diff --git a/common/src/main/proto/org/astraea/common/generated/admin/Topic.proto b/common/src/main/proto/org/astraea/common/generated/admin/Topic.proto new file mode 100644 index 0000000000..cb46f4b9d7 --- /dev/null +++ b/common/src/main/proto/org/astraea/common/generated/admin/Topic.proto @@ -0,0 +1,10 @@ +syntax = "proto3"; + +package org.astraea.common.generated.admin; + +message Topic { + string name = 1; + map config = 2; + bool internal = 3; + repeated int32 partitionIds = 4; +} \ No newline at end of file diff --git a/common/src/main/proto/org/astraea/common/generated/admin/TopicPartition.proto b/common/src/main/proto/org/astraea/common/generated/admin/TopicPartition.proto new file mode 100644 index 0000000000..44f446c88d --- /dev/null +++ b/common/src/main/proto/org/astraea/common/generated/admin/TopicPartition.proto @@ -0,0 +1,8 @@ +syntax = "proto3"; + +package org.astraea.common.generated.admin; + +message TopicPartition { + int32 partition = 1; + string topic = 2; +} \ No newline at end of file diff --git a/common/src/test/java/org/astraea/common/ByteUtilsTest.java b/common/src/test/java/org/astraea/common/ByteUtilsTest.java index 616229e122..03e7a0cc76 100644 --- a/common/src/test/java/org/astraea/common/ByteUtilsTest.java +++ b/common/src/test/java/org/astraea/common/ByteUtilsTest.java @@ -17,6 +17,11 @@ package org.astraea.common; import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Set; +import org.astraea.common.admin.Admin; +import org.astraea.common.admin.ClusterInfo; +import org.astraea.it.Service; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -68,4 +73,44 @@ void testBoolean2Bytes() { Assertions.assertArrayEquals(new byte[] {1}, ByteUtils.toBytes(true)); Assertions.assertArrayEquals(new byte[] {0}, ByteUtils.toBytes(false)); } + + @Test + void testReadAndToBytesClusterInfo() { + var topic = Utils.randomString(); + try (var service = Service.builder().numberOfBrokers(3).build()) { + try (var admin = Admin.of(service.bootstrapServers())) { + admin + .creator() + .topic(topic) + .numberOfPartitions(1) + .numberOfReplicas((short) 3) + .run() + .toCompletableFuture() + .join(); + Utils.sleep(Duration.ofSeconds(1)); + var clusterInfo = admin.clusterInfo(Set.of(topic)).toCompletableFuture().join(); + + Assertions.assertDoesNotThrow(() -> ByteUtils.toBytes(clusterInfo)); + var bytes = ByteUtils.toBytes(clusterInfo); + Assertions.assertDoesNotThrow(() -> ByteUtils.readClusterInfo(bytes)); + var deserializedClusterInfo = ByteUtils.readClusterInfo(bytes); + Assertions.assertEquals(clusterInfo.clusterId(), deserializedClusterInfo.clusterId()); + Assertions.assertEquals(clusterInfo.brokers(), deserializedClusterInfo.brokers()); + Assertions.assertEquals(clusterInfo.topics(), deserializedClusterInfo.topics()); + Assertions.assertEquals(clusterInfo.replicas(), deserializedClusterInfo.replicas()); + } + } + } + + @Test + void testReadAndToBytesEmptyClusterInfo() { + var clusterInfo = ClusterInfo.empty(); + var serializedInfo = ByteUtils.toBytes(clusterInfo); + var deserializedClusterInfo = ByteUtils.readClusterInfo(serializedInfo); + + Assertions.assertEquals(clusterInfo.clusterId(), deserializedClusterInfo.clusterId()); + Assertions.assertEquals(clusterInfo.brokers(), deserializedClusterInfo.brokers()); + Assertions.assertEquals(clusterInfo.topics(), deserializedClusterInfo.topics()); + Assertions.assertEquals(clusterInfo.replicas(), deserializedClusterInfo.replicas()); + } }