
Commit

fix(s3stream): fix kos issue390 record out of order (#495)
Signed-off-by: Robin Han <[email protected]>
superhx authored Oct 28, 2023
1 parent d64fbed commit 9d7bfc5
Showing 2 changed files with 34 additions and 34 deletions.
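
In short, the commit addresses out-of-order WAL appends with two coordinated changes in S3Storage: append0 now takes the whole WalWriteRequest plus a fromBackoff flag and parks a fresh request in backoffRecords whenever that queue is already non-empty (returning true for "backed off"), and tryDrainBackoffRecords retries the head of the queue via append0(request, true), polling it only once the retry succeeds. Below is a minimal, self-contained sketch of that ordering rule; the class and member names (OrderedBackoffAppender, Request, walHasCapacity) are invented stand-ins for the real s3stream types, and the "WAL" is reduced to an offset counter.

import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified stand-ins for WalWriteRequest and the WAL; illustrative only.
class Request {
    final String payload;
    long offset = -1L;                                  // placeholder until the "WAL" assigns one
    final CompletableFuture<Void> cf = new CompletableFuture<>();
    Request(String payload) { this.payload = payload; }
}

class OrderedBackoffAppender {
    private final Queue<Request> backoffRecords = new LinkedBlockingQueue<>();
    private boolean walHasCapacity = true;              // stands in for tryAcquirePermit()
    private long nextOffset = 0;

    // Returns true when the request backed off, false when it reached the WAL.
    boolean append0(Request request, boolean fromBackoff) {
        // Ordering rule from the fix: if anything is already backing off,
        // a fresh request must queue behind it instead of racing ahead.
        if (!fromBackoff && !backoffRecords.isEmpty()) {
            backoffRecords.offer(request);
            return true;
        }
        if (!walHasCapacity) {
            backoffRecords.offer(request);
            return true;
        }
        request.offset = nextOffset++;                  // the "WAL" accepts the record
        request.cf.complete(null);
        return false;
    }

    // Mirrors tryDrainBackoffRecords: retry the head, poll it only after success.
    void drainBackoffRecords() {
        for (;;) {
            Request head = backoffRecords.peek();
            if (head == null) {
                return;
            }
            if (append0(head, true)) {
                return;                                 // still backing off, retry later
            }
            backoffRecords.poll();
        }
    }

    void setWalHasCapacity(boolean hasCapacity) {
        walHasCapacity = hasCapacity;
    }

    // On shutdown, anything still parked is failed rather than left hanging.
    void shutdown() {
        for (Request request : backoffRecords) {
            request.cf.completeExceptionally(new IOException("storage is shutdown"));
        }
    }
}

Under this rule a record that arrives while earlier records are still backing off can no longer be handed to the WAL ahead of them, which appears to be the reordering described in the commit title.
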
66 changes: 33 additions & 33 deletions s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
@@ -84,7 +84,7 @@ public class S3Storage implements Storage {
private final ExecutorService uploadWALExecutor = Threads.newFixedThreadPool(
4, ThreadUtils.createThreadFactory("s3-storage-upload-wal", true), LOGGER);

private final Queue<BackoffRecord> backoffRecords = new LinkedBlockingQueue<>();
private final Queue<WalWriteRequest> backoffRecords = new LinkedBlockingQueue<>();
private final ScheduledFuture<?> drainBackoffTask;
private long lastLogTimestamp = 0L;

@@ -203,8 +203,8 @@ LogCache.LogCacheBlock recoverContinuousRecords(Iterator<WriteAheadLog.RecoverRe
public void shutdown() {
drainBackoffTask.cancel(false);
FutureUtil.suppress(drainBackoffTask::get, LOGGER);
for (BackoffRecord backoffRecord : backoffRecords) {
backoffRecord.appendCf.completeExceptionally(new IOException("S3Storage is shutdown"));
for (WalWriteRequest request : backoffRecords) {
request.cf.completeExceptionally(new IOException("S3Storage is shutdown"));
}
log.shutdownGracefully();
backgroundExecutor.shutdown();
@@ -215,46 +215,57 @@ public void shutdown() {

@Override
public CompletableFuture<Void> append(StreamRecordBatch streamRecord) {
TimerUtil timerUtil = new TimerUtil();
CompletableFuture<Void> cf = new CompletableFuture<>();
append0(streamRecord, cf);
WalWriteRequest writeRequest = new WalWriteRequest(streamRecord, -1L, cf);
handleAppendRequest(writeRequest);
append0(writeRequest, false);
cf.whenComplete((nil, ex) -> {
streamRecord.release();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STORAGE).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STORAGE).operationTime.update(timerUtil.elapsed());
});
return cf;
}

public void append0(StreamRecordBatch streamRecord, CompletableFuture<Void> cf) {
/**
* Append record to WAL.
* @return backoff status.
*/
public boolean append0(WalWriteRequest request, boolean fromBackoff) {
// TODO: storage status check, fast fail the request when storage closed.
TimerUtil timerUtil = new TimerUtil();
if (!fromBackoff && !backoffRecords.isEmpty()) {
backoffRecords.offer(request);
return true;
}
if (!tryAcquirePermit()) {
backoffRecords.offer(new BackoffRecord(streamRecord, cf));
backoffRecords.offer(request);
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STORAGE_LOG_CACHE_FULL).operationCount.inc();
if (System.currentTimeMillis() - lastLogTimestamp > 1000L) {
LOGGER.warn("[BACKOFF] log cache size {} is larger than {}", logCache.size(), maxWALCacheSize);
lastLogTimestamp = System.currentTimeMillis();
}
return;
return true;
}
WriteAheadLog.AppendResult appendResult;
try {
StreamRecordBatch streamRecord = request.record;
streamRecord.encoded();
streamRecord.retain();
appendResult = log.append(streamRecord.encoded());
} catch (WriteAheadLog.OverCapacityException e) {
// the WAL write data align with block, 'WAL is full but LogCacheBlock is not full' may happen.
forceUpload(LogCache.MATCH_ALL_STREAMS);
backoffRecords.offer(new BackoffRecord(streamRecord, cf));
backoffRecords.offer(request);
if (System.currentTimeMillis() - lastLogTimestamp > 1000L) {
LOGGER.warn("[BACKOFF] log over capacity", e);
lastLogTimestamp = System.currentTimeMillis();
}
return;
return true;
}
WalWriteRequest writeRequest = new WalWriteRequest(streamRecord, appendResult.recordOffset(), cf);
handleAppendRequest(writeRequest);
appendResult.future().thenAccept(nil -> handleAppendCallback(writeRequest));
cf.whenComplete((nil, ex) -> {
streamRecord.release();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STORAGE).operationCount.inc();
OperationMetricsStats.getOrCreateOperationMetrics(S3Operation.APPEND_STORAGE).operationTime.update(timerUtil.elapsed());
});
request.offset = appendResult.recordOffset();
appendResult.future().thenAccept(nil -> handleAppendCallback(request));
return false;
}

@SuppressWarnings("BooleanMethodIsAlwaysInverted")
@@ -265,15 +276,14 @@ private boolean tryAcquirePermit() {
private void tryDrainBackoffRecords() {
try {
for (; ; ) {
BackoffRecord backoffRecord = backoffRecords.peek();
if (backoffRecord == null) {
WalWriteRequest request = backoffRecords.peek();
if (request == null) {
break;
}
if (!tryAcquirePermit()) {
LOGGER.warn("try drain backoff record fail, log cache size {} is larger than {}", logCache.size(), maxWALCacheSize);
if (append0(request, true)) {
LOGGER.warn("try drain backoff record fail, still backoff");
break;
}
append0(backoffRecord.record, backoffRecord.appendCf);
backoffRecords.poll();
}
} catch (Throwable e) {
@@ -464,16 +474,6 @@ private void freeCache(LogCache.LogCacheBlock cacheBlock) {
mainReadExecutor.execute(() -> logCache.markFree(cacheBlock));
}

static class BackoffRecord {
final StreamRecordBatch record;
final CompletableFuture<Void> appendCf;

public BackoffRecord(StreamRecordBatch record, CompletableFuture<Void> appendCf) {
this.record = record;
this.appendCf = appendCf;
}
}

/**
* WALCallbackSequencer is modified in single thread mainExecutor.
*/
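
A second part of the change, visible in the append() hunk above, is that handleAppendRequest(writeRequest) now runs before the WAL write, with a placeholder offset of -1L, rather than after log.append() returns. Assuming handleAppendRequest registers the request with the WALCallbackSequencer mentioned near the end of this file (the sequencer itself is untouched by this diff), requests are registered in arrival order even though their WAL writes may complete out of order. The following is a hypothetical sketch of that kind of in-order completion, not the actual WALCallbackSequencer:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration only: completes callbacks strictly in the order
// the requests were registered, even if the WAL acks them out of order.
class InOrderCallbackSequencer {
    static class Pending {
        final CompletableFuture<Void> cf = new CompletableFuture<>();
        boolean persisted;                      // set when the WAL write for this request finishes
    }

    private final Deque<Pending> queue = new ArrayDeque<>();

    Pending register() {                        // analogous to handleAppendRequest(...)
        Pending p = new Pending();
        queue.addLast(p);
        return p;
    }

    void onWalPersisted(Pending p) {            // analogous to handleAppendCallback(...)
        p.persisted = true;
        // Only complete the longest persisted prefix, so callers observe
        // completions in submission order.
        while (!queue.isEmpty() && queue.peekFirst().persisted) {
            queue.pollFirst().cf.complete(null);
        }
    }
}
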
@@ -24,7 +24,7 @@

public class WalWriteRequest implements Comparable<WalWriteRequest> {
final StreamRecordBatch record;
final long offset;
long offset;
final CompletableFuture<Void> cf;
boolean persisted;

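
The only change to WalWriteRequest is that offset loses its final modifier. That follows from the first file: the request is now created with a placeholder offset of -1L in append() and the real offset is assigned later in append0 via request.offset = appendResult.recordOffset(). A small stand-alone illustration of that two-step lifecycle, with stand-in names and values (record-0, 42L) that are not from the diff:

import java.util.concurrent.CompletableFuture;

// Simplified stand-in for WalWriteRequest; field names mirror the diff.
class WalWriteRequestSketch {
    final String record;
    long offset;                       // no longer final: assigned after the WAL append
    final CompletableFuture<Void> cf;
    boolean persisted;

    WalWriteRequestSketch(String record, long offset, CompletableFuture<Void> cf) {
        this.record = record;
        this.offset = offset;
        this.cf = cf;
    }
}

class OffsetLifecycleDemo {
    public static void main(String[] args) {
        // 1. Created and registered before the WAL write, offset unknown yet.
        WalWriteRequestSketch request =
                new WalWriteRequestSketch("record-0", -1L, new CompletableFuture<>());
        // 2. Once the WAL accepts the record, the real offset is filled in
        //    (analogous to request.offset = appendResult.recordOffset()).
        request.offset = 42L;
        request.persisted = true;
        request.cf.complete(null);
        System.out.println("offset=" + request.offset + ", persisted=" + request.persisted);
    }
}
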
