diff --git a/embedding_moquette/pom.xml b/embedding_moquette/pom.xml
index dd2a1753c..8c71768f6 100644
--- a/embedding_moquette/pom.xml
+++ b/embedding_moquette/pom.xml
@@ -18,6 +18,13 @@
moquette-broker
${project.version}
+
+
+
+ org.slf4j
+ slf4j-reload4j
+ ${slf4j.version}
+
@@ -30,6 +37,15 @@
true
+
+
+ org.codehaus.mojo
+ exec-maven-plugin
+ 3.1.0
+
+ io.moquette.testembedded.EmbeddedLauncher
+
+
diff --git a/embedding_moquette/src/main/java/io/moquette/testembedded/EmbeddedLauncher.java b/embedding_moquette/src/main/java/io/moquette/testembedded/EmbeddedLauncher.java
index 0c0fb6290..fa512e39e 100644
--- a/embedding_moquette/src/main/java/io/moquette/testembedded/EmbeddedLauncher.java
+++ b/embedding_moquette/src/main/java/io/moquette/testembedded/EmbeddedLauncher.java
@@ -15,6 +15,7 @@
*/
package io.moquette.testembedded;
+import io.moquette.broker.ClientDescriptor;
import io.moquette.broker.Server;
import io.moquette.interception.AbstractInterceptHandler;
import io.moquette.interception.InterceptHandler;
@@ -29,6 +30,7 @@
import io.netty.handler.codec.mqtt.MqttQoS;
import java.io.IOException;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -39,7 +41,7 @@
* */
public final class EmbeddedLauncher {
- static class PublisherListener extends AbstractInterceptHandler {
+ private class PublisherListener extends AbstractInterceptHandler {
@Override
public String getID() {
@@ -50,6 +52,17 @@ public String getID() {
public void onPublish(InterceptPublishMessage msg) {
final String decodedPayload = msg.getPayload().toString(UTF_8);
System.out.println("Received on topic: " + msg.getTopicName() + " content: " + decodedPayload);
+ if ("/command".equals(msg.getTopicName())) {
+ switch (decodedPayload) {
+ case "exit":
+ System.out.println("EXITING broker by /command exit");
+ shutdown();
+ return;
+ case "list_clients":
+ listClients();
+ return;
+ }
+ }
}
@Override
@@ -59,22 +72,33 @@ public void onSessionLoopError(Throwable error) {
}
public static void main(String[] args) throws InterruptedException, IOException {
+ final EmbeddedLauncher launcher = new EmbeddedLauncher();
+ launcher.start();
+ }
+
+ private Server mqttBroker;
+
+ private EmbeddedLauncher() {
+ }
+
+ private void start() throws IOException, InterruptedException {
IResourceLoader classpathLoader = new ClasspathResourceLoader();
final IConfig classPathConfig = new ResourceLoaderConfig(classpathLoader);
- final Server mqttBroker = new Server();
+ mqttBroker = new Server();
List extends InterceptHandler> userHandlers = Collections.singletonList(new PublisherListener());
mqttBroker.startServer(classPathConfig, userHandlers);
System.out.println("Broker started press [CTRL+C] to stop");
- //Bind a shutdown hook
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- System.out.println("Stopping broker");
- mqttBroker.stopServer();
- System.out.println("Broker stopped");
- }));
+
+ //Bind a shutdown hook
+ Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
Thread.sleep(20000);
+ internalPublish("Hello World!!");
+ }
+
+ private void internalPublish(String messageText) {
System.out.println("Before self publish");
MqttPublishMessage message = MqttMessageBuilders.publish()
.topicName("/exit")
@@ -82,13 +106,29 @@ public static void main(String[] args) throws InterruptedException, IOException
// qos(MqttQoS.AT_MOST_ONCE);
// qQos(MqttQoS.AT_LEAST_ONCE);
.qos(MqttQoS.EXACTLY_ONCE)
- .payload(Unpooled.copiedBuffer("Hello World!!".getBytes(UTF_8)))
+ .payload(Unpooled.copiedBuffer(messageText.getBytes(UTF_8)))
.build();
mqttBroker.internalPublish(message, "INTRLPUB");
System.out.println("After self publish");
}
- private EmbeddedLauncher() {
+ private void shutdown() {
+ listClients();
+
+ System.out.println("Stopping broker");
+ mqttBroker.stopServer();
+ System.out.println("Broker stopped");
+ }
+
+ private void listClients() {
+ final Collection connectedClients = mqttBroker.listConnectedClients();
+ if (connectedClients.isEmpty()) {
+ System.out.println("No connected clients");
+ }
+ for (ClientDescriptor client : connectedClients) {
+ System.out.println(client);
+ }
}
+
}
diff --git a/embedding_moquette/src/main/resources/log4j.properties b/embedding_moquette/src/main/resources/log4j.properties
new file mode 100644
index 000000000..f157f7ed8
--- /dev/null
+++ b/embedding_moquette/src/main/resources/log4j.properties
@@ -0,0 +1,34 @@
+log4j.rootLogger=ERROR, stdout, file, messagelog
+
+log4j.logger.io.moquette=WARN
+
+# stdout appender is set to be a ConsoleAppender.
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Threshold=TRACE
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+#log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c{1} %x - %m%n
+log4j.appender.stdout.layout.ConversionPattern=%d{dd/MM/yyyy HH:mm:ss,SSS} [%t]%X{channel}%X{client.id}%X{msg.type} %-5p %c{1} %M %L %x - %m%n
+
+#file appender
+log4j.appender.file=org.apache.log4j.RollingFileAppender
+log4j.appender.file.Threshold=INFO
+log4j.appender.file.File=moquette.log
+log4j.appender.file.MaxFileSize=100MB
+log4j.appender.file.MaxBackupIndex=1
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+#log4j.appender.file.layout.ConversionPattern=%-4r [%t] %-5p %c{1} %x - %m%n
+log4j.appender.file.layout.ConversionPattern=%d{dd/MM/yyyy HH:mm:ss,SSS} [%t]%X{channel}%X{client.id}%X{msg.type} %-5p %c{1} %M %L %x - %m%n
+
+
+####################################
+# Message Logger Configuration #
+#####################################
+log4j.appender.messagelog=org.apache.log4j.RollingFileAppender
+log4j.appender.messagelog.Threshold=DEBUG
+log4j.appender.messagelog.File=moquette_messages.log
+log4j.appender.messagelog.MaxFileSize=100MB
+log4j.appender.messagelog.MaxBackupIndex=1
+log4j.appender.messagelog.layout=org.apache.log4j.PatternLayout
+log4j.appender.messagelog.layout.ConversionPattern=%d{dd/MM/yyyy HH:mm:ss,SSS} [%t] %-5p %c{1} %L %x - %m%n
+
+log4j.category.io.moquette.broker.metrics.MQTTMessageLogger=DEBUG, messagelog