
Commit

feat(s3stream): stats bytebuf alloc (#496)
Signed-off-by: Robin Han <[email protected]>
superhx authored Oct 28, 2023
1 parent e451110 commit 12d5d50
Showing 7 changed files with 54 additions and 5 deletions.
@@ -17,6 +17,7 @@
 
 package com.automq.stream.s3;
 
+import com.automq.stream.s3.metrics.stats.ByteBufMetricsStats;
 import com.automq.stream.utils.Threads;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.CompositeByteBuf;
@@ -38,7 +39,14 @@ public static CompositeByteBuf compositeByteBuffer() {
     }
 
     public static ByteBuf byteBuffer(int initCapacity) {
+        return byteBuffer(initCapacity, null);
+    }
+
+    public static ByteBuf byteBuffer(int initCapacity, String name) {
         try {
+            if (name != null) {
+                ByteBufMetricsStats.getHistogram(name).update(initCapacity);
+            }
             return ALLOC.directBuffer(initCapacity);
         } catch (OutOfMemoryError e) {
             for (;;) {
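The two-argument overload above lets a caller tag an allocation with a source name, so the requested capacity is recorded in a per-source histogram before the direct buffer is allocated; the one-argument overload passes null and skips the stats. A minimal caller-side sketch, assuming the allocator class is the DirectByteBufAlloc referenced elsewhere in this commit and using a made-up source name:

    import io.netty.buffer.ByteBuf;

    // Named allocation: the requested capacity is recorded under the
    // hypothetical "example_read" source before the buffer is handed out.
    ByteBuf buf = DirectByteBufAlloc.byteBuffer(16 * 1024, "example_read");
    try {
        // ... write into and consume the buffer ...
    } finally {
        buf.release(); // direct buffers must be released explicitly
    }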
@@ -206,7 +206,8 @@ class IndexBlock
 
         public IndexBlock() {
             long nextPosition = 0;
-            buf = DirectByteBufAlloc.byteBuffer(1024 * 1024);
+            int indexBlockSize = 4 + (8 + 4 + 4 + 8 + 8 + 4 + 4) * completedBlocks.size();
+            buf = DirectByteBufAlloc.byteBuffer(indexBlockSize, "write_index_block");
             buf.writeInt(completedBlocks.size()); // block count
             // block index
             for (DataBlock block : completedBlocks) {
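The fixed 1 MiB allocation is replaced by an exact size: a 4-byte block count followed by 40 bytes (8 + 4 + 4 + 8 + 8 + 4 + 4) per completed block. A worked sketch of that arithmetic with a hypothetical block count:

    // Exact index block sizing, per the expression in the hunk above.
    int blockCount = 1_000;                          // hypothetical number of completed blocks
    int perEntry = 8 + 4 + 4 + 8 + 8 + 4 + 4;        // 40 bytes per block index entry
    int indexBlockSize = 4 + perEntry * blockCount;  // 4-byte count header + entries = 40004
    // versus the previous fixed 1024 * 1024 (1 MiB) buffer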
3 changes: 2 additions & 1 deletion s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
@@ -217,6 +217,8 @@ public void shutdown() {
     public CompletableFuture<Void> append(StreamRecordBatch streamRecord) {
         TimerUtil timerUtil = new TimerUtil();
         CompletableFuture<Void> cf = new CompletableFuture<>();
+        // encoded before append to free heap ByteBuf.
+        streamRecord.encoded();
         WalWriteRequest writeRequest = new WalWriteRequest(streamRecord, -1L, cf);
         handleAppendRequest(writeRequest);
         append0(writeRequest, false);
@@ -250,7 +252,6 @@ public boolean append0(WalWriteRequest request, boolean fromBackoff) {
         WriteAheadLog.AppendResult appendResult;
         try {
             StreamRecordBatch streamRecord = request.record;
-            streamRecord.encoded();
             streamRecord.retain();
             appendResult = log.append(streamRecord.encoded());
         } catch (WriteAheadLog.OverCapacityException e) {
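Moving streamRecord.encoded() out of append0 and into append means the record is serialized once, before the request is queued for the write-ahead log, so the heap-backed payload can be freed early instead of being held by every queued WalWriteRequest. A rough sketch of the resulting ordering, using only the identifiers visible in this hunk (surrounding code omitted):

    // append(): encode eagerly, then queue the request.
    streamRecord.encoded();            // serialize once; frees the heap ByteBuf per the comment above
    WalWriteRequest writeRequest = new WalWriteRequest(streamRecord, -1L, cf);
    handleAppendRequest(writeRequest);

    // append0(): the record is already encoded, so only retain and append.
    streamRecord.retain();
    appendResult = log.append(streamRecord.encoded()); // presumably returns the cached encoding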
@@ -145,7 +145,7 @@ class IndexBlock
 
         public IndexBlock() {
             position = nextDataBlockPosition;
-            buf = DirectByteBufAlloc.byteBuffer(calculateIndexBlockSize());
+            buf = DirectByteBufAlloc.byteBuffer(calculateIndexBlockSize(), "write_index_block");
             buf.writeInt(completedBlocks.size()); // block count
             long nextPosition = 0;
             // block index
@@ -67,9 +67,11 @@ public enum S3Operation {
     COMMIT_STREAM_OBJECT(S3MetricsType.S3Object, "commit_stream_object"),
     GET_OBJECTS(S3MetricsType.S3Object, "get_objects"),
     GET_SERVER_OBJECTS(S3MetricsType.S3Object, "get_server_objects"),
-    GET_STREAM_OBJECTS(S3MetricsType.S3Object, "get_stream_objects");
+    GET_STREAM_OBJECTS(S3MetricsType.S3Object, "get_stream_objects"),
     /* S3 object operations end */
 
+    ALLOC_BUFFER(S3MetricsType.S3Storage, "alloc_buffer");
+
     private final S3MetricsType type;
     private final String name;
     private final String uniqueKey;
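ALLOC_BUFFER is a new operation under S3MetricsType.S3Storage and is not referenced elsewhere in this diff. A hedged sketch of how such an operation could be recorded with the OperationMetricsStats helper that appears later in this commit (timerUtil is assumed to be a TimerUtil started around the allocation):

    // Hypothetical usage; this commit only declares the enum constant.
    OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.ALLOC_BUFFER).operationCount.inc();
    OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.ALLOC_BUFFER).operationTime.update(timerUtil.elapsed());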
New file, 37 added lines (public class ByteBufMetricsStats):
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.stream.s3.metrics.stats;

import com.automq.stream.s3.metrics.Histogram;
import com.automq.stream.s3.metrics.S3StreamMetricsRegistry;
import com.automq.stream.s3.metrics.operations.S3MetricsType;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ByteBufMetricsStats {
    private static final Map<String, Histogram> SOURCE_TO_HISTOGRAM = new ConcurrentHashMap<>();

    public static Histogram getHistogram(String source) {
        return SOURCE_TO_HISTOGRAM.computeIfAbsent(source, k -> {
            Map<String, String> tags = Map.of("source", k);
            return S3StreamMetricsRegistry.getMetricsGroup().newHistogram(S3MetricsType.S3Storage.getName(),
                S3MetricsType.S3Storage.getName() + "size", tags);
        });
    }
}
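Each distinct source name gets one lazily created Histogram tagged with source=<name>, registered under the S3Storage metrics type. The allocator earlier in this diff feeds it like this, shown here with the two source names introduced by this commit and illustrative capacities:

    // What DirectByteBufAlloc.byteBuffer(initCapacity, name) effectively does
    // for every named allocation: record the requested capacity per source.
    ByteBufMetricsStats.getHistogram("write_index_block").update(40_004);   // illustrative value
    ByteBufMetricsStats.getHistogram("merge_read").update(1024 * 1024);     // illustrative value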
@@ -236,7 +236,7 @@ void mergedRangeRead0(String path, long start, long end, CompletableFuture<ByteB
             .thenAccept(responsePublisher -> {
                 OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.GET_OBJECT).operationCount.inc();
                 OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.GET_OBJECT).operationTime.update(timerUtil.elapsed());
-                ByteBuf buf = DirectByteBufAlloc.byteBuffer((int) (end - start + 1));
+                ByteBuf buf = DirectByteBufAlloc.byteBuffer((int) (end - start + 1), "merge_read");
                 responsePublisher.subscribe(buf::writeBytes).thenAccept(v -> cf.complete(buf));
             })
             .exceptionally(ex -> {
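The buffer for a merged range read is sized for the inclusive byte range [start, end], hence end - start + 1, and the allocation is now attributed to the "merge_read" source. A small worked example with hypothetical bounds:

    // Inclusive range: reading bytes 0..4095 requires 4096 bytes.
    long start = 0L, end = 4095L;
    int capacity = (int) (end - start + 1);                              // = 4096
    ByteBuf buf = DirectByteBufAlloc.byteBuffer(capacity, "merge_read"); // tagged allocation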
