feat(s3stream): add compaction throttle to flatten load peak (#595)
Signed-off-by: Shichao Nie <[email protected]>
SCNieh authored Nov 7, 2023
1 parent d7eb6b7 commit 960631c
Showing 4 changed files with 58 additions and 17 deletions.
2 changes: 1 addition & 1 deletion s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
@@ -192,7 +192,7 @@ LogCache.LogCacheBlock recoverContinuousRecords(Iterator<WriteAheadLog.RecoverRe
long startOffset = records.get(0).getBaseOffset();
long expectedStartOffset = openingStreamEndOffsets.getOrDefault(streamId, startOffset);
if (startOffset > expectedStartOffset) {
throw new IllegalStateException(String.format("[BUG] WAL data may lost, streamId %s endOffset=%s from controller" +
throw new IllegalStateException(String.format("[BUG] WAL data may lost, streamId %s endOffset=%s from controller, " +
"but WAL recovered records startOffset=%s", streamId, expectedStartOffset, startOffset));
}
}
CompactionManager.java
@@ -34,7 +34,10 @@
import com.automq.stream.s3.operator.S3Operator;
import com.automq.stream.s3.streams.StreamManager;
import com.automq.stream.utils.LogContext;
import io.github.bucket4j.Bucket;
import io.netty.util.concurrent.DefaultThreadFactory;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -75,6 +78,7 @@ public class CompactionManager {
private final long networkBandwidth;
private final boolean s3ObjectLogEnable;
private final long compactionCacheSize;
private Bucket compactionBucket = null;

public CompactionManager(Config config, ObjectManager objectManager, StreamManager streamManager, S3Operator s3Operator) {
String logPrefix = String.format("[CompactionManager id=%d] ", config.brokerId());
@@ -105,21 +109,28 @@ public CompactionManager(Config config, ObjectManager objectManager, StreamManag
}

public void start() {
this.scheduledExecutorService.scheduleWithFixedDelay(() -> {
scheduleNextCompaction((long) this.compactionInterval * 60 * 1000);
}

private void scheduleNextCompaction(long delayMillis) {
logger.info("Next Compaction started in {} ms", delayMillis);
this.scheduledExecutorService.schedule(() -> {
TimerUtil timerUtil = new TimerUtil(TimeUnit.MILLISECONDS);
try {
logger.info("Compaction started");
long start = System.currentTimeMillis();
this.compact()
.thenAccept(result -> logger.info("Compaction complete, total cost {} ms", System.currentTimeMillis() - start))
.thenAccept(result -> logger.info("Compaction complete, total cost {} ms", timerUtil.elapsed()))
.exceptionally(ex -> {
logger.error("Compaction failed, cost {} ms, ", System.currentTimeMillis() - start, ex);
logger.error("Compaction failed, cost {} ms, ", timerUtil.elapsed(), ex);
return null;
})
.join();
} catch (Exception ex) {
logger.error("Error while compacting objects ", ex);
}
}, 1, this.compactionInterval, TimeUnit.MINUTES);
long nextDelay = Math.max(0, (long) this.compactionInterval * 60 * 1000 - timerUtil.elapsed());
scheduleNextCompaction(nextDelay);
}, delayMillis, TimeUnit.MILLISECONDS);
}

public void shutdown() {
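
The scheduling change above replaces the old scheduleWithFixedDelay loop (initial delay 1 min, period compactionInterval minutes) with a self-rescheduling task: each run measures its own duration with TimerUtil and charges it against the interval when scheduling the next run, so compactions start at a roughly fixed cadence instead of drifting by however long each run takes. A minimal standalone sketch of the same pattern; the class and method names here are illustrative, not part of the commit:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class SelfReschedulingLoop {
        private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        private final long intervalMillis;

        public SelfReschedulingLoop(long intervalMillis) {
            this.intervalMillis = intervalMillis;
        }

        public void start() {
            scheduleNext(intervalMillis);
        }

        private void scheduleNext(long delayMillis) {
            executor.schedule(() -> {
                long start = System.currentTimeMillis();
                try {
                    runOnce(); // one compaction round; swallow errors so the loop survives
                } catch (Exception ignored) {
                }
                // Charge the run's own duration against the interval, but never go negative.
                scheduleNext(Math.max(0, intervalMillis - (System.currentTimeMillis() - start)));
            }, delayMillis, TimeUnit.MILLISECONDS);
        }

        private void runOnce() { /* placeholder for the actual work */ }
    }
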
@@ -139,12 +150,21 @@ private CompletableFuture<Void> compact() {
private void compact(List<StreamMetadata> streamMetadataList, List<S3ObjectMetadata> objectMetadataList) {
logger.info("Get {} SST objects from metadata", objectMetadataList.size());
if (objectMetadataList.isEmpty()) {
logger.info("No SST objects to compact");
return;
}
Map<Boolean, List<S3ObjectMetadata>> objectMetadataFilterMap = convertS3Objects(objectMetadataList);
List<S3ObjectMetadata> objectsToForceSplit = objectMetadataFilterMap.get(true);
List<S3ObjectMetadata> objectsToCompact = objectMetadataFilterMap.get(false);

long totalSize = objectsToForceSplit.stream().mapToLong(S3ObjectMetadata::objectSize).sum();
totalSize += objectsToCompact.stream().mapToLong(S3ObjectMetadata::objectSize).sum();
long expectReadBytesPerSec = totalSize / compactionInterval / 60;
compactionBucket = Bucket.builder().addLimit(limit -> limit
.capacity(expectReadBytesPerSec)
.refillIntervally(expectReadBytesPerSec, Duration.ofSeconds(1))).build();
logger.info("Throttle compaction read to {} bytes/s, expect to complete in no less than {}min",
expectReadBytesPerSec, compactionInterval);

if (!objectsToForceSplit.isEmpty()) {
// split SST objects to seperated stream objects
forceSplitObjects(streamMetadataList, objectsToForceSplit);
@@ -184,6 +204,9 @@ private void forceSplitObjects(List<StreamMetadata> streamMetadataList, List<S3O
}

private void compactObjects(List<StreamMetadata> streamMetadataList, List<S3ObjectMetadata> objectsToCompact) {
if (objectsToCompact.isEmpty()) {
return;
}
// sort by S3 object data time in descending order
objectsToCompact.sort((o1, o2) -> Long.compare(o2.dataTimeInMs(), o1.dataTimeInMs()));
if (maxObjectNumToCompact < objectsToCompact.size()) {
@@ -318,7 +341,7 @@ private Collection<CompletableFuture<StreamObject>> splitSSTObject(List<StreamMe
objectManager.prepareObject(batchGroup.size(), TimeUnit.MINUTES.toMillis(CompactionConstants.S3_OBJECT_TTL_MINUTES))
.thenComposeAsync(objectId -> {
List<StreamDataBlock> blocksToRead = batchGroup.stream().flatMap(p -> p.getLeft().stream()).toList();
DataBlockReader reader = new DataBlockReader(objectMetadata, s3Operator);
DataBlockReader reader = new DataBlockReader(objectMetadata, s3Operator, compactionBucket);
// batch read
reader.readBlocks(blocksToRead, Math.min(CompactionConstants.S3_OBJECT_MAX_READ_BATCH, networkBandwidth));

@@ -505,7 +528,7 @@ void executeCompactionPlans(CommitSSTObjectRequest request, List<CompactionPlan>
for (Map.Entry<Long, List<StreamDataBlock>> streamDataBlocEntry : compactionPlan.streamDataBlocksMap().entrySet()) {
S3ObjectMetadata metadata = s3ObjectMetadataMap.get(streamDataBlocEntry.getKey());
List<StreamDataBlock> streamDataBlocks = streamDataBlocEntry.getValue();
DataBlockReader reader = new DataBlockReader(metadata, s3Operator);
DataBlockReader reader = new DataBlockReader(metadata, s3Operator, compactionBucket);
reader.readBlocks(streamDataBlocks, Math.min(CompactionConstants.S3_OBJECT_MAX_READ_BATCH, networkBandwidth));
}
List<CompletableFuture<StreamObject>> streamObjectCFList = new ArrayList<>();
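
The compact() changes above are the heart of the throttle: before any reads start, the total size of every candidate object (both the force-split group and the compaction group) is summed and divided by the compaction interval in seconds, and that figure becomes the capacity and per-second refill of a Bucket4j bucket that every compaction range read must draw from, spreading the read load across the whole interval instead of letting it peak at the start (hence "expect to complete in no less than {interval} min"). A self-contained sketch of the sizing arithmetic and bucket construction, mirroring the Bucket4j API used in the diff; the concrete numbers are made up for illustration:

    import io.github.bucket4j.Bucket;

    import java.time.Duration;

    public class CompactionThrottleSizing {
        public static void main(String[] args) {
            long totalSize = 12L * 1024 * 1024 * 1024; // assume 12 GiB of SST data selected for this round
            long compactionIntervalMinutes = 20;       // assume compaction runs every 20 minutes

            // Spread the reads over the whole interval: 12 GiB / 1200 s ≈ 10.24 MiB/s.
            long expectReadBytesPerSec = totalSize / compactionIntervalMinutes / 60;

            // One token per byte; refill the full one-second budget once per second.
            Bucket compactionBucket = Bucket.builder().addLimit(limit -> limit
                    .capacity(expectReadBytesPerSec)
                    .refillIntervally(expectReadBytesPerSec, Duration.ofSeconds(1))).build();

            System.out.printf("Throttle compaction read to %d bytes/s, expect to complete in no less than %d min%n",
                    expectReadBytesPerSec, compactionIntervalMinutes);
        }
    }
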
DataBlockReader.java
@@ -21,6 +21,9 @@
import com.automq.stream.s3.compact.objects.StreamDataBlock;
import com.automq.stream.s3.network.ThrottleStrategy;
import com.automq.stream.s3.operator.S3Operator;
import com.automq.stream.utils.ThreadUtils;
import com.automq.stream.utils.Threads;
import io.github.bucket4j.Bucket;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import com.automq.stream.s3.metadata.S3ObjectMetadata;
@@ -32,6 +35,7 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;

//TODO: refactor to reduce duplicate code with ObjectWriter
public class DataBlockReader {
@@ -40,11 +44,19 @@ public class DataBlockReader {
private final String objectKey;
private final S3Operator s3Operator;
private final CompletableFuture<List<StreamDataBlock>> indexBlockCf = new CompletableFuture<>();
private final Bucket throttleBucket;
private final ScheduledExecutorService bucketCbExecutor = Threads.newSingleThreadScheduledExecutor(
ThreadUtils.createThreadFactory("s3-data-block-reader-bucket-cb-%d", false), LOGGER);

public DataBlockReader(S3ObjectMetadata metadata, S3Operator s3Operator) {
this(metadata, s3Operator, null);
}

public DataBlockReader(S3ObjectMetadata metadata, S3Operator s3Operator, Bucket throttleBucket) {
this.metadata = metadata;
this.objectKey = metadata.key();
this.s3Operator = s3Operator;
this.throttleBucket = throttleBucket;
}

public CompletableFuture<List<StreamDataBlock>> getDataBlockIndex() {
@@ -123,9 +135,7 @@ public void readContinuousBlocks(List<StreamDataBlock> streamDataBlocks, long ma
long readSize = Math.min(remainBytes, maxReadBatchSize);
endPosition = startPosition + readSize;
final int finalCnt = cnt;
cfList.add(s3Operator.rangeRead(objectKey, startPosition, endPosition, ThrottleStrategy.THROTTLE)
.thenAccept(buf -> bufferMap.put(finalCnt, buf))
);
cfList.add(rangeRead(startPosition, endPosition).thenAccept(buf -> bufferMap.put(finalCnt, buf)));
remainBytes -= readSize;
startPosition += readSize;
cnt++;
@@ -162,9 +172,8 @@ public void readContinuousBlocks(List<StreamDataBlock> streamDataBlocks, long ma
}

private void readContinuousBlocks0(List<StreamDataBlock> streamDataBlocks) {
s3Operator.rangeRead(objectKey,
streamDataBlocks.get(0).getBlockStartPosition(),
streamDataBlocks.get(streamDataBlocks.size() - 1).getBlockEndPosition(), ThrottleStrategy.THROTTLE)
rangeRead(streamDataBlocks.get(0).getBlockStartPosition(),
streamDataBlocks.get(streamDataBlocks.size() - 1).getBlockEndPosition())
.thenAccept(buf -> parseDataBlocks(buf, streamDataBlocks))
.exceptionally(ex -> {
LOGGER.error("read data from object {} failed", metadata.objectId(), ex);
Expand All @@ -173,6 +182,15 @@ private void readContinuousBlocks0(List<StreamDataBlock> streamDataBlocks) {
});
}

private CompletableFuture<ByteBuf> rangeRead(long start, long end) {
if (throttleBucket == null) {
return s3Operator.rangeRead(objectKey, start, end);
} else {
return throttleBucket.asScheduler().consume(end - start + 1, bucketCbExecutor)
.thenCompose(v -> s3Operator.rangeRead(objectKey, start, end, ThrottleStrategy.THROTTLE));
}
}

private void parseDataBlocks(ByteBuf buf, List<StreamDataBlock> streamDataBlocks) {
for (StreamDataBlock streamDataBlock : streamDataBlocks) {
int blockSize = streamDataBlock.getBlockSize();
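
The new private rangeRead helper in DataBlockReader is where the bucket is actually consumed. With the two-argument constructor (no bucket) it falls straight through to s3Operator.rangeRead as before; when compaction passes a bucket in, Bucket4j's asScheduler().consume(end - start + 1, bucketCbExecutor) returns a CompletableFuture that completes only once that many tokens have been granted, and the S3 read (with THROTTLE priority) is chained after it, so large read bursts turn into waiting rather than network spikes. A rough standalone sketch of that pattern against a stand-in storage call; everything except the Bucket4j calls is hypothetical:

    import io.github.bucket4j.Bucket;

    import java.time.Duration;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;

    public class ThrottledRangeReadSketch {
        private final Bucket bucket;
        private final ScheduledExecutorService bucketCbExecutor = Executors.newSingleThreadScheduledExecutor();

        public ThrottledRangeReadSketch(long bytesPerSecond) {
            this.bucket = Bucket.builder().addLimit(limit -> limit
                    .capacity(bytesPerSecond)
                    .refillIntervally(bytesPerSecond, Duration.ofSeconds(1))).build();
        }

        public CompletableFuture<byte[]> rangeRead(long start, long end) {
            long bytes = end - start + 1;
            // Asynchronously wait until `bytes` tokens are available, then perform the read.
            return bucket.asScheduler().consume(bytes, bucketCbExecutor)
                    .thenCompose(v -> readFromStorage(start, end));
        }

        private CompletableFuture<byte[]> readFromStorage(long start, long end) {
            // Stand-in for s3Operator.rangeRead(objectKey, start, end, ThrottleStrategy.THROTTLE).
            return CompletableFuture.completedFuture(new byte[(int) (end - start + 1)]);
        }
    }
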
@@ -522,13 +522,13 @@ private static boolean isUnrecoverable(Throwable ex) {
private void checkConfig() {
if (this.networkInboundBandwidthLimiter != null) {
if (this.networkInboundBandwidthLimiter.getMaxTokens() < Writer.MIN_PART_SIZE) {
throw new IllegalArgumentException(String.format("Network inbound bandwidth limit %d must be no less than min part size %d",
throw new IllegalArgumentException(String.format("Network inbound burst bandwidth limit %d must be no less than min part size %d",
this.networkInboundBandwidthLimiter.getMaxTokens(), Writer.MIN_PART_SIZE));
}
}
if (this.networkOutboundBandwidthLimiter != null) {
if (this.networkOutboundBandwidthLimiter.getMaxTokens() < Writer.MIN_PART_SIZE) {
throw new IllegalArgumentException(String.format("Network outbound bandwidth limit %d must be no less than min part size %d",
throw new IllegalArgumentException(String.format("Network outbound burst bandwidth limit %d must be no less than min part size %d",
this.networkOutboundBandwidthLimiter.getMaxTokens(), Writer.MIN_PART_SIZE));
}
}
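
The last file only rewords the checkConfig error messages: getMaxTokens() is the bandwidth limiter's burst capacity rather than its sustained rate, and it must be at least Writer.MIN_PART_SIZE, presumably so that a single part-sized acquisition can ever be satisfied; a smaller burst capacity could never grant that many tokens no matter how long the caller waits. The limiter in the diff is AutoMQ's own bandwidth limiter, not Bucket4j, but the same token-bucket property can be illustrated with a small Bucket4j snippet (the 5 MiB minimum part size is assumed for illustration):

    import io.github.bucket4j.Bucket;

    import java.time.Duration;

    public class BurstCapacityCheck {
        public static void main(String[] args) {
            long minPartSize = 5L * 1024 * 1024; // assume a 5 MiB minimum part size
            Bucket tooSmall = Bucket.builder().addLimit(limit -> limit
                    .capacity(minPartSize - 1)
                    .refillIntervally(minPartSize - 1, Duration.ofSeconds(1))).build();
            // A single acquisition larger than the bucket's capacity can never succeed.
            System.out.println(tooSmall.tryConsume(minPartSize)); // always false
        }
    }
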
