From 54678fd16d8f5fbcffc52adf60e26fa2b40528ef Mon Sep 17 00:00:00 2001 From: Thomas Bousselin <61795238+thomasBousselin@users.noreply.github.com> Date: Wed, 18 Dec 2024 13:37:44 +0100 Subject: [PATCH] context source implementation for query entities (#1282) * fix: minor serialization change * feat: untested ContextSource For query entities * feat: count with csr * feat: test for ContextSourceUtils and ContextSourceCaller * feat: test for ContextSourceUtils and ContextSourceCaller * feat: PR comments * fix: calling cSource without link header * fix: type can be a list * fix: id name in minimal csr * Apply suggestions from code review Co-authored-by: Benoit Orihuela Co-authored-by: Ranim Naimi <156652078+ranim-n@users.noreply.github.com> * fix: logging in ContextSourceCaller --------- Co-authored-by: Benoit Orihuela Co-authored-by: Ranim Naimi <156652078+ranim-n@users.noreply.github.com> --- search-service/config/detekt/baseline.xml | 2 + .../stellio/search/csr/model/CSRFilters.kt | 12 ++- .../csr/model/ContextSourceRegistration.kt | 21 ++--- .../search/csr/service/ContextSourceCaller.kt | 88 ++++++++++++++----- .../ContextSourceRegistrationService.kt | 35 ++++++-- .../search/csr/service/ContextSourceUtils.kt | 26 +++++- .../search/entity/web/EntityHandler.kt | 71 ++++++++++++--- .../csr/service/ContextSourceCallerTests.kt | 84 +++++++++++++++--- .../ContextSourceRegistrationServiceTests.kt | 42 ++++++++- .../csr/service/ContextSourceUtilsTests.kt | 70 +++++++++++++++ .../search/entity/web/EntityHandlerTests.kt | 61 ++++++++++--- ...xtSourceRegistration_minimal_entities.json | 4 +- .../com/egm/stellio/shared/util/QueryUtils.kt | 8 +- .../service/SubscriptionService.kt | 2 +- 14 files changed, 444 insertions(+), 82 deletions(-) diff --git a/search-service/config/detekt/baseline.xml b/search-service/config/detekt/baseline.xml index 3314d7610..7f380d5b4 100644 --- a/search-service/config/detekt/baseline.xml +++ b/search-service/config/detekt/baseline.xml @@ -10,6 +10,8 @@ LongMethod:AttributeInstanceService.kt$AttributeInstanceService$@Transactional suspend fun create(attributeInstance: AttributeInstance): Either<APIException, Unit> LongMethod:EnabledAuthorizationServiceTests.kt$EnabledAuthorizationServiceTests$@Test fun `it should return serialized access control entities with other rigths if user is owner`() LongMethod:EntityAccessControlHandler.kt$EntityAccessControlHandler$@PostMapping("/{subjectId}/attrs", consumes = [MediaType.APPLICATION_JSON_VALUE, JSON_LD_CONTENT_TYPE]) suspend fun addRightsOnEntities( @RequestHeader httpHeaders: HttpHeaders, @PathVariable subjectId: String, @RequestBody requestBody: Mono<String>, @AllowedParameters @RequestParam queryParams: MultiValueMap<String, String> ): ResponseEntity<*> + LongMethod:EntityHandler.kt$EntityHandler$@GetMapping("/{entityId}", produces = [APPLICATION_JSON_VALUE, JSON_LD_CONTENT_TYPE, GEO_JSON_CONTENT_TYPE]) suspend fun getByURI( @RequestHeader httpHeaders: HttpHeaders, @PathVariable entityId: URI, @AllowedParameters( implemented = [ QP.OPTIONS, QP.TYPE, QP.ATTRS, QP.GEOMETRY_PROPERTY, QP.LANG, QP.CONTAINED_BY, QP.JOIN, QP.JOIN_LEVEL, QP.DATASET_ID, ], notImplemented = [QP.FORMAT, QP.PICK, QP.OMIT, QP.ENTITY_MAP, QP.LOCAL, QP.VIA] ) @RequestParam queryParams: MultiValueMap<String, String> ): ResponseEntity<*> + LongMethod:EntityHandler.kt$EntityHandler$@GetMapping(produces = [APPLICATION_JSON_VALUE, JSON_LD_CONTENT_TYPE, GEO_JSON_CONTENT_TYPE]) suspend fun getEntities( @RequestHeader httpHeaders: HttpHeaders, @AllowedParameters( implemented = [ QP.OPTIONS, QP.COUNT, QP.OFFSET, QP.LIMIT, QP.ID, QP.TYPE, QP.ID_PATTERN, QP.ATTRS, QP.Q, QP.GEOMETRY, QP.GEOREL, QP.COORDINATES, QP.GEOPROPERTY, QP.GEOMETRY_PROPERTY, QP.LANG, QP.SCOPEQ, QP.CONTAINED_BY, QP.JOIN, QP.JOIN_LEVEL, QP.DATASET_ID, ], notImplemented = [QP.FORMAT, QP.PICK, QP.OMIT, QP.EXPAND_VALUES, QP.CSF, QP.ENTITY_MAP, QP.LOCAL, QP.VIA] ) @RequestParam queryParams: MultiValueMap<String, String> ): ResponseEntity<*> LongMethod:LinkedEntityServiceTests.kt$LinkedEntityServiceTests$@Test fun `it should inline entities up to the asked 2nd level`() LongMethod:PatchAttributeTests.kt$PatchAttributeTests.Companion$@JvmStatic fun mergePatchProvider(): Stream<Arguments> LongMethod:PatchAttributeTests.kt$PatchAttributeTests.Companion$@JvmStatic fun partialUpdatePatchProvider(): Stream<Arguments> diff --git a/search-service/src/main/kotlin/com/egm/stellio/search/csr/model/CSRFilters.kt b/search-service/src/main/kotlin/com/egm/stellio/search/csr/model/CSRFilters.kt index cfa75065c..3b44a57ac 100644 --- a/search-service/src/main/kotlin/com/egm/stellio/search/csr/model/CSRFilters.kt +++ b/search-service/src/main/kotlin/com/egm/stellio/search/csr/model/CSRFilters.kt @@ -1,14 +1,24 @@ package com.egm.stellio.search.csr.model +import com.egm.stellio.shared.model.EntityTypeSelection import java.net.URI data class CSRFilters( // we should use a combination of EntitiesQuery TemporalQuery (when we implement all operations) val ids: Set = emptySet(), + val typeSelection: EntityTypeSelection? = null, + val idPattern: String? = null, val csf: String? = null, ) { - constructor(ids: Set = emptySet(), operations: List) : + constructor( + ids: Set = emptySet(), + typeSelection: EntityTypeSelection? = null, + idPattern: String? = null, + operations: List + ) : this( ids = ids, + typeSelection = typeSelection, + idPattern = idPattern, csf = operations.joinToString("|") { "${ContextSourceRegistration::operations.name}==${it.key}" } ) } diff --git a/search-service/src/main/kotlin/com/egm/stellio/search/csr/model/ContextSourceRegistration.kt b/search-service/src/main/kotlin/com/egm/stellio/search/csr/model/ContextSourceRegistration.kt index a6e0148b6..47cbe2c9f 100644 --- a/search-service/src/main/kotlin/com/egm/stellio/search/csr/model/ContextSourceRegistration.kt +++ b/search-service/src/main/kotlin/com/egm/stellio/search/csr/model/ContextSourceRegistration.kt @@ -19,6 +19,8 @@ import com.egm.stellio.shared.util.invalidUriMessage import com.egm.stellio.shared.util.ngsiLdDateTime import com.egm.stellio.shared.util.toUri import com.fasterxml.jackson.annotation.JsonFormat +import com.fasterxml.jackson.annotation.JsonIgnore +import com.fasterxml.jackson.annotation.JsonInclude import com.fasterxml.jackson.annotation.JsonProperty import com.fasterxml.jackson.module.kotlin.convertValue import org.springframework.http.MediaType @@ -40,12 +42,14 @@ data class ContextSourceRegistration( val observationInterval: TimeInterval? = null, val managementInterval: TimeInterval? = null, val status: StatusType? = null, + @JsonInclude(value = JsonInclude.Include.NON_DEFAULT) val timesSent: Int = 0, + @JsonInclude(value = JsonInclude.Include.NON_DEFAULT) val timesFailed: Int = 0, val lastFailure: ZonedDateTime? = null, val lastSuccess: ZonedDateTime? = null, ) { - + @JsonIgnore fun isAuxiliary(): Boolean = mode == Mode.AUXILIARY data class TimeInterval( @@ -85,22 +89,19 @@ data class ContextSourceRegistration( data class EntityInfo( val id: URI? = null, val idPattern: String? = null, - @JsonFormat( - with = [ - JsonFormat.Feature.ACCEPT_SINGLE_VALUE_AS_ARRAY, - JsonFormat.Feature.WRITE_SINGLE_ELEM_ARRAYS_UNWRAPPED - ] - ) - val type: List + // no WRITE_SINGLE_ELEM_ARRAYS_UNWRAPPED because it is used for the database + @JsonFormat(with = [JsonFormat.Feature.ACCEPT_SINGLE_VALUE_AS_ARRAY]) + @JsonProperty("type") + val types: List ) { fun expand(contexts: List): EntityInfo = this.copy( - type = type.map { expandJsonLdTerm(it, contexts) }, + types = types.map { expandJsonLdTerm(it, contexts) }, ) fun compact(contexts: List): EntityInfo = this.copy( - type = type.map { compactTerm(it, contexts) }, + types = types.map { compactTerm(it, contexts) }, ) fun validate(): Either { diff --git a/search-service/src/main/kotlin/com/egm/stellio/search/csr/service/ContextSourceCaller.kt b/search-service/src/main/kotlin/com/egm/stellio/search/csr/service/ContextSourceCaller.kt index c296b972d..9006ebcd9 100644 --- a/search-service/src/main/kotlin/com/egm/stellio/search/csr/service/ContextSourceCaller.kt +++ b/search-service/src/main/kotlin/com/egm/stellio/search/csr/service/ContextSourceCaller.kt @@ -1,6 +1,7 @@ package com.egm.stellio.search.csr.service import arrow.core.Either +import arrow.core.getOrNone import arrow.core.left import arrow.core.raise.either import arrow.core.right @@ -11,28 +12,81 @@ import com.egm.stellio.search.csr.model.NGSILDWarning import com.egm.stellio.search.csr.model.RevalidationFailedWarning import com.egm.stellio.shared.model.CompactedEntity import com.egm.stellio.shared.queryparameter.QueryParameter +import com.egm.stellio.shared.util.JsonUtils.deserializeAsList +import com.egm.stellio.shared.util.JsonUtils.deserializeAsMap +import com.egm.stellio.shared.util.RESULTS_COUNT_HEADER import org.slf4j.Logger import org.slf4j.LoggerFactory -import org.springframework.core.codec.DecodingException import org.springframework.http.HttpHeaders import org.springframework.http.HttpMethod import org.springframework.http.HttpStatus import org.springframework.util.CollectionUtils import org.springframework.util.MultiValueMap +import org.springframework.web.reactive.function.client.ClientResponse import org.springframework.web.reactive.function.client.WebClient import org.springframework.web.reactive.function.client.awaitBodyOrNull import org.springframework.web.reactive.function.client.awaitExchange import java.net.URI +typealias QueryEntitiesResponse = Pair, Int?> + object ContextSourceCaller { val logger: Logger = LoggerFactory.getLogger(javaClass) + suspend fun retrieveContextSourceEntity( + httpHeaders: HttpHeaders, + csr: ContextSourceRegistration, + id: URI, + params: MultiValueMap + ): Either = either { + val path = "/ngsi-ld/v1/entities/$id" + + return kotlin.runCatching { + getDistributedInformation(httpHeaders, csr, path, params).bind().first?.deserializeAsMap().right() + }.fold( + onSuccess = { it }, + onFailure = { e -> + logger.warn("Badly formed data received from CSR ${csr.id} at $path: ${e.message}") + RevalidationFailedWarning( + "${csr.id} at $path returned badly formed data message: \"${e.cause}:${e.message}\"", + csr + ).left() + } + ) + } + + suspend fun queryContextSourceEntities( + httpHeaders: HttpHeaders, + csr: ContextSourceRegistration, + params: MultiValueMap + ): Either = either { + val path = "/ngsi-ld/v1/entities" + + return kotlin.runCatching { + getDistributedInformation(httpHeaders, csr, path, params).bind().let { (response, headers) -> + (response?.deserializeAsList() ?: emptyList()) to + // if count was not asked this will be null + headers.header(RESULTS_COUNT_HEADER).firstOrNull()?.toInt() + } + .right() + }.fold( + onSuccess = { it }, + onFailure = { e -> + logger.warn("Badly formed data received from CSR ${csr.id} at $path: ${e.message}") + RevalidationFailedWarning( + "${csr.id} at $path returned badly formed data message: \"${e.cause}:${e.message}\"", + csr + ).left() + } + ) + } + suspend fun getDistributedInformation( httpHeaders: HttpHeaders, csr: ContextSourceRegistration, path: String, params: MultiValueMap - ): Either = either { + ): Either> = either { val uri = URI("${csr.endpoint}$path") val queryParams = CollectionUtils.toMultiValueMap(params.toMutableMap()) @@ -49,22 +103,23 @@ object ContextSourceCaller { .path(uri.path) .queryParams(queryParams) .build() + }.headers { newHeaders -> + httpHeaders.getOrNone(HttpHeaders.LINK).onSome { link -> newHeaders[HttpHeaders.LINK] = link } } - .header(HttpHeaders.LINK, httpHeaders.getFirst(HttpHeaders.LINK)) + return runCatching { - val (statusCode, response) = request - .awaitExchange { response -> - response.statusCode() to response.awaitBodyOrNull() - } + val (statusCode, response, headers) = request.awaitExchange { response -> + Triple(response.statusCode(), response.awaitBodyOrNull(), response.headers()) + } when { statusCode.is2xxSuccessful -> { logger.info("Successfully received data from CSR ${csr.id} at $uri") - response.right() + (response to headers).right() } statusCode.isSameCodeAs(HttpStatus.NOT_FOUND) -> { logger.info("CSR returned 404 at $uri: $response") - null.right() + (null to headers).right() } else -> { @@ -80,17 +135,10 @@ object ContextSourceCaller { onFailure = { e -> logger.warn("Error contacting CSR at $uri: ${e.message}") logger.warn(e.stackTraceToString()) - if (e is DecodingException) { - RevalidationFailedWarning( - "$uri returned badly formed data message: \"${e.cause}:${e.message}\"", - csr - ) - } else { - MiscellaneousWarning( - "Error connecting to $uri message : \"${e.cause}:${e.message}\"", - csr - ) - }.left() + MiscellaneousWarning( + "Error connecting to CSR at $uri: \"${e.cause}:${e.message}\"", + csr + ).left() } ) } diff --git a/search-service/src/main/kotlin/com/egm/stellio/search/csr/service/ContextSourceRegistrationService.kt b/search-service/src/main/kotlin/com/egm/stellio/search/csr/service/ContextSourceRegistrationService.kt index a0973824b..4903e9d6f 100644 --- a/search-service/src/main/kotlin/com/egm/stellio/search/csr/service/ContextSourceRegistrationService.kt +++ b/search-service/src/main/kotlin/com/egm/stellio/search/csr/service/ContextSourceRegistrationService.kt @@ -24,6 +24,7 @@ import com.egm.stellio.shared.model.APIException import com.egm.stellio.shared.model.AlreadyExistsException import com.egm.stellio.shared.model.ResourceNotFoundException import com.egm.stellio.shared.util.Sub +import com.egm.stellio.shared.util.buildTypeQuery import com.egm.stellio.shared.util.mapper import com.egm.stellio.shared.util.ngsiLdDateTime import com.egm.stellio.shared.util.toStringValue @@ -201,7 +202,7 @@ class ContextSourceRegistrationService( LEFT JOIN jsonb_to_recordset(information) as information(entities jsonb, propertyNames text[], relationshipNames text[]) on true LEFT JOIN jsonb_to_recordset(entities) - as entity_info(id text, idPattern text, type text) on true + as entity_info(id text, idPattern text, type text[]) on true WHERE $filterQuery GROUP BY csr.id ORDER BY csr.id @@ -262,18 +263,36 @@ class ContextSourceRegistrationService( ${csrFilters.ids.joinToString(" OR ") { "'$it' ~ entity_info.idPattern" }} ) """.trimIndent() - else "true" + else null + val typeFilter = if (!csrFilters.typeSelection.isNullOrBlank()) { + val typeQuery = buildTypeQuery(csrFilters.typeSelection, columnName = "type") + """ + ( + type is null OR + ( $typeQuery ) + ) + """.trimIndent() + } else null + + // we only filter on id since there is no easy way to know if two idPatterns overlap + // possible resources : https://meta.stackoverflow.com/questions/426313/canonical-for-overlapping-regex-questions + val idPatternFilter = if (!csrFilters.idPattern.isNullOrBlank()) + """ + ( + entity_info.id is null OR + entity_info.id ~ ('${csrFilters.idPattern}') + ) + """.trimIndent() + else null val csfFilter = if (csrFilters.csf != null && validationRegex.matches(csrFilters.csf)) { val operations = operationRegex.toRegex().findAll(csrFilters.csf).map { it.groups[1]?.value } "operations && ARRAY[${operations.joinToString(",") { "'$it'" }}]" - } else "true" + } else null + + val filters = listOfNotNull(idFilter, typeFilter, idPatternFilter, csfFilter) - return """ - $idFilter - AND - $csfFilter - """.trimMargin() + return if (filters.isEmpty()) "true" else filters.joinToString(" AND ") } private val rowToContextSourceRegistration: ((Map) -> ContextSourceRegistration) = { row -> diff --git a/search-service/src/main/kotlin/com/egm/stellio/search/csr/service/ContextSourceUtils.kt b/search-service/src/main/kotlin/com/egm/stellio/search/csr/service/ContextSourceUtils.kt index cf09fe76f..776f3265a 100644 --- a/search-service/src/main/kotlin/com/egm/stellio/search/csr/service/ContextSourceUtils.kt +++ b/search-service/src/main/kotlin/com/egm/stellio/search/csr/service/ContextSourceUtils.kt @@ -26,9 +26,33 @@ import java.time.ZonedDateTime import kotlin.random.Random.Default.nextBoolean typealias CompactedEntityWithCSR = Pair +typealias CompactedEntitiesWithCSR = Pair, ContextSourceRegistration> + typealias AttributeByDatasetId = Map object ContextSourceUtils { + fun mergeEntitiesLists( + localEntities: List, + remoteEntitiesWithCSR: List + ): IorNel> { + val mergedEntityMap = localEntities.map { it.toMutableMap() }.associateBy { it[JSONLD_ID_TERM] }.toMutableMap() + + val warnings = remoteEntitiesWithCSR.sortedBy { (_, csr) -> csr.isAuxiliary() }.mapNotNull { (entities, csr) -> + either { + entities.forEach { entity -> + val id = entity[JSONLD_ID_TERM] + mergedEntityMap[id] + ?.let { it.putAll(getMergeNewValues(it, entity, csr).bind()) } + ?: run { mergedEntityMap[id] = entity.toMutableMap() } + } + null + }.leftOrNull() + }.toNonEmptyListOrNull() + + val entities = mergedEntityMap.values.toList() + return if (warnings == null) Ior.Right(entities) else Ior.Both(warnings, entities) + } + fun mergeEntities( localEntity: CompactedEntity?, remoteEntitiesWithCSR: List @@ -129,7 +153,7 @@ object ContextSourceUtils { attribute.associateBy { it[NGSILD_DATASET_ID_TERM] as? String }.right() } else -> { - RevalidationFailedWarning( + RevalidationFailedWarning( // could be avoided if Json payload is validated beforehand "The received payload is invalid. Attribute is nor List nor a Map : $attribute", csr ).left() diff --git a/search-service/src/main/kotlin/com/egm/stellio/search/entity/web/EntityHandler.kt b/search-service/src/main/kotlin/com/egm/stellio/search/entity/web/EntityHandler.kt index acedd6a9a..5dfb34e01 100644 --- a/search-service/src/main/kotlin/com/egm/stellio/search/entity/web/EntityHandler.kt +++ b/search-service/src/main/kotlin/com/egm/stellio/search/entity/web/EntityHandler.kt @@ -194,11 +194,11 @@ class EntityHandler( @RequestHeader httpHeaders: HttpHeaders, @AllowedParameters( implemented = [ - QP.OPTIONS, QP.FORMAT, QP.COUNT, QP.OFFSET, QP.LIMIT, QP.ID, QP.TYPE, QP.ID_PATTERN, QP.ATTRS, QP.Q, + QP.OPTIONS, QP.COUNT, QP.OFFSET, QP.LIMIT, QP.ID, QP.TYPE, QP.ID_PATTERN, QP.ATTRS, QP.Q, QP.GEOMETRY, QP.GEOREL, QP.COORDINATES, QP.GEOPROPERTY, QP.GEOMETRY_PROPERTY, QP.LANG, QP.SCOPEQ, QP.CONTAINED_BY, QP.JOIN, QP.JOIN_LEVEL, QP.DATASET_ID, ], - notImplemented = [QP.PICK, QP.OMIT, QP.EXPAND_VALUES, QP.CSF, QP.ENTITY_MAP, QP.LOCAL, QP.VIA] + notImplemented = [QP.FORMAT, QP.PICK, QP.OMIT, QP.EXPAND_VALUES, QP.CSF, QP.ENTITY_MAP, QP.LOCAL, QP.VIA] ) @RequestParam queryParams: MultiValueMap ): ResponseEntity<*> = either { @@ -213,25 +213,67 @@ class EntityHandler( ).bind() .validateMinimalQueryEntitiesParameters().bind() - val (entities, count) = entityQueryService.queryEntities(entitiesQuery, sub.getOrNull()).bind() + val csrFilters = + CSRFilters( + ids = entitiesQuery.ids, + idPattern = entitiesQuery.idPattern, + typeSelection = entitiesQuery.typeSelection, + operations = listOf( + Operation.QUERY_ENTITY, + Operation.FEDERATION_OPS, + Operation.RETRIEVE_OPS, + Operation.REDIRECTION_OPS + ) + ) + + val matchingCSR = contextSourceRegistrationService.getContextSourceRegistrations(csrFilters) + + val (entities, localCount) = entityQueryService.queryEntities(entitiesQuery, sub.getOrNull()).bind() val filteredEntities = entities.filterAttributes(entitiesQuery.attrs, entitiesQuery.datasetId) - val compactedEntities = + val localEntities = compactEntities(filteredEntities, contexts).let { linkedEntityService.processLinkedEntities(it, entitiesQuery, sub.getOrNull()).bind() } + val (warnings, remoteEntitiesWithCSR, remoteCounts) = matchingCSR.parMap { csr -> + val response = ContextSourceCaller.queryContextSourceEntities( + httpHeaders, + csr, + queryParams + ) + contextSourceRegistrationService.updateContextSourceStatus(csr, response.isRight()) + response.map { (entities, count) -> Triple(entities, csr, count) } + }.separateEither() + .let { (warnings, response) -> + Triple( + warnings.toMutableList(), + response.map { (entities, csr, _) -> entities to csr }, + response.map { (_, _, counts) -> counts } + ) + } + + val maxCount = (remoteCounts + localCount).maxBy { it ?: 0 } ?: 0 + + val mergedEntities = ContextSourceUtils.mergeEntitiesLists( + localEntities, + remoteEntitiesWithCSR + ).toPair().let { (mergeWarnings, mergedEntities) -> + mergeWarnings?.let { warnings.addAll(it) } + mergedEntities ?: emptyList() + } + val ngsiLdDataRepresentation = parseRepresentations(queryParams, mediaType) buildQueryResponse( - compactedEntities.toFinalRepresentation(ngsiLdDataRepresentation), - count, + mergedEntities.toFinalRepresentation(ngsiLdDataRepresentation), + maxCount, "/ngsi-ld/v1/entities", entitiesQuery.paginationQuery, queryParams, mediaType, contexts - ) + ).addWarnings(warnings) }.fold( { it.toErrorResponse() }, { it } @@ -264,7 +306,15 @@ class EntityHandler( ).bind() val csrFilters = - CSRFilters(ids = setOf(entityId), operations = listOf(Operation.FEDERATION_OPS, Operation.RETRIEVE_ENTITY)) + CSRFilters( + ids = setOf(entityId), + operations = listOf( + Operation.RETRIEVE_ENTITY, + Operation.FEDERATION_OPS, + Operation.RETRIEVE_OPS, + Operation.REDIRECTION_OPS + ) + ) val matchingCSR = contextSourceRegistrationService.getContextSourceRegistrations(csrFilters) @@ -280,10 +330,10 @@ class EntityHandler( // we can add parMap(concurrency = X) if this trigger too much http connexion at the same time val (warnings, remoteEntitiesWithCSR) = matchingCSR.parMap { csr -> - val response = ContextSourceCaller.getDistributedInformation( + val response = ContextSourceCaller.retrieveContextSourceEntity( httpHeaders, csr, - "/ngsi-ld/v1/entities/$entityId", + entityId, queryParams ) contextSourceRegistrationService.updateContextSourceStatus(csr, response.isRight()) @@ -293,7 +343,6 @@ class EntityHandler( warnings.toMutableList() to maybeResponses.filterNotNull() } - // we could simplify the code if we check the JsonPayload beforehand val (mergeWarnings, mergedEntity) = ContextSourceUtils.mergeEntities( localEntity.getOrNull(), remoteEntitiesWithCSR diff --git a/search-service/src/test/kotlin/com/egm/stellio/search/csr/service/ContextSourceCallerTests.kt b/search-service/src/test/kotlin/com/egm/stellio/search/csr/service/ContextSourceCallerTests.kt index 31c8a4616..7a4dfd556 100644 --- a/search-service/src/test/kotlin/com/egm/stellio/search/csr/service/ContextSourceCallerTests.kt +++ b/search-service/src/test/kotlin/com/egm/stellio/search/csr/service/ContextSourceCallerTests.kt @@ -9,7 +9,9 @@ import com.egm.stellio.shared.util.APIC_COMPOUND_CONTEXT import com.egm.stellio.shared.util.GEO_JSON_CONTENT_TYPE import com.egm.stellio.shared.util.GEO_JSON_MEDIA_TYPE import com.egm.stellio.shared.util.JsonUtils.serializeObject +import com.egm.stellio.shared.util.RESULTS_COUNT_HEADER import com.egm.stellio.shared.util.assertJsonPayloadsAreEqual +import com.egm.stellio.shared.util.toUri import com.github.tomakehurst.wiremock.client.WireMock.get import com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor import com.github.tomakehurst.wiremock.client.WireMock.notContaining @@ -22,7 +24,9 @@ import com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo import com.github.tomakehurst.wiremock.client.WireMock.verify import com.github.tomakehurst.wiremock.junit5.WireMockTest import kotlinx.coroutines.test.runTest +import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertInstanceOf +import org.junit.jupiter.api.Assertions.assertNotNull import org.junit.jupiter.api.Assertions.assertNull import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test @@ -71,34 +75,75 @@ class ContextSourceCallerTests { """.trimIndent() @Test - fun `getDistributedInformation should return the entity when the request succeed`() = runTest { + fun `queryContextSourceEntities should return the count and the entities when the request succeed`() = runTest { val csr = gimmeRawCSR() - val path = "/ngsi-ld/v1/entities/$apiaryId" + val path = "/ngsi-ld/v1/entities" + val count = 222 + val payload = "[$entityWithSysAttrs, $entityWithSysAttrs]" stubFor( get(urlMatching(path)) .willReturn( ok() - .withHeader(CONTENT_TYPE, APPLICATION_JSON_VALUE).withBody(entityWithSysAttrs) + .withHeader(CONTENT_TYPE, APPLICATION_JSON_VALUE) + .withHeader(RESULTS_COUNT_HEADER, count.toString()).withBody(payload) ) ) - val response = ContextSourceCaller.getDistributedInformation(HttpHeaders.EMPTY, csr, path, emptyParams) - assertJsonPayloadsAreEqual(entityWithSysAttrs, serializeObject(response.getOrNull()!!)) + val response = ContextSourceCaller.queryContextSourceEntities( + HttpHeaders.EMPTY, + csr, + emptyParams + ).getOrNull() + assertNotNull(response) + assertEquals(count, response!!.second) + assertJsonPayloadsAreEqual(entityWithSysAttrs, serializeObject(response.first.first())) } @Test - fun `getDistributedInformation should return a MiscellaneousWarning if it receives no answer`() = runTest { + fun `queryContextSourceEntities should return a RevalidationFailedWarning when receiving bad payload`() = runTest { val csr = gimmeRawCSR() - val path = "/ngsi-ld/v1/entities/$apiaryId" + val path = "/ngsi-ld/v1/entities" + stubFor( + get(urlMatching(path)) + .willReturn( + ok() + .withHeader(CONTENT_TYPE, APPLICATION_JSON_VALUE).withBody(entityWithSysAttrs) + ) + ) - val response = ContextSourceCaller.getDistributedInformation(HttpHeaders.EMPTY, csr, path, emptyParams) + val response = ContextSourceCaller.queryContextSourceEntities( + HttpHeaders.EMPTY, + csr, + emptyParams + ) assertTrue(response.isLeft()) - assertInstanceOf(MiscellaneousWarning::class.java, response.leftOrNull()) + assertInstanceOf(RevalidationFailedWarning::class.java, response.leftOrNull()) } @Test - fun `getDistributedInformation should return a RevalidationFailedWarning when receiving a bad payload`() = runTest { + fun `retrieveContextSourceEntity should return the entity when the request succeeds`() = runTest { + val csr = gimmeRawCSR() + val path = "/ngsi-ld/v1/entities/$apiaryId" + stubFor( + get(urlMatching(path)) + .willReturn( + ok() + .withHeader(CONTENT_TYPE, APPLICATION_JSON_VALUE).withBody(entityWithSysAttrs) + ) + ) + + val response = ContextSourceCaller.retrieveContextSourceEntity( + HttpHeaders.EMPTY, + csr, + apiaryId.toUri(), + emptyParams + ) + assertJsonPayloadsAreEqual(entityWithSysAttrs, serializeObject(response.getOrNull()!!)) + } + + @Test + fun `retrieveContextSourceEntity should return a RevalidationFailedWarning when receiving bad payload`() = runTest { val csr = gimmeRawCSR() val path = "/ngsi-ld/v1/entities/$apiaryId" stubFor( @@ -109,12 +154,27 @@ class ContextSourceCallerTests { ) ) - val response = ContextSourceCaller.getDistributedInformation(HttpHeaders.EMPTY, csr, path, emptyParams) + val response = ContextSourceCaller.retrieveContextSourceEntity( + HttpHeaders.EMPTY, + csr, + apiaryId.toUri(), + emptyParams + ) assertTrue(response.isLeft()) assertInstanceOf(RevalidationFailedWarning::class.java, response.leftOrNull()) } + @Test + fun `getDistributedInformation should return a MiscellaneousWarning if it receives no answer`() = runTest { + val csr = gimmeRawCSR().copy(endpoint = "http://localhost:invalid".toUri()) + val path = "/ngsi-ld/v1/entities/$apiaryId" + val response = ContextSourceCaller.getDistributedInformation(HttpHeaders.EMPTY, csr, path, emptyParams) + + assertTrue(response.isLeft()) + assertInstanceOf(MiscellaneousWarning::class.java, response.leftOrNull()) + } + @Test fun `getDistributedInformation should return MiscellaneousPersistentWarning when receiving error 401`() = runTest { val csr = gimmeRawCSR() @@ -142,7 +202,7 @@ class ContextSourceCallerTests { val response = ContextSourceCaller.getDistributedInformation(HttpHeaders.EMPTY, csr, path, emptyParams) assertTrue(response.isRight()) - assertNull(response.getOrNull()) + assertNull(response.getOrNull()!!.first) } @Test diff --git a/search-service/src/test/kotlin/com/egm/stellio/search/csr/service/ContextSourceRegistrationServiceTests.kt b/search-service/src/test/kotlin/com/egm/stellio/search/csr/service/ContextSourceRegistrationServiceTests.kt index c2df0caff..865e11889 100644 --- a/search-service/src/test/kotlin/com/egm/stellio/search/csr/service/ContextSourceRegistrationServiceTests.kt +++ b/search-service/src/test/kotlin/com/egm/stellio/search/csr/service/ContextSourceRegistrationServiceTests.kt @@ -9,6 +9,8 @@ import com.egm.stellio.search.support.WithTimescaleContainer import com.egm.stellio.shared.model.AlreadyExistsException import com.egm.stellio.shared.model.ResourceNotFoundException import com.egm.stellio.shared.util.APIC_COMPOUND_CONTEXTS +import com.egm.stellio.shared.util.BEEHIVE_TYPE +import com.egm.stellio.shared.util.DEVICE_TYPE import com.egm.stellio.shared.util.JsonUtils.deserializeAsMap import com.egm.stellio.shared.util.loadSampleData import com.egm.stellio.shared.util.shouldFailWith @@ -106,7 +108,7 @@ class ContextSourceRegistrationServiceTests : WithTimescaleContainer { contextSourceRegistrationService.create(contextSourceRegistration, mockUserSub).shouldSucceed() val matchingCsrs = contextSourceRegistrationService.getContextSourceRegistrations( - CSRFilters(ids = setOf("urn:ngsi-ld:Vehicle:A456".toUri())) + CSRFilters(ids = setOf("urn:ngsi-ld:BeeHive:A456".toUri())) ) assertEquals(1, matchingCsrs.size) @@ -283,6 +285,44 @@ class ContextSourceRegistrationServiceTests : WithTimescaleContainer { assertTrue(notMatchingCsr.isEmpty()) } + @Test + fun `query on CSR entity types should filter the result`() = runTest { + val csr = + loadAndDeserializeContextSourceRegistration("csr/contextSourceRegistration_minimal_entities.json") + contextSourceRegistrationService.create(csr, mockUserSub).shouldSucceed() + val oneCsrMatching = contextSourceRegistrationService.getContextSourceRegistrations( + CSRFilters(typeSelection = BEEHIVE_TYPE) + ) + assertEquals(listOf(csr), oneCsrMatching) + + val multipleTypesOneCsrMatching = contextSourceRegistrationService.getContextSourceRegistrations( + CSRFilters(typeSelection = "$BEEHIVE_TYPE|$DEVICE_TYPE") + ) + assertEquals(listOf(csr), multipleTypesOneCsrMatching) + + val notMatchingCsr = contextSourceRegistrationService.getContextSourceRegistrations( + CSRFilters(typeSelection = "INVALID") + ) + assertTrue(notMatchingCsr.isEmpty()) + } + + @Test + fun `query on CSR entity idPattern should filter the result`() = runTest { + val contextSourceRegistration = + loadAndDeserializeContextSourceRegistration("csr/contextSourceRegistration_minimal_entities.json") + contextSourceRegistrationService.create(contextSourceRegistration, mockUserSub).shouldSucceed() + + val oneCsrMatching = contextSourceRegistrationService.getContextSourceRegistrations( + CSRFilters(idPattern = ".*") + ) + assertEquals(listOf(contextSourceRegistration), oneCsrMatching) + + val notMatchingCsr = contextSourceRegistrationService.getContextSourceRegistrations( + CSRFilters(idPattern = "INVALID") + ) + assertTrue(notMatchingCsr.isEmpty()) + } + @Test fun `delete an existing CSR should succeed`() = runTest { val contextSourceRegistration = diff --git a/search-service/src/test/kotlin/com/egm/stellio/search/csr/service/ContextSourceUtilsTests.kt b/search-service/src/test/kotlin/com/egm/stellio/search/csr/service/ContextSourceUtilsTests.kt index f2ff972b1..c78ba6771 100644 --- a/search-service/src/test/kotlin/com/egm/stellio/search/csr/service/ContextSourceUtilsTests.kt +++ b/search-service/src/test/kotlin/com/egm/stellio/search/csr/service/ContextSourceUtilsTests.kt @@ -7,6 +7,7 @@ import com.egm.stellio.search.csr.model.MiscellaneousWarning import com.egm.stellio.search.csr.model.Mode import com.egm.stellio.shared.model.CompactedAttributeInstance import com.egm.stellio.shared.model.CompactedEntity +import com.egm.stellio.shared.util.JsonLdUtils.JSONLD_ID_TERM import com.egm.stellio.shared.util.JsonLdUtils.JSONLD_TYPE_TERM import com.egm.stellio.shared.util.JsonLdUtils.JSONLD_VALUE_TERM import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_CREATED_AT_TERM @@ -230,4 +231,73 @@ class ContextSourceUtilsTests { assertEquals(entityWithNameAndSurname, entity) } } + + @Test + fun `merge entitiesList should merge entities with the same id`() = runTest { + val mergedEntity = ContextSourceUtils.mergeEntitiesLists( + listOf(entityWithName), + listOf(listOf(entityWithLastName) to auxiliaryCSR, listOf(entityWithSurName) to inclusiveCSR) + ).getOrNull() + assertEquals(1, mergedEntity!!.size) + assertEquals(entityWithName + entityWithLastName + entityWithSurName, mergedEntity.first()) + } + + @Test + fun `merge entitiesList should not keep conflicting data from an auxiliary csr`() = runTest { + val mergedEntity = ContextSourceUtils.mergeEntitiesLists( + listOf(moreRecentEntity), + listOf(listOf(evenMoreRecentEntity) to auxiliaryCSR, listOf(entityWithName) to inclusiveCSR) + ).getOrNull() + assertEquals(1, mergedEntity!!.size) + assertEquals(moreRecentEntity, mergedEntity.first()) + } + + @Test + fun `merge entitiesList should add entities with the different ids`() = runTest { + val entityWithDifferentId = minimalEntity.toMutableMap() + (JSONLD_ID_TERM to "differentId") + val entityWithAnotherDifferentId = minimalEntity.toMutableMap() + (JSONLD_ID_TERM to "anotherDifferentId") + val mergedEntity = ContextSourceUtils.mergeEntitiesLists( + listOf(entityWithName), + listOf(listOf(entityWithDifferentId) to inclusiveCSR, listOf(entityWithAnotherDifferentId) to inclusiveCSR) + ).getOrNull() + assertThat(mergedEntity) + .hasSize(3) + .contains(entityWithName, entityWithDifferentId, entityWithAnotherDifferentId) + } + + @Test + fun `merge entitiesList should merge using getMergeNewValues and return the received warnings`() = runTest { + val warning1 = MiscellaneousWarning("1", inclusiveCSR) + val warning2 = MiscellaneousWarning("2", inclusiveCSR) + mockkObject(ContextSourceUtils) { + every { ContextSourceUtils.getMergeNewValues(any(), any(), any()) } returns + warning1.left() andThen warning2.left() + + val (warnings, entity) = ContextSourceUtils.mergeEntitiesLists( + listOf(entityWithName), + listOf(listOf(entityWithName) to inclusiveCSR, listOf(entityWithName) to inclusiveCSR) + ).toPair() + verify(exactly = 2) { ContextSourceUtils.getMergeNewValues(any(), any(), any()) } + assertThat(warnings).hasSize(2).contains(warning1, warning2) + assertEquals(listOf(entityWithName), entity) + } + } + + @Test + fun `merge entitiesList should not merge List in error`() = runTest { + mockkObject(ContextSourceUtils) { + val (warnings, entity) = ContextSourceUtils.mergeEntitiesLists( + listOf(entityWithName), + listOf( + listOf(invalidEntityWithLastName) to inclusiveCSR, + listOf(entityWithSurName) to inclusiveCSR + ) + ).toPair() + + verify(exactly = 2) { ContextSourceUtils.getMergeNewValues(any(), any(), any()) } + assertThat(warnings).hasSize(1) + val entityWithNameAndSurname = entityWithName.plus("surName" to nameAttribute) + assertEquals(listOf(entityWithNameAndSurname), entity) + } + } } diff --git a/search-service/src/test/kotlin/com/egm/stellio/search/entity/web/EntityHandlerTests.kt b/search-service/src/test/kotlin/com/egm/stellio/search/entity/web/EntityHandlerTests.kt index 90180f484..aefc5d192 100644 --- a/search-service/src/test/kotlin/com/egm/stellio/search/entity/web/EntityHandlerTests.kt +++ b/search-service/src/test/kotlin/com/egm/stellio/search/entity/web/EntityHandlerTests.kt @@ -930,16 +930,6 @@ class EntityHandlerTests { ) } - fun initializeQueryEntitiesMocks() { - val compactedEntities = slot>() - - coEvery { - linkedEntityService.processLinkedEntities(capture(compactedEntities), any(), any()) - } answers { - compactedEntities.captured.right() - } - } - @Test fun `get entity by id should return the warnings sent by the CSRs and update the CSRs statuses`() { val csr = gimmeRawCSR() @@ -954,7 +944,7 @@ class EntityHandlerTests { mockkObject(ContextSourceCaller) { coEvery { - ContextSourceCaller.getDistributedInformation(any(), any(), any(), any()) + ContextSourceCaller.retrieveContextSourceEntity(any(), any(), any(), any()) } returns MiscellaneousWarning( "message with\nline\nbreaks", csr @@ -977,6 +967,16 @@ class EntityHandlerTests { } } + fun initializeQueryEntitiesMocks() { + val compactedEntities = slot>() + + coEvery { + linkedEntityService.processLinkedEntities(capture(compactedEntities), any(), any()) + } answers { + compactedEntities.captured.right() + } + } + @Test fun `get entities by type should not include temporal properties if query param sysAttrs is not present`() { initializeQueryEntitiesMocks() @@ -1257,6 +1257,45 @@ class EntityHandlerTests { ) } + @Test + fun `get entities should return the warnings sent by the CSRs and update the CSRs statuses`() { + val csr = gimmeRawCSR() + initializeQueryEntitiesMocks() + + coEvery { + entityQueryService.queryEntities(any(), sub.getOrNull()) + } returns (emptyList() to 0).right() + + coEvery { + contextSourceRegistrationService + .getContextSourceRegistrations(any(), any(), any()) + } returns listOf(csr, csr) + + mockkObject(ContextSourceCaller) { + coEvery { + ContextSourceCaller.queryContextSourceEntities(any(), any(), any()) + } returns MiscellaneousWarning( + "message with\nline\nbreaks", + csr + ).left() andThen + MiscellaneousWarning("message", csr).left() + + coEvery { contextSourceRegistrationService.updateContextSourceStatus(any(), any()) } returns Unit + webClient.get() + .uri("/ngsi-ld/v1/entities?type=$BEEHIVE_COMPACT_TYPE&count=true") + .header(HttpHeaders.LINK, AQUAC_HEADER_LINK) + .exchange() + .expectStatus().isOk + .expectHeader().valueEquals( + NGSILDWarning.HEADER_NAME, + "199 urn:ngsi-ld:ContextSourceRegistration:test \"message with line breaks\"", + "199 urn:ngsi-ld:ContextSourceRegistration:test \"message\"" + ).expectHeader().valueEquals(RESULTS_COUNT_HEADER, "0",) + + coVerify(exactly = 2) { contextSourceRegistrationService.updateContextSourceStatus(any(), false) } + } + } + @Test fun `replace entity should return a 201 if JSON-LD payload is correct`() { val jsonLdFile = ClassPathResource("/ngsild/aquac/breedingService.jsonld") diff --git a/search-service/src/test/resources/ngsild/csr/contextSourceRegistration_minimal_entities.json b/search-service/src/test/resources/ngsild/csr/contextSourceRegistration_minimal_entities.json index 5fda3064b..fcbd01aec 100644 --- a/search-service/src/test/resources/ngsild/csr/contextSourceRegistration_minimal_entities.json +++ b/search-service/src/test/resources/ngsild/csr/contextSourceRegistration_minimal_entities.json @@ -5,8 +5,8 @@ { "entities": [ { - "id": "urn:ngsi-ld:Vehicle:A456", - "type": "Vehicle" + "id": "urn:ngsi-ld:BeeHive:A456", + "type": "BeeHive" } ] } diff --git a/shared/src/main/kotlin/com/egm/stellio/shared/util/QueryUtils.kt b/shared/src/main/kotlin/com/egm/stellio/shared/util/QueryUtils.kt index e615948dc..bedf3539d 100644 --- a/shared/src/main/kotlin/com/egm/stellio/shared/util/QueryUtils.kt +++ b/shared/src/main/kotlin/com/egm/stellio/shared/util/QueryUtils.kt @@ -93,7 +93,7 @@ fun String.unescapeRegexPattern(): String = this.replace("##", "(") .replace("ยงยง", ")") -fun buildTypeQuery(rawQuery: String, target: List? = null): String = +fun buildTypeQuery(rawQuery: String, columnName: String = "types", target: List? = null): String = rawQuery.replace(typeSelectionRegex) { matchResult -> """ #{TARGET}# && ARRAY['${matchResult.value}'] @@ -103,10 +103,10 @@ fun buildTypeQuery(rawQuery: String, target: List? = null): String .replace("|", " OR ") .replace(",", " OR ") .let { - if (target == null) - it.replace("#{TARGET}#", "types") - else + if (target != null) it.replace("#{TARGET}#", "ARRAY[${target.joinToString(",") { "'$it'"} }]") + else + it.replace("#{TARGET}#", columnName) } // Transforms an NGSI-LD Query Language parameter as per clause 4.9 to a query supported by JsonPath. diff --git a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt index bb27259b4..0a5dfbc41 100644 --- a/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt +++ b/subscription-service/src/main/kotlin/com/egm/stellio/subscription/service/SubscriptionService.kt @@ -646,7 +646,7 @@ class SubscriptionService( else { val entityTypes = expandedEntity.types entities.joinToString(" OR ") { - val typeSelectionQuery = buildTypeQuery(it.typeSelection, entityTypes) + val typeSelectionQuery = buildTypeQuery(it.typeSelection, target = entityTypes) val idQuery = it.id?.let { " '${expandedEntity.id}' = '$it' " } val idPatternQuery = it.idPattern?.let { " '${expandedEntity.id}' ~ '$it' " } listOfNotNull(typeSelectionQuery, idQuery, idPatternQuery)