fix(s3stream): bytebuf leak (#924)
Signed-off-by: Robin Han <[email protected]>
superhx authored Feb 6, 2024
1 parent 480eb04 commit 94b0216
Showing 1 changed file with 18 additions and 9 deletions.
@@ -49,6 +49,7 @@
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -220,7 +221,7 @@ public CompletableFuture<ByteBuf> rangeRead(String path, long start, long end, T
             rangeRead0(path, start, end, cf);
         }
 
-        Timeout timeout = timeoutDetect.newTimeout((t) -> LOGGER.warn("rangeRead {} {}-{} timeout", path, start, end), 1, TimeUnit.MINUTES);
+        Timeout timeout = timeoutDetect.newTimeout((t) -> LOGGER.warn("rangeRead {} {}-{} timeout", path, start, end), 3, TimeUnit.MINUTES);
         return cf.whenComplete((rst, ex) -> timeout.cancel());
     }

@@ -302,6 +303,17 @@ void mergedRangeRead0(String path, long start, long end, CompletableFuture<ByteB
         TimerUtil timerUtil = new TimerUtil();
         long size = end - start + 1;
         GetObjectRequest request = GetObjectRequest.builder().bucket(bucket).key(path).range(range(start, end)).build();
+        Consumer<Throwable> failHandler = (ex) -> {
+            if (isUnrecoverable(ex)) {
+                LOGGER.error("GetObject for object {} [{}, {}) fail", path, start, end, ex);
+                cf.completeExceptionally(ex);
+            } else {
+                LOGGER.warn("GetObject for object {} [{}, {}) fail, retry later", path, start, end, ex);
+                scheduler.schedule(() -> mergedRangeRead0(path, start, end, cf), 100, TimeUnit.MILLISECONDS);
+            }
+            S3OperationStats.getInstance().getObjectStats(size, false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+        };
+
         readS3Client.getObject(request, AsyncResponseTransformer.toPublisher())
             .thenAccept(responsePublisher -> {
                 S3ObjectStats.getInstance().objectDownloadSizeStats.record(MetricsLevel.INFO, size);
@@ -317,17 +329,14 @@ void mergedRangeRead0(String path, long start, long end, CompletableFuture<ByteB
                     S3OperationStats.getInstance().downloadSizeTotalStats.add(MetricsLevel.INFO, size);
                     S3OperationStats.getInstance().getObjectStats(size, true).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
                     cf.complete(buf);
-                });
+                }).exceptionally(ex -> {
+                    buf.release();
+                    failHandler.accept(ex);
+                    return null;
+                });
             })
             .exceptionally(ex -> {
-                S3OperationStats.getInstance().getObjectStats(size, false).record(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
-                if (isUnrecoverable(ex)) {
-                    LOGGER.error("GetObject for object {} [{}, {}) fail", path, start, end, ex);
-                    cf.completeExceptionally(ex);
-                } else {
-                    LOGGER.warn("GetObject for object {} [{}, {}) fail, retry later", path, start, end, ex);
-                    scheduler.schedule(() -> mergedRangeRead0(path, start, end, cf), 100, TimeUnit.MILLISECONDS);
-                }
+                failHandler.accept(ex);
                 return null;
             });
     }
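For context on the pattern this diff applies: Netty ByteBufs are reference counted, so a buffer allocated for an async read must be released on every failure path or its memory leaks. The sketch below is a minimal, self-contained illustration of that pattern, not the repository's code; the class ReleaseOnFailureSketch and the methods readAsync and fetch are hypothetical stand-ins for the S3 GetObject call in mergedRangeRead0, and the shared failHandler mirrors the one introduced in this commit.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class ReleaseOnFailureSketch {

    // Hypothetical async source standing in for the S3 GetObject call: fills the buffer or fails.
    static CompletableFuture<Void> readAsync(ByteBuf buf) {
        return CompletableFuture.runAsync(() -> buf.writeBytes(new byte[16]));
    }

    // On success the buffer is handed to the caller; on failure it is released first,
    // so the reference-counted direct memory is never leaked.
    static CompletableFuture<ByteBuf> fetch() {
        CompletableFuture<ByteBuf> cf = new CompletableFuture<>();
        ByteBuf buf = PooledByteBufAllocator.DEFAULT.directBuffer(16);
        // Shared failure path, analogous to the failHandler in the commit:
        // one place to log, record metrics, retry, or fail the caller's future.
        Consumer<Throwable> failHandler = cf::completeExceptionally;
        readAsync(buf)
            .thenAccept(v -> cf.complete(buf)) // success: ownership moves to the caller
            .exceptionally(ex -> {
                buf.release();                 // failure: return the buffer to the allocator
                failHandler.accept(ex);
                return null;
            });
        return cf;
    }

    public static void main(String[] args) {
        fetch().thenAccept(buf -> {
            try {
                System.out.println("read " + buf.readableBytes() + " bytes");
            } finally {
                buf.release();                 // the caller releases on the success path
            }
        }).join();
    }
}

The two points the commit adds are visible in the diff above: an exceptionally stage on the inner chain that releases the buffer before reporting the failure, and a single failHandler so both the inner and outer failure paths record stats and either retry or complete the future exceptionally in one consistent place.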
