Skip to content

Commit

Permalink
feat(mqtt notification):
Browse files Browse the repository at this point in the history
  • Loading branch information
thomasBousselin committed May 31, 2024
1 parent d193584 commit d790223
Show file tree
Hide file tree
Showing 14 changed files with 1,436 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ data class NotImplementedException(override val message: String) : APIException(
data class LdContextNotAvailableException(override val message: String) : APIException(message)
data class NonexistentTenantException(override val message: String) : APIException(message)
data class NotAcceptableException(override val message: String) : APIException(message)
data class BadSchemeException(override val message: String) : APIException(message)

fun Throwable.toAPIException(specificMessage: String? = null): APIException =
when (this) {
Expand All @@ -28,5 +29,6 @@ fun Throwable.toAPIException(specificMessage: String? = null): APIException =
if (this.code == JsonLdErrorCode.LOADING_REMOTE_CONTEXT_FAILED)
LdContextNotAvailableException(specificMessage ?: "Unable to load remote context (cause was: $this)")
else BadRequestDataException("Unexpected error while parsing payload (cause was: $this)")

else -> BadRequestDataException(specificMessage ?: this.localizedMessage)
}
2 changes: 2 additions & 0 deletions subscription-service/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ dependencies {
implementation("org.postgresql:r2dbc-postgresql")
implementation("com.jayway.jsonpath:json-path:2.9.0")
implementation(project(":shared"))
implementation("org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5")
implementation("org.eclipse.paho:org.eclipse.paho.mqttv5.client:1.2.5")

detektPlugins("io.gitlab.arturbosch.detekt:detekt-formatting:1.23.6")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import com.egm.stellio.subscription.model.Notification
import com.egm.stellio.subscription.model.NotificationParams
import com.egm.stellio.subscription.model.NotificationTrigger
import com.egm.stellio.subscription.model.Subscription
import com.egm.stellio.subscription.service.mqtt.MQTTNotificationService
import com.egm.stellio.subscription.service.mqtt.Mqtt
import org.slf4j.LoggerFactory
import org.springframework.http.HttpHeaders
import org.springframework.http.HttpStatus
Expand All @@ -23,7 +25,8 @@ import org.springframework.web.reactive.function.client.awaitExchange

@Service
class NotificationService(
private val subscriptionService: SubscriptionService
private val subscriptionService: SubscriptionService,
private val mqttNotificationService: MQTTNotificationService
) {

private val logger = LoggerFactory.getLogger(javaClass)
Expand Down Expand Up @@ -70,32 +73,50 @@ class NotificationService(
)
val uri = subscription.notification.endpoint.uri.toString()
logger.info("Notification is about to be sent to $uri for subscription ${subscription.id}")
val request =
WebClient.create(uri).post().contentType(mediaType).headers {
if (mediaType == MediaType.APPLICATION_JSON) {
it.set(HttpHeaders.LINK, subscriptionService.getContextsLink(subscription))
}
if (tenantName != DEFAULT_TENANT_NAME)
it.set(NGSILD_TENANT_HEADER, tenantName)
subscription.notification.endpoint.receiverInfo?.forEach { endpointInfo ->
it.set(endpointInfo.key, endpointInfo.value)

val headerMap: MutableMap<String, String> = emptyMap<String, String>().toMutableMap()
if (mediaType == MediaType.APPLICATION_JSON) {
headerMap[HttpHeaders.LINK] = subscriptionService.getContextsLink(subscription)
}
if (tenantName != DEFAULT_TENANT_NAME)
headerMap[NGSILD_TENANT_HEADER] = tenantName
subscription.notification.endpoint.receiverInfo?.forEach { endpointInfo ->
headerMap[endpointInfo.key] = endpointInfo.value
}

val result =
kotlin.runCatching {
if (uri.startsWith(Mqtt.SCHEME.MQTT)) {
headerMap["Content-Type"] = mediaType.toString() // could be common with line 99 ?
Triple(
subscription,
notification,
mqttNotificationService.mqttNotifier(
notification = notification,
subscription = subscription,
headers = headerMap
)
)
} else {
val request =
WebClient.create(uri).post().contentType(mediaType).headers { it.setAll(headerMap) }
request
.bodyValue(serializeObject(notification))
.awaitExchange { response ->
val success = response.statusCode() == HttpStatus.OK
logger.info(
"The notification sent has been received with ${if (success) "success" else "failure"}"
)
if (!success) {
logger.error("Failed to send notification to $uri: ${response.statusCode()}")
}
Triple(subscription, notification, success)
}
}
}.getOrElse {
Triple(subscription, notification, false)
}

val result = kotlin.runCatching {
request
.bodyValue(serializeObject(notification))
.awaitExchange { response ->
val success = response.statusCode() == HttpStatus.OK
logger.info("The notification sent has been received with ${if (success) "success" else "failure"}")
if (!success) {
logger.error("Failed to send notification to $uri: ${response.statusCode()}")
}
Triple(subscription, notification, success)
}
}.getOrElse {
Triple(subscription, notification, false)
}
subscriptionService.updateSubscriptionNotification(result.first, result.second, result.third)
return result
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.egm.stellio.subscription.service.mqtt

import com.egm.stellio.subscription.model.Notification

data class MQTTNotificationData(
val topic: String,
val mqttMessage: MqttMessage,
val qos: Int,
val brokerUrl: String,
val clientId: String,
val username: String,
val password: String? = null,
) {

data class MqttMessage(
val body: Notification,
val metadata: Map<String, String> = emptyMap(),
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.egm.stellio.subscription.service.mqtt

import com.egm.stellio.shared.model.BadSchemeException
import com.egm.stellio.subscription.model.Notification
import com.egm.stellio.subscription.model.Subscription
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Value
import org.springframework.stereotype.Service
import org.eclipse.paho.client.mqttv3.MqttException as MqttExceptionV3
import org.eclipse.paho.mqttv5.common.MqttException as MqttExceptionV5

@Service
class MQTTNotificationService(
@Value("\${mqtt.clientId}")
private val clientId: String = "stellio-context-brokerUrl",
private val mqttVersionService: MQTTVersionService
) {

private val logger = LoggerFactory.getLogger(javaClass)

suspend fun mqttNotifier(
subscription: Subscription,
notification: Notification,
headers: Map<String, String>
): Boolean {
val endpoint = subscription.notification.endpoint
val uri = endpoint.uri
val userInfo = uri.userInfo.split(':')
val username = userInfo.getOrNull(0) ?: ""
val password = userInfo.getOrNull(1)
val brokerScheme = Mqtt.SCHEME.brokerSchemeMap[uri.scheme]
?: throw BadSchemeException("${uri.scheme} is not a valid mqtt scheme")

val brokerPort = if (uri.port != -1) uri.port else Mqtt.SCHEME.defaultPortMap[uri.scheme]

val brokerUrl = "$brokerScheme://${uri.host}:$brokerPort"
val notifierInfo = endpoint.notifierInfo?.map { it.key to it.value }?.toMap() ?: emptyMap()
val qos =
notifierInfo[Mqtt.QualityOfService.KEY]?.let { Integer.parseInt(it) } ?: Mqtt.QualityOfService.AT_MOST_ONCE

val data = MQTTNotificationData(
topic = uri.path,
brokerUrl = brokerUrl,
clientId = clientId,
qos = qos,
mqttMessage = MQTTNotificationData.MqttMessage(notification, headers),
username = username,
password = password
)

try {
val mqttVersion = notifierInfo[Mqtt.Version.KEY]
when (mqttVersion) {
Mqtt.Version.V3 -> mqttVersionService.callMqttV3(data)
Mqtt.Version.V5 -> mqttVersionService.callMqttV5(data)
else -> mqttVersionService.callMqttV5(data)
}
logger.info("successfull mqtt notification for uri : ${data.brokerUrl} version: $mqttVersion")
return true
} catch (e: MqttExceptionV3) {
logger.error("failed mqttv3 notification for uri : ${data.brokerUrl}", e)
return false
} catch (e: MqttExceptionV5) {
logger.error("failed mqttv5 notification for uri : ${data.brokerUrl}", e)
return false
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.egm.stellio.subscription.service.mqtt

import com.egm.stellio.shared.util.JsonUtils.serializeObject
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions
import org.springframework.stereotype.Service
import org.eclipse.paho.client.mqttv3.MqttClient as MqttClientv3
import org.eclipse.paho.client.mqttv3.MqttConnectOptions as MqttConnectOptionsv3
import org.eclipse.paho.client.mqttv3.MqttMessage as MqttMessagev3
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence as MemoryPersistencev3
import org.eclipse.paho.mqttv5.client.IMqttToken as IMqttTokenv5
import org.eclipse.paho.mqttv5.client.MqttAsyncClient as MqttAsyncClientv5
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence as MemoryPersistencev5
import org.eclipse.paho.mqttv5.common.MqttMessage as MqttMessagev5

@Service
class MQTTVersionService {

internal suspend fun callMqttV3(data: MQTTNotificationData) {
val persistence = MemoryPersistencev3()
val sampleClient = MqttClientv3(data.brokerUrl, data.clientId, persistence)
val connOpts = MqttConnectOptionsv3()
connOpts.isCleanSession = true
connOpts.userName = data.username
connOpts.password = data.password?.toCharArray() ?: "".toCharArray()
sampleClient.connect(connOpts)
val message = MqttMessagev3(
serializeObject(data.mqttMessage).toByteArray()
)
message.qos = data.qos
sampleClient.publish(data.topic, message)
sampleClient.disconnect()
}

internal suspend fun callMqttV5(data: MQTTNotificationData) {
val persistence = MemoryPersistencev5()
val sampleClient = MqttAsyncClientv5(data.brokerUrl, data.clientId, persistence)
val connOpts = MqttConnectionOptions()
connOpts.isCleanStart = true
connOpts.userName = data.username
connOpts.password = data.password?.toByteArray()
var token: IMqttTokenv5 = sampleClient.connect(connOpts)
token.waitForCompletion()
val message = MqttMessagev5(serializeObject(data.mqttMessage).toByteArray())
message.qos = data.qos
token = sampleClient.publish(data.topic, message)
token.waitForCompletion()
sampleClient.disconnect()
sampleClient.close()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.egm.stellio.subscription.service.mqtt

object Mqtt {

object Version {
const val KEY = "MQTT-Version"
const val V3 = "mqtt3.1.1"
const val V5 = "mqtt5.0"
}

object QualityOfService {
const val KEY = "MQTT-QoS"
const val AT_MOST_ONCE = 0
// const val AT_LEAST_ONCE = 1
// const val EXACTLY_ONCE = 2
}

object SCHEME {
const val MQTT = "mqtt"
const val MQTTS = "mqtts"
const val MQTT_DEFAULT_PORT = 1883
const val MQTTS_DEFAULT_PORT = 8883
val defaultPortMap = mapOf(MQTT to MQTT_DEFAULT_PORT, MQTTS to MQTTS_DEFAULT_PORT)
val brokerSchemeMap = mapOf(MQTT to "tcp", MQTTS to "ssl")
}
}
21 changes: 8 additions & 13 deletions subscription-service/src/main/resources/application.properties
Original file line number Diff line number Diff line change
@@ -1,23 +1,18 @@
spring.config.import=classpath:/shared.properties

spring.r2dbc.url = r2dbc:postgresql://localhost/stellio_subscription
spring.r2dbc.username = stellio
spring.r2dbc.password = stellio_password

spring.r2dbc.url=r2dbc:postgresql://localhost/stellio_subscription
spring.r2dbc.username=stellio
spring.r2dbc.password=stellio_password
# Required for Flyway to know where the DB is located
spring.flyway.url = jdbc:postgresql://localhost/stellio_subscription
spring.flyway.user = ${spring.r2dbc.username}
spring.flyway.password = ${spring.r2dbc.password}

spring.flyway.url=jdbc:postgresql://localhost/stellio_subscription
spring.flyway.user=${spring.r2dbc.username}
spring.flyway.password=${spring.r2dbc.password}
# Client registration used to get entities from search-service
spring.security.oauth2.client.registration.keycloak.authorization-grant-type=client_credentials
spring.security.oauth2.client.registration.keycloak.client-id=client-id
spring.security.oauth2.client.registration.keycloak.client-secret=client-secret
spring.security.oauth2.client.provider.keycloak.token-uri=https://my.sso/token

subscription.entity-service-url=http://localhost:8083

# Stellio url used to form the link to get the contexts associated to a notification
subscription.stellio-url=http://localhost:8080

server.port = 8084
server.port=8084
mqtt.clientId=stellio-mqtt-client
Loading

0 comments on commit d790223

Please sign in to comment.