diff --git a/mod-source-record-storage-server/src/main/java/org/folio/consumers/AuthorityDomainKafkaHandler.java b/mod-source-record-storage-server/src/main/java/org/folio/consumers/AuthorityDomainKafkaHandler.java
index ff6575e05..a3d8147a2 100644
--- a/mod-source-record-storage-server/src/main/java/org/folio/consumers/AuthorityDomainKafkaHandler.java
+++ b/mod-source-record-storage-server/src/main/java/org/folio/consumers/AuthorityDomainKafkaHandler.java
@@ -1,16 +1,19 @@
 package org.folio.consumers;
 
 import static org.folio.dao.util.RecordDaoUtil.filterRecordByExternalId;
+import static org.folio.services.util.EventHandlingUtil.toOkapiHeaders;
 
 import io.vertx.core.Future;
 import io.vertx.core.json.JsonObject;
 import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
 import java.util.Collections;
+import java.util.Map;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.folio.dao.util.IdType;
 import org.folio.dao.util.RecordType;
 import org.folio.kafka.AsyncRecordHandler;
-import org.folio.rest.jooq.enums.RecordState;
 import org.folio.services.RecordService;
 import org.folio.services.util.KafkaUtil;
 import org.springframework.stereotype.Component;
@@ -34,6 +37,10 @@ public AuthorityDomainKafkaHandler(RecordService recordService) {
   @Override
   public Future<String> handle(KafkaConsumerRecord<String, String> consumerRecord) {
     log.trace("handle:: Handling kafka record: '{}'", consumerRecord);
+
+    var kafkaHeaders = consumerRecord.headers();
+    var okapiHeaders = toOkapiHeaders(kafkaHeaders);
+
     String authorityId = consumerRecord.key();
     if (isUnexpectedDomainEvent(consumerRecord)) {
       log.trace("handle:: Expected only {} domain type. Skipping authority domain kafka record [ID: '{}']",
@@ -47,12 +54,12 @@ public Future<String> handle(KafkaConsumerRecord<String, String> consumerRecord)
     logInput(authorityId, eventSubType, tenantId);
 
     return (switch (eventSubType) {
-      case SOFT_DELETE -> performSoftDelete(authorityId, tenantId);
+      case SOFT_DELETE -> performSoftDelete(authorityId, tenantId, okapiHeaders);
       case HARD_DELETE -> performHardDelete(authorityId, tenantId);
     }).onFailure(throwable -> logError(authorityId, eventSubType, tenantId));
   }
 
-  private Future<String> performSoftDelete(String authorityId, String tenantId) {
+  private Future<String> performSoftDelete(String authorityId, String tenantId, Map<String, String> okapiHeaders) {
     var condition = filterRecordByExternalId(authorityId);
     return recordService.getRecords(condition, RecordType.MARC_AUTHORITY, Collections.emptyList(), 0, 1, tenantId)
       .compose(recordCollection -> {
@@ -62,7 +69,7 @@ private Future<String> performSoftDelete(String authorityId, String tenantId) {
         }
 
         var matchedId = recordCollection.getRecords().get(0).getMatchedId();
-        return recordService.updateRecordsState(matchedId, RecordState.DELETED, RecordType.MARC_AUTHORITY, tenantId);
+        return recordService.deleteRecordById(matchedId, IdType.RECORD, okapiHeaders);
       }).map(authorityId);
   }
 
diff --git a/mod-source-record-storage-server/src/main/java/org/folio/services/handlers/actions/AbstractDeleteEventHandler.java b/mod-source-record-storage-server/src/main/java/org/folio/services/handlers/actions/AbstractDeleteEventHandler.java
index 7cd92d8bc..2bcd302d4 100644
--- a/mod-source-record-storage-server/src/main/java/org/folio/services/handlers/actions/AbstractDeleteEventHandler.java
+++ b/mod-source-record-storage-server/src/main/java/org/folio/services/handlers/actions/AbstractDeleteEventHandler.java
@@ -1,27 +1,29 @@
 package org.folio.services.handlers.actions;
 
+import io.vertx.core.Future;
 import io.vertx.core.json.Json;
 import io.vertx.core.json.JsonObject;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.folio.ActionProfile;
 import org.folio.DataImportEventPayload;
-import org.folio.dao.util.RecordType;
+import org.folio.dao.util.IdType;
 import org.folio.processing.events.services.handler.EventHandler;
 import org.folio.processing.exceptions.EventProcessingException;
 import org.folio.rest.jaxrs.model.ExternalIdsHolder;
 import static org.folio.rest.jaxrs.model.ProfileType.ACTION_PROFILE;
 import org.folio.rest.jaxrs.model.ProfileSnapshotWrapper;
 import org.folio.rest.jaxrs.model.Record;
-import org.folio.rest.jooq.enums.RecordState;
 import org.folio.services.RecordService;
 import org.folio.services.util.TypeConnection;
 
+import javax.ws.rs.NotFoundException;
 import java.util.concurrent.CompletableFuture;
 
 import static java.util.Objects.isNull;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.folio.ActionProfile.Action.DELETE;
+import static org.folio.services.util.EventHandlingUtil.toOkapiHeaders;
 
 /**
  * The abstraction handles the DELETE action
@@ -66,8 +68,10 @@ public CompletableFuture<DataImportEventPayload> handle(DataImportEventPayload p
   /* Handles DELETE action */
   private void handlePayload(DataImportEventPayload payload, CompletableFuture<DataImportEventPayload> future) {
     var payloadRecord = Json.decodeValue(payload.getContext().get(getRecordKey()), Record.class);
+    var okapiHeaders = toOkapiHeaders(payload);
     LOG.info("handlePayload:: Handling 'delete' event for the record id = {}", payloadRecord.getId());
-    recordService.updateRecordsState(payloadRecord.getMatchedId(), RecordState.DELETED, RecordType.MARC_AUTHORITY, payload.getTenant())
+    recordService.deleteRecordById(payloadRecord.getMatchedId(), IdType.RECORD, okapiHeaders)
+      .recover(throwable -> throwable instanceof NotFoundException ? Future.succeededFuture() : Future.failedFuture(throwable))
       .onSuccess(ar -> {
         payload.setEventType(getNextEventType());
         payload.getContext().remove(getRecordKey());
diff --git a/mod-source-record-storage-server/src/test/java/org/folio/consumers/AuthorityDomainKafkaHandlerTest.java b/mod-source-record-storage-server/src/test/java/org/folio/consumers/AuthorityDomainKafkaHandlerTest.java
index 4de699fa8..e64bd57c0 100644
--- a/mod-source-record-storage-server/src/test/java/org/folio/consumers/AuthorityDomainKafkaHandlerTest.java
+++ b/mod-source-record-storage-server/src/test/java/org/folio/consumers/AuthorityDomainKafkaHandlerTest.java
@@ -5,14 +5,18 @@
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.vertx.core.json.Json;
+import io.vertx.core.json.JsonArray;
+import io.vertx.core.json.JsonObject;
 import io.vertx.ext.unit.Async;
 import io.vertx.ext.unit.TestContext;
 import io.vertx.ext.unit.junit.VertxUnitRunner;
 import io.vertx.kafka.client.consumer.impl.KafkaConsumerRecordImpl;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -21,6 +25,7 @@
 import org.folio.dao.RecordDao;
 import org.folio.dao.RecordDaoImpl;
 import org.folio.dao.util.IdType;
+import org.folio.dao.util.ParsedRecordDaoUtil;
 import org.folio.dao.util.SnapshotDaoUtil;
 import org.folio.rest.jaxrs.model.ExternalIdsHolder;
 import org.folio.rest.jaxrs.model.ParsedRecord;
@@ -29,6 +34,7 @@
 import org.folio.rest.jaxrs.model.Snapshot;
 import org.folio.rest.jaxrs.model.SourceRecord;
 import org.folio.rest.jooq.enums.RecordState;
+import org.folio.rest.util.OkapiConnectionParams;
 import org.folio.services.AbstractLBServiceTest;
 import org.folio.services.RecordService;
 import org.folio.services.RecordServiceImpl;
@@ -54,6 +60,7 @@ public class AuthorityDomainKafkaHandlerTest extends AbstractLBServiceTest {
   private RecordService recordService;
   private Record record;
   private AuthorityDomainKafkaHandler handler;
+  private static final String currentDate = "20240718132044.6";
 
   @BeforeClass
   public static void setUpClass() throws IOException {
@@ -61,7 +68,10 @@ public static void setUpClass() throws IOException {
       .withContent(
         new ObjectMapper().readValue(TestUtil.readFileFromPath(RAW_MARC_RECORD_CONTENT_SAMPLE_PATH), String.class));
     parsedRecord = new ParsedRecord().withId(recordId)
-      .withContent(TestUtil.readFileFromPath(PARSED_MARC_RECORD_CONTENT_SAMPLE_PATH));
+      .withContent(
+        new JsonObject().put("leader", "01542ccm a2200361 4500")
+          .put("fields", new JsonArray()
+            .add(new JsonObject().put("005", currentDate))));
   }
 
   @Before
@@ -123,6 +133,16 @@ public void shouldSoftDeleteMarcAuthorityRecordOnSoftDeleteDomainEvent(TestConte
       context.assertTrue(result.result().isPresent());
       SourceRecord updatedRecord = result.result().get();
       context.assertTrue(updatedRecord.getDeleted());
+      context.assertTrue(updatedRecord.getAdditionalInfo().getSuppressDiscovery());
+      context.assertEquals("d", ParsedRecordDaoUtil.getLeaderStatus(updatedRecord.getParsedRecord()));
+
+      // Verify that the "005" field inside the parsed record is not empty and has been updated.
+      LinkedHashMap<String, ArrayList<LinkedHashMap<String, String>>> content = (LinkedHashMap<String, ArrayList<LinkedHashMap<String, String>>>) updatedRecord.getParsedRecord().getContent();
+      LinkedHashMap<String, String> map = content.get("fields").get(0);
+      String resulted005FieldValue = map.get("005");
+      context.assertNotNull(resulted005FieldValue);
+      context.assertNotEquals(currentDate, resulted005FieldValue);
+
       async.complete();
     });
   });
@@ -156,6 +176,9 @@ private ConsumerRecord getConsumerRecord(HashMap payload) {
     ConsumerRecord consumerRecord = new ConsumerRecord<>("topic", 1, 1, recordId, Json.encode(payload));
     consumerRecord.headers().add(new RecordHeader("domain-event-type", "DELETE".getBytes(StandardCharsets.UTF_8)));
+    consumerRecord.headers().add(new RecordHeader(OkapiConnectionParams.OKAPI_URL_HEADER, OKAPI_URL.getBytes(StandardCharsets.UTF_8)));
+    consumerRecord.headers().add(new RecordHeader(OkapiConnectionParams.OKAPI_TENANT_HEADER, TENANT_ID.getBytes(StandardCharsets.UTF_8)));
+    consumerRecord.headers().add(new RecordHeader(OkapiConnectionParams.OKAPI_TOKEN_HEADER, TOKEN.getBytes(StandardCharsets.UTF_8)));
     return consumerRecord;
   }