Skip to content

Commit

Permalink
fix most comment
Browse files Browse the repository at this point in the history
  • Loading branch information
thomasBousselin committed Jun 3, 2024
1 parent d11fcd3 commit 49e288d
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 1,070 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import com.egm.stellio.subscription.model.Notification
import com.egm.stellio.subscription.model.NotificationParams
import com.egm.stellio.subscription.model.NotificationTrigger
import com.egm.stellio.subscription.model.Subscription
import com.egm.stellio.subscription.service.mqtt.MQTTNotificationService
import com.egm.stellio.subscription.service.mqtt.Mqtt
import com.egm.stellio.subscription.service.mqtt.MqttNotificationService
import org.slf4j.LoggerFactory
import org.springframework.http.HttpHeaders
import org.springframework.http.HttpStatus
Expand All @@ -26,7 +26,7 @@ import org.springframework.web.reactive.function.client.awaitExchange
@Service
class NotificationService(
private val subscriptionService: SubscriptionService,
private val mqttNotificationService: MQTTNotificationService
private val mqttNotificationService: MqttNotificationService
) {

private val logger = LoggerFactory.getLogger(javaClass)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ object Mqtt {
object QualityOfService {
const val KEY = "MQTT-QoS"
const val AT_MOST_ONCE = 0
// const val AT_LEAST_ONCE = 1
// const val EXACTLY_ONCE = 2
}

object SCHEME {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.egm.stellio.subscription.service.mqtt

import com.egm.stellio.subscription.model.Notification

data class MQTTNotificationData(
data class MqttNotificationData(
val topic: String,
val mqttMessage: MqttMessage,
val qos: Int,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
package com.egm.stellio.subscription.service.mqtt

import com.egm.stellio.shared.model.BadSchemeException
import com.egm.stellio.shared.util.JsonUtils.serializeObject
import com.egm.stellio.subscription.model.Notification
import com.egm.stellio.subscription.model.Subscription
import org.eclipse.paho.client.mqttv3.MqttClient
import org.eclipse.paho.client.mqttv3.MqttConnectOptions
import org.eclipse.paho.client.mqttv3.MqttMessage
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import org.eclipse.paho.mqttv5.client.IMqttToken
import org.eclipse.paho.mqttv5.client.MqttAsyncClient
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Value
import org.springframework.stereotype.Service
import org.eclipse.paho.client.mqttv3.MqttException as MqttExceptionV3
import org.eclipse.paho.mqttv5.common.MqttException as MqttExceptionV5

@Service
class MQTTNotificationService(
class MqttNotificationService(
@Value("\${mqtt.clientId}")
private val clientId: String = "stellio-context-brokerUrl",
private val mqttVersionService: MQTTVersionService
) {

private val logger = LoggerFactory.getLogger(javaClass)
Expand All @@ -38,22 +45,22 @@ class MQTTNotificationService(
val qos =
notifierInfo[Mqtt.QualityOfService.KEY]?.let { Integer.parseInt(it) } ?: Mqtt.QualityOfService.AT_MOST_ONCE

val data = MQTTNotificationData(
val data = MqttNotificationData(
topic = uri.path,
brokerUrl = brokerUrl,
clientId = clientId,
qos = qos,
mqttMessage = MQTTNotificationData.MqttMessage(notification, headers),
mqttMessage = MqttNotificationData.MqttMessage(notification, headers),
username = username,
password = password
)

try {
val mqttVersion = notifierInfo[Mqtt.Version.KEY]
when (mqttVersion) {
Mqtt.Version.V3 -> mqttVersionService.callMqttV3(data)
Mqtt.Version.V5 -> mqttVersionService.callMqttV5(data)
else -> mqttVersionService.callMqttV5(data)
Mqtt.Version.V3 -> callMqttV3(data)
Mqtt.Version.V5 -> callMqttV5(data)
else -> callMqttV5(data)
}
logger.info("successfull mqtt notification for uri : ${data.brokerUrl} version: $mqttVersion")
return true
Expand All @@ -65,4 +72,37 @@ class MQTTNotificationService(
return false
}
}

internal suspend fun callMqttV3(data: MqttNotificationData) {
val persistence = MemoryPersistence()
val mqttClient = MqttClient(data.brokerUrl, data.clientId, persistence)
val connOpts = MqttConnectOptions()
connOpts.isCleanSession = true
connOpts.userName = data.username
connOpts.password = data.password?.toCharArray() ?: "".toCharArray()
mqttClient.connect(connOpts)
val message = MqttMessage(
serializeObject(data.mqttMessage).toByteArray()
)
message.qos = data.qos
mqttClient.publish(data.topic, message)
mqttClient.disconnect()
}

internal suspend fun callMqttV5(data: MqttNotificationData) {
val persistence = org.eclipse.paho.mqttv5.client.persist.MemoryPersistence()
val mqttClient = MqttAsyncClient(data.brokerUrl, data.clientId, persistence)
val connOpts = MqttConnectionOptions()
connOpts.isCleanStart = true
connOpts.userName = data.username
connOpts.password = data.password?.toByteArray()
var token: IMqttToken = mqttClient.connect(connOpts)
token.waitForCompletion()
val message = org.eclipse.paho.mqttv5.common.MqttMessage(serializeObject(data.mqttMessage).toByteArray())
message.qos = data.qos
token = mqttClient.publish(data.topic, message)
token.waitForCompletion()
mqttClient.disconnect()
mqttClient.close()
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import com.egm.stellio.subscription.model.EndpointInfo
import com.egm.stellio.subscription.model.NotificationParams
import com.egm.stellio.subscription.model.NotificationParams.FormatType
import com.egm.stellio.subscription.model.NotificationTrigger.*
import com.egm.stellio.subscription.service.mqtt.MQTTNotificationService
import com.egm.stellio.subscription.service.mqtt.MqttNotificationService
import com.egm.stellio.subscription.support.gimmeRawSubscription
import com.github.tomakehurst.wiremock.client.WireMock.*
import com.github.tomakehurst.wiremock.junit5.WireMockTest
Expand Down Expand Up @@ -45,7 +45,7 @@ class NotificationServiceTests {
private lateinit var subscriptionService: SubscriptionService

@MockkBean
private lateinit var mqttNotificationService: MQTTNotificationService
private lateinit var mqttNotificationService: MqttNotificationService

@Autowired
private lateinit var notificationService: NotificationService
Expand Down Expand Up @@ -565,7 +565,7 @@ class NotificationServiceTests {
}

@Test
fun `callSuscriber should ask mqttNotifier if the brokerUrl startWith mqtt`() = runTest {
fun `callSuscriber should ask mqttNotifier if the endpoint uri startWith mqtt`() = runTest {
val subscription = gimmeRawSubscription().copy(
notification = NotificationParams(
attributes = emptyList(),
Expand Down
Loading

0 comments on commit 49e288d

Please sign in to comment.