Skip to content

Commit

Permalink
[COMMON] Replace NodeInfo with Broker (#1763)
Browse files Browse the repository at this point in the history
  • Loading branch information
chaohengstudent authored May 24, 2023
1 parent 28ec2c6 commit ced840f
Show file tree
Hide file tree
Showing 80 changed files with 566 additions and 947 deletions.
2 changes: 1 addition & 1 deletion app/src/main/java/org/astraea/app/backup/Backup.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void restoreDistribution(ClusterInfo clusterInfo, String bootstrapServers
.sorted(
Comparator.comparing(
replica -> !replica.isLeader()))
.map(replica -> replica.nodeInfo().id())
.map(replica -> replica.broker().id())
.toList())))))
.configs(topic.config().raw())
.run()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ else if (specifiedByBroker) {
.join()
.replicaStream()
.filter(Replica::isLeader)
.filter(replica -> specifyBrokers.contains(replica.nodeInfo().id()))
.filter(replica -> specifyBrokers.contains(replica.broker().id()))
.map(replica -> TopicPartition.of(replica.topic(), replica.partition()))
.distinct()
.toList();
Expand Down
16 changes: 8 additions & 8 deletions app/src/main/java/org/astraea/app/publisher/MetricPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.astraea.app.argument.StringMapField;
import org.astraea.common.Utils;
import org.astraea.common.admin.Admin;
import org.astraea.common.admin.NodeInfo;
import org.astraea.common.admin.Broker;
import org.astraea.common.metrics.JndiClient;
import org.astraea.common.metrics.collector.MetricFetcher;

Expand All @@ -50,17 +50,17 @@ static void execute(Arguments arguments) {
.clientSupplier(
() ->
admin
.nodeInfos()
.brokers()
.thenApply(
nodes ->
nodes.stream()
brokers ->
brokers.stream()
.collect(
Collectors.toUnmodifiableMap(
NodeInfo::id,
node ->
Broker::id,
broker ->
JndiClient.of(
node.host(),
arguments.idToJmxPort().apply(node.id()))))))
broker.host(),
arguments.idToJmxPort().apply(broker.id()))))))
.fetchBeanDelay(arguments.period)
.fetchMetadataDelay(Duration.ofMinutes(5))
.threads(3)
Expand Down
2 changes: 1 addition & 1 deletion app/src/main/java/org/astraea/app/web/BalancerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ static class Placement {
final Optional<Long> size;

Placement(Replica replica, Optional<Long> size) {
this.brokerId = replica.nodeInfo().id();
this.brokerId = replica.broker().id();
this.directory = replica.path();
this.size = size;
}
Expand Down
10 changes: 7 additions & 3 deletions app/src/main/java/org/astraea/app/web/BrokerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import org.astraea.common.admin.Admin;
import org.astraea.common.admin.NodeInfo;
import org.astraea.common.admin.Broker;
import org.astraea.common.admin.TopicPartition;

class BrokerHandler implements Handler {
Expand All @@ -44,8 +44,12 @@ CompletionStage<Set<Integer>> brokers(Optional<String> target) {
.orElseGet(
() ->
admin
.nodeInfos()
.thenApply(ns -> ns.stream().map(NodeInfo::id).collect(Collectors.toSet())));
.brokers()
.thenApply(
ns ->
ns.stream()
.map(org.astraea.common.admin.Broker::id)
.collect(Collectors.toSet())));
} catch (NumberFormatException e) {
return CompletableFuture.failedFuture(
new NoSuchElementException("the broker id must be number"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import java.util.stream.Stream;
import org.astraea.common.FutureUtils;
import org.astraea.common.admin.Admin;
import org.astraea.common.admin.Broker;
import org.astraea.common.admin.ClusterInfo;
import org.astraea.common.admin.NodeInfo;
import org.astraea.common.admin.Replica;
import org.astraea.common.admin.TopicPartition;
import org.astraea.common.admin.TopicPartitionReplica;
Expand Down Expand Up @@ -110,7 +110,7 @@ public CompletionStage<Response> post(Channel channel) {
availableBrokers.stream()
.filter(
b -> b.topicPartitions().contains(tp))
.map(NodeInfo::id)
.map(Broker::id)
.toList();
if (!ids.isEmpty()) return ids;
return List.of(
Expand Down Expand Up @@ -211,7 +211,7 @@ static class AddingReplica implements Response {
AddingReplica(Replica addingReplica, long leaderSize) {
this.topicName = addingReplica.topic();
this.partition = addingReplica.partition();
this.broker = addingReplica.nodeInfo().id();
this.broker = addingReplica.broker().id();
this.dataFolder = addingReplica.path();
this.size = addingReplica.size();
this.leaderSize = leaderSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.commons.math3.distribution.IntegerDistribution;
import org.apache.commons.math3.util.Pair;
import org.astraea.common.admin.Admin;
import org.astraea.common.admin.NodeInfo;
import org.astraea.common.admin.Broker;
import org.astraea.common.admin.TopicPartition;
import org.astraea.common.admin.TopicPartitionReplica;

Expand Down Expand Up @@ -62,7 +62,7 @@ public CompletionStage<Result> apply(Admin admin) {
admin.waitPartitionLeaderSynced(
Map.of(topicName, partitions), Duration.ofSeconds(4)))
.thenCompose(ignored -> admin.brokers())
.thenApply(brokers -> brokers.stream().map(NodeInfo::id).sorted().toList())
.thenApply(brokers -> brokers.stream().map(Broker::id).sorted().toList())
.thenCompose(
brokerIds -> {
var distribution =
Expand Down
16 changes: 7 additions & 9 deletions app/src/main/java/org/astraea/app/web/ThrottleHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
import org.astraea.common.EnumInfo;
import org.astraea.common.FutureUtils;
import org.astraea.common.admin.Admin;
import org.astraea.common.admin.Broker;
import org.astraea.common.admin.BrokerConfigs;
import org.astraea.common.admin.NodeInfo;
import org.astraea.common.admin.TopicConfigs;
import org.astraea.common.admin.TopicPartitionReplica;
import org.astraea.common.json.TypeRef;
Expand Down Expand Up @@ -100,9 +100,8 @@ public CompletionStage<Response> post(Channel channel) {

var topicToAppends =
admin
.nodeInfos()
.thenApply(
nodeInfos -> nodeInfos.stream().map(NodeInfo::id).collect(Collectors.toSet()))
.brokers()
.thenApply(brokers -> brokers.stream().map(Broker::id).collect(Collectors.toSet()))
.thenCompose(admin::topicPartitionReplicas)
.thenApply(
replicas ->
Expand Down Expand Up @@ -197,9 +196,8 @@ public CompletionStage<Response> delete(Channel channel) {

var topicToSubtracts =
admin
.nodeInfos()
.thenApply(
nodeInfos -> nodeInfos.stream().map(NodeInfo::id).collect(Collectors.toSet()))
.brokers()
.thenApply(brokers -> brokers.stream().map(Broker::id).collect(Collectors.toSet()))
.thenCompose(admin::topicPartitionReplicas)
.thenApply(
replicas -> {
Expand Down Expand Up @@ -251,11 +249,11 @@ public CompletionStage<Response> delete(Channel channel) {

var brokerToUnset =
admin
.nodeInfos()
.brokers()
.thenApply(
ns ->
ns.stream()
.map(NodeInfo::id)
.map(Broker::id)
.filter(
id ->
!channel.queries().containsKey("broker")
Expand Down
2 changes: 1 addition & 1 deletion app/src/main/java/org/astraea/app/web/TopicHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ private Replica() {

Replica(org.astraea.common.admin.Replica replica) {
this(
replica.nodeInfo().id(),
replica.broker().id(),
replica.lag(),
replica.size(),
replica.isLeader(),
Expand Down
3 changes: 1 addition & 2 deletions app/src/main/java/org/astraea/app/web/WebService.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.astraea.common.Utils;
import org.astraea.common.admin.Admin;
import org.astraea.common.admin.Broker;
import org.astraea.common.admin.NodeInfo;
import org.astraea.common.metrics.JndiClient;
import org.astraea.common.metrics.MBeanClient;
import org.astraea.common.metrics.collector.MetricSensor;
Expand Down Expand Up @@ -76,7 +75,7 @@ public WebService(
brokers.stream()
.collect(
Collectors.toUnmodifiableMap(
NodeInfo::id,
Broker::id,
b -> JndiClient.of(b.host(), brokerIdToJmxPort.apply(b.id()))));
yield List.of(
MetricStore.Receiver.local(() -> admin.brokers().thenApply(asBeanClientMap)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ void testPartitionSupplier() {
.join()
.replicaStream()
.filter(Replica::isLeader)
.filter(r -> r.nodeInfo().id() == 1)
.filter(r -> r.broker().id() == 1)
.map(Replica::topicPartition)
.collect(Collectors.toUnmodifiableSet());

Expand Down Expand Up @@ -239,7 +239,7 @@ void testPartitionSupplier() {
.join()
.replicaStream()
.filter(Replica::isLeader)
.filter(replica -> replica.nodeInfo().id() == 1)
.filter(replica -> replica.broker().id() == 1)
.map(Replica::topicPartition)
.collect(Collectors.toSet());
var selector2 = args.topicPartitionSelector();
Expand Down Expand Up @@ -273,7 +273,7 @@ void testPartitionSupplier() {
.replicaStream()
.findFirst()
.get()
.nodeInfo()
.broker()
.id();
var noPartitionBroker = (validBroker == 3) ? 1 : validBroker + 1;
args =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@
import org.astraea.common.DataSize;
import org.astraea.common.Utils;
import org.astraea.common.admin.Admin;
import org.astraea.common.admin.Broker;
import org.astraea.common.admin.ClusterInfo;
import org.astraea.common.admin.NodeInfo;
import org.astraea.common.admin.Replica;
import org.astraea.common.admin.TopicPartition;
import org.astraea.common.balancer.AlgorithmConfig;
Expand Down Expand Up @@ -976,7 +976,7 @@ void testChangeOrder() {
(short) 10,
r ->
Replica.builder(r)
.nodeInfo(base.node(srcIter.next()))
.broker(base.node(srcIter.next()))
.isPreferredLeader(srcPrefIter.next())
.path(srcDirIter.next())
.build())
Expand All @@ -992,7 +992,7 @@ void testChangeOrder() {
(short) 10,
r ->
Replica.builder(r)
.nodeInfo(base.node(dstIter.next()))
.broker(base.node(dstIter.next()))
.isPreferredLeader(dstPrefIter.next())
.path(dstDirIter.next())
.build())
Expand Down Expand Up @@ -1343,7 +1343,7 @@ private MetricStore metricStore(Admin admin, Set<String> costFunctions) {
brokers.stream()
.collect(
Collectors.toUnmodifiableMap(
NodeInfo::id,
Broker::id,
b ->
JndiClient.of(b.host(), brokerIdToJmxPort.apply(b.id())))));
var cf = Utils.costFunctions(costFunctions, HasClusterCost.class, Configuration.EMPTY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ void testMigrateToAnotherBroker() {
.filter(replica -> replica.partition() == 0)
.findFirst()
.get()
.nodeInfo()
.broker()
.id();
var nextBroker =
SERVICE.dataFolders().keySet().stream().filter(i -> i != currentBroker).findAny().get();
Expand Down Expand Up @@ -88,7 +88,7 @@ void testMigrateToAnotherBroker() {
.filter(replica -> replica.partition() == 0)
.findFirst()
.get()
.nodeInfo()
.broker()
.id());
}
}
Expand All @@ -111,7 +111,7 @@ void testMigrateToAnotherPath() {
.findFirst()
.get();

var currentBroker = currentReplica.nodeInfo().id();
var currentBroker = currentReplica.broker().id();
var currentPath = currentReplica.path();
var nextPath =
SERVICE.dataFolders().get(currentBroker).stream()
Expand Down Expand Up @@ -170,7 +170,7 @@ void testExcludeSpecificBroker() {
.filter(replica -> replica.partition() == 0)
.findFirst()
.get()
.nodeInfo()
.broker()
.id();

var body =
Expand All @@ -194,7 +194,7 @@ void testExcludeSpecificBroker() {
.filter(replica -> replica.partition() == 0)
.findFirst()
.get()
.nodeInfo()
.broker()
.id());
Assertions.assertEquals(
0,
Expand All @@ -221,7 +221,7 @@ void testExcludeSpecificBrokerTopic() {
.filter(replica -> replica.partition() == 0)
.findFirst()
.get()
.nodeInfo()
.broker()
.id();

var body =
Expand All @@ -247,7 +247,7 @@ void testExcludeSpecificBrokerTopic() {
.filter(replica -> replica.partition() == 0)
.findFirst()
.get()
.nodeInfo()
.broker()
.id());
Assertions.assertNotEquals(
0,
Expand Down
10 changes: 5 additions & 5 deletions app/src/test/java/org/astraea/app/web/ThrottleHandlerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
import org.astraea.common.DataRate;
import org.astraea.common.Utils;
import org.astraea.common.admin.Admin;
import org.astraea.common.admin.Broker;
import org.astraea.common.admin.BrokerConfigs;
import org.astraea.common.admin.NodeInfo;
import org.astraea.common.admin.TopicConfigs;
import org.astraea.common.admin.TopicPartition;
import org.astraea.common.json.JsonConverter;
Expand All @@ -55,13 +55,13 @@ static void closeService() {
public void cleanup() {
try (var admin = Admin.of(SERVICE.bootstrapServers())) {
admin
.nodeInfos()
.brokers()
.thenApply(
ns ->
ns.stream()
.collect(
Collectors.toMap(
NodeInfo::id,
Broker::id,
ignored ->
Set.of(
BrokerConfigs.LEADER_REPLICATION_THROTTLED_RATE_CONFIG,
Expand Down Expand Up @@ -509,11 +509,11 @@ void testDelete() {
Runnable setThrottle =
() -> {
admin
.nodeInfos()
.brokers()
.thenApply(
ns ->
ns.stream()
.map(NodeInfo::id)
.map(Broker::id)
.collect(
Collectors.toMap(
n -> n,
Expand Down
Loading

0 comments on commit ced840f

Please sign in to comment.