Skip to content

Commit

Permalink
feat: add support for jsonldContext member in subscriptions (#1230)
Browse files Browse the repository at this point in the history
* feat: add support for jsonldContext member in subscriptions

* added jsonldContext initialization and validation

* re-implementing jsonldContext without initialization

* exceptions handling + some fixes

* minor fixes

* adapting link header when sending notification to jsonldContext

* minor fix
  • Loading branch information
ranim-n authored Sep 9, 2024
1 parent 2d2e0ff commit cf5b4f7
Show file tree
Hide file tree
Showing 9 changed files with 208 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ fun Throwable.toAPIException(specificMessage: String? = null): APIException =
when (this) {
is APIException -> this
is JsonLdError ->
if (this.code == JsonLdErrorCode.LOADING_REMOTE_CONTEXT_FAILED)
if (this.code == JsonLdErrorCode.LOADING_REMOTE_CONTEXT_FAILED ||
this.code == JsonLdErrorCode.LOADING_DOCUMENT_FAILED
)
LdContextNotAvailableException(specificMessage ?: "Unable to load remote context (cause was: $this)")
else BadRequestDataException("Unexpected error while parsing payload (cause was: $this)")
else -> BadRequestDataException(specificMessage ?: this.localizedMessage)
Expand Down
16 changes: 16 additions & 0 deletions shared/src/main/kotlin/com/egm/stellio/shared/util/JsonLdUtils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ package com.egm.stellio.shared.util

import arrow.core.Either
import arrow.core.left
import arrow.core.raise.either
import arrow.core.right
import com.apicatalog.jsonld.JsonLd
import com.apicatalog.jsonld.JsonLdError
import com.apicatalog.jsonld.JsonLdOptions
import com.apicatalog.jsonld.context.cache.LruCache
import com.apicatalog.jsonld.document.JsonDocument
import com.apicatalog.jsonld.http.DefaultHttpClient
import com.apicatalog.jsonld.loader.DocumentLoaderOptions
import com.apicatalog.jsonld.loader.HttpLoader
import com.egm.stellio.shared.model.*
import com.egm.stellio.shared.util.JsonUtils.deserializeAs
import com.egm.stellio.shared.util.JsonUtils.deserializeAsList
Expand Down Expand Up @@ -121,6 +125,7 @@ object JsonLdUtils {
contextCache = LruCache(CONTEXT_CACHE_CAPACITY)
documentCache = LruCache(DOCUMENT_CACHE_CAPACITY)
}
private val loader = HttpLoader(DefaultHttpClient.defaultInstance())

private fun buildContextDocument(contexts: List<String>): JsonStructure {
val contextsArray = Json.createArrayBuilder()
Expand Down Expand Up @@ -244,6 +249,17 @@ object JsonLdUtils {
}
}

fun checkJsonldContext(context: URI): Either<APIException, Unit> = either {
return try {
loader.loadDocument(context, DocumentLoaderOptions())
Unit.right()
} catch (e: JsonLdError) {
e.toAPIException(e.cause?.cause?.message).left()
} catch (e: IllegalArgumentException) {
BadRequestDataException(e.cause?.message ?: "Provided context is invalid: $context").left()
}
}

private fun transformGeoPropertyToWKT(): (Map.Entry<String, Any>) -> Any = {
if (NGSILD_GEO_PROPERTIES_TERMS.contains(it.key)) {
when (it.value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ data class Subscription(
val throttling: Int? = null,
val lang: String? = null,
@JsonInclude(value = JsonInclude.Include.NON_EMPTY)
val datasetId: List<String>? = null
val datasetId: List<String>? = null,
@JsonInclude(value = JsonInclude.Include.NON_EMPTY)
val jsonldContext: URI? = null
) {

@Transient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,11 @@ class NotificationService(
AttributeRepresentation.SIMPLIFIED
else AttributeRepresentation.NORMALIZED

val contexts = it.jsonldContext?.let { listOf(it.toString()) } ?: it.contexts

val compactedEntity = compactEntity(
ExpandedEntity(filteredEntity),
it.contexts
contexts
).toFinalRepresentation(
NgsiLdDataRepresentation(
entityRepresentation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import com.egm.stellio.shared.util.*
import com.egm.stellio.shared.util.JsonLdUtils.JSONLD_TYPE_TERM
import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_LOCATION_PROPERTY
import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_SUBSCRIPTION_TERM
import com.egm.stellio.shared.util.JsonLdUtils.checkJsonldContext
import com.egm.stellio.shared.util.JsonLdUtils.expandJsonLdTerm
import com.egm.stellio.subscription.config.SubscriptionProperties
import com.egm.stellio.subscription.model.*
Expand Down Expand Up @@ -56,6 +57,7 @@ class SubscriptionService(
checkExpiresAtInTheFuture(subscription).bind()
checkIdPatternIsValid(subscription).bind()
checkNotificationTriggersAreValid(subscription).bind()
checkJsonLdContextIsValid(subscription).bind()
}

private fun checkTypeIsSubscription(subscription: Subscription): Either<APIException, Unit> =
Expand Down Expand Up @@ -134,6 +136,14 @@ class SubscriptionService(
else BadRequestDataException("Unknown notification trigger in ${subscription.notificationTrigger}").left()
}

suspend fun checkJsonLdContextIsValid(subscription: Subscription): Either<APIException, Unit> = either {
val jsonldContext = subscription.jsonldContext

if (jsonldContext != null) {
checkJsonldContext(jsonldContext).bind()
}
}

@Transactional
suspend fun create(subscription: Subscription, sub: Option<Sub>): Either<APIException, Unit> = either {
validateNewSubscription(subscription).bind()
Expand All @@ -149,11 +159,11 @@ class SubscriptionService(
INSERT INTO subscription(id, type, subscription_name, created_at, description, watched_attributes,
notification_trigger, time_interval, q, scope_q, notif_attributes, notif_format, endpoint_uri,
endpoint_accept, endpoint_receiver_info, endpoint_notifier_info, times_sent, is_active,
expires_at, sub, contexts, throttling, sys_attrs, lang, datasetId)
expires_at, sub, contexts, throttling, sys_attrs, lang, datasetId, jsonld_context)
VALUES(:id, :type, :subscription_name, :created_at, :description, :watched_attributes,
:notification_trigger, :time_interval, :q, :scope_q, :notif_attributes, :notif_format, :endpoint_uri,
:endpoint_accept, :endpoint_receiver_info, :endpoint_notifier_info, :times_sent, :is_active,
:expires_at, :sub, :contexts, :throttling, :sys_attrs, :lang, :datasetId)
:expires_at, :sub, :contexts, :throttling, :sys_attrs, :lang, :datasetId, :jsonld_context)
""".trimIndent()

databaseClient.sql(insertStatement)
Expand Down Expand Up @@ -182,6 +192,7 @@ class SubscriptionService(
.bind("sys_attrs", subscription.notification.sysAttrs)
.bind("lang", subscription.lang)
.bind("datasetId", subscription.datasetId?.toTypedArray())
.bind("jsonld_context", subscription.jsonldContext)
.execute().bind()

geoQuery?.let {
Expand Down Expand Up @@ -252,7 +263,8 @@ class SubscriptionService(
notif_format, endpoint_uri, endpoint_accept, endpoint_receiver_info, endpoint_notifier_info, status,
times_sent, is_active, last_notification, last_failure, last_success, entity_selector.id as entity_id,
id_pattern, entity_selector.type_selection as type_selection, georel, geometry, coordinates,
pgis_geometry, geoproperty, scope_q, expires_at, contexts, throttling, sys_attrs, lang, datasetId
pgis_geometry, geoproperty, scope_q, expires_at, contexts, throttling, sys_attrs, lang,
datasetId, jsonld_context
FROM subscription
LEFT JOIN entity_selector ON entity_selector.subscription_id = :id
LEFT JOIN geometry_query ON geometry_query.subscription_id = :id
Expand All @@ -269,25 +281,30 @@ class SubscriptionService(
suspend fun getContextsForSubscription(id: URI): Either<APIException, List<String>> {
val selectStatement =
"""
SELECT contexts
SELECT contexts, jsonld_context
FROM subscription
WHERE id = :id
""".trimIndent()

return databaseClient.sql(selectStatement)
.bind("id", id)
.oneToResult {
toList(it["contexts"]!!)
it["jsonld_context"]?.let { listOf(it as String) } ?: toList(it["contexts"]!!)
}
}

fun getContextsLink(subscription: Subscription): String =
if (subscription.contexts.size > 1) {
val linkToRetrieveContexts = subscriptionProperties.stellioUrl +
"/ngsi-ld/v1/subscriptions/${subscription.id}/context"
buildContextLinkHeader(linkToRetrieveContexts)
} else
buildContextLinkHeader(subscription.contexts[0])
fun getContextsLink(subscription: Subscription): String {
val contextLink = when {
subscription.jsonldContext != null -> subscription.jsonldContext.toString()
subscription.contexts.size > 1 -> {
val linkToRetrieveContexts = subscriptionProperties.stellioUrl +
"/ngsi-ld/v1/subscriptions/${subscription.id}/context"
linkToRetrieveContexts
}
else -> subscription.contexts[0]
}
return buildContextLinkHeader(contextLink)
}

suspend fun isCreatorOf(subscriptionId: URI, sub: Option<Sub>): Either<APIException, Boolean> {
val selectStatement =
Expand Down Expand Up @@ -359,7 +376,8 @@ class SubscriptionService(
"modifiedAt",
"throttling",
"lang",
"datasetId"
"datasetId",
"jsonldContext"
).contains(it.key) -> {
val columnName = it.key.toSqlColumnName()
val value = it.value.toSqlValue(it.key)
Expand Down Expand Up @@ -502,7 +520,8 @@ class SubscriptionService(
notif_format, endpoint_uri, endpoint_accept, endpoint_receiver_info, endpoint_notifier_info, status,
times_sent, is_active, last_notification, last_failure, last_success, entity_selector.id as entity_id,
id_pattern, entity_selector.type_selection as type_selection, georel, geometry, coordinates,
pgis_geometry, geoproperty, scope_q, expires_at, contexts, throttling, sys_attrs, lang, datasetId
pgis_geometry, geoproperty, scope_q, expires_at, contexts, throttling, sys_attrs, lang,
datasetId, jsonld_context
FROM subscription
LEFT JOIN entity_selector ON entity_selector.subscription_id = subscription.id
LEFT JOIN geometry_query ON geometry_query.subscription_id = subscription.id
Expand Down Expand Up @@ -544,7 +563,8 @@ class SubscriptionService(
entity_selector.id as entity_id, entity_selector.id_pattern as id_pattern,
entity_selector.type_selection as type_selection, georel, geometry, coordinates, pgis_geometry,
geoproperty, scope_q, notif_attributes, notif_format, endpoint_uri, endpoint_accept, times_sent,
endpoint_receiver_info, endpoint_notifier_info, contexts, throttling, sys_attrs, lang, datasetId
endpoint_receiver_info, endpoint_notifier_info, contexts, throttling, sys_attrs, lang,
datasetId, jsonld_context
FROM subscription
LEFT JOIN entity_selector on subscription.id = entity_selector.subscription_id
LEFT JOIN geometry_query on subscription.id = geometry_query.subscription_id
Expand Down Expand Up @@ -699,7 +719,8 @@ class SubscriptionService(
contexts = toList(row["contexts"]!!),
throttling = toNullableInt(row["throttling"]),
lang = row["lang"] as? String,
datasetId = toNullableList(row["datasetId"])
datasetId = toNullableList(row["datasetId"]),
jsonldContext = toNullableUri(row["jsonld_context"])
)
}

Expand Down Expand Up @@ -732,7 +753,8 @@ class SubscriptionService(
contexts = toList(row["contexts"]!!),
throttling = toNullableInt(row["throttling"]),
lang = row["lang"] as? String,
datasetId = toNullableList(row["datasetId"])
datasetId = toNullableList(row["datasetId"]),
jsonldContext = toNullableUri(row["jsonld_context"])
)
}

Expand Down Expand Up @@ -768,7 +790,8 @@ class SubscriptionService(
scope_q, notif_attributes, notif_format, endpoint_uri, endpoint_accept, endpoint_receiver_info,
endpoint_notifier_info, status, times_sent, last_notification, last_failure, last_success, is_active,
entity_selector.id as entity_id, id_pattern, entity_selector.type_selection as type_selection, georel,
geometry, coordinates, pgis_geometry, geoproperty, contexts, throttling, sys_attrs, lang, datasetId
geometry, coordinates, pgis_geometry, geoproperty, contexts, throttling, sys_attrs, lang,
datasetId, jsonld_context
FROM subscription
LEFT JOIN entity_selector ON entity_selector.subscription_id = subscription.id
LEFT JOIN geometry_query ON geometry_query.subscription_id = subscription.id
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE subscription
ADD jsonld_context text;
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ class NotificationServiceTests {
}

@Test
fun `it should notify the subscriber and use the contexts of the subscription to compact`() = runTest {
fun `it should notify the subscriber and use subscription contexts to compact`() = runTest {
val subscription = gimmeRawSubscription().copy(
notification = NotificationParams(
attributes = emptyList(),
Expand Down Expand Up @@ -215,6 +215,43 @@ class NotificationServiceTests {
}
}

@Test
fun `it should notify the subscriber and use jsonldContext to compact when it is provided`() = runTest {
val subscription = gimmeRawSubscription().copy(
notification = NotificationParams(
attributes = emptyList(),
endpoint = Endpoint(
uri = "http://localhost:8089/notification".toUri(),
accept = Endpoint.AcceptType.JSONLD
)
),
contexts = listOf(NGSILD_TEST_CORE_CONTEXT),
jsonldContext = APIC_COMPOUND_CONTEXT.toUri()
)
val expandedEntity = expandJsonLdEntity(rawEntity)

coEvery {
subscriptionService.getMatchingSubscriptions(any(), any(), any())
} returns listOf(subscription).right()
coEvery { subscriptionService.updateSubscriptionNotification(any(), any(), any()) } returns 1

stubFor(
post(urlMatching("/notification"))
.willReturn(ok())
)

notificationService.notifyMatchingSubscribers(
expandedEntity,
setOf(NGSILD_NAME_TERM),
ATTRIBUTE_UPDATED
).shouldSucceedWith { notificationResults ->
val notificationResult = notificationResults[0]
assertTrue(notificationResult.second.data[0].containsKey(NGSILD_NAME_TERM))
assertTrue(notificationResult.second.data[0].containsKey(MANAGED_BY_COMPACT_RELATIONSHIP))
assertEquals(APIC_COMPOUND_CONTEXT, notificationResult.second.data[0][JsonLdUtils.JSONLD_CONTEXT])
}
}

@Test
fun `it should send a simplified payload when format is keyValues and include only the specified attributes`() =
runTest {
Expand Down Expand Up @@ -372,6 +409,37 @@ class NotificationServiceTests {
)
}

@Test
fun `it should add a Link header containing the jsonldContext of the subscription when provided`() = runTest {
val subscription = gimmeRawSubscription().copy(
notification = NotificationParams(
attributes = emptyList(),
endpoint = Endpoint(
uri = "http://localhost:8089/notification".toUri(),
accept = Endpoint.AcceptType.JSON
)
),
jsonldContext = APIC_COMPOUND_CONTEXT.toUri()
)

coEvery { subscriptionService.getContextsLink(any()) } returns buildContextLinkHeader(APIC_COMPOUND_CONTEXT)
coEvery { subscriptionService.updateSubscriptionNotification(any(), any(), any()) } returns 1

stubFor(
post(urlMatching("/notification"))
.willReturn(ok())
)

notificationService.callSubscriber(subscription, rawEntity.deserializeAsMap())

val link = buildContextLinkHeader(subscription.jsonldContext.toString())
verify(
1,
postRequestedFor(urlPathEqualTo("/notification"))
.withHeader(HttpHeaders.LINK, equalTo(link))
)
}

@Test
fun `it should add an NGSILD-Tenant header if the subscription is not from the default context`() = runTest {
val subscription = gimmeRawSubscription().copy(
Expand Down Expand Up @@ -517,7 +585,8 @@ class NotificationServiceTests {
)
),
lang = "fr",
contexts = APIC_COMPOUND_CONTEXTS
contexts = APIC_COMPOUND_CONTEXTS,
jsonldContext = APIC_COMPOUND_CONTEXT.toUri()
)

val expandedEntity = expandJsonLdEntity(
Expand Down
Loading

0 comments on commit cf5b4f7

Please sign in to comment.