Skip to content

Commit

Permalink
feat(issues801): clean expired stream objects when compact
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx committed Feb 18, 2024
1 parent cff8a96 commit e13e151
Show file tree
Hide file tree
Showing 7 changed files with 215 additions and 49 deletions.
2 changes: 1 addition & 1 deletion s3stream/src/main/java/com/automq/stream/api/Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ default CompletableFuture<AppendResult> append(RecordBatch recordBatch) {
* Fetch recordBatch list from stream. Note the startOffset may be in the middle in the first recordBatch.
* It is strongly recommended to handle the completion of the returned CompletableFuture in a separate thread.
*
* @param context fetch context, {@link FetchContext}.
* @param startOffset start offset, if the startOffset in middle of a recordBatch, the recordBatch will be returned.
* @param endOffset exclusive end offset, if the endOffset in middle of a recordBatch, the recordBatch will be returned.
* @param maxBytesHint max fetch data size hint, the real return data size may be larger than maxBytesHint.
* @param readOptions {@link ReadOptions}.
* @return - complete success with {@link FetchResult}, when fetch success.
* - complete exception with {@link StreamClientException}, when startOffset is bigger than stream end offset.
*/
Expand Down
18 changes: 5 additions & 13 deletions s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -66,7 +65,6 @@ public class S3Stream implements Stream {
private final Storage storage;
private final StreamManager streamManager;
private final Status status;
private final Consumer<Long> closeHook;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
Expand All @@ -79,13 +77,12 @@ public class S3Stream implements Stream {
private CompletableFuture<Void> lastPendingTrim = CompletableFuture.completedFuture(null);

public S3Stream(long streamId, long epoch, long startOffset, long nextOffset, Storage storage,
StreamManager streamManager, Consumer<Long> closeHook) {
this(streamId, epoch, startOffset, nextOffset, storage, streamManager, closeHook, null, null);
StreamManager streamManager) {
this(streamId, epoch, startOffset, nextOffset, storage, streamManager, null, null);
}

public S3Stream(long streamId, long epoch, long startOffset, long nextOffset, Storage storage,
StreamManager streamManager, Consumer<Long> closeHook,
AsyncNetworkBandwidthLimiter networkInboundLimiter, AsyncNetworkBandwidthLimiter networkOutboundLimiter) {
StreamManager streamManager, AsyncNetworkBandwidthLimiter networkInboundLimiter, AsyncNetworkBandwidthLimiter networkOutboundLimiter) {
this.streamId = streamId;
this.epoch = epoch;
this.startOffset = startOffset;
Expand All @@ -95,7 +92,6 @@ public S3Stream(long streamId, long epoch, long startOffset, long nextOffset, St
this.status = new Status();
this.storage = storage;
this.streamManager = streamManager;
this.closeHook = closeHook;
this.networkInboundLimiter = networkInboundLimiter;
this.networkOutboundLimiter = networkOutboundLimiter;
}
Expand Down Expand Up @@ -260,9 +256,7 @@ public CompletableFuture<Void> trim(long newStartOffset) {
CompletableFuture<Void> cf = new CompletableFuture<>();
lastPendingTrim.whenComplete((nil, ex) -> propagate(trim0(newStartOffset), cf));
this.lastPendingTrim = cf;
cf.whenComplete((nil, ex) -> {
StreamOperationStats.getInstance().trimStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
});
cf.whenComplete((nil, ex) -> StreamOperationStats.getInstance().trimStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS)));
return cf;
}, LOGGER, "trim");
} finally {
Expand Down Expand Up @@ -328,8 +322,7 @@ public CompletableFuture<Void> close() {

private CompletableFuture<Void> close0() {
return storage.forceUpload(streamId)
.thenCompose(nil -> streamManager.closeStream(streamId, epoch))
.whenComplete((nil, ex) -> closeHook.accept(streamId));
.thenCompose(nil -> streamManager.closeStream(streamId, epoch));
}

@Override
Expand All @@ -352,7 +345,6 @@ public CompletableFuture<Void> destroy() {

private CompletableFuture<Void> destroy0() {
status.markDestroy();
closeHook.accept(streamId);
startOffset = this.confirmOffset.get();
return streamManager.deleteStream(streamId, epoch);
}
Expand Down
125 changes: 113 additions & 12 deletions s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,15 @@

package com.automq.stream.s3;

import com.automq.stream.api.AppendResult;
import com.automq.stream.api.CreateStreamOptions;
import com.automq.stream.api.FetchResult;
import com.automq.stream.api.OpenStreamOptions;
import com.automq.stream.api.RecordBatch;
import com.automq.stream.api.Stream;
import com.automq.stream.api.StreamClient;
import com.automq.stream.s3.context.AppendContext;
import com.automq.stream.s3.context.FetchContext;
import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.stats.StreamOperationStats;
Expand All @@ -25,7 +30,7 @@
import com.automq.stream.utils.FutureUtil;
import com.automq.stream.utils.ThreadUtils;
import com.automq.stream.utils.Threads;
import java.util.LinkedList;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -42,7 +47,7 @@ public class S3StreamClient implements StreamClient {
private static final Logger LOGGER = LoggerFactory.getLogger(S3StreamClient.class);
private final ScheduledExecutorService streamObjectCompactionScheduler = Threads.newSingleThreadScheduledExecutor(
ThreadUtils.createThreadFactory("stream-object-compaction-scheduler", true), LOGGER, true);
private final Map<Long, S3Stream> openedStreams;
private final Map<Long, StreamWrapper> openedStreams;
private final StreamManager streamManager;
private final Storage storage;
private final ObjectManager objectManager;
Expand All @@ -52,6 +57,7 @@ public class S3StreamClient implements StreamClient {
private final AsyncNetworkBandwidthLimiter networkOutboundBucket;
private ScheduledFuture<?> scheduledCompactionTaskFuture;

@SuppressWarnings("unused")
public S3StreamClient(StreamManager streamManager, Storage storage, ObjectManager objectManager,
S3Operator s3Operator, Config config) {
this(streamManager, storage, objectManager, s3Operator, config, null, null);
Expand Down Expand Up @@ -95,25 +101,19 @@ public Optional<Stream> getStream(long streamId) {
*/
private void startStreamObjectsCompactions() {
scheduledCompactionTaskFuture = streamObjectCompactionScheduler.scheduleWithFixedDelay(() -> {
List<S3Stream> operationStreams = new LinkedList<>(openedStreams.values());
operationStreams.forEach(stream -> {
StreamObjectCompactor task = StreamObjectCompactor.builder().objectManager(objectManager).stream(stream)
.s3Operator(s3Operator).maxStreamObjectSize(config.streamObjectCompactionMaxSizeBytes()).build();
task.compact();
});
List<StreamWrapper> operationStreams = new ArrayList<>(openedStreams.values());
operationStreams.forEach(StreamWrapper::compactStreamObject);
}, config.streamObjectCompactionIntervalMinutes(), config.streamObjectCompactionIntervalMinutes(), TimeUnit.MINUTES);
}

private CompletableFuture<Stream> openStream0(long streamId, long epoch) {
TimerUtil timerUtil = new TimerUtil();
return streamManager.openStream(streamId, epoch).
thenApply(metadata -> {
StreamObjectCompactor.Builder builder = StreamObjectCompactor.builder().objectManager(objectManager).s3Operator(s3Operator)
.maxStreamObjectSize(config.streamObjectCompactionMaxSizeBytes());
S3Stream stream = new S3Stream(
StreamWrapper stream = new StreamWrapper(new S3Stream(
metadata.streamId(), metadata.epoch(),
metadata.startOffset(), metadata.endOffset(),
storage, streamManager, openedStreams::remove, networkInboundBucket, networkOutboundBucket);
storage, streamManager, networkInboundBucket, networkOutboundBucket));
openedStreams.put(streamId, stream);
StreamOperationStats.getInstance().openStreamStats.record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
return stream;
Expand Down Expand Up @@ -149,4 +149,105 @@ public void shutdown() {
}
LOGGER.info("wait streams[{}] closed cost {}ms", streamCloseFutures.keySet(), timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
}

class StreamWrapper implements Stream {
private final S3Stream stream;
private volatile boolean compacting = false;

public StreamWrapper(S3Stream stream) {
this.stream = stream;
}

@Override
public long streamId() {
return stream.streamId();
}

@Override
public long streamEpoch() {
return stream.streamEpoch();
}

@Override
public long startOffset() {
return stream.startOffset();
}

@Override
public long confirmOffset() {
return stream.confirmOffset();
}

@Override
public long nextOffset() {
return stream.nextOffset();
}

@Override
public CompletableFuture<AppendResult> append(AppendContext context, RecordBatch recordBatch) {
return stream.append(context, recordBatch);
}

@Override
public CompletableFuture<FetchResult> fetch(FetchContext context, long startOffset, long endOffset,
int maxBytesHint) {
return stream.fetch(context, startOffset, endOffset, maxBytesHint);
}

@Override
public CompletableFuture<Void> trim(long newStartOffset) {
return stream.trim(newStartOffset).whenComplete((nil, ex) -> {
if (compacting) {
// skip compacting if the stream is compacting
// to avoid streamObjectCompactionScheduler task queue overflow.
return;
}
// trigger compaction after trim to clean up the expired stream objects.
streamObjectCompactionScheduler.execute(this::cleanupStreamObject);
});

}

@Override
public CompletableFuture<Void> close() {
return stream.close().whenComplete((v, e) -> openedStreams.remove(streamId(), this));
}

@Override
public CompletableFuture<Void> destroy() {
return stream.destroy().whenComplete((v, e) -> openedStreams.remove(streamId(), this));
}

public boolean isClosed() {
return stream.isClosed();
}

public void cleanupStreamObject() {
compactStreamObject0(true);
}

public void compactStreamObject() {
compactStreamObject0(false);
}

public void compactStreamObject0(boolean onlyCleanup) {
if (isClosed()) {
// the compaction task may be taking a long time,
// so we need to check if the stream is closed before starting the compaction.
return;
}
try {
compacting = true;
StreamObjectCompactor task = StreamObjectCompactor.builder().objectManager(objectManager).stream(stream)
.s3Operator(s3Operator).maxStreamObjectSize(config.streamObjectCompactionMaxSizeBytes()).build();
if (onlyCleanup) {
task.cleanup();
} else {
task.compact();
}
} finally {
compacting = false;
}
}
}
}
Loading

0 comments on commit e13e151

Please sign in to comment.