Skip to content

Commit

Permalink
fix: mr comment
Browse files Browse the repository at this point in the history
  • Loading branch information
thomasBousselin committed Jun 21, 2024
1 parent be03c9f commit d909198
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ data class NotImplementedException(override val message: String) : APIException(
data class LdContextNotAvailableException(override val message: String) : APIException(message)
data class NonexistentTenantException(override val message: String) : APIException(message)
data class NotAcceptableException(override val message: String) : APIException(message)
data class BadSchemeException(override val message: String) : APIException(message)

fun Throwable.toAPIException(specificMessage: String? = null): APIException =
when (this) {
Expand All @@ -29,6 +28,5 @@ fun Throwable.toAPIException(specificMessage: String? = null): APIException =
if (this.code == JsonLdErrorCode.LOADING_REMOTE_CONTEXT_FAILED)
LdContextNotAvailableException(specificMessage ?: "Unable to load remote context (cause was: $this)")
else BadRequestDataException("Unexpected error while parsing payload (cause was: $this)")

else -> BadRequestDataException(specificMessage ?: this.localizedMessage)
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ class NotificationService(
subscription.notification.endpoint.receiverInfo?.forEach { endpointInfo ->
headerMap[endpointInfo.key] = endpointInfo.value
}
headerMap[HttpHeaders.CONTENT_TYPE] = mediaType.toString()

val result =
kotlin.runCatching {
if (uri.startsWith(Mqtt.SCHEME.MQTT)) {
headerMap[HttpHeaders.CONTENT_TYPE] = mediaType.toString() // could be common with line 99 ?
Triple(
subscription,
notification,
Expand All @@ -99,7 +99,7 @@ class NotificationService(
)
} else {
val request =
WebClient.create(uri).post().contentType(mediaType).headers { it.setAll(headerMap) }
WebClient.create(uri).post().headers { it.setAll(headerMap) }
request
.bodyValue(serializeObject(notification))
.awaitExchange { response ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,12 @@ class MqttNotificationService(
internal suspend fun connectMqttv3(data: MqttConnectionData): MqttClient {
val persistence = MemoryPersistence()
val mqttClient = MqttClient(data.brokerUrl, data.clientId, persistence)
val connOpts = MqttConnectOptions()
connOpts.isCleanSession = true
connOpts.userName = data.username
connOpts.password = data.password?.toCharArray() ?: "".toCharArray()
val connOpts = MqttConnectOptions().apply {
isCleanSession = true
userName = data.username
password = data.password?.toCharArray() ?: "".toCharArray()
}

mqttClient.connect(connOpts)
return mqttClient
}
Expand Down
16 changes: 9 additions & 7 deletions subscription-service/src/main/resources/application.properties
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
spring.config.import=classpath:/shared.properties
spring.r2dbc.url=r2dbc:postgresql://localhost/stellio_subscription
spring.r2dbc.username=stellio
spring.r2dbc.password=stellio_password
spring.r2dbc.url = r2dbc:postgresql://localhost/stellio_subscription
spring.r2dbc.username = stellio
spring.r2dbc.password = stellio_password
# Required for Flyway to know where the DB is located
spring.flyway.url=jdbc:postgresql://localhost/stellio_subscription
spring.flyway.user=${spring.r2dbc.username}
spring.flyway.password=${spring.r2dbc.password}
spring.flyway.url = jdbc:postgresql://localhost/stellio_subscription
spring.flyway.user = ${spring.r2dbc.username}
spring.flyway.password = ${spring.r2dbc.password}
# Client registration used to get entities from search-service
spring.security.oauth2.client.registration.keycloak.authorization-grant-type=client_credentials
spring.security.oauth2.client.registration.keycloak.client-id=client-id
spring.security.oauth2.client.registration.keycloak.client-secret=client-secret
spring.security.oauth2.client.provider.keycloak.token-uri=https://my.sso/token

subscription.entity-service-url=http://localhost:8083
# Stellio url used to form the link to get the contexts associated to a notification
subscription.stellio-url=http://localhost:8080
subscription.mqtt.clientId=stellio-mqtt-client
server.port=8084

server.port = 8084

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertDoesNotThrow
import org.junit.jupiter.api.assertThrows
import org.slf4j.LoggerFactory
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.test.context.ActiveProfiles
import java.net.URI
Expand All @@ -35,7 +36,7 @@ class MqttNotificationServiceTest : WithMosquittoContainer {
@SpykBean
private lateinit var mqttNotificationService: MqttNotificationService

private val mqttContainerPort = WithMosquittoContainer.getBasicPort()
private val mqttContainerPort = WithMosquittoContainer.getPort()

private val mqttSubscriptionV3 = Subscription(
type = NGSILD_SUBSCRIPTION_TERM,
Expand Down Expand Up @@ -82,11 +83,6 @@ class MqttNotificationServiceTest : WithMosquittoContainer {
message = MqttNotificationData.MqttMessage(getNotificationForSubscription(mqttSubscriptionV3), emptyMap())
)

private val notification = Notification(
subscriptionId = URI("1"),
data = listOf(mapOf("hello" to "world"))
)

private val invalidUriMqttNotificationData = MqttNotificationData(
connection = MqttConnectionData(
brokerUrl = "tcp://badHost:1883",
Expand All @@ -96,7 +92,10 @@ class MqttNotificationServiceTest : WithMosquittoContainer {
topic = "notification",
qos = 0,
message = MqttNotificationData.MqttMessage(
notification,
Notification(
subscriptionId = URI("1"),
data = listOf(mapOf("hello" to "world"))
),
emptyMap(),
)
)
Expand All @@ -108,7 +107,6 @@ class MqttNotificationServiceTest : WithMosquittoContainer {

@Test
fun `notify should process endpoint uri to get connection information`() = runTest {
val subscription = mqttSubscriptionV3
coEvery { mqttNotificationService.callMqttV3(any()) } returns Unit
assertTrue(
mqttNotificationService.notify(
Expand Down Expand Up @@ -221,45 +219,47 @@ class MqttNotificationServiceTest : WithMosquittoContainer {

private class MqttV3MessageReceiver : MqttCallbackV3 {
var lastReceivedMessage: String? = null
private val logger = LoggerFactory.getLogger(javaClass)

override fun messageArrived(topic: String, message: MqttMessageV3) {
lastReceivedMessage = message.payload?.decodeToString()
}

override fun deliveryComplete(p0: IMqttDeliveryToken?) {
println("delivery complete")
logger.info("delivery complete")
}

override fun connectionLost(p0: Throwable?) {
println("connection lost")
logger.info("connection lost")
}
}

private class MqttV5MessageReceiver : MqttCallbackV5 {
var lastReceivedMessage: String? = null
private val logger = LoggerFactory.getLogger(javaClass)

override fun messageArrived(topic: String?, message: MqttMessageV5?) {
lastReceivedMessage = message?.payload?.decodeToString()
}

override fun disconnected(p0: MqttDisconnectResponse?) {
println("connection lost")
logger.info("mqtt connection lost")
}

override fun mqttErrorOccurred(p0: org.eclipse.paho.mqttv5.common.MqttException?) {
println("mqtt error occured")
logger.info("mqtt error occured")
}

override fun deliveryComplete(p0: IMqttToken?) {
println("delivery complete")
logger.info("mqtt delivery complete")
}

override fun connectComplete(p0: Boolean, p1: String?) {
println("connection success")
logger.info("mqtt connection success")
}

override fun authPacketArrived(p0: Int, p1: MqttProperties?) {
println("auth")
logger.info("mqtt auth")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ interface WithMosquittoContainer {
private val mosquittoImage: DockerImageName =
DockerImageName.parse("eclipse-mosquitto:2.0.18")

val mosquittoContainer = GenericContainer<Nothing>(mosquittoImage).apply {
private val mosquittoContainer = GenericContainer<Nothing>(mosquittoImage).apply {
withReuse(true)
withExposedPorts(Mqtt.SCHEME.MQTT_DEFAULT_PORT)
withCopyFileToContainer(
Expand All @@ -21,7 +21,7 @@ interface WithMosquittoContainer {
)
}

fun getBasicPort() = mosquittoContainer.getMappedPort(
fun getPort(): Int = mosquittoContainer.getMappedPort(
Mqtt.SCHEME.MQTT_DEFAULT_PORT
)

Expand Down

0 comments on commit d909198

Please sign in to comment.