Commit
Merge remote-tracking branch 'origin/MODQM-407' into MODQM-407
TsaghikKhachatryan committed Mar 18, 2024
2 parents e5650a9 + d8c5e51 commit cb3ae5f
Showing 19 changed files with 389 additions and 134 deletions.
2 changes: 2 additions & 0 deletions NEWS.md
@@ -20,6 +20,8 @@
* [MODSOURMAN-1122](https://issues.folio.org/browse/MODSOURMAN-1122) Add additional check for the childSnapshotWrappers
* [MODSOURMAN-1140](https://folio-org.atlassian.net/browse/MODSOURMAN-1140) Invalidate cache before saving new parsed content in cache
* [MODSOURMAN-1133](https://folio-org.atlassian.net/browse/MODSOURMAN-1133) Adjust SQL condition to include DISCARDED holding and items
* [MODDATAIMP-1001](https://folio-org.atlassian.net/browse/MODDATAIMP-1001) Remove 999 validation for instance creation
* [MODSOURMAN-956](https://folio-org.atlassian.net/browse/MODSOURMAN-956) Stop processing the job with incorrect profile

## 2023-10-13 v3.7.0
* [MODSOURMAN-1045](https://issues.folio.org/browse/MODSOURMAN-1045) Allow create action with non-matches for instance without match profile
2 changes: 1 addition & 1 deletion mod-source-record-manager-client/pom.xml
@@ -44,7 +44,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.2.0</version>
<version>3.5.0</version>
<executions>
<execution>
<phase>generate-sources</phase>
12 changes: 6 additions & 6 deletions mod-source-record-manager-server/pom.xml
@@ -233,7 +233,7 @@
<dependency>
<groupId>org.folio</groupId>
<artifactId>data-import-processing-core</artifactId>
<version>4.2.0-SNAPSHOT</version>
<version>4.2.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.folio</groupId>
@@ -285,7 +285,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.2.0</version>
<version>3.5.0</version>
<executions>
<execution>
<id>add_generated_sources_folder</id>
@@ -305,7 +305,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>versions-maven-plugin</artifactId>
<version>2.8.1</version>
<version>2.16.2</version>
<configuration>
<generateBackupPoms>false</generateBackupPoms>
</configuration>
@@ -396,7 +396,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>3.2.0</version>
<version>3.3.1</version>
<executions>
<execution>
<id>copy-resources</id>
@@ -437,7 +437,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<version>3.5.2</version>
<executions>
<execution>
<phase>package</phase>
@@ -472,7 +472,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-release-plugin</artifactId>
<version>2.5.3</version>
<version>3.0.1</version>
<configuration>
<preparationGoals>clean verify</preparationGoals>
<tagNameFormat>v@{project.version}</tagNameFormat>
@@ -346,7 +346,7 @@ private RecordProcessingLogDto mapJobLogEntryRow(Row row) {
.withSourceRecordType(entityType)
.withJobExecutionId(row.getValue(JOB_EXECUTION_ID).toString())
.withIncomingRecordId(row.getValue(INCOMING_RECORD_ID).toString())
.withSourceRecordId(row.getValue(SOURCE_ID).toString())
.withSourceRecordId(row.getValue(SOURCE_ID) != null ? row.getValue(SOURCE_ID).toString() : null)
.withSourceRecordOrder(isEmpty(row.getString(INVOICE_ACTION_STATUS))
? row.getInteger(SOURCE_RECORD_ORDER).toString()
: row.getString(INVOICE_LINE_NUMBER))
@@ -553,48 +553,48 @@ private static RecordProcessingLogDtoCollection processMultipleHoldingsAndItemsI
if (!ifNeedToMerge(entries)) {
return recordProcessingLogDto;
}
Map<String, List<ProcessedHoldingsInfo>> relatedHoldingsInfoBySourceRecordId =
Map<String, List<ProcessedHoldingsInfo>> relatedHoldingsInfoByIncomingRecordId =
entries.stream()
.collect(Collectors.groupingBy(
RecordProcessingLogDto::getSourceRecordId,
RecordProcessingLogDto::getIncomingRecordId,
Collectors.mapping(RecordProcessingLogDto::getRelatedHoldingsInfo,
Collectors.flatMapping(List::stream, toList())
)));

Map<String, List<ProcessedItemInfo>> relatedItemInfoBySourceId =
Map<String, List<ProcessedItemInfo>> relatedItemInfoByIncomingRecordId =
entries.stream()
.collect(Collectors.groupingBy(
RecordProcessingLogDto::getSourceRecordId,
RecordProcessingLogDto::getIncomingRecordId,
Collectors.mapping(RecordProcessingLogDto::getRelatedItemInfo,
Collectors.flatMapping(List::stream, toList())
)));

List<RecordProcessingLogDto> mergedEntries = relatedHoldingsInfoBySourceRecordId.entrySet()
List<RecordProcessingLogDto> mergedEntries = relatedHoldingsInfoByIncomingRecordId.entrySet()
.stream().map(e -> {
String sourceRecordId = e.getKey();
List<ProcessedItemInfo> relatedItemInfos = relatedItemInfoBySourceId.get(sourceRecordId);
String incomingRecordId = e.getKey();
List<ProcessedItemInfo> relatedItemInfos = relatedItemInfoByIncomingRecordId.get(incomingRecordId);

RecordProcessingLogDto firstRecordWithCurrentSourceId = entries.stream()
.filter(record -> record.getSourceRecordId().equals(sourceRecordId))
RecordProcessingLogDto firstRecordWithCurrentIncomingRecordId = entries.stream()
.filter(record -> record.getIncomingRecordId().equals(incomingRecordId))
.findFirst().orElseGet(RecordProcessingLogDto::new);

return firstRecordWithCurrentSourceId
return firstRecordWithCurrentIncomingRecordId
.withRelatedHoldingsInfo(e.getValue().stream().distinct().toList())
.withRelatedItemInfo(relatedItemInfos.stream().distinct().toList());
}).collect(toList());
return recordProcessingLogDto.withEntries(mergedEntries);
}

private static boolean ifNeedToMerge(List<RecordProcessingLogDto> entries) {
Map<String, Long> sourceRecordIdCounts = entries.stream()
Map<String, Long> holdingsIncomingRecordIdCounts = entries.stream()
.filter(e -> e.getRelatedHoldingsInfo() != null && !e.getRelatedHoldingsInfo().isEmpty())
.collect(Collectors.groupingBy(RecordProcessingLogDto::getSourceRecordId, Collectors.counting()));
.collect(Collectors.groupingBy(RecordProcessingLogDto::getIncomingRecordId, Collectors.counting()));

Map<String, Long> sourceItemRecordIdCounts = entries.stream()
Map<String, Long> itemIncomingRecordIdCounts = entries.stream()
.filter(e -> e.getRelatedItemInfo() != null && !e.getRelatedItemInfo().isEmpty())
.collect(Collectors.groupingBy(RecordProcessingLogDto::getSourceRecordId, Collectors.counting()));
.collect(Collectors.groupingBy(RecordProcessingLogDto::getIncomingRecordId, Collectors.counting()));

return sourceRecordIdCounts.values().stream().anyMatch(count -> count > 1) ||
sourceItemRecordIdCounts.values().stream().anyMatch(count -> count > 1);
return holdingsIncomingRecordIdCounts.values().stream().anyMatch(count -> count > 1) ||
itemIncomingRecordIdCounts.values().stream().anyMatch(count -> count > 1);
}
}
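
The merged entries above are produced by grouping log entries on their incoming record id and flattening the per-entry holdings/items lists. A minimal standalone sketch of that groupingBy/flatMapping pattern follows; the Entry record and its field names are simplified stand-ins for illustration, not the module's actual RecordProcessingLogDto:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class GroupingSketch {

  // Hypothetical, simplified stand-in for RecordProcessingLogDto.
  record Entry(String incomingRecordId, List<String> relatedHoldingsInfo) {}

  // Groups entries by incoming record id and flattens the per-entry lists,
  // mirroring the Collectors.groupingBy + flatMapping combination used above.
  static Map<String, List<String>> holdingsByIncomingRecordId(List<Entry> entries) {
    return entries.stream()
      .collect(Collectors.groupingBy(
        Entry::incomingRecordId,
        Collectors.flatMapping(e -> e.relatedHoldingsInfo().stream(), Collectors.toList())));
  }

  public static void main(String[] args) {
    var entries = List.of(
      new Entry("rec-1", List.of("holding-a")),
      new Entry("rec-1", List.of("holding-b")),
      new Entry("rec-2", List.of("holding-c")));
    // Prints: {rec-1=[holding-a, holding-b], rec-2=[holding-c]}
    System.out.println(holdingsByIncomingRecordId(entries));
  }
}
```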
@@ -61,6 +61,7 @@ public Future<Boolean> processChunk(RawRecordsDto incomingChunk, JobExecution jo
prepareChunk(incomingChunk);
return mapJobExecution(incomingChunk, jobExecution, false, params);
}

private Future<Boolean> mapJobExecution(RawRecordsDto incomingChunk, JobExecution jobExecution, boolean acceptInstanceId, OkapiConnectionParams params) {
if (isNotSupportedJobProfileExists(jobExecution)) {
throw new UnsupportedProfileException("Unsupported type of Job Profile.");
@@ -111,10 +112,10 @@ private boolean isExistsMatchProfileToInstanceWithActionUpdateMarcBib(Collection
}

private ProfileSnapshotWrapper getChildSnapshotWrapperByType(ProfileSnapshotWrapper profileSnapshotWrapper,
ProfileSnapshotWrapper.ContentType contentType) {
ProfileSnapshotWrapper.ContentType contentType) {
if (!CollectionUtils.isEmpty(profileSnapshotWrapper.getChildSnapshotWrappers())) {
List<ProfileSnapshotWrapper> childSnapshotWrappers = profileSnapshotWrapper.getChildSnapshotWrappers();
for(ProfileSnapshotWrapper snapshotWrapper : childSnapshotWrappers) {
for (ProfileSnapshotWrapper snapshotWrapper : childSnapshotWrappers) {
if (snapshotWrapper.getContentType() == contentType) {
return snapshotWrapper;
}
@@ -39,25 +39,29 @@
import io.vertx.core.json.jackson.DatabindCodec;
import io.vertx.kafka.client.producer.KafkaHeader;
import io.vertx.kafka.client.producer.impl.KafkaHeaderImpl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.ws.rs.NotFoundException;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.IterableUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.MappingProfile;
import org.folio.services.exceptions.InvalidJobProfileForFileException;
import org.folio.services.journal.JournalUtil;
import org.folio.dao.JobExecutionSourceChunkDao;
import org.folio.dataimport.util.OkapiConnectionParams;
@@ -95,6 +99,7 @@
import org.folio.services.parsers.RecordParserBuilder;
import org.folio.services.util.RecordConversionUtil;
import org.folio.services.validation.JobProfileSnapshotValidationService;
import org.folio.verticle.consumers.util.JobExecutionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@@ -177,7 +182,9 @@ public Future<List<Record>> parseRawRecordsChunkForJobExecution(RawRecordsDto ch

return isJobProfileCompatibleWithRecordsType(jobExecution.getJobProfileSnapshotWrapper(), parsedRecords)
? Future.succeededFuture(parsedRecords)
: Future.failedFuture(prepareWrongJobProfileErrorMessage(jobExecution, parsedRecords));
: Future.failedFuture(new InvalidJobProfileForFileException(
prepareWrongJobProfileErrorMessage(jobExecution, parsedRecords))
);
})
.compose(parsedRecords -> ensureMappingMetaDataSnapshot(jobExecution.getId(), parsedRecords, params)
.map(parsedRecords))
@@ -391,21 +398,30 @@ private boolean deleteMarcActionExists(JobExecution jobExecution) {
}

private boolean isCreateInstanceActionExists(JobExecution jobExecution) {
return containsCreateInstanceActionWithMatch(jobExecution.getJobProfileSnapshotWrapper());
return containsCreateInstanceActionWithoutMarcBib(jobExecution.getJobProfileSnapshotWrapper());
}

private boolean containsCreateInstanceActionWithMatch(ProfileSnapshotWrapper profileSnapshot) {
private boolean containsCreateInstanceActionWithoutMarcBib(ProfileSnapshotWrapper profileSnapshot) {
for (ProfileSnapshotWrapper childWrapper : profileSnapshot.getChildSnapshotWrappers()) {
if (childWrapper.getContentType() == ProfileSnapshotWrapper.ContentType.ACTION_PROFILE
&& actionProfileMatches(childWrapper, List.of(FolioRecord.INSTANCE), Action.CREATE)) {
return childWrapper.getReactTo() != NON_MATCH;
} else if (containsCreateInstanceActionWithMatch(childWrapper)) {
return childWrapper.getReactTo() != NON_MATCH && !containsMarcBibToInstanceMappingProfile(childWrapper);
} else if (containsCreateInstanceActionWithoutMarcBib(childWrapper)) {
return true;
}
}
return false;
}

private boolean containsMarcBibToInstanceMappingProfile(ProfileSnapshotWrapper actionWrapper) {
return actionWrapper.getChildSnapshotWrappers()
.stream()
.map(mappingWrapper -> Optional.ofNullable(mappingWrapper.getContent()))
.filter(Optional::isPresent)
.map(content -> DatabindCodec.mapper().convertValue(content.get(), MappingProfile.class))
.anyMatch(mappingProfile -> mappingProfile.getIncomingRecordType() == EntityType.MARC_BIBLIOGRAPHIC);
}

private boolean isCreateAuthorityActionExists(JobExecution jobExecution) {
return containsMarcActionProfile(
jobExecution.getJobProfileSnapshotWrapper(),
@@ -858,6 +874,7 @@ private Future<List<Record>> saveRecords(OkapiConnectionParams params, JobExecut
}

private String prepareWrongJobProfileErrorMessage(JobExecution jobExecution, List<Record> records) {
JobExecutionUtils.cache.put(jobExecution.getId(), JobExecution.Status.ERROR);
return String.format(WRONG_JOB_PROFILE_ERROR_MESSAGE, jobExecution.getJobProfileInfo().getName(), records.get(0).getRecordType());
}
}
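
The new containsMarcBibToInstanceMappingProfile helper rebinds each child wrapper's raw content into a typed MappingProfile with Jackson before inspecting the incoming record type. A minimal sketch of that convertValue pattern, assuming jackson-databind is on the classpath and using a hypothetical Profile class rather than the FOLIO MappingProfile:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;

class ConvertValueSketch {

  // Hypothetical stand-in for org.folio.MappingProfile.
  public static class Profile {
    public String name;
    public String incomingRecordType;
  }

  public static void main(String[] args) {
    // Profile snapshot content typically arrives as an untyped map parsed from JSON.
    Map<String, Object> content = Map.of(
      "name", "Create instance",
      "incomingRecordType", "MARC_BIBLIOGRAPHIC");

    // convertValue rebinds the map into a typed object without a serialize/deserialize round trip.
    Profile profile = new ObjectMapper().convertValue(content, Profile.class);

    boolean marcBibIncoming = "MARC_BIBLIOGRAPHIC".equals(profile.incomingRecordType);
    System.out.println(marcBibIncoming); // true
  }
}
```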
@@ -0,0 +1,15 @@
package org.folio.services.exceptions;

/**
* This exception is thrown when an invalid job profile is selected for an uploaded file.
*/
public class InvalidJobProfileForFileException extends Exception {

public InvalidJobProfileForFileException(String message) {
super(message);
}

public InvalidJobProfileForFileException(String message, Throwable cause) {
super(message, cause);
}
}
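
Later in this commit the parsing flow fails its future with this exception, and RawMarcChunksKafkaHandler reacts by marking the job execution as errored. A rough usage sketch under those assumptions, with a simplified local exception and handler rather than the module's actual wiring:

```java
import io.vertx.core.Future;

class InvalidProfileUsageSketch {

  // Simplified local stand-in for the checked exception added in this commit.
  static class InvalidJobProfileForFileException extends Exception {
    InvalidJobProfileForFileException(String message) {
      super(message);
    }
  }

  // Parsing fails the future with the typed exception instead of a bare error message...
  static Future<String> parse(boolean profileCompatible) {
    return profileCompatible
      ? Future.succeededFuture("parsed records")
      : Future.failedFuture(new InvalidJobProfileForFileException("Unsuitable job profile for this file"));
  }

  public static void main(String[] args) {
    // ...so a downstream handler can branch on the exception type and flag the job.
    parse(false).onFailure(th -> {
      if (th instanceof InvalidJobProfileForFileException) {
        System.out.println("mark job execution as ERROR / FILE_PROCESSING_ERROR: " + th.getMessage());
      }
    });
  }
}
```

A dedicated checked exception lets the handler distinguish profile-selection problems from generic parsing failures by type alone.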
@@ -27,6 +27,7 @@
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_ERROR;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INVENTORY_INSTANCE_CREATED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_INVENTORY_INSTANCE_UPDATED;
import static org.folio.rest.jaxrs.model.JournalRecord.ActionType.UPDATE;
import static org.folio.rest.jaxrs.model.JournalRecord.EntityType.AUTHORITY;
import static org.folio.rest.jaxrs.model.JournalRecord.EntityType.HOLDINGS;
import static org.folio.rest.jaxrs.model.JournalRecord.EntityType.INSTANCE;
@@ -106,7 +107,6 @@ public static List<JournalRecord> buildJournalRecordsByEvent(DataImportEventPayl
JournalRecord.ActionStatus actionStatus) throws JournalRecordMapperException {
try {
HashMap<String, String> eventPayloadContext = eventPayload.getContext();
String incomingRecordId = eventPayloadContext.get(INCOMING_RECORD_ID);

String recordAsString = extractRecord(eventPayloadContext);
Record record;
@@ -119,6 +119,7 @@ record = new Record()
} else {
record = Json.decodeValue(recordAsString, Record.class);
}
String incomingRecordId = eventPayloadContext.get(INCOMING_RECORD_ID) != null ? eventPayloadContext.get(INCOMING_RECORD_ID) : record.getId();

String entityAsString = eventPayloadContext.get(entityType.value());
JournalRecord journalRecord = buildCommonJournalRecord(actionStatus, actionType, record, eventPayload, eventPayloadContext, incomingRecordId)
@@ -142,7 +143,8 @@ record = Json.decodeValue(recordAsString, Record.class);
}

if (!isEmpty(entityAsString)) {
if (entityType == INSTANCE || entityType == PO_LINE || entityType == AUTHORITY) {
if (entityType == INSTANCE || entityType == PO_LINE || entityType == AUTHORITY ||
(entityType == MARC_BIBLIOGRAPHIC && actionType == UPDATE)) {
JsonObject entityJson = new JsonObject(entityAsString);
journalRecord.setEntityId(entityJson.getString(ID_KEY));
if (entityType == INSTANCE || entityType == PO_LINE) {
@@ -174,6 +176,11 @@ record = Json.decodeValue(recordAsString, Record.class);
} else {
return Lists.newArrayList(journalRecord);
}
} else {
if (eventPayload.getEventType().equals(DI_ERROR.value()) && eventPayloadContext.containsKey(MARC_BIBLIOGRAPHIC.value())) {
var journalRecordWithMarcBib = buildJournalRecordWithMarcBibType(actionStatus, actionType, record, eventPayload, eventPayloadContext, incomingRecordId);
return Lists.newArrayList(journalRecord, journalRecordWithMarcBib);
}
}
return Lists.newArrayList(journalRecord);
} catch (Exception e) {
@@ -200,7 +207,7 @@ private static JournalRecord buildJournalRecordWithMarcBibType(JournalRecord.Act
String actionTypeFromContext = eventPayloadContext.get(MARC_BIB_RECORD_CREATED);

if (actionTypeFromContext.equals(Boolean.TRUE.toString())) actionTypeForMarcBib = JournalRecord.ActionType.CREATE;
else actionTypeForMarcBib = JournalRecord.ActionType.UPDATE;
else actionTypeForMarcBib = UPDATE;
}

return buildCommonJournalRecord(actionStatus, actionTypeForMarcBib, currentRecord, eventPayload, eventPayloadContext, incomingRecordId)
@@ -4,7 +4,6 @@
import io.vertx.core.Vertx;
import io.vertx.core.impl.future.FailedFuture;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonObject;
import io.vertx.core.json.jackson.DatabindCodec;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.producer.KafkaHeader;
@@ -16,7 +15,9 @@
import org.folio.kafka.exception.DuplicateEventException;
import org.folio.rest.jaxrs.model.Event;
import org.folio.rest.jaxrs.model.RawRecordsDto;
import org.folio.rest.jaxrs.model.StatusDto;
import org.folio.services.ChunkProcessingService;
import org.folio.services.exceptions.InvalidJobProfileForFileException;
import org.folio.services.JobExecutionService;
import org.folio.services.exceptions.RawChunkRecordsParsingException;
import org.folio.services.exceptions.RecordsPublishingException;
@@ -43,7 +44,7 @@ public class RawMarcChunksKafkaHandler implements AsyncRecordHandler<String, byt
private final Vertx vertx;

public RawMarcChunksKafkaHandler(@Autowired @Qualifier("eventDrivenChunkProcessingService")
ChunkProcessingService eventDrivenChunkProcessingService,
ChunkProcessingService eventDrivenChunkProcessingService,
@Autowired RawRecordsFlowControlService flowControlService,
@Autowired JobExecutionService jobExecutionService,
@Autowired Vertx vertx) {
@@ -95,6 +96,13 @@ public Future<String> handle(KafkaConsumerRecord<String, byte[]> record) {
} else if (th instanceof RecordsPublishingException) {
LOGGER.warn("handle:: RawRecordsDto entries publishing to Kafka has failed for chunkId: {} chunkNumber: {} - {} for jobExecutionId: {}", chunkId, chunkNumber, rawRecordsDto.getRecordsMetadata(), jobExecutionId, th);
return Future.failedFuture(th);
} else if (th instanceof InvalidJobProfileForFileException) {
jobExecutionService.updateJobExecutionStatus(jobExecutionId, new StatusDto()
.withStatus(StatusDto.Status.ERROR)
.withErrorStatus(StatusDto.ErrorStatus.FILE_PROCESSING_ERROR),
okapiParams);
LOGGER.warn("handle:: Invalid job profile selected for uploaded file for chunkId: {} chunkNumber: {} - {} for jobExecutionId: {} chunkNUmber - {}", chunkId, chunkNumber, rawRecordsDto.getRecordsMetadata(), jobExecutionId, chunkNumber);
return Future.failedFuture(th);
} else {
LOGGER.warn("handle:: RawRecordsDto processing has failed with errors chunkId: {} chunkNumber: {} - {} for jobExecutionId: {}", chunkId, chunkNumber, rawRecordsDto.getRecordsMetadata(), jobExecutionId, th);
return Future.failedFuture(new RawChunkRecordsParsingException(th, rawRecordsDto));