Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…rd-manager into MODSOURMAN-1212
  • Loading branch information
VRohach committed Sep 10, 2024
2 parents cfaa7f1 + 3165cd6 commit 056af87
Show file tree
Hide file tree
Showing 71 changed files with 3,806 additions and 634 deletions.
4 changes: 4 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
## 2024-xx-xx v3.9.0
* [MODSOURMAN-1195](https://folio-org.atlassian.net/browse/MODSOURMAN-1195) Save job execution progress in batches
* [MODSOURMAN-1166](https://folio-org.atlassian.net/browse/MODSOURMAN-1166) Sorting by Autority, Order and Error columns is not working on Log details page
* [MODDATAIMP-1029](https://folio-org.atlassian.net/browse/MODDATAIMP-1029) The authority record loaded via data-import using Default - Create SRS MARC Authority job profile is duplicated on the job-summary page
* [MODSOURMAN-1152](https://folio-org.atlassian.net/browse/MODSOURMAN-1152) The error message is not displayed in the di log summary
Expand All @@ -11,6 +12,9 @@
* [MODSOURMAN-1188](https://issues.folio.org/browse/MODSOURMAN-1188) Change MARC mappings of 010 $z from "Cancelled" LCCN to "Canceled LCCN"
* [MODSOURMAN-1181](https://issues.folio.org/browse/MODSOURMAN-1181) Modify the get_job_log_entries function to increase performance.
* [MODSOURMAN-1185](https://folio-org.atlassian.net/browse/MODSOURMAN-1185) Logs are duplicated on the import logs page for order import
* [MODSOURMAN-1194](https://folio-org.atlassian.net/browse/MODSOURMAN-1194) Include subject metadata subfields in authority name fields
* [MODSOURMAN-1215](https://folio-org.atlassian.net/browse/MODSOURMAN-1215) Upgrade Spring from 5 to 6.1.12
* [MODINV-1069](https://folio-org.atlassian.net/browse/MODINV-1069) Fix DataImportConsumerVerticleTest in mod-inventory and Fix NPE in HoldingsItemMatcher, fix job log entries

## 2023-03-22 v3.8.0
* [MODSOURMAN-1131](https://folio-org.atlassian.net/browse/MODSOURMAN-1131) The import of file for creating orders is completed with errors
Expand Down
8 changes: 6 additions & 2 deletions mod-source-record-manager-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
<dependency>
<groupId>org.folio</groupId>
<artifactId>mod-source-record-storage-client</artifactId>
<version>5.8.0</version>
<version>5.9.0-SNAPSHOT</version>
<type>jar</type>
</dependency>
<dependency>
Expand Down Expand Up @@ -151,6 +151,10 @@
<groupId>io.vertx</groupId>
<artifactId>vertx-web</artifactId>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-rx-java3</artifactId>
</dependency>
<dependency>
<groupId>org.folio</groupId>
<artifactId>data-import-utils</artifactId>
Expand Down Expand Up @@ -228,7 +232,7 @@
<dependency>
<groupId>org.folio</groupId>
<artifactId>mod-di-converter-storage-client</artifactId>
<version>2.1.0</version>
<version>2.3.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.folio</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.folio.rest.jaxrs.model.RecordProcessingLogDto;
import org.folio.rest.jaxrs.model.RecordProcessingLogDtoCollection;

import java.util.Collection;
import java.util.List;
import java.util.Optional;

Expand All @@ -32,7 +33,7 @@ public interface JournalRecordDao {
* @param tenantId tenant id
* @return future with created JournalRecord entities
*/
Future<List<RowSet<Row>>> saveBatch(List<JournalRecord> journalRecords, String tenantId);
Future<List<RowSet<Row>>> saveBatch(Collection<JournalRecord> journalRecords, String tenantId);

/**
* Searches for JournalRecord entities by jobExecutionId and sorts them using specified sort criteria and direction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.LinkedList;
Expand Down Expand Up @@ -145,7 +146,10 @@ public class JournalRecordDaoImpl implements JournalRecordDao {
"instance_action_status", "holdings_action_status", "item_action_status", "authority_action_status", "order_action_status",
"invoice_action_status", "error");
private static final String JOURNAL_RECORDS_TABLE = "journal_records";
private static final String INSERT_SQL = "INSERT INTO %s.%s (id, job_execution_id, source_id, source_record_order, entity_type, entity_id, entity_hrid, action_type, action_status, error, action_date, title, instance_id, holdings_id, order_id, permanent_location_id, tenant_id) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17)";
private static final String INSERT_SQL = "INSERT INTO %s.%s " +
"(id, job_execution_id, source_id, source_record_order, entity_type, entity_id, entity_hrid, action_type, action_status, error, action_date, title, instance_id, holdings_id, order_id, permanent_location_id, tenant_id) " +
"VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) "+
"ON CONFLICT DO NOTHING";
private static final String SELECT_BY_JOB_EXECUTION_ID_QUERY = "SELECT * FROM %s.%s WHERE job_execution_id = $1";
private static final String ORDER_BY_PATTERN = " ORDER BY %s %s";
private static final String DELETE_BY_JOB_EXECUTION_ID_QUERY = "DELETE FROM %s.%s WHERE job_execution_id = $1";
Expand Down Expand Up @@ -175,7 +179,7 @@ public Future<String> save(JournalRecord journalRecord, String tenantId) {
}

@Override
public Future<List<RowSet<Row>>> saveBatch(List<JournalRecord> journalRecords, String tenantId) {
public Future<List<RowSet<Row>>> saveBatch(Collection<JournalRecord> journalRecords, String tenantId) {
LOGGER.info("saveBatch:: Trying to save list of JournalRecord entities to the {} table", JOURNAL_RECORDS_TABLE);
Promise<List<RowSet<Row>>> promise = Promise.promise();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
import org.folio.okapi.common.GenericCompositeFuture;
import org.folio.rest.resource.interfaces.InitAPI;
import org.folio.services.journal.JournalService;
import org.folio.services.journal.JournalUtil;
import org.folio.services.progress.JobExecutionProgressUtil;
import org.folio.spring.SpringContextUtil;
import org.folio.verticle.DataImportConsumersVerticle;
import org.folio.verticle.DataImportInitConsumersVerticle;
import org.folio.verticle.DataImportJournalConsumersVerticle;
import org.folio.verticle.JobExecutionProgressVerticle;
import org.folio.verticle.DataImportJournalBatchConsumerVerticle;
import org.folio.verticle.QuickMarcUpdateConsumersVerticle;
import org.folio.verticle.RawMarcChunkConsumersVerticle;
import org.folio.verticle.StoredRecordChunkConsumersVerticle;
Expand Down Expand Up @@ -51,6 +54,9 @@ public class InitAPIImpl implements InitAPI {
@Value("${srm.kafka.DataImportJournalConsumersVerticle.instancesNumber:1}")
private int dataImportJournalConsumerInstancesNumber;

@Value("${srm.kafka.JobExecutionProgressVerticle.instancesNumber:1}")
private int jobExecutionProgressInstancesNumber;

@Value("${srm.kafka.QuickMarcUpdateConsumersVerticle.instancesNumber:1}")
private int quickMarcUpdateConsumerInstancesNumber;

Expand All @@ -67,6 +73,10 @@ public void init(Vertx vertx, Context context, Handler<AsyncResult<Boolean>> han
try {
SpringContextUtil.init(vertx, context, ApplicationConfig.class);
SpringContextUtil.autowireDependencies(this, context);

JobExecutionProgressUtil.registerCodecs(vertx);
JournalUtil.registerCodecs(vertx);

initJournalService(vertx);
deployConsumersVerticles(vertx)
.onSuccess(car -> {
Expand Down Expand Up @@ -99,6 +109,7 @@ private Future<?> deployConsumersVerticles(Vertx vertx) {
Promise<String> deployStoredMarcChunkConsumer = Promise.promise();
Promise<String> deployDataImportConsumer = Promise.promise();
Promise<String> deployDataImportJournalConsumer = Promise.promise();
Promise<String> deployJobExecutionProgressConsumer = Promise.promise();
Promise<String> deployQuickMarcUpdateConsumer = Promise.promise();
Promise<String> deployPeriodicDeleteJobExecution = Promise.promise();

Expand All @@ -122,11 +133,16 @@ private Future<?> deployConsumersVerticles(Vertx vertx) {
.setWorker(true)
.setInstances(dataImportConsumerInstancesNumber), deployDataImportConsumer);

vertx.deployVerticle(getVerticleName(verticleFactory, DataImportJournalConsumersVerticle.class),
vertx.deployVerticle(getVerticleName(verticleFactory, DataImportJournalBatchConsumerVerticle.class),
new DeploymentOptions()
.setWorker(true)
.setInstances(dataImportJournalConsumerInstancesNumber), deployDataImportJournalConsumer);

vertx.deployVerticle(getVerticleName(verticleFactory, JobExecutionProgressVerticle.class),
new DeploymentOptions()
.setWorker(true)
.setInstances(jobExecutionProgressInstancesNumber), deployJobExecutionProgressConsumer);

vertx.deployVerticle(getVerticleName(verticleFactory, QuickMarcUpdateConsumersVerticle.class),
new DeploymentOptions()
.setWorker(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.folio.rest.jaxrs.model.JobExecution;
import org.folio.rest.jaxrs.model.JobExecutionSourceChunk;
import org.folio.rest.jaxrs.model.ProfileSnapshotWrapper;
import org.folio.rest.jaxrs.model.ProfileType;
import org.folio.rest.jaxrs.model.RawRecordsDto;
import org.folio.rest.jaxrs.model.StatusDto;
import org.folio.services.exceptions.UnsupportedProfileException;
Expand All @@ -27,7 +28,9 @@
import java.util.List;
import java.util.Map;

import static org.folio.rest.jaxrs.model.ProfileSnapshotWrapper.ContentType.*;
import static org.folio.rest.jaxrs.model.ProfileType.ACTION_PROFILE;
import static org.folio.rest.jaxrs.model.ProfileType.MAPPING_PROFILE;
import static org.folio.rest.jaxrs.model.ProfileType.MATCH_PROFILE;


public abstract class AbstractChunkProcessingService implements ChunkProcessingService {
Expand Down Expand Up @@ -112,7 +115,7 @@ private boolean isExistsMatchProfileToInstanceWithActionUpdateMarcBib(Collection
}

private ProfileSnapshotWrapper getChildSnapshotWrapperByType(ProfileSnapshotWrapper profileSnapshotWrapper,
ProfileSnapshotWrapper.ContentType contentType) {
ProfileType contentType) {
if (!CollectionUtils.isEmpty(profileSnapshotWrapper.getChildSnapshotWrappers())) {
List<ProfileSnapshotWrapper> childSnapshotWrappers = profileSnapshotWrapper.getChildSnapshotWrappers();
for (ProfileSnapshotWrapper snapshotWrapper : childSnapshotWrappers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_MARC_FOR_DELETE_RECEIVED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_MARC_FOR_UPDATE_RECEIVED;
import static org.folio.rest.jaxrs.model.DataImportEventTypes.DI_RAW_RECORDS_CHUNK_PARSED;
import static org.folio.rest.jaxrs.model.ProfileSnapshotWrapper.ContentType.ACTION_PROFILE;
import static org.folio.rest.jaxrs.model.ProfileSnapshotWrapper.ReactTo.NON_MATCH;
import static org.folio.rest.jaxrs.model.ProfileType.ACTION_PROFILE;
import static org.folio.rest.jaxrs.model.ProfileType.MATCH_PROFILE;
import static org.folio.rest.jaxrs.model.ReactToType.NON_MATCH;
import static org.folio.rest.jaxrs.model.Record.RecordType.MARC_AUTHORITY;
import static org.folio.rest.jaxrs.model.Record.RecordType.MARC_BIB;
import static org.folio.rest.jaxrs.model.Record.RecordType.MARC_HOLDING;
Expand All @@ -24,6 +25,7 @@
import static org.folio.services.afterprocessing.AdditionalFieldsUtil.getControlFieldValue;
import static org.folio.services.afterprocessing.AdditionalFieldsUtil.getValue;
import static org.folio.services.afterprocessing.AdditionalFieldsUtil.hasIndicator;
import static org.folio.services.journal.JournalUtil.getJournalMessageProducer;
import static org.folio.services.util.EventHandlingUtil.sendEventToKafka;
import static org.folio.verticle.consumers.StoredRecordChunksKafkaHandler.ACTION_FIELD;
import static org.folio.verticle.consumers.StoredRecordChunksKafkaHandler.CREATE_ACTION;
Expand All @@ -33,6 +35,8 @@
import com.google.common.collect.Lists;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.MessageProducer;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonObject;
import io.vertx.core.json.jackson.DatabindCodec;
Expand Down Expand Up @@ -60,7 +64,9 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.MappingProfile;
import org.folio.rest.jaxrs.model.IncomingRecord;
import org.folio.services.exceptions.InvalidJobProfileForFileException;
import org.folio.services.journal.BatchableJournalRecord;
import org.folio.services.journal.JournalUtil;
import org.folio.dao.JobExecutionSourceChunkDao;
import org.folio.dataimport.util.OkapiConnectionParams;
Expand Down Expand Up @@ -137,7 +143,7 @@ public class ChangeEngineServiceImpl implements ChangeEngineService {
private final KafkaConfig kafkaConfig;
private final FieldModificationService fieldModificationService;
private final IncomingRecordService incomingRecordService;
private final JournalRecordService journalRecordService;
private MessageProducer<Collection<BatchableJournalRecord>> journalRecordProducer;

@Value("${srm.kafka.RawChunksKafkaHandler.maxDistributionNum:100}")
private int maxDistributionNum;
Expand All @@ -155,7 +161,7 @@ public ChangeEngineServiceImpl(@Autowired JobExecutionSourceChunkDao jobExecutio
@Autowired KafkaConfig kafkaConfig,
@Autowired FieldModificationService fieldModificationService,
@Autowired IncomingRecordService incomingRecordService,
@Autowired JournalRecordService journalRecordService) {
@Autowired Vertx vertx) {
this.jobExecutionSourceChunkDao = jobExecutionSourceChunkDao;
this.jobExecutionService = jobExecutionService;
this.marcRecordAnalyzer = marcRecordAnalyzer;
Expand All @@ -166,7 +172,7 @@ public ChangeEngineServiceImpl(@Autowired JobExecutionSourceChunkDao jobExecutio
this.kafkaConfig = kafkaConfig;
this.fieldModificationService = fieldModificationService;
this.incomingRecordService = incomingRecordService;
this.journalRecordService = journalRecordService;
this.journalRecordProducer = getJournalMessageProducer(vertx);
}

@Override
Expand Down Expand Up @@ -311,14 +317,18 @@ private Future<Boolean> sendEventWithContext(List<Record> records, JobExecution
private void saveIncomingAndJournalRecords(List<Record> parsedRecords, String tenantId) {
if (!parsedRecords.isEmpty()) {
incomingRecordService.saveBatch(JournalUtil.buildIncomingRecordsByRecords(parsedRecords), tenantId);
journalRecordService.saveBatch(JournalUtil.buildJournalRecordsByRecords(parsedRecords), tenantId);
List<BatchableJournalRecord> batchableJournalRecords = JournalUtil.buildJournalRecordsByRecords(parsedRecords)
.stream()
.map(r -> new BatchableJournalRecord(r, tenantId))
.toList();
journalRecordProducer.write(batchableJournalRecords);
}
}

private boolean createOrderActionExists(JobExecution jobExecution) {
if (jobExecution.getJobProfileSnapshotWrapper() != null) {
List<ProfileSnapshotWrapper> actionProfiles = jobExecution.getJobProfileSnapshotWrapper().getChildSnapshotWrappers()
.stream().filter(wrapper -> wrapper.getContentType() == ProfileSnapshotWrapper.ContentType.ACTION_PROFILE).toList();
.stream().filter(wrapper -> wrapper.getContentType() == ACTION_PROFILE).toList();

if (!actionProfiles.isEmpty() && ifOrderCreateActionProfileExists(actionProfiles)) {
LOGGER.debug("createOrderActionExists:: Event type for Order's logic set by jobExecutionId {} ", jobExecution.getId());
Expand Down Expand Up @@ -427,7 +437,7 @@ private boolean isCreateInstanceActionExists(JobExecution jobExecution) {

private boolean containsCreateInstanceActionWithoutMarcBib(ProfileSnapshotWrapper profileSnapshot) {
for (ProfileSnapshotWrapper childWrapper : profileSnapshot.getChildSnapshotWrappers()) {
if (childWrapper.getContentType() == ProfileSnapshotWrapper.ContentType.ACTION_PROFILE
if (childWrapper.getContentType() == ACTION_PROFILE
&& actionProfileMatches(childWrapper, List.of(FolioRecord.INSTANCE), Action.CREATE)) {
return childWrapper.getReactTo() != NON_MATCH && !containsMarcBibToInstanceMappingProfile(childWrapper);
} else if (containsCreateInstanceActionWithoutMarcBib(childWrapper)) {
Expand Down Expand Up @@ -465,7 +475,7 @@ private boolean isMarcAuthorityMatchProfile(JobExecution jobExecution) {
private boolean containsMatchProfile(ProfileSnapshotWrapper profileSnapshot) {
var childWrappers = profileSnapshot.getChildSnapshotWrappers();
for (ProfileSnapshotWrapper childWrapper : childWrappers) {
if (childWrapper.getContentType() == ProfileSnapshotWrapper.ContentType.MATCH_PROFILE
if (childWrapper.getContentType() == MATCH_PROFILE
&& marcAuthorityMatchProfileMatches(childWrapper)) {
return true;
} else if (containsMatchProfile(childWrapper)) {
Expand All @@ -484,7 +494,7 @@ private boolean containsMarcActionProfile(ProfileSnapshotWrapper profileSnapshot
List<FolioRecord> entityTypes, Action action) {
List<ProfileSnapshotWrapper> childWrappers = profileSnapshot.getChildSnapshotWrappers();
for (ProfileSnapshotWrapper childWrapper : childWrappers) {
if (childWrapper.getContentType() == ProfileSnapshotWrapper.ContentType.ACTION_PROFILE
if (childWrapper.getContentType() == ACTION_PROFILE
&& actionProfileMatches(childWrapper, entityTypes, action)) {
return true;
} else if (containsMarcActionProfile(childWrapper, entityTypes, action)) {
Expand All @@ -497,7 +507,7 @@ && actionProfileMatches(childWrapper, entityTypes, action)) {
private boolean containsCreateActionProfileWithMarcHoldings(ProfileSnapshotWrapper profileSnapshot) {
List<ProfileSnapshotWrapper> childWrappers = profileSnapshot.getChildSnapshotWrappers();
for (ProfileSnapshotWrapper childWrapper : childWrappers) {
if (childWrapper.getContentType() == ProfileSnapshotWrapper.ContentType.ACTION_PROFILE
if (childWrapper.getContentType() == ACTION_PROFILE
&& actionProfileMatches(childWrapper, List.of(FolioRecord.HOLDINGS), Action.CREATE)
&& isMarcHoldingsExists(childWrapper)) {
return true;
Expand Down Expand Up @@ -854,8 +864,13 @@ private void fillParsedRecordsWithAdditionalFields(List<Record> records) {
}
String inventoryId = UUID.randomUUID().toString();
addFieldToMarcRecord(record, TAG_999, SUBFIELD_I, inventoryId);
var hrid = getControlFieldValue(record, TAG_001).trim();
record.setExternalIdsHolder(new ExternalIdsHolder().withAuthorityId(inventoryId).withAuthorityHrid(hrid));
Optional.ofNullable(getControlFieldValue(record, TAG_001))
.map(String::trim)
.ifPresentOrElse(hrId -> record.setExternalIdsHolder(new ExternalIdsHolder().withAuthorityId(inventoryId).withAuthorityHrid(hrId)),
() -> {
record.setExternalIdsHolder(new ExternalIdsHolder().withAuthorityId(inventoryId));
LOGGER.warn("fillParsedRecordsWithAdditionalFields:: record with id: {} does not contain the hrId field", record.getId());
});
}
}
}
Expand Down
Loading

0 comments on commit 056af87

Please sign in to comment.