Skip to content

Commit

Permalink
feat(s3stream): simplify operation counter metrics (#553)
Browse files Browse the repository at this point in the history
Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh authored Nov 2, 2023
1 parent c3abfbc commit 00f82cd
Show file tree
Hide file tree
Showing 12 changed files with 60 additions and 80 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
<guava.version>32.0.1-jre</guava.version>
<slf4j.version>2.0.9</slf4j.version>
<snakeyaml.version>2.2</snakeyaml.version>
<s3stream.version>0.1.20-SNAPSHOT</s3stream.version>
<s3stream.version>0.1.21-SNAPSHOT</s3stream.version>

<!-- Flat buffers related -->
<flatbuffers.version>23.5.26</flatbuffers.version>
Expand Down
2 changes: 1 addition & 1 deletion s3stream/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.automq.elasticstream</groupId>
<artifactId>s3stream</artifactId>
<version>0.1.20-SNAPSHOT</version>
<version>0.1.21-SNAPSHOT</version>
<properties>
<mockito-core.version>5.5.0</mockito-core.version>
<junit-jupiter.version>5.10.0</junit-jupiter.version>
Expand Down
11 changes: 4 additions & 7 deletions s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,7 @@ public CompletableFuture<Void> append(StreamRecordBatch streamRecord) {
append0(writeRequest, false);
cf.whenComplete((nil, ex) -> {
streamRecord.release();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STORAGE).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STORAGE).operationTime.update(timerUtil.elapsed());
OperationMetricsStats.getHistogram(S3Operation.APPEND_STORAGE).update(timerUtil.elapsed());
});
return cf;
}
Expand All @@ -247,7 +246,7 @@ public boolean append0(WalWriteRequest request, boolean fromBackoff) {
if (!fromBackoff) {
backoffRecords.offer(request);
}
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STORAGE_LOG_CACHE_FULL).operationCount.inc();
OperationMetricsStats.getCounter(S3Operation.APPEND_STORAGE_LOG_CACHE_FULL).inc();
if (System.currentTimeMillis() - lastLogTimestamp > 1000L) {
LOGGER.warn("[BACKOFF] log cache size {} is larger than {}", logCache.size(), maxWALCacheSize);
lastLogTimestamp = System.currentTimeMillis();
Expand Down Expand Up @@ -305,8 +304,7 @@ public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, lo
CompletableFuture<ReadDataBlock> cf = new CompletableFuture<>();
mainReadExecutor.execute(() -> FutureUtil.propagate(read0(streamId, startOffset, endOffset, maxBytes), cf));
cf.whenComplete((nil, ex) -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.READ_STORAGE).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.READ_STORAGE).operationTime.update(timerUtil.elapsed());
OperationMetricsStats.getHistogram(S3Operation.READ_STORAGE).update(timerUtil.elapsed());
});
return cf;
}
Expand Down Expand Up @@ -409,8 +407,7 @@ CompletableFuture<Void> uploadWALObject(LogCache.LogCacheBlock logCacheBlock) {
inflightWALUploadTasks.add(cf);
backgroundExecutor.execute(() -> FutureUtil.exec(() -> uploadWALObject0(logCacheBlock, cf), cf, LOGGER, "uploadWALObject"));
cf.whenComplete((nil, ex) -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.UPLOAD_STORAGE_WAL).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.UPLOAD_STORAGE_WAL).operationTime.update(timerUtil.elapsed());
OperationMetricsStats.getHistogram(S3Operation.UPLOAD_STORAGE_WAL).update(timerUtil.elapsed());
inflightWALUploadTasks.remove(cf);
if (ex != null) {
LOGGER.error("upload WAL object fail", ex);
Expand Down
9 changes: 3 additions & 6 deletions s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,7 @@ public CompletableFuture<AppendResult> append(RecordBatch recordBatch) {
}, LOGGER, "append");
pendingAppends.add(cf);
cf.whenComplete((nil, ex) -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STREAM).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STREAM).operationTime.update(System.currentTimeMillis() - start);
OperationMetricsStats.getHistogram(S3Operation.APPEND_STREAM).update(System.currentTimeMillis() - start);
pendingAppends.remove(cf);
});
return cf;
Expand Down Expand Up @@ -183,8 +182,7 @@ public CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, in
CompletableFuture<FetchResult> cf = exec(() -> fetch0(startOffset, endOffset, maxBytes), LOGGER, "fetch");
pendingFetches.add(cf);
cf.whenComplete((rs, ex) -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.FETCH_STREAM).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.FETCH_STREAM).operationTime.update(System.currentTimeMillis() - start);
OperationMetricsStats.getHistogram(S3Operation.FETCH_STREAM).update(System.currentTimeMillis() - start);
if (ex != null) {
LOGGER.error("{} stream fetch [{}, {}) {} fail", logIdent, startOffset, endOffset, maxBytes, ex);
} else if (networkOutboundLimiter != null) {
Expand Down Expand Up @@ -229,8 +227,7 @@ public CompletableFuture<Void> trim(long newStartOffset) {
lastPendingTrim.whenComplete((nil, ex) -> propagate(trim0(newStartOffset), cf));
this.lastPendingTrim = cf;
cf.whenComplete((nil, ex) -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.TRIM_STREAM).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.TRIM_STREAM).operationTime.update(System.currentTimeMillis() - start);
OperationMetricsStats.getHistogram(S3Operation.TRIM_STREAM).update(System.currentTimeMillis() - start);
});
return cf;
}, LOGGER, "trim");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ public S3StreamClient(StreamManager streamManager, Storage storage, ObjectManage
public CompletableFuture<Stream> createAndOpenStream(CreateStreamOptions options) {
TimerUtil timerUtil = new TimerUtil();
return FutureUtil.exec(() -> streamManager.createStream().thenCompose(streamId -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.CREATE_STREAM).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.CREATE_STREAM).operationTime.update(timerUtil.elapsed());
OperationMetricsStats.getHistogram(S3Operation.CREATE_STREAM).update(timerUtil.elapsed());
return openStream0(streamId, options.epoch());
}), LOGGER, "createAndOpenStream");
}
Expand Down Expand Up @@ -140,8 +139,7 @@ private CompletableFuture<Stream> openStream0(long streamId, long epoch) {
TimerUtil timerUtil = new TimerUtil();
return streamManager.openStream(streamId, epoch).
thenApply(metadata -> {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.OPEN_STREAM).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.OPEN_STREAM).operationTime.update(timerUtil.elapsed());
OperationMetricsStats.getHistogram(S3Operation.OPEN_STREAM).update(timerUtil.elapsed());
StreamObjectsCompactionTask.Builder builder = new StreamObjectsCompactionTask.Builder(objectManager, s3Operator)
.compactedStreamObjectMaxSizeInBytes(config.s3StreamObjectCompactionMaxSizeBytes())
.eligibleStreamObjectLivingTimeInMs(config.s3StreamObjectCompactionLivingTimeMinutes() * 60L * 1000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, lo
}

if (ret.isCacheHit()) {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.READ_STORAGE_BLOCK_CACHE).operationCount.inc();
OperationMetricsStats.getCounter(S3Operation.READ_STORAGE_BLOCK_CACHE).inc();
} else {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.READ_STORAGE_BLOCK_CACHE_MISS).operationCount.inc();
OperationMetricsStats.getCounter(S3Operation.READ_STORAGE_BLOCK_CACHE_MISS).inc();
}
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.READ_STORAGE_BLOCK_CACHE).operationTime.update(timerUtil.elapsedAndReset());
OperationMetricsStats.getHistogram(S3Operation.READ_STORAGE_BLOCK_CACHE).update(timerUtil.elapsedAndReset());
});
return readCf;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ public boolean put(StreamRecordBatch recordBatch) {
tryRealFree();
size.addAndGet(recordBatch.size());
boolean full = activeBlock.put(recordBatch);
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STORAGE_LOG_CACHE).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STORAGE_LOG_CACHE).operationTime.update(timerUtil.elapsed());
OperationMetricsStats.getHistogram(S3Operation.APPEND_STORAGE_LOG_CACHE).update(timerUtil.elapsed());
return full;
}

Expand Down Expand Up @@ -104,11 +103,11 @@ public List<StreamRecordBatch> get(long streamId, long startOffset, long endOffs
List<StreamRecordBatch> records = get0(streamId, startOffset, endOffset, maxBytes);
records.forEach(StreamRecordBatch::retain);
if (!records.isEmpty() && records.get(0).getBaseOffset() <= startOffset) {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.READ_STORAGE_LOG_CACHE).operationCount.inc();
OperationMetricsStats.getCounter(S3Operation.READ_STORAGE_LOG_CACHE).inc();
} else {
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.READ_STORAGE_LOG_CACHE_MISS).operationCount.inc();
OperationMetricsStats.getCounter(S3Operation.READ_STORAGE_LOG_CACHE_MISS).inc();
}
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.READ_STORAGE_LOG_CACHE).operationTime.update(timerUtil.elapsed());
OperationMetricsStats.getHistogram(S3Operation.READ_STORAGE_LOG_CACHE).update(timerUtil.elapsed());
return records;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,30 @@
import java.util.concurrent.ConcurrentHashMap;

public class OperationMetricsStats {
private static final Map<String, OperationMetrics> OPERATION_METRICS_MAP = new ConcurrentHashMap<>();
private static final Map<String, Counter> OPERATION_COUNTER_MAP = new ConcurrentHashMap<>();
private static final Map<String, Histogram> OPERATION_HIST_MAP = new ConcurrentHashMap<>();

public static OperationMetrics getOrCreateOperationMetrics(S3Operation s3Operation) {
return OPERATION_METRICS_MAP.computeIfAbsent(s3Operation.getUniqueKey(), id ->
new OperationMetrics(s3Operation));
public static Counter getCounter(S3Operation s3Operation) {
return getOrCreateCounterMetrics(s3Operation);
}

public static class OperationMetrics {
public final Counter operationCount;
public final Histogram operationTime;
public static Histogram getHistogram(S3Operation s3Operation) {
return getOrCreateHistMetrics(s3Operation);
}

private static Counter getOrCreateCounterMetrics(S3Operation s3Operation) {
return OPERATION_COUNTER_MAP.computeIfAbsent(s3Operation.getUniqueKey(), id -> S3StreamMetricsRegistry.getMetricsGroup()
.newCounter("operation_count" + Counter.SUFFIX, tags(s3Operation)));
}

public OperationMetrics(S3Operation s3Operation) {
Map<String, String> tags = Map.of(
"operation", s3Operation.getName(),
"op_type", s3Operation.getType().getName());
operationCount = S3StreamMetricsRegistry.getMetricsGroup().newCounter("operation_count" + Counter.SUFFIX, tags);
operationTime = S3StreamMetricsRegistry.getMetricsGroup().newHistogram("operation_time", tags);
}
private static Histogram getOrCreateHistMetrics(S3Operation s3Operation) {
return OPERATION_HIST_MAP.computeIfAbsent(s3Operation.getUniqueKey(), id -> S3StreamMetricsRegistry.getMetricsGroup()
.newHistogram("operation_time", tags(s3Operation)));
}

private static Map<String, String> tags(S3Operation s3Operation) {
return Map.of(
"operation", s3Operation.getName(),
"op_type", s3Operation.getType().getName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class S3ObjectMetricsStats {
public static final Histogram S3_OBJECT_UPLOAD_SIZE = S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_object_upload_size", Collections.emptyMap());
public static final Histogram S3_OBJECT_DOWNLOAD_SIZE = S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_object_download_size", Collections.emptyMap());

public static Histogram getOrCreateS3ObjectMetrics(S3ObjectStage stage) {
public static Histogram getHistogram(S3ObjectStage stage) {
return S3_OBJECT_TIME_MAP.computeIfAbsent(stage.getName(), op -> {
Map<String, String> tags = Map.of("stage", stage.getName());
return S3StreamMetricsRegistry.getMetricsGroup().newHistogram("s3_object_stage_time", tags);
Expand Down
Loading

0 comments on commit 00f82cd

Please sign in to comment.