From d7902233d3d7477e68180162d337a32b231cb252 Mon Sep 17 00:00:00 2001 From: Thomas BOUSSELIN Date: Wed, 29 May 2024 18:22:09 +0200 Subject: [PATCH] feat(mqtt notification): --- .../egm/stellio/shared/model/ApiExceptions.kt | 2 + subscription-service/build.gradle.kts | 2 + .../service/NotificationService.kt | 69 +- .../service/mqtt/MQTTNotificationData.kt | 19 + .../service/mqtt/MQTTNotificationService.kt | 68 ++ .../service/mqtt/MQTTVersionService.kt | 50 + .../stellio/subscription/service/mqtt/Mqtt.kt | 26 + .../src/main/resources/application.properties | 21 +- .../service/MqttNotificationServiceTest.kt | 138 +++ .../service/MqttVersionServiceTest.kt | 72 ++ .../service/NotificationServiceTests.kt | 62 ++ .../service/SubscriptionServiceTests.kt | 22 +- .../support/WithMosquittoContainer.kt | 27 + .../test/resources/mosquitto/mosquitto.conf | 906 ++++++++++++++++++ 14 files changed, 1436 insertions(+), 48 deletions(-) create mode 100644 subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/mqtt/MQTTNotificationData.kt create mode 100644 subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/mqtt/MQTTNotificationService.kt create mode 100644 subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/mqtt/MQTTVersionService.kt create mode 100644 subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/mqtt/Mqtt.kt create mode 100644 subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/MqttNotificationServiceTest.kt create mode 100644 subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/MqttVersionServiceTest.kt create mode 100644 subscription-service/src/test/kotlin/com/egm/stellio/subscription/support/WithMosquittoContainer.kt create mode 100644 subscription-service/src/test/resources/mosquitto/mosquitto.conf diff --git a/shared/src/main/kotlin/com/egm/stellio/shared/model/ApiExceptions.kt b/shared/src/main/kotlin/com/egm/stellio/shared/model/ApiExceptions.kt index e96b511082..41d0a796c8 100644 --- a/shared/src/main/kotlin/com/egm/stellio/shared/model/ApiExceptions.kt +++ b/shared/src/main/kotlin/com/egm/stellio/shared/model/ApiExceptions.kt @@ -20,6 +20,7 @@ data class NotImplementedException(override val message: String) : APIException( data class LdContextNotAvailableException(override val message: String) : APIException(message) data class NonexistentTenantException(override val message: String) : APIException(message) data class NotAcceptableException(override val message: String) : APIException(message) +data class BadSchemeException(override val message: String) : APIException(message) fun Throwable.toAPIException(specificMessage: String? = null): APIException = when (this) { @@ -28,5 +29,6 @@ fun Throwable.toAPIException(specificMessage: String? = null): APIException = if (this.code == JsonLdErrorCode.LOADING_REMOTE_CONTEXT_FAILED) LdContextNotAvailableException(specificMessage ?: "Unable to load remote context (cause was: $this)") else BadRequestDataException("Unexpected error while parsing payload (cause was: $this)") + else -> BadRequestDataException(specificMessage ?: this.localizedMessage) } diff --git a/subscription-service/build.gradle.kts b/subscription-service/build.gradle.kts index 4b8eec5dff..cee56e3201 100644 --- a/subscription-service/build.gradle.kts +++ b/subscription-service/build.gradle.kts @@ -20,6 +20,8 @@ dependencies { implementation("org.postgresql:r2dbc-postgresql") implementation("com.jayway.jsonpath:json-path:2.9.0") implementation(project(":shared")) + implementation("org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5") + implementation("org.eclipse.paho:org.eclipse.paho.mqttv5.client:1.2.5") detektPlugins("io.gitlab.arturbosch.detekt:detekt-formatting:1.23.6") diff --git a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/NotificationService.kt b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/NotificationService.kt index bef5e57996..d2a632a208 100644 --- a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/NotificationService.kt +++ b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/NotificationService.kt @@ -13,6 +13,8 @@ import com.egm.stellio.subscription.model.Notification import com.egm.stellio.subscription.model.NotificationParams import com.egm.stellio.subscription.model.NotificationTrigger import com.egm.stellio.subscription.model.Subscription +import com.egm.stellio.subscription.service.mqtt.MQTTNotificationService +import com.egm.stellio.subscription.service.mqtt.Mqtt import org.slf4j.LoggerFactory import org.springframework.http.HttpHeaders import org.springframework.http.HttpStatus @@ -23,7 +25,8 @@ import org.springframework.web.reactive.function.client.awaitExchange @Service class NotificationService( - private val subscriptionService: SubscriptionService + private val subscriptionService: SubscriptionService, + private val mqttNotificationService: MQTTNotificationService ) { private val logger = LoggerFactory.getLogger(javaClass) @@ -70,32 +73,50 @@ class NotificationService( ) val uri = subscription.notification.endpoint.uri.toString() logger.info("Notification is about to be sent to $uri for subscription ${subscription.id}") - val request = - WebClient.create(uri).post().contentType(mediaType).headers { - if (mediaType == MediaType.APPLICATION_JSON) { - it.set(HttpHeaders.LINK, subscriptionService.getContextsLink(subscription)) - } - if (tenantName != DEFAULT_TENANT_NAME) - it.set(NGSILD_TENANT_HEADER, tenantName) - subscription.notification.endpoint.receiverInfo?.forEach { endpointInfo -> - it.set(endpointInfo.key, endpointInfo.value) + + val headerMap: MutableMap = emptyMap().toMutableMap() + if (mediaType == MediaType.APPLICATION_JSON) { + headerMap[HttpHeaders.LINK] = subscriptionService.getContextsLink(subscription) + } + if (tenantName != DEFAULT_TENANT_NAME) + headerMap[NGSILD_TENANT_HEADER] = tenantName + subscription.notification.endpoint.receiverInfo?.forEach { endpointInfo -> + headerMap[endpointInfo.key] = endpointInfo.value + } + + val result = + kotlin.runCatching { + if (uri.startsWith(Mqtt.SCHEME.MQTT)) { + headerMap["Content-Type"] = mediaType.toString() // could be common with line 99 ? + Triple( + subscription, + notification, + mqttNotificationService.mqttNotifier( + notification = notification, + subscription = subscription, + headers = headerMap + ) + ) + } else { + val request = + WebClient.create(uri).post().contentType(mediaType).headers { it.setAll(headerMap) } + request + .bodyValue(serializeObject(notification)) + .awaitExchange { response -> + val success = response.statusCode() == HttpStatus.OK + logger.info( + "The notification sent has been received with ${if (success) "success" else "failure"}" + ) + if (!success) { + logger.error("Failed to send notification to $uri: ${response.statusCode()}") + } + Triple(subscription, notification, success) + } } + }.getOrElse { + Triple(subscription, notification, false) } - val result = kotlin.runCatching { - request - .bodyValue(serializeObject(notification)) - .awaitExchange { response -> - val success = response.statusCode() == HttpStatus.OK - logger.info("The notification sent has been received with ${if (success) "success" else "failure"}") - if (!success) { - logger.error("Failed to send notification to $uri: ${response.statusCode()}") - } - Triple(subscription, notification, success) - } - }.getOrElse { - Triple(subscription, notification, false) - } subscriptionService.updateSubscriptionNotification(result.first, result.second, result.third) return result } diff --git a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/mqtt/MQTTNotificationData.kt b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/mqtt/MQTTNotificationData.kt new file mode 100644 index 0000000000..9c68832e5d --- /dev/null +++ b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/mqtt/MQTTNotificationData.kt @@ -0,0 +1,19 @@ +package com.egm.stellio.subscription.service.mqtt + +import com.egm.stellio.subscription.model.Notification + +data class MQTTNotificationData( + val topic: String, + val mqttMessage: MqttMessage, + val qos: Int, + val brokerUrl: String, + val clientId: String, + val username: String, + val password: String? = null, +) { + + data class MqttMessage( + val body: Notification, + val metadata: Map = emptyMap(), + ) +} diff --git a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/mqtt/MQTTNotificationService.kt b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/mqtt/MQTTNotificationService.kt new file mode 100644 index 0000000000..fd440fb900 --- /dev/null +++ b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/mqtt/MQTTNotificationService.kt @@ -0,0 +1,68 @@ +package com.egm.stellio.subscription.service.mqtt + +import com.egm.stellio.shared.model.BadSchemeException +import com.egm.stellio.subscription.model.Notification +import com.egm.stellio.subscription.model.Subscription +import org.slf4j.LoggerFactory +import org.springframework.beans.factory.annotation.Value +import org.springframework.stereotype.Service +import org.eclipse.paho.client.mqttv3.MqttException as MqttExceptionV3 +import org.eclipse.paho.mqttv5.common.MqttException as MqttExceptionV5 + +@Service +class MQTTNotificationService( + @Value("\${mqtt.clientId}") + private val clientId: String = "stellio-context-brokerUrl", + private val mqttVersionService: MQTTVersionService +) { + + private val logger = LoggerFactory.getLogger(javaClass) + + suspend fun mqttNotifier( + subscription: Subscription, + notification: Notification, + headers: Map + ): Boolean { + val endpoint = subscription.notification.endpoint + val uri = endpoint.uri + val userInfo = uri.userInfo.split(':') + val username = userInfo.getOrNull(0) ?: "" + val password = userInfo.getOrNull(1) + val brokerScheme = Mqtt.SCHEME.brokerSchemeMap[uri.scheme] + ?: throw BadSchemeException("${uri.scheme} is not a valid mqtt scheme") + + val brokerPort = if (uri.port != -1) uri.port else Mqtt.SCHEME.defaultPortMap[uri.scheme] + + val brokerUrl = "$brokerScheme://${uri.host}:$brokerPort" + val notifierInfo = endpoint.notifierInfo?.map { it.key to it.value }?.toMap() ?: emptyMap() + val qos = + notifierInfo[Mqtt.QualityOfService.KEY]?.let { Integer.parseInt(it) } ?: Mqtt.QualityOfService.AT_MOST_ONCE + + val data = MQTTNotificationData( + topic = uri.path, + brokerUrl = brokerUrl, + clientId = clientId, + qos = qos, + mqttMessage = MQTTNotificationData.MqttMessage(notification, headers), + username = username, + password = password + ) + + try { + val mqttVersion = notifierInfo[Mqtt.Version.KEY] + when (mqttVersion) { + Mqtt.Version.V3 -> mqttVersionService.callMqttV3(data) + Mqtt.Version.V5 -> mqttVersionService.callMqttV5(data) + else -> mqttVersionService.callMqttV5(data) + } + logger.info("successfull mqtt notification for uri : ${data.brokerUrl} version: $mqttVersion") + return true + } catch (e: MqttExceptionV3) { + logger.error("failed mqttv3 notification for uri : ${data.brokerUrl}", e) + return false + } catch (e: MqttExceptionV5) { + logger.error("failed mqttv5 notification for uri : ${data.brokerUrl}", e) + return false + } + } +} diff --git a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/mqtt/MQTTVersionService.kt b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/mqtt/MQTTVersionService.kt new file mode 100644 index 0000000000..2d05bccd40 --- /dev/null +++ b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/mqtt/MQTTVersionService.kt @@ -0,0 +1,50 @@ +package com.egm.stellio.subscription.service.mqtt + +import com.egm.stellio.shared.util.JsonUtils.serializeObject +import org.eclipse.paho.mqttv5.client.MqttConnectionOptions +import org.springframework.stereotype.Service +import org.eclipse.paho.client.mqttv3.MqttClient as MqttClientv3 +import org.eclipse.paho.client.mqttv3.MqttConnectOptions as MqttConnectOptionsv3 +import org.eclipse.paho.client.mqttv3.MqttMessage as MqttMessagev3 +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence as MemoryPersistencev3 +import org.eclipse.paho.mqttv5.client.IMqttToken as IMqttTokenv5 +import org.eclipse.paho.mqttv5.client.MqttAsyncClient as MqttAsyncClientv5 +import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence as MemoryPersistencev5 +import org.eclipse.paho.mqttv5.common.MqttMessage as MqttMessagev5 + +@Service +class MQTTVersionService { + + internal suspend fun callMqttV3(data: MQTTNotificationData) { + val persistence = MemoryPersistencev3() + val sampleClient = MqttClientv3(data.brokerUrl, data.clientId, persistence) + val connOpts = MqttConnectOptionsv3() + connOpts.isCleanSession = true + connOpts.userName = data.username + connOpts.password = data.password?.toCharArray() ?: "".toCharArray() + sampleClient.connect(connOpts) + val message = MqttMessagev3( + serializeObject(data.mqttMessage).toByteArray() + ) + message.qos = data.qos + sampleClient.publish(data.topic, message) + sampleClient.disconnect() + } + + internal suspend fun callMqttV5(data: MQTTNotificationData) { + val persistence = MemoryPersistencev5() + val sampleClient = MqttAsyncClientv5(data.brokerUrl, data.clientId, persistence) + val connOpts = MqttConnectionOptions() + connOpts.isCleanStart = true + connOpts.userName = data.username + connOpts.password = data.password?.toByteArray() + var token: IMqttTokenv5 = sampleClient.connect(connOpts) + token.waitForCompletion() + val message = MqttMessagev5(serializeObject(data.mqttMessage).toByteArray()) + message.qos = data.qos + token = sampleClient.publish(data.topic, message) + token.waitForCompletion() + sampleClient.disconnect() + sampleClient.close() + } +} diff --git a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/mqtt/Mqtt.kt b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/mqtt/Mqtt.kt new file mode 100644 index 0000000000..610385daea --- /dev/null +++ b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/mqtt/Mqtt.kt @@ -0,0 +1,26 @@ +package com.egm.stellio.subscription.service.mqtt + +object Mqtt { + + object Version { + const val KEY = "MQTT-Version" + const val V3 = "mqtt3.1.1" + const val V5 = "mqtt5.0" + } + + object QualityOfService { + const val KEY = "MQTT-QoS" + const val AT_MOST_ONCE = 0 +// const val AT_LEAST_ONCE = 1 +// const val EXACTLY_ONCE = 2 + } + + object SCHEME { + const val MQTT = "mqtt" + const val MQTTS = "mqtts" + const val MQTT_DEFAULT_PORT = 1883 + const val MQTTS_DEFAULT_PORT = 8883 + val defaultPortMap = mapOf(MQTT to MQTT_DEFAULT_PORT, MQTTS to MQTTS_DEFAULT_PORT) + val brokerSchemeMap = mapOf(MQTT to "tcp", MQTTS to "ssl") + } +} diff --git a/subscription-service/src/main/resources/application.properties b/subscription-service/src/main/resources/application.properties index 39ddb7865b..730f430621 100644 --- a/subscription-service/src/main/resources/application.properties +++ b/subscription-service/src/main/resources/application.properties @@ -1,23 +1,18 @@ spring.config.import=classpath:/shared.properties - -spring.r2dbc.url = r2dbc:postgresql://localhost/stellio_subscription -spring.r2dbc.username = stellio -spring.r2dbc.password = stellio_password - +spring.r2dbc.url=r2dbc:postgresql://localhost/stellio_subscription +spring.r2dbc.username=stellio +spring.r2dbc.password=stellio_password # Required for Flyway to know where the DB is located -spring.flyway.url = jdbc:postgresql://localhost/stellio_subscription -spring.flyway.user = ${spring.r2dbc.username} -spring.flyway.password = ${spring.r2dbc.password} - +spring.flyway.url=jdbc:postgresql://localhost/stellio_subscription +spring.flyway.user=${spring.r2dbc.username} +spring.flyway.password=${spring.r2dbc.password} # Client registration used to get entities from search-service spring.security.oauth2.client.registration.keycloak.authorization-grant-type=client_credentials spring.security.oauth2.client.registration.keycloak.client-id=client-id spring.security.oauth2.client.registration.keycloak.client-secret=client-secret spring.security.oauth2.client.provider.keycloak.token-uri=https://my.sso/token - subscription.entity-service-url=http://localhost:8083 - # Stellio url used to form the link to get the contexts associated to a notification subscription.stellio-url=http://localhost:8080 - -server.port = 8084 +server.port=8084 +mqtt.clientId=stellio-mqtt-client diff --git a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/MqttNotificationServiceTest.kt b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/MqttNotificationServiceTest.kt new file mode 100644 index 0000000000..2968e93216 --- /dev/null +++ b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/MqttNotificationServiceTest.kt @@ -0,0 +1,138 @@ +package com.egm.stellio.subscription.service + +import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_SUBSCRIPTION_TERM +import com.egm.stellio.shared.util.toUri +import com.egm.stellio.subscription.model.* +import com.egm.stellio.subscription.service.mqtt.MQTTNotificationData +import com.egm.stellio.subscription.service.mqtt.MQTTNotificationService +import com.egm.stellio.subscription.service.mqtt.MQTTVersionService +import com.egm.stellio.subscription.service.mqtt.Mqtt +import com.ninjasquad.springmockk.MockkBean +import io.mockk.coEvery +import io.mockk.coVerify +import kotlinx.coroutines.test.runTest +import org.junit.jupiter.api.Test +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.test.context.ActiveProfiles + +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, classes = [MQTTNotificationService::class]) +@ActiveProfiles("test") +class MqttNotificationServiceTest { + + @Autowired + private lateinit var mqttNotificationService: MQTTNotificationService + + @MockkBean + private lateinit var mqttVersionService: MQTTVersionService + + private val mqttSubscription = Subscription( + type = NGSILD_SUBSCRIPTION_TERM, + subscriptionName = "My Subscription", + description = "My beautiful subscription", + entities = emptySet(), + notification = NotificationParams( + attributes = emptyList(), + endpoint = Endpoint( + uri = "mqtt://test@localhost:1883/notification".toUri(), + accept = Endpoint.AcceptType.JSONLD, + notifierInfo = listOf( + EndpointInfo(Mqtt.Version.KEY, Mqtt.Version.V3), + EndpointInfo(Mqtt.QualityOfService.KEY, Mqtt.QualityOfService.AT_MOST_ONCE.toString()) + ) + ) + ), + contexts = emptyList() + ) + + private val mqttSubscriptionV5 = Subscription( + type = NGSILD_SUBSCRIPTION_TERM, + entities = emptySet(), + notification = NotificationParams( + attributes = emptyList(), + endpoint = Endpoint( + uri = "mqtt://test@localhost:1883/notification".toUri(), + notifierInfo = listOf( + EndpointInfo(Mqtt.Version.KEY, Mqtt.Version.V5) + ) + ) + ), + contexts = emptyList() + ) + private val validMqttNotificationData = MQTTNotificationData( + brokerUrl = "tcp://localhost:1883", + clientId = "clientId", + username = "test", + topic = "/notification", + qos = 0, + mqttMessage = MQTTNotificationData.MqttMessage(getNotificationForSubscription(mqttSubscription), emptyMap()) + ) + + private fun getNotificationForSubscription(subscription: Subscription) = Notification( + subscriptionId = subscription.id, + data = listOf(mapOf("hello" to "world")) + ) + + @Test + fun `mqttNotifier should process endpoint uri to get connexion information`() = runTest { + val subscription = mqttSubscription + coEvery { mqttVersionService.callMqttV3(any()) } returns Unit + assert( + mqttNotificationService.mqttNotifier( + subscription, + getNotificationForSubscription(subscription), + mapOf() + ) + ) + + coVerify { + mqttVersionService.callMqttV3( + match { + it.username == validMqttNotificationData.username && + it.password == validMqttNotificationData.password && + it.topic == validMqttNotificationData.topic && + it.brokerUrl == validMqttNotificationData.brokerUrl + } + ) + } + } + + @Test + fun `mqttNotifier should use notifier info to choose the mqtt version`() = runTest { + val subscription = mqttSubscription + coEvery { mqttVersionService.callMqttV3(any()) } returns Unit + coEvery { mqttVersionService.callMqttV5(any()) } returns Unit + + mqttNotificationService.mqttNotifier( + subscription, + getNotificationForSubscription(subscription), + mapOf() + ) + coVerify(exactly = 1) { + mqttVersionService.callMqttV3( + any() + ) + } + coVerify(exactly = 0) { + mqttVersionService.callMqttV5( + any() + ) + } + + mqttNotificationService.mqttNotifier( + mqttSubscriptionV5, + getNotificationForSubscription(subscription), + mapOf() + ) + coVerify(exactly = 1) { + mqttVersionService.callMqttV5( + any() + ) + } + coVerify(exactly = 1) { + mqttVersionService.callMqttV3( + any() + ) + } + } +} diff --git a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/MqttVersionServiceTest.kt b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/MqttVersionServiceTest.kt new file mode 100644 index 0000000000..d3bbea23ce --- /dev/null +++ b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/MqttVersionServiceTest.kt @@ -0,0 +1,72 @@ +package com.egm.stellio.subscription.service + +import com.egm.stellio.subscription.model.Notification +import com.egm.stellio.subscription.service.mqtt.MQTTNotificationData +import com.egm.stellio.subscription.service.mqtt.MQTTVersionService +import com.egm.stellio.subscription.service.mqtt.Mqtt +import com.egm.stellio.subscription.support.WithMosquittoContainer +import kotlinx.coroutines.test.runTest +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertDoesNotThrow +import org.junit.jupiter.api.assertThrows +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.test.context.ActiveProfiles +import java.net.URI +import org.eclipse.paho.client.mqttv3.MqttException as MqttExceptionV3 +import org.eclipse.paho.mqttv5.common.MqttException as MqttExceptionV5 + +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, classes = [MQTTVersionService::class]) +@ActiveProfiles("test") +class MqttVersionServiceTest : WithMosquittoContainer { + + @Autowired + private lateinit var mqttVersionService: MQTTVersionService + + private val mqttContainerPort = WithMosquittoContainer.mosquittoContainer.getMappedPort( + Mqtt.SCHEME.MQTT_DEFAULT_PORT + ) + + private val notification = Notification( + subscriptionId = URI("1"), + data = listOf(mapOf("hello" to "world")) + ) + + private val validMqttNotificationData = MQTTNotificationData( + brokerUrl = "tcp://localhost:$mqttContainerPort", + clientId = "clientId", + username = "test", + topic = "notification", + qos = 0, + mqttMessage = MQTTNotificationData.MqttMessage(notification, emptyMap()) + ) + + private val invalidUriMqttNotificationData = MQTTNotificationData( + brokerUrl = "tcp://badHost:1883", + clientId = "clientId", + username = "test", + topic = "notification", + qos = 0, + mqttMessage = MQTTNotificationData.MqttMessage(notification, emptyMap()) + ) + + @Test + fun `sending mqttV3 notification with good uri should succeed`() = runTest { + assertDoesNotThrow { mqttVersionService.callMqttV3(validMqttNotificationData) } + } + + @Test + fun `sending mqttV3 notification with bad uri should throw`() = runTest { + assertThrows { mqttVersionService.callMqttV3(invalidUriMqttNotificationData) } + } + + @Test + fun `sending mqttV5 notification with good uri should succeed`() = runTest { + assertDoesNotThrow { mqttVersionService.callMqttV5(validMqttNotificationData) } + } + + @Test + fun `sending mqttV5 notification with bad uri should throw`() = runTest { + assertThrows { mqttVersionService.callMqttV5(invalidUriMqttNotificationData) } + } +} diff --git a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/NotificationServiceTests.kt b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/NotificationServiceTests.kt index ef72765580..c98228fa5c 100644 --- a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/NotificationServiceTests.kt +++ b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/NotificationServiceTests.kt @@ -13,9 +13,11 @@ import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_PROPERTY_TERM import com.egm.stellio.shared.util.JsonUtils.deserializeAsMap import com.egm.stellio.shared.web.NGSILD_TENANT_HEADER import com.egm.stellio.subscription.model.Endpoint +import com.egm.stellio.subscription.model.EndpointInfo import com.egm.stellio.subscription.model.NotificationParams import com.egm.stellio.subscription.model.NotificationParams.FormatType import com.egm.stellio.subscription.model.NotificationTrigger.* +import com.egm.stellio.subscription.service.mqtt.MQTTNotificationService import com.egm.stellio.subscription.support.gimmeRawSubscription import com.github.tomakehurst.wiremock.client.WireMock.* import com.github.tomakehurst.wiremock.junit5.WireMockTest @@ -42,6 +44,9 @@ class NotificationServiceTests { @MockkBean private lateinit var subscriptionService: SubscriptionService + @MockkBean + private lateinit var mqttNotificationService: MQTTNotificationService + @Autowired private lateinit var notificationService: NotificationService @@ -558,4 +563,61 @@ class NotificationServiceTests { } } } + + @Test + fun `callSuscriber should ask mqttNotifier if the brokerUrl startWith mqtt`() = runTest { + val subscription = gimmeRawSubscription().copy( + notification = NotificationParams( + attributes = emptyList(), + endpoint = Endpoint( + uri = "mqtt://localhost:8089/notification".toUri(), + accept = Endpoint.AcceptType.JSON + ) + ) + ) + + coEvery { subscriptionService.getContextsLink(any()) } returns buildContextLinkHeader(NGSILD_TEST_CORE_CONTEXT) + coEvery { subscriptionService.updateSubscriptionNotification(any(), any(), any()) } returns 1 + + coEvery { mqttNotificationService.mqttNotifier(any(), any(), any()) } returns true + + notificationService.callSubscriber(subscription, rawEntity.deserializeAsMap()) + + coVerify(exactly = 1) { mqttNotificationService.mqttNotifier(any(), any(), any()) } + } + + @Test + fun `callSuscriber should generate headers for mqtt notification`() = runTest { + val infoKey = "hello" + val infoValue = "world" + val subscription = gimmeRawSubscription().copy( + notification = NotificationParams( + attributes = emptyList(), + endpoint = Endpoint( + uri = "mqtt://localhost:8089/notification".toUri(), + accept = Endpoint.AcceptType.JSON, + receiverInfo = listOf(EndpointInfo(key = infoKey, value = infoValue)), + ) + ) + ) + + coEvery { mqttNotificationService.mqttNotifier(any(), any(), any()) } returns true + + coEvery { subscriptionService.getContextsLink(any()) } returns buildContextLinkHeader(NGSILD_TEST_CORE_CONTEXT) + coEvery { subscriptionService.updateSubscriptionNotification(any(), any(), any()) } returns 1 + + notificationService.callSubscriber(subscription, rawEntity.deserializeAsMap()) + + coVerify { + mqttNotificationService.mqttNotifier( + any(), + any(), + match { + it["Content-Type"] == "application/json" && + it[infoKey] == infoValue && + it["Link"] != null + } + ) + } + } } diff --git a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt index 867c2726be..c77b3a6c9e 100644 --- a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt +++ b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/service/SubscriptionServiceTests.kt @@ -38,7 +38,7 @@ import org.springframework.test.context.ActiveProfiles import org.springframework.test.context.TestPropertySource import java.net.URI import java.time.ZonedDateTime -import java.util.UUID +import java.util.* import kotlin.time.Duration @SpringBootTest @@ -300,10 +300,10 @@ class SubscriptionServiceTests : WithTimescaleContainer, WithKafkaContainer { ) && it.watchedAttributes == listOf(INCOMING_PROPERTY) && it.notificationTrigger == listOf( - ENTITY_CREATED.notificationTrigger, - ATTRIBUTE_UPDATED.notificationTrigger, - ENTITY_DELETED.notificationTrigger - ) && + ENTITY_CREATED.notificationTrigger, + ATTRIBUTE_UPDATED.notificationTrigger, + ENTITY_DELETED.notificationTrigger + ) && it.timeInterval == null && it.q == "foodQuantity<150;foodName=='dietary fibres'" && ( @@ -318,10 +318,10 @@ class SubscriptionServiceTests : WithTimescaleContainer, WithKafkaContainer { it.notification.attributes == listOf(INCOMING_PROPERTY, OUTGOING_PROPERTY) && it.notification.format == FormatType.NORMALIZED && it.notification.endpoint == Endpoint( - URI("http://localhost:8084"), - Endpoint.AcceptType.JSON, - listOf(EndpointInfo("Authorization-token", "Authorization-token-value")) - ) && + URI("http://localhost:8084"), + Endpoint.AcceptType.JSON, + listOf(EndpointInfo("Authorization-token", "Authorization-token-value")) + ) && it.notification.sysAttrs && it.expiresAt == ZonedDateTime.parse("2100-01-01T00:00:00Z") && it.throttling == 60 && @@ -769,8 +769,8 @@ class SubscriptionServiceTests : WithTimescaleContainer, WithKafkaContainer { it.notification.endpoint.accept.name == "JSONLD" && it.notification.endpoint.uri.toString() == "http://localhost:8080" && it.notification.endpoint.receiverInfo == listOf( - EndpointInfo("Authorization-token", "Authorization-token-newValue") - ) + EndpointInfo("Authorization-token", "Authorization-token-newValue") + ) } } diff --git a/subscription-service/src/test/kotlin/com/egm/stellio/subscription/support/WithMosquittoContainer.kt b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/support/WithMosquittoContainer.kt new file mode 100644 index 0000000000..5db96c764a --- /dev/null +++ b/subscription-service/src/test/kotlin/com/egm/stellio/subscription/support/WithMosquittoContainer.kt @@ -0,0 +1,27 @@ +package com.egm.stellio.subscription.support + +import org.testcontainers.containers.GenericContainer +import org.testcontainers.utility.DockerImageName +import org.testcontainers.utility.MountableFile + +interface WithMosquittoContainer { + + companion object { + + private val mosquittoImage: DockerImageName = + DockerImageName.parse("eclipse-mosquitto:2.0.18") + + val mosquittoContainer = GenericContainer(mosquittoImage).apply { + withReuse(true) + withExposedPorts(1883) + withCopyFileToContainer( + MountableFile.forClasspathResource("/mosquitto/mosquitto.conf"), + "/mosquitto/config/mosquitto.conf" + ) + } + + init { + mosquittoContainer.start() + } + } +} diff --git a/subscription-service/src/test/resources/mosquitto/mosquitto.conf b/subscription-service/src/test/resources/mosquitto/mosquitto.conf new file mode 100644 index 0000000000..5b0f2e9f29 --- /dev/null +++ b/subscription-service/src/test/resources/mosquitto/mosquitto.conf @@ -0,0 +1,906 @@ +# Config file for mosquitto +# +# See mosquitto.conf(5) for more information. +# +# Default values are shown, uncomment to change. +# +# Use the # character to indicate a comment, but only if it is the +# very first character on the line. + +# ================================================================= +# General configuration +# ================================================================= + +# Use per listener security settings. +# +# It is recommended this option be set before any other options. +# +# If this option is set to true, then all authentication and access control +# options are controlled on a per listener basis. The following options are +# affected: +# +# acl_file +allow_anonymous true +listener 1883 +listener 9001 +# allow_zero_length_clientid +# auto_id_prefix +# password_file +# plugin +# plugin_opt_* +# psk_file +# +# Note that if set to true, then a durable client (i.e. with clean session set +# to false) that has disconnected will use the ACL settings defined for the +# listener that it was most recently connected to. +# +# The default behaviour is for this to be set to false, which maintains the +# setting behaviour from previous versions of mosquitto. +#per_listener_settings false + + +# This option controls whether a client is allowed to connect with a zero +# length client id or not. This option only affects clients using MQTT v3.1.1 +# and later. If set to false, clients connecting with a zero length client id +# are disconnected. If set to true, clients will be allocated a client id by +# the broker. This means it is only useful for clients with clean session set +# to true. +#allow_zero_length_clientid true + +# If allow_zero_length_clientid is true, this option allows you to set a prefix +# to automatically generated client ids to aid visibility in logs. +# Defaults to 'auto-' +#auto_id_prefix auto- + +# This option affects the scenario when a client subscribes to a topic that has +# retained messages. It is possible that the client that published the retained +# message to the topic had access at the time they published, but that access +# has been subsequently removed. If check_retain_source is set to true, the +# default, the source of a retained message will be checked for access rights +# before it is republished. When set to false, no check will be made and the +# retained message will always be published. This affects all listeners. +#check_retain_source true + +# QoS 1 and 2 messages will be allowed inflight per client until this limit +# is exceeded. Defaults to 0. (No maximum) +# See also max_inflight_messages +#max_inflight_bytes 0 + +# The maximum number of QoS 1 and 2 messages currently inflight per +# client. +# This includes messages that are partway through handshakes and +# those that are being retried. Defaults to 20. Set to 0 for no +# maximum. Setting to 1 will guarantee in-order delivery of QoS 1 +# and 2 messages. +#max_inflight_messages 20 + +# For MQTT v5 clients, it is possible to have the server send a "server +# keepalive" value that will override the keepalive value set by the client. +# This is intended to be used as a mechanism to say that the server will +# disconnect the client earlier than it anticipated, and that the client should +# use the new keepalive value. The max_keepalive option allows you to specify +# that clients may only connect with keepalive less than or equal to this +# value, otherwise they will be sent a server keepalive telling them to use +# max_keepalive. This only applies to MQTT v5 clients. The default, and maximum +# value allowable, is 65535. +# +# Set to 0 to allow clients to set keepalive = 0, which means no keepalive +# checks are made and the client will never be disconnected by the broker if no +# messages are received. You should be very sure this is the behaviour that you +# want. +# +# For MQTT v3.1.1 and v3.1 clients, there is no mechanism to tell the client +# what keepalive value they should use. If an MQTT v3.1.1 or v3.1 client +# specifies a keepalive time greater than max_keepalive they will be sent a +# CONNACK message with the "identifier rejected" reason code, and disconnected. +# +#max_keepalive 65535 + +# For MQTT v5 clients, it is possible to have the server send a "maximum packet +# size" value that will instruct the client it will not accept MQTT packets +# with size greater than max_packet_size bytes. This applies to the full MQTT +# packet, not just the payload. Setting this option to a positive value will +# set the maximum packet size to that number of bytes. If a client sends a +# packet which is larger than this value, it will be disconnected. This applies +# to all clients regardless of the protocol version they are using, but v3.1.1 +# and earlier clients will of course not have received the maximum packet size +# information. Defaults to no limit. Setting below 20 bytes is forbidden +# because it is likely to interfere with ordinary client operation, even with +# very small payloads. +#max_packet_size 0 + +# QoS 1 and 2 messages above those currently in-flight will be queued per +# client until this limit is exceeded. Defaults to 0. (No maximum) +# See also max_queued_messages. +# If both max_queued_messages and max_queued_bytes are specified, packets will +# be queued until the first limit is reached. +#max_queued_bytes 0 + +# Set the maximum QoS supported. Clients publishing at a QoS higher than +# specified here will be disconnected. +#max_qos 2 + +# The maximum number of QoS 1 and 2 messages to hold in a queue per client +# above those that are currently in-flight. Defaults to 1000. Set +# to 0 for no maximum (not recommended). +# See also queue_qos0_messages. +# See also max_queued_bytes. +#max_queued_messages 1000 +# +# This option sets the maximum number of heap memory bytes that the broker will +# allocate, and hence sets a hard limit on memory use by the broker. Memory +# requests that exceed this value will be denied. The effect will vary +# depending on what has been denied. If an incoming message is being processed, +# then the message will be dropped and the publishing client will be +# disconnected. If an outgoing message is being sent, then the individual +# message will be dropped and the receiving client will be disconnected. +# Defaults to no limit. +#memory_limit 0 + +# This option sets the maximum publish payload size that the broker will allow. +# Received messages that exceed this size will not be accepted by the broker. +# The default value is 0, which means that all valid MQTT messages are +# accepted. MQTT imposes a maximum payload size of 268435455 bytes. +#message_size_limit 0 + +# This option allows the session of persistent clients (those with clean +# session set to false) that are not currently connected to be removed if they +# do not reconnect within a certain time frame. This is a non-standard option +# in MQTT v3.1. MQTT v3.1.1 and v5.0 allow brokers to remove client sessions. +# +# Badly designed clients may set clean session to false whilst using a randomly +# generated client id. This leads to persistent clients that connect once and +# never reconnect. This option allows these clients to be removed. This option +# allows persistent clients (those with clean session set to false) to be +# removed if they do not reconnect within a certain time frame. +# +# The expiration period should be an integer followed by one of h d w m y for +# hour, day, week, month and year respectively. For example +# +# persistent_client_expiration 2m +# persistent_client_expiration 14d +# persistent_client_expiration 1y +# +# The default if not set is to never expire persistent clients. +#persistent_client_expiration + +# Write process id to a file. Default is a blank string which means +# a pid file shouldn't be written. +# This should be set to /var/run/mosquitto/mosquitto.pid if mosquitto is +# being run automatically on boot with an init script and +# start-stop-daemon or similar. +#pid_file + +# Set to true to queue messages with QoS 0 when a persistent client is +# disconnected. These messages are included in the limit imposed by +# max_queued_messages and max_queued_bytes +# Defaults to false. +# This is a non-standard option for the MQTT v3.1 spec but is allowed in +# v3.1.1. +#queue_qos0_messages false + +# Set to false to disable retained message support. If a client publishes a +# message with the retain bit set, it will be disconnected if this is set to +# false. +#retain_available true + +# Disable Nagle's algorithm on client sockets. This has the effect of reducing +# latency of individual messages at the potential cost of increasing the number +# of packets being sent. +#set_tcp_nodelay false + +# Time in seconds between updates of the $SYS tree. +# Set to 0 to disable the publishing of the $SYS tree. +#sys_interval 10 + +# The MQTT specification requires that the QoS of a message delivered to a +# subscriber is never upgraded to match the QoS of the subscription. Enabling +# this option changes this behaviour. If upgrade_outgoing_qos is set true, +# messages sent to a subscriber will always match the QoS of its subscription. +# This is a non-standard option explicitly disallowed by the spec. +#upgrade_outgoing_qos false + +# When run as root, drop privileges to this user and its primary +# group. +# Set to root to stay as root, but this is not recommended. +# If set to "mosquitto", or left unset, and the "mosquitto" user does not exist +# then it will drop privileges to the "nobody" user instead. +# If run as a non-root user, this setting has no effect. +# Note that on Windows this has no effect and so mosquitto should be started by +# the user you wish it to run as. +#user mosquitto + +# ================================================================= +# Listeners +# ================================================================= + +# Listen on a port/ip address combination. By using this variable +# multiple times, mosquitto can listen on more than one port. If +# this variable is used and neither bind_address nor port given, +# then the default listener will not be started. +# The port number to listen on must be given. Optionally, an ip +# address or host name may be supplied as a second argument. In +# this case, mosquitto will attempt to bind the listener to that +# address and so restrict access to the associated network and +# interface. By default, mosquitto will listen on all interfaces. +# Note that for a websockets listener it is not possible to bind to a host +# name. +# +# On systems that support Unix Domain Sockets, it is also possible +# to create a # Unix socket rather than opening a TCP socket. In +# this case, the port number should be set to 0 and a unix socket +# path must be provided, e.g. +# listener 0 /tmp/mosquitto.sock +# +# listener port-number [ip address/host name/unix socket path] +#listener + +# By default, a listener will attempt to listen on all supported IP protocol +# versions. If you do not have an IPv4 or IPv6 interface you may wish to +# disable support for either of those protocol versions. In particular, note +# that due to the limitations of the websockets library, it will only ever +# attempt to open IPv6 sockets if IPv6 support is compiled in, and so will fail +# if IPv6 is not available. +# +# Set to `ipv4` to force the listener to only use IPv4, or set to `ipv6` to +# force the listener to only use IPv6. If you want support for both IPv4 and +# IPv6, then do not use the socket_domain option. +# +#socket_domain + +# Bind the listener to a specific interface. This is similar to +# the [ip address/host name] part of the listener definition, but is useful +# when an interface has multiple addresses or the address may change. If used +# with the [ip address/host name] part of the listener definition, then the +# bind_interface option will take priority. +# Not available on Windows. +# +# Example: bind_interface eth0 +#bind_interface + +# When a listener is using the websockets protocol, it is possible to serve +# http data as well. Set http_dir to a directory which contains the files you +# wish to serve. If this option is not specified, then no normal http +# connections will be possible. +#http_dir + +# The maximum number of client connections to allow. This is +# a per listener setting. +# Default is -1, which means unlimited connections. +# Note that other process limits mean that unlimited connections +# are not really possible. Typically the default maximum number of +# connections possible is around 1024. +#max_connections -1 + +# The listener can be restricted to operating within a topic hierarchy using +# the mount_point option. This is achieved be prefixing the mount_point string +# to all topics for any clients connected to this listener. This prefixing only +# happens internally to the broker; the client will not see the prefix. +#mount_point + +# Choose the protocol to use when listening. +# This can be either mqtt or websockets. +# Certificate based TLS may be used with websockets, except that only the +# cafile, certfile, keyfile, ciphers, and ciphers_tls13 options are supported. +#protocol mqtt + +# Set use_username_as_clientid to true to replace the clientid that a client +# connected with with its username. This allows authentication to be tied to +# the clientid, which means that it is possible to prevent one client +# disconnecting another by using the same clientid. +# If a client connects with no username it will be disconnected as not +# authorised when this option is set to true. +# Do not use in conjunction with clientid_prefixes. +# See also use_identity_as_username. +# This does not apply globally, but on a per-listener basis. +#use_username_as_clientid + +# Change the websockets headers size. This is a global option, it is not +# possible to set per listener. This option sets the size of the buffer used in +# the libwebsockets library when reading HTTP headers. If you are passing large +# header data such as cookies then you may need to increase this value. If left +# unset, or set to 0, then the default of 1024 bytes will be used. +#websockets_headers_size + +# ----------------------------------------------------------------- +# Certificate based SSL/TLS support +# ----------------------------------------------------------------- +# The following options can be used to enable certificate based SSL/TLS support +# for this listener. Note that the recommended port for MQTT over TLS is 8883, +# but this must be set manually. +# +# See also the mosquitto-tls man page and the "Pre-shared-key based SSL/TLS +# support" section. Only one of certificate or PSK encryption support can be +# enabled for any listener. + +# Both of certfile and keyfile must be defined to enable certificate based +# TLS encryption. + +# Path to the PEM encoded server certificate. +#certfile + +# Path to the PEM encoded keyfile. +#keyfile + +# If you wish to control which encryption ciphers are used, use the ciphers +# option. The list of available ciphers can be optained using the "openssl +# ciphers" command and should be provided in the same format as the output of +# that command. This applies to TLS 1.2 and earlier versions only. Use +# ciphers_tls1.3 for TLS v1.3. +#ciphers + +# Choose which TLS v1.3 ciphersuites are used for this listener. +# Defaults to "TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_SHA256:TLS_AES_128_GCM_SHA256" +#ciphers_tls1.3 + +# If you have require_certificate set to true, you can create a certificate +# revocation list file to revoke access to particular client certificates. If +# you have done this, use crlfile to point to the PEM encoded revocation file. +#crlfile + +# To allow the use of ephemeral DH key exchange, which provides forward +# security, the listener must load DH parameters. This can be specified with +# the dhparamfile option. The dhparamfile can be generated with the command +# e.g. "openssl dhparam -out dhparam.pem 2048" +#dhparamfile + +# By default an TLS enabled listener will operate in a similar fashion to a +# https enabled web server, in that the server has a certificate signed by a CA +# and the client will verify that it is a trusted certificate. The overall aim +# is encryption of the network traffic. By setting require_certificate to true, +# the client must provide a valid certificate in order for the network +# connection to proceed. This allows access to the broker to be controlled +# outside of the mechanisms provided by MQTT. +#require_certificate false + +# cafile and capath define methods of accessing the PEM encoded +# Certificate Authority certificates that will be considered trusted when +# checking incoming client certificates. +# cafile defines the path to a file containing the CA certificates. +# capath defines a directory that will be searched for files +# containing the CA certificates. For capath to work correctly, the +# certificate files must have ".crt" as the file ending and you must run +# "openssl rehash " each time you add/remove a certificate. +#cafile +#capath + + +# If require_certificate is true, you may set use_identity_as_username to true +# to use the CN value from the client certificate as a username. If this is +# true, the password_file option will not be used for this listener. +#use_identity_as_username false + +# ----------------------------------------------------------------- +# Pre-shared-key based SSL/TLS support +# ----------------------------------------------------------------- +# The following options can be used to enable PSK based SSL/TLS support for +# this listener. Note that the recommended port for MQTT over TLS is 8883, but +# this must be set manually. +# +# See also the mosquitto-tls man page and the "Certificate based SSL/TLS +# support" section. Only one of certificate or PSK encryption support can be +# enabled for any listener. + +# The psk_hint option enables pre-shared-key support for this listener and also +# acts as an identifier for this listener. The hint is sent to clients and may +# be used locally to aid authentication. The hint is a free form string that +# doesn't have much meaning in itself, so feel free to be creative. +# If this option is provided, see psk_file to define the pre-shared keys to be +# used or create a security plugin to handle them. +#psk_hint + +# When using PSK, the encryption ciphers used will be chosen from the list of +# available PSK ciphers. If you want to control which ciphers are available, +# use the "ciphers" option. The list of available ciphers can be optained +# using the "openssl ciphers" command and should be provided in the same format +# as the output of that command. +#ciphers + +# Set use_identity_as_username to have the psk identity sent by the client used +# as its username. Authentication will be carried out using the PSK rather than +# the MQTT username/password and so password_file will not be used for this +# listener. +#use_identity_as_username false + + +# ================================================================= +# Persistence +# ================================================================= + +# If persistence is enabled, save the in-memory database to disk +# every autosave_interval seconds. If set to 0, the persistence +# database will only be written when mosquitto exits. See also +# autosave_on_changes. +# Note that writing of the persistence database can be forced by +# sending mosquitto a SIGUSR1 signal. +#autosave_interval 1800 + +# If true, mosquitto will count the number of subscription changes, retained +# messages received and queued messages and if the total exceeds +# autosave_interval then the in-memory database will be saved to disk. +# If false, mosquitto will save the in-memory database to disk by treating +# autosave_interval as a time in seconds. +#autosave_on_changes false + +# Save persistent message data to disk (true/false). +# This saves information about all messages, including +# subscriptions, currently in-flight messages and retained +# messages. +# retained_persistence is a synonym for this option. +#persistence false + +# The filename to use for the persistent database, not including +# the path. +#persistence_file mosquitto.db + +# Location for persistent database. +# Default is an empty string (current directory). +# Set to e.g. /var/lib/mosquitto if running as a proper service on Linux or +# similar. +#persistence_location + + +# ================================================================= +# Logging +# ================================================================= + +# Places to log to. Use multiple log_dest lines for multiple +# logging destinations. +# Possible destinations are: stdout stderr syslog topic file dlt +# +# stdout and stderr log to the console on the named output. +# +# syslog uses the userspace syslog facility which usually ends up +# in /var/log/messages or similar. +# +# topic logs to the broker topic '$SYS/broker/log/', +# where severity is one of D, E, W, N, I, M which are debug, error, +# warning, notice, information and message. Message type severity is used by +# the subscribe/unsubscribe log_types and publishes log messages to +# $SYS/broker/log/M/susbcribe or $SYS/broker/log/M/unsubscribe. +# +# The file destination requires an additional parameter which is the file to be +# logged to, e.g. "log_dest file /var/log/mosquitto.log". The file will be +# closed and reopened when the broker receives a HUP signal. Only a single file +# destination may be configured. +# +# The dlt destination is for the automotive `Diagnostic Log and Trace` tool. +# This requires that Mosquitto has been compiled with DLT support. +# +# Note that if the broker is running as a Windows service it will default to +# "log_dest none" and neither stdout nor stderr logging is available. +# Use "log_dest none" if you wish to disable logging. +#log_dest stderr + +# Types of messages to log. Use multiple log_type lines for logging +# multiple types of messages. +# Possible types are: debug, error, warning, notice, information, +# none, subscribe, unsubscribe, websockets, all. +# Note that debug type messages are for decoding the incoming/outgoing +# network packets. They are not logged in "topics". +#log_type error +#log_type warning +#log_type notice +#log_type information + + +# If set to true, client connection and disconnection messages will be included +# in the log. +#connection_messages true + +# If using syslog logging (not on Windows), messages will be logged to the +# "daemon" facility by default. Use the log_facility option to choose which of +# local0 to local7 to log to instead. The option value should be an integer +# value, e.g. "log_facility 5" to use local5. +#log_facility + +# If set to true, add a timestamp value to each log message. +#log_timestamp true + +# Set the format of the log timestamp. If left unset, this is the number of +# seconds since the Unix epoch. +# This is a free text string which will be passed to the strftime function. To +# get an ISO 8601 datetime, for example: +# log_timestamp_format %Y-%m-%dT%H:%M:%S +#log_timestamp_format + +# Change the websockets logging level. This is a global option, it is not +# possible to set per listener. This is an integer that is interpreted by +# libwebsockets as a bit mask for its lws_log_levels enum. See the +# libwebsockets documentation for more details. "log_type websockets" must also +# be enabled. +#websockets_log_level 0 + + +# ================================================================= +# Security +# ================================================================= + +# If set, only clients that have a matching prefix on their +# clientid will be allowed to connect to the broker. By default, +# all clients may connect. +# For example, setting "secure-" here would mean a client "secure- +# client" could connect but another with clientid "mqtt" couldn't. +#clientid_prefixes + +# Boolean value that determines whether clients that connect +# without providing a username are allowed to connect. If set to +# false then a password file should be created (see the +# password_file option) to control authenticated client access. +# +# Defaults to false, unless there are no listeners defined in the configuration +# file, in which case it is set to true, but connections are only allowed from +# the local machine. +#allow_anonymous false + +# ----------------------------------------------------------------- +# Default authentication and topic access control +# ----------------------------------------------------------------- + +# Control access to the broker using a password file. This file can be +# generated using the mosquitto_passwd utility. If TLS support is not compiled +# into mosquitto (it is recommended that TLS support should be included) then +# plain text passwords are used, in which case the file should be a text file +# with lines in the format: +# username:password +# The password (and colon) may be omitted if desired, although this +# offers very little in the way of security. +# +# See the TLS client require_certificate and use_identity_as_username options +# for alternative authentication options. If a plugin is used as well as +# password_file, the plugin check will be made first. +#password_file + +# Access may also be controlled using a pre-shared-key file. This requires +# TLS-PSK support and a listener configured to use it. The file should be text +# lines in the format: +# identity:key +# The key should be in hexadecimal format without a leading "0x". +# If an plugin is used as well, the plugin check will be made first. +#psk_file + +# Control access to topics on the broker using an access control list +# file. If this parameter is defined then only the topics listed will +# have access. +# If the first character of a line of the ACL file is a # it is treated as a +# comment. +# Topic access is added with lines of the format: +# +# topic [read|write|readwrite|deny] +# +# The access type is controlled using "read", "write", "readwrite" or "deny". +# This parameter is optional (unless contains a space character) - if +# not given then the access is read/write. can contain the + or # +# wildcards as in subscriptions. +# +# The "deny" option can used to explicity deny access to a topic that would +# otherwise be granted by a broader read/write/readwrite statement. Any "deny" +# topics are handled before topics that grant read/write access. +# +# The first set of topics are applied to anonymous clients, assuming +# allow_anonymous is true. User specific topic ACLs are added after a +# user line as follows: +# +# user +# +# The username referred to here is the same as in password_file. It is +# not the clientid. +# +# +# If is also possible to define ACLs based on pattern substitution within the +# topic. The patterns available for substition are: +# +# %c to match the client id of the client +# %u to match the username of the client +# +# The substitution pattern must be the only text for that level of hierarchy. +# +# The form is the same as for the topic keyword, but using pattern as the +# keyword. +# Pattern ACLs apply to all users even if the "user" keyword has previously +# been given. +# +# If using bridges with usernames and ACLs, connection messages can be allowed +# with the following pattern: +# pattern write $SYS/broker/connection/%c/state +# +# pattern [read|write|readwrite] +# +# Example: +# +# pattern write sensor/%u/data +# +# If an plugin is used as well as acl_file, the plugin check will be +# made first. +#acl_file + +# ----------------------------------------------------------------- +# External authentication and topic access plugin options +# ----------------------------------------------------------------- + +# External authentication and access control can be supported with the +# plugin option. This is a path to a loadable plugin. See also the +# plugin_opt_* options described below. +# +# The plugin option can be specified multiple times to load multiple +# plugins. The plugins will be processed in the order that they are specified +# here. If the plugin option is specified alongside either of +# password_file or acl_file then the plugin checks will be made first. +# +# If the per_listener_settings option is false, the plugin will be apply to all +# listeners. If per_listener_settings is true, then the plugin will apply to +# the current listener being defined only. +# +# This option is also available as `auth_plugin`, but this use is deprecated +# and will be removed in the future. +# +#plugin + +# If the plugin option above is used, define options to pass to the +# plugin here as described by the plugin instructions. All options named +# using the format plugin_opt_* will be passed to the plugin, for example: +# +# This option is also available as `auth_opt_*`, but this use is deprecated +# and will be removed in the future. +# +# plugin_opt_db_host +# plugin_opt_db_port +# plugin_opt_db_username +# plugin_opt_db_password + + +# ================================================================= +# Bridges +# ================================================================= + +# A bridge is a way of connecting multiple MQTT brokers together. +# Create a new bridge using the "connection" option as described below. Set +# options for the bridges using the remaining parameters. You must specify the +# address and at least one topic to subscribe to. +# +# Each connection must have a unique name. +# +# The address line may have multiple host address and ports specified. See +# below in the round_robin description for more details on bridge behaviour if +# multiple addresses are used. Note that if you use an IPv6 address, then you +# are required to specify a port. +# +# The direction that the topic will be shared can be chosen by +# specifying out, in or both, where the default value is out. +# The QoS level of the bridged communication can be specified with the next +# topic option. The default QoS level is 0, to change the QoS the topic +# direction must also be given. +# +# The local and remote prefix options allow a topic to be remapped when it is +# bridged to/from the remote broker. This provides the ability to place a topic +# tree in an appropriate location. +# +# For more details see the mosquitto.conf man page. +# +# Multiple topics can be specified per connection, but be careful +# not to create any loops. +# +# If you are using bridges with cleansession set to false (the default), then +# you may get unexpected behaviour from incoming topics if you change what +# topics you are subscribing to. This is because the remote broker keeps the +# subscription for the old topic. If you have this problem, connect your bridge +# with cleansession set to true, then reconnect with cleansession set to false +# as normal. +#connection +#address [:] [[:]] +#topic [[[out | in | both] qos-level] local-prefix remote-prefix] + +# If you need to have the bridge connect over a particular network interface, +# use bridge_bind_address to tell the bridge which local IP address the socket +# should bind to, e.g. `bridge_bind_address 192.168.1.10` +#bridge_bind_address + +# If a bridge has topics that have "out" direction, the default behaviour is to +# send an unsubscribe request to the remote broker on that topic. This means +# that changing a topic direction from "in" to "out" will not keep receiving +# incoming messages. Sending these unsubscribe requests is not always +# desirable, setting bridge_attempt_unsubscribe to false will disable sending +# the unsubscribe request. +#bridge_attempt_unsubscribe true + +# Set the version of the MQTT protocol to use with for this bridge. Can be one +# of mqttv50, mqttv311 or mqttv31. Defaults to mqttv311. +#bridge_protocol_version mqttv311 + +# Set the clean session variable for this bridge. +# When set to true, when the bridge disconnects for any reason, all +# messages and subscriptions will be cleaned up on the remote +# broker. Note that with cleansession set to true, there may be a +# significant amount of retained messages sent when the bridge +# reconnects after losing its connection. +# When set to false, the subscriptions and messages are kept on the +# remote broker, and delivered when the bridge reconnects. +#cleansession false + +# Set the amount of time a bridge using the lazy start type must be idle before +# it will be stopped. Defaults to 60 seconds. +#idle_timeout 60 + +# Set the keepalive interval for this bridge connection, in +# seconds. +#keepalive_interval 60 + +# Set the clientid to use on the local broker. If not defined, this defaults to +# 'local.'. If you are bridging a broker to itself, it is important +# that local_clientid and clientid do not match. +#local_clientid + +# If set to true, publish notification messages to the local and remote brokers +# giving information about the state of the bridge connection. Retained +# messages are published to the topic $SYS/broker/connection//state +# unless the notification_topic option is used. +# If the message is 1 then the connection is active, or 0 if the connection has +# failed. +# This uses the last will and testament feature. +#notifications true + +# Choose the topic on which notification messages for this bridge are +# published. If not set, messages are published on the topic +# $SYS/broker/connection//state +#notification_topic + +# Set the client id to use on the remote end of this bridge connection. If not +# defined, this defaults to 'name.hostname' where name is the connection name +# and hostname is the hostname of this computer. +# This replaces the old "clientid" option to avoid confusion. "clientid" +# remains valid for the time being. +#remote_clientid + +# Set the password to use when connecting to a broker that requires +# authentication. This option is only used if remote_username is also set. +# This replaces the old "password" option to avoid confusion. "password" +# remains valid for the time being. +#remote_password + +# Set the username to use when connecting to a broker that requires +# authentication. +# This replaces the old "username" option to avoid confusion. "username" +# remains valid for the time being. +#remote_username + +# Set the amount of time a bridge using the automatic start type will wait +# until attempting to reconnect. +# This option can be configured to use a constant delay time in seconds, or to +# use a backoff mechanism based on "Decorrelated Jitter", which adds a degree +# of randomness to when the restart occurs. +# +# Set a constant timeout of 20 seconds: +# restart_timeout 20 +# +# Set backoff with a base (start value) of 10 seconds and a cap (upper limit) of +# 60 seconds: +# restart_timeout 10 30 +# +# Defaults to jitter with a base of 5 and cap of 30 +#restart_timeout 5 30 + +# If the bridge has more than one address given in the address/addresses +# configuration, the round_robin option defines the behaviour of the bridge on +# a failure of the bridge connection. If round_robin is false, the default +# value, then the first address is treated as the main bridge connection. If +# the connection fails, the other secondary addresses will be attempted in +# turn. Whilst connected to a secondary bridge, the bridge will periodically +# attempt to reconnect to the main bridge until successful. +# If round_robin is true, then all addresses are treated as equals. If a +# connection fails, the next address will be tried and if successful will +# remain connected until it fails +#round_robin false + +# Set the start type of the bridge. This controls how the bridge starts and +# can be one of three types: automatic, lazy and once. Note that RSMB provides +# a fourth start type "manual" which isn't currently supported by mosquitto. +# +# "automatic" is the default start type and means that the bridge connection +# will be started automatically when the broker starts and also restarted +# after a short delay (30 seconds) if the connection fails. +# +# Bridges using the "lazy" start type will be started automatically when the +# number of queued messages exceeds the number set with the "threshold" +# parameter. It will be stopped automatically after the time set by the +# "idle_timeout" parameter. Use this start type if you wish the connection to +# only be active when it is needed. +# +# A bridge using the "once" start type will be started automatically when the +# broker starts but will not be restarted if the connection fails. +#start_type automatic + +# Set the number of messages that need to be queued for a bridge with lazy +# start type to be restarted. Defaults to 10 messages. +# Must be less than max_queued_messages. +#threshold 10 + +# If try_private is set to true, the bridge will attempt to indicate to the +# remote broker that it is a bridge not an ordinary client. If successful, this +# means that loop detection will be more effective and that retained messages +# will be propagated correctly. Not all brokers support this feature so it may +# be necessary to set try_private to false if your bridge does not connect +# properly. +#try_private true + +# Some MQTT brokers do not allow retained messages. MQTT v5 gives a mechanism +# for brokers to tell clients that they do not support retained messages, but +# this is not possible for MQTT v3.1.1 or v3.1. If you need to bridge to a +# v3.1.1 or v3.1 broker that does not support retained messages, set the +# bridge_outgoing_retain option to false. This will remove the retain bit on +# all outgoing messages to that bridge, regardless of any other setting. +#bridge_outgoing_retain true + +# If you wish to restrict the size of messages sent to a remote bridge, use the +# bridge_max_packet_size option. This sets the maximum number of bytes for +# the total message, including headers and payload. +# Note that MQTT v5 brokers may provide their own maximum-packet-size property. +# In this case, the smaller of the two limits will be used. +# Set to 0 for "unlimited". +#bridge_max_packet_size 0 + + +# ----------------------------------------------------------------- +# Certificate based SSL/TLS support +# ----------------------------------------------------------------- +# Either bridge_cafile or bridge_capath must be defined to enable TLS support +# for this bridge. +# bridge_cafile defines the path to a file containing the +# Certificate Authority certificates that have signed the remote broker +# certificate. +# bridge_capath defines a directory that will be searched for files containing +# the CA certificates. For bridge_capath to work correctly, the certificate +# files must have ".crt" as the file ending and you must run "openssl rehash +# " each time you add/remove a certificate. +#bridge_cafile +#bridge_capath + + +# If the remote broker has more than one protocol available on its port, e.g. +# MQTT and WebSockets, then use bridge_alpn to configure which protocol is +# requested. Note that WebSockets support for bridges is not yet available. +#bridge_alpn + +# When using certificate based encryption, bridge_insecure disables +# verification of the server hostname in the server certificate. This can be +# useful when testing initial server configurations, but makes it possible for +# a malicious third party to impersonate your server through DNS spoofing, for +# example. Use this option in testing only. If you need to resort to using this +# option in a production environment, your setup is at fault and there is no +# point using encryption. +#bridge_insecure false + +# Path to the PEM encoded client certificate, if required by the remote broker. +#bridge_certfile + +# Path to the PEM encoded client private key, if required by the remote broker. +#bridge_keyfile + +# ----------------------------------------------------------------- +# PSK based SSL/TLS support +# ----------------------------------------------------------------- +# Pre-shared-key encryption provides an alternative to certificate based +# encryption. A bridge can be configured to use PSK with the bridge_identity +# and bridge_psk options. These are the client PSK identity, and pre-shared-key +# in hexadecimal format with no "0x". Only one of certificate and PSK based +# encryption can be used on one +# bridge at once. +#bridge_identity +#bridge_psk + + +# ================================================================= +# External config files +# ================================================================= + +# External configuration files may be included by using the +# include_dir option. This defines a directory that will be searched +# for config files. All files that end in '.conf' will be loaded as +# a configuration file. It is best to have this as the last option +# in the main file. This option will only be processed from the main +# configuration file. The directory specified must not contain the +# main configuration file. +# Files within include_dir will be loaded sorted in case-sensitive +# alphabetical order, with capital letters ordered first. If this option is +# given multiple times, all of the files from the first instance will be +# processed before the next instance. See the man page for examples. +#include_dir \ No newline at end of file