Skip to content

Commit

Permalink
fix(s3stream): measure time elapse with higher precision (#568)
Browse files Browse the repository at this point in the history
Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh authored Nov 6, 2023
1 parent 3730ced commit 9ce85d9
Show file tree
Hide file tree
Showing 12 changed files with 43 additions and 35 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
<guava.version>32.0.1-jre</guava.version>
<slf4j.version>2.0.9</slf4j.version>
<snakeyaml.version>2.2</snakeyaml.version>
<s3stream.version>0.1.21-SNAPSHOT</s3stream.version>
<s3stream.version>0.1.22-SNAPSHOT</s3stream.version>

<!-- Flat buffers related -->
<flatbuffers.version>23.5.26</flatbuffers.version>
Expand Down
2 changes: 1 addition & 1 deletion s3stream/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.automq.elasticstream</groupId>
<artifactId>s3stream</artifactId>
<version>0.1.21-SNAPSHOT</version>
<version>0.1.22-SNAPSHOT</version>
<properties>
<mockito-core.version>5.5.0</mockito-core.version>
<junit-jupiter.version>5.10.0</junit-jupiter.version>
Expand Down
6 changes: 3 additions & 3 deletions s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public void shutdown() {

@Override
public CompletableFuture<Void> append(StreamRecordBatch streamRecord) {
TimerUtil timerUtil = new TimerUtil();
TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
CompletableFuture<Void> cf = new CompletableFuture<>();
// encoded before append to free heap ByteBuf.
streamRecord.encoded();
Expand Down Expand Up @@ -300,7 +300,7 @@ private void tryDrainBackoffRecords() {

@Override
public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, long endOffset, int maxBytes) {
TimerUtil timerUtil = new TimerUtil();
TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
CompletableFuture<ReadDataBlock> cf = new CompletableFuture<>();
mainReadExecutor.execute(() -> FutureUtil.propagate(read0(streamId, startOffset, endOffset, maxBytes), cf));
cf.whenComplete((nil, ex) -> {
Expand Down Expand Up @@ -402,7 +402,7 @@ private void handleAppendCallback0(WalWriteRequest request) {
* Upload cache block to S3. The earlier cache block will have smaller objectId and commit first.
*/
CompletableFuture<Void> uploadWALObject(LogCache.LogCacheBlock logCacheBlock) {
TimerUtil timerUtil = new TimerUtil();
TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
CompletableFuture<Void> cf = new CompletableFuture<>();
inflightWALUploadTasks.add(cf);
backgroundExecutor.execute(() -> FutureUtil.exec(() -> uploadWALObject0(logCacheBlock, cf), cf, LOGGER, "uploadWALObject"));
Expand Down
14 changes: 8 additions & 6 deletions s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.automq.stream.api.RecordBatchWithContext;
import com.automq.stream.api.Stream;
import com.automq.stream.api.StreamClientException;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Operation;
import com.automq.stream.s3.metrics.stats.OperationMetricsStats;
import com.automq.stream.s3.model.StreamRecordBatch;
Expand All @@ -42,6 +43,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -132,7 +134,7 @@ public long nextOffset() {
public CompletableFuture<AppendResult> append(RecordBatch recordBatch) {
writeLock.lock();
try {
long start = System.currentTimeMillis();
TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
CompletableFuture<AppendResult> cf = exec(() -> {
if (networkInboundLimiter != null) {
networkInboundLimiter.forceConsume(recordBatch.rawPayload().remaining());
Expand All @@ -141,7 +143,7 @@ public CompletableFuture<AppendResult> append(RecordBatch recordBatch) {
}, LOGGER, "append");
pendingAppends.add(cf);
cf.whenComplete((nil, ex) -> {
OperationMetricsStats.getHistogram(S3Operation.APPEND_STREAM).update(System.currentTimeMillis() - start);
OperationMetricsStats.getHistogram(S3Operation.APPEND_STREAM).update(timerUtil.elapsed());
pendingAppends.remove(cf);
});
return cf;
Expand Down Expand Up @@ -178,11 +180,11 @@ private CompletableFuture<AppendResult> append0(RecordBatch recordBatch) {
public CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, int maxBytes) {
readLock.lock();
try {
long start = System.currentTimeMillis();
TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
CompletableFuture<FetchResult> cf = exec(() -> fetch0(startOffset, endOffset, maxBytes), LOGGER, "fetch");
pendingFetches.add(cf);
cf.whenComplete((rs, ex) -> {
OperationMetricsStats.getHistogram(S3Operation.FETCH_STREAM).update(System.currentTimeMillis() - start);
OperationMetricsStats.getHistogram(S3Operation.FETCH_STREAM).update(timerUtil.elapsed());
if (ex != null) {
LOGGER.error("{} stream fetch [{}, {}) {} fail", logIdent, startOffset, endOffset, maxBytes, ex);
} else if (networkOutboundLimiter != null) {
Expand Down Expand Up @@ -221,13 +223,13 @@ private CompletableFuture<FetchResult> fetch0(long startOffset, long endOffset,
public CompletableFuture<Void> trim(long newStartOffset) {
writeLock.lock();
try {
long start = System.currentTimeMillis();
TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
return exec(() -> {
CompletableFuture<Void> cf = new CompletableFuture<>();
lastPendingTrim.whenComplete((nil, ex) -> propagate(trim0(newStartOffset), cf));
this.lastPendingTrim = cf;
cf.whenComplete((nil, ex) -> {
OperationMetricsStats.getHistogram(S3Operation.TRIM_STREAM).update(System.currentTimeMillis() - start);
OperationMetricsStats.getHistogram(S3Operation.TRIM_STREAM).update(timerUtil.elapsed());
});
return cf;
}, LOGGER, "trim");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public S3StreamClient(StreamManager streamManager, Storage storage, ObjectManage

@Override
public CompletableFuture<Stream> createAndOpenStream(CreateStreamOptions options) {
TimerUtil timerUtil = new TimerUtil();
TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
return FutureUtil.exec(() -> streamManager.createStream().thenCompose(streamId -> {
OperationMetricsStats.getHistogram(S3Operation.CREATE_STREAM).update(timerUtil.elapsed());
return openStream0(streamId, options.epoch());
Expand Down Expand Up @@ -136,7 +136,7 @@ private void startStreamObjectsCompactions() {
}

private CompletableFuture<Stream> openStream0(long streamId, long epoch) {
TimerUtil timerUtil = new TimerUtil();
TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
return streamManager.openStream(streamId, epoch).
thenApply(metadata -> {
OperationMetricsStats.getHistogram(S3Operation.OPEN_STREAM).update(timerUtil.elapsed());
Expand Down Expand Up @@ -171,7 +171,7 @@ public void shutdown() {
LOGGER.warn("await streamObjectCompactionExecutor close fail", e);
}

TimerUtil timerUtil = new TimerUtil();
TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
Map<Long, CompletableFuture<Void>> streamCloseFutures = new ConcurrentHashMap<>();
openedStreams.forEach((streamId, stream) -> streamCloseFutures.put(streamId, stream.close()));
for (; ; ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public DefaultS3BlockCache(long cacheBytesSize, ObjectManager objectManager, S3O

@Override
public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, long endOffset, int maxBytes) {
TimerUtil timerUtil = new TimerUtil();
TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
CompletableFuture<ReadDataBlock> readCf = new CompletableFuture<>();
// submit read task to mainExecutor to avoid read slower the caller thread.
mainExecutor.execute(() -> FutureUtil.exec(() ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
Expand Down Expand Up @@ -66,7 +67,7 @@ public LogCache(long cacheBlockMaxSize, int maxCacheBlockStreamCount) {
}

public boolean put(StreamRecordBatch recordBatch) {
TimerUtil timerUtil = new TimerUtil();
TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
tryRealFree();
size.addAndGet(recordBatch.size());
boolean full = activeBlock.put(recordBatch);
Expand Down Expand Up @@ -99,7 +100,7 @@ public boolean put(StreamRecordBatch recordBatch) {
* Note: the records is retained, the caller should release it.
*/
public List<StreamRecordBatch> get(long streamId, long startOffset, long endOffset, int maxBytes) {
TimerUtil timerUtil = new TimerUtil();
TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
List<StreamRecordBatch> records = get0(streamId, startOffset, endOffset, maxBytes);
records.forEach(StreamRecordBatch::retain);
if (!records.isEmpty() && records.get(0).getBaseOffset() <= startOffset) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ private void compact(List<StreamMetadata> streamMetadataList, List<S3ObjectMetad

private void forceSplitObjects(List<StreamMetadata> streamMetadataList, List<S3ObjectMetadata> objectsToForceSplit) {
logger.info("Force split {} WAL objects", objectsToForceSplit.size());
TimerUtil timerUtil = new TimerUtil();
TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
for (int i = 0; i < objectsToForceSplit.size(); i++) {
timerUtil.reset();
S3ObjectMetadata objectToForceSplit = objectsToForceSplit.get(i);
Expand Down Expand Up @@ -191,7 +191,7 @@ private void compactObjects(List<StreamMetadata> streamMetadataList, List<S3Obje
objectsToCompact = objectsToCompact.subList(0, maxObjectNumToCompact);
}
logger.info("Compact {} WAL objects", objectsToCompact.size());
TimerUtil timerUtil = new TimerUtil();
TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
CommitWALObjectRequest request = buildCompactRequest(streamMetadataList, objectsToCompact);
if (request == null) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,28 @@

package com.automq.stream.s3.metrics;

import java.util.concurrent.TimeUnit;

public class TimerUtil {
private final TimeUnit timeUnit;
private long last;

public TimerUtil() {
public TimerUtil(TimeUnit timeUnit) {
this.timeUnit = timeUnit;
reset();
}

public void reset() {
last = System.currentTimeMillis();
last = System.nanoTime();
}

public long elapsed() {
return System.currentTimeMillis() - last;
return timeUnit.convert(System.nanoTime() - last, TimeUnit.NANOSECONDS);
}

public long elapsedAndReset() {
long now = System.currentTimeMillis();
long elapsed = now - last;
long now = System.nanoTime();
long elapsed = timeUnit.convert(now - last, TimeUnit.NANOSECONDS);
last = now;
return elapsed;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ CompletableFuture<ByteBuf> mergedRangeRead(String path, long start, long end) {
}

void mergedRangeRead0(String path, long start, long end, CompletableFuture<ByteBuf> cf) {
TimerUtil timerUtil = new TimerUtil();
TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
GetObjectRequest request = GetObjectRequest.builder().bucket(bucket).key(path).range(range(start, end)).build();
readS3Client.getObject(request, AsyncResponseTransformer.toPublisher())
.thenAccept(responsePublisher -> {
Expand Down Expand Up @@ -289,7 +289,7 @@ public CompletableFuture<Void> write(String path, ByteBuf data, ThrottleStrategy
}

private void write0(String path, ByteBuf data, CompletableFuture<Void> cf) {
TimerUtil timerUtil = new TimerUtil();
TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
int objectSize = data.readableBytes();
PutObjectRequest request = PutObjectRequest.builder().bucket(bucket).key(path).build();
AsyncRequestBody body = AsyncRequestBody.fromByteBuffersUnsafe(data.nioBuffers());
Expand Down Expand Up @@ -319,7 +319,7 @@ public Writer writer(String path, ThrottleStrategy throttleStrategy) {

@Override
public CompletableFuture<Void> delete(String path) {
TimerUtil timerUtil = new TimerUtil();
TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
DeleteObjectRequest request = DeleteObjectRequest.builder().bucket(bucket).key(path).build();
return writeS3Client.deleteObject(request).thenAccept(deleteObjectResponse -> {
OperationMetricsStats.getHistogram(S3Operation.DELETE_OBJECT).update(timerUtil.elapsed());
Expand All @@ -333,7 +333,7 @@ public CompletableFuture<Void> delete(String path) {

@Override
public CompletableFuture<List<String>> delete(List<String> objectKeys) {
TimerUtil timerUtil = new TimerUtil();
TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
ObjectIdentifier[] toDeleteKeys = objectKeys.stream().map(key ->
ObjectIdentifier.builder()
.key(key)
Expand Down Expand Up @@ -367,7 +367,7 @@ public CompletableFuture<String> createMultipartUpload(String path) {
}

void createMultipartUpload0(String path, CompletableFuture<String> cf) {
TimerUtil timerUtil = new TimerUtil();
TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
CreateMultipartUploadRequest request = CreateMultipartUploadRequest.builder().bucket(bucket).key(path).build();
writeS3Client.createMultipartUpload(request).thenAccept(createMultipartUploadResponse -> {
OperationMetricsStats.getHistogram(S3Operation.CREATE_MULTI_PART_UPLOAD).update(timerUtil.elapsed());
Expand Down Expand Up @@ -408,7 +408,7 @@ public CompletableFuture<CompletedPart> uploadPart(String path, String uploadId,
}

private void uploadPart0(String path, String uploadId, int partNumber, ByteBuf part, CompletableFuture<CompletedPart> cf) {
TimerUtil timerUtil = new TimerUtil();
TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
AsyncRequestBody body = AsyncRequestBody.fromByteBuffersUnsafe(part.nioBuffers());
UploadPartRequest request = UploadPartRequest.builder().bucket(bucket).key(path).uploadId(uploadId)
.partNumber(partNumber).build();
Expand Down Expand Up @@ -442,7 +442,7 @@ public CompletableFuture<CompletedPart> uploadPartCopy(String sourcePath, String
}

private void uploadPartCopy0(String sourcePath, String path, long start, long end, String uploadId, int partNumber, CompletableFuture<CompletedPart> cf) {
TimerUtil timerUtil = new TimerUtil();
TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
long inclusiveEnd = end - 1;
UploadPartCopyRequest request = UploadPartCopyRequest.builder().sourceBucket(bucket).sourceKey(sourcePath)
.destinationBucket(bucket).destinationKey(path).copySourceRange(range(start, inclusiveEnd)).uploadId(uploadId).partNumber(partNumber).build();
Expand Down Expand Up @@ -476,7 +476,7 @@ public CompletableFuture<Void> completeMultipartUpload(String path, String uploa
}

public void completeMultipartUpload0(String path, String uploadId, List<CompletedPart> parts, CompletableFuture<Void> cf) {
TimerUtil timerUtil = new TimerUtil();
TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
CompletedMultipartUpload multipartUpload = CompletedMultipartUpload.builder().parts(parts).build();
CompleteMultipartUploadRequest request = CompleteMultipartUploadRequest.builder().bucket(bucket).key(path).uploadId(uploadId).multipartUpload(multipartUpload).build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
Expand All @@ -48,7 +49,7 @@ public class MultiPartWriter implements Writer {
*/
private final long minPartSize;
private ObjectPart objectPart = null;
private final TimerUtil timerUtil = new TimerUtil();
private final TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
private final ThrottleStrategy throttleStrategy;
private final AtomicLong totalWriteSize = new AtomicLong(0L);

Expand Down Expand Up @@ -211,7 +212,7 @@ public void upload() {
}

private void upload0() {
TimerUtil timerUtil = new TimerUtil();
TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
FutureUtil.propagate(uploadIdCf.thenCompose(uploadId -> operator.uploadPart(path, uploadId, partNumber, partBuf, throttleStrategy)), partCf);
partCf.whenComplete((nil, ex) -> S3ObjectMetricsStats.getHistogram(S3ObjectStage.UPLOAD_PART).update(timerUtil.elapsed()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ public AppendResult append(ByteBuf buf, int crc) throws OverCapacityException {
}

public AppendResult append0(ByteBuf body, int crc) throws OverCapacityException {
TimerUtil timerUtil = new TimerUtil();
TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
checkReadyToServe();

final long recordSize = RECORD_HEADER_SIZE + body.readableBytes();
Expand Down

0 comments on commit 9ce85d9

Please sign in to comment.