-
Notifications
You must be signed in to change notification settings - Fork 59
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Metric] 討論 MetricStore 支援 retry 機制 #1660
Comments
我們想要等到有 "足夠" metric ,但其實 如果要在 /**
* Block until all given broker, topic-partition, type of metrics has value.
* @param idAndProperties the broker id and corresponding partition, metric type that we want
*/
ClusterBean blockUntilExist(Map<Integer, Collection<Map<String, String>>> idAndProperties); 這個方法會不會有點偏向特定用途? 或者,不斷重試會比較通用,但是檢查的程式碼仍然會放在 caller 。 ClusterBean blockUntilExist(Function<ClusterBean, Boolean> allValueExist); |
加入新的方法會複雜化這個物件,我們是否有機會沿用 metrics sensor 這個方法來達到 「監聽」的用途? |
原來如此,請問是指可以在 目前想到需要處理以下問題:
每個 broker 都有共通 or 各自要拉的 metric,所以在判斷的時候也要知道哪個 broker 沒有拉到 metric,而 並不是每個 "broker" 都會有 不知道我理解有沒有錯? |
我的想法是要做一個介面讓使用者可以輸入特定的量,然後實作其實是用 sensor 來做,例如 sensor 可以檢查現在已經存在 store 的 metrics 有沒有滿足
「有多少broker」是條件之一嗎? |
請問是讓
我認為是需要的,原先也是因為某些 broker 還沒拉回來 metric 所以資料不足,無法判斷。這裡 broker 還沒拉回來不一定是 broker 有問題,可能只是拉取順序的問題。 |
@chinghongfang 既然規格和需求已經明確,可否麻煩你設計看看API?一種是全新的實作用來監控內部的狀態並回報,另一種則是使用/擴充 metrics sensor 來完成看看 |
好的,我來試著設計看看。 |
因為一直想不出來要如何設計,所以這邊先整理思緒。 目的這次修改目的是:減少重複的程式碼 (Assignor, Balancer) 限制這邊不希望複雜化 另外,是要擴展 因為 "完整的" 定義會隨使用端有所不同 (Assignor, Balancer 要蒐集全部 partition,Partitioner 要盡快),所以 "完整的" 定義將要放在使用端。 推論
// MetricSensor.java
default boolean complete(ClusterBean clusterBean){return true;} 在 // Assignor.java
metricStore =
MetricStore.builder()
.localReceiver(clientSupplier)
.sensorsSupplier(
() ->
this.costFunction
.metricSensor()
.map(
s ->
Map.of(
MetricSensor.of(
List.of(s),
clusterBean -> {
return admin
.brokers()
.thenApply(
brokers ->
brokers.stream()
.map(NodeInfo::id)
.map(
id ->
clusterBean
.all()
.getOrDefault(id, List.of()))
.map(
beans ->
beans.stream()
.map(
HasBeanObject::beanObject)
.collect(
Collectors
.toUnmodifiableList()))
.map(
beans ->
s.fetch(
MBeanClient.of(beans),
clusterBean))
.noneMatch(Collection::isEmpty))
.toCompletableFuture()
.join();
}),
(BiConsumer<Integer, Exception>) (integer, e) -> {}))
.orElse(Map.of()))
.build(); 這樣的實做非常不容易使用,也許需要放掉某些限制,再來實做好用的介面,我會再試著找找看方法,若有其他想法再上來更新。 |
@chinghongfang 可能故事有點太複雜了,一個簡單的版本如下: default void wait(Predicate<ClusterBean> check, Duration timeout) {
while (System.currentTimeMillis() < timeout.toMillis()) {
try {
if (check.test(clusterBean())) return;
} catch (NoSufficientMetricsException e) {
Utils.sleep(Duration.ofSeconds(1));
}
}
throw new RuntimeException("Failed to fetch clusterBean due to timeout");
} 這個版本有一個副作用,就是會過於頻繁測試,更好的方式是在 cluster bean 有更新的時候再測試,這也是為何我說可以考慮用 metric sensor 去做,因為 metric sensor 可以明確知道何時有新的資料(也就是新的 cluster bean) |
#1524 的例子藉由上述的範例就可以改寫成: wait(
clusterBean ->
costFunction.partitionCost(clusterInfo, clusterBean).value().values().stream()
.noneMatch(v -> Double.isNaN(v)),
Duration.ofSeconds(1)); |
@harryteng9527 麻煩你也看一下喔 |
default void wait(Predicate<ClusterBean> check, Duration timeout) { 看起來這段是放在
認同。
"metric sensor 可以明確知道何時有新的資料",請問 @chia7712 這個意思是 |
MetricStore#wait 這個介面可以不用動,要改的實作的方式,也就是 MetricStore impl中會有一個特殊的sensor專門用來處理event,那個event就是每次呼叫wait的時候會建立一個event,等待 sensor 被呼叫時拿取並且觸發。所謂的被呼叫就是「真的有新metrics」時才會觸發,就可以避免過度觸發的問題 這個優化可以再討論,我建議你先把基本的wait完成 |
我想確認一下想法,也就是說,
那我再按照這個想法實做,感謝 @chia7712 學長幫忙! |
@chinghongfang 沒錯喔,描述得很好。可以就實作難度評估一下,分成兩個階段來完成也可以 |
related #1524 (comment) 此 issue 討論要如何新增
retry機制
在 MetricStore 中Consumer 剛開始啟動並使用 CostAwareAssignor 分配 partitions 時,會因為 metrics 還沒完整拉取完畢(導致有些 partitions 會被當作沒有 cost),進而產生不平衡的分配
目前解決方案
在 assignor 中實作 retry 機制,預防沒有拉取到完整的 metric,避免影響到後續的分配以及效能
The text was updated successfully, but these errors were encountered: