feat(s3stream): reduce histogram sample size by dividing into explicit percentiles (#956)

Signed-off-by: Shichao Nie <[email protected]>
SCNieh authored Mar 6, 2024
1 parent c6c0a36 commit c332510
Showing 29 changed files with 499 additions and 287 deletions.
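
At a glance, the commit makes two coordinated changes: every histogram record(...) call site drops its per-call MetricsLevel argument, and histograms are published as a small set of explicit series (sum, count, p50, p99, mean, max) computed from a bounded sample instead of as raw distributions. A minimal sketch of the new recording shape, assuming a hypothetical wrapper over the newly added yammer metrics-core Histogram (HistogramStat below is illustrative, not the project's actual stats class):

import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.MetricsRegistry;

// Hypothetical stand-in for the project's histogram stats. The metrics level
// is fixed when the stat is constructed, so call sites shrink from
// record(MetricsLevel.INFO, value) to record(value).
class HistogramStat {
    private static final MetricsRegistry REGISTRY = new MetricsRegistry();
    private final Histogram histogram;

    HistogramStat(String name) {
        // biased = true: an exponentially decaying sample, i.e. a fixed-size
        // reservoir weighted toward recent values, capping memory per histogram.
        this.histogram = REGISTRY.newHistogram(HistogramStat.class, name, true);
    }

    void record(long value) {
        histogram.update(value);
    }
}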
1 change: 1 addition & 0 deletions s3stream/build.gradle
@@ -48,6 +48,7 @@ dependencies {
     api 'org.aspectj:aspectjrt:1.9.20.1'
     api 'org.aspectj:aspectjweaver:1.9.20.1'
     api 'com.github.jnr:jnr-posix:3.1.19'
+    api 'com.yammer.metrics:metrics-core:2.2.0'
     testImplementation 'org.slf4j:slf4j-simple:2.0.9'
     testImplementation 'org.junit.jupiter:junit-jupiter:5.10.0'
     testImplementation 'org.mockito:mockito-core:5.5.0'
5 changes: 5 additions & 0 deletions s3stream/pom.xml
@@ -133,6 +133,11 @@
         <artifactId>jnr-posix</artifactId>
         <version>3.1.19</version>
     </dependency>
+    <dependency>
+        <groupId>com.yammer.metrics</groupId>
+        <artifactId>metrics-core</artifactId>
+        <version>2.2.0</version>
+    </dependency>
 </dependencies>

 <build>
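
Both build files pick up metrics-core 2.2.0, whose Histogram keeps a fixed-size sample reservoir rather than every recorded value and computes percentiles from that reservoir in-process, which lines up with the commit's goal of reducing histogram sample size. A self-contained usage sketch of the 2.2.0 API (class and metric names here are illustrative):

import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.MetricsRegistry;

public class ReservoirDemo {
    public static void main(String[] args) {
        MetricsRegistry registry = new MetricsRegistry();
        // biased = true selects an exponentially decaying reservoir.
        Histogram histogram = registry.newHistogram(ReservoirDemo.class, "op_latency_nanos", true);
        for (long v = 1; v <= 1_000_000; v++) {
            histogram.update(v); // a million samples recorded, bounded memory retained
        }
        // Percentiles come from the reservoir snapshot, not the full value stream.
        System.out.printf("p50=%.0f p99=%.0f max=%.0f count=%d%n",
            histogram.getSnapshot().getMedian(),
            histogram.getSnapshot().get99thPercentile(),
            histogram.max(), histogram.count());
    }
}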
21 changes: 10 additions & 11 deletions s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
@@ -19,7 +19,6 @@
 import com.automq.stream.s3.context.AppendContext;
 import com.automq.stream.s3.context.FetchContext;
 import com.automq.stream.s3.metadata.StreamMetadata;
-import com.automq.stream.s3.metrics.MetricsLevel;
 import com.automq.stream.s3.metrics.S3StreamMetricsManager;
 import com.automq.stream.s3.metrics.TimerUtil;
 import com.automq.stream.s3.metrics.stats.StorageOperationStats;
@@ -334,7 +333,7 @@ public CompletableFuture<Void> append(AppendContext context, StreamRecordBatch s
         append0(context, writeRequest, false);
         cf.whenComplete((nil, ex) -> {
             streamRecord.release();
-            StorageOperationStats.getInstance().appendStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+            StorageOperationStats.getInstance().appendStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
         });
         return cf;
     }
@@ -354,7 +353,7 @@ public boolean append0(AppendContext context, WalWriteRequest request, boolean f
             if (!fromBackoff) {
                 backoffRecords.offer(request);
             }
-            StorageOperationStats.getInstance().appendLogCacheFullStats.record(MetricsLevel.INFO, 0L);
+            StorageOperationStats.getInstance().appendLogCacheFullStats.record(0L);
             if (System.currentTimeMillis() - lastLogTimestamp > 1000L) {
                 LOGGER.warn("[BACKOFF] log cache size {} is larger than {}", deltaWALCache.size(), maxDeltaWALCacheSize);
                 lastLogTimestamp = System.currentTimeMillis();
@@ -437,7 +436,7 @@ public CompletableFuture<ReadDataBlock> read(FetchContext context,
         TimerUtil timerUtil = new TimerUtil();
         CompletableFuture<ReadDataBlock> cf = new CompletableFuture<>();
         FutureUtil.propagate(read0(context, streamId, startOffset, endOffset, maxBytes), cf);
-        cf.whenComplete((nil, ex) -> StorageOperationStats.getInstance().readStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)));
+        cf.whenComplete((nil, ex) -> StorageOperationStats.getInstance().readStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)));
         return cf;
     }

@@ -515,7 +514,7 @@ public CompletableFuture<Void> forceUpload(long streamId) {
         CompletableFuture<Void> cf = new CompletableFuture<>();
         // Wait for a while to group force upload tasks.
         forceUploadTicker.tick().whenComplete((nil, ex) -> {
-            StorageOperationStats.getInstance().forceUploadWALAwaitStats.record(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS));
+            StorageOperationStats.getInstance().forceUploadWALAwaitStats.record(timer.elapsedAs(TimeUnit.NANOSECONDS));
             uploadDeltaWAL(streamId, true);
             // Wait for all tasks contains streamId complete.
             FutureUtil.propagate(CompletableFuture.allOf(this.inflightWALUploadTasks.stream()
@@ -525,7 +524,7 @@
                 callbackSequencer.tryFree(streamId);
             }
         });
-        cf.whenComplete((nil, ex) -> StorageOperationStats.getInstance().forceUploadWALCompleteStats.record(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS)));
+        cf.whenComplete((nil, ex) -> StorageOperationStats.getInstance().forceUploadWALCompleteStats.record(timer.elapsedAs(TimeUnit.NANOSECONDS)));
         return cf;
     }

@@ -557,7 +556,7 @@ private void handleAppendCallback0(WalWriteRequest request) {
         for (WalWriteRequest waitingAckRequest : waitingAckRequests) {
             waitingAckRequest.cf.complete(null);
         }
-        StorageOperationStats.getInstance().appendCallbackStats.record(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS));
+        StorageOperationStats.getInstance().appendCallbackStats.record(timer.elapsedAs(TimeUnit.NANOSECONDS));
     }

     private Lock getStreamCallbackLock(long streamId) {
@@ -602,7 +601,7 @@ CompletableFuture<Void> uploadDeltaWAL(DeltaWALUploadTaskContext context) {
         inflightWALUploadTasks.add(context);
         backgroundExecutor.execute(() -> FutureUtil.exec(() -> uploadDeltaWAL0(context), cf, LOGGER, "uploadDeltaWAL"));
         cf.whenComplete((nil, ex) -> {
-            StorageOperationStats.getInstance().uploadWALCompleteStats.record(MetricsLevel.INFO, context.timer.elapsedAs(TimeUnit.NANOSECONDS));
+            StorageOperationStats.getInstance().uploadWALCompleteStats.record(context.timer.elapsedAs(TimeUnit.NANOSECONDS));
             inflightWALUploadTasks.remove(context);
             if (ex != null) {
                 LOGGER.error("upload delta WAL fail", ex);
@@ -643,11 +642,11 @@ private void uploadDeltaWAL0(DeltaWALUploadTaskContext context) {

     private void prepareDeltaWALUpload(DeltaWALUploadTaskContext context) {
         context.task.prepare().thenAcceptAsync(nil -> {
-            StorageOperationStats.getInstance().uploadWALPrepareStats.record(MetricsLevel.DEBUG, context.timer.elapsedAs(TimeUnit.NANOSECONDS));
+            StorageOperationStats.getInstance().uploadWALPrepareStats.record(context.timer.elapsedAs(TimeUnit.NANOSECONDS));
             // 1. poll out current task and trigger upload.
             DeltaWALUploadTaskContext peek = walPrepareQueue.poll();
             Objects.requireNonNull(peek).task.upload().thenAccept(nil2 -> StorageOperationStats.getInstance()
-                .uploadWALUploadStats.record(MetricsLevel.DEBUG, context.timer.elapsedAs(TimeUnit.NANOSECONDS)));
+                .uploadWALUploadStats.record(context.timer.elapsedAs(TimeUnit.NANOSECONDS)));
             // 2. add task to commit queue.
             boolean walObjectCommitQueueEmpty = walCommitQueue.isEmpty();
             walCommitQueue.add(peek);
@@ -664,7 +663,7 @@

     private void commitDeltaWALUpload(DeltaWALUploadTaskContext context) {
         context.task.commit().thenAcceptAsync(nil -> {
-            StorageOperationStats.getInstance().uploadWALCommitStats.record(MetricsLevel.DEBUG, context.timer.elapsedAs(TimeUnit.NANOSECONDS));
+            StorageOperationStats.getInstance().uploadWALCommitStats.record(context.timer.elapsedAs(TimeUnit.NANOSECONDS));
             // 1. poll out current task
             walCommitQueue.poll();
             if (context.cache.confirmOffset() != 0) {
11 changes: 5 additions & 6 deletions s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
@@ -24,7 +24,6 @@
 import com.automq.stream.s3.cache.CacheAccessType;
 import com.automq.stream.s3.context.AppendContext;
 import com.automq.stream.s3.context.FetchContext;
-import com.automq.stream.s3.metrics.MetricsLevel;
 import com.automq.stream.s3.metrics.TimerUtil;
 import com.automq.stream.s3.metrics.stats.StreamOperationStats;
 import com.automq.stream.s3.model.StreamRecordBatch;
@@ -144,7 +143,7 @@ public CompletableFuture<AppendResult> append(AppendContext context, RecordBatch
         }, LOGGER, "append");
         pendingAppends.add(cf);
         cf.whenComplete((nil, ex) -> {
-            StreamOperationStats.getInstance().appendStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+            StreamOperationStats.getInstance().appendStreamStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
             pendingAppends.remove(cf);
         });
         return cf;
@@ -190,7 +189,7 @@ public CompletableFuture<FetchResult> fetch(FetchContext context,
         CompletableFuture<FetchResult> cf = exec(() -> fetch0(context, startOffset, endOffset, maxBytes), LOGGER, "fetch");
         pendingFetches.add(cf);
         cf.whenComplete((rs, ex) -> {
-            StreamOperationStats.getInstance().fetchStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+            StreamOperationStats.getInstance().fetchStreamStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
             if (ex != null) {
                 Throwable cause = FutureUtil.cause(ex);
                 if (!(cause instanceof FastReadFailFastException)) {
@@ -256,7 +255,7 @@ public CompletableFuture<Void> trim(long newStartOffset) {
             CompletableFuture<Void> cf = new CompletableFuture<>();
             lastPendingTrim.whenComplete((nil, ex) -> propagate(trim0(newStartOffset), cf));
             this.lastPendingTrim = cf;
-            cf.whenComplete((nil, ex) -> StreamOperationStats.getInstance().trimStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)));
+            cf.whenComplete((nil, ex) -> StreamOperationStats.getInstance().trimStreamStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS)));
             return cf;
         }, LOGGER, "trim");
     } finally {
@@ -307,10 +306,10 @@ public CompletableFuture<Void> close() {
         closeCf.whenComplete((nil, ex) -> {
             if (ex != null) {
                 LOGGER.error("{} close fail", logIdent, ex);
-                StreamOperationStats.getInstance().closeStreamStats(false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+                StreamOperationStats.getInstance().closeStreamStats(false).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
             } else {
                 LOGGER.info("{} closed", logIdent);
-                StreamOperationStats.getInstance().closeStreamStats(true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+                StreamOperationStats.getInstance().closeStreamStats(true).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
             }
         });

s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java
@@ -20,7 +20,6 @@
 import com.automq.stream.api.StreamClient;
 import com.automq.stream.s3.context.AppendContext;
 import com.automq.stream.s3.context.FetchContext;
-import com.automq.stream.s3.metrics.MetricsLevel;
 import com.automq.stream.s3.metrics.TimerUtil;
 import com.automq.stream.s3.metrics.stats.StreamOperationStats;
 import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
@@ -83,7 +82,7 @@ public S3StreamClient(StreamManager streamManager, Storage storage, ObjectManage
     public CompletableFuture<Stream> createAndOpenStream(CreateStreamOptions options) {
         TimerUtil timerUtil = new TimerUtil();
         return FutureUtil.exec(() -> streamManager.createStream().thenCompose(streamId -> {
-            StreamOperationStats.getInstance().createStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+            StreamOperationStats.getInstance().createStreamStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
             return openStream0(streamId, options.epoch());
         }), LOGGER, "createAndOpenStream");
     }
@@ -117,7 +116,7 @@ private CompletableFuture<Stream> openStream0(long streamId, long epoch) {
                 metadata.startOffset(), metadata.endOffset(),
                 storage, streamManager, networkInboundBucket, networkOutboundBucket));
             openedStreams.put(streamId, stream);
-            StreamOperationStats.getInstance().openStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+            StreamOperationStats.getInstance().openStreamStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
             return stream;
         });
     }
s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java
@@ -12,7 +12,6 @@
 package com.automq.stream.s3.cache;

 import com.automq.stream.s3.Config;
-import com.automq.stream.s3.metrics.MetricsLevel;
 import com.automq.stream.s3.metrics.TimerUtil;
 import com.automq.stream.s3.metrics.stats.StorageOperationStats;
 import com.automq.stream.s3.model.StreamRecordBatch;
@@ -105,7 +104,7 @@ public CompletableFuture<ReadDataBlock> read(TraceContext traceContext,

         long timeElapsed = timerUtil.elapsedAs(TimeUnit.NANOSECONDS);
         boolean isCacheHit = ret.getCacheAccessType() == CacheAccessType.BLOCK_CACHE_HIT;
-        StorageOperationStats.getInstance().readBlockCacheStats(isCacheHit).record(MetricsLevel.INFO, timeElapsed);
+        StorageOperationStats.getInstance().readBlockCacheStats(isCacheHit).record(timeElapsed);
         Span.fromContext(finalTraceContext.currentContext()).setAttribute("cache_hit", isCacheHit);
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("[S3BlockCache] read data complete, cache hit: {}, stream={}, {}-{}, total bytes: {}",
s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
@@ -11,7 +11,6 @@

 package com.automq.stream.s3.cache;

-import com.automq.stream.s3.metrics.MetricsLevel;
 import com.automq.stream.s3.metrics.S3StreamMetricsManager;
 import com.automq.stream.s3.metrics.TimerUtil;
 import com.automq.stream.s3.metrics.stats.StorageOperationStats;
@@ -94,7 +93,7 @@ public boolean put(StreamRecordBatch recordBatch) {
         } finally {
             readLock.unlock();
         }
-        StorageOperationStats.getInstance().appendLogCacheStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+        StorageOperationStats.getInstance().appendLogCacheStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
         return full;
     }

@@ -145,7 +144,7 @@ public List<StreamRecordBatch> get(TraceContext context,

         long timeElapsed = timerUtil.elapsedAs(TimeUnit.NANOSECONDS);
         boolean isCacheHit = !records.isEmpty() && records.get(0).getBaseOffset() <= startOffset;
-        StorageOperationStats.getInstance().readLogCacheStats(isCacheHit).record(MetricsLevel.INFO, timeElapsed);
+        StorageOperationStats.getInstance().readLogCacheStats(isCacheHit).record(timeElapsed);
         return records;
     }

s3stream/src/main/java/com/automq/stream/s3/cache/ReadAheadAgent.java
@@ -11,7 +11,6 @@

 package com.automq.stream.s3.cache;

-import com.automq.stream.s3.metrics.MetricsLevel;
 import com.automq.stream.s3.metrics.TimerUtil;
 import com.automq.stream.s3.metrics.stats.StorageOperationStats;
 import com.automq.stream.utils.LogContext;
@@ -93,7 +92,7 @@ public void updateReadAheadResult(long readAheadEndOffset, int readAheadSize) {
         lock.lock();
         this.readAheadEndOffset = readAheadEndOffset;
         this.lastReadAheadSize = readAheadSize;
-        StorageOperationStats.getInstance().readAheadSizeStats.record(MetricsLevel.INFO, readAheadSize);
+        StorageOperationStats.getInstance().readAheadSizeStats.record(readAheadSize);
         if (logger.isDebugEnabled()) {
             logger.debug("update read ahead offset {}, size: {}, lastReadOffset: {}", readAheadEndOffset, readAheadSize, lastReadOffset);
         }
s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java
@@ -14,7 +14,6 @@
 import com.automq.stream.s3.ObjectReader;
 import com.automq.stream.s3.StreamDataBlock;
 import com.automq.stream.s3.metadata.S3ObjectMetadata;
-import com.automq.stream.s3.metrics.MetricsLevel;
 import com.automq.stream.s3.metrics.TimerUtil;
 import com.automq.stream.s3.metrics.stats.StorageOperationStats;
 import com.automq.stream.s3.model.StreamRecordBatch;
@@ -131,7 +130,7 @@ public CompletableFuture<List<StreamRecordBatch>> syncReadAhead(TraceContext tra
                 completeInflightTask0(key, ex);
             }
             context.taskKeySet.clear();
-            StorageOperationStats.getInstance().blockCacheReadAheadStats(true).record(MetricsLevel.INFO, timer.elapsedAs(TimeUnit.NANOSECONDS));
+            StorageOperationStats.getInstance().blockCacheReadAheadStats(true).record(timer.elapsedAs(TimeUnit.NANOSECONDS));
         });
     }

@@ -168,7 +167,7 @@ CompletableFuture<List<StreamRecordBatch>> handleSyncReadAhead(TraceContext trac
         CompletableFuture<Void> throttleCf = inflightReadThrottle.acquire(traceContext, uuid, totalReserveSize);
         return throttleCf.thenComposeAsync(nil -> {
             // concurrently read all data blocks
-            StorageOperationStats.getInstance().readAheadLimiterQueueTimeStats.record(MetricsLevel.INFO, throttleTimer.elapsedAs(TimeUnit.NANOSECONDS));
+            StorageOperationStats.getInstance().readAheadLimiterQueueTimeStats.record(throttleTimer.elapsedAs(TimeUnit.NANOSECONDS));
             for (int i = 0; i < streamDataBlocksToRead.size(); i++) {
                 Pair<ObjectReader, StreamDataBlock> pair = streamDataBlocksToRead.get(i);
                 ObjectReader objectReader = pair.getLeft();
@@ -296,7 +295,7 @@ public void asyncReadAhead(long streamId, long startOffset, long endOffset, int
                 completeInflightTask0(key, ex);
             }
             context.taskKeySet.clear();
-            StorageOperationStats.getInstance().blockCacheReadAheadStats(false).record(MetricsLevel.INFO, timer.elapsedAs(TimeUnit.NANOSECONDS));
+            StorageOperationStats.getInstance().blockCacheReadAheadStats(false).record(timer.elapsedAs(TimeUnit.NANOSECONDS));
         });
     }

@@ -364,7 +363,7 @@ CompletableFuture<Void> handleAsyncReadAhead(long streamId, long startOffset, lo
             if (reserveResult.reserveSize() > 0) {
                 TimerUtil throttleTimer = new TimerUtil();
                 inflightReadThrottle.acquire(TraceContext.DEFAULT, uuid, reserveResult.reserveSize()).thenAcceptAsync(nil -> {
-                    StorageOperationStats.getInstance().readAheadLimiterQueueTimeStats.record(MetricsLevel.INFO, throttleTimer.elapsedAs(TimeUnit.NANOSECONDS));
+                    StorageOperationStats.getInstance().readAheadLimiterQueueTimeStats.record(throttleTimer.elapsedAs(TimeUnit.NANOSECONDS));
                     // read data block
                     if (context.taskKeySet.contains(taskKey)) {
                         setInflightReadAheadStatus(taskKey, DefaultS3BlockCache.ReadBlockCacheStatus.WAIT_FETCH_DATA);
s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java
@@ -69,8 +69,6 @@ public class S3StreamMetricsConstant {
     public static final String OPERATION_LATENCY_METRIC_NAME = "operation_latency";
     public static final String OBJECT_COUNT_METRIC_NAME = "object_count";
     public static final String OBJECT_STAGE_COST_METRIC_NAME = "object_stage_cost";
-    public static final String OBJECT_UPLOAD_SIZE_METRIC_NAME = "object_upload_size";
-    public static final String OBJECT_DOWNLOAD_SIZE_METRIC_NAME = "object_download_size";
     public static final String NETWORK_INBOUND_USAGE_METRIC_NAME = "network_inbound_usage";
     public static final String NETWORK_OUTBOUND_USAGE_METRIC_NAME = "network_outbound_usage";
     public static final String NETWORK_INBOUND_AVAILABLE_BANDWIDTH_METRIC_NAME = "network_inbound_available_bandwidth";
@@ -80,6 +78,12 @@ public class S3StreamMetricsConstant {
     public static final String NETWORK_INBOUND_LIMITER_QUEUE_TIME_METRIC_NAME = "network_inbound_limiter_queue_time";
     public static final String NETWORK_OUTBOUND_LIMITER_QUEUE_TIME_METRIC_NAME = "network_outbound_limiter_queue_time";
     public static final String READ_AHEAD_SIZE_METRIC_NAME = "read_ahead_size";
+    public static final String SUM_METRIC_NAME_SUFFIX = "_sum";
+    public static final String COUNT_METRIC_NAME_SUFFIX = "_count";
+    public static final String P50_METRIC_NAME_SUFFIX = "_50p";
+    public static final String P99_METRIC_NAME_SUFFIX = "_99p";
+    public static final String MEAN_METRIC_NAME_SUFFIX = "_mean";
+    public static final String MAX_METRIC_NAME_SUFFIX = "_max";
     public static final String WAL_START_OFFSET = "wal_start_offset";
     public static final String WAL_TRIMMED_OFFSET = "wal_trimmed_offset";
     public static final String DELTA_WAL_CACHE_SIZE = "delta_wal_cache_size";
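
The new suffix constants suggest that each histogram is exported as six scalar series in place of one raw distribution. A sketch of that decomposition, assuming a yammer Histogram as the backing sample (the exporter class below is illustrative; the commit's actual export wiring lives in files not rendered above):

import java.util.LinkedHashMap;
import java.util.Map;

import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.stats.Snapshot;

final class ExplicitPercentiles {
    // The literal suffixes mirror the constants added above:
    // _sum, _count, _50p, _99p, _mean, _max.
    static Map<String, Double> seriesFor(String baseName, Histogram histogram) {
        Snapshot snapshot = histogram.getSnapshot();
        Map<String, Double> series = new LinkedHashMap<>();
        series.put(baseName + "_sum", histogram.sum());
        series.put(baseName + "_count", (double) histogram.count());
        series.put(baseName + "_50p", snapshot.getMedian());
        series.put(baseName + "_99p", snapshot.get99thPercentile());
        series.put(baseName + "_mean", histogram.mean());
        series.put(baseName + "_max", histogram.max());
        return series;
    }
}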
