Skip to content

Commit

Permalink
feat: use FileSerde.writeAll and buffering for improved performances
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed Aug 13, 2024
1 parent 53241dc commit 6d23aa5
Showing 1 changed file with 8 additions and 5 deletions.
13 changes: 8 additions & 5 deletions src/main/java/io/kestra/plugin/redis/list/ListPop.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import jakarta.validation.constraints.NotNull;
import lombok.*;
import lombok.experimental.SuperBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.*;
import java.net.URI;
Expand All @@ -21,6 +23,8 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import static io.kestra.core.utils.Rethrow.throwFunction;

@SuperBuilder
@ToString
@EqualsAndHashCode
Expand Down Expand Up @@ -71,18 +75,17 @@ public Output run(RunContext runContext) throws Exception {
throw new Exception("maxDuration or maxRecords must be set to avoid infinite loop");
}

try (BufferedOutputStream output = new BufferedOutputStream(new FileOutputStream(tempFile))) {
try (var output = new BufferedWriter(new FileWriter(tempFile), FileSerde.BUFFER_SIZE)) {
AtomicInteger total = new AtomicInteger();
ZonedDateTime started = ZonedDateTime.now();

boolean empty;
do {
List<String> data = factory.listPop(key, count);
empty = data.isEmpty();
for (String str : data) {
FileSerde.write(output, this.serdeType.deserialize(str));
total.getAndIncrement();
}
var flux = Flux.fromIterable(data).map(throwFunction(str -> this.serdeType.deserialize(str)));
Mono<Long> longMono = FileSerde.writeAll(output, flux);
total.addAndGet(longMono.block().intValue());
}
while (!this.ended(empty, total, started));

Expand Down

0 comments on commit 6d23aa5

Please sign in to comment.