Commit 4ffb596

fix(s3stream): gracefully shutdown compaction manager

Signed-off-by: Shichao Nie <[email protected]>
SCNieh committed Feb 19, 2024
1 parent 5b1cd92 commit 4ffb596

Showing 6 changed files with 216 additions and 31 deletions.

@@ -128,8 +128,9 @@ public void shutdown() {
         }
         streamObjectCompactionScheduler.shutdown();
         try {
-            if (streamObjectCompactionScheduler.awaitTermination(10, TimeUnit.SECONDS)) {
+            if (!streamObjectCompactionScheduler.awaitTermination(10, TimeUnit.SECONDS)) {
                 LOGGER.warn("await streamObjectCompactionExecutor timeout 10s");
+                streamObjectCompactionScheduler.shutdownNow();
             }
         } catch (InterruptedException e) {
             streamObjectCompactionScheduler.shutdownNow();
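
The hunk above fixes an inverted condition: the old code logged the timeout warning when awaitTermination had actually succeeded, and never force-stopped the pool. The corrected code follows the standard java.util.concurrent shutdown idiom: stop accepting tasks, wait a bounded grace period, then force termination. A self-contained sketch of that idiom (names here are illustrative, not from this patch):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class GracefulShutdownExample {
    public static void shutdownGracefully(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown(); // stop accepting new tasks; queued tasks may still run
        try {
            if (!executor.awaitTermination(timeout, unit)) {
                // grace period elapsed: interrupt workers and drop queued tasks
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve interrupt status for callers
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.submit(() -> System.out.println("task ran"));
        shutdownGracefully(pool, 10, TimeUnit.SECONDS);
    }
}
```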

@@ -44,12 +44,14 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;

@@ -63,7 +65,7 @@ public class CompactionManager {
     private final StreamManager streamManager;
     private final S3Operator s3Operator;
     private final CompactionAnalyzer compactionAnalyzer;
-    private final ScheduledExecutorService compactScheduledExecutor;
+    private final ScheduledExecutorService compactionScheduledExecutor;
     private final ScheduledExecutorService bucketCallbackScheduledExecutor;
     private final ExecutorService compactThreadPool;
     private final ExecutorService forceSplitThreadPool;

@@ -77,6 +79,9 @@ public class CompactionManager {
     private final long networkBandwidth;
     private final boolean s3ObjectLogEnable;
     private final long compactionCacheSize;
+    private final AtomicBoolean running = new AtomicBoolean(false);
+    private volatile CompletableFuture<Void> forceSplitCf = null;
+    private volatile CompletableFuture<Void> compactionCf = null;
     private Bucket compactionBucket = null;
 
     public CompactionManager(Config config, ObjectManager objectManager, StreamManager streamManager,

@@ -100,12 +105,13 @@ public CompactionManager(Config config, ObjectManager objectManager, StreamManag
         maxStreamObjectNumPerCommit = config.maxStreamObjectNumPerCommit();
         this.compactionAnalyzer = new CompactionAnalyzer(compactionCacheSize, streamSplitSize, maxStreamNumPerStreamSetObject,
             maxStreamObjectNumPerCommit, new LogContext(String.format("[CompactionAnalyzer id=%d] ", config.nodeId())));
-        this.compactScheduledExecutor = Threads.newSingleThreadScheduledExecutor(
-            ThreadUtils.createThreadFactory("schedule-compact-executor-%d", true), logger);
+        this.compactionScheduledExecutor = Threads.newSingleThreadScheduledExecutor(
+            ThreadUtils.createThreadFactory("schedule-compact-executor-%d", true), logger, true, false);
         this.bucketCallbackScheduledExecutor = Threads.newSingleThreadScheduledExecutor(
-            ThreadUtils.createThreadFactory("s3-data-block-reader-bucket-cb-%d", true), logger);
+            ThreadUtils.createThreadFactory("s3-data-block-reader-bucket-cb-%d", true), logger, true, false);
         this.compactThreadPool = Executors.newFixedThreadPool(1, new DefaultThreadFactory("object-compaction-manager"));
         this.forceSplitThreadPool = Executors.newFixedThreadPool(1, new DefaultThreadFactory("force-split-executor"));
+        this.running.set(true);
         this.logger.info("Compaction manager initialized with config: compactionInterval: {} min, compactionCacheSize: {} bytes, " +
             "streamSplitSize: {} bytes, forceSplitObjectPeriod: {} min, maxObjectNumToCompact: {}, maxStreamNumInStreamSet: {}, maxStreamObjectNum: {}",
             compactionInterval, compactionCacheSize, streamSplitSize, forceSplitObjectPeriod, maxObjectNumToCompact, maxStreamNumPerStreamSetObject, maxStreamObjectNumPerCommit);
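
The two booleans appended to the newSingleThreadScheduledExecutor calls are removeOnCancelPolicy=true and executeExistingDelayedTasksAfterShutdownPolicy=false (the new overload appears in the Threads.java hunk at the end of this commit; the second flag is demonstrated after it). The first flag makes a ScheduledThreadPoolExecutor drop a cancelled task from its queue immediately instead of holding it until its delay expires. A minimal standalone demonstration:

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RemoveOnCancelDemo {
    public static void main(String[] args) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        executor.setRemoveOnCancelPolicy(true); // reclaim queue slots as soon as tasks are cancelled

        ScheduledFuture<?> future = executor.schedule(() -> { }, 1, TimeUnit.HOURS);
        future.cancel(false);

        // With the policy enabled, the cancelled task leaves the queue immediately
        System.out.println("queued tasks: " + executor.getQueue().size()); // prints 0
        executor.shutdown();
    }
}
```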

@@ -115,9 +121,13 @@ public void start() {
         scheduleNextCompaction((long) this.compactionInterval * 60 * 1000);
     }
 
-    private void scheduleNextCompaction(long delayMillis) {
+    void scheduleNextCompaction(long delayMillis) {
+        if (!running.get()) {
+            logger.info("Compaction manager is shutdown, skip scheduling next compaction");
+            return;
+        }
         logger.info("Next Compaction started in {} ms", delayMillis);
-        this.compactScheduledExecutor.schedule(() -> {
+        this.compactionScheduledExecutor.schedule(() -> {
             TimerUtil timerUtil = new TimerUtil();
             try {
                 logger.info("Compaction started");

@@ -136,9 +146,37 @@ private void scheduleNextCompaction(long delayMillis) {
     }
 
     public void shutdown() {
-        this.compactScheduledExecutor.shutdown();
+        if (!running.compareAndSet(true, false)) {
+            logger.warn("Compaction manager is already shutdown");
+            return;
+        }
+        logger.info("Shutting down compaction manager");
+        synchronized (this) {
+            if (forceSplitCf != null) {
+                // prevent block-waiting for force splitting objects
+                forceSplitCf.cancel(true);
+            }
+            if (compactionCf != null) {
+                // prevent block-waiting for uploading compacted objects
+                compactionCf.cancel(true);
+            }
+        }
+        this.compactionScheduledExecutor.shutdown();
+        try {
+            if (!this.compactionScheduledExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
+                this.compactionScheduledExecutor.shutdownNow();
+            }
+        } catch (InterruptedException ignored) {
+        }
         this.bucketCallbackScheduledExecutor.shutdown();
+        try {
+            if (!this.bucketCallbackScheduledExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
+                this.bucketCallbackScheduledExecutor.shutdownNow();
+            }
+        } catch (InterruptedException ignored) {
+        }
         this.uploader.shutdown();
+        logger.info("Compaction manager shutdown complete");
     }
 
     public CompletableFuture<Void> compact() {
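
The new shutdown() combines two patterns: a compareAndSet on an AtomicBoolean makes shutdown idempotent, and cancelling the in-flight CompletableFutures unblocks any thread parked in join(). A minimal sketch of how the two halves interact, assuming the future is completed elsewhere when the work finishes (class and field names are illustrative):

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancellableWorker {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private volatile CompletableFuture<Void> inFlight = null;

    public void doWork() {
        synchronized (this) {
            if (!running.get()) {
                return; // shutdown raced ahead of us; don't start new work
            }
            inFlight = new CompletableFuture<>(); // completed elsewhere when the upload finishes
        }
        try {
            inFlight.join(); // blocks until completion or cancellation
        } catch (CancellationException e) {
            return; // shutdown cancelled the wait; abort quietly
        }
        inFlight = null;
    }

    public void shutdown() {
        if (!running.compareAndSet(true, false)) {
            return; // already shut down; the CAS makes this idempotent
        }
        synchronized (this) {
            if (inFlight != null) {
                inFlight.cancel(true); // wake up any thread blocked in join()
            }
        }
    }
}
```

The synchronized blocks ensure a worker never publishes a new in-flight future after shutdown has flipped the flag, so no wait can be missed by the cancellation sweep.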

@@ -152,6 +190,10 @@ public CompletableFuture<Void> compact() {
 
     private void compact(List<StreamMetadata> streamMetadataList,
         List<S3ObjectMetadata> objectMetadataList) throws CompletionException {
+        if (!running.get()) {
+            logger.info("Compaction manager is shutdown, skip compaction");
+            return;
+        }
         logger.info("Get {} stream set objects from metadata", objectMetadataList.size());
         if (objectMetadataList.isEmpty()) {
             return;

@@ -188,6 +230,10 @@ void forceSplitObjects(List<StreamMetadata> streamMetadataList, List<S3ObjectMet
         logger.info("Force split {} stream set objects", objectsToForceSplit.size());
         TimerUtil timerUtil = new TimerUtil();
         for (int i = 0; i < objectsToForceSplit.size(); i++) {
+            if (!running.get()) {
+                logger.info("Compaction manager is shutdown, abort force split progress");
+                return;
+            }
             timerUtil.reset();
             S3ObjectMetadata objectToForceSplit = objectsToForceSplit.get(i);
             logger.info("Force split progress {}/{}, splitting object {}, object size {}", i + 1, objectsToForceSplit.size(),

@@ -222,6 +268,10 @@ void forceSplitObjects(List<StreamMetadata> streamMetadataList, List<S3ObjectMet
 
     private void compactObjects(List<StreamMetadata> streamMetadataList, List<S3ObjectMetadata> objectsToCompact)
         throws CompletionException {
+        if (!running.get()) {
+            logger.info("Compaction manager is shutdown, skip compacting objects");
+            return;
+        }
         if (objectsToCompact.isEmpty()) {
             return;
         }

@@ -234,6 +284,10 @@ private void compactObjects(List<StreamMetadata> streamMetadataList, List<S3Obje
         logger.info("Compact {} stream set objects", objectsToCompact.size());
         TimerUtil timerUtil = new TimerUtil();
         CommitStreamSetObjectRequest request = buildCompactRequest(streamMetadataList, objectsToCompact);
+        if (!running.get()) {
+            logger.info("Compaction manager is shutdown, skip committing compaction request");
+            return;
+        }
         if (request == null) {
             return;
         }

@@ -282,7 +336,7 @@ private void logCompactionPlans(List<CompactionPlan> compactionPlans, Set<Long>
     public CompletableFuture<Void> forceSplitAll() {
         CompletableFuture<Void> cf = new CompletableFuture<>();
         //TODO: deal with metadata delay
-        this.compactScheduledExecutor.execute(() -> this.objectManager.getServerObjects().thenAcceptAsync(objectMetadataList -> {
+        this.compactionScheduledExecutor.execute(() -> this.objectManager.getServerObjects().thenAcceptAsync(objectMetadataList -> {
             List<Long> streamIds = objectMetadataList.stream().flatMap(e -> e.getOffsetRanges().stream())
                 .map(StreamOffsetRange::streamId).distinct().collect(Collectors.toList());
             this.streamManager.getStreams(streamIds).thenAcceptAsync(streamMetadataList -> {

@@ -408,8 +462,7 @@ Collection<CompletableFuture<StreamObject>> groupAndSplitStreamDataBlocks(S3Obje
     }
 
     CommitStreamSetObjectRequest buildSplitRequest(List<StreamMetadata> streamMetadataList,
-        S3ObjectMetadata objectToSplit)
-        throws CompletionException {
+        S3ObjectMetadata objectToSplit) throws CompletionException {
         List<CompletableFuture<StreamObject>> cfs = new ArrayList<>();
         boolean status = splitStreamSetObject(streamMetadataList, objectToSplit, cfs);
         if (!status) {

@@ -421,6 +474,20 @@ CommitStreamSetObjectRequest buildSplitRequest(List<StreamMetadata> streamMetada
         request.setObjectId(-1L);
 
         // wait for all force split objects to complete
+        synchronized (this) {
+            if (!running.get()) {
+                logger.info("Compaction manager is shutdown, skip waiting for force splitting objects");
+                return null;
+            }
+            forceSplitCf = CompletableFuture.allOf(cfs.toArray(new CompletableFuture[0]));
+        }
+        try {
+            forceSplitCf.join();
+        } catch (CancellationException exception) {
+            logger.info("Force split objects cancelled");
+            return null;
+        }
+        forceSplitCf = null;
         cfs.stream().map(e -> {
             try {
                 return e.join();
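
The wait above relies on a property of CompletableFuture.allOf: cancelling the aggregate future makes join() throw CancellationException in the waiting thread, while the constituent futures keep running and can be cleaned up separately. A small demonstration of that behavior (all names illustrative):

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class AllOfCancelDemo {
    public static void main(String[] args) {
        CompletableFuture<Void> slow = CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(30); // simulates a long upload
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        CompletableFuture<Void> all = CompletableFuture.allOf(slow);

        // Another thread (e.g. shutdown) cancels the aggregate after 1 second.
        CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS)
            .execute(() -> all.cancel(true));

        try {
            all.join();
        } catch (CancellationException e) {
            // the waiter is released immediately; `slow` itself is NOT cancelled
            System.out.println("wait cancelled; slow.isCancelled() = " + slow.isCancelled());
        }
    }
}
```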

@@ -463,6 +530,12 @@ CommitStreamSetObjectRequest buildCompactRequest(List<StreamMetadata> streamMeta
         logCompactionPlans(compactionPlans, excludedObjectIds);
         objectsToCompact = objectsToCompact.stream().filter(e -> !excludedObjectIds.contains(e.objectId())).collect(Collectors.toList());
         executeCompactionPlans(request, compactionPlans, objectsToCompact);
+
+        if (!running.get()) {
+            logger.info("Compaction manager is shutdown, skip constructing compaction request");
+            return null;
+        }
+
         compactionPlans.forEach(c -> c.streamDataBlocksMap().values().forEach(v -> v.forEach(b -> compactedObjectIds.add(b.getObjectId()))));
 
         // compact out-dated objects directly

@@ -576,6 +649,10 @@ void executeCompactionPlans(CommitStreamSetObjectRequest request, List<Compactio
             .collect(Collectors.toMap(S3ObjectMetadata::objectId, e -> e));
         List<StreamDataBlock> sortedStreamDataBlocks = new ArrayList<>();
         for (int i = 0; i < compactionPlans.size(); i++) {
+            if (!running.get()) {
+                logger.info("Compaction manager is shutdown, abort compaction progress");
+                return;
+            }
             // iterate over each compaction plan
             CompactionPlan compactionPlan = compactionPlans.get(i);
             long totalSize = compactionPlan.streamDataBlocksMap().values().stream().flatMap(List::stream)

@@ -602,18 +679,31 @@ void executeCompactionPlans(CommitStreamSetObjectRequest request, List<Compactio
             List<CompletableFuture<?>> cfList = new ArrayList<>();
             cfList.add(streamSetObjectChainWriteCf);
             cfList.addAll(streamObjectCfList);
-            // wait for all stream objects and stream set object part to be uploaded
-            CompletableFuture.allOf(cfList.toArray(new CompletableFuture[0]))
-                .thenAccept(v -> uploader.forceUploadStreamSetObject())
-                .exceptionally(ex -> {
-                    logger.error("Error while uploading compaction objects", ex);
-                    uploader.release().thenAccept(v -> {
-                        for (CompactedObject compactedObject : compactionPlan.compactedObjects()) {
-                            compactedObject.streamDataBlocks().forEach(StreamDataBlock::release);
-                        }
-                    }).join();
-                    throw new IllegalStateException("Error while uploading compaction objects", ex);
-                }).join();
+            synchronized (this) {
+                if (!running.get()) {
+                    logger.info("Compaction manager is shutdown, skip waiting for uploading objects");
+                    return;
+                }
+                // wait for all stream objects and stream set object part to be uploaded
+                compactionCf = CompletableFuture.allOf(cfList.toArray(new CompletableFuture[0]))
+                    .thenAccept(v -> uploader.forceUploadStreamSetObject())
+                    .exceptionally(ex -> {
+                        uploader.release().thenAccept(v -> {
+                            for (CompactedObject compactedObject : compactionPlan.compactedObjects()) {
+                                compactedObject.streamDataBlocks().forEach(StreamDataBlock::release);
+                            }
+                        }).join();
+                        throw new IllegalStateException("Error while uploading compaction objects", ex);
+                    });
+            }
+            try {
+                compactionCf.join();
+            } catch (CancellationException ex) {
+                logger.warn("Compaction progress {}/{} is cancelled", i + 1, compactionPlans.size());
+                return;
+            }
+            compactionCf = null;
+
             streamObjectCfList.stream().map(CompletableFuture::join).forEach(request::addStreamObject);
         }
         List<ObjectStreamRange> objectStreamRanges = CompactionUtils.buildObjectStreamRangeFromGroup(
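
The exceptionally stage still performs cleanup before propagating: the uploader is released, retained data blocks are freed, and the error is rethrown so the joining thread observes it (the redundant logger.error is dropped since the rethrown exception surfaces anyway). A compact sketch of this cleanup-then-rethrow shape (all names illustrative):

```java
import java.util.concurrent.CompletableFuture;

public class CleanupOnFailure {
    public static void main(String[] args) {
        CompletableFuture<Void> upload = CompletableFuture.failedFuture(new RuntimeException("s3 write failed"));

        CompletableFuture<Void> guarded = upload.exceptionally(ex -> {
            releaseBuffers(); // free retained resources before propagating
            // rethrowing keeps the returned future exceptionally completed for the caller
            throw new IllegalStateException("Error while uploading compaction objects", ex);
        });

        try {
            guarded.join();
        } catch (Exception e) {
            System.out.println("caller sees: " + e.getCause());
        }
    }

    private static void releaseBuffers() {
        System.out.println("buffers released");
    }
}
```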

@@ -53,7 +53,20 @@ public CompactionUploader(ObjectManager objectManager, S3Operator s3Operator, Co
     public void shutdown() {
         this.isShutdown = true;
         this.streamSetObjectUploadPool.shutdown();
+        try {
+            if (!this.streamSetObjectUploadPool.awaitTermination(10, TimeUnit.SECONDS)) {
+                this.streamSetObjectUploadPool.shutdownNow();
+            }
+        } catch (InterruptedException ignored) {
+        }
+
         this.streamObjectUploadPool.shutdown();
+        try {
+            if (!this.streamObjectUploadPool.awaitTermination(10, TimeUnit.SECONDS)) {
+                this.streamObjectUploadPool.shutdownNow();
+            }
+        } catch (InterruptedException ignored) {
+        }
     }
 
     public CompletableFuture<Void> chainWriteStreamSetObject(CompletableFuture<Void> prev,

@@ -108,10 +121,7 @@ public CompletableFuture<StreamObject> writeStreamObject(CompactedObject compact
             return streamObject;
         }).whenComplete((ret, ex) -> {
             if (ex != null) {
-                if (isShutdown) {
-                    // TODO: remove this when we're able to abort object uploading gracefully
-                    LOGGER.warn("write to stream object {} failed", objectId, ex);
-                } else {
+                if (!isShutdown) {
                     LOGGER.error("write to stream object {} failed", objectId, ex);
                 }
                 dataBlockWriter.release();

@@ -182,8 +182,11 @@ private void readContinuousBlocks0(List<StreamDataBlock> streamDataBlocks) {
     }
 
     private CompletableFuture<ByteBuf> rangeRead(long start, long end) {
-        return rangeRead0(start, end).whenComplete((ret, ex) ->
-            CompactionStats.getInstance().compactionReadSizeStats.add(MetricsLevel.INFO, ret.readableBytes()));
+        return rangeRead0(start, end).whenComplete((ret, ex) -> {
+            if (ex == null) {
+                CompactionStats.getInstance().compactionReadSizeStats.add(MetricsLevel.INFO, ret.readableBytes());
+            }
+        });
     }
 
     private CompletableFuture<ByteBuf> rangeRead0(long start, long end) {
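
This hunk fixes a latent NullPointerException: whenComplete runs on both the normal and exceptional paths, and on the exceptional path the result argument is null, so dereferencing it must be guarded. A minimal demonstration of that contract (names illustrative):

```java
import java.util.concurrent.CompletableFuture;

public class WhenCompleteNullDemo {
    public static void main(String[] args) {
        CompletableFuture<String> failed = CompletableFuture.failedFuture(new RuntimeException("read failed"));

        failed.whenComplete((ret, ex) -> {
            // On exceptional completion `ret` is null; an unguarded ret.length() would NPE
            if (ex == null) {
                System.out.println("read " + ret.length() + " chars");
            } else {
                System.out.println("failed: " + ex.getMessage());
            }
        });
    }
}
```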
10 changes: 8 additions & 2 deletions s3stream/src/main/java/com/automq/stream/utils/Threads.java

@@ -48,16 +48,21 @@ public static ExecutorService newFixedThreadPoolWithMonitor(int nThreads, String
 
     public static ScheduledExecutorService newSingleThreadScheduledExecutor(String name, boolean daemon,
         Logger logger) {
-        return newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory(name, true), logger, false);
+        return newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory(name, true), logger, false, true);
     }
 
     public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory,
         Logger logger) {
-        return newSingleThreadScheduledExecutor(threadFactory, logger, false);
+        return newSingleThreadScheduledExecutor(threadFactory, logger, false, true);
     }
 
     public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory,
         Logger logger, boolean removeOnCancelPolicy) {
+        return newSingleThreadScheduledExecutor(threadFactory, logger, removeOnCancelPolicy, true);
+    }
+
+    public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory,
+        Logger logger, boolean removeOnCancelPolicy, boolean executeExistingDelayedTasksAfterShutdownPolicy) {
         ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, threadFactory) {
             @Override
             public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {

@@ -80,6 +85,7 @@ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialD
             }
         };
         executor.setRemoveOnCancelPolicy(removeOnCancelPolicy);
+        executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(executeExistingDelayedTasksAfterShutdownPolicy);
         return executor;
     }
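
The new four-argument overload matters because ScheduledThreadPoolExecutor by default still runs already-queued delayed tasks after shutdown(); passing false for executeExistingDelayedTasksAfterShutdownPolicy lets shutdown() drop pending delayed work, which is what allows CompactionManager to stop promptly even with a compaction scheduled minutes in the future. A short demonstration:

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DelayedTaskShutdownDemo {
    public static void main(String[] args) throws InterruptedException {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        // false: pending delayed tasks are discarded when shutdown() is called
        executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);

        executor.schedule(() -> System.out.println("this never prints"), 10, TimeUnit.SECONDS);

        executor.shutdown(); // returns immediately; the delayed task is dropped
        boolean terminated = executor.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println("terminated quickly: " + terminated); // true
    }
}
```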