From 02f4c451185d3899cf8984e900d035a57ad83d56 Mon Sep 17 00:00:00 2001 From: Thomas BOUSSELIN Date: Mon, 10 Jun 2024 15:22:19 +0200 Subject: [PATCH] feat: test suscribe to actual mqtt broker to verify the request was received --- .../stellio/subscription/service/mqtt/Mqtt.kt | 1 + .../service/mqtt/MqttNotificationData.kt | 14 +- .../service/mqtt/MqttNotificationService.kt | 52 +++++--- .../mqtt/MqttNotificationServiceTest.kt | 123 +++++++++++++++--- .../support/WithMosquittoContainer.kt | 7 +- .../test/resources/mosquitto/mosquitto.conf | 1 - 6 files changed, 154 insertions(+), 44 deletions(-) diff --git a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/mqtt/Mqtt.kt b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/mqtt/Mqtt.kt index a258e8a95f..7512b6b5d5 100644 --- a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/mqtt/Mqtt.kt +++ b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/mqtt/Mqtt.kt @@ -11,6 +11,7 @@ object Mqtt { object QualityOfService { const val KEY = "MQTT-QoS" const val AT_MOST_ONCE = 0 + const val EXACTLY_ONCE = 2 } object SCHEME { diff --git a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/mqtt/MqttNotificationData.kt b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/mqtt/MqttNotificationData.kt index 3d6ed21672..d5eb79721b 100644 --- a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/mqtt/MqttNotificationData.kt +++ b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/mqtt/MqttNotificationData.kt @@ -4,12 +4,9 @@ import com.egm.stellio.subscription.model.Notification data class MqttNotificationData( val topic: String, - val mqttMessage: MqttMessage, + val message: MqttMessage, val qos: Int, - val brokerUrl: String, - val clientId: String, - val username: String, - val password: String? = null, + val connection: MqttConnectionData ) { data class MqttMessage( @@ -17,3 +14,10 @@ data class MqttNotificationData( val metadata: Map = emptyMap(), ) } + +data class MqttConnectionData( + val brokerUrl: String, + val clientId: String, + val username: String, + val password: String? = null, +) diff --git a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/mqtt/MqttNotificationService.kt b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/mqtt/MqttNotificationService.kt index 5290f9a1fb..e101fc6bcc 100644 --- a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/mqtt/MqttNotificationService.kt +++ b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/mqtt/MqttNotificationService.kt @@ -47,12 +47,14 @@ class MqttNotificationService( val data = MqttNotificationData( topic = uri.path, - brokerUrl = brokerUrl, - clientId = clientId, qos = qos, - mqttMessage = MqttNotificationData.MqttMessage(notification, headers), - username = username, - password = password + message = MqttNotificationData.MqttMessage(notification, headers), + connection = MqttConnectionData( + brokerUrl = brokerUrl, + clientId = clientId, + username = username, + password = password + ) ) try { @@ -62,18 +64,28 @@ class MqttNotificationService( Mqtt.Version.V5 -> callMqttV5(data) else -> callMqttV5(data) } - logger.info("successfull mqtt notification for uri : ${data.brokerUrl} version: $mqttVersion") + logger.info("successfull mqtt notification for uri : $uri version: $mqttVersion") return true } catch (e: MqttExceptionV3) { - logger.error("failed mqttv3 notification for uri : ${data.brokerUrl}", e) + logger.error("failed mqttv3 notification for uri : $uri", e) return false } catch (e: MqttExceptionV5) { - logger.error("failed mqttv5 notification for uri : ${data.brokerUrl}", e) + logger.error("failed mqttv5 notification for uri : $uri", e) return false } } internal suspend fun callMqttV3(data: MqttNotificationData) { + val mqttClient = connectMqttv3(data.connection) + val message = MqttMessage( + serializeObject(data.message).toByteArray() + ) + message.qos = data.qos + mqttClient.publish(data.topic, message) + mqttClient.disconnect() + } + + internal suspend fun connectMqttv3(data: MqttConnectionData): MqttClient { val persistence = MemoryPersistence() val mqttClient = MqttClient(data.brokerUrl, data.clientId, persistence) val connOpts = MqttConnectOptions() @@ -81,28 +93,28 @@ class MqttNotificationService( connOpts.userName = data.username connOpts.password = data.password?.toCharArray() ?: "".toCharArray() mqttClient.connect(connOpts) - val message = MqttMessage( - serializeObject(data.mqttMessage).toByteArray() - ) + return mqttClient + } + + internal suspend fun callMqttV5(data: MqttNotificationData) { + val mqttClient = connectMqttv5(data.connection) + val message = org.eclipse.paho.mqttv5.common.MqttMessage(serializeObject(data.message).toByteArray()) message.qos = data.qos - mqttClient.publish(data.topic, message) + val token = mqttClient.publish(data.topic, message) + token.waitForCompletion() mqttClient.disconnect() + mqttClient.close() } - internal suspend fun callMqttV5(data: MqttNotificationData) { + internal suspend fun connectMqttv5(data: MqttConnectionData): MqttAsyncClient { val persistence = org.eclipse.paho.mqttv5.client.persist.MemoryPersistence() val mqttClient = MqttAsyncClient(data.brokerUrl, data.clientId, persistence) val connOpts = MqttConnectionOptions() connOpts.isCleanStart = true connOpts.userName = data.username connOpts.password = data.password?.toByteArray() - var token: IMqttToken = mqttClient.connect(connOpts) - token.waitForCompletion() - val message = org.eclipse.paho.mqttv5.common.MqttMessage(serializeObject(data.mqttMessage).toByteArray()) - message.qos = data.qos - token = mqttClient.publish(data.topic, message) + val token: IMqttToken = mqttClient.connect(connOpts) token.waitForCompletion() - mqttClient.disconnect() - mqttClient.close() + return mqttClient } } diff --git a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/mqtt/MqttNotificationServiceTest.kt b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/mqtt/MqttNotificationServiceTest.kt index 93640adfea..d5370f8234 100644 --- a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/mqtt/MqttNotificationServiceTest.kt +++ b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/mqtt/MqttNotificationServiceTest.kt @@ -1,6 +1,7 @@ package com.egm.stellio.subscription.service.mqtt import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_SUBSCRIPTION_TERM +import com.egm.stellio.shared.util.JsonUtils.serializeObject import com.egm.stellio.shared.util.toUri import com.egm.stellio.subscription.model.* import com.egm.stellio.subscription.support.WithMosquittoContainer @@ -8,13 +9,23 @@ import com.ninjasquad.springmockk.SpykBean import io.mockk.coEvery import io.mockk.coVerify import kotlinx.coroutines.test.runTest +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken import org.eclipse.paho.client.mqttv3.MqttException +import org.eclipse.paho.mqttv5.client.IMqttToken +import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse +import org.eclipse.paho.mqttv5.common.MqttSubscription +import org.eclipse.paho.mqttv5.common.packet.MqttProperties +import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow import org.junit.jupiter.api.assertThrows import org.springframework.boot.test.context.SpringBootTest import org.springframework.test.context.ActiveProfiles import java.net.URI +import org.eclipse.paho.client.mqttv3.MqttCallback as MqttCallbackV3 +import org.eclipse.paho.client.mqttv3.MqttMessage as MqttMessageV3 +import org.eclipse.paho.mqttv5.client.MqttCallback as MqttCallbackV5 +import org.eclipse.paho.mqttv5.common.MqttMessage as MqttMessageV5 @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, classes = [MqttNotificationService::class]) @ActiveProfiles("test") @@ -23,9 +34,8 @@ class MqttNotificationServiceTest : WithMosquittoContainer { @SpykBean private lateinit var mqttNotificationService: MqttNotificationService - private val mqttContainerPort = WithMosquittoContainer.mosquittoContainer.getMappedPort( - Mqtt.SCHEME.MQTT_DEFAULT_PORT - ) + private val mqttContainerPort = WithMosquittoContainer.getBasicPort() + private val mqttSubscriptionV3 = Subscription( type = NGSILD_SUBSCRIPTION_TERM, subscriptionName = "My Subscription", @@ -38,7 +48,7 @@ class MqttNotificationServiceTest : WithMosquittoContainer { accept = Endpoint.AcceptType.JSONLD, notifierInfo = listOf( EndpointInfo(Mqtt.Version.KEY, Mqtt.Version.V3), - EndpointInfo(Mqtt.QualityOfService.KEY, Mqtt.QualityOfService.AT_MOST_ONCE.toString()) + EndpointInfo(Mqtt.QualityOfService.KEY, Mqtt.QualityOfService.EXACTLY_ONCE.toString()) ) ) ), @@ -53,19 +63,22 @@ class MqttNotificationServiceTest : WithMosquittoContainer { endpoint = Endpoint( uri = "mqtt://test@localhost:$mqttContainerPort/notification".toUri(), notifierInfo = listOf( - EndpointInfo(Mqtt.Version.KEY, Mqtt.Version.V5) + EndpointInfo(Mqtt.Version.KEY, Mqtt.Version.V5), + EndpointInfo(Mqtt.QualityOfService.KEY, Mqtt.QualityOfService.EXACTLY_ONCE.toString()) ) ) ), contexts = emptyList() ) private val validMqttNotificationData = MqttNotificationData( - brokerUrl = "tcp://localhost:$mqttContainerPort", - clientId = "clientId", - username = "test", + connection = MqttConnectionData( + brokerUrl = "tcp://localhost:$mqttContainerPort", + clientId = "clientId", + username = "test", + ), topic = "/notification", qos = 0, - mqttMessage = MqttNotificationData.MqttMessage(getNotificationForSubscription(mqttSubscriptionV3), emptyMap()) + message = MqttNotificationData.MqttMessage(getNotificationForSubscription(mqttSubscriptionV3), emptyMap()) ) private val notification = Notification( @@ -74,12 +87,17 @@ class MqttNotificationServiceTest : WithMosquittoContainer { ) private val invalidUriMqttNotificationData = MqttNotificationData( - brokerUrl = "tcp://badHost:1883", - clientId = "clientId", - username = "test", + connection = MqttConnectionData( + brokerUrl = "tcp://badHost:1883", + clientId = "clientId", + username = "test", + ), topic = "notification", qos = 0, - mqttMessage = MqttNotificationData.MqttMessage(notification, emptyMap()) + message = MqttNotificationData.MqttMessage( + notification, + emptyMap(), + ) ) private fun getNotificationForSubscription(subscription: Subscription) = Notification( @@ -88,7 +106,7 @@ class MqttNotificationServiceTest : WithMosquittoContainer { ) @Test - fun `mqttNotifier should process endpoint uri to get connexion information`() = runTest { + fun `mqttNotifier should process endpoint uri to get connection information`() = runTest { val subscription = mqttSubscriptionV3 coEvery { mqttNotificationService.callMqttV3(any()) } returns Unit assert( @@ -102,10 +120,10 @@ class MqttNotificationServiceTest : WithMosquittoContainer { coVerify { mqttNotificationService.callMqttV3( match { - it.username == validMqttNotificationData.username && - it.password == validMqttNotificationData.password && + it.connection.username == validMqttNotificationData.connection.username && + it.connection.password == validMqttNotificationData.connection.password && it.topic == validMqttNotificationData.topic && - it.brokerUrl == validMqttNotificationData.brokerUrl + it.connection.brokerUrl == validMqttNotificationData.connection.brokerUrl } ) } @@ -152,7 +170,21 @@ class MqttNotificationServiceTest : WithMosquittoContainer { @Test fun `sending mqttV3 notification with good uri should succeed`() = runTest { + // if we give the same clientId the mqtt server close the connection + val testConnectionData = validMqttNotificationData.connection.copy(clientId = "test-broker") + val mqttClient = mqttNotificationService.connectMqttv3(testConnectionData) + val messageReceiver = MqttV3MessageReceiver() + mqttClient.setCallback(messageReceiver) + mqttClient.subscribe(validMqttNotificationData.topic) + assertDoesNotThrow { mqttNotificationService.callMqttV3(validMqttNotificationData) } + Thread.sleep(10) // wait to receive notification in message receiver + assertEquals( + serializeObject(validMqttNotificationData.message), + messageReceiver.lastReceivedMessage + ) + mqttClient.disconnect() + mqttClient.close() } @Test @@ -162,7 +194,20 @@ class MqttNotificationServiceTest : WithMosquittoContainer { @Test fun `sending mqttV5 notification with good uri should succeed`() = runTest { + val testConnectionData = validMqttNotificationData.connection.copy(clientId = "test-broker") + val mqttClient = mqttNotificationService.connectMqttv5(testConnectionData) + val messageReceiver = MqttV5MessageReceiver() + mqttClient.setCallback(messageReceiver) + mqttClient.subscribe(MqttSubscription(validMqttNotificationData.topic)) + assertDoesNotThrow { mqttNotificationService.callMqttV5(validMqttNotificationData) } + + assertEquals( + serializeObject(validMqttNotificationData.message), + messageReceiver.lastReceivedMessage + ) + mqttClient.disconnect() + mqttClient.close() } @Test @@ -173,4 +218,48 @@ class MqttNotificationServiceTest : WithMosquittoContainer { ) } } + + private class MqttV3MessageReceiver : MqttCallbackV3 { + var lastReceivedMessage: String? = null + + override fun messageArrived(topic: String, message: MqttMessageV3) { + lastReceivedMessage = message.payload?.decodeToString() + } + + override fun deliveryComplete(p0: IMqttDeliveryToken?) { + println("delivery complete") + } + + override fun connectionLost(p0: Throwable?) { + println("connection lost") + } + } + + private class MqttV5MessageReceiver : MqttCallbackV5 { + var lastReceivedMessage: String? = null + + override fun messageArrived(topic: String?, message: MqttMessageV5?) { + lastReceivedMessage = message?.payload?.decodeToString() + } + + override fun disconnected(p0: MqttDisconnectResponse?) { + println("connection lost") + } + + override fun mqttErrorOccurred(p0: org.eclipse.paho.mqttv5.common.MqttException?) { + println("mqtt error occured") + } + + override fun deliveryComplete(p0: IMqttToken?) { + println("delivery complete") + } + + override fun connectComplete(p0: Boolean, p1: String?) { + println("connection success") + } + + override fun authPacketArrived(p0: Int, p1: MqttProperties?) { + println("auth") + } + } } diff --git a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/support/WithMosquittoContainer.kt b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/support/WithMosquittoContainer.kt index 5db96c764a..fcef13bd45 100644 --- a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/support/WithMosquittoContainer.kt +++ b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/support/WithMosquittoContainer.kt @@ -1,5 +1,6 @@ package com.egm.stellio.subscription.support +import com.egm.stellio.subscription.service.mqtt.Mqtt import org.testcontainers.containers.GenericContainer import org.testcontainers.utility.DockerImageName import org.testcontainers.utility.MountableFile @@ -13,13 +14,17 @@ interface WithMosquittoContainer { val mosquittoContainer = GenericContainer(mosquittoImage).apply { withReuse(true) - withExposedPorts(1883) + withExposedPorts(Mqtt.SCHEME.MQTT_DEFAULT_PORT) withCopyFileToContainer( MountableFile.forClasspathResource("/mosquitto/mosquitto.conf"), "/mosquitto/config/mosquitto.conf" ) } + fun getBasicPort() = mosquittoContainer.getMappedPort( + Mqtt.SCHEME.MQTT_DEFAULT_PORT + ) + init { mosquittoContainer.start() } diff --git a/subscription-service/src/test/resources/mosquitto/mosquitto.conf b/subscription-service/src/test/resources/mosquitto/mosquitto.conf index 65adc872cb..2bfe79bd5b 100644 --- a/subscription-service/src/test/resources/mosquitto/mosquitto.conf +++ b/subscription-service/src/test/resources/mosquitto/mosquitto.conf @@ -1,4 +1,3 @@ # see possible config : https://mosquitto.org/man/mosquitto-conf-5.html allow_anonymous true listener 1883 -listener 9001