Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): error sending Kafka events for batch operations when many times the same entity id #1211

Merged
merged 1 commit into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion search-service/config/detekt/baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
<ID>LongMethod:AttributeInstanceService.kt$AttributeInstanceService$@Transactional suspend fun create(attributeInstance: AttributeInstance): Either&lt;APIException, Unit&gt;</ID>
<ID>LongMethod:EnabledAuthorizationServiceTests.kt$EnabledAuthorizationServiceTests$@Test fun `it should return serialized access control entities with other rigths if user is owner`()</ID>
<ID>LongMethod:EntityAccessControlHandler.kt$EntityAccessControlHandler$@PostMapping("/{subjectId}/attrs", consumes = [MediaType.APPLICATION_JSON_VALUE, JSON_LD_CONTENT_TYPE]) suspend fun addRightsOnEntities( @RequestHeader httpHeaders: HttpHeaders, @PathVariable subjectId: String, @RequestBody requestBody: Mono&lt;String&gt; ): ResponseEntity&lt;*&gt;</ID>
<ID>LongMethod:EntityOperationHandlerTests.kt$EntityOperationHandlerTests$@Test fun `create batch entity should return a 207 when some entities already exist`()</ID>
<ID>LongMethod:PatchAttributeTests.kt$PatchAttributeTests.Companion$@JvmStatic fun mergePatchProvider(): Stream&lt;Arguments&gt;</ID>
<ID>LongMethod:PatchAttributeTests.kt$PatchAttributeTests.Companion$@JvmStatic fun partialUpdatePatchProvider(): Stream&lt;Arguments&gt;</ID>
<ID>LongMethod:QueryServiceTests.kt$QueryServiceTests$@Test fun `it should query temporal entities as requested by query params`()</ID>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import arrow.core.left
import arrow.core.raise.either
import arrow.core.right
import com.egm.stellio.search.authorization.AuthorizationService
import com.egm.stellio.search.model.EntityPayload
import com.egm.stellio.search.model.UpdateResult
import com.egm.stellio.search.web.*
import com.egm.stellio.shared.model.APIException
Expand All @@ -21,7 +22,8 @@ import java.net.URI
class EntityOperationService(
private val entityPayloadService: EntityPayloadService,
private val temporalEntityAttributeService: TemporalEntityAttributeService,
private val authorizationService: AuthorizationService
private val authorizationService: AuthorizationService,
private val entityEventService: EntityEventService
) {

/**
Expand Down Expand Up @@ -85,7 +87,13 @@ class EntityOperationService(
val creationResults = entities.map { jsonLdNgsiLdEntity ->
either {
entityPayloadService.createEntity(jsonLdNgsiLdEntity.second, jsonLdNgsiLdEntity.first, sub)
.map {
.onRight {
entityEventService.publishEntityCreateEvent(
sub,
jsonLdNgsiLdEntity.second.id,
jsonLdNgsiLdEntity.second.types
)
}.map {
BatchEntitySuccess(jsonLdNgsiLdEntity.entityId())
}.mapLeft { apiException ->
BatchEntityError(jsonLdNgsiLdEntity.entityId(), arrayListOf(apiException.message))
Expand All @@ -104,14 +112,17 @@ class EntityOperationService(
return BatchOperationResult(creationResults.second.toMutableList(), creationResults.first.toMutableList())
}

suspend fun delete(entitiesIds: Set<URI>): BatchOperationResult {
val deletionResults = entitiesIds.map {
val entityId = it
suspend fun delete(entities: Set<EntityPayload>, sub: Sub?): BatchOperationResult {
val deletionResults = entities.map { entity ->
val entityId = entity.entityId
either {
entityPayloadService.deleteEntity(entityId)
.map {
.onRight {
authorizationService.removeRightsOnEntity(entityId)
}
.onRight {
entityEventService.publishEntityDeleteEvent(sub, entity)
}
.map {
BatchEntitySuccess(entityId)
}
Expand Down Expand Up @@ -229,35 +240,57 @@ class EntityOperationService(
jsonLdEntity.getModifiableMembers(),
disallowOverwrite,
sub
).bind()
).bind().also {
entityEventService.publishEntityReplaceEvent(
sub,
ngsiLdEntity.id,
ngsiLdEntity.types
)
}
}

suspend fun updateEntity(
entity: JsonLdNgsiLdEntity,
disallowOverwrite: Boolean,
sub: Sub?
): Either<APIException, UpdateResult> {
): Either<APIException, UpdateResult> = either {
val (jsonLdEntity, ngsiLdEntity) = entity
return entityPayloadService.appendAttributes(
entityPayloadService.appendAttributes(
ngsiLdEntity.id,
jsonLdEntity.getModifiableMembers(),
disallowOverwrite,
sub
)
).bind().also {
entityEventService.publishAttributeChangeEvents(
sub,
ngsiLdEntity.id,
jsonLdEntity.members,
it,
true
)
}
}

@SuppressWarnings("UnusedParameter")
suspend fun mergeEntity(
entity: JsonLdNgsiLdEntity,
disallowOverwrite: Boolean,
sub: Sub?
): Either<APIException, UpdateResult> {
): Either<APIException, UpdateResult> = either {
val (jsonLdEntity, ngsiLdEntity) = entity
return entityPayloadService.mergeEntity(
entityPayloadService.mergeEntity(
ngsiLdEntity.id,
jsonLdEntity.getModifiableMembers(),
null,
sub
)
).bind().also {
entityEventService.publishAttributeChangeEvents(
sub,
ngsiLdEntity.id,
jsonLdEntity.members,
it,
true
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import arrow.core.*
import arrow.core.raise.either
import com.egm.stellio.search.authorization.AuthorizationService
import com.egm.stellio.search.model.Query
import com.egm.stellio.search.service.EntityEventService
import com.egm.stellio.search.service.EntityOperationService
import com.egm.stellio.search.service.EntityPayloadService
import com.egm.stellio.search.service.QueryService
Expand Down Expand Up @@ -36,8 +35,7 @@ class EntityOperationHandler(
private val entityOperationService: EntityOperationService,
private val entityPayloadService: EntityPayloadService,
private val queryService: QueryService,
private val authorizationService: AuthorizationService,
private val entityEventService: EntityEventService
private val authorizationService: AuthorizationService
) {

/**
Expand All @@ -53,7 +51,6 @@ class EntityOperationHandler(
val (parsedEntities, unparsableEntities) = prepareEntitiesFromRequestBody(requestBody, httpHeaders).bind()

val (uniqueEntities, duplicateEntities) =

entityOperationService.splitEntitiesByUniqueness(parsedEntities)
val (existingEntities, newEntities) = entityOperationService.splitEntitiesByExistence(uniqueEntities)
val (unauthorizedEntities, authorizedEntities) = newEntities.partition {
Expand Down Expand Up @@ -121,11 +118,6 @@ class EntityOperationHandler(
else -> entityOperationService.replace(existingEntitiesAuthorized, sub.getOrNull())
}

if (options == "update")
publishUpdateEvents(sub.getOrNull(), updateOperationResult, parsedEntities)
else
publishReplaceEvents(sub.getOrNull(), updateOperationResult, parsedEntities)

batchOperationResult.errors.addAll(updateOperationResult.errors)
batchOperationResult.success.addAll(updateOperationResult.success)
}
Expand Down Expand Up @@ -171,8 +163,6 @@ class EntityOperationHandler(
val updateOperationResult =
entityOperationService.update(existingEntitiesAuthorized, disallowOverwrite, sub.getOrNull())

publishUpdateEvents(sub.getOrNull(), updateOperationResult, parsedEntities)

batchOperationResult.errors.addAll(updateOperationResult.errors)
batchOperationResult.success.addAll(updateOperationResult.success)
}
Expand Down Expand Up @@ -213,8 +203,6 @@ class EntityOperationHandler(
val mergeOperationResult =
entityOperationService.merge(existingEntitiesAuthorized, sub.getOrNull())

publishUpdateEvents(sub.getOrNull(), mergeOperationResult, parsedEntities)

batchOperationResult.errors.addAll(mergeOperationResult.errors)
batchOperationResult.success.addAll(mergeOperationResult.success)
}
Expand Down Expand Up @@ -261,15 +249,7 @@ class EntityOperationHandler(
}

if (entitiesUserCanDelete.isNotEmpty()) {
val deleteOperationResult = entityOperationService.delete(entitiesUserCanDelete.toSet())

deleteOperationResult.success.map { it.entityId }.forEach { uri ->
val entity = entitiesBeforeDelete.find { it.entityId == uri }!!
entityEventService.publishEntityDeleteEvent(
sub.getOrNull(),
entity
)
}
val deleteOperationResult = entityOperationService.delete(entitiesBeforeDelete.toSet(), sub.getOrNull())

batchOperationResult.errors.addAll(deleteOperationResult.errors)
batchOperationResult.success.addAll(deleteOperationResult.success)
Expand Down Expand Up @@ -391,50 +371,8 @@ class EntityOperationHandler(
if (entitiesToCreate.isNotEmpty()) {
val createOperationResult = entityOperationService.create(entitiesToCreate, sub.getOrNull())
authorizationService.createOwnerRights(createOperationResult.getSuccessfulEntitiesIds(), sub)
entitiesToCreate
.filter { it.second.id in createOperationResult.getSuccessfulEntitiesIds() }
.forEach {
val ngsiLdEntity = it.second
entityEventService.publishEntityCreateEvent(
sub.getOrNull(),
ngsiLdEntity.id,
ngsiLdEntity.types
)
}
batchOperationResult.errors.addAll(createOperationResult.errors)
batchOperationResult.success.addAll(createOperationResult.success)
}
}

private suspend fun publishReplaceEvents(
sub: String?,
updateBatchOperationResult: BatchOperationResult,
jsonLdNgsiLdEntities: List<JsonLdNgsiLdEntity>
) = jsonLdNgsiLdEntities.filter { it.entityId() in updateBatchOperationResult.getSuccessfulEntitiesIds() }
.forEach {
entityEventService.publishEntityReplaceEvent(
sub,
it.entityId(),
it.second.types
)
}

private suspend fun publishUpdateEvents(
sub: String?,
updateBatchOperationResult: BatchOperationResult,
jsonLdNgsiLdEntities: List<JsonLdNgsiLdEntity>
) {
updateBatchOperationResult.success.forEach {
val (jsonLdEntity, _) = jsonLdNgsiLdEntities.find { jsonLdNgsiLdEntity ->
jsonLdNgsiLdEntity.entityId() == it.entityId
}!!
entityEventService.publishAttributeChangeEvents(
sub,
it.entityId,
jsonLdEntity.members,
it.updateResult!!,
true
)
}
}
}
Loading
Loading