From 6d23aa5101ccfa68c1767e419df156ed8a5072cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Mathieu?= Date: Tue, 13 Aug 2024 11:02:10 +0200 Subject: [PATCH] feat: use FileSerde.writeAll and buffering for improved performances --- .../java/io/kestra/plugin/redis/list/ListPop.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/kestra/plugin/redis/list/ListPop.java b/src/main/java/io/kestra/plugin/redis/list/ListPop.java index 288d64b..18a7a26 100644 --- a/src/main/java/io/kestra/plugin/redis/list/ListPop.java +++ b/src/main/java/io/kestra/plugin/redis/list/ListPop.java @@ -13,6 +13,8 @@ import jakarta.validation.constraints.NotNull; import lombok.*; import lombok.experimental.SuperBuilder; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import java.io.*; import java.net.URI; @@ -21,6 +23,8 @@ import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import static io.kestra.core.utils.Rethrow.throwFunction; + @SuperBuilder @ToString @EqualsAndHashCode @@ -71,7 +75,7 @@ public Output run(RunContext runContext) throws Exception { throw new Exception("maxDuration or maxRecords must be set to avoid infinite loop"); } - try (BufferedOutputStream output = new BufferedOutputStream(new FileOutputStream(tempFile))) { + try (var output = new BufferedWriter(new FileWriter(tempFile), FileSerde.BUFFER_SIZE)) { AtomicInteger total = new AtomicInteger(); ZonedDateTime started = ZonedDateTime.now(); @@ -79,10 +83,9 @@ public Output run(RunContext runContext) throws Exception { do { List data = factory.listPop(key, count); empty = data.isEmpty(); - for (String str : data) { - FileSerde.write(output, this.serdeType.deserialize(str)); - total.getAndIncrement(); - } + var flux = Flux.fromIterable(data).map(throwFunction(str -> this.serdeType.deserialize(str))); + Mono longMono = FileSerde.writeAll(output, flux); + total.addAndGet(longMono.block().intValue()); } while (!this.ended(empty, total, started));