Skip to content

Commit

Permalink
feat: test suscribe to actual mqtt broker to verify the request was r…
Browse files Browse the repository at this point in the history
…eceived
  • Loading branch information
thomasBousselin committed Jun 10, 2024
1 parent 49e288d commit 02f4c45
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ object Mqtt {
object QualityOfService {
const val KEY = "MQTT-QoS"
const val AT_MOST_ONCE = 0
const val EXACTLY_ONCE = 2
}

object SCHEME {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,20 @@ import com.egm.stellio.subscription.model.Notification

data class MqttNotificationData(
val topic: String,
val mqttMessage: MqttMessage,
val message: MqttMessage,
val qos: Int,
val brokerUrl: String,
val clientId: String,
val username: String,
val password: String? = null,
val connection: MqttConnectionData
) {

data class MqttMessage(
val body: Notification,
val metadata: Map<String, String> = emptyMap(),
)
}

data class MqttConnectionData(
val brokerUrl: String,
val clientId: String,
val username: String,
val password: String? = null,
)
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@ class MqttNotificationService(

val data = MqttNotificationData(
topic = uri.path,
brokerUrl = brokerUrl,
clientId = clientId,
qos = qos,
mqttMessage = MqttNotificationData.MqttMessage(notification, headers),
username = username,
password = password
message = MqttNotificationData.MqttMessage(notification, headers),
connection = MqttConnectionData(
brokerUrl = brokerUrl,
clientId = clientId,
username = username,
password = password
)
)

try {
Expand All @@ -62,47 +64,57 @@ class MqttNotificationService(
Mqtt.Version.V5 -> callMqttV5(data)
else -> callMqttV5(data)
}
logger.info("successfull mqtt notification for uri : ${data.brokerUrl} version: $mqttVersion")
logger.info("successfull mqtt notification for uri : $uri version: $mqttVersion")
return true
} catch (e: MqttExceptionV3) {
logger.error("failed mqttv3 notification for uri : ${data.brokerUrl}", e)
logger.error("failed mqttv3 notification for uri : $uri", e)
return false
} catch (e: MqttExceptionV5) {
logger.error("failed mqttv5 notification for uri : ${data.brokerUrl}", e)
logger.error("failed mqttv5 notification for uri : $uri", e)
return false
}
}

internal suspend fun callMqttV3(data: MqttNotificationData) {
val mqttClient = connectMqttv3(data.connection)
val message = MqttMessage(
serializeObject(data.message).toByteArray()
)
message.qos = data.qos
mqttClient.publish(data.topic, message)
mqttClient.disconnect()
}

internal suspend fun connectMqttv3(data: MqttConnectionData): MqttClient {
val persistence = MemoryPersistence()
val mqttClient = MqttClient(data.brokerUrl, data.clientId, persistence)
val connOpts = MqttConnectOptions()
connOpts.isCleanSession = true
connOpts.userName = data.username
connOpts.password = data.password?.toCharArray() ?: "".toCharArray()
mqttClient.connect(connOpts)
val message = MqttMessage(
serializeObject(data.mqttMessage).toByteArray()
)
return mqttClient
}

internal suspend fun callMqttV5(data: MqttNotificationData) {
val mqttClient = connectMqttv5(data.connection)
val message = org.eclipse.paho.mqttv5.common.MqttMessage(serializeObject(data.message).toByteArray())
message.qos = data.qos
mqttClient.publish(data.topic, message)
val token = mqttClient.publish(data.topic, message)
token.waitForCompletion()
mqttClient.disconnect()
mqttClient.close()
}

internal suspend fun callMqttV5(data: MqttNotificationData) {
internal suspend fun connectMqttv5(data: MqttConnectionData): MqttAsyncClient {
val persistence = org.eclipse.paho.mqttv5.client.persist.MemoryPersistence()
val mqttClient = MqttAsyncClient(data.brokerUrl, data.clientId, persistence)
val connOpts = MqttConnectionOptions()
connOpts.isCleanStart = true
connOpts.userName = data.username
connOpts.password = data.password?.toByteArray()
var token: IMqttToken = mqttClient.connect(connOpts)
token.waitForCompletion()
val message = org.eclipse.paho.mqttv5.common.MqttMessage(serializeObject(data.mqttMessage).toByteArray())
message.qos = data.qos
token = mqttClient.publish(data.topic, message)
val token: IMqttToken = mqttClient.connect(connOpts)
token.waitForCompletion()
mqttClient.disconnect()
mqttClient.close()
return mqttClient
}
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,31 @@
package com.egm.stellio.subscription.service.mqtt

import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_SUBSCRIPTION_TERM
import com.egm.stellio.shared.util.JsonUtils.serializeObject
import com.egm.stellio.shared.util.toUri
import com.egm.stellio.subscription.model.*
import com.egm.stellio.subscription.support.WithMosquittoContainer
import com.ninjasquad.springmockk.SpykBean
import io.mockk.coEvery
import io.mockk.coVerify
import kotlinx.coroutines.test.runTest
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
import org.eclipse.paho.client.mqttv3.MqttException
import org.eclipse.paho.mqttv5.client.IMqttToken
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse
import org.eclipse.paho.mqttv5.common.MqttSubscription
import org.eclipse.paho.mqttv5.common.packet.MqttProperties
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertDoesNotThrow
import org.junit.jupiter.api.assertThrows
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.test.context.ActiveProfiles
import java.net.URI
import org.eclipse.paho.client.mqttv3.MqttCallback as MqttCallbackV3
import org.eclipse.paho.client.mqttv3.MqttMessage as MqttMessageV3
import org.eclipse.paho.mqttv5.client.MqttCallback as MqttCallbackV5
import org.eclipse.paho.mqttv5.common.MqttMessage as MqttMessageV5

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, classes = [MqttNotificationService::class])
@ActiveProfiles("test")
Expand All @@ -23,9 +34,8 @@ class MqttNotificationServiceTest : WithMosquittoContainer {
@SpykBean
private lateinit var mqttNotificationService: MqttNotificationService

private val mqttContainerPort = WithMosquittoContainer.mosquittoContainer.getMappedPort(
Mqtt.SCHEME.MQTT_DEFAULT_PORT
)
private val mqttContainerPort = WithMosquittoContainer.getBasicPort()

private val mqttSubscriptionV3 = Subscription(
type = NGSILD_SUBSCRIPTION_TERM,
subscriptionName = "My Subscription",
Expand All @@ -38,7 +48,7 @@ class MqttNotificationServiceTest : WithMosquittoContainer {
accept = Endpoint.AcceptType.JSONLD,
notifierInfo = listOf(
EndpointInfo(Mqtt.Version.KEY, Mqtt.Version.V3),
EndpointInfo(Mqtt.QualityOfService.KEY, Mqtt.QualityOfService.AT_MOST_ONCE.toString())
EndpointInfo(Mqtt.QualityOfService.KEY, Mqtt.QualityOfService.EXACTLY_ONCE.toString())
)
)
),
Expand All @@ -53,19 +63,22 @@ class MqttNotificationServiceTest : WithMosquittoContainer {
endpoint = Endpoint(
uri = "mqtt://test@localhost:$mqttContainerPort/notification".toUri(),
notifierInfo = listOf(
EndpointInfo(Mqtt.Version.KEY, Mqtt.Version.V5)
EndpointInfo(Mqtt.Version.KEY, Mqtt.Version.V5),
EndpointInfo(Mqtt.QualityOfService.KEY, Mqtt.QualityOfService.EXACTLY_ONCE.toString())
)
)
),
contexts = emptyList()
)
private val validMqttNotificationData = MqttNotificationData(
brokerUrl = "tcp://localhost:$mqttContainerPort",
clientId = "clientId",
username = "test",
connection = MqttConnectionData(
brokerUrl = "tcp://localhost:$mqttContainerPort",
clientId = "clientId",
username = "test",
),
topic = "/notification",
qos = 0,
mqttMessage = MqttNotificationData.MqttMessage(getNotificationForSubscription(mqttSubscriptionV3), emptyMap())
message = MqttNotificationData.MqttMessage(getNotificationForSubscription(mqttSubscriptionV3), emptyMap())
)

private val notification = Notification(
Expand All @@ -74,12 +87,17 @@ class MqttNotificationServiceTest : WithMosquittoContainer {
)

private val invalidUriMqttNotificationData = MqttNotificationData(
brokerUrl = "tcp://badHost:1883",
clientId = "clientId",
username = "test",
connection = MqttConnectionData(
brokerUrl = "tcp://badHost:1883",
clientId = "clientId",
username = "test",
),
topic = "notification",
qos = 0,
mqttMessage = MqttNotificationData.MqttMessage(notification, emptyMap())
message = MqttNotificationData.MqttMessage(
notification,
emptyMap(),
)
)

private fun getNotificationForSubscription(subscription: Subscription) = Notification(
Expand All @@ -88,7 +106,7 @@ class MqttNotificationServiceTest : WithMosquittoContainer {
)

@Test
fun `mqttNotifier should process endpoint uri to get connexion information`() = runTest {
fun `mqttNotifier should process endpoint uri to get connection information`() = runTest {
val subscription = mqttSubscriptionV3
coEvery { mqttNotificationService.callMqttV3(any()) } returns Unit
assert(
Expand All @@ -102,10 +120,10 @@ class MqttNotificationServiceTest : WithMosquittoContainer {
coVerify {
mqttNotificationService.callMqttV3(
match {
it.username == validMqttNotificationData.username &&
it.password == validMqttNotificationData.password &&
it.connection.username == validMqttNotificationData.connection.username &&
it.connection.password == validMqttNotificationData.connection.password &&
it.topic == validMqttNotificationData.topic &&
it.brokerUrl == validMqttNotificationData.brokerUrl
it.connection.brokerUrl == validMqttNotificationData.connection.brokerUrl
}
)
}
Expand Down Expand Up @@ -152,7 +170,21 @@ class MqttNotificationServiceTest : WithMosquittoContainer {

@Test
fun `sending mqttV3 notification with good uri should succeed`() = runTest {
// if we give the same clientId the mqtt server close the connection
val testConnectionData = validMqttNotificationData.connection.copy(clientId = "test-broker")
val mqttClient = mqttNotificationService.connectMqttv3(testConnectionData)
val messageReceiver = MqttV3MessageReceiver()
mqttClient.setCallback(messageReceiver)
mqttClient.subscribe(validMqttNotificationData.topic)

assertDoesNotThrow { mqttNotificationService.callMqttV3(validMqttNotificationData) }
Thread.sleep(10) // wait to receive notification in message receiver
assertEquals(
serializeObject(validMqttNotificationData.message),
messageReceiver.lastReceivedMessage
)
mqttClient.disconnect()
mqttClient.close()
}

@Test
Expand All @@ -162,7 +194,20 @@ class MqttNotificationServiceTest : WithMosquittoContainer {

@Test
fun `sending mqttV5 notification with good uri should succeed`() = runTest {
val testConnectionData = validMqttNotificationData.connection.copy(clientId = "test-broker")
val mqttClient = mqttNotificationService.connectMqttv5(testConnectionData)
val messageReceiver = MqttV5MessageReceiver()
mqttClient.setCallback(messageReceiver)
mqttClient.subscribe(MqttSubscription(validMqttNotificationData.topic))

assertDoesNotThrow { mqttNotificationService.callMqttV5(validMqttNotificationData) }

assertEquals(
serializeObject(validMqttNotificationData.message),
messageReceiver.lastReceivedMessage
)
mqttClient.disconnect()
mqttClient.close()
}

@Test
Expand All @@ -173,4 +218,48 @@ class MqttNotificationServiceTest : WithMosquittoContainer {
)
}
}

private class MqttV3MessageReceiver : MqttCallbackV3 {
var lastReceivedMessage: String? = null

override fun messageArrived(topic: String, message: MqttMessageV3) {
lastReceivedMessage = message.payload?.decodeToString()
}

override fun deliveryComplete(p0: IMqttDeliveryToken?) {
println("delivery complete")
}

override fun connectionLost(p0: Throwable?) {
println("connection lost")
}
}

private class MqttV5MessageReceiver : MqttCallbackV5 {
var lastReceivedMessage: String? = null

override fun messageArrived(topic: String?, message: MqttMessageV5?) {
lastReceivedMessage = message?.payload?.decodeToString()
}

override fun disconnected(p0: MqttDisconnectResponse?) {
println("connection lost")
}

override fun mqttErrorOccurred(p0: org.eclipse.paho.mqttv5.common.MqttException?) {
println("mqtt error occured")
}

override fun deliveryComplete(p0: IMqttToken?) {
println("delivery complete")
}

override fun connectComplete(p0: Boolean, p1: String?) {
println("connection success")
}

override fun authPacketArrived(p0: Int, p1: MqttProperties?) {
println("auth")
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.egm.stellio.subscription.support

import com.egm.stellio.subscription.service.mqtt.Mqtt
import org.testcontainers.containers.GenericContainer
import org.testcontainers.utility.DockerImageName
import org.testcontainers.utility.MountableFile
Expand All @@ -13,13 +14,17 @@ interface WithMosquittoContainer {

val mosquittoContainer = GenericContainer<Nothing>(mosquittoImage).apply {
withReuse(true)
withExposedPorts(1883)
withExposedPorts(Mqtt.SCHEME.MQTT_DEFAULT_PORT)
withCopyFileToContainer(
MountableFile.forClasspathResource("/mosquitto/mosquitto.conf"),
"/mosquitto/config/mosquitto.conf"
)
}

fun getBasicPort() = mosquittoContainer.getMappedPort(
Mqtt.SCHEME.MQTT_DEFAULT_PORT
)

init {
mosquittoContainer.start()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# see possible config : https://mosquitto.org/man/mosquitto-conf-5.html
allow_anonymous true
listener 1883
listener 9001

0 comments on commit 02f4c45

Please sign in to comment.