From 9d7bfc5c0fed7cdf5c71f3de1b7c995869bffedb Mon Sep 17 00:00:00 2001 From: Robin Han Date: Sat, 28 Oct 2023 09:28:57 +0800 Subject: [PATCH] fix(s3stream): fix kos issue390 record out of order (#495) Signed-off-by: Robin Han --- .../java/com/automq/stream/s3/S3Storage.java | 66 +++++++++---------- .../com/automq/stream/s3/WalWriteRequest.java | 2 +- 2 files changed, 34 insertions(+), 34 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index b4b356001..c94fbd0cb 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -84,7 +84,7 @@ public class S3Storage implements Storage { private final ExecutorService uploadWALExecutor = Threads.newFixedThreadPool( 4, ThreadUtils.createThreadFactory("s3-storage-upload-wal", true), LOGGER); - private final Queue backoffRecords = new LinkedBlockingQueue<>(); + private final Queue backoffRecords = new LinkedBlockingQueue<>(); private final ScheduledFuture drainBackoffTask; private long lastLogTimestamp = 0L; @@ -203,8 +203,8 @@ LogCache.LogCacheBlock recoverContinuousRecords(Iterator append(StreamRecordBatch streamRecord) { + TimerUtil timerUtil = new TimerUtil(); CompletableFuture cf = new CompletableFuture<>(); - append0(streamRecord, cf); + WalWriteRequest writeRequest = new WalWriteRequest(streamRecord, -1L, cf); + handleAppendRequest(writeRequest); + append0(writeRequest, false); + cf.whenComplete((nil, ex) -> { + streamRecord.release(); + OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STORAGE).operationCount.inc(); + OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STORAGE).operationTime.update(timerUtil.elapsed()); + }); return cf; } - public void append0(StreamRecordBatch streamRecord, CompletableFuture cf) { + /** + * Append record to WAL. + * @return backoff status. + */ + public boolean append0(WalWriteRequest request, boolean fromBackoff) { // TODO: storage status check, fast fail the request when storage closed. - TimerUtil timerUtil = new TimerUtil(); + if (!fromBackoff && !backoffRecords.isEmpty()) { + backoffRecords.offer(request); + return true; + } if (!tryAcquirePermit()) { - backoffRecords.offer(new BackoffRecord(streamRecord, cf)); + backoffRecords.offer(request); OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STORAGE_LOG_CACHE_FULL).operationCount.inc(); if (System.currentTimeMillis() - lastLogTimestamp > 1000L) { LOGGER.warn("[BACKOFF] log cache size {} is larger than {}", logCache.size(), maxWALCacheSize); lastLogTimestamp = System.currentTimeMillis(); } - return; + return true; } WriteAheadLog.AppendResult appendResult; try { + StreamRecordBatch streamRecord = request.record; streamRecord.encoded(); streamRecord.retain(); appendResult = log.append(streamRecord.encoded()); } catch (WriteAheadLog.OverCapacityException e) { // the WAL write data align with block, 'WAL is full but LogCacheBlock is not full' may happen. forceUpload(LogCache.MATCH_ALL_STREAMS); - backoffRecords.offer(new BackoffRecord(streamRecord, cf)); + backoffRecords.offer(request); if (System.currentTimeMillis() - lastLogTimestamp > 1000L) { LOGGER.warn("[BACKOFF] log over capacity", e); lastLogTimestamp = System.currentTimeMillis(); } - return; + return true; } - WalWriteRequest writeRequest = new WalWriteRequest(streamRecord, appendResult.recordOffset(), cf); - handleAppendRequest(writeRequest); - appendResult.future().thenAccept(nil -> handleAppendCallback(writeRequest)); - cf.whenComplete((nil, ex) -> { - streamRecord.release(); - OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STORAGE).operationCount.inc(); - OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STORAGE).operationTime.update(timerUtil.elapsed()); - }); + request.offset = appendResult.recordOffset(); + appendResult.future().thenAccept(nil -> handleAppendCallback(request)); + return false; } @SuppressWarnings("BooleanMethodIsAlwaysInverted") @@ -265,15 +276,14 @@ private boolean tryAcquirePermit() { private void tryDrainBackoffRecords() { try { for (; ; ) { - BackoffRecord backoffRecord = backoffRecords.peek(); - if (backoffRecord == null) { + WalWriteRequest request = backoffRecords.peek(); + if (request == null) { break; } - if (!tryAcquirePermit()) { - LOGGER.warn("try drain backoff record fail, log cache size {} is larger than {}", logCache.size(), maxWALCacheSize); + if (append0(request, true)) { + LOGGER.warn("try drain backoff record fail, still backoff"); break; } - append0(backoffRecord.record, backoffRecord.appendCf); backoffRecords.poll(); } } catch (Throwable e) { @@ -464,16 +474,6 @@ private void freeCache(LogCache.LogCacheBlock cacheBlock) { mainReadExecutor.execute(() -> logCache.markFree(cacheBlock)); } - static class BackoffRecord { - final StreamRecordBatch record; - final CompletableFuture appendCf; - - public BackoffRecord(StreamRecordBatch record, CompletableFuture appendCf) { - this.record = record; - this.appendCf = appendCf; - } - } - /** * WALCallbackSequencer is modified in single thread mainExecutor. */ diff --git a/s3stream/src/main/java/com/automq/stream/s3/WalWriteRequest.java b/s3stream/src/main/java/com/automq/stream/s3/WalWriteRequest.java index fc3f994e8..844cbfb39 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/WalWriteRequest.java +++ b/s3stream/src/main/java/com/automq/stream/s3/WalWriteRequest.java @@ -24,7 +24,7 @@ public class WalWriteRequest implements Comparable { final StreamRecordBatch record; - final long offset; + long offset; final CompletableFuture cf; boolean persisted;