Skip to content

Commit

Permalink
Add pendingResponse to ServerMetrics
Browse files Browse the repository at this point in the history
  • Loading branch information
EunJungYoo committed Sep 5, 2024
1 parent 19068f6 commit 4d5d2e8
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.time.Duration;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.LongAdder;

import com.google.common.base.Ticker;

Expand All @@ -29,39 +28,47 @@
*/
abstract class GracefulShutdownSupport {

static GracefulShutdownSupport create(Duration quietPeriod, Executor blockingTaskExecutor) {
return create(quietPeriod, blockingTaskExecutor, Ticker.systemTicker());
private final ServerMetrics serverMetrics;

/**
* Creates a new instance.
*/
public GracefulShutdownSupport(ServerMetrics serverMetrics) {
this.serverMetrics = serverMetrics;
}

static GracefulShutdownSupport create(Duration quietPeriod, Executor blockingTaskExecutor, Ticker ticker) {
return new DefaultGracefulShutdownSupport(quietPeriod, blockingTaskExecutor, ticker);
static GracefulShutdownSupport create(Duration quietPeriod, Executor blockingTaskExecutor,
ServerMetrics serverMetrics) {
return create(quietPeriod, blockingTaskExecutor, Ticker.systemTicker(), serverMetrics);
}

static GracefulShutdownSupport createDisabled() {
return new DisabledGracefulShutdownSupport();
static GracefulShutdownSupport create(Duration quietPeriod, Executor blockingTaskExecutor, Ticker ticker, ServerMetrics serverMetrics) {
return new DefaultGracefulShutdownSupport(quietPeriod, blockingTaskExecutor, ticker, serverMetrics);
}

private final LongAdder pendingResponses = new LongAdder();
static GracefulShutdownSupport createDisabled(ServerMetrics serverMetrics) {
return new DisabledGracefulShutdownSupport(serverMetrics);
}

/**
* Increases the number of pending responses.
*/
final void inc() {
pendingResponses.increment();
serverMetrics.increaseNonTransientRequests();
}

/**
* Decreases the number of pending responses.
*/
void dec() {
pendingResponses.decrement();
serverMetrics.decreaseNonTransientRequests();
}

/**
* Returns the number of pending responses.
*/
final long pendingResponses() {
return pendingResponses.sum();
final long activeNonTransientResponses() {
return serverMetrics.activeNonTransientRequests();
}

/**
Expand All @@ -78,6 +85,15 @@ private static final class DisabledGracefulShutdownSupport extends GracefulShutd

private volatile boolean shuttingDown;

/**
* Creates a new instance.
*
* @param serverMetrics
*/
public DisabledGracefulShutdownSupport(ServerMetrics serverMetrics) {
super(serverMetrics);
}

@Override
boolean isShuttingDown() {
return shuttingDown;
Expand All @@ -97,12 +113,14 @@ private static final class DefaultGracefulShutdownSupport extends GracefulShutdo
private final Executor blockingTaskExecutor;

/**
* Declared as non-volatile because using {@link #pendingResponses} as a memory barrier.
* Declared as non-volatile because using {@link #activeNonTransientResponses} as a memory barrier.
*/
private long lastResTimeNanos;
private volatile long shutdownStartTimeNanos;

DefaultGracefulShutdownSupport(Duration quietPeriod, Executor blockingTaskExecutor, Ticker ticker) {
DefaultGracefulShutdownSupport(Duration quietPeriod, Executor blockingTaskExecutor, Ticker ticker,
ServerMetrics serverMetrics) {
super(serverMetrics);
quietPeriodNanos = quietPeriod.toNanos();
this.blockingTaskExecutor = blockingTaskExecutor;
this.ticker = ticker;
Expand All @@ -125,7 +143,7 @@ boolean completedQuietPeriod() {
shutdownStartTimeNanos = readTicker();
}

if (pendingResponses() != 0 || !completedBlockingTasks()) {
if (activeNonTransientResponses() != 0 || !completedBlockingTasks()) {
return false;
}

Expand Down
24 changes: 12 additions & 12 deletions core/src/main/java/com/linecorp/armeria/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -502,11 +502,11 @@ private final class ServerStartStopSupport extends StartStopSupport<Void, Void,
@Override
protected CompletionStage<Void> doStart(@Nullable Void arg) {
if (config().gracefulShutdownQuietPeriod().isZero()) {
gracefulShutdownSupport = GracefulShutdownSupport.createDisabled();
gracefulShutdownSupport = GracefulShutdownSupport.createDisabled(config.serverMetrics());
} else {
gracefulShutdownSupport =
GracefulShutdownSupport.create(config().gracefulShutdownQuietPeriod(),
config().blockingTaskExecutor());
config().blockingTaskExecutor(), config.serverMetrics());
}

// Initialize the server sockets asynchronously.
Expand All @@ -520,7 +520,7 @@ protected CompletionStage<Void> doStart(@Nullable Void arg) {
try {
doStart(primary).addListener(new ServerPortStartListener(primary))
.addListener(new NextServerPortStartListener(this, it, future));
setupServerMetrics();
// setupServerMetrics();
} catch (Throwable cause) {
future.completeExceptionally(cause);
}
Expand Down Expand Up @@ -580,15 +580,15 @@ private ChannelFuture doStart(ServerPort port) {
return b.bind(localAddress);
}

private void setupServerMetrics() {
final MeterRegistry meterRegistry = config.meterRegistry();
final GracefulShutdownSupport gracefulShutdownSupport = this.gracefulShutdownSupport;
assert gracefulShutdownSupport != null;

meterRegistry.gauge("armeria.server.pending.responses", gracefulShutdownSupport,
GracefulShutdownSupport::pendingResponses);
config.serverMetrics().bindTo(meterRegistry);
}
// private void setupServerMetrics() {
// final MeterRegistry meterRegistry = config.meterRegistry();
// final GracefulShutdownSupport gracefulShutdownSupport = this.gracefulShutdownSupport;
// assert gracefulShutdownSupport != null;
//
// meterRegistry.gauge("armeria.server.pending.responses", gracefulShutdownSupport,
//// GracefulShutdownSupport::pendingResponses);
// config.serverMetrics().bindTo(meterRegistry);
// }

@Override
protected CompletionStage<Void> doStop(@Nullable Void arg) {
Expand Down
21 changes: 21 additions & 0 deletions core/src/main/java/com/linecorp/armeria/server/ServerMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public final class ServerMetrics implements MeterBinder {
private final LongAdder activeHttp1WebSocketRequests = new LongAdder();
private final LongAdder activeHttp1Requests = new LongAdder();
private final LongAdder activeHttp2Requests = new LongAdder();
private final LongAdder activeNonTransientRequests = new LongAdder();

/**
* AtomicInteger is used to read the number of active connections frequently.
Expand Down Expand Up @@ -98,6 +99,13 @@ public long activeHttp2Requests() {
return activeHttp2Requests.longValue();
}

/**
* Returns the number of pending http responses.
*/
public long activeNonTransientRequests() {
return activeNonTransientRequests.longValue();
}

/**
* Returns the number of open connections.
*/
Expand Down Expand Up @@ -153,6 +161,14 @@ void decreaseActiveConnections() {
activeConnections.decrementAndGet();
}

void increaseNonTransientRequests() {
activeNonTransientRequests.increment();
}

void decreaseNonTransientRequests() {
activeNonTransientRequests.decrement();
}

@Override
public void bindTo(MeterRegistry meterRegistry) {
meterRegistry.gauge("armeria.server.connections", activeConnections);
Expand All @@ -174,6 +190,10 @@ public void bindTo(MeterRegistry meterRegistry) {
meterRegistry.gauge(allRequestsMeterName,
ImmutableList.of(Tag.of("protocol", "http1.websocket"), Tag.of("state", "active")),
activeHttp1WebSocketRequests);
// pending non-transient responses
meterRegistry.gauge(allRequestsMeterName,
ImmutableList.of(Tag.of("protocol", "all"), Tag.of("state", "active")),
activeNonTransientRequests);
}

@Override
Expand All @@ -185,6 +205,7 @@ public String toString() {
.add("pendingHttp2Requests", pendingHttp2Requests)
.add("activeHttp2Requests", activeHttp2Requests)
.add("activeConnections", activeConnections)
.add("activeNonTransientRequests", activeNonTransientRequests)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class GracefulShutdownSupportTest {
@Mock
private Ticker ticker;

private ServerMetrics serverMetrics;

private GracefulShutdownSupport support;
private ThreadPoolExecutor executor;

Expand All @@ -54,7 +56,8 @@ void setUp() {
0, 1, 1, TimeUnit.SECONDS, new LinkedTransferQueue<>(),
ThreadFactories.newThreadFactory("graceful-shutdown-test", true));

support = GracefulShutdownSupport.create(Duration.ofNanos(QUIET_PERIOD_NANOS), executor, ticker);
serverMetrics = new ServerMetrics();
support = GracefulShutdownSupport.create(Duration.ofNanos(QUIET_PERIOD_NANOS), executor, ticker, serverMetrics);
}

@AfterEach
Expand All @@ -64,17 +67,17 @@ void tearDown() {

@Test
void testDisabled() {
final GracefulShutdownSupport support = GracefulShutdownSupport.createDisabled();
final GracefulShutdownSupport support = GracefulShutdownSupport.createDisabled(serverMetrics);
assertThat(support.isShuttingDown()).isFalse();
assertThat(support.completedQuietPeriod()).isTrue();
assertThat(support.isShuttingDown()).isTrue();
support.inc();
assertThat(support.pendingResponses()).isOne();
assertThat(support.activeNonTransientResponses()).isOne();
assertThat(support.completedQuietPeriod()).isTrue();

// pendingResponses must be updated even if disabled, because it's part of metrics.
support.dec();
assertThat(support.pendingResponses()).isZero();
assertThat(support.activeNonTransientResponses()).isZero();
}

@Test
Expand Down

0 comments on commit 4d5d2e8

Please sign in to comment.