Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(s3stream): 0.5.4 #712

Merged
merged 2 commits into from
Nov 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
<guava.version>32.0.1-jre</guava.version>
<slf4j.version>2.0.9</slf4j.version>
<snakeyaml.version>2.2</snakeyaml.version>
<s3stream.version>0.5.3-SNAPSHOT</s3stream.version>
<s3stream.version>0.5.4-SNAPSHOT</s3stream.version>

<!-- Flat buffers related -->
<flatbuffers.version>23.5.26</flatbuffers.version>
Expand Down
2 changes: 1 addition & 1 deletion s3stream/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.automq.elasticstream</groupId>
<artifactId>s3stream</artifactId>
<version>0.5.3-SNAPSHOT</version>
<version>0.5.4-SNAPSHOT</version>
<properties>
<mockito-core.version>5.5.0</mockito-core.version>
<junit-jupiter.version>5.10.0</junit-jupiter.version>
Expand Down
44 changes: 30 additions & 14 deletions s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ public class S3Storage implements Storage {
private final long maxDeltaWALCacheSize;
private final Config config;
private final WriteAheadLog deltaWAL;
/** WAL log cache */
/**
* WAL log cache
*/
private final LogCache deltaWALCache;
/**
* WAL out of order callback sequencer. Single thread mainWriteExecutor will ensure the memory safety.
Expand Down Expand Up @@ -363,16 +365,12 @@ private void continuousCheck(List<StreamRecordBatch> records) {
* Force upload stream WAL cache to S3. Use group upload to avoid generate too many S3 objects when broker shutdown.
*/
@Override
public synchronized CompletableFuture<Void> forceUpload(long streamId) {
public CompletableFuture<Void> forceUpload(long streamId) {
CompletableFuture<Void> cf = new CompletableFuture<>();
List<CompletableFuture<Void>> inflightWALUploadTasks = new ArrayList<>(this.inflightWALUploadTasks);
// await inflight stream set object upload tasks to group force upload tasks.
CompletableFuture.allOf(inflightWALUploadTasks.toArray(new CompletableFuture[0])).whenCompleteAsync((nil, ex) -> {
deltaWALCache.setConfirmOffset(callbackSequencer.getWALConfirmOffset());
Optional<LogCache.LogCacheBlock> blockOpt = deltaWALCache.archiveCurrentBlockIfContains(streamId);
if (blockOpt.isPresent()) {
blockOpt.ifPresent(this::uploadDeltaWAL);
}
uploadDeltaWAL(streamId);
FutureUtil.propagate(CompletableFuture.allOf(this.inflightWALUploadTasks.toArray(new CompletableFuture[0])), cf);
mainWriteExecutor.execute(() -> callbackSequencer.tryFree(streamId));
});
Expand All @@ -389,30 +387,48 @@ private void handleAppendCallback(WalWriteRequest request) {

private void handleAppendCallback0(WalWriteRequest request) {
List<WalWriteRequest> waitingAckRequests = callbackSequencer.after(request);
long walConfirmOffset = callbackSequencer.getWALConfirmOffset();
waitingAckRequests.forEach(r -> r.record.retain());
for (WalWriteRequest waitingAckRequest : waitingAckRequests) {
if (deltaWALCache.put(waitingAckRequest.record)) {
// cache block is full, trigger WAL upload.
deltaWALCache.setConfirmOffset(walConfirmOffset);
LogCache.LogCacheBlock logCacheBlock = deltaWALCache.archiveCurrentBlock();
uploadDeltaWAL(logCacheBlock);
uploadDeltaWAL();
}
}
for (WalWriteRequest waitingAckRequest : waitingAckRequests) {
waitingAckRequest.cf.complete(null);
}
}

/**
* Upload cache block to S3. The earlier cache block will have smaller objectId and commit first.
*/
@SuppressWarnings("UnusedReturnValue")
CompletableFuture<Void> uploadDeltaWAL() {
return uploadDeltaWAL(LogCache.MATCH_ALL_STREAMS);
}

CompletableFuture<Void> uploadDeltaWAL(long streamId) {
synchronized (deltaWALCache) {
deltaWALCache.setConfirmOffset(callbackSequencer.getWALConfirmOffset());
Optional<LogCache.LogCacheBlock> blockOpt = deltaWALCache.archiveCurrentBlockIfContains(streamId);
if (blockOpt.isPresent()) {
LogCache.LogCacheBlock logCacheBlock = blockOpt.get();
DeltaWALUploadTaskContext context = new DeltaWALUploadTaskContext(logCacheBlock);
context.objectManager = this.objectManager;
return uploadDeltaWAL(context);
} else {
return CompletableFuture.completedFuture(null);
}
}
}

// only for test
CompletableFuture<Void> uploadDeltaWAL(LogCache.LogCacheBlock logCacheBlock) {
DeltaWALUploadTaskContext context = new DeltaWALUploadTaskContext(logCacheBlock);
context.objectManager = this.objectManager;
return uploadDeltaWAL(context);
}

/**
* Upload cache block to S3. The earlier cache block will have smaller objectId and commit first.
*/
CompletableFuture<Void> uploadDeltaWAL(DeltaWALUploadTaskContext context) {
TimerUtil timerUtil = new TimerUtil();
CompletableFuture<Void> cf = new CompletableFuture<>();
Expand Down
10 changes: 10 additions & 0 deletions s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,17 @@ public LogCacheBlock archiveCurrentBlock() {
}
}


public Optional<LogCacheBlock> archiveCurrentBlockIfContains(long streamId) {
writeLock.lock();
try {
return archiveCurrentBlockIfContains0(streamId);
} finally {
writeLock.unlock();
}
}

Optional<LogCacheBlock> archiveCurrentBlockIfContains0(long streamId) {
if (streamId == MATCH_ALL_STREAMS) {
if (activeBlock.size > 0) {
return Optional.of(archiveCurrentBlock());
Expand Down
Loading