diff --git a/embedding_moquette/pom.xml b/embedding_moquette/pom.xml index dd2a1753c..8c71768f6 100644 --- a/embedding_moquette/pom.xml +++ b/embedding_moquette/pom.xml @@ -18,6 +18,13 @@ moquette-broker ${project.version} + + + + org.slf4j + slf4j-reload4j + ${slf4j.version} + @@ -30,6 +37,15 @@ true + + + org.codehaus.mojo + exec-maven-plugin + 3.1.0 + + io.moquette.testembedded.EmbeddedLauncher + + diff --git a/embedding_moquette/src/main/java/io/moquette/testembedded/EmbeddedLauncher.java b/embedding_moquette/src/main/java/io/moquette/testembedded/EmbeddedLauncher.java index 0c0fb6290..fa512e39e 100644 --- a/embedding_moquette/src/main/java/io/moquette/testembedded/EmbeddedLauncher.java +++ b/embedding_moquette/src/main/java/io/moquette/testembedded/EmbeddedLauncher.java @@ -15,6 +15,7 @@ */ package io.moquette.testembedded; +import io.moquette.broker.ClientDescriptor; import io.moquette.broker.Server; import io.moquette.interception.AbstractInterceptHandler; import io.moquette.interception.InterceptHandler; @@ -29,6 +30,7 @@ import io.netty.handler.codec.mqtt.MqttQoS; import java.io.IOException; +import java.util.Collection; import java.util.Collections; import java.util.List; @@ -39,7 +41,7 @@ * */ public final class EmbeddedLauncher { - static class PublisherListener extends AbstractInterceptHandler { + private class PublisherListener extends AbstractInterceptHandler { @Override public String getID() { @@ -50,6 +52,17 @@ public String getID() { public void onPublish(InterceptPublishMessage msg) { final String decodedPayload = msg.getPayload().toString(UTF_8); System.out.println("Received on topic: " + msg.getTopicName() + " content: " + decodedPayload); + if ("/command".equals(msg.getTopicName())) { + switch (decodedPayload) { + case "exit": + System.out.println("EXITING broker by /command exit"); + shutdown(); + return; + case "list_clients": + listClients(); + return; + } + } } @Override @@ -59,22 +72,33 @@ public void onSessionLoopError(Throwable error) { } public static void main(String[] args) throws InterruptedException, IOException { + final EmbeddedLauncher launcher = new EmbeddedLauncher(); + launcher.start(); + } + + private Server mqttBroker; + + private EmbeddedLauncher() { + } + + private void start() throws IOException, InterruptedException { IResourceLoader classpathLoader = new ClasspathResourceLoader(); final IConfig classPathConfig = new ResourceLoaderConfig(classpathLoader); - final Server mqttBroker = new Server(); + mqttBroker = new Server(); List userHandlers = Collections.singletonList(new PublisherListener()); mqttBroker.startServer(classPathConfig, userHandlers); System.out.println("Broker started press [CTRL+C] to stop"); - //Bind a shutdown hook - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - System.out.println("Stopping broker"); - mqttBroker.stopServer(); - System.out.println("Broker stopped"); - })); + + //Bind a shutdown hook + Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown)); Thread.sleep(20000); + internalPublish("Hello World!!"); + } + + private void internalPublish(String messageText) { System.out.println("Before self publish"); MqttPublishMessage message = MqttMessageBuilders.publish() .topicName("/exit") @@ -82,13 +106,29 @@ public static void main(String[] args) throws InterruptedException, IOException // qos(MqttQoS.AT_MOST_ONCE); // qQos(MqttQoS.AT_LEAST_ONCE); .qos(MqttQoS.EXACTLY_ONCE) - .payload(Unpooled.copiedBuffer("Hello World!!".getBytes(UTF_8))) + .payload(Unpooled.copiedBuffer(messageText.getBytes(UTF_8))) .build(); mqttBroker.internalPublish(message, "INTRLPUB"); System.out.println("After self publish"); } - private EmbeddedLauncher() { + private void shutdown() { + listClients(); + + System.out.println("Stopping broker"); + mqttBroker.stopServer(); + System.out.println("Broker stopped"); + } + + private void listClients() { + final Collection connectedClients = mqttBroker.listConnectedClients(); + if (connectedClients.isEmpty()) { + System.out.println("No connected clients"); + } + for (ClientDescriptor client : connectedClients) { + System.out.println(client); + } } + } diff --git a/embedding_moquette/src/main/resources/log4j.properties b/embedding_moquette/src/main/resources/log4j.properties new file mode 100644 index 000000000..f157f7ed8 --- /dev/null +++ b/embedding_moquette/src/main/resources/log4j.properties @@ -0,0 +1,34 @@ +log4j.rootLogger=ERROR, stdout, file, messagelog + +log4j.logger.io.moquette=WARN + +# stdout appender is set to be a ConsoleAppender. +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Threshold=TRACE +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +#log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c{1} %x - %m%n +log4j.appender.stdout.layout.ConversionPattern=%d{dd/MM/yyyy HH:mm:ss,SSS} [%t]%X{channel}%X{client.id}%X{msg.type} %-5p %c{1} %M %L %x - %m%n + +#file appender +log4j.appender.file=org.apache.log4j.RollingFileAppender +log4j.appender.file.Threshold=INFO +log4j.appender.file.File=moquette.log +log4j.appender.file.MaxFileSize=100MB +log4j.appender.file.MaxBackupIndex=1 +log4j.appender.file.layout=org.apache.log4j.PatternLayout +#log4j.appender.file.layout.ConversionPattern=%-4r [%t] %-5p %c{1} %x - %m%n +log4j.appender.file.layout.ConversionPattern=%d{dd/MM/yyyy HH:mm:ss,SSS} [%t]%X{channel}%X{client.id}%X{msg.type} %-5p %c{1} %M %L %x - %m%n + + +#################################### +# Message Logger Configuration # +##################################### +log4j.appender.messagelog=org.apache.log4j.RollingFileAppender +log4j.appender.messagelog.Threshold=DEBUG +log4j.appender.messagelog.File=moquette_messages.log +log4j.appender.messagelog.MaxFileSize=100MB +log4j.appender.messagelog.MaxBackupIndex=1 +log4j.appender.messagelog.layout=org.apache.log4j.PatternLayout +log4j.appender.messagelog.layout.ConversionPattern=%d{dd/MM/yyyy HH:mm:ss,SSS} [%t] %-5p %c{1} %L %x - %m%n + +log4j.category.io.moquette.broker.metrics.MQTTMessageLogger=DEBUG, messagelog