Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MODSOURCE-854] Use deleteRecordBy id for soft delete of authority #668

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
package org.folio.consumers;

import static org.folio.dao.util.RecordDaoUtil.filterRecordByExternalId;
import static org.folio.services.util.EventHandlingUtil.toOkapiHeaders;

import io.vertx.core.Future;
import io.vertx.core.json.JsonObject;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import java.util.Collections;
import java.util.Map;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.dao.util.IdType;
import org.folio.dao.util.RecordType;
import org.folio.kafka.AsyncRecordHandler;
import org.folio.rest.jooq.enums.RecordState;
import org.folio.services.RecordService;
import org.folio.services.util.KafkaUtil;
import org.springframework.stereotype.Component;
Expand All @@ -34,6 +37,10 @@ public AuthorityDomainKafkaHandler(RecordService recordService) {
@Override
public Future<String> handle(KafkaConsumerRecord<String, String> consumerRecord) {
log.trace("handle:: Handling kafka record: '{}'", consumerRecord);

var kafkaHeaders = consumerRecord.headers();
var okapiHeaders = toOkapiHeaders(kafkaHeaders);

String authorityId = consumerRecord.key();
if (isUnexpectedDomainEvent(consumerRecord)) {
log.trace("handle:: Expected only {} domain type. Skipping authority domain kafka record [ID: '{}']",
Expand All @@ -47,12 +54,12 @@ public Future<String> handle(KafkaConsumerRecord<String, String> consumerRecord)

logInput(authorityId, eventSubType, tenantId);
return (switch (eventSubType) {
case SOFT_DELETE -> performSoftDelete(authorityId, tenantId);
case SOFT_DELETE -> performSoftDelete(authorityId, tenantId, okapiHeaders);
case HARD_DELETE -> performHardDelete(authorityId, tenantId);
}).onFailure(throwable -> logError(authorityId, eventSubType, tenantId));
}

private Future<String> performSoftDelete(String authorityId, String tenantId) {
private Future<String> performSoftDelete(String authorityId, String tenantId, Map<String, String> okapiHeaders) {
var condition = filterRecordByExternalId(authorityId);
return recordService.getRecords(condition, RecordType.MARC_AUTHORITY, Collections.emptyList(), 0, 1, tenantId)
.compose(recordCollection -> {
Expand All @@ -62,7 +69,7 @@ private Future<String> performSoftDelete(String authorityId, String tenantId) {
}
var matchedId = recordCollection.getRecords().get(0).getMatchedId();

return recordService.updateRecordsState(matchedId, RecordState.DELETED, RecordType.MARC_AUTHORITY, tenantId);
return recordService.deleteRecordById(matchedId, IdType.RECORD, okapiHeaders);
}).map(authorityId);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,29 @@
package org.folio.services.handlers.actions;

import io.vertx.core.Future;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.ActionProfile;
import org.folio.DataImportEventPayload;
import org.folio.dao.util.RecordType;
import org.folio.dao.util.IdType;
import org.folio.processing.events.services.handler.EventHandler;
import org.folio.processing.exceptions.EventProcessingException;
import org.folio.rest.jaxrs.model.ExternalIdsHolder;
import static org.folio.rest.jaxrs.model.ProfileType.ACTION_PROFILE;
import org.folio.rest.jaxrs.model.ProfileSnapshotWrapper;
import org.folio.rest.jaxrs.model.Record;
import org.folio.rest.jooq.enums.RecordState;
import org.folio.services.RecordService;
import org.folio.services.util.TypeConnection;

import javax.ws.rs.NotFoundException;
import java.util.concurrent.CompletableFuture;

import static java.util.Objects.isNull;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.folio.ActionProfile.Action.DELETE;
import static org.folio.services.util.EventHandlingUtil.toOkapiHeaders;

/**
* The abstraction handles the DELETE action
Expand Down Expand Up @@ -66,8 +68,10 @@ public CompletableFuture<DataImportEventPayload> handle(DataImportEventPayload p
/* Handles DELETE action */
private void handlePayload(DataImportEventPayload payload, CompletableFuture<DataImportEventPayload> future) {
var payloadRecord = Json.decodeValue(payload.getContext().get(getRecordKey()), Record.class);
var okapiHeaders = toOkapiHeaders(payload);
LOG.info("handlePayload:: Handling 'delete' event for the record id = {}", payloadRecord.getId());
recordService.updateRecordsState(payloadRecord.getMatchedId(), RecordState.DELETED, RecordType.MARC_AUTHORITY, payload.getTenant())
recordService.deleteRecordById(payloadRecord.getMatchedId(), IdType.RECORD, okapiHeaders)
.recover(throwable -> throwable instanceof NotFoundException ? Future.succeededFuture() : Future.failedFuture(throwable))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RomanChernetskyi Let's log here smth. At the debug level - that record was not found (with specific Id). And warn level - with an error message and cause.

.onSuccess(ar -> {
payload.setEventType(getNextEventType());
payload.getContext().remove(getRecordKey());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.kafka.client.consumer.impl.KafkaConsumerRecordImpl;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand All @@ -21,6 +25,7 @@
import org.folio.dao.RecordDao;
import org.folio.dao.RecordDaoImpl;
import org.folio.dao.util.IdType;
import org.folio.dao.util.ParsedRecordDaoUtil;
import org.folio.dao.util.SnapshotDaoUtil;
import org.folio.rest.jaxrs.model.ExternalIdsHolder;
import org.folio.rest.jaxrs.model.ParsedRecord;
Expand All @@ -29,6 +34,7 @@
import org.folio.rest.jaxrs.model.Snapshot;
import org.folio.rest.jaxrs.model.SourceRecord;
import org.folio.rest.jooq.enums.RecordState;
import org.folio.rest.util.OkapiConnectionParams;
import org.folio.services.AbstractLBServiceTest;
import org.folio.services.RecordService;
import org.folio.services.RecordServiceImpl;
Expand All @@ -54,14 +60,18 @@ public class AuthorityDomainKafkaHandlerTest extends AbstractLBServiceTest {
private RecordService recordService;
private Record record;
private AuthorityDomainKafkaHandler handler;
private static final String currentDate = "20240718132044.6";

@BeforeClass
public static void setUpClass() throws IOException {
rawRecord = new RawRecord().withId(recordId)
.withContent(
new ObjectMapper().readValue(TestUtil.readFileFromPath(RAW_MARC_RECORD_CONTENT_SAMPLE_PATH), String.class));
parsedRecord = new ParsedRecord().withId(recordId)
.withContent(TestUtil.readFileFromPath(PARSED_MARC_RECORD_CONTENT_SAMPLE_PATH));
.withContent(
new JsonObject().put("leader", "01542ccm a2200361 4500")
.put("fields", new JsonArray()
.add(new JsonObject().put("005", currentDate))));
}

@Before
Expand Down Expand Up @@ -123,6 +133,16 @@ public void shouldSoftDeleteMarcAuthorityRecordOnSoftDeleteDomainEvent(TestConte
context.assertTrue(result.result().isPresent());
SourceRecord updatedRecord = result.result().get();
context.assertTrue(updatedRecord.getDeleted());
context.assertTrue(updatedRecord.getAdditionalInfo().getSuppressDiscovery());
context.assertEquals("d", ParsedRecordDaoUtil.getLeaderStatus(updatedRecord.getParsedRecord()));

//Complex verifying "005" field is NOT empty inside parsed record.
LinkedHashMap<String, ArrayList<LinkedHashMap<String, String>>> content = (LinkedHashMap<String, ArrayList<LinkedHashMap<String, String>>>) updatedRecord.getParsedRecord().getContent();
LinkedHashMap<String, String> map = content.get("fields").get(0);
String resulted005FieldValue = map.get("005");
context.assertNotNull(resulted005FieldValue);
context.assertNotEquals(currentDate, resulted005FieldValue);

async.complete();
});
});
Expand Down Expand Up @@ -156,6 +176,9 @@ public void shouldHardDeleteMarcAuthorityRecordOnHardDeleteDomainEvent(TestConte
private ConsumerRecord<String, String> getConsumerRecord(HashMap<String, String> payload) {
ConsumerRecord<String, String> consumerRecord = new ConsumerRecord<>("topic", 1, 1, recordId, Json.encode(payload));
consumerRecord.headers().add(new RecordHeader("domain-event-type", "DELETE".getBytes(StandardCharsets.UTF_8)));
consumerRecord.headers().add(new RecordHeader(OkapiConnectionParams.OKAPI_URL_HEADER, OKAPI_URL.getBytes(StandardCharsets.UTF_8)));
consumerRecord.headers().add(new RecordHeader(OkapiConnectionParams.OKAPI_TENANT_HEADER, TENANT_ID.getBytes(StandardCharsets.UTF_8)));
consumerRecord.headers().add(new RecordHeader(OkapiConnectionParams.OKAPI_TOKEN_HEADER, TOKEN.getBytes(StandardCharsets.UTF_8)));
return consumerRecord;
}

Expand Down