Skip to content

Commit

Permalink
[COMMON] don't reference to Broker from Replica (#1791)
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 authored May 29, 2023
1 parent c453129 commit eb0c07d
Show file tree
Hide file tree
Showing 52 changed files with 291 additions and 324 deletions.
2 changes: 1 addition & 1 deletion app/src/main/java/org/astraea/app/backup/Backup.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void restoreDistribution(ClusterInfo clusterInfo, String bootstrapServers
.sorted(
Comparator.comparing(
replica -> !replica.isLeader()))
.map(replica -> replica.broker().id())
.map(replica -> replica.brokerId())
.toList())))))
.configs(topic.config().raw())
.run()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ else if (specifiedByBroker) {
.join()
.replicaStream()
.filter(Replica::isLeader)
.filter(replica -> specifyBrokers.contains(replica.broker().id()))
.filter(replica -> specifyBrokers.contains(replica.brokerId()))
.map(replica -> TopicPartition.of(replica.topic(), replica.partition()))
.distinct()
.toList();
Expand Down
2 changes: 1 addition & 1 deletion app/src/main/java/org/astraea/app/web/BalancerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ static class Placement {
final Optional<Long> size;

Placement(Replica replica, Optional<Long> size) {
this.brokerId = replica.broker().id();
this.brokerId = replica.brokerId();
this.directory = replica.path();
this.size = size;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ static class AddingReplica implements Response {
AddingReplica(Replica addingReplica, long leaderSize) {
this.topicName = addingReplica.topic();
this.partition = addingReplica.partition();
this.broker = addingReplica.broker().id();
this.broker = addingReplica.brokerId();
this.dataFolder = addingReplica.path();
this.size = addingReplica.size();
this.leaderSize = leaderSize;
Expand Down
2 changes: 1 addition & 1 deletion app/src/main/java/org/astraea/app/web/TopicHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ private Replica() {

Replica(org.astraea.common.admin.Replica replica) {
this(
replica.broker().id(),
replica.brokerId(),
replica.lag(),
replica.size(),
replica.isLeader(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ void testPartitionSupplier() {
.join()
.replicaStream()
.filter(Replica::isLeader)
.filter(r -> r.broker().id() == 1)
.filter(r -> r.brokerId() == 1)
.map(Replica::topicPartition)
.collect(Collectors.toUnmodifiableSet());

Expand Down Expand Up @@ -239,7 +239,7 @@ void testPartitionSupplier() {
.join()
.replicaStream()
.filter(Replica::isLeader)
.filter(replica -> replica.broker().id() == 1)
.filter(replica -> replica.brokerId() == 1)
.map(Replica::topicPartition)
.collect(Collectors.toSet());
var selector2 = args.topicPartitionSelector();
Expand Down Expand Up @@ -273,8 +273,7 @@ void testPartitionSupplier() {
.replicaStream()
.findFirst()
.get()
.broker()
.id();
.brokerId();
var noPartitionBroker = (validBroker == 3) ? 1 : validBroker + 1;
args =
Argument.parse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -970,7 +970,7 @@ void testChangeOrder() {
(short) 10,
r ->
Replica.builder(r)
.broker(base.node(srcIter.next()))
.brokerId(base.node(srcIter.next()).id())
.isPreferredLeader(srcPrefIter.next())
.path(srcDirIter.next())
.build())
Expand All @@ -986,7 +986,7 @@ void testChangeOrder() {
(short) 10,
r ->
Replica.builder(r)
.broker(base.node(dstIter.next()))
.brokerId(base.node(dstIter.next()).id())
.isPreferredLeader(dstPrefIter.next())
.path(dstDirIter.next())
.build())
Expand Down
20 changes: 7 additions & 13 deletions app/src/test/java/org/astraea/app/web/ReassignmentHandlerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ void testMigrateToAnotherBroker() {
.filter(replica -> replica.partition() == 0)
.findFirst()
.get()
.broker()
.id();
.brokerId();
var nextBroker =
SERVICE.dataFolders().keySet().stream().filter(i -> i != currentBroker).findAny().get();

Expand All @@ -88,8 +87,7 @@ void testMigrateToAnotherBroker() {
.filter(replica -> replica.partition() == 0)
.findFirst()
.get()
.broker()
.id());
.brokerId());
}
}

Expand All @@ -111,7 +109,7 @@ void testMigrateToAnotherPath() {
.findFirst()
.get();

var currentBroker = currentReplica.broker().id();
var currentBroker = currentReplica.brokerId();
var currentPath = currentReplica.path();
var nextPath =
SERVICE.dataFolders().get(currentBroker).stream()
Expand Down Expand Up @@ -170,8 +168,7 @@ void testExcludeSpecificBroker() {
.filter(replica -> replica.partition() == 0)
.findFirst()
.get()
.broker()
.id();
.brokerId();

var body =
String.format("{\"excludeNodes\": [{\"%s\": \"%s\"}]}", EXCLUDE_KEY, currentBroker);
Expand All @@ -194,8 +191,7 @@ void testExcludeSpecificBroker() {
.filter(replica -> replica.partition() == 0)
.findFirst()
.get()
.broker()
.id());
.brokerId());
Assertions.assertEquals(
0,
admin.topicPartitionReplicas(Set.of(currentBroker)).toCompletableFuture().join().size());
Expand All @@ -221,8 +217,7 @@ void testExcludeSpecificBrokerTopic() {
.filter(replica -> replica.partition() == 0)
.findFirst()
.get()
.broker()
.id();
.brokerId();

var body =
String.format(
Expand All @@ -247,8 +242,7 @@ void testExcludeSpecificBrokerTopic() {
.filter(replica -> replica.partition() == 0)
.findFirst()
.get()
.broker()
.id());
.brokerId());
Assertions.assertNotEquals(
0,
admin.topicPartitionReplicas(Set.of(currentBroker)).toCompletableFuture().join().size());
Expand Down
4 changes: 2 additions & 2 deletions common/src/main/java/org/astraea/common/ByteUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ private static ReplicaOuterClass.Replica toOuterClass(Replica replica) {
return ReplicaOuterClass.Replica.newBuilder()
.setTopic(replica.topic())
.setPartition(replica.partition())
.setBroker(toOuterClass(replica.broker()))
.setBrokerId(replica.brokerId())
.setLag(replica.lag())
.setSize(replica.size())
.setIsInternal(replica.isInternal())
Expand Down Expand Up @@ -414,7 +414,7 @@ private static Replica toReplica(ReplicaOuterClass.Replica replica) {
return Replica.builder()
.topic(replica.getTopic())
.partition(replica.getPartition())
.broker(toBroker(replica.getBroker()))
.brokerId(replica.getBrokerId())
.lag(replica.getLag())
.size(replica.getSize())
.isInternal(replica.getIsInternal())
Expand Down
9 changes: 5 additions & 4 deletions common/src/main/java/org/astraea/common/admin/AdminImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -685,9 +685,10 @@ private CompletionStage<List<Replica>> replicas(Set<String> topics) {
.isInternal(internal)
.isAdding(isAdding)
.isRemoving(isRemoving)
.broker(
brokers.getOrDefault(
node.id(), Broker.of(node)))
.brokerId(
brokers
.getOrDefault(node.id(), Broker.of(node))
.id())
.lag(pathAndReplica.getValue().offsetLag())
.size(pathAndReplica.getValue().size())
.isLeader(
Expand Down Expand Up @@ -716,7 +717,7 @@ private CompletionStage<List<Replica>> replicas(Set<String> topics) {
.sorted(
Comparator.comparing(Replica::topic)
.thenComparing(Replica::partition)
.thenComparing(r -> r.broker().id()))
.thenComparing(r -> r.brokerId()))
.toList());
}

Expand Down
19 changes: 9 additions & 10 deletions common/src/main/java/org/astraea/common/admin/ClusterInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,22 +87,22 @@ static boolean placementMatch(
.sorted(
Comparator.comparing(Replica::isPreferredLeader)
.reversed()
.thenComparing(r -> r.broker().id()))
.collect(Collectors.toUnmodifiableList());
.thenComparing(Replica::brokerId))
.toList();
final var targetIds =
targetReplicas.stream()
.sorted(
Comparator.comparing(Replica::isPreferredLeader)
.reversed()
.thenComparing(r -> r.broker().id()))
.collect(Collectors.toUnmodifiableList());
.thenComparing(Replica::brokerId))
.toList();
return IntStream.range(0, sourceIds.size())
.allMatch(
index -> {
final var source = sourceIds.get(index);
final var target = targetIds.get(index);
return source.isPreferredLeader() == target.isPreferredLeader()
&& source.broker().id() == target.broker().id()
&& source.brokerId() == target.brokerId()
&& Objects.equals(source.path(), target.path());
});
}
Expand All @@ -121,7 +121,7 @@ static String toString(ClusterInfo allocation) {
.forEach(
log ->
stringBuilder.append(
String.format("(%s, %s) ", log.broker().id(), log.path())));
String.format("(%s, %s) ", log.brokerId(), log.path())));

stringBuilder.append(System.lineSeparator());
});
Expand Down Expand Up @@ -332,24 +332,23 @@ default Map<Integer, Set<String>> brokerFolders() {
// implements following methods by smart index to speed up the queries

default Stream<Replica> replicaStream(int broker) {
return replicaStream().filter(r -> r.broker().id() == broker);
return replicaStream().filter(r -> r.brokerId() == broker);
}

default Stream<Replica> replicaStream(String topic) {
return replicaStream().filter(r -> r.topic().equals(topic));
}

default Stream<Replica> replicaStream(BrokerTopic brokerTopic) {
return replicaStream(brokerTopic.topic()).filter(r -> r.broker().id() == brokerTopic.broker());
return replicaStream(brokerTopic.topic()).filter(r -> r.brokerId() == brokerTopic.broker());
}

default Stream<Replica> replicaStream(TopicPartition partition) {
return replicaStream(partition.topic()).filter(r -> r.partition() == partition.partition());
}

default Stream<Replica> replicaStream(TopicPartitionReplica replica) {
return replicaStream(replica.topicPartition())
.filter(r -> r.broker().id() == replica.brokerId());
return replicaStream(replica.topicPartition()).filter(r -> r.brokerId() == replica.brokerId());
}

// ---------------------[abstract methods]---------------------//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public ClusterInfoBuilder addNode(Set<Integer> brokerIds) {
+ " but another broker with this id already existed");
});
return Stream.concat(nodes.stream(), brokerIds.stream().map(ClusterInfoBuilder::fakeNode))
.collect(Collectors.toUnmodifiableList());
.toList();
});
}

Expand Down Expand Up @@ -132,7 +132,7 @@ public ClusterInfoBuilder addFolders(Map<Integer, Set<String>> folders) {
.toList());
else return node;
})
.collect(Collectors.toUnmodifiableList());
.toList();
});
}

Expand Down Expand Up @@ -174,15 +174,15 @@ public ClusterInfoBuilder addTopic(
nodes.stream()
.collect(
Collectors.toUnmodifiableMap(
node -> node,
Broker::id,
node ->
node.dataFolders().stream()
.collect(
Collectors.toMap(
Broker.DataFolder::path, x -> new AtomicInteger()))));
replicas.forEach(
replica ->
folderLogCounter.get(replica.broker()).get(replica.path()).incrementAndGet());
folderLogCounter.get(replica.brokerId()).get(replica.path()).incrementAndGet());

folderLogCounter.forEach(
(node, folders) -> {
Expand All @@ -200,7 +200,7 @@ public ClusterInfoBuilder addTopic(
index -> {
final Broker broker = nodeSelector.next();
final String path =
folderLogCounter.get(broker).entrySet().stream()
folderLogCounter.get(broker.id()).entrySet().stream()
.min(Comparator.comparing(x -> x.getValue().get()))
.map(
entry -> {
Expand All @@ -212,7 +212,7 @@ public ClusterInfoBuilder addTopic(
return Replica.builder()
.topic(tp.topic())
.partition(tp.partition())
.broker(broker)
.brokerId(broker.id())
.isAdding(false)
.isRemoving(false)
.lag(0)
Expand All @@ -227,8 +227,7 @@ public ClusterInfoBuilder addTopic(
}))
.map(mapper);

return Stream.concat(replicas.stream(), newTopic)
.collect(Collectors.toUnmodifiableList());
return Stream.concat(replicas.stream(), newTopic).toList();
});
}

Expand All @@ -239,9 +238,7 @@ public ClusterInfoBuilder addTopic(
* @return this.
*/
public ClusterInfoBuilder mapLog(Function<Replica, Replica> mapper) {
return applyReplicas(
(nodes, replicas) ->
replicas.stream().map(mapper).collect(Collectors.toUnmodifiableList()));
return applyReplicas((nodes, replicas) -> replicas.stream().map(mapper).toList());
}

/**
Expand Down Expand Up @@ -269,12 +266,12 @@ public ClusterInfoBuilder reassignReplica(
r -> {
if (r.topicPartitionReplica().equals(replica)) {
matched.set(true);
return Replica.builder(r).broker(newNode).path(toDir).build();
return Replica.builder(r).brokerId(newNode.id()).path(toDir).build();
} else {
return r;
}
})
.collect(Collectors.toUnmodifiableList());
.toList();
if (!matched.get()) throw new IllegalArgumentException("No such replica: " + replica);
return collect;
});
Expand Down Expand Up @@ -307,7 +304,7 @@ public ClusterInfoBuilder setPreferredLeader(TopicPartitionReplica replica) {
return r;
}
})
.collect(Collectors.toUnmodifiableList());
.toList();
if (!matched.get()) throw new IllegalArgumentException("No such replica: " + replica);

return collect;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class OptimizedClusterInfo implements ClusterInfo {
all.stream()
.collect(
Collectors.groupingBy(
r -> BrokerTopic.of(r.broker().id(), r.topic()),
r -> BrokerTopic.of(r.brokerId(), r.topic()),
Collectors.toUnmodifiableList())));
this.byBrokerTopicForLeader =
Lazy.of(
Expand All @@ -83,16 +83,15 @@ class OptimizedClusterInfo implements ClusterInfo {
.filter(Replica::isLeader)
.collect(
Collectors.groupingBy(
r -> BrokerTopic.of(r.broker().id(), r.topic()),
r -> BrokerTopic.of(r.brokerId(), r.topic()),
Collectors.toUnmodifiableList())));

this.byBroker =
Lazy.of(
() ->
all.stream()
.collect(
Collectors.groupingBy(
r -> r.broker().id(), Collectors.toUnmodifiableList())));
Collectors.groupingBy(r -> r.brokerId(), Collectors.toUnmodifiableList())));

this.byTopic =
Lazy.of(
Expand Down
Loading

0 comments on commit eb0c07d

Please sign in to comment.