Skip to content

Commit

Permalink
feat(s3stream): add read block cache context to trace status (#835)
Browse files Browse the repository at this point in the history
Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh authored Dec 15, 2023
1 parent 3a807e3 commit 848cdde
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@

public class DefaultS3BlockCache implements S3BlockCache {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultS3BlockCache.class);
private final Map<ReadAheadTaskKey, CompletableFuture<Void>> inflightReadAheadTasks = new ConcurrentHashMap<>();
private final Map<ReadAheadTaskKey, ReadAheadTaskContext> inflightReadAheadTasks = new ConcurrentHashMap<>();
private final Map<ReadTaskKey, ReadTaskContext> inflightReadStatusMap = new ConcurrentHashMap<>();
private final BlockCache cache;
private final ExecutorService mainExecutor;
private final ReadAheadManager readAheadManager;
Expand Down Expand Up @@ -81,14 +82,17 @@ public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, lo
CompletableFuture<ReadDataBlock> readCf = new CompletableFuture<>();
ReadAheadAgent agent = this.readAheadManager.getOrCreateReadAheadAgent(streamId, startOffset);
UUID uuid = UUID.randomUUID();
ReadTaskKey key = new ReadTaskKey(streamId, startOffset, endOffset, maxBytes , uuid);
ReadTaskContext context = new ReadTaskContext(agent, ReadBlockCacheStatus.INIT);
this.inflightReadStatusMap.put(key, context);
// Submit the read task to mainExecutor so that the read does not slow down the caller thread.
mainExecutor.execute(() -> {
FutureUtil.exec(() -> {
read0(streamId, startOffset, endOffset, maxBytes, agent, uuid).whenComplete((ret, ex) -> {
try {
FutureUtil.propagate(read0(streamId, startOffset, endOffset, maxBytes, uuid, context).whenComplete((ret, ex) -> {
if (ex != null) {
LOGGER.error("read {} [{}, {}), maxBytes: {} from block cache fail", streamId, startOffset, endOffset, maxBytes, ex);
readCf.completeExceptionally(ex);
this.inflightReadThrottle.release(uuid);
this.inflightReadStatusMap.remove(key);
return;
}
int totalReturnedSize = ret.getRecords().stream().mapToInt(StreamRecordBatch::size).sum();
Expand All @@ -102,15 +106,22 @@ public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, lo
LOGGER.debug("[S3BlockCache] read data complete, cache hit: {}, stream={}, {}-{}, total bytes: {} ",
ret.getCacheAccessType() == CacheAccessType.BLOCK_CACHE_HIT, streamId, startOffset, endOffset, totalReturnedSize);
}
readCf.complete(ret);
this.inflightReadThrottle.release(uuid);
});
}, readCf, LOGGER, "read");
this.inflightReadStatusMap.remove(key);
}), readCf);
} catch (Exception e) {
LOGGER.error("read {} [{}, {}), maxBytes: {} from block cache fail, {}", streamId, startOffset, endOffset, maxBytes, e);
this.inflightReadThrottle.release(uuid);
this.inflightReadStatusMap.remove(key);
readCf.completeExceptionally(e);
}
});
return readCf;
}

public CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset, long endOffset, int maxBytes, ReadAheadAgent agent, UUID uuid) {
public CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset, long endOffset, int maxBytes, UUID uuid, ReadTaskContext context) {
ReadAheadAgent agent = context.agent;

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[S3BlockCache] read0, stream={}, {}-{}, total bytes: {}, uuid: {} ", streamId, startOffset, endOffset, maxBytes, uuid);
}
Expand All @@ -122,15 +133,17 @@ public CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset, l
long nextStartOffset = startOffset;
int nextMaxBytes = maxBytes;

CompletableFuture<Void> inflightReadAheadTask = inflightReadAheadTasks.get(new ReadAheadTaskKey(streamId, nextStartOffset));
if (inflightReadAheadTask != null) {
ReadAheadTaskContext inflightReadAheadTaskContext = inflightReadAheadTasks.get(new ReadAheadTaskKey(streamId, nextStartOffset));
if (inflightReadAheadTaskContext != null) {
CompletableFuture<ReadDataBlock> readCf = new CompletableFuture<>();
inflightReadAheadTask.whenComplete((nil, ex) -> FutureUtil.exec(() -> FutureUtil.propagate(
read0(streamId, startOffset, endOffset, maxBytes, agent, uuid), readCf), readCf, LOGGER, "read0"));
context.setStatus(ReadBlockCacheStatus.WAIT_INFLIGHT_RA);
inflightReadAheadTaskContext.cf.whenComplete((nil, ex) -> FutureUtil.exec(() -> FutureUtil.propagate(
read0(streamId, startOffset, endOffset, maxBytes, uuid, context), readCf), readCf, LOGGER, "read0"));
return readCf;
}

// 1. get from cache
context.setStatus(ReadBlockCacheStatus.GET_FROM_CACHE);
BlockCache.GetCacheResult cacheRst = cache.get(streamId, nextStartOffset, endOffset, nextMaxBytes);
List<StreamRecordBatch> cacheRecords = cacheRst.getRecords();
if (!cacheRecords.isEmpty()) {
Expand All @@ -148,7 +161,7 @@ public CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset, l
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[S3BlockCache] read data partially hit cache, stream={}, {}-{}, total bytes: {} ", streamId, nextStartOffset, endOffset, nextMaxBytes);
}
return read0(streamId, nextStartOffset, endOffset, nextMaxBytes, agent, uuid).thenApply(rst -> {
return read0(streamId, nextStartOffset, endOffset, nextMaxBytes, uuid, context).thenApply(rst -> {
List<StreamRecordBatch> records = new ArrayList<>(cacheRecords);
records.addAll(rst.getRecords());
return new ReadDataBlock(records, CacheAccessType.BLOCK_CACHE_MISS);
Expand All @@ -157,6 +170,7 @@ public CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset, l
}

// 2. get from s3
context.setStatus(ReadBlockCacheStatus.GET_FROM_S3);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[S3BlockCache] read data cache miss, stream={}, {}-{}, total bytes: {} ", streamId, startOffset, endOffset, maxBytes);
}
Expand Down Expand Up @@ -190,7 +204,61 @@ public record ReadAheadTaskKey(long streamId, long startOffset) {

}

/**
 * Tracking state for one inflight read-ahead task: the future associated with the task
 * plus the stage the task is currently in.
 *
 * <p>Instances are stored as values of the {@code inflightReadAheadTasks} concurrent map,
 * so they can be reached from more than one thread.
 */
public static class ReadAheadTaskContext {
    /** Future associated with the read-ahead task; {@code read0} chains on it to wait for the task. */
    final CompletableFuture<Void> cf;

    /**
     * Current stage of the task.
     *
     * <p>Declared {@code volatile} because the context is shared through a concurrent map:
     * the map only orders the initial {@code put}, so a later plain write to this field
     * would not be guaranteed visible to a thread that inspects the context afterwards.
     */
    volatile ReadBlockCacheStatus status;

    /**
     * @param cf     future associated with the read-ahead task
     * @param status initial stage of the task
     */
    public ReadAheadTaskContext(CompletableFuture<Void> cf, ReadBlockCacheStatus status) {
        this.cf = cf;
        this.status = status;
    }

    /**
     * Records the stage the task has moved to.
     *
     * @param status new stage of the task
     */
    void setStatus(ReadBlockCacheStatus status) {
        this.status = status;
    }
}

/**
 * Map key identifying a single read request.
 *
 * <p>{@code read} builds one key per call from the request arguments and a freshly generated
 * {@link UUID}. Because record equality is component-wise, two requests for the same range
 * with different UUIDs are distinct keys.
 *
 * @param streamId    id of the stream being read
 * @param startOffset start offset of the requested range
 * @param endOffset   end offset of the requested range
 * @param maxBytes    byte limit passed with the request
 * @param uuid        per-request identifier
 */
public record ReadTaskKey(long streamId, long startOffset, long endOffset, int maxBytes, UUID uuid) {
    /**
     * Renders the key as {@code ReadTaskKey{streamId=..., startOffset=..., endOffset=..., maxBytes=..., uuid=...}}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("ReadTaskKey{");
        text.append("streamId=").append(streamId);
        text.append(", startOffset=").append(startOffset);
        text.append(", endOffset=").append(endOffset);
        text.append(", maxBytes=").append(maxBytes);
        text.append(", uuid=").append(uuid);
        return text.append('}').toString();
    }
}

/**
 * Tracking state for one inflight read request: the read-ahead agent used by the request
 * plus the stage the request is currently in.
 *
 * <p>{@code read} creates the context, stores it in the {@code inflightReadStatusMap}
 * concurrent map, and then hands the work to {@code mainExecutor}; {@code read0} updates
 * the stage from there on.
 */
public static class ReadTaskContext {
    /** Read-ahead agent obtained for the request; {@code read0} reads it from here. */
    final ReadAheadAgent agent;

    /**
     * Current stage of the request.
     *
     * <p>Declared {@code volatile} because the stage is written by {@code read0} after the
     * context has already been published through the concurrent map. The map only orders
     * the initial {@code put}, so a plain field would not guarantee that a thread looking
     * up the context later observes the latest stage.
     */
    volatile ReadBlockCacheStatus status;

    /**
     * @param agent  read-ahead agent used by the request
     * @param status initial stage of the request
     */
    public ReadTaskContext(ReadAheadAgent agent, ReadBlockCacheStatus status) {
        this.agent = agent;
        this.status = status;
    }

    /**
     * Records the stage the request has moved to.
     *
     * @param status new stage of the request
     */
    void setStatus(ReadBlockCacheStatus status) {
        this.status = status;
    }
}

/**
 * Immutable holder for a single offset.
 *
 * <p>NOTE(review): presumably {@code nextRAOffset} is the offset at which the next
 * read-ahead should start; confirm against the code that creates and consumes this record.
 *
 * @param nextRAOffset the recorded offset
 */
public record ReadAheadRecord(long nextRAOffset) {}

/**
 * Stages recorded on read and read-ahead task contexts.
 *
 * <p>The first four constants are used for read requests; the remaining three are
 * reserved for read-ahead requests. Constant order is kept stable.
 */
public enum ReadBlockCacheStatus {
    // ---- read request stages ----

    /** Stage a read request context is created with in {@code read}. */
    INIT,

    /** Set by {@code read0} when it waits on an inflight read-ahead task for the same stream and offset. */
    WAIT_INFLIGHT_RA,

    /** Set by {@code read0} right before it queries the block cache. */
    GET_FROM_CACHE,

    /** Set by {@code read0} when it proceeds to fetch from S3. */
    GET_FROM_S3,

    // ---- read-ahead request stages ----

    /** NOTE(review): presumably the read-ahead task is waiting for index data; confirm at the call site. */
    WAIT_DATA_INDEX,

    /** NOTE(review): presumably the read-ahead task is waiting for data to be fetched; confirm at the call site. */
    WAIT_FETCH_DATA,

    /** NOTE(review): presumably the read-ahead task is waiting on a throttle; confirm at the call site. */
    WAIT_THROTTLE,
}

}
Loading

0 comments on commit 848cdde

Please sign in to comment.