Skip to content

Commit

Permalink
feat: support for delete and temporal delete
Browse files Browse the repository at this point in the history
  • Loading branch information
bobeal committed Dec 8, 2024
1 parent d88aaed commit 383cca7
Show file tree
Hide file tree
Showing 12 changed files with 260 additions and 165 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class AttributeService(
JOIN temporal_entity_attribute
ON entity_payload.entity_id = temporal_entity_attribute.entity_id
AND temporal_entity_attribute.deleted_at IS NULL
WHERE entity_payload.deleted_at IS NULL
ORDER BY attribute_name
""".trimIndent()
).allToMappedList { rowToAttributeDetails(it) }.flatten().groupBy({ it.second }, { it.first }).toList()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class EntityTypeService(
JOIN temporal_entity_attribute
ON entity_payload.entity_id = temporal_entity_attribute.entity_id
AND temporal_entity_attribute.deleted_at IS NULL
WHERE temporal_entity_attribute.deleted_at IS NULL
WHERE entity_payload.deleted_at IS NULL
ORDER BY type
""".trimIndent()
).allToMappedList { rowToEntityType(it) }.groupBy({ it.first }, { it.second }).toList()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class EntityAttributeService(
DO UPDATE SET deleted_at = null,
attribute_type = :attribute_type,
attribute_value_type = :attribute_value_type,
created_at = :created_at,
modified_at = :created_at,
payload = :payload
""".trimIndent()
)
Expand All @@ -122,31 +122,6 @@ class EntityAttributeService(
.bind("payload", attribute.payload)
.execute()

@Transactional
suspend fun updateOnReplace(
attributeUUID: UUID,
attributeMetadata: AttributeMetadata,
modifiedAt: ZonedDateTime,
payload: String
): Either<APIException, Unit> =
databaseClient.sql(
"""
UPDATE temporal_entity_attribute
SET
attribute_type = :attribute_type,
attribute_value_type = :attribute_value_type,
modified_at = :modified_at,
payload = :payload
WHERE id = :id
""".trimIndent()
)
.bind("id", attributeUUID)
.bind("attribute_type", attributeMetadata.type.toString())
.bind("attribute_value_type", attributeMetadata.valueType.toString())
.bind("modified_at", modifiedAt)
.bind("payload", Json.of(payload))
.execute()

@Transactional
suspend fun updateOnUpdate(
attributeUUID: UUID,
Expand Down Expand Up @@ -279,32 +254,15 @@ class EntityAttributeService(
attributeMetadata.datasetId,
attribute.entityId
)
updateOnReplace(
attribute.id,
deleteAttribute(attribute.entityId, attribute.attributeName, attribute.datasetId, false, createdAt).bind()
addAttribute(
attribute.entityId,
attribute.attributeName,
attributeMetadata,
createdAt,
serializeObject(attributePayload)
attributePayload,
sub
).bind()

val attributeInstance = AttributeInstance(
attributeUuid = attribute.id,
timeProperty = AttributeInstance.TemporalProperty.MODIFIED_AT,
time = createdAt,
attributeMetadata = attributeMetadata,
payload = attributePayload,
sub = sub
)
attributeInstanceService.create(attributeInstance).bind()

if (attributeMetadata.observedAt != null) {
val attributeObservedAtInstance = AttributeInstance(
attributeUuid = attribute.id,
time = attributeMetadata.observedAt,
attributeMetadata = attributeMetadata,
payload = attributePayload
)
attributeInstanceService.create(attributeObservedAtInstance).bind()
}
}

@Transactional
Expand Down Expand Up @@ -437,6 +395,25 @@ class EntityAttributeService(
attributeInstanceService.deleteInstancesOfAttribute(entityId, attributeName, datasetId).bind()
}

@Transactional
suspend fun permanentlyDeleteAttributes(
entityId: URI,
): Either<APIException, Unit> = either {
logger.debug("Permanently deleting all attributes from entity {}", entityId)

val deletedTeas = databaseClient.sql(
"""
DELETE FROM temporal_entity_attribute
WHERE entity_id = :entity_id
RETURNING id
""".trimIndent()
)
.bind("entity_id", entityId)
.allToMappedList { toUuid(it["id"]) }

attributeInstanceService.deleteInstancesOfEntity(deletedTeas).bind()
}

suspend fun getForEntities(
entitiesIds: List<URI>,
entitiesQuery: EntitiesQuery
Expand Down Expand Up @@ -708,12 +685,10 @@ class EntityAttributeService(
ngsiLdAttributeInstance.datasetId
)!!
if (currentAttribute != null) {
replaceAttribute(
currentAttribute,
ngsiLdAttribute,
attributeMetadata,
partialUpdateAttribute(
entityUri,
Pair(ngsiLdAttribute.name, listOf(attributePayload)),
createdAt,
attributePayload,
sub
).map {
UpdateAttributeResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import com.egm.stellio.search.authorization.service.AuthorizationService
import com.egm.stellio.search.common.util.allToMappedList
import com.egm.stellio.search.common.util.deserializeAsMap
import com.egm.stellio.search.common.util.oneToResult
import com.egm.stellio.search.common.util.toOptionalZonedDateTime
import com.egm.stellio.search.common.util.toUri
import com.egm.stellio.search.common.util.wrapToAndClause
import com.egm.stellio.search.entity.model.EntitiesQuery
Expand All @@ -18,18 +19,17 @@ import com.egm.stellio.search.entity.model.EntitiesQueryFromPost
import com.egm.stellio.search.entity.model.Entity
import com.egm.stellio.search.entity.util.rowToEntity
import com.egm.stellio.shared.model.APIException
import com.egm.stellio.shared.model.AlreadyExistsException
import com.egm.stellio.shared.model.ExpandedEntity
import com.egm.stellio.shared.model.ResourceNotFoundException
import com.egm.stellio.shared.util.Sub
import com.egm.stellio.shared.util.buildQQuery
import com.egm.stellio.shared.util.buildScopeQQuery
import com.egm.stellio.shared.util.buildTypeQuery
import com.egm.stellio.shared.util.entityAlreadyExistsMessage
import com.egm.stellio.shared.util.entityNotFoundMessage
import org.springframework.r2dbc.core.DatabaseClient
import org.springframework.stereotype.Service
import java.net.URI
import java.time.ZonedDateTime

@Service
class EntityQueryService(
Expand Down Expand Up @@ -83,6 +83,7 @@ class EntityQueryService(
LEFT JOIN temporal_entity_attribute tea
ON tea.entity_id = entity_payload.entity_id AND tea.deleted_at is null
WHERE $filterQuery
AND entity_payload.deleted_at is null
ORDER BY entity_id
LIMIT :limit
OFFSET :offset
Expand All @@ -108,6 +109,7 @@ class EntityQueryService(
LEFT JOIN temporal_entity_attribute tea
ON tea.entity_id = entity_payload.entity_id AND tea.deleted_at is null
WHERE $filterQuery
AND entity_payload.deleted_at is null
""".trimIndent()

return databaseClient
Expand Down Expand Up @@ -232,17 +234,15 @@ class EntityQueryService(
.bind("entities_ids", entitiesIds)
.allToMappedList { it.rowToEntity() }

suspend fun checkEntityExistence(
entityId: URI,
inverse: Boolean = false
): Either<APIException, Unit> {
suspend fun checkEntityExistence(entityId: URI): Either<APIException, Unit> {
val selectQuery =
"""
select
exists(
select 1
from entity_payload
where entity_id = :entity_id
and deleted_at is null
) as entityExists;
""".trimIndent()

Expand All @@ -251,15 +251,34 @@ class EntityQueryService(
.bind("entity_id", entityId)
.oneToResult { it["entityExists"] as Boolean }
.flatMap {
if (it && !inverse || !it && inverse)
if (it)
Unit.right()
else if (it)
AlreadyExistsException(entityAlreadyExistsMessage(entityId.toString())).left()
else
ResourceNotFoundException(entityNotFoundMessage(entityId.toString())).left()
}
}

/**
* Used for checks before creating a (temporal) entity. Allows to know if the entity does not exist,
* or, if it exists, whether it is currently deleted (in which case, it may be possible to create it again
* if authorized)
*/
suspend fun getEntityState(
entityId: URI
): Either<APIException, Pair<URI, ZonedDateTime?>> {
val selectQuery =
"""
select entity_id, deleted_at
from entity_payload
where entity_id = :entity_id
""".trimIndent()

return databaseClient
.sql(selectQuery)
.bind("entity_id", entityId)
.oneToResult { Pair(toUri(it["entity_id"]), toOptionalZonedDateTime(it["deleted_at"])) }
}

suspend fun filterExistingEntitiesAsIds(entitiesIds: List<URI>): List<URI> {
if (entitiesIds.isEmpty()) {
return emptyList()
Expand All @@ -270,6 +289,7 @@ class EntityQueryService(
select entity_id
from entity_payload
where entity_id in (:entities_ids)
and deleted_at is null
""".trimIndent()

return databaseClient
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package com.egm.stellio.search.entity.service

import arrow.core.Either
import arrow.core.Either.Left
import arrow.core.Either.Right
import arrow.core.left
import arrow.core.raise.either
import arrow.core.right
import arrow.core.toOption
Expand All @@ -24,7 +27,9 @@ import com.egm.stellio.search.entity.model.updateResultFromDetailedResult
import com.egm.stellio.search.entity.util.prepareAttributes
import com.egm.stellio.search.entity.util.rowToEntity
import com.egm.stellio.search.scope.ScopeService
import com.egm.stellio.search.temporal.model.AttributeInstance.TemporalProperty
import com.egm.stellio.shared.model.APIException
import com.egm.stellio.shared.model.AlreadyExistsException
import com.egm.stellio.shared.model.ExpandedAttribute
import com.egm.stellio.shared.model.ExpandedAttributeInstances
import com.egm.stellio.shared.model.ExpandedAttributes
Expand All @@ -39,6 +44,7 @@ import com.egm.stellio.shared.util.JsonLdUtils.JSONLD_TYPE
import com.egm.stellio.shared.util.JsonLdUtils.NGSILD_SCOPE_PROPERTY
import com.egm.stellio.shared.util.JsonUtils.serializeObject
import com.egm.stellio.shared.util.Sub
import com.egm.stellio.shared.util.entityAlreadyExistsMessage
import com.egm.stellio.shared.util.getSpecificAccessPolicy
import com.egm.stellio.shared.util.ngsiLdDateTime
import io.r2dbc.postgresql.codec.Json
Expand Down Expand Up @@ -68,8 +74,16 @@ class EntityService(
expandedEntity: ExpandedEntity,
sub: Sub? = null
): Either<APIException, Unit> = either {
authorizationService.userCanCreateEntities(sub.toOption()).bind()
entityQueryService.checkEntityExistence(ngsiLdEntity.id, true).bind()
entityQueryService.getEntityState(ngsiLdEntity.id).let {
when (it) {
is Left -> authorizationService.userCanCreateEntities(sub.toOption()).bind()
is Right ->
if (it.value.second == null)
AlreadyExistsException(entityAlreadyExistsMessage(ngsiLdEntity.id.toString())).left().bind()
else
authorizationService.userCanAdminEntity(ngsiLdEntity.id, sub.toOption()).bind()
}
}

val createdAt = ngsiLdDateTime()
val attributesMetadata = ngsiLdEntity.prepareAttributes().bind()
Expand Down Expand Up @@ -104,6 +118,13 @@ class EntityService(
"""
INSERT INTO entity_payload (entity_id, types, scopes, created_at, payload, specific_access_policy)
VALUES (:entity_id, :types, :scopes, :created_at, :payload, :specific_access_policy)
ON CONFLICT (entity_id)
DO UPDATE SET types = :types,
scopes = :scopes,
modified_at = :created_at,
deleted_at = null,
payload = :payload,
specific_access_policy = :specific_access_policy
""".trimIndent()
)
.bind("entity_id", ngsiLdEntity.id)
Expand Down Expand Up @@ -545,35 +566,63 @@ class EntityService(
}

@Transactional
suspend fun upsertEntityPayload(entityId: URI, payload: String): Either<APIException, Unit> =
databaseClient.sql(
suspend fun deleteEntity(entityId: URI, sub: Sub? = null): Either<APIException, Unit> = either {
entityQueryService.checkEntityExistence(entityId).bind()
authorizationService.userCanAdminEntity(entityId, sub.toOption()).bind()

val deletedAt = ngsiLdDateTime()
val entity = deleteEntityPayload(entityId, deletedAt).bind()
entityAttributeService.deleteAttributes(entityId, deletedAt).bind()
scopeService.addHistoryEntry(entityId, emptyList(), TemporalProperty.DELETED_AT, deletedAt, sub).bind()

entityEventService.publishEntityDeleteEvent(sub, entity)
}

@Transactional
suspend fun deleteEntityPayload(entityId: URI, deletedAt: ZonedDateTime): Either<APIException, Entity> = either {
val entity = databaseClient.sql(
"""
INSERT INTO entity_payload (entity_id, payload)
VALUES (:entity_id, :payload)
ON CONFLICT (entity_id)
DO UPDATE SET payload = :payload
WITH entity_before_delete AS (
SELECT *
FROM entity_payload
WHERE entity_id = :entity_id
),
update_entity AS (
UPDATE entity_payload
SET deleted_at = :deleted_at,
payload = null,
scopes = null,
specific_access_policy = null,
types = '{}'
WHERE entity_id = :entity_id
)
SELECT * FROM entity_before_delete
""".trimIndent()
)
.bind("payload", Json.of(payload))
.bind("entity_id", entityId)
.execute()
.bind("deleted_at", deletedAt)
.oneToResult {
it.rowToEntity()
}
.bind()
entity
}

@Transactional
suspend fun deleteEntity(entityId: URI, sub: Sub? = null): Either<APIException, Unit> = either {
suspend fun permanentlyDeleteEntity(entityId: URI, sub: Sub? = null): Either<APIException, Unit> = either {
entityQueryService.checkEntityExistence(entityId).bind()
authorizationService.userCanAdminEntity(entityId, sub.toOption()).bind()

val entity = deleteEntityPayload(entityId).bind()

entityAttributeService.deleteAttributes(entityId, ngsiLdDateTime()).bind()
val entity = permanentyDeleteEntityPayload(entityId).bind()
entityAttributeService.permanentlyDeleteAttributes(entityId).bind()
scopeService.deleteHistory(entityId).bind()
authorizationService.removeRightsOnEntity(entityId).bind()

entityEventService.publishEntityDeleteEvent(sub, entity)
}

@Transactional
suspend fun deleteEntityPayload(entityId: URI): Either<APIException, Entity> = either {
suspend fun permanentyDeleteEntityPayload(entityId: URI): Either<APIException, Entity> = either {
val entity = databaseClient.sql(
"""
DELETE FROM entity_payload
Expand Down
Loading

0 comments on commit 383cca7

Please sign in to comment.