Skip to content

Commit

Permalink
feat: use virtual threads in MQTT Subscribe
Browse files Browse the repository at this point in the history
This is safe as those threads are mainly doing I/O.
  • Loading branch information
loicmathieu committed Jun 17, 2024
1 parent 89248bb commit 4a813e9
Showing 1 changed file with 1 addition and 8 deletions.
9 changes: 1 addition & 8 deletions src/main/java/io/kestra/plugin/mqtt/Subscribe.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,12 @@
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.FileSerde;
import io.kestra.plugin.mqtt.services.Message;
import io.kestra.plugin.mqtt.services.MqttFactory;
import io.kestra.plugin.mqtt.services.MqttInterface;
import io.kestra.plugin.mqtt.services.SerdeType;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

import java.io.BufferedOutputStream;
import java.io.File;
Expand Down Expand Up @@ -92,18 +89,14 @@ public Output run(RunContext runContext) throws Exception {
AtomicInteger total = new AtomicInteger();
ZonedDateTime started = ZonedDateTime.now();

thread = new Thread(throwRunnable(() -> {
thread = Thread.ofVirtual().name("mqtt-subscribe").start(throwRunnable(() -> {
connection.subscribe(runContext, this, throwConsumer(message -> {
FileSerde.write(output, message);

total.getAndIncrement();
count.compute(message.getTopic(), (s, integer) -> integer == null ? 1 : integer + 1);
}));

}));
thread.setDaemon(true);
thread.setName("mqtt-subscribe");
thread.start();

while (!this.ended(total, started)) {
//noinspection BusyWait
Expand Down

0 comments on commit 4a813e9

Please sign in to comment.