From b47cf32852ccf77904dd20ffe46f4232d166495d Mon Sep 17 00:00:00 2001 From: Yaroslav_Kiriak Date: Thu, 26 Oct 2023 10:28:21 +0300 Subject: [PATCH] MODSOURMAN-1020: add IncomingRecord entity DAO and service layers; save IncomingRecords with JournalRecords after incoming records are parsed --- NEWS.md | 3 + .../java/org/folio/dao/IncomingRecordDao.java | 23 ++++++ .../org/folio/dao/IncomingRecordDaoImpl.java | 52 ++++++++++++ .../org/folio/dao/JobExecutionDaoImpl.java | 6 +- .../org/folio/dao/JournalRecordDaoImpl.java | 2 +- .../services/ChangeEngineServiceImpl.java | 24 +++++- .../folio/services/IncomingRecordService.java | 19 +++++ .../services/IncomingRecordServiceImpl.java | 20 +++++ .../folio/services/JournalRecordService.java | 10 +++ .../services/JournalRecordServiceImpl.java | 8 +- .../folio/services/journal/JournalUtil.java | 41 ++++++++++ .../create_incoming_records_table.sql | 10 +++ .../templates/db_scripts/schema.json | 5 ++ .../folio/dao/IncomingRecordDaoImplTest.java | 66 +++++++++++++++ .../services/ChangeEngineServiceImplTest.java | 7 ++ ...tDrivenChunkProcessingServiceImplTest.java | 5 +- .../IncomingRecordServiceImplUnitTest.java | 34 ++++++++ .../org/folio/services/JournalUtilTest.java | 82 ++++++++++++++++++- ...ProcessedEventHandlingServiceImplTest.java | 28 +++---- ramls/change-manager.raml | 1 + ramls/raml-storage | 2 +- 21 files changed, 421 insertions(+), 27 deletions(-) create mode 100644 mod-source-record-manager-server/src/main/java/org/folio/dao/IncomingRecordDao.java create mode 100644 mod-source-record-manager-server/src/main/java/org/folio/dao/IncomingRecordDaoImpl.java create mode 100644 mod-source-record-manager-server/src/main/java/org/folio/services/IncomingRecordService.java create mode 100644 mod-source-record-manager-server/src/main/java/org/folio/services/IncomingRecordServiceImpl.java create mode 100644 mod-source-record-manager-server/src/main/resources/templates/db_scripts/create_incoming_records_table.sql create mode 100644 mod-source-record-manager-server/src/test/java/org/folio/dao/IncomingRecordDaoImplTest.java create mode 100644 mod-source-record-manager-server/src/test/java/org/folio/services/IncomingRecordServiceImplUnitTest.java diff --git a/NEWS.md b/NEWS.md index 55396ce13..869cb0829 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,3 +1,6 @@ +## 2023-xx-xx v3.8.0-SNAPSHOT +* [MODSOURMAN-1020](https://issues.folio.org/browse/MODSOURMAN-1020) Add table to save incoming records for DI logs + ## 2023-10-13 v3.7.0 * [MODSOURMAN-1045](https://issues.folio.org/browse/MODSOURMAN-1045) Allow create action with non-matches for instance without match profile * [MODSOURMAN-1003](https://issues.folio.org/browse/MODSOURMAN-1003) Allow create action with non-matches for instance diff --git a/mod-source-record-manager-server/src/main/java/org/folio/dao/IncomingRecordDao.java b/mod-source-record-manager-server/src/main/java/org/folio/dao/IncomingRecordDao.java new file mode 100644 index 000000000..062f98584 --- /dev/null +++ b/mod-source-record-manager-server/src/main/java/org/folio/dao/IncomingRecordDao.java @@ -0,0 +1,23 @@ +package org.folio.dao; + +import io.vertx.core.Future; +import io.vertx.sqlclient.Row; +import io.vertx.sqlclient.RowSet; +import org.folio.rest.jaxrs.model.IncomingRecord; + +import java.util.List; + +/** + * DAO interface for the {@link IncomingRecord} entity + */ +public interface IncomingRecordDao { + + /** + * Saves {@link IncomingRecord} entities into DB + * + * @param incomingRecords {@link IncomingRecord} entities to save + * @param tenantId tenant id + * @return future with created incomingRecords entities represented as row set + */ + Future>> saveBatch(List incomingRecords, String tenantId); +} diff --git a/mod-source-record-manager-server/src/main/java/org/folio/dao/IncomingRecordDaoImpl.java b/mod-source-record-manager-server/src/main/java/org/folio/dao/IncomingRecordDaoImpl.java new file mode 100644 index 000000000..03d4ff1f9 --- /dev/null +++ b/mod-source-record-manager-server/src/main/java/org/folio/dao/IncomingRecordDaoImpl.java @@ -0,0 +1,52 @@ +package org.folio.dao; + +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.json.JsonObject; +import io.vertx.sqlclient.Row; +import io.vertx.sqlclient.RowSet; +import io.vertx.sqlclient.Tuple; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.folio.dao.util.PostgresClientFactory; +import org.folio.rest.jaxrs.model.IncomingRecord; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Repository; + +import java.util.List; +import java.util.UUID; + +import static java.lang.String.format; +import static org.folio.rest.persist.PostgresClient.convertToPsqlStandard; + +@Repository +public class IncomingRecordDaoImpl implements IncomingRecordDao { + + private static final Logger LOGGER = LogManager.getLogger(); + public static final String INCOMING_RECORDS_TABLE = "incoming_records"; + private static final String INSERT_SQL = "INSERT INTO %s.%s (id, job_execution_id, incoming_record) VALUES ($1, $2, $3)"; + + @Autowired + private PostgresClientFactory pgClientFactory; + + @Override + public Future>> saveBatch(List incomingRecords, String tenantId) { + LOGGER.info("saveBatch:: Save IncomingRecord entity to the {} table", INCOMING_RECORDS_TABLE); + Promise>> promise = Promise.promise(); + try { + String query = format(INSERT_SQL, convertToPsqlStandard(tenantId), INCOMING_RECORDS_TABLE); + List tuples = incomingRecords.stream().map(this::prepareInsertQueryParameters).toList(); + LOGGER.debug("IncomingRecordDaoImpl:: Save query = {}; tuples = {}", query, tuples); + pgClientFactory.createInstance(tenantId).execute(query, tuples, promise); + } catch (Exception e) { + LOGGER.warn("saveBatch:: Error saving IncomingRecord entity", e); + promise.fail(e); + } + return promise.future().onFailure(e -> LOGGER.warn("saveBatch:: Error saving JournalRecord entity", e)); + } + + private Tuple prepareInsertQueryParameters(IncomingRecord incomingRecord) { + return Tuple.of(UUID.fromString(incomingRecord.getId()), UUID.fromString(incomingRecord.getJobExecutionId()), + JsonObject.mapFrom(incomingRecord)); + } +} diff --git a/mod-source-record-manager-server/src/main/java/org/folio/dao/JobExecutionDaoImpl.java b/mod-source-record-manager-server/src/main/java/org/folio/dao/JobExecutionDaoImpl.java index 0b95756cb..e3fa04dd3 100644 --- a/mod-source-record-manager-server/src/main/java/org/folio/dao/JobExecutionDaoImpl.java +++ b/mod-source-record-manager-server/src/main/java/org/folio/dao/JobExecutionDaoImpl.java @@ -56,6 +56,7 @@ import static java.lang.String.format; import static java.util.Objects.nonNull; import static org.apache.commons.lang3.StringUtils.EMPTY; +import static org.folio.dao.IncomingRecordDaoImpl.INCOMING_RECORDS_TABLE; import static org.folio.dao.util.JobExecutionDBConstants.COMPLETED_DATE_FIELD; import static org.folio.dao.util.JobExecutionDBConstants.CURRENTLY_PROCESSED_FIELD; import static org.folio.dao.util.JobExecutionDBConstants.ERROR_STATUS_FIELD; @@ -597,12 +598,13 @@ public Future hardDeleteJobExecutions(long diffNumberOfDays, String ten return Future.succeededFuture(); } - UUID[] uuids = jobExecutionIds.stream().map(UUID::fromString).collect(Collectors.toList()).toArray(UUID[]::new); + UUID[] uuids = jobExecutionIds.stream().map(UUID::fromString).toList().toArray(UUID[]::new); Future> jobExecutionProgressFuture = Future.future(rowSetPromise -> deleteFromRelatedTable(PROGRESS_TABLE_NAME, uuids, sqlConnection, tenantId, rowSetPromise, postgresClient)); Future> jobExecutionSourceChunksFuture = Future.future(rowSetPromise -> deleteFromRelatedTableWithDeprecatedNaming(JOB_EXECUTION_SOURCE_CHUNKS_TABLE_NAME, uuids, sqlConnection, tenantId, rowSetPromise, postgresClient)); Future> journalRecordsFuture = Future.future(rowSetPromise -> deleteFromRelatedTable(JOURNAL_RECORDS_TABLE_NAME, uuids, sqlConnection, tenantId, rowSetPromise, postgresClient)); - return CompositeFuture.all(jobExecutionProgressFuture, jobExecutionSourceChunksFuture, journalRecordsFuture) + Future> incomingRecordsFuture = Future.future(rowSetPromise -> deleteFromRelatedTable(INCOMING_RECORDS_TABLE, uuids, sqlConnection, tenantId, rowSetPromise, postgresClient)); + return CompositeFuture.all(jobExecutionProgressFuture, jobExecutionSourceChunksFuture, journalRecordsFuture, incomingRecordsFuture) .compose(ar -> Future.>future(rowSetPromise -> deleteFromJobExecutionTable(uuids, sqlConnection, tenantId, rowSetPromise, postgresClient))) .map(true); })); diff --git a/mod-source-record-manager-server/src/main/java/org/folio/dao/JournalRecordDaoImpl.java b/mod-source-record-manager-server/src/main/java/org/folio/dao/JournalRecordDaoImpl.java index e52d5cc58..d9110d669 100644 --- a/mod-source-record-manager-server/src/main/java/org/folio/dao/JournalRecordDaoImpl.java +++ b/mod-source-record-manager-server/src/main/java/org/folio/dao/JournalRecordDaoImpl.java @@ -192,7 +192,7 @@ private Tuple prepareInsertQueryParameters(JournalRecord journalRecord) { UUID.fromString(journalRecord.getJobExecutionId()), UUID.fromString(journalRecord.getSourceId()), journalRecord.getSourceRecordOrder(), - journalRecord.getEntityType().toString(), + journalRecord.getEntityType() != null ? journalRecord.getEntityType().toString() : EMPTY, journalRecord.getEntityId(), journalRecord.getEntityHrId() != null ? journalRecord.getEntityHrId() : EMPTY, journalRecord.getActionType().toString(), diff --git a/mod-source-record-manager-server/src/main/java/org/folio/services/ChangeEngineServiceImpl.java b/mod-source-record-manager-server/src/main/java/org/folio/services/ChangeEngineServiceImpl.java index aa26f5d4e..7486f8b56 100644 --- a/mod-source-record-manager-server/src/main/java/org/folio/services/ChangeEngineServiceImpl.java +++ b/mod-source-record-manager-server/src/main/java/org/folio/services/ChangeEngineServiceImpl.java @@ -54,6 +54,7 @@ import org.apache.logging.log4j.Logger; import org.folio.MappingProfile; import org.folio.services.afterprocessing.FieldModificationService; +import org.folio.services.journal.JournalUtil; import org.folio.services.validation.JobProfileSnapshotValidationService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -123,6 +124,8 @@ public class ChangeEngineServiceImpl implements ChangeEngineService { private final JobProfileSnapshotValidationService jobProfileSnapshotValidationService; private final KafkaConfig kafkaConfig; private final FieldModificationService fieldModificationService; + private final IncomingRecordService incomingRecordService; + private final JournalRecordService journalRecordService; @Value("${srm.kafka.RawChunksKafkaHandler.maxDistributionNum:100}") private int maxDistributionNum; @@ -138,7 +141,9 @@ public ChangeEngineServiceImpl(@Autowired JobExecutionSourceChunkDao jobExecutio @Autowired MappingMetadataService mappingMetadataService, @Autowired JobProfileSnapshotValidationService jobProfileSnapshotValidationService, @Autowired KafkaConfig kafkaConfig, - @Autowired FieldModificationService fieldModificationService) { + @Autowired FieldModificationService fieldModificationService, + @Autowired IncomingRecordService incomingRecordService, + @Autowired JournalRecordService journalRecordService) { this.jobExecutionSourceChunkDao = jobExecutionSourceChunkDao; this.jobExecutionService = jobExecutionService; this.marcRecordAnalyzer = marcRecordAnalyzer; @@ -148,6 +153,8 @@ public ChangeEngineServiceImpl(@Autowired JobExecutionSourceChunkDao jobExecutio this.jobProfileSnapshotValidationService = jobProfileSnapshotValidationService; this.kafkaConfig = kafkaConfig; this.fieldModificationService = fieldModificationService; + this.incomingRecordService = incomingRecordService; + this.journalRecordService = journalRecordService; } @Override @@ -159,9 +166,13 @@ public Future> parseRawRecordsChunkForJobExecution(RawRecordsDto ch params.getTenantId(), acceptInstanceId, params); futureParsedRecords - .compose(parsedRecords -> isJobProfileCompatibleWithRecordsType(jobExecution.getJobProfileSnapshotWrapper(), parsedRecords) - ? Future.succeededFuture(parsedRecords) - : Future.failedFuture(prepareWrongJobProfileErrorMessage(jobExecution, parsedRecords))) + .compose(parsedRecords -> { + saveIncomingAndJournalRecords(parsedRecords, params.getTenantId()); + + return isJobProfileCompatibleWithRecordsType(jobExecution.getJobProfileSnapshotWrapper(), parsedRecords) + ? Future.succeededFuture(parsedRecords) + : Future.failedFuture(prepareWrongJobProfileErrorMessage(jobExecution, parsedRecords)); + }) .compose(parsedRecords -> ensureMappingMetaDataSnapshot(jobExecution.getId(), parsedRecords, params) .map(parsedRecords)) .onSuccess(parsedRecords -> { @@ -209,6 +220,11 @@ public Future> parseRawRecordsChunkForJobExecution(RawRecordsDto ch return promise.future(); } + private void saveIncomingAndJournalRecords(List parsedRecords, String tenantId) { + incomingRecordService.saveBatch(JournalUtil.buildIncomingRecordsByRecords(parsedRecords), tenantId); + journalRecordService.saveBatch(JournalUtil.buildJournalRecordsByRecords(parsedRecords), tenantId); + } + /** * Checks whether job profile snapshot is compatible with record type of the specified {@code records}. * Returns {@code true} for the specified records that have not been parsed successfully and therefore diff --git a/mod-source-record-manager-server/src/main/java/org/folio/services/IncomingRecordService.java b/mod-source-record-manager-server/src/main/java/org/folio/services/IncomingRecordService.java new file mode 100644 index 000000000..b2f8623db --- /dev/null +++ b/mod-source-record-manager-server/src/main/java/org/folio/services/IncomingRecordService.java @@ -0,0 +1,19 @@ +package org.folio.services; + +import org.folio.rest.jaxrs.model.IncomingRecord; + +import java.util.List; + +/** + * {@link IncomingRecord} Service interface + */ +public interface IncomingRecordService { + + /** + * Saves {@link IncomingRecord}s into DB + * + * @param incomingRecords incoming records to be saved + * @param tenantId tenant + */ + void saveBatch(List incomingRecords, String tenantId); +} diff --git a/mod-source-record-manager-server/src/main/java/org/folio/services/IncomingRecordServiceImpl.java b/mod-source-record-manager-server/src/main/java/org/folio/services/IncomingRecordServiceImpl.java new file mode 100644 index 000000000..0358f8372 --- /dev/null +++ b/mod-source-record-manager-server/src/main/java/org/folio/services/IncomingRecordServiceImpl.java @@ -0,0 +1,20 @@ +package org.folio.services; + +import org.folio.dao.IncomingRecordDao; +import org.folio.rest.jaxrs.model.IncomingRecord; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.List; + +@Service +public class IncomingRecordServiceImpl implements IncomingRecordService { + + @Autowired + private IncomingRecordDao incomingRecordDao; + + @Override + public void saveBatch(List incomingRecords, String tenantId) { + incomingRecordDao.saveBatch(incomingRecords, tenantId); + } +} diff --git a/mod-source-record-manager-server/src/main/java/org/folio/services/JournalRecordService.java b/mod-source-record-manager-server/src/main/java/org/folio/services/JournalRecordService.java index d17c65c38..b7e937e23 100644 --- a/mod-source-record-manager-server/src/main/java/org/folio/services/JournalRecordService.java +++ b/mod-source-record-manager-server/src/main/java/org/folio/services/JournalRecordService.java @@ -2,10 +2,12 @@ import io.vertx.core.Future; import org.folio.rest.jaxrs.model.JobExecutionSummaryDto; +import org.folio.rest.jaxrs.model.JournalRecord; import org.folio.rest.jaxrs.model.JobLogEntryDtoCollection; import org.folio.rest.jaxrs.model.JournalRecordCollection; import org.folio.rest.jaxrs.model.RecordProcessingLogDto; +import java.util.List; import java.util.Optional; /** @@ -75,4 +77,12 @@ public interface JournalRecordService { * @return Future with JournalRecords updated number */ Future updateErrorJournalRecordsByOrderIdAndJobExecution(String jobExecutionId, String orderId, String error, String tenantId); + + /** + * Saves set of {@link JournalRecord} entities + * + * @param journalRecords journal records to save + * @param tenantId tenant id + */ + void saveBatch(List journalRecords, String tenantId); } diff --git a/mod-source-record-manager-server/src/main/java/org/folio/services/JournalRecordServiceImpl.java b/mod-source-record-manager-server/src/main/java/org/folio/services/JournalRecordServiceImpl.java index 6b9c74c3a..99b3464b7 100644 --- a/mod-source-record-manager-server/src/main/java/org/folio/services/JournalRecordServiceImpl.java +++ b/mod-source-record-manager-server/src/main/java/org/folio/services/JournalRecordServiceImpl.java @@ -1,15 +1,16 @@ package org.folio.services; import io.vertx.core.Future; -import io.vertx.core.json.JsonObject; import org.folio.dao.JournalRecordDao; import org.folio.rest.jaxrs.model.JobExecutionSummaryDto; +import org.folio.rest.jaxrs.model.JournalRecord; import org.folio.rest.jaxrs.model.JobLogEntryDtoCollection; import org.folio.rest.jaxrs.model.JournalRecordCollection; import org.folio.rest.jaxrs.model.RecordProcessingLogDto; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.List; import java.util.Optional; /** @@ -55,4 +56,9 @@ public Future> getJobExecutionSummaryDto(String public Future updateErrorJournalRecordsByOrderIdAndJobExecution(String jobExecutionId, String orderId, String error, String tenantId) { return journalRecordDao.updateErrorJournalRecordsByOrderIdAndJobExecution(jobExecutionId, orderId, error, tenantId); } + + @Override + public void saveBatch(List journalRecords, String tenantId) { + journalRecordDao.saveBatch(journalRecords, tenantId); + } } diff --git a/mod-source-record-manager-server/src/main/java/org/folio/services/journal/JournalUtil.java b/mod-source-record-manager-server/src/main/java/org/folio/services/journal/JournalUtil.java index 7ebbc4b17..6b831b76f 100644 --- a/mod-source-record-manager-server/src/main/java/org/folio/services/journal/JournalUtil.java +++ b/mod-source-record-manager-server/src/main/java/org/folio/services/journal/JournalUtil.java @@ -9,9 +9,11 @@ import org.apache.logging.log4j.Logger; import org.folio.DataImportEventPayload; import org.folio.rest.jaxrs.model.DataImportEventTypes; +import org.folio.rest.jaxrs.model.IncomingRecord; import org.folio.rest.jaxrs.model.JournalRecord; import org.folio.rest.jaxrs.model.Record; +import java.util.Arrays; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; @@ -62,6 +64,45 @@ private static String extractRecord(HashMap context) { .orElse(EMPTY); } + public static List buildJournalRecordsByRecords(List records) { + return records.stream().map(record -> { + JournalRecord journalRecord = new JournalRecord() + .withId(UUID.randomUUID().toString()) + .withJobExecutionId(record.getSnapshotId()) + .withSourceId(record.getId()) + .withSourceRecordOrder(record.getOrder()) + .withActionType(JournalRecord.ActionType.PARSE) + .withActionDate(new Date()) + .withActionStatus(record.getErrorRecord() == null ? JournalRecord.ActionStatus.COMPLETED : JournalRecord.ActionStatus.ERROR); + if (record.getRecordType() != null) { + Arrays.stream(JournalRecord.EntityType.values()) + .filter(v -> v.value().startsWith(record.getRecordType().value())) + .findFirst().ifPresent(journalRecord::setEntityType); + } + if (record.getErrorRecord() != null) { + journalRecord.setError(record.getErrorRecord().getDescription()); + } + return journalRecord; + }).toList(); + } + + public static List buildIncomingRecordsByRecords(List records) { + return records.stream().map(record -> { + IncomingRecord incomingRecord = new IncomingRecord() + .withId(record.getId()) + .withJobExecutionId(record.getSnapshotId()) + .withOrder(record.getOrder()) + .withRawRecordContent(record.getRawRecord().getContent()); + if (record.getRecordType() != null) { + incomingRecord.setRecordType(IncomingRecord.RecordType.fromValue(record.getRecordType().value())); + } + if (record.getParsedRecord() != null) { + incomingRecord.setParsedRecordContent(String.valueOf(record.getParsedRecord().getContent())); + } + return incomingRecord; + }).toList(); + } + public static List buildJournalRecordsByEvent(DataImportEventPayload eventPayload, JournalRecord.ActionType actionType, JournalRecord.EntityType entityType, JournalRecord.ActionStatus actionStatus) throws JournalRecordMapperException { try { diff --git a/mod-source-record-manager-server/src/main/resources/templates/db_scripts/create_incoming_records_table.sql b/mod-source-record-manager-server/src/main/resources/templates/db_scripts/create_incoming_records_table.sql new file mode 100644 index 000000000..29632fb6e --- /dev/null +++ b/mod-source-record-manager-server/src/main/resources/templates/db_scripts/create_incoming_records_table.sql @@ -0,0 +1,10 @@ +CREATE TABLE IF NOT EXISTS incoming_records ( + id uuid NOT NULL, + job_execution_id uuid NOT NULL, + incoming_record jsonb NOT NULL, + CONSTRAINT incoming_records_pkey PRIMARY KEY (id), + CONSTRAINT incoming_records_jobexecutionid_fkey FOREIGN KEY (job_execution_id) + REFERENCES job_execution (id) +); + +CREATE INDEX IF NOT EXISTS incoming_records_jobexecutionid_index ON incoming_records USING BTREE (job_execution_id); diff --git a/mod-source-record-manager-server/src/main/resources/templates/db_scripts/schema.json b/mod-source-record-manager-server/src/main/resources/templates/db_scripts/schema.json index 5d5590bf6..e4289f96d 100644 --- a/mod-source-record-manager-server/src/main/resources/templates/db_scripts/schema.json +++ b/mod-source-record-manager-server/src/main/resources/templates/db_scripts/schema.json @@ -276,6 +276,11 @@ "run": "after", "snippet": "ALTER TABLE journal_records ADD COLUMN IF NOT EXISTS tenant_id text;", "fromModuleVersion": "mod-source-record-manager-3.7.0" + }, + { + "run": "after", + "snippetPath": "create_incoming_records_table.sql", + "fromModuleVersion": "mod-source-record-manager-3.8.0" } ] } diff --git a/mod-source-record-manager-server/src/test/java/org/folio/dao/IncomingRecordDaoImplTest.java b/mod-source-record-manager-server/src/test/java/org/folio/dao/IncomingRecordDaoImplTest.java new file mode 100644 index 000000000..0f9b214a6 --- /dev/null +++ b/mod-source-record-manager-server/src/test/java/org/folio/dao/IncomingRecordDaoImplTest.java @@ -0,0 +1,66 @@ +package org.folio.dao; + +import io.vertx.core.Vertx; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import org.folio.dao.util.PostgresClientFactory; +import org.folio.rest.impl.AbstractRestTest; +import org.folio.rest.jaxrs.model.IncomingRecord; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.MockitoAnnotations; +import org.mockito.Spy; + +import java.io.IOException; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +@RunWith(VertxUnitRunner.class) +public class IncomingRecordDaoImplTest extends AbstractRestTest { + + private static final String TENANT_ID = "diku"; + private static final String ID = "5105b55a-b9a3-4f76-9402-a5243ea63c95"; + + @Spy + PostgresClientFactory postgresClientFactory = new PostgresClientFactory(Vertx.vertx()); + @InjectMocks + private IncomingRecordDao incomingRecordDao = new IncomingRecordDaoImpl(); + + @Before + public void setUp(TestContext context) throws IOException { + MockitoAnnotations.openMocks(this); + super.setUp(context); + } + + @Test + public void saveBatch(TestContext context) { + Async async = context.async(); + + IncomingRecord incomingRecord1 = new IncomingRecord() + .withId(ID).withJobExecutionId(ID).withRecordType(IncomingRecord.RecordType.MARC_BIB).withOrder(0) + .withRawRecordContent("rawRecord").withParsedRecordContent("parsedRecord"); + IncomingRecord incomingRecord2 = new IncomingRecord() + .withId(ID).withJobExecutionId(ID).withRecordType(IncomingRecord.RecordType.MARC_BIB).withOrder(0) + .withRawRecordContent("rawRecord").withParsedRecordContent("parsedRecord"); + + incomingRecordDao.saveBatch(List.of(incomingRecord1, incomingRecord2), TENANT_ID) + .onComplete(ar -> { + context.assertTrue(ar.succeeded()); + context.assertEquals(2, ar.result()); + ar.result().forEach(rowSet -> { + context.assertEquals(3, rowSet.rowCount()); + rowSet.forEach(row -> { + context.assertEquals(ID, row.getString("id")); + context.assertEquals(ID, assertThat(row.getString("job_execution_id"))); + context.assertEquals(JsonObject.mapFrom(incomingRecord1).encode(), assertThat(row.getString("incoming_record"))); + }); + }); + async.complete(); + }); + } +} diff --git a/mod-source-record-manager-server/src/test/java/org/folio/services/ChangeEngineServiceImplTest.java b/mod-source-record-manager-server/src/test/java/org/folio/services/ChangeEngineServiceImplTest.java index 510a4521f..c9c8fabe1 100644 --- a/mod-source-record-manager-server/src/test/java/org/folio/services/ChangeEngineServiceImplTest.java +++ b/mod-source-record-manager-server/src/test/java/org/folio/services/ChangeEngineServiceImplTest.java @@ -62,6 +62,7 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; @@ -98,6 +99,10 @@ public class ChangeEngineServiceImplTest { private JobProfileSnapshotValidationService jobProfileSnapshotValidationService; @Mock private FieldModificationService fieldModificationService; + @Mock + private IncomingRecordService incomingRecordService; + @Mock + private JournalRecordService journalRecordService; @Captor private ArgumentCaptor> kafkaHeadersCaptor; @@ -251,6 +256,8 @@ public void shouldFillRecordIdHeaderForMarkRecordWhen004FieldIsMissing() { .findFirst(); assertTrue(optionalRecordIdHeader.isPresent()); + verify(incomingRecordService, times(1)).saveBatch(any(), any()); + verify(journalRecordService, times(1)).saveBatch(any(), any()); } @Test diff --git a/mod-source-record-manager-server/src/test/java/org/folio/services/EventDrivenChunkProcessingServiceImplTest.java b/mod-source-record-manager-server/src/test/java/org/folio/services/EventDrivenChunkProcessingServiceImplTest.java index 4bf96108c..857caa6a5 100644 --- a/mod-source-record-manager-server/src/test/java/org/folio/services/EventDrivenChunkProcessingServiceImplTest.java +++ b/mod-source-record-manager-server/src/test/java/org/folio/services/EventDrivenChunkProcessingServiceImplTest.java @@ -112,6 +112,9 @@ public class EventDrivenChunkProcessingServiceImplTest extends AbstractRestTest private JobExecutionServiceImpl jobExecutionService; @InjectMocks @Spy + private IncomingRecordServiceImpl incomingRecordService; + @InjectMocks + @Spy private JournalRecordServiceImpl journalRecordService; @InjectMocks @Spy @@ -186,7 +189,7 @@ public void setUp() throws IOException { JobProfileSnapshotValidationServiceImpl jobProfileSnapshotValidationService = new JobProfileSnapshotValidationServiceImpl(); changeEngineService = new ChangeEngineServiceImpl(jobExecutionSourceChunkDao, jobExecutionService, marcRecordAnalyzer, hrIdFieldService, recordsPublishingService, mappingMetadataService, jobProfileSnapshotValidationService, kafkaConfig, - fieldModificationService); + fieldModificationService, incomingRecordService, journalRecordService); ReflectionTestUtils.setField(changeEngineService, "maxDistributionNum", 10); ReflectionTestUtils.setField(changeEngineService, "batchSize", 100); chunkProcessingService = new EventDrivenChunkProcessingServiceImpl(jobExecutionSourceChunkDao, jobExecutionService, changeEngineService, jobExecutionProgressService); diff --git a/mod-source-record-manager-server/src/test/java/org/folio/services/IncomingRecordServiceImplUnitTest.java b/mod-source-record-manager-server/src/test/java/org/folio/services/IncomingRecordServiceImplUnitTest.java new file mode 100644 index 000000000..505d636b0 --- /dev/null +++ b/mod-source-record-manager-server/src/test/java/org/folio/services/IncomingRecordServiceImplUnitTest.java @@ -0,0 +1,34 @@ +package org.folio.services; + +import org.folio.dao.IncomingRecordDao; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; + +@RunWith(MockitoJUnitRunner.class) +public class IncomingRecordServiceImplUnitTest { + + @Mock + private IncomingRecordDao incomingRecordDao; + + @InjectMocks + private IncomingRecordService incomingRecordService = new IncomingRecordServiceImpl(); + + @Before + public void setUp() { + MockitoAnnotations.openMocks(this); + } + + @Test + public void saveBatch() { + incomingRecordService.saveBatch(any(), any()); + verify(incomingRecordDao).saveBatch(any(), any()); + } +} diff --git a/mod-source-record-manager-server/src/test/java/org/folio/services/JournalUtilTest.java b/mod-source-record-manager-server/src/test/java/org/folio/services/JournalUtilTest.java index a0c0bc7e5..bfa81a8d4 100644 --- a/mod-source-record-manager-server/src/test/java/org/folio/services/JournalUtilTest.java +++ b/mod-source-record-manager-server/src/test/java/org/folio/services/JournalUtilTest.java @@ -6,7 +6,11 @@ import io.vertx.ext.unit.junit.VertxUnitRunner; import org.folio.DataImportEventPayload; import org.folio.Record; +import org.folio.rest.jaxrs.model.ErrorRecord; import org.folio.rest.jaxrs.model.JournalRecord; +import org.folio.rest.jaxrs.model.IncomingRecord; +import org.folio.rest.jaxrs.model.ParsedRecord; +import org.folio.rest.jaxrs.model.RawRecord; import org.folio.services.journal.JournalRecordMapperException; import org.folio.services.journal.JournalUtil; import org.junit.Assert; @@ -18,8 +22,8 @@ import java.util.List; import java.util.UUID; +import static org.assertj.core.api.Assertions.assertThat; import static org.folio.DataImportEventTypes.DI_ERROR; -import static org.folio.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_MODIFIED_READY_FOR_POST_PROCESSING; import static org.folio.DataImportEventTypes.DI_SRS_MARC_BIB_RECORD_UPDATED; import static org.folio.rest.jaxrs.model.JournalRecord.ActionStatus.COMPLETED; import static org.folio.rest.jaxrs.model.JournalRecord.ActionStatus.ERROR; @@ -40,6 +44,82 @@ public class JournalUtilTest { private static final String CENTRAL_TENANT_ID_KEY = "CENTRAL_TENANT_ID"; + @Test + public void shouldBuildJournalRecordsByRecordsWithoutError() { + String recordId = UUID.randomUUID().toString(); + String snapshotId = UUID.randomUUID().toString(); + + org.folio.rest.jaxrs.model.Record record = new org.folio.rest.jaxrs.model.Record() + .withId(recordId) + .withSnapshotId(snapshotId) + .withOrder(0) + .withRecordType(org.folio.rest.jaxrs.model.Record.RecordType.MARC_BIB); + + List journalRecords = JournalUtil.buildJournalRecordsByRecords(List.of(record)); + + assertThat(journalRecords).hasSize(1); + assertThat(journalRecords.get(0).getId()).isNotBlank(); + assertThat(journalRecords.get(0).getJobExecutionId()).isEqualTo(snapshotId); + assertThat(journalRecords.get(0).getSourceId()).isEqualTo(recordId); + assertThat(journalRecords.get(0).getSourceRecordOrder()).isEqualTo(record.getOrder()); + assertThat(journalRecords.get(0).getActionType()).isEqualTo(JournalRecord.ActionType.PARSE); + assertThat(journalRecords.get(0).getActionDate()).isNotNull(); + assertThat(journalRecords.get(0).getActionStatus()).isEqualTo(JournalRecord.ActionStatus.COMPLETED); + assertThat(journalRecords.get(0).getEntityType()).isEqualTo(MARC_BIBLIOGRAPHIC); + assertThat(journalRecords.get(0).getError()).isNull(); + } + + @Test + public void shouldBuildJournalRecordsByRecordsWithError() { + String recordId = UUID.randomUUID().toString(); + String snapshotId = UUID.randomUUID().toString(); + + ErrorRecord errorRecord = new ErrorRecord().withDescription("error"); + org.folio.rest.jaxrs.model.Record record = new org.folio.rest.jaxrs.model.Record() + .withId(recordId) + .withSnapshotId(snapshotId) + .withOrder(0) + .withRecordType(org.folio.rest.jaxrs.model.Record.RecordType.MARC_BIB) + .withErrorRecord(errorRecord); + + List journalRecords = JournalUtil.buildJournalRecordsByRecords(List.of(record)); + + assertThat(journalRecords).hasSize(1); + assertThat(journalRecords.get(0).getId()).isNotBlank(); + assertThat(journalRecords.get(0).getJobExecutionId()).isEqualTo(snapshotId); + assertThat(journalRecords.get(0).getSourceId()).isEqualTo(recordId); + assertThat(journalRecords.get(0).getSourceRecordOrder()).isEqualTo(record.getOrder()); + assertThat(journalRecords.get(0).getActionType()).isEqualTo(JournalRecord.ActionType.PARSE); + assertThat(journalRecords.get(0).getActionDate()).isNotNull(); + assertThat(journalRecords.get(0).getActionStatus()).isEqualTo(ERROR); + assertThat(journalRecords.get(0).getEntityType()).isEqualTo(MARC_BIBLIOGRAPHIC); + assertThat(journalRecords.get(0).getError()).isEqualTo(errorRecord.getDescription()); + } + + @Test + public void shouldBuildIncomingRecordsByRecords() { + String recordId = UUID.randomUUID().toString(); + String snapshotId = UUID.randomUUID().toString(); + + org.folio.rest.jaxrs.model.Record record = new org.folio.rest.jaxrs.model.Record() + .withId(recordId) + .withSnapshotId(snapshotId) + .withOrder(0) + .withRawRecord(new RawRecord().withContent("rawRecord")) + .withRecordType(org.folio.rest.jaxrs.model.Record.RecordType.MARC_BIB) + .withParsedRecord(new ParsedRecord().withContent("parsedRecord")); + + List incomingRecords = JournalUtil.buildIncomingRecordsByRecords(List.of(record)); + + assertThat(incomingRecords).hasSize(1); + assertThat(incomingRecords.get(0).getId()).isEqualTo(record.getId()); + assertThat(incomingRecords.get(0).getJobExecutionId()).isEqualTo(snapshotId); + assertThat(incomingRecords.get(0).getOrder()).isEqualTo(record.getOrder()); + assertThat(incomingRecords.get(0).getRawRecordContent()).isEqualTo("rawRecord"); + assertThat(incomingRecords.get(0).getRecordType()).isEqualTo(IncomingRecord.RecordType.MARC_BIB); + assertThat(incomingRecords.get(0).getParsedRecordContent()).isEqualTo("parsedRecord"); + } + @Test public void shouldBuildJournalRecordForInstance() throws JournalRecordMapperException { String instanceId = UUID.randomUUID().toString(); diff --git a/mod-source-record-manager-server/src/test/java/org/folio/services/RecordProcessedEventHandlingServiceImplTest.java b/mod-source-record-manager-server/src/test/java/org/folio/services/RecordProcessedEventHandlingServiceImplTest.java index 27295edec..2c72f9e8d 100644 --- a/mod-source-record-manager-server/src/test/java/org/folio/services/RecordProcessedEventHandlingServiceImplTest.java +++ b/mod-source-record-manager-server/src/test/java/org/folio/services/RecordProcessedEventHandlingServiceImplTest.java @@ -136,16 +136,18 @@ public class RecordProcessedEventHandlingServiceImplTest extends AbstractRestTes @Spy @InjectMocks private FieldModificationServiceImpl fieldModificationService; + @Spy + @InjectMocks + private IncomingRecordServiceImpl incomingRecordService; + @Spy + @InjectMocks + private JournalRecordServiceImpl journalRecordService; @Spy RecordsPublishingService recordsPublishingService; - private MappingRuleCache mappingRuleCache; - private ChangeEngineService changeEngineService; private ChunkProcessingService chunkProcessingService; private RecordProcessedEventHandlingServiceImpl recordProcessedEventHandlingService; private OkapiConnectionParams params; - private MappingMetadataService mappingMetadataService; - private KafkaConfig kafkaConfig; private InitJobExecutionsRqDto initJobExecutionsRqDto = new InitJobExecutionsRqDto() .withFiles(Collections.singletonList(new File().withName("importBib1.bib"))) @@ -166,17 +168,10 @@ public class RecordProcessedEventHandlingServiceImplTest extends AbstractRestTes .withId(jobProfile.getId()) .withDataType(DataType.MARC); - private final JsonObject userResponse = new JsonObject() - .put("users", - new JsonArray().add(new JsonObject() - .put("username", "diku_admin") - .put("personal", new JsonObject().put("firstName", "DIKU").put("lastName", "ADMINISTRATOR")))) - .put("totalRecords", 1); - @Before public void setUp() throws IOException { String[] hostAndPort = kafkaCluster.getBrokerList().split(":"); - kafkaConfig = KafkaConfig.builder() + KafkaConfig kafkaConfig = KafkaConfig.builder() .kafkaHost(hostAndPort[0]) .kafkaPort(hostAndPort[1]) .envId(KAFKA_ENV_ID) @@ -185,15 +180,16 @@ public void setUp() throws IOException { MockitoAnnotations.openMocks(this); - mappingRuleCache = new MappingRuleCache(mappingRuleDao, vertx); + MappingRuleCache mappingRuleCache = new MappingRuleCache(mappingRuleDao, vertx); marcRecordAnalyzer = new MarcRecordAnalyzer(); mappingRuleService = new MappingRuleServiceImpl(mappingRuleDao, mappingRuleCache); mappingRuleDao = when(mock(MappingRuleDaoImpl.class).get(any(), anyString())).thenReturn(Future.succeededFuture(Optional.of(new JsonObject(rules)))).getMock(); mappingParametersProvider = when(mock(MappingParametersProvider.class).get(anyString(), any(OkapiConnectionParams.class))).thenReturn(Future.succeededFuture(new MappingParameters())).getMock(); - mappingMetadataService = new MappingMetadataServiceImpl(mappingParametersProvider, mappingRuleService, mappingRulesSnapshotDao, mappingParamsSnapshotDao); + MappingMetadataService mappingMetadataService = new MappingMetadataServiceImpl(mappingParametersProvider, mappingRuleService, mappingRulesSnapshotDao, mappingParamsSnapshotDao); JobProfileSnapshotValidationServiceImpl jobProfileSnapshotValidationService = new JobProfileSnapshotValidationServiceImpl(); - changeEngineService = new ChangeEngineServiceImpl(jobExecutionSourceChunkDao, jobExecutionService, marcRecordAnalyzer, hrIdFieldService , recordsPublishingService, mappingMetadataService, jobProfileSnapshotValidationService, kafkaConfig, - fieldModificationService); + ChangeEngineService changeEngineService = new ChangeEngineServiceImpl(jobExecutionSourceChunkDao, jobExecutionService, marcRecordAnalyzer, + hrIdFieldService, recordsPublishingService, mappingMetadataService, jobProfileSnapshotValidationService, kafkaConfig, fieldModificationService, + incomingRecordService, journalRecordService); ReflectionTestUtils.setField(changeEngineService, "maxDistributionNum", 10); ReflectionTestUtils.setField(changeEngineService, "batchSize", 100); chunkProcessingService = new EventDrivenChunkProcessingServiceImpl(jobExecutionSourceChunkDao, jobExecutionService, changeEngineService, jobExecutionProgressService); diff --git a/ramls/change-manager.raml b/ramls/change-manager.raml index f9ad9df07..fdd3c292e 100644 --- a/ramls/change-manager.raml +++ b/ramls/change-manager.raml @@ -33,6 +33,7 @@ types: sourceRecordState: !include sourceRecordState.json deleteJobExecutionsReq: !include raml-storage/schemas/mod-source-record-manager/deleteJobExecutionsReq.json deleteJobExecutionsResp: !include raml-storage/schemas/mod-source-record-manager/deleteJobExecutionsResp.json + incomingRecord: !include raml-storage/schemas/mod-source-record-manager/incomingRecord.json traits: validate: !include raml-storage/raml-util/traits/validation.raml diff --git a/ramls/raml-storage b/ramls/raml-storage index 52bb24f20..240b79049 160000 --- a/ramls/raml-storage +++ b/ramls/raml-storage @@ -1 +1 @@ -Subproject commit 52bb24f20ff1c34d76eaf509d81fec96e794cc9f +Subproject commit 240b79049e36b5a8131de5b8fe4b50147cf2f58f