From b619cb96fcd1088649a1382872a35d7289d8c0d1 Mon Sep 17 00:00:00 2001 From: chinghongfang Date: Sun, 6 Aug 2023 14:52:11 +0800 Subject: [PATCH 1/6] Add one thread on checking --- .../astraea/common/metrics/collector/MetricStore.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java b/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java index 616109f693..76c5f4badd 100644 --- a/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java +++ b/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -236,6 +237,7 @@ private MetricStoreImpl( this.receivers = receivers; // receiver + cleaner this.executor = Executors.newFixedThreadPool(2); + var isChecking = new AtomicBoolean(false); Runnable cleanerJob = () -> { while (!closed.get()) { @@ -292,7 +294,13 @@ private MetricStoreImpl( if (!allBeans.isEmpty()) { // generate new cluster bean updateClusterBean(); - checkWaitingList(this.waitingList, clusterBean()); + // Create one thread for checking the waiting list, if there is no + // thread checking it currently. + if (isChecking.compareAndSet(false, true)) { + CompletableFuture.runAsync( + () -> checkWaitingList(this.waitingList, clusterBean())) + .thenAccept(ignore -> isChecking.set(false)); + } } }); } catch (Exception e) { From b8441729ef8daa2f48d391dd5c8dc5ec676fa323 Mon Sep 17 00:00:00 2001 From: chinghongfang Date: Mon, 7 Aug 2023 15:12:19 +0800 Subject: [PATCH 2/6] Ensure latest update beeing checked --- .../common/metrics/collector/MetricStore.java | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java b/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java index 76c5f4badd..fafb1b046d 100644 --- a/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java +++ b/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java @@ -237,7 +237,7 @@ private MetricStoreImpl( this.receivers = receivers; // receiver + cleaner this.executor = Executors.newFixedThreadPool(2); - var isChecking = new AtomicBoolean(false); + Runnable cleanerJob = () -> { while (!closed.get()) { @@ -260,6 +260,8 @@ private MetricStoreImpl( }; Runnable receiverJob = () -> { + var isChecking = new AtomicBoolean(false); + var needChecking = new AtomicBoolean(false); while (!closed.get()) { try { receivers.stream() @@ -294,13 +296,18 @@ private MetricStoreImpl( if (!allBeans.isEmpty()) { // generate new cluster bean updateClusterBean(); - // Create one thread for checking the waiting list, if there is no - // thread checking it currently. - if (isChecking.compareAndSet(false, true)) { - CompletableFuture.runAsync( - () -> checkWaitingList(this.waitingList, clusterBean())) - .thenAccept(ignore -> isChecking.set(false)); - } + needChecking.set(true); + } + // Create one thread for checking the waiting list, if there is no + // thread checking it currently. + // To avoid the latest update being ignored by other checking thread, here + // comes a flag "needChecking" representing there might be an update + // between now and last checking, even if there is no update in this loop. + if (needChecking.get() && isChecking.compareAndSet(false, true)) { + needChecking.set(false); + CompletableFuture.runAsync( + () -> checkWaitingList(this.waitingList, clusterBean())) + .thenAccept(ignore -> isChecking.set(false)); } }); } catch (Exception e) { From 7ad495455dcc44d1d06aa1c82ed73fb6f6949a81 Mon Sep 17 00:00:00 2001 From: chinghongfang Date: Wed, 9 Aug 2023 14:29:58 +0800 Subject: [PATCH 3/6] User threads run checker if beans have changed --- .../common/metrics/collector/MetricStore.java | 75 ++++++++----------- 1 file changed, 33 insertions(+), 42 deletions(-) diff --git a/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java b/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java index fafb1b046d..d9373c7b8f 100644 --- a/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java +++ b/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java @@ -23,7 +23,6 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -223,8 +222,8 @@ class MetricStoreImpl implements MetricStore { private final Set identities = new ConcurrentSkipListSet<>(); private volatile Map> lastSensors = Map.of(); - private final Map> waitingList = - new ConcurrentHashMap<>(); + // Thread ids and latches for detecting cluster bean changing. + private final Map waitingList = new ConcurrentHashMap<>(); // For mbean register. To distinguish mbeans of different metricStore. private final String uid = Utils.randomString(); private final Sensor beanReceivedSensor = @@ -296,18 +295,8 @@ private MetricStoreImpl( if (!allBeans.isEmpty()) { // generate new cluster bean updateClusterBean(); - needChecking.set(true); - } - // Create one thread for checking the waiting list, if there is no - // thread checking it currently. - // To avoid the latest update being ignored by other checking thread, here - // comes a flag "needChecking" representing there might be an update - // between now and last checking, even if there is no update in this loop. - if (needChecking.get() && isChecking.compareAndSet(false, true)) { - needChecking.set(false); - CompletableFuture.runAsync( - () -> checkWaitingList(this.waitingList, clusterBean())) - .thenAccept(ignore -> isChecking.set(false)); + // Tell all waiting threads that cluster bean has been changed + this.waitingList.values().forEach(CountDownLatch::countDown); } }); } catch (Exception e) { @@ -364,22 +353,39 @@ public void close() { receivers.forEach(Receiver::close); } - /** User thread will "wait" until being awakened by the metric store or being timeout. */ + /** + * User thread will "wait" until checker pass or timeout. First, register a latch to the waiting + * list. Second run the checker with current clusterBean. If the checker passes, done. + * Otherwise, wait for the cluster bean changing (/the latch counted down) and try again. + */ @Override - public void wait(Predicate checker, Duration timeout) { - var latch = new CountDownLatch(1); + public void wait(Predicate checker, Duration duration) { + long timeout = System.currentTimeMillis() + duration.toMillis(); + var threadId = Thread.currentThread().getId(); + // For first check, we don't need to wait. + var latch = new CountDownLatch(0); try { - waitingList.put(latch, checker); - // Check the newly added checker immediately - checkWaitingList(Map.of(latch, checker), clusterBean()); - // Wait until being awake or timeout - if (!latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) { - throw new IllegalStateException("Timeout waiting for the checker"); + while (System.currentTimeMillis() < timeout) { + try { + // Wait for clusterBean being updated + if (!latch.await(timeout - System.currentTimeMillis(), TimeUnit.MILLISECONDS)) { + throw new IllegalStateException("Timeout waiting for the checker"); + } + // Add new latch for detecting clusterBean updated + latch = new CountDownLatch(1); + this.waitingList.put(threadId, latch); + + // Return if check pass. + if (checker.test(clusterBean())) return; + } catch (NoSufficientMetricsException e) { + // Check failed. Try again next time. + } catch (InterruptedException ie) { + throw new IllegalStateException("Interrupted while waiting for the checker"); + } } - } catch (InterruptedException ie) { - throw new IllegalStateException("Interrupted while waiting for the checker"); + throw new IllegalStateException("Timeout waiting for the checker"); } finally { - waitingList.remove(latch); + this.waitingList.remove(threadId); } } @@ -392,20 +398,5 @@ private void updateClusterBean() { Collectors.toUnmodifiableMap( Map.Entry::getKey, e -> List.copyOf(e.getValue())))); } - - /** - * Check the checkers in the waiting list. If the checker returns true, count down the latch. - */ - private static void checkWaitingList( - Map> waitingList, ClusterBean clusterBean) { - waitingList.forEach( - (latch, checker) -> { - try { - if (checker.test(clusterBean)) latch.countDown(); - } catch (NoSufficientMetricsException e) { - // Check failed. Try again next time. - } - }); - } } } From e2a736d8656971de60e27e4c87db87c596b2dc4f Mon Sep 17 00:00:00 2001 From: chinghongfang Date: Thu, 10 Aug 2023 09:48:54 +0800 Subject: [PATCH 4/6] Remove unused varibles --- .../java/org/astraea/common/metrics/collector/MetricStore.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java b/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java index d9373c7b8f..4f03729121 100644 --- a/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java +++ b/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java @@ -259,8 +259,6 @@ private MetricStoreImpl( }; Runnable receiverJob = () -> { - var isChecking = new AtomicBoolean(false); - var needChecking = new AtomicBoolean(false); while (!closed.get()) { try { receivers.stream() From 3d46b411c151dc832361b5e1db91b888cdda7cd7 Mon Sep 17 00:00:00 2001 From: chinghongfang Date: Mon, 14 Aug 2023 13:23:00 +0800 Subject: [PATCH 5/6] Waiting threads wait on an object --- .../common/metrics/collector/MetricStore.java | 54 ++++++++----------- 1 file changed, 23 insertions(+), 31 deletions(-) diff --git a/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java b/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java index 4f03729121..109dc68f7f 100644 --- a/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java +++ b/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java @@ -27,7 +27,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -222,8 +221,8 @@ class MetricStoreImpl implements MetricStore { private final Set identities = new ConcurrentSkipListSet<>(); private volatile Map> lastSensors = Map.of(); - // Thread ids and latches for detecting cluster bean changing. - private final Map waitingList = new ConcurrentHashMap<>(); + // Monitor for detecting cluster bean changing. + private final Object beanUpdateMonitor = new Object(); // For mbean register. To distinguish mbeans of different metricStore. private final String uid = Utils.randomString(); private final Sensor beanReceivedSensor = @@ -293,8 +292,10 @@ private MetricStoreImpl( if (!allBeans.isEmpty()) { // generate new cluster bean updateClusterBean(); - // Tell all waiting threads that cluster bean has been changed - this.waitingList.values().forEach(CountDownLatch::countDown); + // Tell waiting threads that cluster bean has been changed + synchronized (beanUpdateMonitor) { + beanUpdateMonitor.notifyAll(); + } } }); } catch (Exception e) { @@ -352,39 +353,30 @@ public void close() { } /** - * User thread will "wait" until checker pass or timeout. First, register a latch to the waiting - * list. Second run the checker with current clusterBean. If the checker passes, done. - * Otherwise, wait for the cluster bean changing (/the latch counted down) and try again. + * User thread will wait until checker pass or timeout. When cluster bean has changed, the + * waiting threads will be notified. */ @Override public void wait(Predicate checker, Duration duration) { long timeout = System.currentTimeMillis() + duration.toMillis(); - var threadId = Thread.currentThread().getId(); - // For first check, we don't need to wait. - var latch = new CountDownLatch(0); - try { - while (System.currentTimeMillis() < timeout) { - try { - // Wait for clusterBean being updated - if (!latch.await(timeout - System.currentTimeMillis(), TimeUnit.MILLISECONDS)) { - throw new IllegalStateException("Timeout waiting for the checker"); - } - // Add new latch for detecting clusterBean updated - latch = new CountDownLatch(1); - this.waitingList.put(threadId, latch); - - // Return if check pass. - if (checker.test(clusterBean())) return; - } catch (NoSufficientMetricsException e) { - // Check failed. Try again next time. - } catch (InterruptedException ie) { - throw new IllegalStateException("Interrupted while waiting for the checker"); + if (checker.test(clusterBean())) return; + + while (System.currentTimeMillis() < timeout) { + try { + synchronized (beanUpdateMonitor) { + // Release the lock and wait for clusterBean being updated + this.beanUpdateMonitor.wait(timeout - System.currentTimeMillis()); + // Tell other threads clusterBean has been updated + beanUpdateMonitor.notifyAll(); } + if (checker.test(clusterBean())) return; + } catch (NoSufficientMetricsException e) { + // Check failed. Try again next time. + } catch (InterruptedException ie) { + throw new IllegalStateException("Interrupted while waiting for the checker"); } - throw new IllegalStateException("Timeout waiting for the checker"); - } finally { - this.waitingList.remove(threadId); } + throw new IllegalStateException("Timeout waiting for the checker"); } private void updateClusterBean() { From 6a8fe294e628b9b60fb87bc207e269e7ca2d91fa Mon Sep 17 00:00:00 2001 From: chinghongfang Date: Thu, 17 Aug 2023 16:17:35 +0800 Subject: [PATCH 6/6] Simplify code --- .../astraea/common/metrics/collector/MetricStore.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java b/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java index 109dc68f7f..c323a6d1a9 100644 --- a/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java +++ b/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java @@ -235,7 +235,6 @@ private MetricStoreImpl( this.receivers = receivers; // receiver + cleaner this.executor = Executors.newFixedThreadPool(2); - Runnable cleanerJob = () -> { while (!closed.get()) { @@ -358,16 +357,15 @@ public void close() { */ @Override public void wait(Predicate checker, Duration duration) { - long timeout = System.currentTimeMillis() + duration.toMillis(); + var endTime = System.currentTimeMillis() + duration.toMillis(); + var timeout = duration.toMillis(); if (checker.test(clusterBean())) return; - while (System.currentTimeMillis() < timeout) { + while (timeout > 0) { try { synchronized (beanUpdateMonitor) { // Release the lock and wait for clusterBean being updated - this.beanUpdateMonitor.wait(timeout - System.currentTimeMillis()); - // Tell other threads clusterBean has been updated - beanUpdateMonitor.notifyAll(); + this.beanUpdateMonitor.wait(timeout); } if (checker.test(clusterBean())) return; } catch (NoSufficientMetricsException e) { @@ -375,6 +373,7 @@ public void wait(Predicate checker, Duration duration) { } catch (InterruptedException ie) { throw new IllegalStateException("Interrupted while waiting for the checker"); } + timeout = endTime - System.currentTimeMillis(); } throw new IllegalStateException("Timeout waiting for the checker"); }