From d6bd72fe3d4f470b6683cf622cf7a1498fc0176e Mon Sep 17 00:00:00 2001 From: Yu Ning <78631860+Chillax-0v0@users.noreply.github.com> Date: Thu, 23 Nov 2023 16:26:57 +0800 Subject: [PATCH] refactor(s3stream/wal): init the sliding window during start (#708) Signed-off-by: Ning Yu --- .../automq/stream/s3/wal/BlockWALService.java | 65 +++++++++--------- .../stream/s3/wal/SlidingWindowService.java | 66 +++++++++---------- .../com/automq/stream/s3/wal/WALHeader.java | 17 +++-- .../stream/s3/wal/BlockWALServiceTest.java | 6 +- .../automq/stream/s3/wal/WALHeaderTest.java | 4 +- 5 files changed, 76 insertions(+), 82 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java index f347cf510..dcbd448fc 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java @@ -135,8 +135,8 @@ public static BlockWALServiceBuilder recoveryBuilder(String path) { return new BlockWALServiceBuilder(path); } - private void flushWALHeader(long windowMaxLength, ShutdownType shutdownType) throws IOException { - walHeader.setSlidingWindowMaxLength(windowMaxLength).setShutdownType(shutdownType); + private void flushWALHeader(ShutdownType shutdownType) throws IOException { + walHeader.setShutdownType(shutdownType); flushWALHeader(); } @@ -276,6 +276,7 @@ public WriteAheadLog start() throws IOException { WALHeader header = tryReadWALHeader(walChannel); if (null == header) { + assert !recoveryMode; header = newWALHeader(); firstStart = true; LOGGER.info("no available WALHeader, create a new one: {}", header); @@ -316,10 +317,7 @@ private WALHeader tryReadWALHeader(WALChannel walChannel) { } private WALHeader newWALHeader() { - return new WALHeader() - .setCapacity(walChannel.capacity()) - .setSlidingWindowMaxLength(initialWindowSize) - .setShutdownType(ShutdownType.UNGRACEFULLY); + return new WALHeader(walChannel.capacity(), initialWindowSize); } private void walHeaderReady(WALHeader header) throws IOException { @@ -348,12 +346,11 @@ public void shutdownGracefully() { walHeaderFlusher.shutdownNow(); } - long maxLength = slidingWindowService.initialized() - ? slidingWindowService.getWindowCoreData().getWindowMaxLength() - : walHeader.getSlidingWindowMaxLength(); - boolean gracefulShutdown = slidingWindowService.shutdown(1, TimeUnit.DAYS); + boolean gracefulShutdown = Optional.ofNullable(slidingWindowService) + .map(s -> s.shutdown(1, TimeUnit.DAYS)) + .orElse(true); try { - flushWALHeader(maxLength, gracefulShutdown ? ShutdownType.GRACEFULLY : ShutdownType.UNGRACEFULLY); + flushWALHeader(gracefulShutdown ? ShutdownType.GRACEFULLY : ShutdownType.UNGRACEFULLY); } catch (IOException e) { LOGGER.error("failed to flush WALHeader when shutdown gracefully", e); } @@ -447,7 +444,10 @@ public CompletableFuture reset() { checkStarted(); checkRecoverFinished(); - slidingWindowService.start(walHeader.getSlidingWindowMaxLength(), recoveryCompleteOffset); + if (!recoveryMode) { + // in recovery mode, no need to start sliding window service + slidingWindowService.start(walHeader.getAtomicSlidingWindowMaxLength(), recoveryCompleteOffset); + } LOGGER.info("reset sliding window to offset: {}", recoveryCompleteOffset); return trim(recoveryCompleteOffset - 1, true).thenRun(() -> resetFinished.set(true)); } @@ -462,16 +462,13 @@ private CompletableFuture trim(long offset, boolean internal) { if (!internal) { checkWriteMode(); checkResetFinished(); - } - - if (offset >= slidingWindowService.getWindowCoreData().getWindowStartOffset()) { - throw new IllegalArgumentException("failed to trim: record at offset " + offset + " has not been flushed yet"); + if (offset >= slidingWindowService.getWindowCoreData().getStartOffset()) { + throw new IllegalArgumentException("failed to trim: record at offset " + offset + " has not been flushed yet"); + } } walHeader.updateTrimOffset(offset); return CompletableFuture.runAsync(() -> { - // TODO: more beautiful - this.walHeader.setSlidingWindowMaxLength(slidingWindowService.getWindowCoreData().getWindowMaxLength()); try { flushWALHeader(); } catch (IOException e) { @@ -505,7 +502,7 @@ private void checkResetFinished() { } private SlidingWindowService.WALHeaderFlusher flusher() { - return windowMaxLength -> flushWALHeader(windowMaxLength, ShutdownType.UNGRACEFULLY); + return () -> flushWALHeader(ShutdownType.UNGRACEFULLY); } public static class BlockWALServiceBuilder { @@ -634,20 +631,22 @@ public BlockWALService build() { LOGGER.info("block wal not using direct IO"); } - // TODO: in recovery mode, no need to create sliding window service - // make sure window size is less than capacity - slidingWindowInitialSize = Math.min(slidingWindowInitialSize, blockDeviceCapacityWant - WAL_HEADER_TOTAL_CAPACITY); - slidingWindowUpperLimit = Math.min(slidingWindowUpperLimit, blockDeviceCapacityWant - WAL_HEADER_TOTAL_CAPACITY); - blockWALService.initialWindowSize = slidingWindowInitialSize; - blockWALService.slidingWindowService = new SlidingWindowService( - blockWALService.walChannel, - ioThreadNums, - slidingWindowUpperLimit, - slidingWindowScaleUnit, - blockSoftLimit, - writeRateLimit, - blockWALService.flusher() - ); + if (!recoveryMode) { + // in recovery mode, no need to create sliding window service + // make sure window size is less than capacity + slidingWindowInitialSize = Math.min(slidingWindowInitialSize, blockDeviceCapacityWant - WAL_HEADER_TOTAL_CAPACITY); + slidingWindowUpperLimit = Math.min(slidingWindowUpperLimit, blockDeviceCapacityWant - WAL_HEADER_TOTAL_CAPACITY); + blockWALService.initialWindowSize = slidingWindowInitialSize; + blockWALService.slidingWindowService = new SlidingWindowService( + blockWALService.walChannel, + ioThreadNums, + slidingWindowUpperLimit, + slidingWindowScaleUnit, + blockSoftLimit, + writeRateLimit, + blockWALService.flusher() + ); + } blockWALService.recoveryMode = recoveryMode; diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java index 26ee6ce19..718d03064 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/SlidingWindowService.java @@ -62,7 +62,6 @@ public class SlidingWindowService { private final long minWriteIntervalNanos; private final WALChannel walChannel; private final WALHeaderFlusher walHeaderFlusher; - private final WindowCoreData windowCoreData = new WindowCoreData(); /** * The lock of {@link #pendingBlocks}, {@link #writingBlocks}, {@link #currentBlock}. @@ -77,6 +76,11 @@ public class SlidingWindowService { * After the service is initialized, data in {@link #windowCoreData} is valid. */ private final AtomicBoolean initialized = new AtomicBoolean(false); + + /** + * The core data of the sliding window. Initialized when the service is started. + */ + private WindowCoreData windowCoreData; /** * Blocks that are waiting to be written. * All blocks in this queue are ordered by the start offset. @@ -109,10 +113,8 @@ public WindowCoreData getWindowCoreData() { return windowCoreData; } - public void start(long windowMaxLength, long windowStartOffset) { - windowCoreData.setWindowMaxLength(windowMaxLength); - windowCoreData.setWindowStartOffset(windowStartOffset); - windowCoreData.setWindowNextWriteOffset(windowStartOffset); + public void start(AtomicLong windowMaxLength, long windowStartOffset) { + this.windowCoreData = new WindowCoreData(windowMaxLength, windowStartOffset, windowStartOffset); this.ioExecutor = Threads.newFixedThreadPoolWithMonitor(ioThreadNums, "block-wal-io-thread", false, LOGGER); ScheduledExecutorService pollBlockScheduler = Threads.newSingleThreadScheduledExecutor( @@ -223,7 +225,7 @@ public Block getCurrentBlockLocked() { assert initialized(); // The current block is null only when no record has been written if (null == currentBlock) { - currentBlock = nextBlock(windowCoreData.getWindowNextWriteOffset()); + currentBlock = nextBlock(windowCoreData.getNextWriteOffset()); } return currentBlock; } @@ -341,8 +343,8 @@ private void writeBlockData(BlockBatch blocks) throws IOException { private void makeWriteOffsetMatchWindow(long newWindowEndOffset) throws IOException, OverCapacityException { // align to block size newWindowEndOffset = WALUtil.alignLargeByBlockSize(newWindowEndOffset); - long windowStartOffset = windowCoreData.getWindowStartOffset(); - long windowMaxLength = windowCoreData.getWindowMaxLength(); + long windowStartOffset = windowCoreData.getStartOffset(); + long windowMaxLength = windowCoreData.getMaxLength(); if (newWindowEndOffset > windowStartOffset + windowMaxLength) { long newWindowMaxLength = newWindowEndOffset - windowStartOffset + scaleUnit; if (newWindowMaxLength > upperLimit) { @@ -362,7 +364,7 @@ private void makeWriteOffsetMatchWindow(long newWindowEndOffset) throws IOExcept } public interface WALHeaderFlusher { - void flush(long windowMaxLength) throws IOException; + void flush() throws IOException; } public static class RecordHeaderCoreData { @@ -453,61 +455,59 @@ public ByteBuf marshal() { public static class WindowCoreData { private final Lock scaleOutLock = new ReentrantLock(); - private final AtomicLong windowMaxLength = new AtomicLong(0); + private final AtomicLong maxLength; /** * Next write offset of sliding window, always aligned to the {@link WALUtil#BLOCK_SIZE}. */ - private final AtomicLong windowNextWriteOffset = new AtomicLong(0); + private final AtomicLong nextWriteOffset; /** * Start offset of sliding window, always aligned to the {@link WALUtil#BLOCK_SIZE}. * The data before this offset has already been written to the disk. */ - private final AtomicLong windowStartOffset = new AtomicLong(0); - - public long getWindowMaxLength() { - return windowMaxLength.get(); - } + private final AtomicLong startOffset; - public void setWindowMaxLength(long windowMaxLength) { - this.windowMaxLength.set(windowMaxLength); + public WindowCoreData(AtomicLong maxLength, long nextWriteOffset, long startOffset) { + this.maxLength = maxLength; + this.nextWriteOffset = new AtomicLong(nextWriteOffset); + this.startOffset = new AtomicLong(startOffset); } - public long getWindowNextWriteOffset() { - return windowNextWriteOffset.get(); + public long getMaxLength() { + return maxLength.get(); } - public void setWindowNextWriteOffset(long windowNextWriteOffset) { - this.windowNextWriteOffset.set(windowNextWriteOffset); + public void setMaxLength(long maxLength) { + this.maxLength.set(maxLength); } - public long getWindowStartOffset() { - return windowStartOffset.get(); + public long getNextWriteOffset() { + return nextWriteOffset.get(); } - public void setWindowStartOffset(long windowStartOffset) { - this.windowStartOffset.set(windowStartOffset); + public long getStartOffset() { + return startOffset.get(); } public void updateWindowStartOffset(long offset) { - this.windowStartOffset.accumulateAndGet(offset, Math::max); + this.startOffset.accumulateAndGet(offset, Math::max); } - public void scaleOutWindow(WALHeaderFlusher flusher, long newWindowMaxLength) throws IOException { + public void scaleOutWindow(WALHeaderFlusher flusher, long newMaxLength) throws IOException { boolean scaleWindowHappened = false; scaleOutLock.lock(); try { - if (newWindowMaxLength < getWindowMaxLength()) { + if (newMaxLength < getMaxLength()) { // Another thread has already scaled out the window. return; } - flusher.flush(newWindowMaxLength); - setWindowMaxLength(newWindowMaxLength); + setMaxLength(newMaxLength); + flusher.flush(); scaleWindowHappened = true; } finally { scaleOutLock.unlock(); if (scaleWindowHappened) { - LOGGER.info("window scale out to {}", newWindowMaxLength); + LOGGER.info("window scale out to {}", newMaxLength); } else { LOGGER.debug("window already scale out, ignore"); } @@ -538,7 +538,7 @@ private void writeBlock(BlockBatch blocks) { FutureUtil.complete(blocks.futures(), new AppendResult.CallbackResult() { @Override public long flushedOffset() { - return windowCoreData.getWindowStartOffset(); + return windowCoreData.getStartOffset(); } @Override diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/WALHeader.java b/s3stream/src/main/java/com/automq/stream/s3/wal/WALHeader.java index af69b355d..0c2442456 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/WALHeader.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/WALHeader.java @@ -74,8 +74,13 @@ class WALHeader { private long epoch7; private int crc8; + public WALHeader(long capacity, long windowMaxLength) { + this.capacity1 = capacity; + this.slidingWindowMaxLength4.set(windowMaxLength); + } + public static WALHeader unmarshal(ByteBuf buf) throws UnmarshalException { - WALHeader walHeader = new WALHeader(); + WALHeader walHeader = new WALHeader(0, 0); buf.markReaderIndex(); walHeader.magicCode0 = buf.readInt(); walHeader.capacity1 = buf.readLong(); @@ -106,11 +111,6 @@ public long getCapacity() { return capacity1; } - public WALHeader setCapacity(long capacity) { - this.capacity1 = capacity; - return this; - } - public long getTrimOffset() { return trimOffset2.get(); } @@ -142,9 +142,8 @@ public long getSlidingWindowMaxLength() { return slidingWindowMaxLength4.get(); } - public WALHeader setSlidingWindowMaxLength(long slidingWindowMaxLength) { - this.slidingWindowMaxLength4.set(slidingWindowMaxLength); - return this; + public AtomicLong getAtomicSlidingWindowMaxLength() { + return slidingWindowMaxLength4; } public ShutdownType getShutdownType() { diff --git a/s3stream/src/test/java/com/automq/stream/s3/wal/BlockWALServiceTest.java b/s3stream/src/test/java/com/automq/stream/s3/wal/BlockWALServiceTest.java index 67c004f76..f16265406 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/wal/BlockWALServiceTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/wal/BlockWALServiceTest.java @@ -611,10 +611,8 @@ private void write(WALChannel walChannel, long logicOffset, int recordSize) thro } private void writeWALHeader(WALChannel walChannel, long trimOffset, long maxLength) throws IOException { - ByteBuf header = new WALHeader() - .setCapacity(walChannel.capacity()) + ByteBuf header = new WALHeader(walChannel.capacity(), maxLength) .updateTrimOffset(trimOffset) - .setSlidingWindowMaxLength(maxLength) .marshal(); walChannel.writeAndFlush(header, 0); } @@ -1208,7 +1206,7 @@ private void testCapacityMismatchInHeader0(boolean directIO) throws IOException .direct(directIO) .build(); walChannel.open(); - walChannel.writeAndFlush(new WALHeader().setCapacity(capacity2).marshal(), 0); + walChannel.writeAndFlush(new WALHeader(capacity2, 42).marshal(), 0); walChannel.close(); // try to open it with capacity1 diff --git a/s3stream/src/test/java/com/automq/stream/s3/wal/WALHeaderTest.java b/s3stream/src/test/java/com/automq/stream/s3/wal/WALHeaderTest.java index 8e6b5a87c..1a41f601e 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/wal/WALHeaderTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/wal/WALHeaderTest.java @@ -25,11 +25,9 @@ public class WALHeaderTest { @Test public void test() throws UnmarshalException { - WALHeader header = new WALHeader(); - header.setCapacity(128 * 1024); + WALHeader header = new WALHeader(128 * 1024, 100); header.updateTrimOffset(10); header.setLastWriteTimestamp(11); - header.setSlidingWindowMaxLength(100); header.setShutdownType(ShutdownType.GRACEFULLY); header.setNodeId(233); header.setEpoch(234);