Skip to content

Commit

Permalink
context source implementation for query entities (#1282)
Browse files Browse the repository at this point in the history
* fix: minor serialization change

* feat: untested ContextSource For query entities

* feat: count with csr

* feat: test for ContextSourceUtils and ContextSourceCaller

* feat: test for ContextSourceUtils and ContextSourceCaller

* feat: PR comments

* fix: calling cSource without link header

* fix: type can be a list

* fix: id name in minimal csr

* Apply suggestions from code review

Co-authored-by: Benoit Orihuela <[email protected]>
Co-authored-by: Ranim Naimi <[email protected]>

* fix: logging in ContextSourceCaller

---------

Co-authored-by: Benoit Orihuela <[email protected]>
Co-authored-by: Ranim Naimi <[email protected]>
  • Loading branch information
3 people authored Dec 18, 2024
1 parent 22a8d72 commit 54678fd
Show file tree
Hide file tree
Showing 14 changed files with 444 additions and 82 deletions.
2 changes: 2 additions & 0 deletions search-service/config/detekt/baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
<ID>LongMethod:AttributeInstanceService.kt$AttributeInstanceService$@Transactional suspend fun create(attributeInstance: AttributeInstance): Either&lt;APIException, Unit&gt;</ID>
<ID>LongMethod:EnabledAuthorizationServiceTests.kt$EnabledAuthorizationServiceTests$@Test fun `it should return serialized access control entities with other rigths if user is owner`()</ID>
<ID>LongMethod:EntityAccessControlHandler.kt$EntityAccessControlHandler$@PostMapping("/{subjectId}/attrs", consumes = [MediaType.APPLICATION_JSON_VALUE, JSON_LD_CONTENT_TYPE]) suspend fun addRightsOnEntities( @RequestHeader httpHeaders: HttpHeaders, @PathVariable subjectId: String, @RequestBody requestBody: Mono&lt;String&gt;, @AllowedParameters @RequestParam queryParams: MultiValueMap&lt;String, String&gt; ): ResponseEntity&lt;*&gt;</ID>
<ID>LongMethod:EntityHandler.kt$EntityHandler$@GetMapping("/{entityId}", produces = [APPLICATION_JSON_VALUE, JSON_LD_CONTENT_TYPE, GEO_JSON_CONTENT_TYPE]) suspend fun getByURI( @RequestHeader httpHeaders: HttpHeaders, @PathVariable entityId: URI, @AllowedParameters( implemented = [ QP.OPTIONS, QP.TYPE, QP.ATTRS, QP.GEOMETRY_PROPERTY, QP.LANG, QP.CONTAINED_BY, QP.JOIN, QP.JOIN_LEVEL, QP.DATASET_ID, ], notImplemented = [QP.FORMAT, QP.PICK, QP.OMIT, QP.ENTITY_MAP, QP.LOCAL, QP.VIA] ) @RequestParam queryParams: MultiValueMap&lt;String, String&gt; ): ResponseEntity&lt;*&gt;</ID>
<ID>LongMethod:EntityHandler.kt$EntityHandler$@GetMapping(produces = [APPLICATION_JSON_VALUE, JSON_LD_CONTENT_TYPE, GEO_JSON_CONTENT_TYPE]) suspend fun getEntities( @RequestHeader httpHeaders: HttpHeaders, @AllowedParameters( implemented = [ QP.OPTIONS, QP.COUNT, QP.OFFSET, QP.LIMIT, QP.ID, QP.TYPE, QP.ID_PATTERN, QP.ATTRS, QP.Q, QP.GEOMETRY, QP.GEOREL, QP.COORDINATES, QP.GEOPROPERTY, QP.GEOMETRY_PROPERTY, QP.LANG, QP.SCOPEQ, QP.CONTAINED_BY, QP.JOIN, QP.JOIN_LEVEL, QP.DATASET_ID, ], notImplemented = [QP.FORMAT, QP.PICK, QP.OMIT, QP.EXPAND_VALUES, QP.CSF, QP.ENTITY_MAP, QP.LOCAL, QP.VIA] ) @RequestParam queryParams: MultiValueMap&lt;String, String&gt; ): ResponseEntity&lt;*&gt;</ID>
<ID>LongMethod:LinkedEntityServiceTests.kt$LinkedEntityServiceTests$@Test fun `it should inline entities up to the asked 2nd level`()</ID>
<ID>LongMethod:PatchAttributeTests.kt$PatchAttributeTests.Companion$@JvmStatic fun mergePatchProvider(): Stream&lt;Arguments&gt;</ID>
<ID>LongMethod:PatchAttributeTests.kt$PatchAttributeTests.Companion$@JvmStatic fun partialUpdatePatchProvider(): Stream&lt;Arguments&gt;</ID>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
package com.egm.stellio.search.csr.model

import com.egm.stellio.shared.model.EntityTypeSelection
import java.net.URI

data class CSRFilters( // we should use a combination of EntitiesQuery TemporalQuery (when we implement all operations)
val ids: Set<URI> = emptySet(),
val typeSelection: EntityTypeSelection? = null,
val idPattern: String? = null,
val csf: String? = null,
) {
constructor(ids: Set<URI> = emptySet(), operations: List<Operation>) :
constructor(
ids: Set<URI> = emptySet(),
typeSelection: EntityTypeSelection? = null,
idPattern: String? = null,
operations: List<Operation>
) :
this(
ids = ids,
typeSelection = typeSelection,
idPattern = idPattern,
csf = operations.joinToString("|") { "${ContextSourceRegistration::operations.name}==${it.key}" }
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import com.egm.stellio.shared.util.invalidUriMessage
import com.egm.stellio.shared.util.ngsiLdDateTime
import com.egm.stellio.shared.util.toUri
import com.fasterxml.jackson.annotation.JsonFormat
import com.fasterxml.jackson.annotation.JsonIgnore
import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.module.kotlin.convertValue
import org.springframework.http.MediaType
Expand All @@ -40,12 +42,14 @@ data class ContextSourceRegistration(
val observationInterval: TimeInterval? = null,
val managementInterval: TimeInterval? = null,
val status: StatusType? = null,
@JsonInclude(value = JsonInclude.Include.NON_DEFAULT)
val timesSent: Int = 0,
@JsonInclude(value = JsonInclude.Include.NON_DEFAULT)
val timesFailed: Int = 0,
val lastFailure: ZonedDateTime? = null,
val lastSuccess: ZonedDateTime? = null,
) {

@JsonIgnore
fun isAuxiliary(): Boolean = mode == Mode.AUXILIARY

data class TimeInterval(
Expand Down Expand Up @@ -85,22 +89,19 @@ data class ContextSourceRegistration(
data class EntityInfo(
val id: URI? = null,
val idPattern: String? = null,
@JsonFormat(
with = [
JsonFormat.Feature.ACCEPT_SINGLE_VALUE_AS_ARRAY,
JsonFormat.Feature.WRITE_SINGLE_ELEM_ARRAYS_UNWRAPPED
]
)
val type: List<String>
// no WRITE_SINGLE_ELEM_ARRAYS_UNWRAPPED because it is used for the database
@JsonFormat(with = [JsonFormat.Feature.ACCEPT_SINGLE_VALUE_AS_ARRAY])
@JsonProperty("type")
val types: List<String>
) {
fun expand(contexts: List<String>): EntityInfo =
this.copy(
type = type.map { expandJsonLdTerm(it, contexts) },
types = types.map { expandJsonLdTerm(it, contexts) },
)

fun compact(contexts: List<String>): EntityInfo =
this.copy(
type = type.map { compactTerm(it, contexts) },
types = types.map { compactTerm(it, contexts) },
)

fun validate(): Either<BadRequestDataException, Unit> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.egm.stellio.search.csr.service

import arrow.core.Either
import arrow.core.getOrNone
import arrow.core.left
import arrow.core.raise.either
import arrow.core.right
Expand All @@ -11,28 +12,81 @@ import com.egm.stellio.search.csr.model.NGSILDWarning
import com.egm.stellio.search.csr.model.RevalidationFailedWarning
import com.egm.stellio.shared.model.CompactedEntity
import com.egm.stellio.shared.queryparameter.QueryParameter
import com.egm.stellio.shared.util.JsonUtils.deserializeAsList
import com.egm.stellio.shared.util.JsonUtils.deserializeAsMap
import com.egm.stellio.shared.util.RESULTS_COUNT_HEADER
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.core.codec.DecodingException
import org.springframework.http.HttpHeaders
import org.springframework.http.HttpMethod
import org.springframework.http.HttpStatus
import org.springframework.util.CollectionUtils
import org.springframework.util.MultiValueMap
import org.springframework.web.reactive.function.client.ClientResponse
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.reactive.function.client.awaitBodyOrNull
import org.springframework.web.reactive.function.client.awaitExchange
import java.net.URI

typealias QueryEntitiesResponse = Pair<List<CompactedEntity>, Int?>

object ContextSourceCaller {
val logger: Logger = LoggerFactory.getLogger(javaClass)

suspend fun retrieveContextSourceEntity(
httpHeaders: HttpHeaders,
csr: ContextSourceRegistration,
id: URI,
params: MultiValueMap<String, String>
): Either<NGSILDWarning, CompactedEntity?> = either {
val path = "/ngsi-ld/v1/entities/$id"

return kotlin.runCatching {
getDistributedInformation(httpHeaders, csr, path, params).bind().first?.deserializeAsMap().right()
}.fold(
onSuccess = { it },
onFailure = { e ->
logger.warn("Badly formed data received from CSR ${csr.id} at $path: ${e.message}")
RevalidationFailedWarning(
"${csr.id} at $path returned badly formed data message: \"${e.cause}:${e.message}\"",
csr
).left()
}
)
}

suspend fun queryContextSourceEntities(
httpHeaders: HttpHeaders,
csr: ContextSourceRegistration,
params: MultiValueMap<String, String>
): Either<NGSILDWarning, QueryEntitiesResponse> = either {
val path = "/ngsi-ld/v1/entities"

return kotlin.runCatching {
getDistributedInformation(httpHeaders, csr, path, params).bind().let { (response, headers) ->
(response?.deserializeAsList() ?: emptyList()) to
// if count was not asked this will be null
headers.header(RESULTS_COUNT_HEADER).firstOrNull()?.toInt()
}
.right()
}.fold(
onSuccess = { it },
onFailure = { e ->
logger.warn("Badly formed data received from CSR ${csr.id} at $path: ${e.message}")
RevalidationFailedWarning(
"${csr.id} at $path returned badly formed data message: \"${e.cause}:${e.message}\"",
csr
).left()
}
)
}

suspend fun getDistributedInformation(
httpHeaders: HttpHeaders,
csr: ContextSourceRegistration,
path: String,
params: MultiValueMap<String, String>
): Either<NGSILDWarning, CompactedEntity?> = either {
): Either<NGSILDWarning, Pair<String?, ClientResponse.Headers>> = either {
val uri = URI("${csr.endpoint}$path")

val queryParams = CollectionUtils.toMultiValueMap(params.toMutableMap())
Expand All @@ -49,22 +103,23 @@ object ContextSourceCaller {
.path(uri.path)
.queryParams(queryParams)
.build()
}.headers { newHeaders ->
httpHeaders.getOrNone(HttpHeaders.LINK).onSome { link -> newHeaders[HttpHeaders.LINK] = link }
}
.header(HttpHeaders.LINK, httpHeaders.getFirst(HttpHeaders.LINK))

return runCatching {
val (statusCode, response) = request
.awaitExchange { response ->
response.statusCode() to response.awaitBodyOrNull<CompactedEntity>()
}
val (statusCode, response, headers) = request.awaitExchange { response ->
Triple(response.statusCode(), response.awaitBodyOrNull<String>(), response.headers())
}
when {
statusCode.is2xxSuccessful -> {
logger.info("Successfully received data from CSR ${csr.id} at $uri")
response.right()
(response to headers).right()
}

statusCode.isSameCodeAs(HttpStatus.NOT_FOUND) -> {
logger.info("CSR returned 404 at $uri: $response")
null.right()
(null to headers).right()
}

else -> {
Expand All @@ -80,17 +135,10 @@ object ContextSourceCaller {
onFailure = { e ->
logger.warn("Error contacting CSR at $uri: ${e.message}")
logger.warn(e.stackTraceToString())
if (e is DecodingException) {
RevalidationFailedWarning(
"$uri returned badly formed data message: \"${e.cause}:${e.message}\"",
csr
)
} else {
MiscellaneousWarning(
"Error connecting to $uri message : \"${e.cause}:${e.message}\"",
csr
)
}.left()
MiscellaneousWarning(
"Error connecting to CSR at $uri: \"${e.cause}:${e.message}\"",
csr
).left()
}
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.egm.stellio.shared.model.APIException
import com.egm.stellio.shared.model.AlreadyExistsException
import com.egm.stellio.shared.model.ResourceNotFoundException
import com.egm.stellio.shared.util.Sub
import com.egm.stellio.shared.util.buildTypeQuery
import com.egm.stellio.shared.util.mapper
import com.egm.stellio.shared.util.ngsiLdDateTime
import com.egm.stellio.shared.util.toStringValue
Expand Down Expand Up @@ -201,7 +202,7 @@ class ContextSourceRegistrationService(
LEFT JOIN jsonb_to_recordset(information)
as information(entities jsonb, propertyNames text[], relationshipNames text[]) on true
LEFT JOIN jsonb_to_recordset(entities)
as entity_info(id text, idPattern text, type text) on true
as entity_info(id text, idPattern text, type text[]) on true
WHERE $filterQuery
GROUP BY csr.id
ORDER BY csr.id
Expand Down Expand Up @@ -262,18 +263,36 @@ class ContextSourceRegistrationService(
${csrFilters.ids.joinToString(" OR ") { "'$it' ~ entity_info.idPattern" }}
)
""".trimIndent()
else "true"
else null
val typeFilter = if (!csrFilters.typeSelection.isNullOrBlank()) {
val typeQuery = buildTypeQuery(csrFilters.typeSelection, columnName = "type")
"""
(
type is null OR
( $typeQuery )
)
""".trimIndent()
} else null

// we only filter on id since there is no easy way to know if two idPatterns overlap
// possible resources : https://meta.stackoverflow.com/questions/426313/canonical-for-overlapping-regex-questions
val idPatternFilter = if (!csrFilters.idPattern.isNullOrBlank())
"""
(
entity_info.id is null OR
entity_info.id ~ ('${csrFilters.idPattern}')
)
""".trimIndent()
else null

val csfFilter = if (csrFilters.csf != null && validationRegex.matches(csrFilters.csf)) {
val operations = operationRegex.toRegex().findAll(csrFilters.csf).map { it.groups[1]?.value }
"operations && ARRAY[${operations.joinToString(",") { "'$it'" }}]"
} else "true"
} else null

val filters = listOfNotNull(idFilter, typeFilter, idPatternFilter, csfFilter)

return """
$idFilter
AND
$csfFilter
""".trimMargin()
return if (filters.isEmpty()) "true" else filters.joinToString(" AND ")
}

private val rowToContextSourceRegistration: ((Map<String, Any>) -> ContextSourceRegistration) = { row ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,33 @@ import java.time.ZonedDateTime
import kotlin.random.Random.Default.nextBoolean

typealias CompactedEntityWithCSR = Pair<CompactedEntity, ContextSourceRegistration>
typealias CompactedEntitiesWithCSR = Pair<List<CompactedEntity>, ContextSourceRegistration>

typealias AttributeByDatasetId = Map<String?, CompactedAttributeInstance>
object ContextSourceUtils {

fun mergeEntitiesLists(
localEntities: List<CompactedEntity>,
remoteEntitiesWithCSR: List<CompactedEntitiesWithCSR>
): IorNel<NGSILDWarning, List<CompactedEntity>> {
val mergedEntityMap = localEntities.map { it.toMutableMap() }.associateBy { it[JSONLD_ID_TERM] }.toMutableMap()

val warnings = remoteEntitiesWithCSR.sortedBy { (_, csr) -> csr.isAuxiliary() }.mapNotNull { (entities, csr) ->
either {
entities.forEach { entity ->
val id = entity[JSONLD_ID_TERM]
mergedEntityMap[id]
?.let { it.putAll(getMergeNewValues(it, entity, csr).bind()) }
?: run { mergedEntityMap[id] = entity.toMutableMap() }
}
null
}.leftOrNull()
}.toNonEmptyListOrNull()

val entities = mergedEntityMap.values.toList()
return if (warnings == null) Ior.Right(entities) else Ior.Both(warnings, entities)
}

fun mergeEntities(
localEntity: CompactedEntity?,
remoteEntitiesWithCSR: List<CompactedEntityWithCSR>
Expand Down Expand Up @@ -129,7 +153,7 @@ object ContextSourceUtils {
attribute.associateBy { it[NGSILD_DATASET_ID_TERM] as? String }.right()
}
else -> {
RevalidationFailedWarning(
RevalidationFailedWarning( // could be avoided if Json payload is validated beforehand
"The received payload is invalid. Attribute is nor List nor a Map : $attribute",
csr
).left()
Expand Down
Loading

0 comments on commit 54678fd

Please sign in to comment.