Skip to content

Commit

Permalink
fix: retry to read/write/flush when IOException occurs (#933)
Browse files Browse the repository at this point in the history
Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 authored Feb 20, 2024
1 parent 498690b commit 717c58b
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 87 deletions.
9 changes: 8 additions & 1 deletion s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,14 @@ public boolean append0(AppendContext context, WalWriteRequest request, boolean f
request.cf.completeExceptionally(e);
return false;
}
appendResult.future().thenAccept(nil -> handleAppendCallback(request));
appendResult.future().whenComplete((nil, ex) -> {
if (ex != null) {
// no exception should be thrown from the WAL
LOGGER.error("[UNEXPECTED] append WAL fail, request {}", request, ex);
return;
}
handleAppendCallback(request);
});
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,13 @@ public WalWriteRequest(StreamRecordBatch record, long offset, CompletableFuture<
public int compareTo(WalWriteRequest o) {
    // Ordering of requests is defined entirely by their records:
    // the result is whatever this record's compareTo reports against the other record.
    final int order = this.record.compareTo(o.record);
    return order;
}

@Override
public String toString() {
    // Fields are rendered in a fixed order: record, offset, persisted.
    final StringBuilder text = new StringBuilder("WalWriteRequest{");
    text.append("record=").append(record);
    text.append(", offset=").append(offset);
    text.append(", persisted=").append(persisted);
    return text.append('}').toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -125,6 +124,9 @@ public class BlockWALService implements WriteAheadLog {
*/
private long recoveryCompleteOffset = -1;

// Private no-arg constructor: code outside this top-level class cannot call it directly.
// NOTE(review): instances are presumably obtained through the BlockWALServiceBuilder returned by
// builder(...) / recoveryBuilder(...) — confirm against the builder implementation.
private BlockWALService() {
}

/**
 * Creates a {@link BlockWALServiceBuilder} from a path and a capacity.
 *
 * @param path     passed through unchanged to the builder
 * @param capacity passed through unchanged to the builder
 *                 (NOTE(review): unit is presumably bytes — confirm)
 * @return a new builder instance
 */
public static BlockWALServiceBuilder builder(String path, long capacity) {
    final BlockWALServiceBuilder walBuilder = new BlockWALServiceBuilder(path, capacity);
    return walBuilder;
}
Expand All @@ -133,28 +135,20 @@ public static BlockWALServiceBuilder recoveryBuilder(String path) {
return new BlockWALServiceBuilder(path);
}

private BlockWALService() {
}

private void flushWALHeader(ShutdownType shutdownType) throws IOException {
private void flushWALHeader(ShutdownType shutdownType) {
walHeader.setShutdownType(shutdownType);
flushWALHeader();
}

private synchronized void flushWALHeader() throws IOException {
private synchronized void flushWALHeader() {
long position = writeHeaderRoundTimes.getAndIncrement() % WAL_HEADER_COUNT * WAL_HEADER_CAPACITY;
try {
walHeader.setLastWriteTimestamp(System.nanoTime());
long trimOffset = walHeader.getTrimOffset();
ByteBuf buf = walHeader.marshal();
this.walChannel.writeAndFlush(buf, position);
buf.release();
walHeader.updateFlushedTrimOffset(trimOffset);
LOGGER.debug("WAL header flushed, position: {}, header: {}", position, walHeader);
} catch (IOException e) {
LOGGER.error("failed to flush WAL header, position: {}, header: {}", position, walHeader, e);
throw e;
}
walHeader.setLastWriteTimestamp(System.nanoTime());
long trimOffset = walHeader.getTrimOffset();
ByteBuf buf = walHeader.marshal();
this.walChannel.retryWriteAndFlush(buf, position);
buf.release();
walHeader.updateFlushedTrimOffset(trimOffset);
LOGGER.debug("WAL header flushed, position: {}, header: {}", position, walHeader);
}

/**
Expand Down Expand Up @@ -188,19 +182,11 @@ private ByteBuf readRecord(long recoverStartOffset,
private SlidingWindowService.RecordHeaderCoreData parseRecordHeader(long recoverStartOffset, ByteBuf recordHeader,
Function<Long, Long> logicalToPhysical) throws ReadRecordException {
final long position = logicalToPhysical.apply(recoverStartOffset);
try {
int read = walChannel.read(recordHeader, position);
if (read != RECORD_HEADER_SIZE) {
throw new ReadRecordException(
WALUtil.alignNextBlock(recoverStartOffset),
String.format("failed to read record header: expected %d bytes, actual %d bytes, recoverStartOffset: %d", RECORD_HEADER_SIZE, read, recoverStartOffset)
);
}
} catch (IOException e) {
LOGGER.error("failed to read record header, position: {}, recoverStartOffset: {}", position, recoverStartOffset, e);
int read = walChannel.retryRead(recordHeader, position);
if (read != RECORD_HEADER_SIZE) {
throw new ReadRecordException(
WALUtil.alignNextBlock(recoverStartOffset),
String.format("failed to read record header, recoverStartOffset: %d", recoverStartOffset)
String.format("failed to read record header: expected %d bytes, actual %d bytes, recoverStartOffset: %d", RECORD_HEADER_SIZE, read, recoverStartOffset)
);
}

Expand Down Expand Up @@ -243,20 +229,12 @@ private void parseRecordBody(long recoverStartOffset, SlidingWindowService.Recor
ByteBuf recordBody, Function<Long, Long> logicalToPhysical) throws ReadRecordException {
long recordBodyOffset = readRecordHeader.getRecordBodyOffset();
int recordBodyLength = readRecordHeader.getRecordBodyLength();
try {
long position = logicalToPhysical.apply(recordBodyOffset);
int read = walChannel.read(recordBody, position);
if (read != recordBodyLength) {
throw new ReadRecordException(
WALUtil.alignNextBlock(recoverStartOffset + RECORD_HEADER_SIZE + recordBodyLength),
String.format("failed to read record body: expected %d bytes, actual %d bytes, recoverStartOffset: %d", recordBodyLength, read, recoverStartOffset)
);
}
} catch (IOException e) {
LOGGER.error("failed to read record body, position: {}, recoverStartOffset: {}", recordBodyOffset, recoverStartOffset, e);
long position = logicalToPhysical.apply(recordBodyOffset);
int read = walChannel.retryRead(recordBody, position);
if (read != recordBodyLength) {
throw new ReadRecordException(
WALUtil.alignNextBlock(recoverStartOffset + RECORD_HEADER_SIZE + recordBodyLength),
String.format("failed to read record body, recoverStartOffset: %d", recoverStartOffset)
String.format("failed to read record body: expected %d bytes, actual %d bytes, recoverStartOffset: %d", recordBodyLength, read, recoverStartOffset)
);
}

Expand Down Expand Up @@ -324,15 +302,15 @@ private WALHeader tryReadWALHeader(WALChannel walChannel) {
for (int i = 0; i < WAL_HEADER_COUNT; i++) {
ByteBuf buf = DirectByteBufAlloc.byteBuffer(WALHeader.WAL_HEADER_SIZE);
try {
int read = walChannel.read(buf, i * WAL_HEADER_CAPACITY);
int read = walChannel.retryRead(buf, i * WAL_HEADER_CAPACITY);
if (read != WALHeader.WAL_HEADER_SIZE) {
continue;
}
WALHeader tmpHeader = WALHeader.unmarshal(buf);
if (header == null || header.getLastWriteTimestamp() < tmpHeader.getLastWriteTimestamp()) {
header = tmpHeader;
}
} catch (IOException | UnmarshalException ignored) {
} catch (UnmarshalException ignored) {
// failed to parse WALHeader, ignore
} finally {
buf.release();
Expand All @@ -345,7 +323,7 @@ private WALHeader newWALHeader() {
return new WALHeader(walChannel.capacity(), initialWindowSize);
}

private void walHeaderReady(WALHeader header) throws IOException {
private void walHeaderReady(WALHeader header) {
if (nodeId != NOOP_NODE_ID) {
header.setNodeId(nodeId);
header.setEpoch(epoch);
Expand Down Expand Up @@ -374,11 +352,7 @@ public void shutdownGracefully() {
boolean gracefulShutdown = Optional.ofNullable(slidingWindowService)
.map(s -> s.shutdown(1, TimeUnit.DAYS))
.orElse(true);
try {
flushWALHeader(gracefulShutdown ? ShutdownType.GRACEFULLY : ShutdownType.UNGRACEFULLY);
} catch (IOException e) {
LOGGER.error("failed to flush WALHeader when shutdown gracefully", e);
}
flushWALHeader(gracefulShutdown ? ShutdownType.GRACEFULLY : ShutdownType.UNGRACEFULLY);

walChannel.close();

Expand Down Expand Up @@ -507,13 +481,7 @@ private CompletableFuture<Void> trim(long offset, boolean internal) {
}

walHeader.updateTrimOffset(offset);
return CompletableFuture.runAsync(() -> {
try {
flushWALHeader();
} catch (IOException e) {
throw new CompletionException(e);
}
}, walHeaderFlusher);
return CompletableFuture.runAsync(this::flushWALHeader, walHeaderFlusher);
}

private void checkStarted() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.automq.stream.utils.ThreadUtils;
import com.automq.stream.utils.Threads;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.PriorityQueue;
Expand Down Expand Up @@ -348,17 +347,17 @@ private long wroteBlocksLocked(BlockBatch wroteBlocks) {
return writingBlocks.peek();
}

private void writeBlockData(BlockBatch blocks) throws IOException {
private void writeBlockData(BlockBatch blocks) {
TimerUtil timer = new TimerUtil();
for (Block block : blocks.blocks()) {
long position = WALUtil.recordOffsetToPosition(block.startOffset(), walChannel.capacity(), WAL_HEADER_TOTAL_CAPACITY);
walChannel.write(block.data(), position);
walChannel.retryWrite(block.data(), position);
}
walChannel.flush();
walChannel.retryFlush();
StorageOperationStats.getInstance().appendWALWriteStats.record(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS));
}

private void makeWriteOffsetMatchWindow(long newWindowEndOffset) throws IOException {
private void makeWriteOffsetMatchWindow(long newWindowEndOffset) {
// align to block size
newWindowEndOffset = WALUtil.alignLargeByBlockSize(newWindowEndOffset);
long windowStartOffset = windowCoreData.getStartOffset();
Expand All @@ -372,7 +371,7 @@ private void makeWriteOffsetMatchWindow(long newWindowEndOffset) throws IOExcept
}

public interface WALHeaderFlusher {
void flush() throws IOException;
void flush();
}

public static class RecordHeaderCoreData {
Expand Down Expand Up @@ -500,7 +499,7 @@ public void updateWindowStartOffset(long offset) {
this.startOffset.accumulateAndGet(offset, Math::max);
}

public void scaleOutWindow(WALHeaderFlusher flusher, long newMaxLength) throws IOException {
public void scaleOutWindow(WALHeaderFlusher flusher, long newMaxLength) {
boolean scaleWindowHappened = false;
scaleOutLock.lock();
try {
Expand Down Expand Up @@ -535,36 +534,37 @@ public WriteBlockProcessor(BlockBatch blocks) {
@Override
public void run() {
StorageOperationStats.getInstance().appendWALAwaitStats.record(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS));
writeBlock(this.blocks);
}

private void writeBlock(BlockBatch blocks) {
try {
makeWriteOffsetMatchWindow(blocks.endOffset());
writeBlockData(blocks);

TimerUtil timer = new TimerUtil();
// Update the start offset of the sliding window after finishing writing the record.
windowCoreData.updateWindowStartOffset(wroteBlocks(blocks));

FutureUtil.complete(blocks.futures(), new AppendResult.CallbackResult() {
@Override
public long flushedOffset() {
return windowCoreData.getStartOffset();
}

@Override
public String toString() {
return "CallbackResult{" + "flushedOffset=" + flushedOffset() + '}';
}
});
StorageOperationStats.getInstance().appendWALAfterStats.record(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS));
writeBlock(this.blocks);
} catch (Exception e) {
// should not happen, but just in case
FutureUtil.completeExceptionally(blocks.futures(), e);
LOGGER.error(String.format("failed to write blocks, startOffset: %s", blocks.startOffset()), e);
} finally {
blocks.release();
}
}

/**
 * Writes one batch of blocks and completes the futures attached to it.
 *
 * Steps, in order: make the write offset match the window for the batch's end offset,
 * write the block data, move the window start offset forward, complete the batch's futures,
 * and record how long the post-write steps took.
 *
 * @param blocks the batch to write; its futures are completed with a callback result
 */
private void writeBlock(BlockBatch blocks) {
    makeWriteOffsetMatchWindow(blocks.endOffset());
    writeBlockData(blocks);

    // Times only the bookkeeping that follows the write (reported as appendWALAfterStats).
    final TimerUtil afterWriteTimer = new TimerUtil();

    // Update the start offset of the sliding window after finishing writing the record.
    final long newStartOffset = wroteBlocks(blocks);
    windowCoreData.updateWindowStartOffset(newStartOffset);

    // The result does not capture a value: flushedOffset() reads the window start offset each time it is called.
    final AppendResult.CallbackResult result = new AppendResult.CallbackResult() {
        @Override
        public long flushedOffset() {
            return windowCoreData.getStartOffset();
        }

        @Override
        public String toString() {
            return "CallbackResult{" + "flushedOffset=" + flushedOffset() + '}';
        }
    };
    FutureUtil.complete(blocks.futures(), result);

    StorageOperationStats.getInstance().appendWALAfterStats.record(MetricsLevel.DEBUG, afterWriteTimer.elapsedAs(TimeUnit.NANOSECONDS));
}
}
}
Loading

0 comments on commit 717c58b

Please sign in to comment.