Skip to content

Commit

Permalink
fix: fix RealtimeTrigger
Browse files Browse the repository at this point in the history
  • Loading branch information
fhussonnois committed May 31, 2024
1 parent 5258a47 commit f73d127
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 21 deletions.
26 changes: 21 additions & 5 deletions src/main/java/io/kestra/plugin/mqtt/RealtimeTrigger.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.triggers.*;
Expand All @@ -22,6 +21,7 @@
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

@SuperBuilder
@ToString
Expand All @@ -44,7 +44,7 @@
tasks:
- id: log
type: io.kestra.plugin.core.log.Log
message: "{{ trigger.value }}"
message: "{{ trigger.payload }}"
triggers:
- id: realtime_trigger
Expand Down Expand Up @@ -122,16 +122,26 @@ public Publisher<Execution> evaluate(ConditionContext conditionContext, TriggerC
}

public Publisher<Message> publisher(final Subscribe task, final RunContext runContext) throws Exception {
MqttInterface connection = MqttFactory.create(runContext, task);
final MqttInterface connection = MqttFactory.create(runContext, task);

return Flux.create(emitter -> {
try {

final AtomicReference<Throwable> error = new AtomicReference<>();

// The MQTT client is automatically shutdown if an exception is thrown in the client
// e.g., while processing a message
connection.onDisconnected(throwable -> {
error.set(throwable);
isActive.set(false); // proactively stop consuming
});

emitter.onDispose(() -> {
try {
connection.unsubscribe(runContext, task);
connection.close();
} catch (Exception e) {
runContext.logger().warn("Error while closing connection: " + e.getMessage());
runContext.logger().debug("Error while closing connection: " + e.getMessage());
} finally {
this.waitForTermination.countDown();
}
Expand All @@ -140,9 +150,15 @@ public Publisher<Message> publisher(final Subscribe task, final RunContext runCo
connection.subscribe(runContext, task, emitter::next);

busyWait();
emitter.complete();

// dispose
if (error.get() != null) {
emitter.error(error.get());
} else {
emitter.complete();
}
} catch (Exception e) {
isActive.set(false);
emitter.error(e);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ public interface MqttInterface {

void close() throws Exception;

void onDisconnected(final Consumer<Throwable> handler);

default MqttInterface create(AbstractMqttConnection.Version version) {
if (version == AbstractMqttConnection.Version.V5) {
return new MqttV5Service();
Expand Down
44 changes: 37 additions & 7 deletions src/main/java/io/kestra/plugin/mqtt/services/MqttV3Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,23 @@ public void subscribe(RunContext runContext, Subscribe subscribe, Consumer<Messa
String[] topics = subscribe.topics(runContext);

IMqttMessageListener messageListener = (topic, message) -> {
consumer.accept(Message.builder()
.topic(topic)
.id(message.getId())
.qos(message.getQos())
.payload(subscribe.getSerdeType().deserialize(message.getPayload()))
.retain(message.isRetained())
.build());
try {
consumer.accept(Message.builder()
.topic(topic)
.id(message.getId())
.qos(message.getQos())
.payload(subscribe.getSerdeType().deserialize(message.getPayload()))
.retain(message.isRetained())
.build());
} catch (Exception e) {
runContext.logger().error(
"Cannot process message {id: {}} from topic '{}'. Cause: {}",
message.getId(),
topic,
e.getMessage()
);
throw e;
}
};

IMqttMessageListener[] listeners = new IMqttMessageListener[topics.length];
Expand All @@ -115,6 +125,26 @@ public void unsubscribe(RunContext runContext, Subscribe subscribe) throws Excep
unsubscribe.waitForCompletion();
}

@Override
public void onDisconnected(final Consumer<Throwable> handler) {
client.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
handler.accept(cause);
}

@Override
public void messageArrived(String topic, MqttMessage message) {

}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {

}
});
}

@Override
public void close() throws Exception {
try {
Expand Down
66 changes: 57 additions & 9 deletions src/main/java/io/kestra/plugin/mqtt/services/MqttV5Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
Expand Down Expand Up @@ -99,15 +102,25 @@ public void subscribe(RunContext runContext, Subscribe subscribe, Consumer<Messa
}

client.subscribe(subscriptions, null, null, (topic, message) -> {
consumer.accept(Message.builder()
.topic(topic)
.id(message.getId())
.qos(message.getQos())
.payload(subscribe.getSerdeType().deserialize(message.getPayload()))
.retain(message.isRetained())
.properties(message.getProperties().getValidProperties())
.build()
);
try {
consumer.accept(Message.builder()
.topic(topic)
.id(message.getId())
.qos(message.getQos())
.payload(subscribe.getSerdeType().deserialize(message.getPayload()))
.retain(message.isRetained())
.properties(message.getProperties().getValidProperties())
.build()
);
} catch (Exception e) {
runContext.logger().error(
"Cannot process message {id: {}} from topic '{}'. Cause: {}",
message.getId(),
topic,
e.getMessage()
);
throw e;
}
}, props);
}

Expand All @@ -117,6 +130,41 @@ public void unsubscribe(RunContext runContext, Subscribe subscribe) throws Excep
unsubscribe.waitForCompletion();
}

@Override
public void onDisconnected(final Consumer<Throwable> handler) {
client.setCallback(new MqttCallback() {
@Override
public void disconnected(MqttDisconnectResponse disconnectResponse) {
handler.accept(disconnectResponse.getException().getCause());
}

@Override
public void mqttErrorOccurred(MqttException exception) {

}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {

}

@Override
public void deliveryComplete(IMqttToken token) {

}

@Override
public void connectComplete(boolean reconnect, String serverURI) {

}

@Override
public void authPacketArrived(int reasonCode, MqttProperties properties) {

}
});
}

@Override
public void close() throws Exception {
try {
Expand Down

0 comments on commit f73d127

Please sign in to comment.