Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BALANCER] Demonstration of ResourceBalancer & ResourceUsageHint #1674

Closed

Conversation

garyparrot
Copy link
Collaborator

這個 PR 描述 POC 版本的 ResourceBalancer 實作和 ResourceUsageHint 框架。

這個 PR 純討論用,裡面的程式碼不是以要合併為目標下去撰寫。

Comment on lines +25 to +35
public interface ResourceUsageHint {

ResourceUsage evaluateClusterResourceUsage(
ClusterInfo clusterInfo, ClusterBean clusterBean, Replica target);

ResourceUsage evaluateReplicaResourceUsage(
ClusterInfo clusterInfo, ClusterBean clusterBean, Replica target);

Collection<ResourceCapacity> evaluateClusterResourceCapacity(
ClusterInfo clusterInfo, ClusterBean clusterBean);
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

這段是 ResourceUsageHint 的實作,裡面就三個函數

evaluateClusterResourceCapacity 回傳這個 ResourceUsageHint 管理哪些 ResourceCapacity,每個 ResourceCapacity 描述的建議值,還有提供檢查解合法的 Predicate 和比較優劣程度的函數 idealness 和對應的 comparator(可能後面會移除)

Comment on lines +233 to +269
Collection<ResourceCapacity> evaluateIngressResourceCapacity(
ClusterInfo clusterInfo, ClusterBean clusterBean) {
final var cachedCalculation =
calculationCache.computeIfAbsent(clusterBean, CachedCalculation::new);
final var sumIngress =
cachedCalculation.partitionIngressRate.values().stream().mapToDouble(x -> x).sum();
final var avgIngressPerBroker = sumIngress / clusterInfo.brokers().size();

Set<ResourceCapacity> a = clusterInfo.brokers().stream()
.map(
broker ->
new NetworkResourceCapacity(
NETWORK_COST_BROKER_RESOURCE_PREFIX_INGRESS + broker.id(), avgIngressPerBroker))
.collect(Collectors.toSet());

a.add(new NetworkResourceCapacity(NETWORK_COST_REPLICA_RESOURCE_PREFIX_INGRESS, 0));
return a;
}

Collection<ResourceCapacity> evaluateEgressResourceCapacity(
ClusterInfo clusterInfo, ClusterBean clusterBean) {
final var cachedCalculation =
calculationCache.computeIfAbsent(clusterBean, CachedCalculation::new);
final var sumEgress =
cachedCalculation.partitionEgressRate.values().stream().mapToDouble(x -> x).sum();
final var avgEgressPerBroker = sumEgress / clusterInfo.brokers().size();

Set<ResourceCapacity> a = clusterInfo.brokers().stream()
.map(
broker ->
new NetworkResourceCapacity(
NETWORK_COST_BROKER_RESOURCE_PREFIX_EGRESS + broker.id(), avgEgressPerBroker))
.collect(Collectors.toSet());

a.add(new NetworkResourceCapacity(NETWORK_COST_REPLICA_RESOURCE_PREFIX_EGRESS, 0));
return a;
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

這是 NetworkCost 會回傳的 ResourceCapacity,他會希望每個 Broker 的網路流量趨近 整體叢集流量/節點數量。然後有對應的 idealness 和 comparator 實作,Predicator 的部分沒有,因為他不是 constraint。

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#1671 (comment)

這一段在這個部分

Comment on lines +126 to +164
public Collection<ResourceCapacity> evaluateClusterResourceCapacity(ClusterInfo clusterInfo, ClusterBean clusterBean) {
return List.of(new ResourceCapacity() {
@Override
public String resourceName() {
return "migrated_leaders";
}

@Override
public double optimalUsage() {
return config.string(MAX_MIGRATE_LEADER_KEY).map(Long::parseLong).orElse(Long.MAX_VALUE);
}

@Override
public double idealness(ResourceUsage usage) {
var a = usage.usage().getOrDefault(resourceName(), 0.0) / optimalUsage();
return a;
}

@Override
public Comparator<ResourceUsage> usageIdealnessComparator() {
return Comparator.comparingDouble(this::idealness);
// return (a,b) -> {
// var aa = a.usage().getOrDefault(resourceName(), 0.0);
// var bb = b.usage().getOrDefault(resourceName(), 0.0);

// if(aa >= optimalUsage() && bb >= optimalUsage()) return 0;
// if(aa >= optimalUsage()) return 1;
// if(bb >= optimalUsage()) return -1;
// return 0;
// };
// return Comparator.comparingDouble(ru -> ru.usage().getOrDefault(resourceName(), 0.0));
}

@Override
public Predicate<ResourceUsage> usageValidnessPredicate() {
return (ru) -> ru.usage().getOrDefault(resourceName(), 0.0) < optimalUsage();
}
});
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

上面是修正後的 ReplicaLeaderCost,他有一個 Predicate 描述輸入的叢集 ResourceUsage 是不是合法的 (不超過特定數目的 leader 被變更)。

import org.astraea.common.cost.ResourceUsage;
import org.astraea.common.cost.ResourceUsageHint;

public class ResourceBalancer implements Balancer {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

這是目前 POC 版本的 Resource Balancer 的實作

Comment on lines +360 to +368
static Comparator<Replica> usageDominationComparator(
List<ResourceCapacity> resourceCapacities, Function<Replica, ResourceUsage> usageHints) {
var cmp = Comparator.<ResourceUsage>comparingDouble(u -> resourceCapacities.stream()
.mapToDouble(c -> c.idealness(u))
.average()
.orElseThrow());

return Comparator.comparing(usageHints, cmp).reversed();
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

這個 ComparatorResourceBalancer 用來衡量哪個 Replica 是重要的

Comment on lines +385 to +407
static Comparator<ResourceUsage> usageIdealnessDominationComparator(ResourceUsage base, List<ResourceCapacity> resourceCapacities) {
var comparators =
resourceCapacities.stream()
.map(ResourceCapacity::usageIdealnessComparator)
.collect(Collectors.toUnmodifiableSet());

Comparator<ResourceUsage> dominatedCmp = (lhs, rhs) -> {
var dominatedByL = comparators.stream().filter(e -> e.compare(lhs, rhs) <= 0).count();
var dominatedByR = comparators.stream().filter(e -> e.compare(rhs, lhs) <= 0).count();

return -Long.compare(dominatedByL, dominatedByR);
};

// return usageIdealnessDominationComparator(resourceCapacities)
// .thenComparingDouble(usage -> resourceCapacities.stream()
// .mapToDouble(ca -> ca.idealness(usage))
// .average()
// .orElseThrow());
return dominatedCmp.thenComparingDouble(usage -> resourceCapacities.stream()
.mapToDouble(ca -> ca.idealness(usage))
.average()
.orElseThrow());
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

這個 ComparatorResourceBalancer 用來衡量每個 tweak 的潛力,裡面的實作是一個很假的 dominant sort,實際上這個 Comparator 應該會讓某些輸入的排序出現 violation,不過目前的實驗情境因為一些巧妙的關係所以沒有觸發,後面也許會改正。

Comment on lines +188 to +260
private void search(
Consumer<List<Replica>> updateAnswer,
int next,
List<Replica> originalReplicas,
Map<TopicPartition, List<Replica>> currentAllocation,
ResourceUsage currentResourceUsage) {
if (System.currentTimeMillis() > deadline)
return;
if (originalReplicas.size() == next) {
// if this is a complete answer, call update function and return
updateAnswer.accept(
currentAllocation.entrySet().stream()
.flatMap(x -> x.getValue().stream())
.collect(Collectors.toUnmodifiableList()));
} else {
var nextReplica = originalReplicas.get(next);

List<Map.Entry<ResourceUsage, Tweak>> possibleTweaks =
tweaks(currentAllocation, nextReplica).stream()
.map(
tweaks -> {
var usageAfterTweaked = new ResourceUsage(currentResourceUsage.usage());
tweaks.toRemove.stream()
.flatMap(this::evaluateReplicaUsage)
.forEach(usageAfterTweaked::removeUsage);
tweaks.toReplace.stream()
.flatMap(this::evaluateReplicaUsage)
.forEach(usageAfterTweaked::mergeUsage);

return Map.entry(usageAfterTweaked, tweaks);
})
.filter(e -> feasibleUsage.test(e.getKey()))
.sorted(Map.Entry.comparingByKey(usageIdealnessDominationComparator(currentResourceUsage, this.resourceCapacities)))
// TODO: maybe change to probability style
.limit(trials(next))
.collect(Collectors.toUnmodifiableList());

for (Map.Entry<ResourceUsage, Tweak> entry : possibleTweaks) {
// the tweak we are going to use
var newResourceUsage = entry.getKey();
var tweaks = entry.getValue();

// replace the replicas
tweaks.toRemove.stream()
.filter(replica -> !currentAllocation.get(replica.topicPartition()).remove(replica))
.forEach(
nonexistReplica -> {
throw new IllegalStateException(
"Attempt to remove "
+ nonexistReplica.topicPartitionReplica()
+ " but it does not exists");
});
tweaks.toReplace.forEach(
replica -> currentAllocation.get(replica.topicPartition()).add(replica));

// start next search stage
search(updateAnswer, next + 1, originalReplicas, currentAllocation, newResourceUsage);

// undo the tweak, restore the previous state
tweaks.toReplace.stream()
.filter(replica -> !currentAllocation.get(replica.topicPartition()).remove(replica))
.forEach(
nonexistReplica -> {
throw new IllegalStateException(
"Attempt to remove "
+ nonexistReplica.topicPartitionReplica()
+ " but it does not exists");
});
tweaks.toRemove.forEach(
replica -> currentAllocation.get(replica.topicPartition()).add(replica));
}
}
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

搜尋演算法的主體在這裡,每個 replica 會遞迴一次,如果有 10000 個 replica 則這個函數會呼叫 10000 次,在預設的 JVM stack size 下會 StackOverflow,目前的解法是調整 -Xss20m,這個數字足以容納 10000+ 次的 recursion,後面比較正式的版本會再處理這部分。

@garyparrot garyparrot changed the title [BALANCER] Demonistration of ResourceBalancer & ResourceUsageHint [BALANCER] Demonstration of ResourceBalancer & ResourceUsageHint Apr 26, 2023
@garyparrot garyparrot closed this May 1, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant