From 034f6446e79aed63cd12ab66919970368e3446be Mon Sep 17 00:00:00 2001 From: Robin Han Date: Thu, 23 Nov 2023 21:32:54 +0800 Subject: [PATCH 1/2] fix(issues462): commit wal to closed stream Signed-off-by: Robin Han --- .../main/java/com/automq/stream/s3/S3Storage.java | 14 ++++++++------ .../java/com/automq/stream/s3/cache/LogCache.java | 10 ++++++++++ 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index 3dab89b0b..e05273e9d 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -363,17 +363,19 @@ private void continuousCheck(List records) { * Force upload stream WAL cache to S3. Use group upload to avoid generate too many S3 objects when broker shutdown. */ @Override - public synchronized CompletableFuture forceUpload(long streamId) { + public CompletableFuture forceUpload(long streamId) { CompletableFuture cf = new CompletableFuture<>(); List> inflightWALUploadTasks = new ArrayList<>(this.inflightWALUploadTasks); // await inflight stream set object upload tasks to group force upload tasks. CompletableFuture.allOf(inflightWALUploadTasks.toArray(new CompletableFuture[0])).whenCompleteAsync((nil, ex) -> { - deltaWALCache.setConfirmOffset(callbackSequencer.getWALConfirmOffset()); - Optional blockOpt = deltaWALCache.archiveCurrentBlockIfContains(streamId); - if (blockOpt.isPresent()) { - blockOpt.ifPresent(this::uploadDeltaWAL); + synchronized (this.inflightWALUploadTasks) { + deltaWALCache.setConfirmOffset(callbackSequencer.getWALConfirmOffset()); + Optional blockOpt = deltaWALCache.archiveCurrentBlockIfContains(streamId); + if (blockOpt.isPresent()) { + blockOpt.ifPresent(this::uploadDeltaWAL); + } + FutureUtil.propagate(CompletableFuture.allOf(this.inflightWALUploadTasks.toArray(new CompletableFuture[0])), cf); } - FutureUtil.propagate(CompletableFuture.allOf(this.inflightWALUploadTasks.toArray(new CompletableFuture[0])), cf); mainWriteExecutor.execute(() -> callbackSequencer.tryFree(streamId)); }); return cf; diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java index 45570f2d6..77335c893 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java @@ -184,7 +184,17 @@ public LogCacheBlock archiveCurrentBlock() { } } + public Optional archiveCurrentBlockIfContains(long streamId) { + writeLock.lock(); + try { + return archiveCurrentBlockIfContains0(streamId); + } finally { + writeLock.unlock(); + } + } + + Optional archiveCurrentBlockIfContains0(long streamId) { if (streamId == MATCH_ALL_STREAMS) { if (activeBlock.size > 0) { return Optional.of(archiveCurrentBlock()); From f37ae321e2e34ee1675c208df47dadc4e6bd6249 Mon Sep 17 00:00:00 2001 From: Robin Han Date: Thu, 23 Nov 2023 21:33:38 +0800 Subject: [PATCH 2/2] feat(s3stream): 0.5.4 Signed-off-by: Robin Han --- pom.xml | 2 +- s3stream/pom.xml | 2 +- .../java/com/automq/stream/s3/S3Storage.java | 46 ++++++++++++------- 3 files changed, 32 insertions(+), 18 deletions(-) diff --git a/pom.xml b/pom.xml index 4339bce6b..05c41f878 100644 --- a/pom.xml +++ b/pom.xml @@ -53,7 +53,7 @@ 32.0.1-jre 2.0.9 2.2 - 0.5.3-SNAPSHOT + 0.5.4-SNAPSHOT 23.5.26 diff --git a/s3stream/pom.xml b/s3stream/pom.xml index c5d74eb11..07a0c5180 100644 --- a/s3stream/pom.xml +++ b/s3stream/pom.xml @@ -22,7 +22,7 @@ 4.0.0 com.automq.elasticstream s3stream - 0.5.3-SNAPSHOT + 0.5.4-SNAPSHOT 5.5.0 5.10.0 diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index e05273e9d..90cc9dcd0 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -64,7 +64,9 @@ public class S3Storage implements Storage { private final long maxDeltaWALCacheSize; private final Config config; private final WriteAheadLog deltaWAL; - /** WAL log cache */ + /** + * WAL log cache + */ private final LogCache deltaWALCache; /** * WAL out of order callback sequencer. Single thread mainWriteExecutor will ensure the memory safety. @@ -368,14 +370,8 @@ public CompletableFuture forceUpload(long streamId) { List> inflightWALUploadTasks = new ArrayList<>(this.inflightWALUploadTasks); // await inflight stream set object upload tasks to group force upload tasks. CompletableFuture.allOf(inflightWALUploadTasks.toArray(new CompletableFuture[0])).whenCompleteAsync((nil, ex) -> { - synchronized (this.inflightWALUploadTasks) { - deltaWALCache.setConfirmOffset(callbackSequencer.getWALConfirmOffset()); - Optional blockOpt = deltaWALCache.archiveCurrentBlockIfContains(streamId); - if (blockOpt.isPresent()) { - blockOpt.ifPresent(this::uploadDeltaWAL); - } - FutureUtil.propagate(CompletableFuture.allOf(this.inflightWALUploadTasks.toArray(new CompletableFuture[0])), cf); - } + uploadDeltaWAL(streamId); + FutureUtil.propagate(CompletableFuture.allOf(this.inflightWALUploadTasks.toArray(new CompletableFuture[0])), cf); mainWriteExecutor.execute(() -> callbackSequencer.tryFree(streamId)); }); return cf; @@ -391,14 +387,11 @@ private void handleAppendCallback(WalWriteRequest request) { private void handleAppendCallback0(WalWriteRequest request) { List waitingAckRequests = callbackSequencer.after(request); - long walConfirmOffset = callbackSequencer.getWALConfirmOffset(); waitingAckRequests.forEach(r -> r.record.retain()); for (WalWriteRequest waitingAckRequest : waitingAckRequests) { if (deltaWALCache.put(waitingAckRequest.record)) { // cache block is full, trigger WAL upload. - deltaWALCache.setConfirmOffset(walConfirmOffset); - LogCache.LogCacheBlock logCacheBlock = deltaWALCache.archiveCurrentBlock(); - uploadDeltaWAL(logCacheBlock); + uploadDeltaWAL(); } } for (WalWriteRequest waitingAckRequest : waitingAckRequests) { @@ -406,15 +399,36 @@ private void handleAppendCallback0(WalWriteRequest request) { } } - /** - * Upload cache block to S3. The earlier cache block will have smaller objectId and commit first. - */ + @SuppressWarnings("UnusedReturnValue") + CompletableFuture uploadDeltaWAL() { + return uploadDeltaWAL(LogCache.MATCH_ALL_STREAMS); + } + + CompletableFuture uploadDeltaWAL(long streamId) { + synchronized (deltaWALCache) { + deltaWALCache.setConfirmOffset(callbackSequencer.getWALConfirmOffset()); + Optional blockOpt = deltaWALCache.archiveCurrentBlockIfContains(streamId); + if (blockOpt.isPresent()) { + LogCache.LogCacheBlock logCacheBlock = blockOpt.get(); + DeltaWALUploadTaskContext context = new DeltaWALUploadTaskContext(logCacheBlock); + context.objectManager = this.objectManager; + return uploadDeltaWAL(context); + } else { + return CompletableFuture.completedFuture(null); + } + } + } + + // only for test CompletableFuture uploadDeltaWAL(LogCache.LogCacheBlock logCacheBlock) { DeltaWALUploadTaskContext context = new DeltaWALUploadTaskContext(logCacheBlock); context.objectManager = this.objectManager; return uploadDeltaWAL(context); } + /** + * Upload cache block to S3. The earlier cache block will have smaller objectId and commit first. + */ CompletableFuture uploadDeltaWAL(DeltaWALUploadTaskContext context) { TimerUtil timerUtil = new TimerUtil(); CompletableFuture cf = new CompletableFuture<>();