Skip to content

Commit

Permalink
refactor(s3stream/wal): init the sliding window during start (#708)
Browse files Browse the repository at this point in the history
Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 authored Nov 23, 2023
1 parent 95d491b commit d6bd72f
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ public static BlockWALServiceBuilder recoveryBuilder(String path) {
return new BlockWALServiceBuilder(path);
}

private void flushWALHeader(long windowMaxLength, ShutdownType shutdownType) throws IOException {
walHeader.setSlidingWindowMaxLength(windowMaxLength).setShutdownType(shutdownType);
private void flushWALHeader(ShutdownType shutdownType) throws IOException {
walHeader.setShutdownType(shutdownType);
flushWALHeader();
}

Expand Down Expand Up @@ -276,6 +276,7 @@ public WriteAheadLog start() throws IOException {

WALHeader header = tryReadWALHeader(walChannel);
if (null == header) {
assert !recoveryMode;
header = newWALHeader();
firstStart = true;
LOGGER.info("no available WALHeader, create a new one: {}", header);
Expand Down Expand Up @@ -316,10 +317,7 @@ private WALHeader tryReadWALHeader(WALChannel walChannel) {
}

private WALHeader newWALHeader() {
return new WALHeader()
.setCapacity(walChannel.capacity())
.setSlidingWindowMaxLength(initialWindowSize)
.setShutdownType(ShutdownType.UNGRACEFULLY);
return new WALHeader(walChannel.capacity(), initialWindowSize);
}

private void walHeaderReady(WALHeader header) throws IOException {
Expand Down Expand Up @@ -348,12 +346,11 @@ public void shutdownGracefully() {
walHeaderFlusher.shutdownNow();
}

long maxLength = slidingWindowService.initialized()
? slidingWindowService.getWindowCoreData().getWindowMaxLength()
: walHeader.getSlidingWindowMaxLength();
boolean gracefulShutdown = slidingWindowService.shutdown(1, TimeUnit.DAYS);
boolean gracefulShutdown = Optional.ofNullable(slidingWindowService)
.map(s -> s.shutdown(1, TimeUnit.DAYS))
.orElse(true);
try {
flushWALHeader(maxLength, gracefulShutdown ? ShutdownType.GRACEFULLY : ShutdownType.UNGRACEFULLY);
flushWALHeader(gracefulShutdown ? ShutdownType.GRACEFULLY : ShutdownType.UNGRACEFULLY);
} catch (IOException e) {
LOGGER.error("failed to flush WALHeader when shutdown gracefully", e);
}
Expand Down Expand Up @@ -447,7 +444,10 @@ public CompletableFuture<Void> reset() {
checkStarted();
checkRecoverFinished();

slidingWindowService.start(walHeader.getSlidingWindowMaxLength(), recoveryCompleteOffset);
if (!recoveryMode) {
// in recovery mode, no need to start sliding window service
slidingWindowService.start(walHeader.getAtomicSlidingWindowMaxLength(), recoveryCompleteOffset);
}
LOGGER.info("reset sliding window to offset: {}", recoveryCompleteOffset);
return trim(recoveryCompleteOffset - 1, true).thenRun(() -> resetFinished.set(true));
}
Expand All @@ -462,16 +462,13 @@ private CompletableFuture<Void> trim(long offset, boolean internal) {
if (!internal) {
checkWriteMode();
checkResetFinished();
}

if (offset >= slidingWindowService.getWindowCoreData().getWindowStartOffset()) {
throw new IllegalArgumentException("failed to trim: record at offset " + offset + " has not been flushed yet");
if (offset >= slidingWindowService.getWindowCoreData().getStartOffset()) {
throw new IllegalArgumentException("failed to trim: record at offset " + offset + " has not been flushed yet");
}
}

walHeader.updateTrimOffset(offset);
return CompletableFuture.runAsync(() -> {
// TODO: more beautiful
this.walHeader.setSlidingWindowMaxLength(slidingWindowService.getWindowCoreData().getWindowMaxLength());
try {
flushWALHeader();
} catch (IOException e) {
Expand Down Expand Up @@ -505,7 +502,7 @@ private void checkResetFinished() {
}

private SlidingWindowService.WALHeaderFlusher flusher() {
return windowMaxLength -> flushWALHeader(windowMaxLength, ShutdownType.UNGRACEFULLY);
return () -> flushWALHeader(ShutdownType.UNGRACEFULLY);
}

public static class BlockWALServiceBuilder {
Expand Down Expand Up @@ -634,20 +631,22 @@ public BlockWALService build() {
LOGGER.info("block wal not using direct IO");
}

// TODO: in recovery mode, no need to create sliding window service
// make sure window size is less than capacity
slidingWindowInitialSize = Math.min(slidingWindowInitialSize, blockDeviceCapacityWant - WAL_HEADER_TOTAL_CAPACITY);
slidingWindowUpperLimit = Math.min(slidingWindowUpperLimit, blockDeviceCapacityWant - WAL_HEADER_TOTAL_CAPACITY);
blockWALService.initialWindowSize = slidingWindowInitialSize;
blockWALService.slidingWindowService = new SlidingWindowService(
blockWALService.walChannel,
ioThreadNums,
slidingWindowUpperLimit,
slidingWindowScaleUnit,
blockSoftLimit,
writeRateLimit,
blockWALService.flusher()
);
if (!recoveryMode) {
// in recovery mode, no need to create sliding window service
// make sure window size is less than capacity
slidingWindowInitialSize = Math.min(slidingWindowInitialSize, blockDeviceCapacityWant - WAL_HEADER_TOTAL_CAPACITY);
slidingWindowUpperLimit = Math.min(slidingWindowUpperLimit, blockDeviceCapacityWant - WAL_HEADER_TOTAL_CAPACITY);
blockWALService.initialWindowSize = slidingWindowInitialSize;
blockWALService.slidingWindowService = new SlidingWindowService(
blockWALService.walChannel,
ioThreadNums,
slidingWindowUpperLimit,
slidingWindowScaleUnit,
blockSoftLimit,
writeRateLimit,
blockWALService.flusher()
);
}

blockWALService.recoveryMode = recoveryMode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ public class SlidingWindowService {
private final long minWriteIntervalNanos;
private final WALChannel walChannel;
private final WALHeaderFlusher walHeaderFlusher;
private final WindowCoreData windowCoreData = new WindowCoreData();

/**
* The lock of {@link #pendingBlocks}, {@link #writingBlocks}, {@link #currentBlock}.
Expand All @@ -77,6 +76,11 @@ public class SlidingWindowService {
* After the service is initialized, data in {@link #windowCoreData} is valid.
*/
private final AtomicBoolean initialized = new AtomicBoolean(false);

/**
* The core data of the sliding window. Initialized when the service is started.
*/
private WindowCoreData windowCoreData;
/**
* Blocks that are waiting to be written.
* All blocks in this queue are ordered by the start offset.
Expand Down Expand Up @@ -109,10 +113,8 @@ public WindowCoreData getWindowCoreData() {
return windowCoreData;
}

public void start(long windowMaxLength, long windowStartOffset) {
windowCoreData.setWindowMaxLength(windowMaxLength);
windowCoreData.setWindowStartOffset(windowStartOffset);
windowCoreData.setWindowNextWriteOffset(windowStartOffset);
public void start(AtomicLong windowMaxLength, long windowStartOffset) {
this.windowCoreData = new WindowCoreData(windowMaxLength, windowStartOffset, windowStartOffset);
this.ioExecutor = Threads.newFixedThreadPoolWithMonitor(ioThreadNums,
"block-wal-io-thread", false, LOGGER);
ScheduledExecutorService pollBlockScheduler = Threads.newSingleThreadScheduledExecutor(
Expand Down Expand Up @@ -223,7 +225,7 @@ public Block getCurrentBlockLocked() {
assert initialized();
// The current block is null only when no record has been written
if (null == currentBlock) {
currentBlock = nextBlock(windowCoreData.getWindowNextWriteOffset());
currentBlock = nextBlock(windowCoreData.getNextWriteOffset());
}
return currentBlock;
}
Expand Down Expand Up @@ -341,8 +343,8 @@ private void writeBlockData(BlockBatch blocks) throws IOException {
private void makeWriteOffsetMatchWindow(long newWindowEndOffset) throws IOException, OverCapacityException {
// align to block size
newWindowEndOffset = WALUtil.alignLargeByBlockSize(newWindowEndOffset);
long windowStartOffset = windowCoreData.getWindowStartOffset();
long windowMaxLength = windowCoreData.getWindowMaxLength();
long windowStartOffset = windowCoreData.getStartOffset();
long windowMaxLength = windowCoreData.getMaxLength();
if (newWindowEndOffset > windowStartOffset + windowMaxLength) {
long newWindowMaxLength = newWindowEndOffset - windowStartOffset + scaleUnit;
if (newWindowMaxLength > upperLimit) {
Expand All @@ -362,7 +364,7 @@ private void makeWriteOffsetMatchWindow(long newWindowEndOffset) throws IOExcept
}

public interface WALHeaderFlusher {
void flush(long windowMaxLength) throws IOException;
void flush() throws IOException;
}

public static class RecordHeaderCoreData {
Expand Down Expand Up @@ -453,61 +455,59 @@ public ByteBuf marshal() {

public static class WindowCoreData {
private final Lock scaleOutLock = new ReentrantLock();
private final AtomicLong windowMaxLength = new AtomicLong(0);
private final AtomicLong maxLength;
/**
* Next write offset of sliding window, always aligned to the {@link WALUtil#BLOCK_SIZE}.
*/
private final AtomicLong windowNextWriteOffset = new AtomicLong(0);
private final AtomicLong nextWriteOffset;
/**
* Start offset of sliding window, always aligned to the {@link WALUtil#BLOCK_SIZE}.
* The data before this offset has already been written to the disk.
*/
private final AtomicLong windowStartOffset = new AtomicLong(0);

public long getWindowMaxLength() {
return windowMaxLength.get();
}
private final AtomicLong startOffset;

public void setWindowMaxLength(long windowMaxLength) {
this.windowMaxLength.set(windowMaxLength);
public WindowCoreData(AtomicLong maxLength, long nextWriteOffset, long startOffset) {
this.maxLength = maxLength;
this.nextWriteOffset = new AtomicLong(nextWriteOffset);
this.startOffset = new AtomicLong(startOffset);
}

public long getWindowNextWriteOffset() {
return windowNextWriteOffset.get();
public long getMaxLength() {
return maxLength.get();
}

public void setWindowNextWriteOffset(long windowNextWriteOffset) {
this.windowNextWriteOffset.set(windowNextWriteOffset);
public void setMaxLength(long maxLength) {
this.maxLength.set(maxLength);
}

public long getWindowStartOffset() {
return windowStartOffset.get();
public long getNextWriteOffset() {
return nextWriteOffset.get();
}

public void setWindowStartOffset(long windowStartOffset) {
this.windowStartOffset.set(windowStartOffset);
public long getStartOffset() {
return startOffset.get();
}

public void updateWindowStartOffset(long offset) {
this.windowStartOffset.accumulateAndGet(offset, Math::max);
this.startOffset.accumulateAndGet(offset, Math::max);
}

public void scaleOutWindow(WALHeaderFlusher flusher, long newWindowMaxLength) throws IOException {
public void scaleOutWindow(WALHeaderFlusher flusher, long newMaxLength) throws IOException {
boolean scaleWindowHappened = false;
scaleOutLock.lock();
try {
if (newWindowMaxLength < getWindowMaxLength()) {
if (newMaxLength < getMaxLength()) {
// Another thread has already scaled out the window.
return;
}

flusher.flush(newWindowMaxLength);
setWindowMaxLength(newWindowMaxLength);
setMaxLength(newMaxLength);
flusher.flush();
scaleWindowHappened = true;
} finally {
scaleOutLock.unlock();
if (scaleWindowHappened) {
LOGGER.info("window scale out to {}", newWindowMaxLength);
LOGGER.info("window scale out to {}", newMaxLength);
} else {
LOGGER.debug("window already scale out, ignore");
}
Expand Down Expand Up @@ -538,7 +538,7 @@ private void writeBlock(BlockBatch blocks) {
FutureUtil.complete(blocks.futures(), new AppendResult.CallbackResult() {
@Override
public long flushedOffset() {
return windowCoreData.getWindowStartOffset();
return windowCoreData.getStartOffset();
}

@Override
Expand Down
17 changes: 8 additions & 9 deletions s3stream/src/main/java/com/automq/stream/s3/wal/WALHeader.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,13 @@ class WALHeader {
private long epoch7;
private int crc8;

public WALHeader(long capacity, long windowMaxLength) {
this.capacity1 = capacity;
this.slidingWindowMaxLength4.set(windowMaxLength);
}

public static WALHeader unmarshal(ByteBuf buf) throws UnmarshalException {
WALHeader walHeader = new WALHeader();
WALHeader walHeader = new WALHeader(0, 0);
buf.markReaderIndex();
walHeader.magicCode0 = buf.readInt();
walHeader.capacity1 = buf.readLong();
Expand Down Expand Up @@ -106,11 +111,6 @@ public long getCapacity() {
return capacity1;
}

public WALHeader setCapacity(long capacity) {
this.capacity1 = capacity;
return this;
}

public long getTrimOffset() {
return trimOffset2.get();
}
Expand Down Expand Up @@ -142,9 +142,8 @@ public long getSlidingWindowMaxLength() {
return slidingWindowMaxLength4.get();
}

public WALHeader setSlidingWindowMaxLength(long slidingWindowMaxLength) {
this.slidingWindowMaxLength4.set(slidingWindowMaxLength);
return this;
public AtomicLong getAtomicSlidingWindowMaxLength() {
return slidingWindowMaxLength4;
}

public ShutdownType getShutdownType() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -611,10 +611,8 @@ private void write(WALChannel walChannel, long logicOffset, int recordSize) thro
}

private void writeWALHeader(WALChannel walChannel, long trimOffset, long maxLength) throws IOException {
ByteBuf header = new WALHeader()
.setCapacity(walChannel.capacity())
ByteBuf header = new WALHeader(walChannel.capacity(), maxLength)
.updateTrimOffset(trimOffset)
.setSlidingWindowMaxLength(maxLength)
.marshal();
walChannel.writeAndFlush(header, 0);
}
Expand Down Expand Up @@ -1208,7 +1206,7 @@ private void testCapacityMismatchInHeader0(boolean directIO) throws IOException
.direct(directIO)
.build();
walChannel.open();
walChannel.writeAndFlush(new WALHeader().setCapacity(capacity2).marshal(), 0);
walChannel.writeAndFlush(new WALHeader(capacity2, 42).marshal(), 0);
walChannel.close();

// try to open it with capacity1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@ public class WALHeaderTest {

@Test
public void test() throws UnmarshalException {
WALHeader header = new WALHeader();
header.setCapacity(128 * 1024);
WALHeader header = new WALHeader(128 * 1024, 100);
header.updateTrimOffset(10);
header.setLastWriteTimestamp(11);
header.setSlidingWindowMaxLength(100);
header.setShutdownType(ShutdownType.GRACEFULLY);
header.setNodeId(233);
header.setEpoch(234);
Expand Down

0 comments on commit d6bd72f

Please sign in to comment.