From 43120f479f9640165b412128cace7c05bc5d56f5 Mon Sep 17 00:00:00 2001 From: Mukhiddin Yusupov <133661057+mukhiddin-yusuf@users.noreply.github.com> Date: Tue, 3 Dec 2024 21:00:14 +0500 Subject: [PATCH] MODSOURCE-817: Fix data consistency in handling and updating Marc Bib records for links.instance-authority event (#652) MODSOURCE-817: Fix data consistency in handling and updating Marc Bib records for links.instance-authority event - make the handling of events for instance-authority links so that no Marc Bib modifications from are lost due to concurrent nature of handling events - make sure the event of the first in a sequence of updates sent by the producer and then received by the consumer is handled before with persisting the changes of related bibs than the events produced/consumed later Closes: MODSOURCE-817 --- NEWS.md | 3 +- .../AuthorityLinkChunkKafkaHandler.java | 208 +++--- .../main/java/org/folio/dao/RecordDao.java | 18 +- .../java/org/folio/dao/RecordDaoImpl.java | 606 ++++++++++-------- .../folio/dao/util/ErrorRecordDaoUtil.java | 30 +- .../folio/dao/util/ParsedRecordDaoUtil.java | 34 +- .../org/folio/dao/util/RawRecordDaoUtil.java | 29 +- .../org/folio/dao/util/RecordDaoUtil.java | 37 +- .../org/folio/dao/util/RecordMappers.java | 39 ++ .../org/folio/services/RecordService.java | 22 +- .../org/folio/services/RecordServiceImpl.java | 47 +- .../entities/RecordsModifierOperator.java | 24 + .../exceptions/RecordUpdateException.java | 4 + .../services/util/EventHandlingUtil.java | 5 +- .../org/folio/services/RecordServiceTest.java | 66 +- ...arsedRecordChunkConsumersVerticleTest.java | 10 +- 16 files changed, 717 insertions(+), 465 deletions(-) create mode 100644 mod-source-record-storage-server/src/main/java/org/folio/dao/util/RecordMappers.java create mode 100644 mod-source-record-storage-server/src/main/java/org/folio/services/entities/RecordsModifierOperator.java diff --git a/NEWS.md b/NEWS.md index 98237976d..7c531e168 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,4 +1,5 @@ -## 2025-xx-xx 5.10.0-SNAPSHOT +## 2025-XX-XX 5.10.0-SNAPSHOT +* [MODSOURCE-817](https://folio-org.atlassian.net/browse/MODSOURCE-817) Fix data consistency in handling and updating Marc Bib records for links.instance-authority event * [MODSOURCE-816](https://folio-org.atlassian.net/browse/MODSOURCE-816) [RRT] Optimize execution plan for streaming SQL * [MODSOURCE-824](https://folio-org.atlassian.net/browse/MODSOURCE-824) Endpoint /batch/parsed-records/fetch does not return deleted records diff --git a/mod-source-record-storage-server/src/main/java/org/folio/consumers/AuthorityLinkChunkKafkaHandler.java b/mod-source-record-storage-server/src/main/java/org/folio/consumers/AuthorityLinkChunkKafkaHandler.java index 3c2fea647..83c9c85f6 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/consumers/AuthorityLinkChunkKafkaHandler.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/consumers/AuthorityLinkChunkKafkaHandler.java @@ -1,6 +1,5 @@ package org.folio.consumers; -import static java.util.Collections.emptyList; import static java.util.Objects.nonNull; import static org.apache.commons.collections4.CollectionUtils.isEmpty; import static org.apache.commons.lang3.StringUtils.EMPTY; @@ -34,8 +33,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.folio.dao.util.IdType; -import org.folio.dao.util.RecordDaoUtil; import org.folio.dao.util.RecordType; import org.folio.kafka.AsyncRecordHandler; import org.folio.kafka.KafkaConfig; @@ -54,6 +51,7 @@ import org.folio.rest.jaxrs.model.UpdateTarget; import org.folio.services.RecordService; import org.folio.services.SnapshotService; +import org.folio.services.entities.RecordsModifierOperator; import org.folio.services.handlers.links.DeleteLinkProcessor; import org.folio.services.handlers.links.LinkProcessor; import org.folio.services.handlers.links.UpdateLinkProcessor; @@ -86,21 +84,27 @@ public AuthorityLinkChunkKafkaHandler(RecordService recordService, KafkaConfig k @Override public Future handle(KafkaConsumerRecord consumerRecord) { LOGGER.trace("handle:: Handling kafka record: {}", consumerRecord); + LOGGER.info("handle:: Start Handling kafka record"); var userId = extractHeaderValue(XOkapiHeaders.USER_ID, consumerRecord.headers()); - return mapToEvent(consumerRecord) + + var result = mapToEvent(consumerRecord) .compose(this::createSnapshot) - .compose(event -> retrieveRecords(event, event.getTenant()) - .compose(recordCollection -> mapRecordFieldsChanges(event, recordCollection, userId)) - .compose(recordCollection -> recordService.saveRecords(recordCollection, - toOkapiHeaders(consumerRecord.headers(), event.getTenant()))) - .map(recordsBatchResponse -> sendReports(recordsBatchResponse, event, consumerRecord.headers())) - .map(recordsBatchResponse -> mapRecordsToBibUpdateEvents(recordsBatchResponse, event)) - .compose(marcBibUpdates -> sendEvents(marcBibUpdates, event, consumerRecord)) - ).recover(th -> { - LOGGER.error("Failed to handle {} event", MARC_BIB.moduleTopicName(), th); - return Future.failedFuture(th); - } - ); + .compose(linksUpdate -> { + var instanceIds = getBibRecordExternalIds(linksUpdate); + var okapiHeaders = toOkapiHeaders(consumerRecord.headers(), linksUpdate.getTenant()); + RecordsModifierOperator recordsModifier = recordsCollection -> + this.mapRecordFieldsChanges(linksUpdate, recordsCollection, userId); + + return recordService.saveRecordsByExternalIds(instanceIds, RecordType.MARC_BIB, recordsModifier, okapiHeaders) + .compose(recordsBatchResponse -> { + sendReports(recordsBatchResponse, linksUpdate, consumerRecord.headers()); + var marcBibUpdateStats = mapRecordsToBibUpdateEvents(recordsBatchResponse, linksUpdate); + return sendEvents(marcBibUpdateStats, linksUpdate, consumerRecord); + }); + }); + + LOGGER.info("handle:: Finish Handling kafka record"); + return result; } private Future mapToEvent(KafkaConsumerRecord consumerRecord) { @@ -115,98 +119,103 @@ private Future mapToEvent(KafkaConsumerRecord retrieveRecords(BibAuthorityLinksUpdate bibAuthorityLinksUpdate, String tenantId) { - LOGGER.trace("Retrieving bibs for jobId {}, authorityId {}", - bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId()); - var instanceIds = bibAuthorityLinksUpdate.getUpdateTargets().stream() + private Future createSnapshot(BibAuthorityLinksUpdate bibAuthorityLinksUpdate) { + var now = new Date(); + var snapshot = new Snapshot() + .withJobExecutionId(bibAuthorityLinksUpdate.getJobId()) + .withStatus(Snapshot.Status.COMMITTED) + .withProcessingStartedDate(now) + .withMetadata(new Metadata() + .withCreatedDate(now) + .withUpdatedDate(now)); + + return snapshotService.saveSnapshot(snapshot, bibAuthorityLinksUpdate.getTenant()) + .map(result -> bibAuthorityLinksUpdate); + } + + private List getBibRecordExternalIds(BibAuthorityLinksUpdate linksUpdate) { + return linksUpdate.getUpdateTargets().stream() .flatMap(updateTarget -> updateTarget.getLinks().stream() .map(Link::getInstanceId)) .distinct() - .collect(Collectors.toList()); - - var condition = RecordDaoUtil.getExternalIdsCondition(instanceIds, IdType.INSTANCE) - .and(RecordDaoUtil.filterRecordByDeleted(false)); - return recordService.getRecords(condition, RecordType.MARC_BIB, emptyList(), 0, instanceIds.size(), tenantId); + .toList(); } - private Future mapRecordFieldsChanges(BibAuthorityLinksUpdate bibAuthorityLinksUpdate, - RecordCollection recordCollection, String userId) { + private RecordCollection mapRecordFieldsChanges(BibAuthorityLinksUpdate bibAuthorityLinksUpdate, + RecordCollection recordCollection, String userId) { LOGGER.debug("Retrieved {} bib records for jobId {}, authorityId {}", recordCollection.getTotalRecords(), bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId()); - return getLinkProcessorForEvent(bibAuthorityLinksUpdate).map(linkProcessor -> { - recordCollection.getRecords().forEach(bibRecord -> { - var newRecordId = UUID.randomUUID().toString(); - var instanceId = bibRecord.getExternalIdsHolder().getInstanceId(); - var parsedRecord = bibRecord.getParsedRecord(); - var parsedRecordContent = readParsedContentToObjectRepresentation(bibRecord); - var fields = new LinkedList<>(parsedRecordContent.getDataFields()); - - var updateTargetFieldCodes = extractUpdateTargetFieldCodesForInstance(bibAuthorityLinksUpdate, instanceId); - var subfieldChanges = bibAuthorityLinksUpdate.getSubfieldsChanges().stream() - .filter(subfieldsChange -> updateTargetFieldCodes.contains(subfieldsChange.getField())) - .collect(Collectors.toMap(SubfieldsChange::getField, SubfieldsChange::getSubfields)); - - fields.forEach(field -> { - if (!updateTargetFieldCodes.contains(field.getTag())) { - return; - } - - var subfields = field.getSubfields(); - if (isEmpty(subfields)) { - return; - } - - var authorityId = getAuthorityIdSubfield(subfields); - if (authorityId.isEmpty() || !bibAuthorityLinksUpdate.getAuthorityId().equals(authorityId.get().getData())) { - return; - } - - var newSubfields = linkProcessor.process(field.getTag(), subfieldChanges.get(field.getTag()), subfields); - LOGGER.trace("JobId {}, AuthorityId {}, instanceId {}, field {}, old subfields: {}, new subfields: {}", - bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId(), - instanceId, field.getTag(), subfields, newSubfields); - - var newField = new DataFieldImpl(field.getTag(), field.getIndicator1(), field.getIndicator2()); - newSubfields.forEach(newField::addSubfield); - - var dataFields = parsedRecordContent.getDataFields(); - var fieldPosition = dataFields.indexOf(field); - dataFields.remove(fieldPosition); - dataFields.add(fieldPosition, newField); - }); - - parsedRecord.setContent(mapObjectRepresentationToParsedContentJsonString(parsedRecordContent)); - parsedRecord.setFormattedContent(EMPTY); - parsedRecord.setId(newRecordId); - bibRecord.setId(newRecordId); - bibRecord.getRawRecord().setId(newRecordId); - bibRecord.setSnapshotId(bibAuthorityLinksUpdate.getJobId()); - setUpdatedBy(bibRecord, userId); + var linkProcessor = getLinkProcessorForEvent(bibAuthorityLinksUpdate); + recordCollection.getRecords().forEach(bibRecord -> { + var newRecordId = UUID.randomUUID().toString(); + var instanceId = bibRecord.getExternalIdsHolder().getInstanceId(); + var parsedRecord = bibRecord.getParsedRecord(); + var parsedRecordContent = readParsedContentToObjectRepresentation(bibRecord); + var fields = new LinkedList<>(parsedRecordContent.getDataFields()); + + var updateTargetFieldCodes = extractUpdateTargetFieldCodesForInstance(bibAuthorityLinksUpdate, instanceId); + var subfieldChanges = bibAuthorityLinksUpdate.getSubfieldsChanges().stream() + .filter(subfieldsChange -> updateTargetFieldCodes.contains(subfieldsChange.getField())) + .collect(Collectors.toMap(SubfieldsChange::getField, SubfieldsChange::getSubfields)); + + fields.forEach(field -> { + if (!updateTargetFieldCodes.contains(field.getTag())) { + return; + } + + var subfields = field.getSubfields(); + if (isEmpty(subfields)) { + return; + } + + var authorityId = getAuthorityIdSubfield(subfields); + if (authorityId.isEmpty() || !bibAuthorityLinksUpdate.getAuthorityId().equals(authorityId.get().getData())) { + return; + } + + var newSubfields = linkProcessor.process(field.getTag(), subfieldChanges.get(field.getTag()), subfields); + LOGGER.trace("JobId {}, AuthorityId {}, instanceId {}, field {}, old subfields: {}, new subfields: {}", + bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId(), + instanceId, field.getTag(), subfields, newSubfields); + + var newField = new DataFieldImpl(field.getTag(), field.getIndicator1(), field.getIndicator2()); + newSubfields.forEach(newField::addSubfield); + + var dataFields = parsedRecordContent.getDataFields(); + var fieldPosition = dataFields.indexOf(field); + dataFields.remove(fieldPosition); + dataFields.add(fieldPosition, newField); }); - return recordCollection; + parsedRecord.setContent(mapObjectRepresentationToParsedContentJsonString(parsedRecordContent)); + parsedRecord.setFormattedContent(EMPTY); + parsedRecord.setId(newRecordId); + bibRecord.setId(newRecordId); + bibRecord.getRawRecord().setId(newRecordId); + bibRecord.setSnapshotId(bibAuthorityLinksUpdate.getJobId()); + setUpdatedBy(bibRecord, userId); }); + return recordCollection; } - private Future getLinkProcessorForEvent(BibAuthorityLinksUpdate bibAuthorityLinksUpdate) { + private LinkProcessor getLinkProcessorForEvent(BibAuthorityLinksUpdate bibAuthorityLinksUpdate) { var eventType = bibAuthorityLinksUpdate.getType(); switch (eventType) { - case DELETE: { + case DELETE -> { LOGGER.debug("Precessing DELETE event for jobId {}, authorityId {}", bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId()); - return Future.succeededFuture(new DeleteLinkProcessor()); + return new DeleteLinkProcessor(); } - case UPDATE: { + case UPDATE -> { LOGGER.debug("Precessing UPDATE event for jobId {}, authorityId {}", bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId()); - return Future.succeededFuture(new UpdateLinkProcessor()); + return new UpdateLinkProcessor(); } - default: { - return Future.failedFuture(new IllegalArgumentException( + default -> + throw new IllegalArgumentException( String.format("Unsupported event type: %s for jobId %s, authorityId %s", - eventType, bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId()))); - } + eventType, bibAuthorityLinksUpdate.getJobId(), bibAuthorityLinksUpdate.getAuthorityId())); } } @@ -216,7 +225,7 @@ private List extractUpdateTargetFieldCodesForInstance(BibAuthorityLinksU .filter(updateTarget -> updateTarget.getLinks().stream() .anyMatch(link -> link.getInstanceId().equals(instanceId))) .map(UpdateTarget::getField) - .collect(Collectors.toList()); + .toList(); } private List mapRecordsToBibUpdateEvents(RecordsBatchResponse batchResponse, @@ -252,7 +261,7 @@ private List toMarcBibUpdateEvents(RecordsBatchResponse batchResp .withTs(bibAuthorityLinksUpdate.getTs()) .withRecord(bibRecord); }) - .collect(Collectors.toList()); + .toList(); } private List toFailedLinkUpdateReports(List errorRecords, @@ -273,21 +282,7 @@ private List toFailedLinkUpdateReports(List errorRecor .withFailCause(bibRecord.getErrorRecord().getDescription()) .withStatus(FAIL); }) - .collect(Collectors.toList()); - } - - private Future createSnapshot(BibAuthorityLinksUpdate bibAuthorityLinksUpdate) { - var now = new Date(); - var snapshot = new Snapshot() - .withJobExecutionId(bibAuthorityLinksUpdate.getJobId()) - .withStatus(Snapshot.Status.COMMITTED) - .withProcessingStartedDate(now) - .withMetadata(new Metadata() - .withCreatedDate(now) - .withUpdatedDate(now)); - - return snapshotService.saveSnapshot(snapshot, bibAuthorityLinksUpdate.getTenant()) - .map(result -> bibAuthorityLinksUpdate); + .toList(); } private void setUpdatedBy(Record changedRecord, String userId) { @@ -300,8 +295,8 @@ private void setUpdatedBy(Record changedRecord, String userId) { } } - private RecordsBatchResponse sendReports(RecordsBatchResponse batchResponse, BibAuthorityLinksUpdate event, - List headers) { + private void sendReports(RecordsBatchResponse batchResponse, BibAuthorityLinksUpdate event, + List headers) { var errorRecords = getErrorRecords(batchResponse); if (!errorRecords.isEmpty()) { LOGGER.info("Errors detected. Sending {} linking reports for jobId {}, authorityId {}", @@ -310,7 +305,6 @@ private RecordsBatchResponse sendReports(RecordsBatchResponse batchResponse, Bib toFailedLinkUpdateReports(errorRecords, event).forEach(report -> sendEventToKafka(LINKS_STATS, report.getTenant(), report.getJobId(), report, headers)); } - return batchResponse; } private Future sendEvents(List marcBibUpdateEvents, BibAuthorityLinksUpdate event, @@ -368,7 +362,7 @@ private KafkaProducerRecord createKafkaProducerRecord(KafkaTopic private List getErrorRecords(RecordsBatchResponse batchResponse) { return batchResponse.getRecords().stream() .filter(marcRecord -> nonNull(marcRecord.getErrorRecord())) - .collect(Collectors.toList()); + .toList(); } } diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDao.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDao.java index 7ca5ec355..34381eb0a 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDao.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDao.java @@ -9,6 +9,7 @@ import java.util.Map; import java.util.Optional; import java.util.function.Function; + import net.sf.jsqlparser.JSQLParserException; import org.folio.dao.util.IdType; import org.folio.dao.util.MatchField; @@ -26,6 +27,7 @@ import org.folio.rest.jaxrs.model.StrippedParsedRecordCollection; import org.folio.rest.jooq.enums.RecordState; import org.folio.services.RecordSearchParameters; +import org.folio.services.entities.RecordsModifierOperator; import org.folio.services.util.TypeConnection; import org.folio.services.util.parser.ParseFieldsResult; import org.folio.services.util.parser.ParseLeaderResult; @@ -209,7 +211,7 @@ Future getMatchedRecordsIdentifiers(MatchField mat Future saveRecord(ReactiveClassicGenericQueryExecutor txQE, Record record, Map okapiHeaders); /** - * Saves {@link RecordCollection} to the db + * Saves {@link RecordCollection} to the db. * * @param recordCollection Record collection to save * @param okapiHeaders okapi headers @@ -217,6 +219,20 @@ Future getMatchedRecordsIdentifiers(MatchField mat */ Future saveRecords(RecordCollection recordCollection, Map okapiHeaders); + /** + * Saves {@link RecordCollection} to the db. + * + * @param externalIds external relation ids + * @param recordType record type + * @param recordsModifier records collection modifier operator + * @param okapiHeaders okapi headers + * @return future with saved {@link RecordsBatchResponse} + */ + Future saveRecordsByExternalIds(List externalIds, + RecordType recordType, + RecordsModifierOperator recordsModifier, + Map okapiHeaders); + /** * Updates {{@link Record} in the db * diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java index cfb22242d..b4b7271ca 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java @@ -14,6 +14,7 @@ import static org.folio.dao.util.RecordDaoUtil.filterRecordByType; import static org.folio.dao.util.RecordDaoUtil.getExternalHrid; import static org.folio.dao.util.RecordDaoUtil.getExternalId; +import static org.folio.dao.util.RecordDaoUtil.getExternalIdType; import static org.folio.dao.util.SnapshotDaoUtil.SNAPSHOT_NOT_FOUND_TEMPLATE; import static org.folio.dao.util.SnapshotDaoUtil.SNAPSHOT_NOT_STARTED_MESSAGE_TEMPLATE; import static org.folio.rest.jooq.Tables.ERROR_RECORDS_LB; @@ -47,7 +48,6 @@ import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.reactivex.pgclient.PgPool; -import io.vertx.reactivex.sqlclient.SqlConnection; import io.vertx.sqlclient.Row; import java.sql.Connection; @@ -65,7 +65,6 @@ import java.util.Set; import java.util.UUID; import java.util.function.Function; -import java.util.stream.Collectors; import javax.ws.rs.BadRequestException; import javax.ws.rs.NotFoundException; @@ -76,6 +75,7 @@ import net.sf.jsqlparser.expression.operators.conditional.AndExpression; import net.sf.jsqlparser.expression.operators.conditional.OrExpression; import net.sf.jsqlparser.parser.CCJSqlParserUtil; +import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.text.StrSubstitutor; import org.apache.commons.lang3.StringUtils; @@ -120,15 +120,17 @@ import org.folio.rest.jooq.tables.records.SnapshotsLbRecord; import org.folio.services.RecordSearchParameters; import org.folio.services.domainevent.RecordDomainEventPublisher; +import org.folio.services.entities.RecordsModifierOperator; +import org.folio.services.exceptions.RecordUpdateException; import org.folio.services.util.TypeConnection; import org.folio.services.util.parser.ParseFieldsResult; import org.folio.services.util.parser.ParseLeaderResult; import org.jooq.CommonTableExpression; import org.jooq.Condition; +import org.jooq.Configuration; import org.jooq.DSLContext; import org.jooq.Field; import org.jooq.JSONB; -import org.jooq.LoaderError; import org.jooq.Name; import org.jooq.OrderField; import org.jooq.Record1; @@ -264,32 +266,10 @@ public Future getRecords(Condition condition, RecordType recor } @Override - public Future getRecords(Condition condition, RecordType recordType, Collection> orderFields, int offset, int limit, boolean returnTotalCount, String tenantId) { - Name cte = name(CTE); - Name prt = name(recordType.getTableName()); - return getQueryExecutor(tenantId).transaction(txQE -> txQE.query(dsl -> { - ResultQuery> countQuery; - if (returnTotalCount) { - countQuery = dsl.selectCount() - .from(RECORDS_LB) - .where(condition.and(recordType.getRecordImplicitCondition())); - } else { - countQuery = select(inline(null, Integer.class).as(COUNT)); - } - - return dsl - .with(cte.as(countQuery)) - .select(getAllRecordFieldsWithCount(prt)) - .from(RECORDS_LB) - .leftJoin(table(prt)).on(RECORDS_LB.ID.eq(field(TABLE_FIELD_TEMPLATE, UUID.class, prt, name(ID)))) - .leftJoin(RAW_RECORDS_LB).on(RECORDS_LB.ID.eq(RAW_RECORDS_LB.ID)) - .leftJoin(ERROR_RECORDS_LB).on(RECORDS_LB.ID.eq(ERROR_RECORDS_LB.ID)) - .rightJoin(dsl.select().from(table(cte))).on(trueCondition()) - .where(condition.and(recordType.getRecordImplicitCondition())) - .orderBy(orderFields) - .offset(offset) - .limit(limit > 0 ? limit : DEFAULT_LIMIT_FOR_GET_RECORDS); - } + public Future getRecords(Condition condition, RecordType recordType, Collection> orderFields, + int offset, int limit, boolean returnTotalCount, String tenantId) { + return getQueryExecutor(tenantId).transaction(txQE -> txQE.query(dsl -> + readRecords(dsl, condition, recordType, offset, limit, returnTotalCount, orderFields) )).map(queryResult -> toRecordCollectionWithLimitCheck(queryResult, limit)); } @@ -387,7 +367,11 @@ public Future> getMatchedRecordsWithoutIndexersVersionUsage(MatchFi ) .offset(offset) .limit(limit > 0 ? limit : DEFAULT_LIMIT_FOR_GET_RECORDS) - )).map(queryResult -> queryResult.stream().map(res -> asRow(res.unwrap())).map(this::toRecord).collect(Collectors.toList())); + )).map(queryResult -> queryResult.stream() + .map(res -> asRow(res.unwrap())) + .map(this::toRecord) + .toList() + ); } private Condition getMatchedFieldCondition(MatchField matchedField, Filter.ComparisonPartType comparisonPartType, String partition) { @@ -455,7 +439,8 @@ private String getValueInSqlFormat(Value value) { } if (Value.ValueType.LIST.equals(value.getType())) { List listOfValues = ((ListValue) value).getValue().stream() - .map(v -> format(VALUE_IN_SINGLE_QUOTES, v)).collect(Collectors.toList()); + .map(v -> format(VALUE_IN_SINGLE_QUOTES, v)) + .toList(); return StringUtils.join(listOfValues, ", "); } return StringUtils.EMPTY; @@ -640,44 +625,6 @@ private void appendWhere(SelectJoinStep step, ParseLeaderResult parseLeaderResul .and(RECORDS_LB.EXTERNAL_ID.isNotNull()); } - private Flowable streamMarcRecordIdsWithoutIndexersVersionUsage(SqlConnection conn, ParseLeaderResult parseLeaderResult, - ParseFieldsResult parseFieldsResult, - RecordSearchParameters searchParameters) { - return conn.rxPrepare(getAlternativeQuery(parseLeaderResult, parseFieldsResult, searchParameters)) - .flatMapPublisher(pq -> pq.createStream(10000).toFlowable()); - } - - private String getAlternativeQuery(ParseLeaderResult parseLeaderResult, ParseFieldsResult parseFieldsResult, RecordSearchParameters searchParameters) { - SelectJoinStep> searchQuery = selectDistinct(RECORDS_LB.EXTERNAL_ID).from(RECORDS_LB); - appendJoinAlternative(searchQuery, parseLeaderResult, parseFieldsResult); - appendWhere(searchQuery, parseLeaderResult, parseFieldsResult, searchParameters); - if (searchParameters.getOffset() != null) { - searchQuery.offset(searchParameters.getOffset()); - } - if (searchParameters.getLimit() != null) { - searchQuery.limit(searchParameters.getLimit()); - } - - SelectJoinStep> countQuery = DSL.select(countDistinct(RECORDS_LB.EXTERNAL_ID)).from(RECORDS_LB); - appendJoinAlternative(countQuery, parseLeaderResult, parseFieldsResult); - appendWhere(countQuery, parseLeaderResult, parseFieldsResult, searchParameters); - - return DSL.select().from(searchQuery).rightJoin(countQuery).on(DSL.trueCondition()).getSQL(ParamType.INLINED); - } - - private void appendJoinAlternative(SelectJoinStep selectJoinStep, ParseLeaderResult parseLeaderResult, ParseFieldsResult parseFieldsResult) { - if (parseLeaderResult.isEnabled()) { - Table marcIndexersLeader = table(name("marc_indexers_leader")); - selectJoinStep.innerJoin(marcIndexersLeader).on(RECORDS_LB.ID.eq(field(TABLE_FIELD_TEMPLATE, UUID.class, marcIndexersLeader, name(MARC_ID)))); - } - if (parseFieldsResult.isEnabled()) { - parseFieldsResult.getFieldsToJoin().forEach(fieldToJoin -> { - Table marcIndexers = table(name(MARC_INDEXERS_PARTITION_PREFIX + fieldToJoin)).as("i" + fieldToJoin); - selectJoinStep.innerJoin(marcIndexers).on(RECORDS_LB.ID.eq(field(TABLE_FIELD_TEMPLATE, UUID.class, marcIndexers, name(MARC_ID)))); - }); - } - } - @Override public Future getMatchedRecordsIdentifiers(MatchField matchedField, Filter.ComparisonPartType comparisonPartType, boolean returnTotalRecords, TypeConnection typeConnection, @@ -789,10 +736,10 @@ public Future saveRecord(Record record, Map okapiHeaders } @Override - public Future saveRecord(ReactiveClassicGenericQueryExecutor txQE, Record record, + public Future saveRecord(ReactiveClassicGenericQueryExecutor txQE, Record recordDto, Map okapiHeaders) { - LOG.trace("saveRecord:: Saving {} record {}", record.getRecordType(), record.getId()); - return insertOrUpdateRecord(txQE, record) + LOG.trace("saveRecord:: Saving {} record {}", recordDto.getRecordType(), recordDto.getId()); + return insertOrUpdateRecord(txQE, recordDto) .onSuccess(created -> recordDomainEventPublisher.publishRecordCreated(created, okapiHeaders)); } @@ -800,212 +747,85 @@ public Future saveRecord(ReactiveClassicGenericQueryExecutor txQE, Recor public Future saveRecords(RecordCollection recordCollection, Map okapiHeaders) { var tenantId = okapiHeaders.get(OKAPI_TENANT_HEADER); logRecordCollection("saveRecords:: Saving", recordCollection, tenantId); + var firstRecord = recordCollection.getRecords().iterator().next(); + var snapshotId = firstRecord.getSnapshotId(); + var recordType = RecordType.valueOf(firstRecord.getRecordType().name()); Promise finalPromise = Promise.promise(); Context context = Vertx.currentContext(); - if (context == null) return Future.failedFuture("saveRecords must be executed by a Vertx thread"); - context.owner().executeBlocking(promise -> { - Set matchedIds = new HashSet<>(); - Set snapshotIds = new HashSet<>(); - Set recordTypes = new HashSet<>(); - - List dbRecords = new ArrayList<>(); - List dbRawRecords = new ArrayList<>(); - List> dbParsedRecords = new ArrayList<>(); - List dbErrorRecords = new ArrayList<>(); - - List errorMessages = new ArrayList<>(); - recordCollection.getRecords() - .stream() - .map(RecordDaoUtil::ensureRecordHasId) - .map(RecordDaoUtil::ensureRecordHasMatchedId) - .map(RecordDaoUtil::ensureRecordHasSuppressDiscovery) - .map(RecordDaoUtil::ensureRecordForeignKeys) - .forEach(record -> { - // collect unique matched ids to query to determine generation - matchedIds.add(UUID.fromString(record.getMatchedId())); - - // make sure only one snapshot id - snapshotIds.add(record.getSnapshotId()); - if (snapshotIds.size() > 1) { - throw new BadRequestException("Batch record collection only supports single snapshot"); - } - - if (Objects.nonNull(record.getRecordType())) { - recordTypes.add(record.getRecordType().name()); - } else { - throw new BadRequestException(StringUtils.defaultIfEmpty(record.getErrorRecord().getDescription(), String.format("Record with id %s has not record type", record.getId()))); - } + if(context == null) { + return Future.failedFuture("saveRecords must be executed by a Vertx thread"); + } - // make sure only one record type - if (recordTypes.size() > 1) { - throw new BadRequestException("Batch record collection only supports single record type"); - } + context.owner().executeBlocking( + () -> saveRecords(recordCollection, snapshotId, recordType, tenantId), + false, + r -> { + if (r.failed()) { + LOG.warn("saveRecords:: Error during batch record save", r.cause()); + finalPromise.fail(r.cause()); + } else { + LOG.debug("saveRecords:: batch record save was successful"); + finalPromise.complete(r.result()); + } + }); - // if record has parsed record, validate by attempting format - if (Objects.nonNull(record.getParsedRecord())) { - try { - RecordType recordType = toRecordType(record.getRecordType().name()); - recordType.formatRecord(record); - Record2 dbParsedRecord = recordType.toDatabaseRecord2(record.getParsedRecord()); - dbParsedRecords.add(dbParsedRecord); - } catch (Exception e) { - // create error record and remove from record - Object content = Objects.nonNull(record.getParsedRecord()) - ? record.getParsedRecord().getContent() - : null; - ErrorRecord errorRecord = new ErrorRecord() - .withId(record.getId()) - .withDescription(e.getMessage()) - .withContent(content); - errorMessages.add(format(INVALID_PARSED_RECORD_MESSAGE_TEMPLATE, record.getId(), e.getMessage())); - record.withErrorRecord(errorRecord) - .withParsedRecord(null) - .withLeaderRecordStatus(null); - } - } - if (Objects.nonNull(record.getRawRecord())) { - dbRawRecords.add(RawRecordDaoUtil.toDatabaseRawRecord(record.getRawRecord())); - } - if (Objects.nonNull(record.getErrorRecord())) { - dbErrorRecords.add(ErrorRecordDaoUtil.toDatabaseErrorRecord(record.getErrorRecord())); - } - dbRecords.add(RecordDaoUtil.toDatabaseRecord(record)); - }); + return finalPromise.future() + .onSuccess(response -> response.getRecords() + .forEach(r -> recordDomainEventPublisher.publishRecordCreated(r, okapiHeaders)) + ); + } - UUID snapshotId = UUID.fromString(snapshotIds.stream().findFirst().orElseThrow()); + @Override + public Future saveRecordsByExternalIds(List externalIds, + RecordType recordType, + RecordsModifierOperator recordsModifier, + Map okapiHeaders) { + var condition = RecordDaoUtil.getExternalIdsCondition(externalIds, + getExternalIdType(Record.RecordType.fromValue(recordType.name()))) + .and(RecordDaoUtil.filterRecordByDeleted(false)); - RecordType recordType = toRecordType(recordTypes.stream().findFirst().orElseThrow()); + var tenantId = okapiHeaders.get(OKAPI_TENANT_HEADER); + Promise finalPromise = Promise.promise(); + Context context = Vertx.currentContext(); + if(context == null) { + return Future.failedFuture("saveRecordsByExternalIds:: operation must be executed by a Vertx thread"); + } + context.owner().executeBlocking( + () -> { + final RecordCollection recordCollection; try (Connection connection = getConnection(tenantId)) { - DSL.using(connection).transaction(ctx -> { + recordCollection = DSL.using(connection).transactionResult(ctx -> { DSLContext dsl = DSL.using(ctx); - - // validate snapshot - Optional snapshot = DSL.using(ctx).selectFrom(SNAPSHOTS_LB) - .where(SNAPSHOTS_LB.ID.eq(snapshotId)) - .fetchOptional(); - if (snapshot.isPresent()) { - if (Objects.isNull(snapshot.get().getProcessingStartedDate())) { - throw new BadRequestException(format(SNAPSHOT_NOT_STARTED_MESSAGE_TEMPLATE, snapshot.get().getStatus())); - } - } else { - throw new NotFoundException(format(SNAPSHOT_NOT_FOUND_TEMPLATE, snapshotId)); - } - - List ids = new ArrayList<>(); - Map matchedGenerations = new HashMap<>(); - - // lookup latest generation by matched id and committed snapshot updated before current snapshot - dsl.select(RECORDS_LB.MATCHED_ID, RECORDS_LB.ID, RECORDS_LB.GENERATION) - .distinctOn(RECORDS_LB.MATCHED_ID) - .from(RECORDS_LB) - .innerJoin(SNAPSHOTS_LB).on(RECORDS_LB.SNAPSHOT_ID.eq(SNAPSHOTS_LB.ID)) - .where(RECORDS_LB.MATCHED_ID.in(matchedIds) - .and(SNAPSHOTS_LB.STATUS.in(JobExecutionStatus.COMMITTED, JobExecutionStatus.ERROR, JobExecutionStatus.CANCELLED)) - .and(SNAPSHOTS_LB.UPDATED_DATE.lessThan(dsl - .select(SNAPSHOTS_LB.PROCESSING_STARTED_DATE) - .from(SNAPSHOTS_LB) - .where(SNAPSHOTS_LB.ID.eq(snapshotId))))) - .orderBy(RECORDS_LB.MATCHED_ID.asc(), RECORDS_LB.GENERATION.desc()) - .fetchStream().forEach(r -> { - UUID id = r.get(RECORDS_LB.ID); - UUID matchedId = r.get(RECORDS_LB.MATCHED_ID); - int generation = r.get(RECORDS_LB.GENERATION); - ids.add(id); - matchedGenerations.put(matchedId, generation); - }); - - // update matching records state - if (!ids.isEmpty()) { - dsl.update(RECORDS_LB) - .set(RECORDS_LB.STATE, RecordState.OLD) - .where(RECORDS_LB.ID.in(ids)) - .execute(); - } - - // batch insert records updating generation if required - List recordsLoadingErrors = dsl.loadInto(RECORDS_LB) - .batchAfter(1000) - .bulkAfter(500) - .commitAfter(1000) - .onErrorAbort() - .loadRecords(dbRecords.stream().map(record -> { - Integer generation = matchedGenerations.get(record.getMatchedId()); - if (Objects.nonNull(generation)) { - record.setGeneration(generation + 1); - } else if (Objects.isNull(record.getGeneration())) { - record.setGeneration(0); - } - return record; - }).collect(Collectors.toList())) - .fieldsCorresponding() - .execute() - .errors(); - - recordsLoadingErrors.forEach(error -> { - if (error.exception().sqlState().equals(UNIQUE_VIOLATION_SQL_STATE)) { - throw new DuplicateEventException("SQL Unique constraint violation prevented repeatedly saving the record"); - } - LOG.warn("saveRecords:: Error occurred on batch execution: {}", error.exception().getCause().getMessage()); - LOG.debug("saveRecords:: Failed to execute statement from batch: {}", error.query()); - }); - - // batch insert raw records - dsl.loadInto(RAW_RECORDS_LB) - .batchAfter(250) - .commitAfter(1000) - .onDuplicateKeyUpdate() - .onErrorAbort() - .loadRecords(dbRawRecords) - .fieldsCorresponding() - .execute(); - - // batch insert parsed records - recordType.toLoaderOptionsStep(dsl) - .batchAfter(250) - .commitAfter(1000) - .onDuplicateKeyUpdate() - .onErrorAbort() - .loadRecords(dbParsedRecords) - .fieldsCorresponding() - .execute(); - - if (!dbErrorRecords.isEmpty()) { - // batch insert error records - dsl.loadInto(ERROR_RECORDS_LB) - .batchAfter(250) - .commitAfter(1000) - .onDuplicateKeyUpdate() - .onErrorAbort() - .loadRecords(dbErrorRecords) - .fieldsCorresponding() - .execute(); - } - - promise.complete(new RecordsBatchResponse() - .withRecords(recordCollection.getRecords()) - .withTotalRecords(recordCollection.getRecords().size()) - .withErrorMessages(errorMessages)); + var queryResult = readRecords(dsl, condition, recordType, 0, externalIds.size(), false, emptyList()); + var records = queryResult.fetch(this::toRecord); + return new RecordCollection().withRecords(records).withTotalRecords(records.size()); }); - } catch (DuplicateEventException e) { - LOG.info("saveRecords:: Skipped saving records due to duplicate event: {}", e.getMessage()); - promise.fail(e); } catch (SQLException | DataAccessException e) { - LOG.warn("saveRecords:: Failed to save records", e); - promise.fail(e.getCause()); + LOG.warn("saveRecordsByExternalIds:: Failed to read records", e); + throw e; + } + + if (recordCollection == null || CollectionUtils.isEmpty(recordCollection.getRecords())) { + LOG.warn("saveRecordsByExternalIds:: No records returned from the fetch query"); + return new RecordsBatchResponse().withTotalRecords(0); } + + var modifiedRecords = recordsModifier.apply(recordCollection); + var snapshotId = modifiedRecords.getRecords().iterator().next().getSnapshotId(); + return saveRecords(modifiedRecords, snapshotId, recordType, tenantId); }, - false, r -> { if (r.failed()) { - LOG.warn("saveRecords:: Error during batch record save", r.cause()); + LOG.warn("saveRecordsByExternalIds:: Error during batch record save", r.cause()); finalPromise.fail(r.cause()); } else { - LOG.debug("saveRecords:: batch record save was successful"); + LOG.debug("saveRecordsByExternalIds:: batch record save was successful"); finalPromise.complete(r.result()); } - }); + } + ); return finalPromise.future() .onSuccess(response -> response.getRecords() @@ -1013,6 +833,229 @@ public Future saveRecords(RecordCollection recordCollectio ); } + private ResultQuery readRecords(DSLContext dsl, Condition condition, RecordType recordType, int offset, int limit, + boolean returnTotalCount, Collection> orderFields) { + Name cte = name(CTE); + Name prt = name(recordType.getTableName()); + var finalCondition = condition.and(recordType.getRecordImplicitCondition()); + + ResultQuery> countQuery; + if (returnTotalCount) { + countQuery = dsl.selectCount() + .from(RECORDS_LB) + .where(finalCondition); + } else { + countQuery = select(inline(null, Integer.class).as(COUNT)); + } + + return dsl + .with(cte.as(countQuery)) + .select(getAllRecordFieldsWithCount(prt)) + .from(RECORDS_LB) + .leftJoin(table(prt)).on(RECORDS_LB.ID.eq(field(TABLE_FIELD_TEMPLATE, UUID.class, prt, name(ID)))) + .leftJoin(RAW_RECORDS_LB).on(RECORDS_LB.ID.eq(RAW_RECORDS_LB.ID)) + .leftJoin(ERROR_RECORDS_LB).on(RECORDS_LB.ID.eq(ERROR_RECORDS_LB.ID)) + .rightJoin(dsl.select().from(table(cte))).on(trueCondition()) + .where(finalCondition) + .orderBy(orderFields) + .offset(offset) + .limit(limit > 0 ? limit : DEFAULT_LIMIT_FOR_GET_RECORDS); + } + + private RecordsBatchResponse saveRecords(RecordCollection recordCollection, String snapshotId, RecordType recordType, + String tenantId) throws SQLException { + Set matchedIds = new HashSet<>(); + List dbRecords = new ArrayList<>(); + List dbRawRecords = new ArrayList<>(); + List> dbParsedRecords = new ArrayList<>(); + List dbErrorRecords = new ArrayList<>(); + + List errorMessages = new ArrayList<>(); + + recordCollection.getRecords() + .stream() + .map(RecordDaoUtil::ensureRecordHasId) + .map(RecordDaoUtil::ensureRecordHasMatchedId) + .map(RecordDaoUtil::ensureRecordHasSuppressDiscovery) + .map(RecordDaoUtil::ensureRecordForeignKeys) + .forEach(record -> { + // collect unique matched ids to query to determine generation + matchedIds.add(UUID.fromString(record.getMatchedId())); + + // make sure only one snapshot id + if (!Objects.equals(snapshotId, record.getSnapshotId())) { + throw new BadRequestException("Batch record collection only supports single snapshot"); + } + validateRecordType(record, recordType); + + // if record has parsed record, validate by attempting format + if (Objects.nonNull(record.getParsedRecord())) { + try { + recordType.formatRecord(record); + Record2 dbParsedRecord = recordType.toDatabaseRecord2(record.getParsedRecord()); + dbParsedRecords.add(dbParsedRecord); + } catch (Exception e) { + // create error record and remove from record + Object content = Optional.ofNullable(record.getParsedRecord()) + .map(ParsedRecord::getContent) + .orElse(null); + var errorRecord = new ErrorRecord() + .withId(record.getId()) + .withDescription(e.getMessage()) + .withContent(content); + errorMessages.add(format(INVALID_PARSED_RECORD_MESSAGE_TEMPLATE, record.getId(), e.getMessage())); + record.withErrorRecord(errorRecord) + .withParsedRecord(null) + .withLeaderRecordStatus(null); + } + } + if (Objects.nonNull(record.getRawRecord())) { + dbRawRecords.add(RawRecordDaoUtil.toDatabaseRawRecord(record.getRawRecord())); + } + if (Objects.nonNull(record.getErrorRecord())) { + dbErrorRecords.add(ErrorRecordDaoUtil.toDatabaseErrorRecord(record.getErrorRecord())); + } + dbRecords.add(RecordDaoUtil.toDatabaseRecord(record)); + }); + + try (Connection connection = getConnection(tenantId)) { + return DSL.using(connection).transactionResult(ctx -> { + DSLContext dsl = DSL.using(ctx); + + // validate snapshot + validateSnapshot(UUID.fromString(snapshotId), ctx); + + List ids = new ArrayList<>(); + Map matchedGenerations = new HashMap<>(); + + // lookup the latest generation by matched id and committed snapshot updated before current snapshot + dsl.select(RECORDS_LB.MATCHED_ID, RECORDS_LB.ID, RECORDS_LB.GENERATION) + .distinctOn(RECORDS_LB.MATCHED_ID) + .from(RECORDS_LB) + .innerJoin(SNAPSHOTS_LB).on(RECORDS_LB.SNAPSHOT_ID.eq(SNAPSHOTS_LB.ID)) + .where(RECORDS_LB.MATCHED_ID.in(matchedIds) + .and(SNAPSHOTS_LB.STATUS.in(JobExecutionStatus.COMMITTED, JobExecutionStatus.ERROR, JobExecutionStatus.CANCELLED)) + .and(SNAPSHOTS_LB.UPDATED_DATE.lessThan(dsl + .select(SNAPSHOTS_LB.PROCESSING_STARTED_DATE) + .from(SNAPSHOTS_LB) + .where(SNAPSHOTS_LB.ID.eq(UUID.fromString(snapshotId)))))) + .orderBy(RECORDS_LB.MATCHED_ID.asc(), RECORDS_LB.GENERATION.desc()) + .fetchStream().forEach(r -> { + UUID id = r.get(RECORDS_LB.ID); + UUID matchedId = r.get(RECORDS_LB.MATCHED_ID); + int generation = r.get(RECORDS_LB.GENERATION); + ids.add(id); + matchedGenerations.put(matchedId, generation); + }); + + // update matching records state + if(!ids.isEmpty()) + { + dsl.update(RECORDS_LB) + .set(RECORDS_LB.STATE, RecordState.OLD) + .where(RECORDS_LB.ID.in(ids)) + .execute(); + } + + // batch insert records updating generation if required + var recordsLoadingErrors = dsl.loadInto(RECORDS_LB) + .batchAfter(1000) + .bulkAfter(500) + .commitAfter(1000) + .onErrorAbort() + .loadRecords(dbRecords.stream() + .map(recordDto -> { + Integer generation = matchedGenerations.get(recordDto.getMatchedId()); + if (Objects.nonNull(generation)) { + recordDto.setGeneration(generation + 1); + } else if (Objects.isNull(recordDto.getGeneration())) { + recordDto.setGeneration(0); + } + return recordDto; + }) + .toList()) + .fieldsCorresponding() + .execute() + .errors(); + + recordsLoadingErrors.forEach(error -> { + if (error.exception().sqlState().equals(UNIQUE_VIOLATION_SQL_STATE)) { + throw new DuplicateEventException("SQL Unique constraint violation prevented repeatedly saving the record"); + } + LOG.warn("saveRecords:: Error occurred on batch execution: {}", error.exception().getCause().getMessage()); + LOG.debug("saveRecords:: Failed to execute statement from batch: {}", error.query()); + }); + + // batch insert raw records + dsl.loadInto(RAW_RECORDS_LB) + .batchAfter(250) + .commitAfter(1000) + .onDuplicateKeyUpdate() + .onErrorAbort() + .loadRecords(dbRawRecords) + .fieldsCorresponding() + .execute(); + + // batch insert parsed records + recordType.toLoaderOptionsStep(dsl) + .batchAfter(250) + .commitAfter(1000) + .onDuplicateKeyUpdate() + .onErrorAbort() + .loadRecords(dbParsedRecords) + .fieldsCorresponding() + .execute(); + + if (!dbErrorRecords.isEmpty()) { + // batch insert error records + dsl.loadInto(ERROR_RECORDS_LB) + .batchAfter(250) + .commitAfter(1000) + .onDuplicateKeyUpdate() + .onErrorAbort() + .loadRecords(dbErrorRecords) + .fieldsCorresponding() + .execute(); + } + + return new RecordsBatchResponse() + .withRecords(recordCollection.getRecords()) + .withTotalRecords(recordCollection.getRecords().size()) + .withErrorMessages(errorMessages); + }); + } catch (DuplicateEventException e) { + LOG.info("saveRecords:: Skipped saving records due to duplicate event: {}", e.getMessage()); + throw e; + } catch (SQLException | DataAccessException ex) { + LOG.warn("saveRecords:: Failed to save records", ex); + Throwable throwable = ex.getCause() != null ? ex.getCause() : ex; + throw new RecordUpdateException(throwable); + } + } + + private void validateRecordType(Record recordDto, RecordType recordType) { + if (recordDto.getRecordType() == null) { + var error = recordDto.getErrorRecord() != null ? recordDto.getErrorRecord().getDescription() : ""; + throw new BadRequestException( + StringUtils.defaultIfEmpty(error, String.format("Record with id %s has not record type", recordDto.getId()))); + } + + if (RecordType.valueOf(recordDto.getRecordType().name()) != recordType) { + throw new BadRequestException("Batch record collection only supports single record type"); + } + } + + private void validateSnapshot(UUID snapshotId, Configuration ctx) { + Optional snapshot = DSL.using(ctx).selectFrom(SNAPSHOTS_LB) + .where(SNAPSHOTS_LB.ID.eq(snapshotId)) + .fetchOptional(); + if (snapshot.isPresent() && Objects.isNull(snapshot.get().getProcessingStartedDate())) { + throw new BadRequestException(format(SNAPSHOT_NOT_STARTED_MESSAGE_TEMPLATE, snapshot.get().getStatus())); + } else if (snapshot.isEmpty()) { + throw new NotFoundException(format(SNAPSHOT_NOT_FOUND_TEMPLATE, snapshotId)); + } + } + @Override public Future updateRecord(Record record, Map okapiHeaders) { var tenantId = okapiHeaders.get(OKAPI_TENANT_HEADER); @@ -1282,7 +1325,7 @@ public Future updateParsedRecords(RecordCollection r blockingPromise.complete(new ParsedRecordsBatchResponse() .withErrorMessages(errorMessages) - .withParsedRecords(recordsUpdated.stream().map(Record::getParsedRecord).collect(Collectors.toList())) + .withParsedRecords(recordsUpdated.stream().map(Record::getParsedRecord).toList()) .withTotalRecords(recordsUpdated.size())); }); } catch (SQLException e) { @@ -1484,7 +1527,7 @@ public Future updateMarcAuthorityRecordsStateAsDeleted(String matchedId, S .compose(recordCollection -> { List> futures = recordCollection.getRecords().stream() .map(recordToUpdate -> updateMarcAuthorityRecordWithDeletedState(txQE, ensureRecordForeignKeys(recordToUpdate))) - .collect(Collectors.toList()); + .toList(); Promise result = Promise.promise(); GenericCompositeFuture.all(futures).onComplete(ar -> { @@ -1631,11 +1674,11 @@ private Future updateExternalIdsForRecord(ReactiveClassicGenericQueryEx }); } - private Record validateParsedRecordId(Record record) { - if (Objects.isNull(record.getParsedRecord()) || StringUtils.isEmpty(record.getParsedRecord().getId())) { + private Record validateParsedRecordId(Record recordDto) { + if (Objects.isNull(recordDto.getParsedRecord()) || StringUtils.isEmpty(recordDto.getParsedRecord().getId())) { throw new BadRequestException("Each parsed record should contain an id"); } - return record; + return recordDto; } private Field[] getRecordFields(Name prt) { @@ -1678,7 +1721,8 @@ private RecordCollection toRecordCollection(QueryResult result) { List records = result.stream().map(res -> asRow(res.unwrap())).map(row -> { recordCollection.setTotalRecords(row.getInteger(COUNT)); return toRecord(row); - }).collect(Collectors.toList()); + }) + .toList(); if (!records.isEmpty() && Objects.nonNull(records.get(0).getId())) { recordCollection.withRecords(records); } @@ -1690,7 +1734,8 @@ private StrippedParsedRecordCollection toStrippedParsedRecordCollection(QueryRes List records = result.stream().map(res -> asRow(res.unwrap())).map(row -> { recordCollection.setTotalRecords(row.getInteger(COUNT)); return toStrippedParsedRecord(row); - }).collect(Collectors.toList()); + }) + .toList(); if (!records.isEmpty() && Objects.nonNull(records.get(0).getId())) { recordCollection.withRecords(records); } @@ -1715,7 +1760,8 @@ private SourceRecordCollection toSourceRecordCollection(QueryResult result) { sourceRecordCollection.setTotalRecords(row.getInteger(COUNT)); return RecordDaoUtil.toSourceRecord(RecordDaoUtil.toRecord(row)) .withParsedRecord(ParsedRecordDaoUtil.toParsedRecord(row)); - }).collect(Collectors.toList()); + }) + .toList(); if (!sourceRecords.isEmpty() && Objects.nonNull(sourceRecords.get(0).getRecordId())) { sourceRecordCollection.withSourceRecords(sourceRecords); } @@ -1732,20 +1778,38 @@ private SourceRecord toSourceRecord(Row row) { } private Record toRecord(Row row) { - Record record = RecordDaoUtil.toRecord(row); + Record recordDto = RecordDaoUtil.toRecord(row); RawRecord rawRecord = RawRecordDaoUtil.toJoinedRawRecord(row); if (Objects.nonNull(rawRecord.getContent())) { - record.setRawRecord(rawRecord); + recordDto.setRawRecord(rawRecord); } ParsedRecord parsedRecord = ParsedRecordDaoUtil.toJoinedParsedRecord(row); if (Objects.nonNull(parsedRecord.getContent())) { - record.setParsedRecord(parsedRecord); + recordDto.setParsedRecord(parsedRecord); } ErrorRecord errorRecord = ErrorRecordDaoUtil.toJoinedErrorRecord(row); if (Objects.nonNull(errorRecord.getContent())) { - record.setErrorRecord(errorRecord); + recordDto.setErrorRecord(errorRecord); + } + return recordDto; + } + + private Record toRecord(org.jooq.Record dbRecord) { + Record recordDto = RecordDaoUtil.toRecord(dbRecord); + RawRecord rawRecord = RawRecordDaoUtil.toJoinedRawRecord(dbRecord); + if (Objects.nonNull(rawRecord.getContent())) { + recordDto.setRawRecord(rawRecord); + } + + ParsedRecord parsedRecord = ParsedRecordDaoUtil.toJoinedParsedRecord(dbRecord); + if (Objects.nonNull(parsedRecord.getContent())) { + recordDto.setParsedRecord(parsedRecord); + } + ErrorRecord errorRecord = ErrorRecordDaoUtil.toJoinedErrorRecord(dbRecord); + if (Objects.nonNull(errorRecord.getContent())) { + recordDto.setErrorRecord(errorRecord); } - return record; + return recordDto; } private StrippedParsedRecord toStrippedParsedRecord(Row row) { diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ErrorRecordDaoUtil.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ErrorRecordDaoUtil.java index ff10bc84c..6e76924ed 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ErrorRecordDaoUtil.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ErrorRecordDaoUtil.java @@ -17,6 +17,7 @@ import io.vertx.core.json.JsonObject; import io.vertx.sqlclient.Row; import io.vertx.sqlclient.RowSet; +import org.jooq.Record; /** * Utility class for managing {@link ErrorRecord} @@ -32,7 +33,7 @@ private ErrorRecordDaoUtil() { } /** * Searches for {@link ErrorRecord} by id using {@link ReactiveClassicGenericQueryExecutor} - * + * * @param queryExecutor query executor * @param id id * @return future with optional ErrorRecord @@ -45,7 +46,7 @@ public static Future> findById(ReactiveClassicGenericQuery /** * Saves {@link ErrorRecord} to the db using {@link ReactiveClassicGenericQueryExecutor} - * + * * @param queryExecutor query executor * @param errorRecord error record * @return future with updated ErrorRecord @@ -62,7 +63,7 @@ public static Future save(ReactiveClassicGenericQueryExecutor query /** * Convert database query result {@link Row} to {@link ErrorRecord} - * + * * @param row query result row * @return ErrorRecord */ @@ -76,7 +77,7 @@ public static ErrorRecord toErrorRecord(Row row) { /** * Convert database query result {@link Row} to {@link ErrorRecord} - * + * * @param row query result row * @return ErrorRecord */ @@ -91,9 +92,26 @@ public static ErrorRecord toJoinedErrorRecord(Row row) { .withDescription(row.getString(DESCRIPTION)); } + /** + * Convert database query result {@link Row} to {@link ErrorRecord} + * + * @param dbRecord query result record + * @return ErrorRecord + */ + public static ErrorRecord toJoinedErrorRecord(Record dbRecord) { + ErrorRecord errorRecord = new ErrorRecord(); + UUID id = dbRecord.get(org.folio.rest.jooq.tables.ErrorRecordsLb.ERROR_RECORDS_LB.ID); + if (Objects.nonNull(id)) { + errorRecord.withId(id.toString()); + } + return errorRecord + .withContent(dbRecord.get(ERROR_RECORD_CONTENT, String.class)) + .withDescription(dbRecord.get(org.folio.rest.jooq.tables.ErrorRecordsLb.ERROR_RECORDS_LB.DESCRIPTION)); + } + /** * Convert database query result {@link Row} to {@link Optional} {@link ErrorRecord} - * + * * @param row query result row * @return optional ErrorRecord */ @@ -103,7 +121,7 @@ public static Optional toOptionalErrorRecord(Row row) { /** * Convert {@link ErrorRecord} to database record {@link ErrorRecordsLbRecord} - * + * * @param errorRecord error record * @return ErrorRecordsLbRecord */ diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ParsedRecordDaoUtil.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ParsedRecordDaoUtil.java index 4f8979624..767c9fa7d 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ParsedRecordDaoUtil.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/util/ParsedRecordDaoUtil.java @@ -138,16 +138,23 @@ public static ParsedRecord toParsedRecord(Row row) { * @return ParsedRecord */ public static ParsedRecord toJoinedParsedRecord(Row row) { - ParsedRecord parsedRecord = new ParsedRecord(); UUID id = row.getUUID(ID); - if (Objects.nonNull(id)) { - parsedRecord.withId(id.toString()); - } Object content = row.getValue(PARSED_RECORD_CONTENT); - if (Objects.nonNull(content)) { - parsedRecord.withContent(normalize(content).getMap()); - } - return parsedRecord; + + return asParsedRecord(id, content); + } + + /** + * Convert database query result {@link org.jooq.Record} to {@link ParsedRecord} + * + * @param dbRecord query result record + * @return ParsedRecord + */ + public static ParsedRecord toJoinedParsedRecord(org.jooq.Record dbRecord) { + UUID id = dbRecord.get(ID_FIELD); + Object content = dbRecord.get(PARSED_RECORD_CONTENT, String.class); + + return asParsedRecord(id, content); } /** @@ -258,4 +265,15 @@ public static JsonObject normalize(Object content) { ? new JsonObject((String) content) : JsonObject.mapFrom(content); } + + private static ParsedRecord asParsedRecord(UUID id, Object content) { + ParsedRecord parsedRecord = new ParsedRecord(); + if (Objects.nonNull(id)) { + parsedRecord.withId(id.toString()); + } + if (Objects.nonNull(content)) { + parsedRecord.withContent(normalize(content).getMap()); + } + return parsedRecord; + } } diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/util/RawRecordDaoUtil.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/util/RawRecordDaoUtil.java index 0de3850e1..d41851fbf 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/dao/util/RawRecordDaoUtil.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/util/RawRecordDaoUtil.java @@ -16,6 +16,7 @@ import io.vertx.core.Future; import io.vertx.sqlclient.Row; import io.vertx.sqlclient.RowSet; +import org.jooq.Record; /** * Utility class for managing {@link RawRecord} @@ -30,7 +31,7 @@ private RawRecordDaoUtil() { } /** * Searches for {@link RawRecord} by id using {@link ReactiveClassicGenericQueryExecutor} - * + * * @param queryExecutor query executor * @param id id * @return future with optional RawRecord @@ -43,7 +44,7 @@ public static Future> findById(ReactiveClassicGenericQueryEx /** * Saves {@link RawRecord} to the db using {@link ReactiveClassicGenericQueryExecutor} - * + * * @param queryExecutor query executor * @param rawRecord raw record * @return future with updated RawRecord @@ -60,7 +61,7 @@ public static Future save(ReactiveClassicGenericQueryExecutor queryEx /** * Convert database query result {@link Row} to {@link RawRecord} - * + * * @param row query result row * @return RawRecord */ @@ -73,7 +74,7 @@ public static RawRecord toRawRecord(Row row) { /** * Convert database query result {@link Row} to {@link RawRecord} - * + * * @param row query result row * @return RawRecord */ @@ -87,9 +88,25 @@ public static RawRecord toJoinedRawRecord(Row row) { .withContent(row.getString(RAW_RECORD_CONTENT)); } + /** + * Convert database query result {@link Record} to {@link RawRecord} + * + * @param dbRecord query result record + * @return RawRecord + */ + public static RawRecord toJoinedRawRecord(Record dbRecord) { + RawRecord rawRecord = new RawRecord(); + UUID id = dbRecord.get(org.folio.rest.jooq.tables.RawRecordsLb.RAW_RECORDS_LB.ID); + if (Objects.nonNull(id)) { + rawRecord.withId(id.toString()); + } + return rawRecord + .withContent(dbRecord.get(RAW_RECORD_CONTENT, String.class)); + } + /** * Convert database query result {@link Row} to {@link Optional} {@link RawRecord} - * + * * @param row query result row * @return optional RawRecord */ @@ -99,7 +116,7 @@ public static Optional toOptionalRawRecord(Row row) { /** * Convert {@link RawRecord} to database record {@link RawRecordsLbRecord} - * + * * @param rawRecord raw record * @return RawRecordsLbRecord */ diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/util/RecordDaoUtil.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/util/RecordDaoUtil.java index d9ecf8f59..3aefa6970 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/dao/util/RecordDaoUtil.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/util/RecordDaoUtil.java @@ -291,31 +291,46 @@ public static SourceRecord toSourceRecord(Record record) { */ public static Record toRecord(Row row) { RecordsLb pojo = RowMappers.getRecordsLbMapper().apply(row); - Record record = new Record(); + return asRecord(pojo); + } + + /** + * Convert database query result {@link org.jooq.Record} to {@link Record} + * + * @param dbRecord query result record + * @return Record + */ + public static Record toRecord(org.jooq.Record dbRecord) { + RecordsLb pojo = RecordMappers.getDbRecordToRecordsLbMapper().apply(dbRecord); + return asRecord(pojo); + } + + private static Record asRecord(RecordsLb pojo) { + Record recordDto = new Record(); if (Objects.nonNull(pojo.getId())) { - record.withId(pojo.getId().toString()); + recordDto.withId(pojo.getId().toString()); } if (Objects.nonNull(pojo.getSnapshotId())) { - record.withSnapshotId(pojo.getSnapshotId().toString()); + recordDto.withSnapshotId(pojo.getSnapshotId().toString()); } if (Objects.nonNull(pojo.getMatchedId())) { - record.withMatchedId(pojo.getMatchedId().toString()); + recordDto.withMatchedId(pojo.getMatchedId().toString()); } if (Objects.nonNull(pojo.getRecordType())) { - record.withRecordType(Record.RecordType.valueOf(pojo.getRecordType().toString())); + recordDto.withRecordType(Record.RecordType.valueOf(pojo.getRecordType().toString())); } if (Objects.nonNull(pojo.getState())) { - record.withState(State.valueOf(pojo.getState().toString())); + recordDto.withState(State.valueOf(pojo.getState().toString())); } - record + recordDto .withOrder(pojo.getOrder()) .withGeneration(pojo.getGeneration()) .withLeaderRecordStatus(pojo.getLeaderRecordStatus()) - .withDeleted(record.getState().equals(State.DELETED) - || DELETED_LEADER_RECORD_STATUS.contains(record.getLeaderRecordStatus())); + .withDeleted(recordDto.getState().equals(State.DELETED) + || DELETED_LEADER_RECORD_STATUS.contains(recordDto.getLeaderRecordStatus())); - return record + return recordDto .withAdditionalInfo(toAdditionalInfo(pojo)) .withExternalIdsHolder(toExternalIdsHolder(pojo)) .withMetadata(toMetadata(pojo)); @@ -702,7 +717,7 @@ private static UUID toUUID(String uuid) { } private static List toUUIDs(List uuids) { - return uuids.stream().map(RecordDaoUtil::toUUID).collect(Collectors.toList()); + return uuids.stream().map(RecordDaoUtil::toUUID).toList(); } private static RecordType toRecordType(String type) { diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/util/RecordMappers.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/util/RecordMappers.java new file mode 100644 index 000000000..0cb50bc42 --- /dev/null +++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/util/RecordMappers.java @@ -0,0 +1,39 @@ +package org.folio.dao.util; + +import org.folio.rest.jooq.tables.pojos.RecordsLb; +import org.jooq.Record; + +import java.util.function.Function; + +public final class RecordMappers { + + private RecordMappers() {} + + public static Function getDbRecordToRecordsLbMapper() { + return jooqRecord -> { + org.folio.rest.jooq.tables.pojos.RecordsLb pojo = new org.folio.rest.jooq.tables.pojos.RecordsLb(); + pojo.setId(jooqRecord.get(org.folio.rest.jooq.tables.RecordsLb.RECORDS_LB.ID)); + pojo.setSnapshotId(jooqRecord.get(org.folio.rest.jooq.tables.RecordsLb.RECORDS_LB.SNAPSHOT_ID)); + pojo.setMatchedId(jooqRecord.get(org.folio.rest.jooq.tables.RecordsLb.RECORDS_LB.MATCHED_ID)); + pojo.setGeneration(jooqRecord.get(org.folio.rest.jooq.tables.RecordsLb.RECORDS_LB.GENERATION)); + pojo.setRecordType(java.util.Arrays.stream(org.folio.rest.jooq.enums.RecordType.values()) + .filter(jooqRecord.get(org.folio.rest.jooq.tables.RecordsLb.RECORDS_LB.RECORD_TYPE)::equals) + .findFirst() + .orElse(null)); + pojo.setExternalId(jooqRecord.get(org.folio.rest.jooq.tables.RecordsLb.RECORDS_LB.EXTERNAL_ID)); + pojo.setState(java.util.Arrays.stream(org.folio.rest.jooq.enums.RecordState.values()) + .filter(jooqRecord.get(org.folio.rest.jooq.tables.RecordsLb.RECORDS_LB.STATE)::equals) + .findFirst() + .orElse(null)); + pojo.setLeaderRecordStatus(jooqRecord.get(org.folio.rest.jooq.tables.RecordsLb.RECORDS_LB.LEADER_RECORD_STATUS)); + pojo.setOrder(jooqRecord.get(org.folio.rest.jooq.tables.RecordsLb.RECORDS_LB.ORDER)); + pojo.setSuppressDiscovery(jooqRecord.get(org.folio.rest.jooq.tables.RecordsLb.RECORDS_LB.SUPPRESS_DISCOVERY)); + pojo.setCreatedByUserId(jooqRecord.get(org.folio.rest.jooq.tables.RecordsLb.RECORDS_LB.CREATED_BY_USER_ID)); + pojo.setCreatedDate(jooqRecord.get(org.folio.rest.jooq.tables.RecordsLb.RECORDS_LB.CREATED_DATE)); + pojo.setUpdatedByUserId(jooqRecord.get(org.folio.rest.jooq.tables.RecordsLb.RECORDS_LB.UPDATED_BY_USER_ID)); + pojo.setUpdatedDate(jooqRecord.get(org.folio.rest.jooq.tables.RecordsLb.RECORDS_LB.UPDATED_DATE)); + pojo.setExternalHrid(jooqRecord.get(org.folio.rest.jooq.tables.RecordsLb.RECORDS_LB.EXTERNAL_HRID)); + return pojo; + }; + } +} diff --git a/mod-source-record-storage-server/src/main/java/org/folio/services/RecordService.java b/mod-source-record-storage-server/src/main/java/org/folio/services/RecordService.java index 14ded8a17..93da75c36 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/services/RecordService.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/services/RecordService.java @@ -1,11 +1,12 @@ package org.folio.services; +import io.reactivex.Flowable; +import io.vertx.core.Future; +import io.vertx.sqlclient.Row; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Optional; - -import io.vertx.sqlclient.Row; import org.folio.dao.util.IdType; import org.folio.dao.util.RecordType; import org.folio.rest.jaxrs.model.FetchParsedRecordsBatchRequest; @@ -22,12 +23,10 @@ import org.folio.rest.jaxrs.model.SourceRecordCollection; import org.folio.rest.jaxrs.model.StrippedParsedRecordCollection; import org.folio.rest.jooq.enums.RecordState; +import org.folio.services.entities.RecordsModifierOperator; import org.jooq.Condition; import org.jooq.OrderField; -import io.reactivex.Flowable; -import io.vertx.core.Future; - public interface RecordService { /** @@ -83,6 +82,19 @@ public interface RecordService { */ Future saveRecords(RecordCollection recordsCollection, Map okapiHeaders); + /** + * Saves collection of records. + * + * @param externalIds external relation ids + * @param recordType record type + * @param recordsModifier records collection modifier operator + * @param okapiHeaders okapi headers + * @return future with response containing list of successfully saved records and error messages for records that were not saved + */ + Future saveRecordsByExternalIds(List externalIds, RecordType recordType, + RecordsModifierOperator recordsModifier, + Map okapiHeaders); + /** * Updates record with given id * diff --git a/mod-source-record-storage-server/src/main/java/org/folio/services/RecordServiceImpl.java b/mod-source-record-storage-server/src/main/java/org/folio/services/RecordServiceImpl.java index c39dba0c0..8a3c6d30e 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/services/RecordServiceImpl.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/services/RecordServiceImpl.java @@ -11,6 +11,7 @@ import static org.folio.dao.util.RecordDaoUtil.ensureRecordHasSuppressDiscovery; import static org.folio.dao.util.RecordDaoUtil.filterRecordByExternalHridValuesWithQualifier; import static org.folio.dao.util.RecordDaoUtil.filterRecordByState; +import static org.folio.dao.util.RecordDaoUtil.getExternalIdType; import static org.folio.dao.util.RecordDaoUtil.getExternalIdsConditionWithQualifier; import static org.folio.dao.util.SnapshotDaoUtil.SNAPSHOT_NOT_FOUND_TEMPLATE; import static org.folio.dao.util.SnapshotDaoUtil.SNAPSHOT_NOT_STARTED_MESSAGE_TEMPLATE; @@ -29,7 +30,6 @@ import io.vertx.core.json.JsonObject; import io.vertx.pgclient.PgException; import io.vertx.sqlclient.Row; - import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -39,11 +39,11 @@ import java.util.Objects; import java.util.Optional; import java.util.UUID; -import java.util.stream.Collectors; +import java.util.concurrent.ExecutionException; import javax.ws.rs.BadRequestException; import javax.ws.rs.NotFoundException; - import net.sf.jsqlparser.JSQLParserException; +import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -75,6 +75,7 @@ import org.folio.rest.jaxrs.model.SourceRecordCollection; import org.folio.rest.jaxrs.model.StrippedParsedRecordCollection; import org.folio.rest.jooq.enums.RecordState; +import org.folio.services.entities.RecordsModifierOperator; import org.folio.services.exceptions.DuplicateRecordException; import org.folio.services.exceptions.RecordUpdateException; import org.folio.services.util.AdditionalFieldsUtil; @@ -187,6 +188,42 @@ public Future saveRecords(RecordCollection recordCollectio .recover(RecordServiceImpl::mapToDuplicateExceptionIfNeeded); } + @Override + public Future saveRecordsByExternalIds(List externalIds, + RecordType recordType, + RecordsModifierOperator recordsModifier, + Map okapiHeaders) { + if (CollectionUtils.isEmpty(externalIds)) { + LOG.warn("saveRecordsByExternalIds:: Skipping the records save, no external IDs are provided"); + return Future.succeededFuture(new RecordsBatchResponse().withTotalRecords(0)); + } + + if (recordsModifier == null) { + LOG.warn("saveRecordsByExternalIds:: Skipping the records save, no operator is provided to modify the existing records"); + return Future.succeededFuture(new RecordsBatchResponse().withTotalRecords(0)); + } + + RecordsModifierOperator recordsMatchedIdsSetter = recordCollection -> { + try { + for (var sourceRecord : recordCollection.getRecords()) { + setMatchedIdForRecord(sourceRecord, okapiHeaders.get(OKAPI_TENANT_HEADER)) + .toCompletionStage().toCompletableFuture().get(); + } + return recordCollection; + } catch (InterruptedException ex) { + LOG.warn("saveRecordsByExternalIds:: Setting record matched id is interrupted: {}", ex.getMessage()); + Thread.currentThread().interrupt(); + throw new RecordUpdateException(ex.getMessage()); + } catch (ExecutionException ex) { + LOG.warn("saveRecordsByExternalIds:: Failed to set record matched id: {}", ex.getMessage()); + throw new RecordUpdateException(ex.getMessage()); + } + }; + RecordsModifierOperator recordsModifierWithMatchedIdsSetter = recordsModifier.andThen(recordsMatchedIdsSetter); + + return recordDao.saveRecordsByExternalIds(externalIds, recordType, recordsModifierWithMatchedIdsSetter, okapiHeaders); + } + @Override public Future updateRecord(Record record, Map okapiHeaders) { return recordDao.updateRecord(ensureRecordForeignKeys(record), okapiHeaders); @@ -383,7 +420,7 @@ private Future setMatchedIdForRecord(Record record, String tenantId) { } Promise promise = Promise.promise(); String externalId = RecordDaoUtil.getExternalId(record.getExternalIdsHolder(), record.getRecordType()); - IdType idType = RecordDaoUtil.getExternalIdType(record.getRecordType()); + IdType idType = getExternalIdType(record.getRecordType()); if (externalId != null && idType != null && record.getState() == Record.State.ACTUAL) { setMatchedIdFromExistingSourceRecord(record, tenantId, promise, externalId, idType); @@ -455,7 +492,7 @@ private void filterFieldsByDataRange(AsyncResult .map(JsonObject.class::cast) .filter(field -> checkFieldRange(field, data)) .map(JsonObject::getMap) - .collect(Collectors.toList()); + .toList(); parsedContent.put("fields", filteredFields); recordToFilter.getParsedRecord().setContent(parsedContent.getMap()); diff --git a/mod-source-record-storage-server/src/main/java/org/folio/services/entities/RecordsModifierOperator.java b/mod-source-record-storage-server/src/main/java/org/folio/services/entities/RecordsModifierOperator.java new file mode 100644 index 000000000..5dcd87483 --- /dev/null +++ b/mod-source-record-storage-server/src/main/java/org/folio/services/entities/RecordsModifierOperator.java @@ -0,0 +1,24 @@ +package org.folio.services.entities; + +import org.folio.rest.jaxrs.model.RecordCollection; +import java.util.Objects; +import java.util.function.UnaryOperator; + +@FunctionalInterface +public +interface RecordsModifierOperator extends UnaryOperator { + + static RecordsModifierOperator identity() { + return s -> s; + } + + default RecordsModifierOperator andThen(RecordsModifierOperator after) { + Objects.requireNonNull(after); + return s -> after.apply(this.apply(s)); + } + + default RecordsModifierOperator compose(RecordsModifierOperator before) { + Objects.requireNonNull(before); + return s -> this.apply(before.apply(s)); + } +} diff --git a/mod-source-record-storage-server/src/main/java/org/folio/services/exceptions/RecordUpdateException.java b/mod-source-record-storage-server/src/main/java/org/folio/services/exceptions/RecordUpdateException.java index 69232837c..8ae129f63 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/services/exceptions/RecordUpdateException.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/services/exceptions/RecordUpdateException.java @@ -8,4 +8,8 @@ public class RecordUpdateException extends RuntimeException { public RecordUpdateException(String message) { super(message); } + + public RecordUpdateException(Throwable cause) { + super(cause); + } } diff --git a/mod-source-record-storage-server/src/main/java/org/folio/services/util/EventHandlingUtil.java b/mod-source-record-storage-server/src/main/java/org/folio/services/util/EventHandlingUtil.java index 8b836df5d..9c7c846c8 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/services/util/EventHandlingUtil.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/services/util/EventHandlingUtil.java @@ -28,7 +28,6 @@ import org.folio.processing.events.utils.PomReaderUtil; import org.folio.rest.jaxrs.model.Event; import org.folio.rest.jaxrs.model.EventMetadata; -import org.folio.rest.tools.utils.ModuleName; import org.folio.services.domainevent.SourceRecordDomainEventType; public final class EventHandlingUtil { @@ -102,8 +101,8 @@ public static KafkaProducerRecord createProducerRecord(String ev } public static String constructModuleName() { - return PomReaderUtil.INSTANCE.constructModuleVersionAndVersion(ModuleName.getModuleName(), - ModuleName.getModuleVersion()); + return PomReaderUtil.INSTANCE.constructModuleVersionAndVersion(PomReaderUtil.INSTANCE.getModuleName(), + PomReaderUtil.INSTANCE.getVersion()); } public static String createTopicName(String eventType, String tenantId, KafkaConfig kafkaConfig) { diff --git a/mod-source-record-storage-server/src/test/java/org/folio/services/RecordServiceTest.java b/mod-source-record-storage-server/src/test/java/org/folio/services/RecordServiceTest.java index e13c4c3eb..0257b09b9 100644 --- a/mod-source-record-storage-server/src/test/java/org/folio/services/RecordServiceTest.java +++ b/mod-source-record-storage-server/src/test/java/org/folio/services/RecordServiceTest.java @@ -12,7 +12,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.reactivex.Flowable; -import io.vertx.core.AsyncResult; import io.vertx.core.CompositeFuture; import io.vertx.core.Future; import io.vertx.core.json.JsonArray; @@ -63,7 +62,6 @@ import org.folio.rest.jaxrs.model.RecordsBatchResponse; import org.folio.rest.jaxrs.model.Snapshot; import org.folio.rest.jaxrs.model.SourceRecord; -import org.folio.rest.jaxrs.model.SourceRecordCollection; import org.folio.rest.jaxrs.model.StrippedParsedRecord; import org.folio.rest.jooq.enums.RecordState; import org.folio.services.domainevent.RecordDomainEventPublisher; @@ -149,7 +147,7 @@ public void shouldGetMarcBibRecordsBySnapshotId(TestContext context) { .filter(r -> r.getRecordType().equals(Record.RecordType.MARC_BIB)) .filter(r -> r.getSnapshotId().equals(snapshotId)) .sorted(comparing(Record::getOrder)) - .collect(Collectors.toList()); + .toList(); context.assertEquals(expected.size(), get.result().getTotalRecords()); compareRecords(context, expected.get(1), get.result().getRecords().get(0)); compareRecords(context, expected.get(2), get.result().getRecords().get(1)); @@ -415,7 +413,7 @@ public void shouldStreamMarcBibRecordsBySnapshotId(TestContext context) { .filter(r -> r.getRecordType().equals(Record.RecordType.MARC_BIB)) .filter(r -> r.getSnapshotId().equals(snapshotId)) .sorted(comparing(Record::getOrder)) - .collect(Collectors.toList()); + .toList(); List actual = new ArrayList<>(); flowable.doFinally(() -> { @@ -1632,7 +1630,7 @@ private void getTotalRecordsAndRecordsDependsOnLimit(TestContext context, int li } List expected = records.stream() .filter(r -> r.getRecordType().equals(Record.RecordType.MARC_BIB)) - .collect(Collectors.toList()); + .toList(); context.assertEquals(expected.size(), get.result().getTotalRecords()); context.assertEquals(limit, get.result().getRecords().size()); async.complete(); @@ -1662,7 +1660,7 @@ private void getRecordsBySnapshotId(TestContext context, String snapshotId, Reco .filter(r -> r.getRecordType().equals(recordType)) .filter(r -> r.getSnapshotId().equals(snapshotId)) .sorted(comparing(Record::getOrder)) - .collect(Collectors.toList()); + .toList(); context.assertEquals(expected.size(), get.result().getTotalRecords()); compareRecords(context, expected.get(0), get.result().getRecords().get(0)); async.complete(); @@ -1690,8 +1688,7 @@ private void getMarcRecordsBetweenDates(TestContext context, OffsetDateTime earl } List expected = records.stream() .filter(r -> r.getRecordType().equals(recordType)) - .sorted(comparing(Record::getOrder)) - .collect(Collectors.toList()); + .toList(); context.assertEquals(expected.size(), get.result().getTotalRecords()); compareRecords(context, expected, get.result().getRecords()); async.complete(); @@ -1719,7 +1716,7 @@ private void streamRecordsBySnapshotId(TestContext context, String snapshotId, R .filter(r -> r.getRecordType().equals(recordType)) .filter(r -> r.getSnapshotId().equals(snapshotId)) .sorted(comparing(Record::getOrder)) - .collect(Collectors.toList()); + .toList(); List actual = new ArrayList<>(); flowable.doFinally(() -> { @@ -1823,8 +1820,6 @@ private void saveMarcRecords(TestContext context, Record.RecordType marcBib) { } context.assertEquals(0, batch.result().getErrorMessages().size()); context.assertEquals(expected.size(), batch.result().getTotalRecords()); - expected.sort(comparing(Record::getId)); - batch.result().getRecords().sort(comparing(Record::getId)); compareRecords(context, expected, batch.result().getRecords()); RecordDaoUtil.countByCondition(postgresClientFactory.getQueryExecutor(TENANT_ID), DSL.trueCondition()) .onComplete(count -> { @@ -1854,8 +1849,6 @@ private void saveMarcRecordsWithExpectedErrors(TestContext context, Record.Recor } context.assertEquals(0, batch.result().getErrorMessages().size()); context.assertEquals(expected.size(), batch.result().getTotalRecords()); - expected.sort(comparing(Record::getId)); - batch.result().getRecords().sort(comparing(Record::getId)); compareRecords(context, expected, batch.result().getRecords()); checkRecordErrorRecords(context, batch.result().getRecords(), TestMocks.getErrorRecord(0).getContent().toString(), TestMocks.getErrorRecord(0).getDescription()); @@ -1898,9 +1891,7 @@ private void getMarcSourceRecords(TestContext context, RecordType parsedRecordTy List expected = records.stream() .filter(r -> r.getRecordType().equals(recordType)) .map(RecordDaoUtil::toSourceRecord) - .sorted(comparing(SourceRecord::getRecordId)) - .collect(Collectors.toList()); - get.result().getSourceRecords().sort(comparing(SourceRecord::getRecordId)); + .toList(); context.assertEquals(expected.size(), get.result().getTotalRecords()); compareSourceRecords(context, expected, get.result().getSourceRecords()); async.complete(); @@ -1927,14 +1918,10 @@ private void streamMarcSourceRecords(TestContext context, RecordType parsedRecor List expected = records.stream() .filter(r -> r.getRecordType().equals(recordType)) .map(RecordDaoUtil::toSourceRecord) - .sorted(comparing(SourceRecord::getRecordId)) - .sorted(comparing(SourceRecord::getOrder)) - .collect(Collectors.toList()); + .toList(); List actual = new ArrayList<>(); flowable.doFinally(() -> { - - actual.sort(comparing(SourceRecord::getRecordId)); context.assertEquals(expected.size(), actual.size()); compareSourceRecords(context, expected, actual); @@ -1968,9 +1955,7 @@ private void getMarcSourceRecordsByListOfIds(TestContext context, Record.RecordT List expected = records.stream() .filter(r -> r.getRecordType().equals(recordType)) .map(RecordDaoUtil::toSourceRecord) - .sorted(comparing(SourceRecord::getRecordId)) - .collect(Collectors.toList()); - sortByRecordId(get); + .toList(); context.assertEquals(expected.size(), get.result().getTotalRecords()); compareSourceRecords(context, expected, get.result().getSourceRecords()); async.complete(); @@ -1978,10 +1963,6 @@ private void getMarcSourceRecordsByListOfIds(TestContext context, Record.RecordT }); } - private void sortByRecordId(AsyncResult get) { - get.result().getSourceRecords().sort(comparing(SourceRecord::getRecordId)); - } - private void getMarcSourceRecordsBetweenDates(TestContext context, Record.RecordType recordType, RecordType parsedRecordType, OffsetDateTime earliestDate, OffsetDateTime latestDate) { @@ -2004,9 +1985,7 @@ private void getMarcSourceRecordsBetweenDates(TestContext context, Record.Record List expected = records.stream() .filter(r -> r.getRecordType().equals(recordType)) .map(RecordDaoUtil::toSourceRecord) - .sorted(comparing(SourceRecord::getRecordId)) - .collect(Collectors.toList()); - get.result().getSourceRecords().sort(comparing(SourceRecord::getRecordId)); + .toList(); context.assertEquals(expected.size(), get.result().getTotalRecords()); compareSourceRecords(context, expected, get.result().getSourceRecords()); async.complete(); @@ -2059,9 +2038,7 @@ private void getMarcSourceRecordsByListOfIdsThatAreDeleted(TestContext context, List expected = records.stream() .filter(r -> r.getRecordType().equals(recordType)) .map(RecordDaoUtil::toSourceRecord) - .sorted(comparing(SourceRecord::getRecordId)) - .collect(Collectors.toList()); - get.result().getSourceRecords().sort(comparing(SourceRecord::getRecordId)); + .toList(); context.assertEquals(expected.size(), get.result().getTotalRecords()); compareSourceRecords(context, expected, get.result().getSourceRecords()); async.complete(); @@ -2133,8 +2110,6 @@ private void updateParsedMarcRecords(TestContext context, Record.RecordType reco } context.assertEquals(0, update.result().getErrorMessages().size()); context.assertEquals(expected.size(), update.result().getTotalRecords()); - expected.sort(comparing(ParsedRecord::getId)); - update.result().getParsedRecords().sort(comparing(ParsedRecord::getId)); compareParsedRecords(context, expected, update.result().getParsedRecords()); GenericCompositeFuture.all(updated.stream().map(record -> recordDao .getRecordByMatchedId(record.getMatchedId(), TENANT_ID) @@ -2283,7 +2258,12 @@ private CompositeFuture saveRecords(List records) { private void compareRecords(TestContext context, List expected, List actual) { context.assertEquals(expected.size(), actual.size()); for (Record record : expected) { - compareRecords(context, record, record); + var actualRecord = actual.stream() + .filter(r -> Objects.equals(r.getId(), record.getId())) + .findFirst(); + if (actualRecord.isPresent()) { + compareRecords(context, record, actualRecord.get()); + } } } @@ -2348,7 +2328,12 @@ private void compareRecords(TestContext context, Record expected, StrippedParsed private void compareSourceRecords(TestContext context, List expected, List actual) { context.assertEquals(expected.size(), actual.size()); for (SourceRecord sourceRecord : expected) { - compareSourceRecords(context, sourceRecord, sourceRecord); + var sourceRecordActual = actual.stream() + .filter(sr -> Objects.equals(sr.getRecordId(), sourceRecord.getRecordId())) + .findFirst(); + if (sourceRecordActual.isPresent()) { + compareSourceRecords(context, sourceRecord, sourceRecordActual.get()); + } } } @@ -2381,7 +2366,10 @@ private void compareSourceRecords(TestContext context, SourceRecord expected, So private void compareParsedRecords(TestContext context, List expected, List actual) { context.assertEquals(expected.size(), actual.size()); for (ParsedRecord parsedRecord : expected) { - compareParsedRecords(context, parsedRecord, parsedRecord); + var actualParsedRecord = actual.stream().filter(a -> Objects.equals(a.getId(), parsedRecord.getId())).findFirst(); + if (actualParsedRecord.isPresent()) { + compareParsedRecords(context, parsedRecord, actualParsedRecord.get()); + } } } diff --git a/mod-source-record-storage-server/src/test/java/org/folio/verticle/consumers/ParsedRecordChunkConsumersVerticleTest.java b/mod-source-record-storage-server/src/test/java/org/folio/verticle/consumers/ParsedRecordChunkConsumersVerticleTest.java index 010cda28c..3fa774b8c 100644 --- a/mod-source-record-storage-server/src/test/java/org/folio/verticle/consumers/ParsedRecordChunkConsumersVerticleTest.java +++ b/mod-source-record-storage-server/src/test/java/org/folio/verticle/consumers/ParsedRecordChunkConsumersVerticleTest.java @@ -5,6 +5,7 @@ import io.vertx.core.json.JsonObject; import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.RunTestOnContext; import io.vertx.ext.unit.junit.VertxUnitRunner; import net.mguenther.kafka.junit.KeyValue; import net.mguenther.kafka.junit.ObserveKeyValues; @@ -30,6 +31,7 @@ import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.MockitoAnnotations; @@ -62,6 +64,9 @@ public class ParsedRecordChunkConsumersVerticleTest extends AbstractLBServiceTes private static String recordId = UUID.randomUUID().toString(); + @Rule + public RunTestOnContext rule = new RunTestOnContext(); + private static RawRecord rawMarcRecord; private static ParsedRecord parsedMarcRecord; @@ -166,7 +171,7 @@ private void sendEventWithSavedMarcRecordCollectionPayloadAfterProcessingParsedR } @Test - public void shouldSendDIErrorEventsWhenParsedRecordChunkWasNotSaved() throws InterruptedException { + public void shouldSendDIErrorEventsWhenParsedRecordChunkWasNotSaved(TestContext context) throws InterruptedException { Record validRecord = TestMocks.getRecord(0).withSnapshotId(snapshotId); Record additionalRecord = getAdditionalRecord(validRecord, snapshotId, validRecord.getRecordType()); List records = List.of(validRecord, additionalRecord); @@ -174,7 +179,8 @@ public void shouldSendDIErrorEventsWhenParsedRecordChunkWasNotSaved() throws Int sendRecordsToKafka(jobExecutionId, records); - check_DI_ERROR_eventsSent(jobExecutionId, records, "ERROR: insert or update on table \"raw_records_lb\" violates foreign key constraint \"fk_raw_records_records\"" ); + check_DI_ERROR_eventsSent(jobExecutionId, records, + "ERROR: insert or update on table \"raw_records_lb\" violates foreign key constraint \"fk_raw_records_records\""); } @Test