Skip to content

Commit

Permalink
fix(core): error sending Kafka events for batch operations when many …
Browse files Browse the repository at this point in the history
…times the same entity id (#1211)
  • Loading branch information
bobeal authored Aug 5, 2024
1 parent eb4def0 commit 5a89a8a
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 222 deletions.
1 change: 0 additions & 1 deletion search-service/config/detekt/baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
<ID>LongMethod:AttributeInstanceService.kt$AttributeInstanceService$@Transactional suspend fun create(attributeInstance: AttributeInstance): Either&lt;APIException, Unit&gt;</ID>
<ID>LongMethod:EnabledAuthorizationServiceTests.kt$EnabledAuthorizationServiceTests$@Test fun `it should return serialized access control entities with other rigths if user is owner`()</ID>
<ID>LongMethod:EntityAccessControlHandler.kt$EntityAccessControlHandler$@PostMapping("/{subjectId}/attrs", consumes = [MediaType.APPLICATION_JSON_VALUE, JSON_LD_CONTENT_TYPE]) suspend fun addRightsOnEntities( @RequestHeader httpHeaders: HttpHeaders, @PathVariable subjectId: String, @RequestBody requestBody: Mono&lt;String&gt; ): ResponseEntity&lt;*&gt;</ID>
<ID>LongMethod:EntityOperationHandlerTests.kt$EntityOperationHandlerTests$@Test fun `create batch entity should return a 207 when some entities already exist`()</ID>
<ID>LongMethod:PatchAttributeTests.kt$PatchAttributeTests.Companion$@JvmStatic fun mergePatchProvider(): Stream&lt;Arguments&gt;</ID>
<ID>LongMethod:PatchAttributeTests.kt$PatchAttributeTests.Companion$@JvmStatic fun partialUpdatePatchProvider(): Stream&lt;Arguments&gt;</ID>
<ID>LongMethod:QueryServiceTests.kt$QueryServiceTests$@Test fun `it should query temporal entities as requested by query params`()</ID>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import arrow.core.left
import arrow.core.raise.either
import arrow.core.right
import com.egm.stellio.search.authorization.AuthorizationService
import com.egm.stellio.search.model.EntityPayload
import com.egm.stellio.search.model.UpdateResult
import com.egm.stellio.search.web.*
import com.egm.stellio.shared.model.APIException
Expand All @@ -21,7 +22,8 @@ import java.net.URI
class EntityOperationService(
private val entityPayloadService: EntityPayloadService,
private val temporalEntityAttributeService: TemporalEntityAttributeService,
private val authorizationService: AuthorizationService
private val authorizationService: AuthorizationService,
private val entityEventService: EntityEventService
) {

/**
Expand Down Expand Up @@ -85,7 +87,13 @@ class EntityOperationService(
val creationResults = entities.map { jsonLdNgsiLdEntity ->
either {
entityPayloadService.createEntity(jsonLdNgsiLdEntity.second, jsonLdNgsiLdEntity.first, sub)
.map {
.onRight {
entityEventService.publishEntityCreateEvent(
sub,
jsonLdNgsiLdEntity.second.id,
jsonLdNgsiLdEntity.second.types
)
}.map {
BatchEntitySuccess(jsonLdNgsiLdEntity.entityId())
}.mapLeft { apiException ->
BatchEntityError(jsonLdNgsiLdEntity.entityId(), arrayListOf(apiException.message))
Expand All @@ -104,14 +112,17 @@ class EntityOperationService(
return BatchOperationResult(creationResults.second.toMutableList(), creationResults.first.toMutableList())
}

suspend fun delete(entitiesIds: Set<URI>): BatchOperationResult {
val deletionResults = entitiesIds.map {
val entityId = it
suspend fun delete(entities: Set<EntityPayload>, sub: Sub?): BatchOperationResult {
val deletionResults = entities.map { entity ->
val entityId = entity.entityId
either {
entityPayloadService.deleteEntity(entityId)
.map {
.onRight {
authorizationService.removeRightsOnEntity(entityId)
}
.onRight {
entityEventService.publishEntityDeleteEvent(sub, entity)
}
.map {
BatchEntitySuccess(entityId)
}
Expand Down Expand Up @@ -229,35 +240,57 @@ class EntityOperationService(
jsonLdEntity.getModifiableMembers(),
disallowOverwrite,
sub
).bind()
).bind().also {
entityEventService.publishEntityReplaceEvent(
sub,
ngsiLdEntity.id,
ngsiLdEntity.types
)
}
}

suspend fun updateEntity(
entity: JsonLdNgsiLdEntity,
disallowOverwrite: Boolean,
sub: Sub?
): Either<APIException, UpdateResult> {
): Either<APIException, UpdateResult> = either {
val (jsonLdEntity, ngsiLdEntity) = entity
return entityPayloadService.appendAttributes(
entityPayloadService.appendAttributes(
ngsiLdEntity.id,
jsonLdEntity.getModifiableMembers(),
disallowOverwrite,
sub
)
).bind().also {
entityEventService.publishAttributeChangeEvents(
sub,
ngsiLdEntity.id,
jsonLdEntity.members,
it,
true
)
}
}

@SuppressWarnings("UnusedParameter")
suspend fun mergeEntity(
entity: JsonLdNgsiLdEntity,
disallowOverwrite: Boolean,
sub: Sub?
): Either<APIException, UpdateResult> {
): Either<APIException, UpdateResult> = either {
val (jsonLdEntity, ngsiLdEntity) = entity
return entityPayloadService.mergeEntity(
entityPayloadService.mergeEntity(
ngsiLdEntity.id,
jsonLdEntity.getModifiableMembers(),
null,
sub
)
).bind().also {
entityEventService.publishAttributeChangeEvents(
sub,
ngsiLdEntity.id,
jsonLdEntity.members,
it,
true
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import arrow.core.*
import arrow.core.raise.either
import com.egm.stellio.search.authorization.AuthorizationService
import com.egm.stellio.search.model.Query
import com.egm.stellio.search.service.EntityEventService
import com.egm.stellio.search.service.EntityOperationService
import com.egm.stellio.search.service.EntityPayloadService
import com.egm.stellio.search.service.QueryService
Expand Down Expand Up @@ -36,8 +35,7 @@ class EntityOperationHandler(
private val entityOperationService: EntityOperationService,
private val entityPayloadService: EntityPayloadService,
private val queryService: QueryService,
private val authorizationService: AuthorizationService,
private val entityEventService: EntityEventService
private val authorizationService: AuthorizationService
) {

/**
Expand All @@ -53,7 +51,6 @@ class EntityOperationHandler(
val (parsedEntities, unparsableEntities) = prepareEntitiesFromRequestBody(requestBody, httpHeaders).bind()

val (uniqueEntities, duplicateEntities) =

entityOperationService.splitEntitiesByUniqueness(parsedEntities)
val (existingEntities, newEntities) = entityOperationService.splitEntitiesByExistence(uniqueEntities)
val (unauthorizedEntities, authorizedEntities) = newEntities.partition {
Expand Down Expand Up @@ -121,11 +118,6 @@ class EntityOperationHandler(
else -> entityOperationService.replace(existingEntitiesAuthorized, sub.getOrNull())
}

if (options == "update")
publishUpdateEvents(sub.getOrNull(), updateOperationResult, parsedEntities)
else
publishReplaceEvents(sub.getOrNull(), updateOperationResult, parsedEntities)

batchOperationResult.errors.addAll(updateOperationResult.errors)
batchOperationResult.success.addAll(updateOperationResult.success)
}
Expand Down Expand Up @@ -171,8 +163,6 @@ class EntityOperationHandler(
val updateOperationResult =
entityOperationService.update(existingEntitiesAuthorized, disallowOverwrite, sub.getOrNull())

publishUpdateEvents(sub.getOrNull(), updateOperationResult, parsedEntities)

batchOperationResult.errors.addAll(updateOperationResult.errors)
batchOperationResult.success.addAll(updateOperationResult.success)
}
Expand Down Expand Up @@ -213,8 +203,6 @@ class EntityOperationHandler(
val mergeOperationResult =
entityOperationService.merge(existingEntitiesAuthorized, sub.getOrNull())

publishUpdateEvents(sub.getOrNull(), mergeOperationResult, parsedEntities)

batchOperationResult.errors.addAll(mergeOperationResult.errors)
batchOperationResult.success.addAll(mergeOperationResult.success)
}
Expand Down Expand Up @@ -261,15 +249,7 @@ class EntityOperationHandler(
}

if (entitiesUserCanDelete.isNotEmpty()) {
val deleteOperationResult = entityOperationService.delete(entitiesUserCanDelete.toSet())

deleteOperationResult.success.map { it.entityId }.forEach { uri ->
val entity = entitiesBeforeDelete.find { it.entityId == uri }!!
entityEventService.publishEntityDeleteEvent(
sub.getOrNull(),
entity
)
}
val deleteOperationResult = entityOperationService.delete(entitiesBeforeDelete.toSet(), sub.getOrNull())

batchOperationResult.errors.addAll(deleteOperationResult.errors)
batchOperationResult.success.addAll(deleteOperationResult.success)
Expand Down Expand Up @@ -391,50 +371,8 @@ class EntityOperationHandler(
if (entitiesToCreate.isNotEmpty()) {
val createOperationResult = entityOperationService.create(entitiesToCreate, sub.getOrNull())
authorizationService.createOwnerRights(createOperationResult.getSuccessfulEntitiesIds(), sub)
entitiesToCreate
.filter { it.second.id in createOperationResult.getSuccessfulEntitiesIds() }
.forEach {
val ngsiLdEntity = it.second
entityEventService.publishEntityCreateEvent(
sub.getOrNull(),
ngsiLdEntity.id,
ngsiLdEntity.types
)
}
batchOperationResult.errors.addAll(createOperationResult.errors)
batchOperationResult.success.addAll(createOperationResult.success)
}
}

private suspend fun publishReplaceEvents(
sub: String?,
updateBatchOperationResult: BatchOperationResult,
jsonLdNgsiLdEntities: List<JsonLdNgsiLdEntity>
) = jsonLdNgsiLdEntities.filter { it.entityId() in updateBatchOperationResult.getSuccessfulEntitiesIds() }
.forEach {
entityEventService.publishEntityReplaceEvent(
sub,
it.entityId(),
it.second.types
)
}

private suspend fun publishUpdateEvents(
sub: String?,
updateBatchOperationResult: BatchOperationResult,
jsonLdNgsiLdEntities: List<JsonLdNgsiLdEntity>
) {
updateBatchOperationResult.success.forEach {
val (jsonLdEntity, _) = jsonLdNgsiLdEntities.find { jsonLdNgsiLdEntity ->
jsonLdNgsiLdEntity.entityId() == it.entityId
}!!
entityEventService.publishAttributeChangeEvents(
sub,
it.entityId,
jsonLdEntity.members,
it.updateResult!!,
true
)
}
}
}
Loading

0 comments on commit 5a89a8a

Please sign in to comment.