Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/distributed create entity #1321

Open
wants to merge 21 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
e7f41d3
wip: removeattribute after csr call
thomasBousselin Jan 16, 2025
c695c58
wip: getFilteredAndRemoved on ExpandedEntity
thomasBousselin Jan 16, 2025
73fe8af
feat: functionning call for distributeCreateEntity
thomasBousselin Jan 17, 2025
f1225d5
feat: createEntity with error gestion
thomasBousselin Jan 20, 2025
8f3699d
fix: minor formating for pr
thomasBousselin Jan 20, 2025
c491bc8
fix: build
thomasBousselin Jan 20, 2025
487d0a9
fix: existing tests
thomasBousselin Jan 20, 2025
4ac2e4e
refactor: BatchEntityError.error as ProblemDetail
thomasBousselin Jan 21, 2025
aa19ccf
feat: first easy fixes
thomasBousselin Jan 21, 2025
a7699c4
feat: case when the entity is entirely merged + test for create entit…
thomasBousselin Jan 22, 2025
3d96a98
fix: verify entity before distribution + accept non Problem Detail error
thomasBousselin Jan 22, 2025
3d1a307
fix: case receive 207
thomasBousselin Jan 22, 2025
a1d7d14
refactor: rename InternalCsrFilter to RegistrationInfoFilter
thomasBousselin Jan 23, 2025
55b1ced
refactor: put entity filter from the csr info in CSR class
thomasBousselin Jan 23, 2025
90eabea
fix: test on getAssociatedAttributes
thomasBousselin Jan 23, 2025
65ea75e
feat: test for postDistributedInformation
thomasBousselin Jan 23, 2025
f317520
feat: test for distributeCreateEntityForContextSources
thomasBousselin Jan 23, 2025
8265759
feat: test for distributeCreateEntity
thomasBousselin Jan 24, 2025
9533ddf
fix: only call with content-type json-ld (as @context always is on th…
thomasBousselin Jan 27, 2025
32bf782
wip: add success to multistatus result
thomasBousselin Feb 3, 2025
e4333ac
fix: tests
thomasBousselin Feb 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.egm.stellio.search.csr.model
import com.egm.stellio.shared.model.EntityTypeSelection
import java.net.URI

data class CSRFilters( // we should use a combination of EntitiesQuery TemporalQuery (when we implement all operations)
open class CSRFilters( // we should use a combination of EntitiesQuery TemporalQuery (when we implement all operations)
val ids: Set<URI> = emptySet(),
val typeSelection: EntityTypeSelection? = null,
val idPattern: String? = null,
Expand All @@ -13,12 +13,24 @@ data class CSRFilters( // we should use a combination of EntitiesQuery TemporalQ
ids: Set<URI> = emptySet(),
typeSelection: EntityTypeSelection? = null,
idPattern: String? = null,
operations: List<Operation>
operations: List<Operation>?
) :
this(
ids = ids,
typeSelection = typeSelection,
idPattern = idPattern,
csf = operations.joinToString("|") { "${ContextSourceRegistration::operations.name}==${it.key}" }
csf = operations?.joinToString("|") { "${ContextSourceRegistration::operations.name}==${it.key}" }
)

constructor(
ids: Set<URI> = emptySet(),
types: Set<String>,
idPattern: String? = null,
operations: List<Operation>? = null
) : this(
ids = ids,
typeSelection = types.joinToString("|"),
idPattern = idPattern,
operations = operations
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@ import arrow.core.raise.either
import arrow.core.right
import com.egm.stellio.shared.model.APIException
import com.egm.stellio.shared.model.BadRequestDataException
import com.egm.stellio.shared.model.ExpandedEntity
import com.egm.stellio.shared.model.ExpandedTerm
import com.egm.stellio.shared.model.toAPIException
import com.egm.stellio.shared.util.DataTypes
import com.egm.stellio.shared.util.JSON_LD_MEDIA_TYPE
import com.egm.stellio.shared.util.JsonLdUtils.JSONLD_CONTEXT
import com.egm.stellio.shared.util.JsonLdUtils.JSONLD_TYPE
import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_CSR_TERM
import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_RELATIONSHIP_TYPE
import com.egm.stellio.shared.util.JsonLdUtils.compactTerm
import com.egm.stellio.shared.util.JsonLdUtils.expandJsonLdTerm
import com.egm.stellio.shared.util.JsonUtils.deserializeAs
Expand All @@ -24,6 +28,7 @@ import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.module.kotlin.convertValue
import org.springframework.http.MediaType
import org.springframework.messaging.simp.SimpAttributesContextHolder.getAttributes
import java.net.URI
import java.time.ZonedDateTime
import java.util.UUID
Expand Down Expand Up @@ -154,6 +159,46 @@ data class ContextSourceRegistration(
if (!id.isAbsolute)
BadRequestDataException(invalidUriMessage("$id")).left()
else Unit.right()

fun getAssociatedAttributes(
registrationInfoFilter: RegistrationInfoFilter,
entity: ExpandedEntity,
): Set<ExpandedTerm> {
val matchingInformation = getMatchingInformation(registrationInfoFilter)

val properties =
if (matchingInformation.any { it.propertyNames == null }) null
else matchingInformation.flatMap { it.propertyNames!! }.toSet()

val relationships =
if (matchingInformation.any { it.relationshipNames == null }) null
else matchingInformation.flatMap { it.relationshipNames!! }.toSet()

return entity.getAttributes().filter { (term, attribute) ->
val attributeType = attribute.first()[JSONLD_TYPE]?.first()
if (NGSILD_RELATIONSHIP_TYPE.uri == attributeType) {
relationships == null || term in relationships
} else {
properties == null || term in properties
}
}.keys
}

private fun getMatchingInformation(registrationInfoFilter: RegistrationInfoFilter): List<RegistrationInfo> =
information.filter { info ->
info.entities?.any { entityInfo ->
entityInfo.id?.let { registrationInfoFilter.ids.contains(it) } ?: true &&
entityInfo.types.let { types ->
types.any {
registrationInfoFilter.types?.contains(it) ?: true
}
} &&
entityInfo.idPattern?.let { pattern ->
registrationInfoFilter.ids.any { pattern.toRegex().matches(it.toString()) }
} ?: true
} ?: true
}

companion object {

fun deserialize(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.egm.stellio.search.csr.model

import java.net.URI

class RegistrationInfoFilter(
ids: Set<URI> = emptySet(),
val types: Set<String>? = null,
idPattern: String? = null,
operations: List<Operation>? = null
) : CSRFilters(
ids = ids,
typeSelection = types?.joinToString("|"),
idPattern = idPattern,
operations = operations
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
package com.egm.stellio.search.csr.service

import arrow.core.Either
import arrow.core.left
import arrow.core.raise.either
import arrow.core.right
import com.egm.stellio.search.csr.model.CSRFilters
import com.egm.stellio.search.csr.model.ContextSourceRegistration
import com.egm.stellio.search.csr.model.Mode
import com.egm.stellio.search.csr.model.Operation
import com.egm.stellio.search.csr.model.RegistrationInfoFilter
import com.egm.stellio.search.entity.web.BatchEntityError
import com.egm.stellio.search.entity.web.BatchEntitySuccess
import com.egm.stellio.search.entity.web.BatchOperationResult
import com.egm.stellio.shared.model.APIException
import com.egm.stellio.shared.model.BadGatewayException
import com.egm.stellio.shared.model.CompactedEntity
import com.egm.stellio.shared.model.ConflictException
import com.egm.stellio.shared.model.ContextSourceException
import com.egm.stellio.shared.model.ExpandedEntity
import com.egm.stellio.shared.model.ExpandedTerm
import com.egm.stellio.shared.model.GatewayTimeoutException
import com.egm.stellio.shared.util.JSON_LD_CONTENT_TYPE
import com.egm.stellio.shared.util.JsonLdUtils.compactEntity
import com.egm.stellio.shared.util.toUri
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.http.HttpHeaders
import org.springframework.http.HttpMethod
import org.springframework.http.HttpStatus
import org.springframework.stereotype.Service
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.reactive.function.client.awaitBodyOrNull
import org.springframework.web.reactive.function.client.awaitExchange
import java.net.URI

// ContextSourceRegistration is null in case of local error
typealias DistributionStatus = Either<Pair<APIException, ContextSourceRegistration?>, Unit>
thomasBousselin marked this conversation as resolved.
Show resolved Hide resolved

@Service
class DistributedEntityProvisionService(
private val contextSourceRegistrationService: ContextSourceRegistrationService,
) {
val createPath = "/ngsi-ld/v1/entities"

val logger: Logger = LoggerFactory.getLogger(javaClass)

suspend fun distributeCreateEntity(
entity: ExpandedEntity,
contexts: List<String>,
): Pair<BatchOperationResult, ExpandedEntity?> {
val csrFilters =
CSRFilters(
ids = setOf(entity.id.toUri()),
types = entity.types.toSet()
)
val result = BatchOperationResult()
val registrationInfoFilter =
RegistrationInfoFilter(ids = setOf(entity.id.toUri()), types = entity.types.toSet())

val matchingCSR = contextSourceRegistrationService.getContextSourceRegistrations(
filters = csrFilters,
).groupBy { it.mode }

val entityAfterExclusive = distributeCreateEntityForContextSources(
matchingCSR[Mode.EXCLUSIVE], // could be only one
registrationInfoFilter,
entity,
contexts,
result
)
if (entityAfterExclusive == null) return result to null

val entityAfterRedirect = distributeCreateEntityForContextSources(
matchingCSR[Mode.REDIRECT],
registrationInfoFilter,
entityAfterExclusive,
contexts,
result
)
if (entityAfterRedirect == null) return result to null

distributeCreateEntityForContextSources(
matchingCSR[Mode.INCLUSIVE],
registrationInfoFilter,
entityAfterRedirect,
contexts,
result
)
return result to entityAfterRedirect
}

internal suspend fun distributeCreateEntityForContextSources(
csrs: List<ContextSourceRegistration>?,
registrationInfoFilter: RegistrationInfoFilter,
entity: ExpandedEntity,
contexts: List<String>,
resultToUpdate: BatchOperationResult
): ExpandedEntity? {
val allProcessedAttrs = mutableSetOf<ExpandedTerm>()
csrs?.forEach { csr ->
csr.getAssociatedAttributes(registrationInfoFilter, entity)
.let { attrs ->
allProcessedAttrs.addAll(attrs)
if (attrs.isEmpty()) Unit
else if (csr.operations.any { it == Operation.CREATE_ENTITY || it == Operation.UPDATE_OPS }) {
resultToUpdate.errors.add(
BatchEntityError(
entityId = entity.id.toUri(),
registrationId = csr.id,
error = ConflictException(
"csr: ${csr.id} does not support creation of entities"
).toProblemDetail()
)
)
} else {
postDistributedInformation(
compactEntity(entity.filterAttributes(attrs, emptySet()), contexts),
csr,
createPath
).fold(
{
resultToUpdate.errors.add(
BatchEntityError(
entityId = entity.id.toUri(),
registrationId = csr.id,
error = it.toProblemDetail()
)
)
(it to csr).left()
},
{ resultToUpdate.success.add(BatchEntitySuccess(csr.id)) }
)
}
}
}
return if (allProcessedAttrs.isNotEmpty()) {
val remainingEntity = entity.omitAttributes(allProcessedAttrs)
if (remainingEntity.asNonCoreAttributes()) remainingEntity else null
} else entity
}

internal suspend fun postDistributedInformation(
entity: CompactedEntity,
csr: ContextSourceRegistration,
path: String,
): Either<APIException, Unit> = either {
val uri = URI("${csr.endpoint}$path")

val request = WebClient.create()
.method(HttpMethod.POST)
.uri { uriBuilder ->
uriBuilder.scheme(uri.scheme)
.host(uri.host)
.port(uri.port)
.path(uri.path)
.build()
}.headers { newHeaders ->
newHeaders[HttpHeaders.CONTENT_TYPE] = JSON_LD_CONTENT_TYPE
}.bodyValue(entity)

return runCatching {
val (statusCode, response, _) = request.awaitExchange { response ->
Triple(response.statusCode(), response.awaitBodyOrNull<String>(), response.headers())
}
if (statusCode.value() == HttpStatus.MULTI_STATUS.value()) {
// this part need clarification from the specification
ContextSourceException(
type = URI("https://uri.etsi.org/ngsi-ld/errors/MultiStatus"),
status = HttpStatus.MULTI_STATUS,
title = "Context source returned 207",
detail = response ?: "no message"
).left()
} else if (statusCode.is2xxSuccessful) {
logger.info("Successfully post data to CSR ${csr.id} at $uri")
Unit.right()
} else if (response == null) {
val message = "No error message received from CSR ${csr.id} at $uri"
logger.warn(message)
BadGatewayException(message).left()
} else {
logger.warn("Error creating an entity for CSR at $uri: $response")
ContextSourceException.fromResponse(response).left()
}
}.fold(
onSuccess = { it },
onFailure = { e ->
logger.warn("Error contacting CSR at $uri: ${e.message}")
logger.warn(e.stackTraceToString())
GatewayTimeoutException(
"Error connecting to CSR at $uri: \"${e.cause}:${e.message}\""
).left()
}
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import com.egm.stellio.search.entity.web.JsonLdNgsiLdEntity
import com.egm.stellio.search.entity.web.entityId
import com.egm.stellio.shared.model.APIException
import com.egm.stellio.shared.model.BadRequestDataException
import com.egm.stellio.shared.model.toAPIException
import com.egm.stellio.shared.util.Sub
import org.springframework.stereotype.Component
import org.springframework.transaction.annotation.Transactional
Expand Down Expand Up @@ -90,7 +91,7 @@ class EntityOperationService(
entityService.createEntity(jsonLdNgsiLdEntity.second, jsonLdNgsiLdEntity.first, sub).map {
BatchEntitySuccess(jsonLdNgsiLdEntity.entityId())
}.mapLeft { apiException ->
BatchEntityError(jsonLdNgsiLdEntity.entityId(), arrayListOf(apiException.message))
BatchEntityError(jsonLdNgsiLdEntity.entityId(), apiException.toProblemDetail())
}.bind()
}
}.fold(
Expand All @@ -114,7 +115,7 @@ class EntityOperationService(
BatchEntitySuccess(id)
}
.mapLeft { apiException ->
BatchEntityError(id, arrayListOf(apiException.message))
BatchEntityError(id, apiException.toProblemDetail())
}.bind()
}
}.fold(
Expand Down Expand Up @@ -242,10 +243,10 @@ class EntityOperationService(
}.map {
BatchEntitySuccess(entity.entityId(), it)
}.mapLeft {
BatchEntityError(entity.entityId(), arrayListOf(it.message))
BatchEntityError(entity.entityId(), it.toProblemDetail())
}
}.fold(
onFailure = { BatchEntityError(entity.entityId(), arrayListOf(it.message!!)).left() },
onFailure = { BatchEntityError(entity.entityId(), it.toAPIException().toProblemDetail()).left() },
onSuccess = { it }
)

Expand Down
Loading
Loading