From 86a863a47ae3a93ab9b53a49d0126bd6f8dc75c9 Mon Sep 17 00:00:00 2001 From: Volodymyr_Rohach Date: Thu, 21 Nov 2024 15:03:39 +0100 Subject: [PATCH] MODSOURCE-824: Implementation done. --- .../main/java/org/folio/dao/RecordDao.java | 3 +- .../java/org/folio/dao/RecordDaoImpl.java | 552 +++++++++--------- .../org/folio/services/RecordServiceImpl.java | 3 +- 3 files changed, 284 insertions(+), 274 deletions(-) diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDao.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDao.java index 723f2a406..7ca5ec355 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDao.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDao.java @@ -70,10 +70,11 @@ public interface RecordDao { * @param externalIds list of ids * @param idType external id type on which source record will be searched * @param recordType record type + * @param includeDeleted searching by deleted records * @param tenantId tenant id * @return {@link Future} of {@link StrippedParsedRecordCollection} */ - Future getStrippedParsedRecords(List externalIds, IdType idType, RecordType recordType, String tenantId); + Future getStrippedParsedRecords(List externalIds, IdType idType, RecordType recordType, Boolean includeDeleted, String tenantId); /** * Searches for {@link Record} by {@link MatchField} with offset and limit diff --git a/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java b/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java index e8441c9b1..cfb22242d 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/dao/RecordDaoImpl.java @@ -49,6 +49,7 @@ import io.vertx.reactivex.pgclient.PgPool; import io.vertx.reactivex.sqlclient.SqlConnection; import io.vertx.sqlclient.Row; + import java.sql.Connection; import java.sql.SQLException; import java.time.OffsetDateTime; @@ -67,6 +68,7 @@ import java.util.stream.Collectors; import javax.ws.rs.BadRequestException; import javax.ws.rs.NotFoundException; + import net.sf.jsqlparser.JSQLParserException; import net.sf.jsqlparser.expression.BinaryExpression; import net.sf.jsqlparser.expression.Expression; @@ -210,30 +212,30 @@ public class RecordDaoImpl implements RecordDao { private static final String DELETE_MARC_INDEXERS_TEMP_TABLE = "marc_indexers_deleted_ids"; private static final String DELETE_OLD_MARC_INDEXERS_SQL = "WITH deleted_rows AS ( " + - " delete from marc_indexers mi " + - " where exists( " + - " select 1 " + - " from " + MARC_RECORDS_TRACKING.getName() + " mrt " + - " where mrt.is_dirty = true " + - " and mrt.marc_id = mi.marc_id " + - " and mrt.version > mi.version " + - " ) " + - " returning mi.marc_id), " + - "deleted_rows2 AS ( " + - " delete from marc_indexers mi " + - " where exists( " + - " select 1 " + - " from records_lb " + - " where records_lb.id = mi.marc_id " + - " and records_lb.state = 'OLD' " + - " ) " + - " returning mi.marc_id) " + - "INSERT INTO " + DELETE_MARC_INDEXERS_TEMP_TABLE + " " + - "SELECT DISTINCT marc_id " + - "FROM deleted_rows " + - "UNION " + - "SELECT marc_id " + - "FROM deleted_rows2"; + " delete from marc_indexers mi " + + " where exists( " + + " select 1 " + + " from " + MARC_RECORDS_TRACKING.getName() + " mrt " + + " where mrt.is_dirty = true " + + " and mrt.marc_id = mi.marc_id " + + " and mrt.version > mi.version " + + " ) " + + " returning mi.marc_id), " + + "deleted_rows2 AS ( " + + " delete from marc_indexers mi " + + " where exists( " + + " select 1 " + + " from records_lb " + + " where records_lb.id = mi.marc_id " + + " and records_lb.state = 'OLD' " + + " ) " + + " returning mi.marc_id) " + + "INSERT INTO " + DELETE_MARC_INDEXERS_TEMP_TABLE + " " + + "SELECT DISTINCT marc_id " + + "FROM deleted_rows " + + "UNION " + + "SELECT marc_id " + + "FROM deleted_rows2"; public static final String OR = " or "; public static final String MARC_INDEXERS = "marc_indexers"; public static final Field MARC_INDEXERS_MARC_ID = field(TABLE_FIELD_TEMPLATE, UUID.class, field(MARC_INDEXERS), field(MARC_ID)); @@ -292,11 +294,17 @@ public Future getRecords(Condition condition, RecordType recor } @Override - public Future getStrippedParsedRecords(List externalIds, IdType idType, RecordType recordType, String tenantId) { + public Future getStrippedParsedRecords(List externalIds, IdType idType, RecordType recordType, Boolean includeDeleted, String tenantId) { Name cte = name(CTE); Name prt = name(recordType.getTableName()); - Condition condition = RecordDaoUtil.getExternalIdsCondition(externalIds, idType) - .and(RECORDS_LB.STATE.eq(RecordState.ACTUAL)); + Condition condition; + if (includeDeleted != null && includeDeleted) { + condition = RecordDaoUtil.getExternalIdsCondition(externalIds, idType) + .and(RECORDS_LB.STATE.eq(RecordState.ACTUAL).or(RECORDS_LB.STATE.eq(RecordState.DELETED))); + } else { + condition = RecordDaoUtil.getExternalIdsCondition(externalIds, idType) + .and(RECORDS_LB.STATE.eq(RecordState.ACTUAL)); + } return getQueryExecutor(tenantId).transaction(txQE -> txQE.query(dsl -> dsl .with(cte.as(dsl.selectCount() .from(RECORDS_LB) @@ -622,7 +630,7 @@ private void appendWhere(SelectJoinStep step, ParseLeaderResult parseLeaderResul : DSL.noCondition(); Condition fieldsCondition = parseFieldsResult.isEnabled() ? exists(select(field("*")).from("cte") - .where("records_lb.id = cte.marc_id")) + .where("records_lb.id = cte.marc_id")) : DSL.noCondition(); step.where(leaderCondition) .and(fieldsCondition) @@ -685,9 +693,9 @@ public Future getMatchedRecordsIdentifiers(MatchFi countQuery = select(countDistinct(RECORDS_LB.ID)) .from(RECORDS_LB) .innerJoin(marcIndexersPartitionTable) - .on(RECORDS_LB.ID.eq(field(TABLE_FIELD_TEMPLATE, UUID.class, marcIndexersPartitionTable, name(MARC_ID)))) + .on(RECORDS_LB.ID.eq(field(TABLE_FIELD_TEMPLATE, UUID.class, marcIndexersPartitionTable, name(MARC_ID)))) .innerJoin(MARC_RECORDS_TRACKING) - .on(MARC_RECORDS_TRACKING.MARC_ID.eq(field(TABLE_FIELD_TEMPLATE, UUID.class, marcIndexersPartitionTable, name(MARC_ID))) + .on(MARC_RECORDS_TRACKING.MARC_ID.eq(field(TABLE_FIELD_TEMPLATE, UUID.class, marcIndexersPartitionTable, name(MARC_ID))) .and(MARC_RECORDS_TRACKING.VERSION.eq(field(TABLE_FIELD_TEMPLATE, Integer.class, marcIndexersPartitionTable, name(VERSION))))) .where(filterRecordByType(typeConnection.getRecordType().value()) .and(filterRecordByState(Record.State.ACTUAL.value())) @@ -702,9 +710,9 @@ public Future getMatchedRecordsIdentifiers(MatchFi .distinctOn(RECORDS_LB.ID) .from(RECORDS_LB) .innerJoin(marcIndexersPartitionTable) - .on(RECORDS_LB.ID.eq(field(TABLE_FIELD_TEMPLATE, UUID.class, marcIndexersPartitionTable, name(MARC_ID)))) + .on(RECORDS_LB.ID.eq(field(TABLE_FIELD_TEMPLATE, UUID.class, marcIndexersPartitionTable, name(MARC_ID)))) .innerJoin(MARC_RECORDS_TRACKING) - .on(MARC_RECORDS_TRACKING.MARC_ID.eq(field(TABLE_FIELD_TEMPLATE, UUID.class, marcIndexersPartitionTable, name(MARC_ID))) + .on(MARC_RECORDS_TRACKING.MARC_ID.eq(field(TABLE_FIELD_TEMPLATE, UUID.class, marcIndexersPartitionTable, name(MARC_ID))) .and(MARC_RECORDS_TRACKING.VERSION.eq(field(TABLE_FIELD_TEMPLATE, Integer.class, marcIndexersPartitionTable, name(VERSION))))) .where(filterRecordByType(typeConnection.getRecordType().value()) .and(filterRecordByState(Record.State.ACTUAL.value())) @@ -794,211 +802,210 @@ public Future saveRecords(RecordCollection recordCollectio logRecordCollection("saveRecords:: Saving", recordCollection, tenantId); Promise finalPromise = Promise.promise(); Context context = Vertx.currentContext(); - if(context == null) return Future.failedFuture("saveRecords must be executed by a Vertx thread"); + if (context == null) return Future.failedFuture("saveRecords must be executed by a Vertx thread"); context.owner().executeBlocking(promise -> { - Set matchedIds = new HashSet<>(); - Set snapshotIds = new HashSet<>(); - Set recordTypes = new HashSet<>(); - - List dbRecords = new ArrayList<>(); - List dbRawRecords = new ArrayList<>(); - List> dbParsedRecords = new ArrayList<>(); - List dbErrorRecords = new ArrayList<>(); - - List errorMessages = new ArrayList<>(); - - recordCollection.getRecords() - .stream() - .map(RecordDaoUtil::ensureRecordHasId) - .map(RecordDaoUtil::ensureRecordHasMatchedId) - .map(RecordDaoUtil::ensureRecordHasSuppressDiscovery) - .map(RecordDaoUtil::ensureRecordForeignKeys) - .forEach(record -> { - // collect unique matched ids to query to determine generation - matchedIds.add(UUID.fromString(record.getMatchedId())); - - // make sure only one snapshot id - snapshotIds.add(record.getSnapshotId()); - if (snapshotIds.size() > 1) { - throw new BadRequestException("Batch record collection only supports single snapshot"); - } + Set matchedIds = new HashSet<>(); + Set snapshotIds = new HashSet<>(); + Set recordTypes = new HashSet<>(); - if(Objects.nonNull(record.getRecordType())) { - recordTypes.add(record.getRecordType().name()); - } else { - throw new BadRequestException(StringUtils.defaultIfEmpty(record.getErrorRecord().getDescription(), String.format("Record with id %s has not record type", record.getId()))); - } + List dbRecords = new ArrayList<>(); + List dbRawRecords = new ArrayList<>(); + List> dbParsedRecords = new ArrayList<>(); + List dbErrorRecords = new ArrayList<>(); - // make sure only one record type - if (recordTypes.size() > 1) { - throw new BadRequestException("Batch record collection only supports single record type"); - } + List errorMessages = new ArrayList<>(); - // if record has parsed record, validate by attempting format - if (Objects.nonNull(record.getParsedRecord())) { - try { - RecordType recordType = toRecordType(record.getRecordType().name()); - recordType.formatRecord(record); - Record2 dbParsedRecord = recordType.toDatabaseRecord2(record.getParsedRecord()); - dbParsedRecords.add(dbParsedRecord); - } catch (Exception e) { - // create error record and remove from record - Object content = Objects.nonNull(record.getParsedRecord()) - ? record.getParsedRecord().getContent() - : null; - ErrorRecord errorRecord = new ErrorRecord() - .withId(record.getId()) - .withDescription(e.getMessage()) - .withContent(content); - errorMessages.add(format(INVALID_PARSED_RECORD_MESSAGE_TEMPLATE, record.getId(), e.getMessage())); - record.withErrorRecord(errorRecord) - .withParsedRecord(null) - .withLeaderRecordStatus(null); + recordCollection.getRecords() + .stream() + .map(RecordDaoUtil::ensureRecordHasId) + .map(RecordDaoUtil::ensureRecordHasMatchedId) + .map(RecordDaoUtil::ensureRecordHasSuppressDiscovery) + .map(RecordDaoUtil::ensureRecordForeignKeys) + .forEach(record -> { + // collect unique matched ids to query to determine generation + matchedIds.add(UUID.fromString(record.getMatchedId())); + + // make sure only one snapshot id + snapshotIds.add(record.getSnapshotId()); + if (snapshotIds.size() > 1) { + throw new BadRequestException("Batch record collection only supports single snapshot"); } - } - if (Objects.nonNull(record.getRawRecord())) { - dbRawRecords.add(RawRecordDaoUtil.toDatabaseRawRecord(record.getRawRecord())); - } - if (Objects.nonNull(record.getErrorRecord())) { - dbErrorRecords.add(ErrorRecordDaoUtil.toDatabaseErrorRecord(record.getErrorRecord())); - } - dbRecords.add(RecordDaoUtil.toDatabaseRecord(record)); - }); - UUID snapshotId = UUID.fromString(snapshotIds.stream().findFirst().orElseThrow()); - - RecordType recordType = toRecordType(recordTypes.stream().findFirst().orElseThrow()); + if (Objects.nonNull(record.getRecordType())) { + recordTypes.add(record.getRecordType().name()); + } else { + throw new BadRequestException(StringUtils.defaultIfEmpty(record.getErrorRecord().getDescription(), String.format("Record with id %s has not record type", record.getId()))); + } - try (Connection connection = getConnection(tenantId)) { - DSL.using(connection).transaction(ctx -> { - DSLContext dsl = DSL.using(ctx); + // make sure only one record type + if (recordTypes.size() > 1) { + throw new BadRequestException("Batch record collection only supports single record type"); + } - // validate snapshot - Optional snapshot = DSL.using(ctx).selectFrom(SNAPSHOTS_LB) - .where(SNAPSHOTS_LB.ID.eq(snapshotId)) - .fetchOptional(); - if (snapshot.isPresent()) { - if (Objects.isNull(snapshot.get().getProcessingStartedDate())) { - throw new BadRequestException(format(SNAPSHOT_NOT_STARTED_MESSAGE_TEMPLATE, snapshot.get().getStatus())); + // if record has parsed record, validate by attempting format + if (Objects.nonNull(record.getParsedRecord())) { + try { + RecordType recordType = toRecordType(record.getRecordType().name()); + recordType.formatRecord(record); + Record2 dbParsedRecord = recordType.toDatabaseRecord2(record.getParsedRecord()); + dbParsedRecords.add(dbParsedRecord); + } catch (Exception e) { + // create error record and remove from record + Object content = Objects.nonNull(record.getParsedRecord()) + ? record.getParsedRecord().getContent() + : null; + ErrorRecord errorRecord = new ErrorRecord() + .withId(record.getId()) + .withDescription(e.getMessage()) + .withContent(content); + errorMessages.add(format(INVALID_PARSED_RECORD_MESSAGE_TEMPLATE, record.getId(), e.getMessage())); + record.withErrorRecord(errorRecord) + .withParsedRecord(null) + .withLeaderRecordStatus(null); + } } - } else { - throw new NotFoundException(format(SNAPSHOT_NOT_FOUND_TEMPLATE, snapshotId)); - } + if (Objects.nonNull(record.getRawRecord())) { + dbRawRecords.add(RawRecordDaoUtil.toDatabaseRawRecord(record.getRawRecord())); + } + if (Objects.nonNull(record.getErrorRecord())) { + dbErrorRecords.add(ErrorRecordDaoUtil.toDatabaseErrorRecord(record.getErrorRecord())); + } + dbRecords.add(RecordDaoUtil.toDatabaseRecord(record)); + }); - List ids = new ArrayList<>(); - Map matchedGenerations = new HashMap<>(); + UUID snapshotId = UUID.fromString(snapshotIds.stream().findFirst().orElseThrow()); - // lookup latest generation by matched id and committed snapshot updated before current snapshot - dsl.select(RECORDS_LB.MATCHED_ID, RECORDS_LB.ID, RECORDS_LB.GENERATION) - .distinctOn(RECORDS_LB.MATCHED_ID) - .from(RECORDS_LB) - .innerJoin(SNAPSHOTS_LB).on(RECORDS_LB.SNAPSHOT_ID.eq(SNAPSHOTS_LB.ID)) - .where(RECORDS_LB.MATCHED_ID.in(matchedIds) - .and(SNAPSHOTS_LB.STATUS.in(JobExecutionStatus.COMMITTED, JobExecutionStatus.ERROR, JobExecutionStatus.CANCELLED)) - .and(SNAPSHOTS_LB.UPDATED_DATE.lessThan(dsl - .select(SNAPSHOTS_LB.PROCESSING_STARTED_DATE) - .from(SNAPSHOTS_LB) - .where(SNAPSHOTS_LB.ID.eq(snapshotId))))) - .orderBy(RECORDS_LB.MATCHED_ID.asc(), RECORDS_LB.GENERATION.desc()) - .fetchStream().forEach(r -> { - UUID id = r.get(RECORDS_LB.ID); - UUID matchedId = r.get(RECORDS_LB.MATCHED_ID); - int generation = r.get(RECORDS_LB.GENERATION); - ids.add(id); - matchedGenerations.put(matchedId, generation); - }); + RecordType recordType = toRecordType(recordTypes.stream().findFirst().orElseThrow()); - // update matching records state - if(!ids.isEmpty()) - { - dsl.update(RECORDS_LB) - .set(RECORDS_LB.STATE, RecordState.OLD) - .where(RECORDS_LB.ID.in(ids)) - .execute(); - } + try (Connection connection = getConnection(tenantId)) { + DSL.using(connection).transaction(ctx -> { + DSLContext dsl = DSL.using(ctx); - // batch insert records updating generation if required - List recordsLoadingErrors = dsl.loadInto(RECORDS_LB) - .batchAfter(1000) - .bulkAfter(500) - .commitAfter(1000) - .onErrorAbort() - .loadRecords(dbRecords.stream().map(record -> { - Integer generation = matchedGenerations.get(record.getMatchedId()); - if (Objects.nonNull(generation)) { - record.setGeneration(generation + 1); - } else if (Objects.isNull(record.getGeneration())) { - record.setGeneration(0); + // validate snapshot + Optional snapshot = DSL.using(ctx).selectFrom(SNAPSHOTS_LB) + .where(SNAPSHOTS_LB.ID.eq(snapshotId)) + .fetchOptional(); + if (snapshot.isPresent()) { + if (Objects.isNull(snapshot.get().getProcessingStartedDate())) { + throw new BadRequestException(format(SNAPSHOT_NOT_STARTED_MESSAGE_TEMPLATE, snapshot.get().getStatus())); } - return record; - }).collect(Collectors.toList())) - .fieldsCorresponding() - .execute() - .errors(); - - recordsLoadingErrors.forEach(error -> { - if (error.exception().sqlState().equals(UNIQUE_VIOLATION_SQL_STATE)) { - throw new DuplicateEventException("SQL Unique constraint violation prevented repeatedly saving the record"); + } else { + throw new NotFoundException(format(SNAPSHOT_NOT_FOUND_TEMPLATE, snapshotId)); } - LOG.warn("saveRecords:: Error occurred on batch execution: {}", error.exception().getCause().getMessage()); - LOG.debug("saveRecords:: Failed to execute statement from batch: {}", error.query()); - }); - // batch insert raw records - dsl.loadInto(RAW_RECORDS_LB) - .batchAfter(250) - .commitAfter(1000) - .onDuplicateKeyUpdate() - .onErrorAbort() - .loadRecords(dbRawRecords) - .fieldsCorresponding() - .execute(); - - // batch insert parsed records - recordType.toLoaderOptionsStep(dsl) - .batchAfter(250) - .commitAfter(1000) - .onDuplicateKeyUpdate() - .onErrorAbort() - .loadRecords(dbParsedRecords) - .fieldsCorresponding() - .execute(); - - if (!dbErrorRecords.isEmpty()) { - // batch insert error records - dsl.loadInto(ERROR_RECORDS_LB) + List ids = new ArrayList<>(); + Map matchedGenerations = new HashMap<>(); + + // lookup latest generation by matched id and committed snapshot updated before current snapshot + dsl.select(RECORDS_LB.MATCHED_ID, RECORDS_LB.ID, RECORDS_LB.GENERATION) + .distinctOn(RECORDS_LB.MATCHED_ID) + .from(RECORDS_LB) + .innerJoin(SNAPSHOTS_LB).on(RECORDS_LB.SNAPSHOT_ID.eq(SNAPSHOTS_LB.ID)) + .where(RECORDS_LB.MATCHED_ID.in(matchedIds) + .and(SNAPSHOTS_LB.STATUS.in(JobExecutionStatus.COMMITTED, JobExecutionStatus.ERROR, JobExecutionStatus.CANCELLED)) + .and(SNAPSHOTS_LB.UPDATED_DATE.lessThan(dsl + .select(SNAPSHOTS_LB.PROCESSING_STARTED_DATE) + .from(SNAPSHOTS_LB) + .where(SNAPSHOTS_LB.ID.eq(snapshotId))))) + .orderBy(RECORDS_LB.MATCHED_ID.asc(), RECORDS_LB.GENERATION.desc()) + .fetchStream().forEach(r -> { + UUID id = r.get(RECORDS_LB.ID); + UUID matchedId = r.get(RECORDS_LB.MATCHED_ID); + int generation = r.get(RECORDS_LB.GENERATION); + ids.add(id); + matchedGenerations.put(matchedId, generation); + }); + + // update matching records state + if (!ids.isEmpty()) { + dsl.update(RECORDS_LB) + .set(RECORDS_LB.STATE, RecordState.OLD) + .where(RECORDS_LB.ID.in(ids)) + .execute(); + } + + // batch insert records updating generation if required + List recordsLoadingErrors = dsl.loadInto(RECORDS_LB) + .batchAfter(1000) + .bulkAfter(500) + .commitAfter(1000) + .onErrorAbort() + .loadRecords(dbRecords.stream().map(record -> { + Integer generation = matchedGenerations.get(record.getMatchedId()); + if (Objects.nonNull(generation)) { + record.setGeneration(generation + 1); + } else if (Objects.isNull(record.getGeneration())) { + record.setGeneration(0); + } + return record; + }).collect(Collectors.toList())) + .fieldsCorresponding() + .execute() + .errors(); + + recordsLoadingErrors.forEach(error -> { + if (error.exception().sqlState().equals(UNIQUE_VIOLATION_SQL_STATE)) { + throw new DuplicateEventException("SQL Unique constraint violation prevented repeatedly saving the record"); + } + LOG.warn("saveRecords:: Error occurred on batch execution: {}", error.exception().getCause().getMessage()); + LOG.debug("saveRecords:: Failed to execute statement from batch: {}", error.query()); + }); + + // batch insert raw records + dsl.loadInto(RAW_RECORDS_LB) .batchAfter(250) .commitAfter(1000) .onDuplicateKeyUpdate() .onErrorAbort() - .loadRecords(dbErrorRecords) + .loadRecords(dbRawRecords) .fieldsCorresponding() .execute(); - } - promise.complete(new RecordsBatchResponse() - .withRecords(recordCollection.getRecords()) - .withTotalRecords(recordCollection.getRecords().size()) - .withErrorMessages(errorMessages)); - }); - } catch (DuplicateEventException e) { - LOG.info("saveRecords:: Skipped saving records due to duplicate event: {}", e.getMessage()); - promise.fail(e); - } catch (SQLException | DataAccessException e) { - LOG.warn("saveRecords:: Failed to save records", e); - promise.fail(e.getCause()); - } - }, - false, - r -> { - if (r.failed()) { - LOG.warn("saveRecords:: Error during batch record save", r.cause()); - finalPromise.fail(r.cause()); - } else { - LOG.debug("saveRecords:: batch record save was successful"); - finalPromise.complete(r.result()); - } - }); + // batch insert parsed records + recordType.toLoaderOptionsStep(dsl) + .batchAfter(250) + .commitAfter(1000) + .onDuplicateKeyUpdate() + .onErrorAbort() + .loadRecords(dbParsedRecords) + .fieldsCorresponding() + .execute(); + + if (!dbErrorRecords.isEmpty()) { + // batch insert error records + dsl.loadInto(ERROR_RECORDS_LB) + .batchAfter(250) + .commitAfter(1000) + .onDuplicateKeyUpdate() + .onErrorAbort() + .loadRecords(dbErrorRecords) + .fieldsCorresponding() + .execute(); + } + + promise.complete(new RecordsBatchResponse() + .withRecords(recordCollection.getRecords()) + .withTotalRecords(recordCollection.getRecords().size()) + .withErrorMessages(errorMessages)); + }); + } catch (DuplicateEventException e) { + LOG.info("saveRecords:: Skipped saving records due to duplicate event: {}", e.getMessage()); + promise.fail(e); + } catch (SQLException | DataAccessException e) { + LOG.warn("saveRecords:: Failed to save records", e); + promise.fail(e.getCause()); + } + }, + false, + r -> { + if (r.failed()) { + LOG.warn("saveRecords:: Error during batch record save", r.cause()); + finalPromise.fail(r.cause()); + } else { + LOG.debug("saveRecords:: batch record save was successful"); + finalPromise.complete(r.result()); + } + }); return finalPromise.future() .onSuccess(response -> response.getRecords() @@ -1011,9 +1018,9 @@ public Future updateRecord(Record record, Map okapiHeade var tenantId = okapiHeaders.get(OKAPI_TENANT_HEADER); LOG.trace("updateRecord:: Updating {} record {} for tenant {}", record.getRecordType(), record.getId(), tenantId); return getQueryExecutor(tenantId).transaction(txQE -> getRecordById(txQE, record.getId()) - .compose(optionalRecord -> optionalRecord - .map(r -> insertOrUpdateRecord(txQE, record)) - .orElse(Future.failedFuture(new NotFoundException(format(RECORD_NOT_FOUND_TEMPLATE, record.getId())))))) + .compose(optionalRecord -> optionalRecord + .map(r -> insertOrUpdateRecord(txQE, record)) + .orElse(Future.failedFuture(new NotFoundException(format(RECORD_NOT_FOUND_TEMPLATE, record.getId())))))) .onSuccess(updated -> recordDomainEventPublisher.publishRecordUpdated(updated, okapiHeaders)); } @@ -1092,35 +1099,35 @@ public Future> getSourceRecordByExternalId(String externa public Future> getSourceRecordByCondition(Condition condition, String tenantId) { return getQueryExecutor(tenantId) .transaction(txQE -> txQE.findOneRow(dsl -> dsl.selectFrom(RECORDS_LB) - .where(condition)) - .map(RecordDaoUtil::toOptionalRecord) - .compose(optionalRecord -> { - if (optionalRecord.isPresent()) { - return lookupAssociatedRecords(txQE, optionalRecord.get(), false) - .map(RecordDaoUtil::toSourceRecord) - .map(sourceRecord -> { - if (Objects.nonNull(sourceRecord.getParsedRecord())) { - return Optional.of(sourceRecord); - } - return Optional.empty(); - }); - } - return Future.succeededFuture(Optional.empty()); - })); + .where(condition)) + .map(RecordDaoUtil::toOptionalRecord) + .compose(optionalRecord -> { + if (optionalRecord.isPresent()) { + return lookupAssociatedRecords(txQE, optionalRecord.get(), false) + .map(RecordDaoUtil::toSourceRecord) + .map(sourceRecord -> { + if (Objects.nonNull(sourceRecord.getParsedRecord())) { + return Optional.of(sourceRecord); + } + return Optional.empty(); + }); + } + return Future.succeededFuture(Optional.empty()); + })); } @Override public Future calculateGeneration(ReactiveClassicGenericQueryExecutor txQE, Record record) { return txQE.query(dsl -> dsl.select(max(RECORDS_LB.GENERATION).as(RECORDS_LB.GENERATION)) - .from(RECORDS_LB.innerJoin(SNAPSHOTS_LB).on(RECORDS_LB.SNAPSHOT_ID.eq(SNAPSHOTS_LB.ID))) - .where(RECORDS_LB.MATCHED_ID.eq(UUID.fromString(record.getMatchedId())) - .and(SNAPSHOTS_LB.UPDATED_DATE.lessThan(dsl.select(SNAPSHOTS_LB.PROCESSING_STARTED_DATE) - .from(SNAPSHOTS_LB) - .where(SNAPSHOTS_LB.ID.eq(UUID.fromString(record.getSnapshotId()))))))) - .map(res -> { - Integer generation = res.get(RECORDS_LB.GENERATION); - return Objects.nonNull(generation) ? ++generation : 0; - }); + .from(RECORDS_LB.innerJoin(SNAPSHOTS_LB).on(RECORDS_LB.SNAPSHOT_ID.eq(SNAPSHOTS_LB.ID))) + .where(RECORDS_LB.MATCHED_ID.eq(UUID.fromString(record.getMatchedId())) + .and(SNAPSHOTS_LB.UPDATED_DATE.lessThan(dsl.select(SNAPSHOTS_LB.PROCESSING_STARTED_DATE) + .from(SNAPSHOTS_LB) + .where(SNAPSHOTS_LB.ID.eq(UUID.fromString(record.getSnapshotId()))))))) + .map(res -> { + Integer generation = res.get(RECORDS_LB.GENERATION); + return Objects.nonNull(generation) ? ++generation : 0; + }); } @Override @@ -1129,9 +1136,9 @@ public Future updateParsedRecord(Record record, Map GenericCompositeFuture.all(Lists.newArrayList( - updateExternalIdsForRecord(txQE, record), - ParsedRecordDaoUtil.update(txQE, record.getParsedRecord(), ParsedRecordDaoUtil.toRecordType(record)) - )).onSuccess(updated -> recordDomainEventPublisher.publishRecordUpdated(record, okapiHeaders)) + updateExternalIdsForRecord(txQE, record), + ParsedRecordDaoUtil.update(txQE, record.getParsedRecord(), ParsedRecordDaoUtil.toRecordType(record)) + )).onSuccess(updated -> recordDomainEventPublisher.publishRecordUpdated(record, okapiHeaders)) .map(res -> record.getParsedRecord())); } @@ -1141,7 +1148,7 @@ public Future updateParsedRecords(RecordCollection r logRecordCollection("updateParsedRecords:: Updating", recordCollection, tenantId); Promise promise = Promise.promise(); Context context = Vertx.currentContext(); - if(context == null) return Future.failedFuture("updateParsedRecords must be called by a vertx thread"); + if (context == null) return Future.failedFuture("updateParsedRecords must be called by a vertx thread"); var recordsUpdated = new ArrayList(); context.owner().executeBlocking(blockingPromise -> @@ -1181,8 +1188,8 @@ public Future updateParsedRecords(RecordCollection r String externalId = getExternalId(externalIdsHolder, recordType); String externalHrid = getExternalHrid(externalIdsHolder, recordType); if (StringUtils.isNotEmpty(externalId)) { - updateStep = updateFirstStep - .set(RECORDS_LB.EXTERNAL_ID, UUID.fromString(externalId)); + updateStep = updateFirstStep + .set(RECORDS_LB.EXTERNAL_ID, UUID.fromString(externalId)); } if (StringUtils.isNotEmpty(externalHrid)) { updateStep = (Objects.isNull(updateStep) ? updateFirstStep : updateStep) @@ -1283,16 +1290,16 @@ public Future updateParsedRecords(RecordCollection r blockingPromise.fail(e); } }, - false, - result -> { - if (result.failed()) { - LOG.warn("updateParsedRecords:: Error during update of parsed records", result.cause()); - promise.fail(result.cause()); - } else { - LOG.debug("updateParsedRecords:: Parsed records update was successful"); - promise.complete(result.result()); - } - }); + false, + result -> { + if (result.failed()) { + LOG.warn("updateParsedRecords:: Error during update of parsed records", result.cause()); + promise.fail(result.cause()); + } else { + LOG.debug("updateParsedRecords:: Parsed records update was successful"); + promise.complete(result.result()); + } + }); return promise.future() .onSuccess(response -> @@ -1304,24 +1311,24 @@ public Future updateParsedRecords(RecordCollection r public Future> getRecordByExternalId(String externalId, IdType idType, String tenantId) { return getQueryExecutor(tenantId) - .transaction(txQE -> getRecordByExternalId(txQE, externalId, idType)); + .transaction(txQE -> getRecordByExternalId(txQE, externalId, idType)); } @Override public Future> getRecordByExternalId(ReactiveClassicGenericQueryExecutor txQE, String externalId, IdType idType) { Condition condition = RecordDaoUtil.getExternalIdCondition(externalId, idType) - .and(RECORDS_LB.STATE.eq(RecordState.ACTUAL) - .or(RECORDS_LB.STATE.eq(RecordState.DELETED))); + .and(RECORDS_LB.STATE.eq(RecordState.ACTUAL) + .or(RECORDS_LB.STATE.eq(RecordState.DELETED))); return txQE.findOneRow(dsl -> dsl.selectFrom(RECORDS_LB) - .where(condition) - .orderBy(RECORDS_LB.GENERATION.sort(SortOrder.DESC)) - .limit(1)) - .map(RecordDaoUtil::toOptionalRecord) - .compose(optionalRecord -> optionalRecord - .map(record -> lookupAssociatedRecords(txQE, record, false).map(Optional::of)) - .orElse(Future.failedFuture(new NotFoundException(format(RECORD_NOT_FOUND_BY_ID_TYPE, idType, externalId))))) - .onFailure(v -> txQE.rollback()); + .where(condition) + .orderBy(RECORDS_LB.GENERATION.sort(SortOrder.DESC)) + .limit(1)) + .map(RecordDaoUtil::toOptionalRecord) + .compose(optionalRecord -> optionalRecord + .map(record -> lookupAssociatedRecords(txQE, record, false).map(Optional::of)) + .orElse(Future.failedFuture(new NotFoundException(format(RECORD_NOT_FOUND_BY_ID_TYPE, idType, externalId))))) + .onFailure(v -> txQE.rollback()); } @Override @@ -1364,10 +1371,10 @@ public Future saveUpdatedRecord(ReactiveClassicGenericQueryExecutor txQE public Future updateSuppressFromDiscoveryForRecord(String id, IdType idType, Boolean suppress, String tenantId) { LOG.trace("updateSuppressFromDiscoveryForRecord:: Updating suppress from discovery with value {} for record with {} {} for tenant {}", suppress, idType, id, tenantId); return getQueryExecutor(tenantId).transaction(txQE -> getRecordByExternalId(txQE, id, idType) - .compose(optionalRecord -> optionalRecord - .map(record -> RecordDaoUtil.update(txQE, record.withAdditionalInfo(record.getAdditionalInfo().withSuppressDiscovery(suppress)))) - .orElse(Future.failedFuture(new NotFoundException(format(RECORD_NOT_FOUND_BY_ID_TYPE, idType, id)))))) - .map(u -> true); + .compose(optionalRecord -> optionalRecord + .map(record -> RecordDaoUtil.update(txQE, record.withAdditionalInfo(record.getAdditionalInfo().withSuppressDiscovery(suppress)))) + .orElse(Future.failedFuture(new NotFoundException(format(RECORD_NOT_FOUND_BY_ID_TYPE, idType, id)))))) + .map(u -> true); } @Override @@ -1410,6 +1417,7 @@ public Future deleteRecords(int lastUpdatedDays, int limit, String tenantI /** * Deletes old versions of Marc Indexers based on tenant ID. + * * @param tenantId The ID of the tenant for which the Marc Indexers are being deleted. * @return A Future of Boolean that completes successfully with a value of 'true' if the deletion was successful, * or 'false' if it was not. diff --git a/mod-source-record-storage-server/src/main/java/org/folio/services/RecordServiceImpl.java b/mod-source-record-storage-server/src/main/java/org/folio/services/RecordServiceImpl.java index 5b662d7ee..c39dba0c0 100644 --- a/mod-source-record-storage-server/src/main/java/org/folio/services/RecordServiceImpl.java +++ b/mod-source-record-storage-server/src/main/java/org/folio/services/RecordServiceImpl.java @@ -274,7 +274,8 @@ public Future fetchStrippedParsedRecords(FetchPa } var recordType = toRecordType(fetchRequest.getRecordType().name()); - return recordDao.getStrippedParsedRecords(ids, idType, recordType, tenantId) + var includeDeleted = fetchRequest.getIncludeDeleted(); + return recordDao.getStrippedParsedRecords(ids, idType, recordType, includeDeleted, tenantId) .onComplete(records -> filterFieldsByDataRange(records, fetchRequest)) .onFailure(ex -> { LOG.warn("fetchParsedRecords:: Failed to fetch parsed records. {}", ex.getMessage());