Skip to content

Commit

Permalink
Merge branch '1.11.x'
Browse files Browse the repository at this point in the history
  • Loading branch information
shakuzen committed Nov 2, 2023
2 parents 588988d + c5ab6da commit c37ffb9
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,7 @@
import io.micrometer.core.instrument.util.TimeUtils;

import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.*;

public abstract class PushMeterRegistry extends MeterRegistry {

Expand All @@ -39,7 +35,7 @@ public abstract class PushMeterRegistry extends MeterRegistry {

private final PushRegistryConfig config;

private final AtomicBoolean publishing = new AtomicBoolean(false);
private final Semaphore publishingSemaphore = new Semaphore(1);

private long lastScheduledPublishStartTime = 0L;

Expand All @@ -57,11 +53,12 @@ protected PushMeterRegistry(PushRegistryConfig config, Clock clock) {
protected abstract void publish();

/**
* Catch uncaught exceptions thrown from {@link #publish()}.
* Catch uncaught exceptions thrown from {@link #publish()}. Skip publishing if
* another call to this method is already in progress.
*/
// VisibleForTesting
void publishSafely() {
if (this.publishing.compareAndSet(false, true)) {
void publishSafelyOrSkipIfInProgress() {
if (this.publishingSemaphore.tryAcquire()) {
this.lastScheduledPublishStartTime = clock.wallTime();
try {
publish();
Expand All @@ -71,7 +68,7 @@ void publishSafely() {
e);
}
finally {
this.publishing.set(false);
this.publishingSemaphore.release();
}
}
else {
Expand All @@ -85,12 +82,12 @@ void publishSafely() {
* @since 1.11.0
*/
protected boolean isPublishing() {
return publishing.get();
return publishingSemaphore.availablePermits() == 0;
}

/**
* Returns the time, in milliseconds, when the last scheduled publish was started by
* {@link PushMeterRegistry#publishSafely()}.
* {@link PushMeterRegistry#publishSafelyOrSkipIfInProgress()}.
* @since 1.11.1
*/
protected long getLastScheduledPublishStartTime() {
Expand All @@ -116,8 +113,8 @@ public void start(ThreadFactory threadFactory) {
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
long stepMillis = config.step().toMillis();
long initialDelayMillis = calculateInitialDelay();
scheduledExecutorService.scheduleAtFixedRate(this::publishSafely, initialDelayMillis, stepMillis,
TimeUnit.MILLISECONDS);
scheduledExecutorService.scheduleAtFixedRate(this::publishSafelyOrSkipIfInProgress, initialDelayMillis,
stepMillis, TimeUnit.MILLISECONDS);
}
}

Expand All @@ -132,11 +129,24 @@ public void stop() {
public void close() {
stop();
if (config.enabled() && !isClosed()) {
publishSafely();
// do a final publish on close or wait for the in progress scheduled publish
publishSafelyOrSkipIfInProgress();
waitForInProgressScheduledPublish();
}
super.close();
}

private void waitForInProgressScheduledPublish() {
try {
// block until in progress publish finishes
publishingSemaphore.acquire();
publishingSemaphore.release();
}
catch (InterruptedException e) {
logger.warn("Interrupted while waiting for publish on close to finish", e);
}
}

// VisibleForTesting
long calculateInitialDelay() {
long stepMillis = config.step().toMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,15 @@
import java.util.stream.LongStream;

import static java.util.concurrent.TimeUnit.*;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.*;
import static org.awaitility.Awaitility.await;

/**
* Tests for {@link PushMeterRegistry}.
*/
class PushMeterRegistryTest {

static final Duration STEP_DURATION = Duration.ofMillis(10);
static ThreadFactory threadFactory = new NamedThreadFactory("PushMeterRegistryTest");

StepRegistryConfig config = new StepRegistryConfig() {
@Override
Expand All @@ -68,11 +67,13 @@ public String get(String key) {

@Test
void whenUncaughtExceptionInPublish_taskStillScheduled() throws InterruptedException {
ThreadFactory threadFactory = new NamedThreadFactory("PushMeterRegistryTest");
PushMeterRegistry pushMeterRegistry = new ThrowingPushMeterRegistry(config, latch);
pushMeterRegistry.start(threadFactory);
assertThat(latch.await(500, TimeUnit.MILLISECONDS))
.as("publish should continue to be scheduled even if an uncaught exception is thrown")
.isTrue();
pushMeterRegistry.close();
}

@Test
Expand All @@ -93,7 +94,7 @@ void publishOnlyHappensOnceWithMultipleClose() {

@Test
@Issue("#3711")
void scheduledPublishOverlapWithPublishOnClose() throws InterruptedException {
void doNotPublishAgainOnClose_whenScheduledPublishInProgress() throws InterruptedException {
MockClock clock = new MockClock();
CyclicBarrier barrier = new CyclicBarrier(2);
OverlappingStepMeterRegistry overlappingStepMeterRegistry = new OverlappingStepMeterRegistry(config, clock,
Expand All @@ -106,12 +107,18 @@ void scheduledPublishOverlapWithPublishOnClose() throws InterruptedException {

// simulated scheduled publish
Thread scheduledPublishingThread = new Thread(
() -> ((PushMeterRegistry) overlappingStepMeterRegistry).publishSafely(),
() -> ((PushMeterRegistry) overlappingStepMeterRegistry).publishSafelyOrSkipIfInProgress(),
"scheduledMetricsPublisherThread");
scheduledPublishingThread.start();
// publish on shutdown
Thread onClosePublishThread = new Thread(overlappingStepMeterRegistry::close, "shutdownHookThread");
onClosePublishThread.start();
try {
barrier.await(100, MILLISECONDS);
}
catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
throw new RuntimeException(e);
}
scheduledPublishingThread.join();
onClosePublishThread.join();

Expand Down Expand Up @@ -146,6 +153,110 @@ void publishTimeIsRandomizedWithinStep() {
assertThat(observedDelays).containsExactlyElementsOf(expectedDelays);
}

@Test
@Issue("#3872")
void waitForScheduledPublishToFinish_whenClosedWhilePublishIsInProgress()
throws InterruptedException, BrokenBarrierException, TimeoutException {
MockClock clock = new MockClock();
CyclicBarrier barrier = new CyclicBarrier(2);
OverlappingStepMeterRegistry registry = new OverlappingStepMeterRegistry(config, clock, barrier);
Counter c1 = registry.counter("c1");
Counter c2 = registry.counter("c2");
c1.increment();
c2.increment(2.5);
clock.add(config.step());

// start scheduled publish but don't let it finish yet
Thread scheduledPublishingThread = new Thread(
() -> ((PushMeterRegistry) registry).publishSafelyOrSkipIfInProgress(),
"testScheduledMetricsPublisherThread");
scheduledPublishingThread.start();
// close registry during publish
Thread closeThread = new Thread(registry::close, "simulatedShutdownHookThread");
closeThread.start();
// close is blocked (waiting for publish to finish)
await().atMost(config.step())
.pollInterval(1, MILLISECONDS)
.untilAsserted(() -> assertThat(closeThread.getState()).isEqualTo(Thread.State.WAITING));
// allow publish to finish
barrier.await(config.step().toMillis(), MILLISECONDS);
// publish thread will finish, followed by the close thread
scheduledPublishingThread.join();
closeThread.join();

assertThat(registry.publishes).as("only one publish happened").hasSize(1);
Deque<Double> firstPublishValues = registry.publishes.get(0);
assertThat(firstPublishValues.pop()).isEqualTo(1); // c1 counter count
assertThat(firstPublishValues.pop()).isEqualTo(2.5); // c2 counter count
}

@Test
void publishSafelyRespectsInterrupt() throws InterruptedException {
MockClock clock = new MockClock();
CyclicBarrier barrier = new CyclicBarrier(2);
OverlappingStepMeterRegistry registry = new OverlappingStepMeterRegistry(config, clock, barrier);
Counter c1 = registry.counter("c1");
Counter c2 = registry.counter("c2");
c1.increment();
c2.increment(2.5);
clock.add(config.step());

// start scheduled publish but don't let it finish yet
Thread scheduledPublishingThread = new Thread(
() -> ((PushMeterRegistry) registry).publishSafelyOrSkipIfInProgress(),
"testScheduledMetricsPublisherThread");
scheduledPublishingThread.start();
// close registry during publish
Thread closeThread = new Thread(registry::close, "simulatedShutdownHookThread");
closeThread.start();
// close is blocked (waiting for publish to finish)
await().atMost(config.step())
.pollInterval(1, MILLISECONDS)
.untilAsserted(() -> assertThat(closeThread.getState()).isEqualTo(Thread.State.WAITING));

scheduledPublishingThread.interrupt();

// both threads finish without reaching the barrier
scheduledPublishingThread.join();
closeThread.join();

assertThat(registry.numberOfPublishes.get()).isZero();
}

@Test
void closeRespectsInterrupt() throws InterruptedException {
MockClock clock = new MockClock();
CyclicBarrier barrier = new CyclicBarrier(2);
OverlappingStepMeterRegistry registry = new OverlappingStepMeterRegistry(config, clock, barrier);
Counter c1 = registry.counter("c1");
Counter c2 = registry.counter("c2");
c1.increment();
c2.increment(2.5);
clock.add(config.step());

// start scheduled publish but don't let it finish yet
Thread scheduledPublishingThread = new Thread(
() -> ((PushMeterRegistry) registry).publishSafelyOrSkipIfInProgress(),
"testScheduledMetricsPublisherThread");
scheduledPublishingThread.start();
// close registry during publish
Thread closeThread = new Thread(registry::close, "simulatedShutdownHookThread");
closeThread.start();
// close is blocked (waiting for publish to finish)
await().atMost(config.step())
.pollInterval(1, MILLISECONDS)
.untilAsserted(() -> assertThat(closeThread.getState()).isEqualTo(Thread.State.WAITING));

closeThread.interrupt();

// close thread finishes without reaching barrier
closeThread.join();

// publish thread will continue in background, but hopefully the 100ms block on
// the barrier means it won't finish before this assertion
assertThat(registry.numberOfPublishes.get()).isZero();
}

private static class OverlappingStepMeterRegistry extends StepMeterRegistry {

private final AtomicInteger numberOfPublishes = new AtomicInteger();
Expand Down Expand Up @@ -183,17 +294,6 @@ protected void publish() {
}));
}

@Override
public void close() {
try {
barrier.await(100, MILLISECONDS);
}
catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
throw new RuntimeException(e);
}
super.close();
}

}

static class CountingPushMeterRegistry extends PushMeterRegistry {
Expand Down

0 comments on commit c37ffb9

Please sign in to comment.