Skip to content

Commit

Permalink
fix(store): fix end offset of fetch result (#725)
Browse files Browse the repository at this point in the history
Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits authored Nov 26, 2023
1 parent 69254d2 commit e6c2bba
Showing 1 changed file with 13 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -385,8 +385,14 @@ public CompletableFuture<PopResult> popRetry(StoreContext context, long consumer
return retryStreamIdFuture.thenCompose(retryStreamId -> pop(context, consumerGroupId, retryStreamId, offset, PopOperation.PopOperationType.POP_RETRY, filter, batchSize, invisibleDuration));
}

record FetchResult(List<FlatMessageExt> messageList, long endOffset) {
public int size() {
return messageList.size();
}
}

@WithSpan(kind = SpanKind.SERVER)
private CompletableFuture<List<FlatMessageExt>> fetchMessages(StoreContext context, @SpanAttribute long streamId,
private CompletableFuture<FetchResult> fetchMessages(StoreContext context, @SpanAttribute long streamId,
@SpanAttribute long offset, @SpanAttribute int batchSize) {
long startOffset = streamStore.startOffset(streamId);
if (offset < startOffset) {
Expand All @@ -395,13 +401,14 @@ private CompletableFuture<List<FlatMessageExt>> fetchMessages(StoreContext conte

long confirmOffset = streamStore.confirmOffset(streamId);
if (offset >= confirmOffset) {
return CompletableFuture.completedFuture(Collections.emptyList());
return CompletableFuture.completedFuture(new FetchResult(Collections.emptyList(), confirmOffset));
}

if (offset + batchSize > confirmOffset) {
batchSize = (int) (confirmOffset - offset);
}

long finalOffset = offset;
return streamStore.fetch(context, streamId, offset, batchSize)
.thenApply(fetchResult -> {
AtomicLong fetchBytes = new AtomicLong();
Expand All @@ -422,7 +429,7 @@ private CompletableFuture<List<FlatMessageExt>> fetchMessages(StoreContext conte

context.span().ifPresent(span -> span.setAttribute("fetchBytes", fetchBytes.get()));

return resultList;
return new FetchResult(resultList, finalOffset + resultList.size());
});
}

Expand All @@ -436,23 +443,23 @@ private CompletableFuture<FilterFetchResult> fetchAndFilterMessages(StoreContext
return fetchMessages(context, streamId, offset, fetchBatchSize)
.thenCompose(fetchResult -> {
// Add filter result to message list.
List<FlatMessageExt> matchedMessageList = filter.doFilter(fetchResult);
List<FlatMessageExt> matchedMessageList = filter.doFilter(fetchResult.messageList());
// Update end offset
int index = batchSize - result.size();
if (matchedMessageList.size() > index) {
FlatMessageExt messageExt = matchedMessageList.get(index);
result.setEndOffset(messageExt.offset());
result.addMessageList(matchedMessageList.subList(0, index));
} else {
result.setEndOffset(offset + fetchResult.size());
result.setEndOffset(fetchResult.endOffset());
result.addMessageList(matchedMessageList);
}
// If not enough messages after applying filter, fetch more messages.
boolean needToFetch = result.size() < batchSize;
boolean hasMoreMessages = fetchResult.size() >= fetchBatchSize;

int newFetchCount = fetchCount + fetchResult.size();
long newFetchBytes = fetchBytes + fetchResult.stream()
long newFetchBytes = fetchBytes + fetchResult.messageList().stream()
.map(messageExt -> (long) messageExt.message().getByteBuffer().limit())
.reduce(0L, Long::sum);
boolean notExceedLimit = newFetchCount < config.maxFetchCount() &&
Expand Down

0 comments on commit e6c2bba

Please sign in to comment.