diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionConstants.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionConstants.java new file mode 100644 index 000000000..83d76ff8c --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionConstants.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.stream.s3.compact; + +public class CompactionConstants { + public static final int S3_OBJECT_TTL_MINUTES = 24 * 60; + public static final int S3_OBJECT_MAX_READ_BATCH = 16 * 1024 * 1024; // 16MB +} diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java index d478843ce..2c56b8737 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java @@ -26,19 +26,18 @@ import com.automq.stream.s3.metadata.S3ObjectMetadata; import com.automq.stream.s3.metadata.StreamMetadata; import com.automq.stream.s3.metadata.StreamOffsetRange; +import com.automq.stream.s3.metrics.TimerUtil; import com.automq.stream.s3.objects.CommitWALObjectRequest; import com.automq.stream.s3.objects.ObjectManager; import com.automq.stream.s3.objects.ObjectStreamRange; import com.automq.stream.s3.objects.StreamObject; import com.automq.stream.s3.operator.S3Operator; -import com.automq.stream.s3.operator.Writer; import com.automq.stream.s3.streams.StreamManager; import com.automq.stream.utils.LogContext; import io.netty.util.concurrent.DefaultThreadFactory; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -49,7 +48,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -76,8 +74,7 @@ public class CompactionManager { private final int maxStreamObjectNumPerCommit; private final long networkInboundBandwidth; private final boolean s3ObjectLogEnable; - - private final Semaphore forceSplitLimit = new Semaphore(500 * 1024 * 1024); + private final long compactionCacheSize; public CompactionManager(Config config, ObjectManager objectManager, StreamManager streamManager, S3Operator s3Operator) { String logPrefix = String.format("[CompactionManager id=%d] ", config.brokerId()); @@ -93,7 +90,7 @@ public CompactionManager(Config config, ObjectManager objectManager, StreamManag this.s3ObjectLogEnable = config.s3ObjectLogEnable(); 
        this.networkInboundBandwidth = config.networkInboundBaselineBandwidth();
        this.uploader = new CompactionUploader(objectManager, s3Operator, config);
-       long compactionCacheSize = config.s3ObjectCompactionCacheSize();
+       this.compactionCacheSize = config.s3ObjectCompactionCacheSize();
        long streamSplitSize = config.s3ObjectCompactionStreamSplitSize();
        maxStreamNumPerWAL = config.s3ObjectMaxStreamNumPerWAL();
        maxStreamObjectNumPerCommit = config.s3ObjectMaxStreamObjectNumPerCommit();
@@ -113,8 +110,7 @@ public void start() {
            logger.info("Compaction started");
            long start = System.currentTimeMillis();
            this.compact()
-                   .thenAccept(result -> logger.info("Compaction complete, total cost {} ms, result {}",
-                           System.currentTimeMillis() - start, result))
+                   .thenAccept(result -> logger.info("Compaction complete, total cost {} ms", System.currentTimeMillis() - start))
                    .exceptionally(ex -> {
                        logger.error("Compaction failed, cost {} ms, ", System.currentTimeMillis() - start, ex);
                        return null;
@@ -131,58 +127,94 @@ public void shutdown() {
        this.uploader.stop();
    }

-   private CompletableFuture<CompactResult> compact() {
+   private CompletableFuture<Void> compact() {
        return this.objectManager.getServerObjects().thenComposeAsync(objectMetadataList -> {
            List<Long> streamIds = objectMetadataList.stream().flatMap(e -> e.getOffsetRanges().stream())
                    .map(StreamOffsetRange::getStreamId).distinct().toList();
-           return this.streamManager.getStreams(streamIds).thenApplyAsync(streamMetadataList -> {
-               List<S3ObjectMetadata> s3ObjectMetadataList = new ArrayList<>(objectMetadataList);
-               if (s3ObjectMetadataList.isEmpty()) {
-                   logger.info("No WAL objects to compact");
-                   return CompactResult.SKIPPED;
-               }
-               // sort by S3 object data time in descending order
-               s3ObjectMetadataList.sort((o1, o2) -> Long.compare(o2.dataTimeInMs(), o1.dataTimeInMs()));
-               if (maxObjectNumToCompact < s3ObjectMetadataList.size()) {
-                   // compact latest S3 objects first when number of objects to compact exceeds maxObjectNumToCompact
-                   s3ObjectMetadataList = s3ObjectMetadataList.subList(0, maxObjectNumToCompact);
-               }
-               return this.compact(streamMetadataList, s3ObjectMetadataList);
-           }, compactThreadPool);
+           return this.streamManager.getStreams(streamIds).thenAcceptAsync(streamMetadataList ->
+                   this.compact(streamMetadataList, objectMetadataList), compactThreadPool);
        }, compactThreadPool);
    }

-   private CompactResult compact(List<StreamMetadata> streamMetadataList, List<S3ObjectMetadata> objectMetadataList) {
+   private void compact(List<StreamMetadata> streamMetadataList, List<S3ObjectMetadata> objectMetadataList) {
        logger.info("Get {} WAL objects from metadata", objectMetadataList.size());
-       long start = System.currentTimeMillis();
-       while (true) {
-           Set<Long> excludedObjectIds = new HashSet<>();
-           CommitWALObjectRequest request = buildCompactRequest(streamMetadataList, objectMetadataList, excludedObjectIds);
+       if (objectMetadataList.isEmpty()) {
+           logger.info("No WAL objects to compact");
+           return;
+       }
+       Map<Boolean, List<S3ObjectMetadata>> objectMetadataFilterMap = convertS3Objects(objectMetadataList);
+       List<S3ObjectMetadata> objectsToForceSplit = objectMetadataFilterMap.get(true);
+       List<S3ObjectMetadata> objectsToCompact = objectMetadataFilterMap.get(false);
+       if (!objectsToForceSplit.isEmpty()) {
+           // split WAL objects into separate stream objects
+           forceSplitObjects(streamMetadataList, objectsToForceSplit);
+       }
+       // compact WAL objects
+       compactObjects(streamMetadataList, objectsToCompact);
+   }
+
+   private void forceSplitObjects(List<StreamMetadata> streamMetadataList, List<S3ObjectMetadata> objectsToForceSplit) {
+       logger.info("Force split {} WAL objects", objectsToForceSplit.size());
+       TimerUtil timerUtil = new TimerUtil();
+       for (int i = 0; i < objectsToForceSplit.size(); i++) {
+           timerUtil.reset();
+           S3ObjectMetadata objectToForceSplit = objectsToForceSplit.get(i);
+           logger.info("Force split progress {}/{}, splitting object {}, object size {}", i + 1, objectsToForceSplit.size(),
+                   objectToForceSplit.objectId(), objectToForceSplit.objectSize());
+           CommitWALObjectRequest request = buildSplitRequest(streamMetadataList, objectToForceSplit);
            if (request == null) {
-               return CompactResult.FAILED;
-           }
-           if (request.getCompactedObjectIds().isEmpty()) {
-               logger.info("No need to compact");
-               return CompactResult.SKIPPED;
-           }
-           logger.info("Build compact request complete, {} objects compacted, WAL object id: {}, size: {}, stream object num: {}, time cost: {} ms, start committing objects"
-                   , request.getCompactedObjectIds().size(), request.getObjectId(), request.getObjectSize(), request.getStreamObjects().size(), System.currentTimeMillis() - start);
-           objectManager.commitWALObject(request).thenApply(resp -> {
-               logger.info("Commit compact request succeed, time cost: {} ms", System.currentTimeMillis() - start);
-               if (s3ObjectLogEnable) {
-                   s3ObjectLogger.trace("[Compact] {}", request);
-               }
-               return CompactResult.SUCCESS;
-           }).join();
-           if (request.getObjectId() == -1 && !excludedObjectIds.isEmpty()) {
-               // force split objects not complete because of stream object limit, retry force split on excluded objects
-               logger.info("Force split not complete, retry on excluded objects {}", excludedObjectIds);
-               objectMetadataList = objectMetadataList.stream().filter(e -> excludedObjectIds.contains(e.objectId())).collect(Collectors.toList());
                continue;
            }
-           break;
+           logger.info("Build force split request for object {} complete, generated {} stream objects, time cost: {} ms, start committing objects",
+                   objectToForceSplit.objectId(), request.getStreamObjects().size(), timerUtil.elapsed());
+           timerUtil.reset();
+           objectManager.commitWALObject(request)
+                   .thenAccept(resp -> {
+                       logger.info("Commit force split request succeed, time cost: {} ms", timerUtil.elapsed());
+                       if (s3ObjectLogEnable) {
+                           s3ObjectLogger.trace("[Compact] {}", request);
+                       }
+                   })
+                   .exceptionally(ex -> {
+                       logger.error("Commit force split request failed, ex: ", ex);
+                       return null;
+                   })
+                   .join();
+       }
+   }
+
+   private void compactObjects(List<StreamMetadata> streamMetadataList, List<S3ObjectMetadata> objectsToCompact) {
+       // sort by S3 object data time in descending order
+       objectsToCompact.sort((o1, o2) -> Long.compare(o2.dataTimeInMs(), o1.dataTimeInMs()));
+       if (maxObjectNumToCompact < objectsToCompact.size()) {
+           // compact latest S3 objects first when number of objects to compact exceeds maxObjectNumToCompact
+           objectsToCompact = objectsToCompact.subList(0, maxObjectNumToCompact);
+       }
+       logger.info("Compact {} WAL objects", objectsToCompact.size());
+       TimerUtil timerUtil = new TimerUtil();
+       CommitWALObjectRequest request = buildCompactRequest(streamMetadataList, objectsToCompact);
+       if (request == null) {
+           return;
        }
-       return CompactResult.SUCCESS;
+       if (request.getCompactedObjectIds().isEmpty()) {
+           logger.info("No WAL objects to compact");
+           return;
+       }
+       logger.info("Build compact request for {} WAL objects complete, WAL object id: {}, WAL object size: {}, stream object num: {}, time cost: {} ms, start committing objects",
+               request.getCompactedObjectIds().size(), request.getObjectId(), request.getObjectSize(), request.getStreamObjects().size(), timerUtil.elapsed());
+       timerUtil.reset();
+       objectManager.commitWALObject(request)
+               .thenAccept(resp -> {
+                   logger.info("Commit compact request succeed, time cost: {} ms",
timerUtil.elapsed()); + if (s3ObjectLogEnable) { + s3ObjectLogger.trace("[Compact] {}", request); + } + }) + .exceptionally(ex -> { + logger.error("Commit compact request failed, ex: ", ex); + return null; + }) + .join(); } private void logCompactionPlans(List compactionPlans, Set excludedObjectIds) { @@ -216,21 +248,8 @@ public CompletableFuture forceSplitAll() { logger.info("No WAL objects to force split"); return; } - Set excludedObjects = new HashSet<>(); - CompletableFuture forceSplitCf = forceSplitAndCommit(streamMetadataList, objectMetadataList, excludedObjects); - while (!excludedObjects.isEmpty()) { - // try split excluded objects - List excludedObjectMetaList = objectMetadataList.stream() - .filter(e -> excludedObjects.contains(e.objectId())).collect(Collectors.toList()); - forceSplitCf = forceSplitCf.thenCompose(vv -> forceSplitAndCommit(streamMetadataList, excludedObjectMetaList, excludedObjects)); - } - forceSplitCf.whenComplete((vv, ex) -> { - if (ex != null) { - cf.completeExceptionally(ex); - } else { - cf.complete(null); - } - }); + forceSplitObjects(streamMetadataList, objectMetadataList); + cf.complete(null); }, forceSplitThreadPool); }, forceSplitThreadPool).exceptionally(ex -> { logger.error("Error while force split all WAL objects ", ex); @@ -241,186 +260,149 @@ public CompletableFuture forceSplitAll() { return cf; } - private CompletableFuture forceSplitAndCommit(List streams, List objects, Set excludedObjects) { - CompletableFuture cf = new CompletableFuture<>(); - Collection> cfList = splitWALObjects(streams, objects, excludedObjects); - List streamObjects = cfList.stream().map(e -> { - try { - return e.join(); - } catch (Exception ex) { - logger.error("Error while force split object ", ex); - } - return null; - }).toList(); - if (streamObjects.stream().anyMatch(Objects::isNull)) { - logger.error("Force split WAL objects failed"); - cf.completeExceptionally(new RuntimeException("Force split WAL objects failed")); - return cf; - } - CommitWALObjectRequest request = new CommitWALObjectRequest(); - streamObjects.forEach(request::addStreamObject); - request.setCompactedObjectIds(objects.stream().map(S3ObjectMetadata::objectId).collect(Collectors.toList())); - objectManager.commitWALObject(request).thenAccept(resp -> { - logger.info("Force split {} WAL objects succeed, produce {} stream objects", objects.size(), streamObjects.size()); - if (s3ObjectLogEnable) { - s3ObjectLogger.trace("[ForceSplit] {}", request); - } - cf.complete(null); - }).exceptionally(ex -> { - logger.error("Force split all WAL objects failed", ex); - cf.completeExceptionally(ex); - return null; - }); - return cf; - } - /** - * Split specified WAL objects into stream objects. + * Split specified WAL object into stream objects. 
* * @param streamMetadataList metadata of opened streams - * @param objectMetadataList WAL objects to split - * @param excludedObjects objects that are excluded from split + * @param objectMetadata WAL object to split * @return List of CompletableFuture of StreamObject */ - Collection> splitWALObjects(List streamMetadataList, - List objectMetadataList, Set excludedObjects) { - if (objectMetadataList.isEmpty()) { + private Collection> splitWALObject(List streamMetadataList, S3ObjectMetadata objectMetadata) { + if (objectMetadata == null) { return new ArrayList<>(); } - //TODO: temp solution, optimize later - // take first object - Set objectIds = objectMetadataList.stream().map(S3ObjectMetadata::objectId).collect(Collectors.toSet()); - objectMetadataList = Collections.singletonList(objectMetadataList.get(0)); + Map> streamDataBlocksMap = CompactionUtils.blockWaitObjectIndices(streamMetadataList, + Collections.singletonList(objectMetadata), s3Operator); + if (streamDataBlocksMap.isEmpty()) { + // object not exist, metadata is out of date + logger.warn("Object {} not exist, metadata is out of date", objectMetadata.objectId()); + return new ArrayList<>(); + } + List streamDataBlocks = streamDataBlocksMap.get(objectMetadata.objectId()); + if (streamDataBlocks.isEmpty()) { + // object is empty, metadata is out of date + logger.warn("Object {} is empty, metadata is out of date", objectMetadata.objectId()); + return new ArrayList<>(); + } - Map> streamDataBlocksMap = CompactionUtils.blockWaitObjectIndices(streamMetadataList, objectMetadataList, s3Operator); List, CompletableFuture>> groupedDataBlocks = new ArrayList<>(); - int totalStreamObjectNum = 0; - // set of object ids to be included in split - Set includedObjects = new HashSet<>(); - // list of , each stream object is a list of adjacent stream data blocks - List>>> sortedObjectGroupedStreamDataBlockList = new ArrayList<>(); - for (Map.Entry> entry : streamDataBlocksMap.entrySet()) { - // group continuous stream data blocks, each group will be written to a stream object - List> groupedStreamDataBlocks = CompactionUtils.groupStreamDataBlocks(entry.getValue()); - sortedObjectGroupedStreamDataBlockList.add(new ImmutablePair<>(entry.getKey(), groupedStreamDataBlocks)); + List> groupedStreamDataBlocks = CompactionUtils.groupStreamDataBlocks(streamDataBlocks); + for (List group : groupedStreamDataBlocks) { + groupedDataBlocks.add(new ImmutablePair<>(group, new CompletableFuture<>())); } - // sort WAL object by number of stream objects to be generated, object with less stream objects will be split first - sortedObjectGroupedStreamDataBlockList.sort(Comparator.comparingInt(e -> e.getRight().size())); - for (Pair>> pair : sortedObjectGroupedStreamDataBlockList) { - long objectId = pair.getLeft(); - List> groupedStreamDataBlocks = pair.getRight(); - if (totalStreamObjectNum + groupedStreamDataBlocks.size() > maxStreamObjectNumPerCommit) { - // exceed max stream object number, stop split - break; + logger.info("Force split object {}, expect to generate {} stream objects", objectMetadata.objectId(), groupedDataBlocks.size()); + + int index = 0; + while (index < groupedDataBlocks.size()) { + List, CompletableFuture>> batchGroup = new ArrayList<>(); + long readSize = 0; + while (index < groupedDataBlocks.size()) { + Pair, CompletableFuture> group = groupedDataBlocks.get(index); + List groupedStreamDataBlock = group.getLeft(); + long size = groupedStreamDataBlock.get(groupedStreamDataBlock.size() - 1).getBlockEndPosition() - + 
groupedStreamDataBlock.get(0).getBlockStartPosition(); + if (readSize + size > compactionCacheSize) { + break; + } + readSize += size; + batchGroup.add(group); + index++; } - for (List streamDataBlocks : groupedStreamDataBlocks) { - groupedDataBlocks.add(new ImmutablePair<>(streamDataBlocks, new CompletableFuture<>())); + if (batchGroup.isEmpty()) { + logger.error("Force split object failed, not be able to read any data block, maybe compactionCacheSize is too small"); + return new ArrayList<>(); } - includedObjects.add(objectId); - totalStreamObjectNum += groupedStreamDataBlocks.size(); - } - // add objects that are excluded from split - excludedObjects.addAll(objectIds.stream().filter(e -> !includedObjects.contains(e)).collect(Collectors.toSet())); - logger.info("Force split {} WAL objects, expect to generate {} stream objects, max stream objects {}, objects excluded: {}", - objectMetadataList.size(), groupedDataBlocks.size(), maxStreamObjectNumPerCommit, excludedObjects); - if (groupedDataBlocks.isEmpty()) { - return new ArrayList<>(); - } - // prepare N stream objects at one time - objectManager.prepareObject(groupedDataBlocks.size(), TimeUnit.MINUTES.toMillis(60)) - .thenAcceptAsync(objectId -> { - for (Pair, CompletableFuture> pair : groupedDataBlocks) { - List streamDataBlocks = pair.getKey(); - DataBlockWriter writer = new DataBlockWriter(objectId, s3Operator, kafkaConfig.s3ObjectPartSize()); - - StreamDataBlock start = streamDataBlocks.get(0); - StreamDataBlock end = streamDataBlocks.get(streamDataBlocks.size() - 1); - final int dataSize = (int) (end.getBlockEndPosition() + end.getBlockSize() - start.getBlockStartPosition()); - if (dataSize < Writer.MIN_PART_SIZE) { - try { - forceSplitLimit.acquire(dataSize); - } catch (InterruptedException ignored) { - + // prepare N stream objects at one time + objectManager.prepareObject(batchGroup.size(), TimeUnit.MINUTES.toMillis(CompactionConstants.S3_OBJECT_TTL_MINUTES)) + .thenComposeAsync(objectId -> { + List blocksToRead = batchGroup.stream().flatMap(p -> p.getLeft().stream()).toList(); + DataBlockReader reader = new DataBlockReader(objectMetadata, s3Operator); + // batch read + reader.readBlocks(blocksToRead, Math.min(CompactionConstants.S3_OBJECT_MAX_READ_BATCH, networkInboundBandwidth)); + + List> cfs = new ArrayList<>(); + for (Pair, CompletableFuture> pair : batchGroup) { + List blocks = pair.getLeft(); + DataBlockWriter writer = new DataBlockWriter(objectId, s3Operator, kafkaConfig.s3ObjectPartSize()); + for (StreamDataBlock block : blocks) { + writer.write(block); } + long finalObjectId = objectId; + cfs.add(writer.close().thenAccept(v -> { + StreamObject streamObject = new StreamObject(); + streamObject.setObjectId(finalObjectId); + streamObject.setStreamId(blocks.get(0).getStreamId()); + streamObject.setStartOffset(blocks.get(0).getStartOffset()); + streamObject.setEndOffset(blocks.get(blocks.size() - 1).getEndOffset()); + streamObject.setObjectSize(writer.size()); + pair.getValue().complete(streamObject); + })); + objectId++; } - - writer.copyWrite(streamDataBlocks); - final long objectIdFinal = objectId; - writer.close().thenAccept(v -> { - StreamObject streamObject = new StreamObject(); - streamObject.setObjectId(objectIdFinal); - streamObject.setStreamId(streamDataBlocks.get(0).getStreamId()); - streamObject.setStartOffset(streamDataBlocks.get(0).getStartOffset()); - streamObject.setEndOffset(streamDataBlocks.get(streamDataBlocks.size() - 1).getEndOffset()); - streamObject.setObjectSize(writer.size()); - 
pair.getValue().complete(streamObject); - if (dataSize < Writer.MIN_PART_SIZE) { - forceSplitLimit.release(dataSize); - } - }); - objectId++; - } - }, forceSplitThreadPool).exceptionally(ex -> { - logger.error("Prepare object failed", ex); - for (Pair, CompletableFuture> pair : groupedDataBlocks) { - pair.getValue().completeExceptionally(ex); - } - return null; - }); + return CompletableFuture.allOf(cfs.toArray(new CompletableFuture[0])); + }, forceSplitThreadPool) + .exceptionally(ex -> { + //TODO: clean up buffer + logger.error("Force split object failed", ex); + for (Pair, CompletableFuture> pair : groupedDataBlocks) { + pair.getValue().completeExceptionally(ex); + } + return null; + }).join(); + } return groupedDataBlocks.stream().map(Pair::getValue).collect(Collectors.toList()); } - CommitWALObjectRequest buildCompactRequest(List streamMetadataList, List s3ObjectMetadata, Set excludedObjectIds) { - Map> objectMetadataFilterMap = convertS3Objects(s3ObjectMetadata); - List objectsToSplit = objectMetadataFilterMap.get(true); - List objectsToCompact = objectMetadataFilterMap.get(false); - // force split objects that exists for too long - logger.info("{} WAL objects to be force split, total split size {}", objectsToSplit.size(), - objectMetadataFilterMap.get(true).stream().mapToLong(S3ObjectMetadata::objectSize).sum()); - Collection> forceSplitCfs = splitWALObjects(streamMetadataList, objectsToSplit, excludedObjectIds); + CommitWALObjectRequest buildSplitRequest(List streamMetadataList, S3ObjectMetadata objectToSplit) { + Collection> cfs = splitWALObject(streamMetadataList, objectToSplit); + if (cfs.isEmpty()) { + logger.error("Force split object {} failed, no stream object generated", objectToSplit.objectId()); + return null; + } CommitWALObjectRequest request = new CommitWALObjectRequest(); request.setObjectId(-1L); - List compactionPlans = new ArrayList<>(); - if (excludedObjectIds.isEmpty()) { - // compact WAL objects only when compaction limitations are not violated after force split - try { - logger.info("{} WAL objects as compact candidates, total compaction size: {}", - objectsToCompact.size(), objectsToCompact.stream().mapToLong(S3ObjectMetadata::objectSize).sum()); - Map> streamDataBlockMap = CompactionUtils.blockWaitObjectIndices(streamMetadataList, objectsToCompact, s3Operator); - long now = System.currentTimeMillis(); - compactionPlans = this.compactionAnalyzer.analyze(streamDataBlockMap, excludedObjectIds); - logger.info("Analyze compaction plans complete, cost {}ms", System.currentTimeMillis() - now); - logCompactionPlans(compactionPlans, excludedObjectIds); - objectsToCompact = objectsToCompact.stream().filter(e -> !excludedObjectIds.contains(e.objectId())).collect(Collectors.toList()); - compactWALObjects(request, compactionPlans, objectsToCompact); - } catch (Exception e) { - logger.error("Error while compacting objects ", e); - } - } - - forceSplitCfs.stream().map(e -> { + // wait for all force split objects to complete + cfs.stream().map(e -> { try { return e.join(); - } catch (Exception ex) { - logger.error("Force split StreamObject failed ", ex); + } catch (Exception ignored) { return null; } }).filter(Objects::nonNull).forEach(request::addStreamObject); + request.setCompactedObjectIds(Collections.singletonList(objectToSplit.objectId())); + if (!sanityCheckCompactionResult(streamMetadataList, Collections.singletonList(objectToSplit), request)) { + logger.error("Sanity check failed, force split result is illegal"); + return null; + } + + return request; + } + + 
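(Reader's note, not part of the patch: the batching inside splitWALObject above is easy to lose in the diff. The following is a minimal, self-contained sketch of the same idea — accumulate contiguous block groups until the next group would push the batch past the compactionCacheSize budget, then flush that batch before reading more. BlockGroup and the class name are illustrative stand-ins, not types from this codebase.)

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative only: stands in for one group of contiguous StreamDataBlocks.
    record BlockGroup(long startPosition, long endPosition) {
        long size() {
            return endPosition - startPosition;
        }
    }

    public class BatchByCacheSizeSketch {

        // Accumulate groups until the next one would exceed the byte budget, then start a new batch,
        // mirroring how splitWALObject bounds each read/write round by compactionCacheSize.
        static List<List<BlockGroup>> batch(List<BlockGroup> groups, long cacheSize) {
            List<List<BlockGroup>> batches = new ArrayList<>();
            int index = 0;
            while (index < groups.size()) {
                List<BlockGroup> batch = new ArrayList<>();
                long readSize = 0;
                while (index < groups.size()) {
                    long size = groups.get(index).size();
                    if (readSize + size > cacheSize) {
                        break;
                    }
                    readSize += size;
                    batch.add(groups.get(index));
                    index++;
                }
                if (batch.isEmpty()) {
                    // a single group larger than the budget: the real method logs an error and gives up here
                    throw new IllegalStateException("cache size too small for group at index " + index);
                }
                batches.add(batch);
            }
            return batches;
        }

        public static void main(String[] args) {
            List<BlockGroup> groups = List.of(new BlockGroup(0, 40), new BlockGroup(40, 90), new BlockGroup(90, 150));
            // with a budget of 100 bytes this prints two batches: [0-40, 40-90] and [90-150]
            batch(groups, 100).forEach(System.out::println);
        }
    }

Bounding each round by compactionCacheSize is what lets a force split of an arbitrarily large WAL object run in roughly fixed memory, instead of relying on the old fixed 500 MiB semaphore.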
CommitWALObjectRequest buildCompactRequest(List streamMetadataList, List objectsToCompact) { + CommitWALObjectRequest request = new CommitWALObjectRequest(); + Set compactedObjectIds = new HashSet<>(); - objectMetadataFilterMap.get(true).forEach(e -> { - if (!excludedObjectIds.contains(e.objectId())) { - compactedObjectIds.add(e.objectId()); - } - }); + logger.info("{} WAL objects as compact candidates, total compaction size: {}", + objectsToCompact.size(), objectsToCompact.stream().mapToLong(S3ObjectMetadata::objectSize).sum()); + Map> streamDataBlockMap = CompactionUtils.blockWaitObjectIndices(streamMetadataList, objectsToCompact, s3Operator); + long now = System.currentTimeMillis(); + Set excludedObjectIds = new HashSet<>(); + List compactionPlans = this.compactionAnalyzer.analyze(streamDataBlockMap, excludedObjectIds); + logger.info("Analyze compaction plans complete, cost {}ms", System.currentTimeMillis() - now); + logCompactionPlans(compactionPlans, excludedObjectIds); + objectsToCompact = objectsToCompact.stream().filter(e -> !excludedObjectIds.contains(e.objectId())).collect(Collectors.toList()); + executeCompactionPlans(request, compactionPlans, objectsToCompact); compactionPlans.forEach(c -> c.streamDataBlocksMap().values().forEach(v -> v.forEach(b -> compactedObjectIds.add(b.getObjectId())))); - request.setCompactedObjectIds(new ArrayList<>(compactedObjectIds)); - if (!sanityCheckCompactionResult(streamMetadataList, objectsToSplit, objectsToCompact, request)) { + request.setCompactedObjectIds(new ArrayList<>(compactedObjectIds)); + List compactedObjectMetadata = objectsToCompact.stream() + .filter(e -> compactedObjectIds.contains(e.objectId())).toList(); + if (!sanityCheckCompactionResult(streamMetadataList, compactedObjectMetadata, request)) { logger.error("Sanity check failed, compaction result is illegal"); return null; } @@ -428,14 +410,12 @@ CommitWALObjectRequest buildCompactRequest(List streamMetadataLi return request; } - boolean sanityCheckCompactionResult(List streamMetadataList, List objectsToSplit, - List objectsToCompact, CommitWALObjectRequest request) { + boolean sanityCheckCompactionResult(List streamMetadataList, List compactedObjects, + CommitWALObjectRequest request) { Map streamMetadataMap = streamMetadataList.stream() .collect(Collectors.toMap(StreamMetadata::getStreamId, e -> e)); - Map objectMetadataMap = objectsToCompact.stream() + Map objectMetadataMap = compactedObjects.stream() .collect(Collectors.toMap(S3ObjectMetadata::objectId, e -> e)); - objectMetadataMap.putAll(objectsToSplit.stream() - .collect(Collectors.toMap(S3ObjectMetadata::objectId, e -> e))); List compactedStreamOffsetRanges = new ArrayList<>(); request.getStreamRanges().forEach(o -> compactedStreamOffsetRanges.add(new StreamOffsetRange(o.getStreamId(), o.getStartOffset(), o.getEndOffset()))); @@ -506,7 +486,7 @@ Map> convertS3Objects(List s3W >= TimeUnit.MINUTES.toMillis(this.forceSplitObjectPeriod)))); } - void compactWALObjects(CommitWALObjectRequest request, List compactionPlans, List s3ObjectMetadata) + void executeCompactionPlans(CommitWALObjectRequest request, List compactionPlans, List s3ObjectMetadata) throws IllegalArgumentException { if (compactionPlans.isEmpty()) { return; @@ -546,6 +526,7 @@ void compactWALObjects(CommitWALObjectRequest request, List comp } streamObjectCFList.stream().map(CompletableFuture::join).forEach(request::addStreamObject); } catch (Exception ex) { + //TODO: clean up buffer logger.error("Error while uploading compaction objects", ex); uploader.reset(); 
throw new IllegalArgumentException("Error while uploading compaction objects", ex); diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUploader.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUploader.java index 5dccc949e..d675209b1 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUploader.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionUploader.java @@ -80,7 +80,7 @@ public CompletableFuture chainWriteWALObject(CompletableFuture prev, private CompletableFuture prepareObjectAndWrite(CompactedObject compactedObject) { if (walObjectIdCf == null) { - walObjectIdCf = this.objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(60)); + walObjectIdCf = this.objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(CompactionConstants.S3_OBJECT_TTL_MINUTES)); } return walObjectIdCf.thenAcceptAsync(objectId -> { if (walObjectWriter == null) { @@ -103,7 +103,7 @@ public CompletableFuture writeStreamObject(CompactedObject compact .stream() .map(StreamDataBlock::getDataCf) .toArray(CompletableFuture[]::new)) - .thenComposeAsync(v -> objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(60)) + .thenComposeAsync(v -> objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(CompactionConstants.S3_OBJECT_TTL_MINUTES)) .thenComposeAsync(objectId -> { DataBlockWriter dataBlockWriter = new DataBlockWriter(objectId, s3Operator, kafkaConfig.s3ObjectPartSize()); for (StreamDataBlock streamDataBlock : compactedObject.streamDataBlocks()) { diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java index 39522dd53..97341f107 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java @@ -76,7 +76,7 @@ public void readBlocks(List streamDataBlocks) { readBlocks(streamDataBlocks, -1); } - public void readBlocks(List streamDataBlocks, long networkInboundBandwidth) { + public void readBlocks(List streamDataBlocks, long maxReadBatchSize) { if (streamDataBlocks.isEmpty()) { return; } @@ -86,20 +86,20 @@ public void readBlocks(List streamDataBlocks, long networkInbou // split streamDataBlocks to blocks with continuous offset while (end < streamDataBlocks.size()) { if (offset != -1 && streamDataBlocks.get(end).getBlockStartPosition() != offset) { - readContinuousBlocks(streamDataBlocks.subList(start, end), networkInboundBandwidth); + readContinuousBlocks(streamDataBlocks.subList(start, end), maxReadBatchSize); start = end; } offset = streamDataBlocks.get(end).getBlockEndPosition(); end++; } if (end > start) { - readContinuousBlocks(streamDataBlocks.subList(start, end), networkInboundBandwidth); + readContinuousBlocks(streamDataBlocks.subList(start, end), maxReadBatchSize); } } - public void readContinuousBlocks(List streamDataBlocks, long networkInboundBandwidth) { + public void readContinuousBlocks(List streamDataBlocks, long maxReadBatchSize) { long objectId = metadata.objectId(); - if (networkInboundBandwidth <= 0) { + if (maxReadBatchSize <= 0) { readContinuousBlocks0(streamDataBlocks); return; } @@ -109,7 +109,7 @@ public void readContinuousBlocks(List streamDataBlocks, long ne int end = 0; while (end < streamDataBlocks.size()) { currentReadSize += streamDataBlocks.get(end).getBlockSize(); - if (currentReadSize >= networkInboundBandwidth) { + if (currentReadSize >= 
maxReadBatchSize) { final int finalStart = start; if (start == end) { // split single data block to multiple read @@ -120,7 +120,7 @@ public void readContinuousBlocks(List streamDataBlocks, long ne Map bufferMap = new ConcurrentHashMap<>(); int cnt = 0; while (remainBytes > 0) { - long readSize = Math.min(remainBytes, networkInboundBandwidth); + long readSize = Math.min(remainBytes, maxReadBatchSize); endPosition = startPosition + readSize; final int finalCnt = cnt; cfList.add(s3Operator.rangeRead(objectKey, startPosition, endPosition, ThrottleStrategy.THROTTLE) diff --git a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java index 9cfdb05f4..5258f69be 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java @@ -96,67 +96,50 @@ public void testForceSplit() { when(config.s3ObjectCompactionForceSplitPeriod()).thenReturn(0); compactionManager = new CompactionManager(config, objectManager, streamManager, s3Operator); - Set excludedIds = new HashSet<>(); - CommitWALObjectRequest request = compactionManager.buildCompactRequest(streamMetadataList, s3ObjectMetadata, excludedIds); + CommitWALObjectRequest request = compactionManager.buildSplitRequest(streamMetadataList, s3ObjectMetadata.get(0)); Assertions.assertEquals(-1, request.getObjectId()); - Assertions.assertEquals(Set.of(OBJECT_1, OBJECT_2), excludedIds); Assertions.assertEquals(List.of(OBJECT_0), request.getCompactedObjectIds()); Assertions.assertEquals(3, request.getStreamObjects().size()); Assertions.assertTrue(checkDataIntegrity(streamMetadataList, Collections.singletonList(s3ObjectMetadata.get(0)), request)); - - s3ObjectMetadata = s3ObjectMetadata.stream().filter(e -> excludedIds.contains(e.objectId())).collect(Collectors.toList()); - excludedIds.clear(); - request = compactionManager.buildCompactRequest(streamMetadataList, s3ObjectMetadata, excludedIds); + request = compactionManager.buildSplitRequest(streamMetadataList, s3ObjectMetadata.get(1)); Assertions.assertEquals(-1, request.getObjectId()); - Assertions.assertEquals(Set.of(OBJECT_2), excludedIds); Assertions.assertEquals(List.of(OBJECT_1), request.getCompactedObjectIds()); Assertions.assertEquals(2, request.getStreamObjects().size()); - Assertions.assertTrue(checkDataIntegrity(streamMetadataList, Collections.singletonList(s3ObjectMetadata.get(0)), request)); + Assertions.assertTrue(checkDataIntegrity(streamMetadataList, Collections.singletonList(s3ObjectMetadata.get(1)), request)); - s3ObjectMetadata = s3ObjectMetadata.stream().filter(e -> excludedIds.contains(e.objectId())).collect(Collectors.toList()); - excludedIds.clear(); - request = compactionManager.buildCompactRequest(streamMetadataList, s3ObjectMetadata, excludedIds); + request = compactionManager.buildSplitRequest(streamMetadataList, s3ObjectMetadata.get(2)); Assertions.assertEquals(-1, request.getObjectId()); - Assertions.assertTrue(excludedIds.isEmpty()); Assertions.assertEquals(List.of(OBJECT_2), request.getCompactedObjectIds()); Assertions.assertEquals(2, request.getStreamObjects().size()); - Assertions.assertTrue(checkDataIntegrity(streamMetadataList, Collections.singletonList(s3ObjectMetadata.get(0)), request)); + Assertions.assertTrue(checkDataIntegrity(streamMetadataList, Collections.singletonList(s3ObjectMetadata.get(2)), request)); } @Test - @Disabled public void 
testForceSplitWithLimit() { - when(config.s3ObjectMaxStreamObjectNumPerCommit()).thenReturn(3); + when(config.s3ObjectCompactionCacheSize()).thenReturn(5L); List s3ObjectMetadata = this.objectManager.getServerObjects().join(); when(config.s3ObjectCompactionForceSplitPeriod()).thenReturn(0); compactionManager = new CompactionManager(config, objectManager, streamManager, s3Operator); - Set excludedObjects = new HashSet<>(); List streamMetadataList = this.streamManager.getStreams(Collections.emptyList()).join(); - CommitWALObjectRequest request = compactionManager.buildCompactRequest(streamMetadataList, s3ObjectMetadata, excludedObjects); - Assertions.assertEquals(-1, request.getObjectId()); - Assertions.assertEquals(List.of(OBJECT_1), request.getCompactedObjectIds()); - Assertions.assertEquals(2, request.getStreamObjects().size()); - Assertions.assertTrue(checkDataIntegrity(streamMetadataList, s3ObjectMetadata.stream().filter(s -> !excludedObjects.contains(s.objectId())).toList(), request)); - s3ObjectMetadata = s3ObjectMetadata.stream().filter(s -> excludedObjects.contains(s.objectId())).toList(); + CommitWALObjectRequest request = compactionManager.buildSplitRequest(streamMetadataList, s3ObjectMetadata.get(0)); + Assertions.assertNull(request); + } - excludedObjects.clear(); - request = compactionManager.buildCompactRequest(streamMetadataList, s3ObjectMetadata, excludedObjects); - Assertions.assertEquals(-1, request.getObjectId()); - Assertions.assertEquals(List.of(OBJECT_2), request.getCompactedObjectIds()); - Assertions.assertEquals(2, request.getStreamObjects().size()); - Assertions.assertTrue(checkDataIntegrity(streamMetadataList, s3ObjectMetadata.stream().filter(s -> !excludedObjects.contains(s.objectId())).toList(), request)); - s3ObjectMetadata = s3ObjectMetadata.stream().filter(s -> excludedObjects.contains(s.objectId())).toList(); + @Test + public void testForceSplitWithLimit2() { + when(config.s3ObjectCompactionCacheSize()).thenReturn(150L); + List s3ObjectMetadata = this.objectManager.getServerObjects().join(); + when(config.s3ObjectCompactionForceSplitPeriod()).thenReturn(0); + compactionManager = new CompactionManager(config, objectManager, streamManager, s3Operator); - excludedObjects.clear(); - request = compactionManager.buildCompactRequest(streamMetadataList, s3ObjectMetadata, excludedObjects); + List streamMetadataList = this.streamManager.getStreams(Collections.emptyList()).join(); + CommitWALObjectRequest request = compactionManager.buildSplitRequest(streamMetadataList, s3ObjectMetadata.get(0)); Assertions.assertEquals(-1, request.getObjectId()); Assertions.assertEquals(List.of(OBJECT_0), request.getCompactedObjectIds()); Assertions.assertEquals(3, request.getStreamObjects().size()); - Assertions.assertTrue(checkDataIntegrity(streamMetadataList, s3ObjectMetadata.stream().filter(s -> !excludedObjects.contains(s.objectId())).toList(), request)); - - Assertions.assertTrue(excludedObjects.isEmpty()); + Assertions.assertTrue(checkDataIntegrity(streamMetadataList, Collections.singletonList(s3ObjectMetadata.get(0)), request)); } @Test @@ -164,7 +147,7 @@ public void testCompact() { List s3ObjectMetadata = this.objectManager.getServerObjects().join(); compactionManager = new CompactionManager(config, objectManager, streamManager, s3Operator); List streamMetadataList = this.streamManager.getStreams(Collections.emptyList()).join(); - CommitWALObjectRequest request = compactionManager.buildCompactRequest(streamMetadataList, s3ObjectMetadata, new HashSet<>()); + 
CommitWALObjectRequest request = compactionManager.buildCompactRequest(streamMetadataList, s3ObjectMetadata); assertEquals(List.of(OBJECT_0, OBJECT_1, OBJECT_2), request.getCompactedObjectIds()); assertEquals(OBJECT_0, request.getOrderId()); @@ -184,7 +167,7 @@ public void testCompactWithDataTrimmed() { new StreamMetadata(STREAM_2, 0, 30, 270, StreamState.OPENED)))); compactionManager = new CompactionManager(config, objectManager, streamManager, s3Operator); List streamMetadataList = this.streamManager.getStreams(Collections.emptyList()).join(); - CommitWALObjectRequest request = compactionManager.buildCompactRequest(streamMetadataList, S3_WAL_OBJECT_METADATA_LIST, new HashSet<>()); + CommitWALObjectRequest request = compactionManager.buildCompactRequest(streamMetadataList, S3_WAL_OBJECT_METADATA_LIST); assertEquals(List.of(OBJECT_0, OBJECT_1, OBJECT_2), request.getCompactedObjectIds()); assertEquals(OBJECT_0, request.getOrderId()); @@ -204,7 +187,7 @@ public void testCompactWithDataTrimmed2() { new StreamMetadata(STREAM_2, 0, 30, 270, StreamState.OPENED)))); compactionManager = new CompactionManager(config, objectManager, streamManager, s3Operator); List streamMetadataList = this.streamManager.getStreams(Collections.emptyList()).join(); - CommitWALObjectRequest request = compactionManager.buildCompactRequest(streamMetadataList, S3_WAL_OBJECT_METADATA_LIST, new HashSet<>()); + CommitWALObjectRequest request = compactionManager.buildCompactRequest(streamMetadataList, S3_WAL_OBJECT_METADATA_LIST); assertEquals(List.of(OBJECT_0, OBJECT_1, OBJECT_2), request.getCompactedObjectIds()); assertEquals(OBJECT_0, request.getOrderId()); @@ -223,7 +206,7 @@ public void testCompactWithNonExistStream() { new StreamMetadata(STREAM_2, 0, 30, 270, StreamState.OPENED)))); compactionManager = new CompactionManager(config, objectManager, streamManager, s3Operator); List streamMetadataList = this.streamManager.getStreams(Collections.emptyList()).join(); - CommitWALObjectRequest request = compactionManager.buildCompactRequest(streamMetadataList, S3_WAL_OBJECT_METADATA_LIST, new HashSet<>()); + CommitWALObjectRequest request = compactionManager.buildCompactRequest(streamMetadataList, S3_WAL_OBJECT_METADATA_LIST); Set streamIds = request.getStreamObjects().stream().map(StreamObject::getStreamId).collect(Collectors.toSet()); streamIds.addAll(request.getStreamRanges().stream().map(ObjectStreamRange::getStreamId).collect(Collectors.toSet())); @@ -248,7 +231,7 @@ public void testCompactNoneExistObjects() { s3Operator.delete(s3ObjectMetadata.get(0).key()).join(); CommitWALObjectRequest request = new CommitWALObjectRequest(); Assertions.assertThrowsExactly(IllegalArgumentException.class, - () -> compactionManager.compactWALObjects(request, compactionPlans, s3ObjectMetadata)); + () -> compactionManager.executeCompactionPlans(request, compactionPlans, s3ObjectMetadata)); } private void testCompactWithNWInThrottle(long networkInThreshold) { @@ -261,7 +244,7 @@ private void testCompactWithNWInThrottle(long networkInThreshold) { System.out.printf("Expect to complete no less than %ds%n", expectedMinCompleteTime); long start = System.currentTimeMillis(); List streamMetadataList = this.streamManager.getStreams(Collections.emptyList()).join(); - CommitWALObjectRequest request = compactionManager.buildCompactRequest(streamMetadataList, s3ObjectMetadata, new HashSet<>()); + CommitWALObjectRequest request = compactionManager.buildCompactRequest(streamMetadataList, s3ObjectMetadata); long cost = System.currentTimeMillis() - start; 
System.out.printf("Cost %ds%n", cost / 1000); assertTrue(cost > expectedMinCompleteTime * 1000); @@ -283,9 +266,8 @@ public void testCompactWithLimit() { when(config.s3ObjectMaxStreamObjectNumPerCommit()).thenReturn(2); List s3ObjectMetadata = this.objectManager.getServerObjects().join(); compactionManager = new CompactionManager(config, objectManager, streamManager, s3Operator); - Set excludedObjects = new HashSet<>(); List streamMetadataList = this.streamManager.getStreams(Collections.emptyList()).join(); - CommitWALObjectRequest request = compactionManager.buildCompactRequest(streamMetadataList, s3ObjectMetadata, excludedObjects); + CommitWALObjectRequest request = compactionManager.buildCompactRequest(streamMetadataList, s3ObjectMetadata); assertEquals(List.of(OBJECT_0, OBJECT_1), request.getCompactedObjectIds()); assertEquals(OBJECT_0, request.getOrderId()); @@ -294,7 +276,8 @@ public void testCompactWithLimit() { assertEquals(2, request.getStreamObjects().size()); assertEquals(1, request.getStreamRanges().size()); - s3ObjectMetadata = s3ObjectMetadata.stream().filter(s -> !excludedObjects.contains(s.objectId())).toList(); + Set compactedObjectIds = new HashSet<>(request.getCompactedObjectIds()); + s3ObjectMetadata = s3ObjectMetadata.stream().filter(s -> compactedObjectIds.contains(s.objectId())).toList(); Assertions.assertTrue(checkDataIntegrity(streamMetadataList, s3ObjectMetadata, request)); }
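
(Reader's note, not part of the patch: end to end, the reworked compact() path now partitions WAL objects by data age and handles the two groups separately, which is what removes the old excludedObjectIds retry loop. The sketch below shows that control flow in isolation; WalObject and the forceSplit/compactTogether hooks are illustrative names, not the real CompactionManager API, and the age cut-off mirrors how convertS3Objects uses forceSplitObjectPeriod.)

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;
    import java.util.stream.Collectors;

    public class CompactionFlowSketch {
        // Simplified stand-in for S3ObjectMetadata: only the data time matters here.
        record WalObject(long objectId, long dataTimeInMs) { }

        private final int forceSplitObjectPeriodMinutes;

        CompactionFlowSketch(int forceSplitObjectPeriodMinutes) {
            this.forceSplitObjectPeriodMinutes = forceSplitObjectPeriodMinutes;
        }

        // Mirrors the shape of convertS3Objects + compact(): objects whose data is older than the
        // force-split period are split into per-stream objects one at a time; the rest are compacted together.
        void compact(List<WalObject> walObjects) {
            long now = System.currentTimeMillis();
            Map<Boolean, List<WalObject>> partition = walObjects.stream()
                    .collect(Collectors.partitioningBy(
                            o -> now - o.dataTimeInMs() >= TimeUnit.MINUTES.toMillis(forceSplitObjectPeriodMinutes)));
            for (WalObject aged : partition.get(true)) {
                forceSplit(aged);                      // one object per commit keeps memory bounded
            }
            compactTogether(partition.get(false));     // newer objects are merged into a single new WAL object
        }

        void forceSplit(WalObject o) {
            System.out.println("force split object " + o.objectId());
        }

        void compactTogether(List<WalObject> objects) {
            System.out.println("compact " + objects.size() + " objects");
        }

        public static void main(String[] args) {
            // with a 60-minute period, the first object is force split and the second is compacted
            new CompactionFlowSketch(60).compact(List.of(
                    new WalObject(0, 0L),
                    new WalObject(1, System.currentTimeMillis())));
        }
    }

Committing one aged object per request, rather than folding every force-split result into a single large commit, is the design choice that made the stream-object-count limit and its exclusion bookkeeping unnecessary in this path.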