MODINV-921: Apply 005 logic before saving MARC Bib in SRS (#653)
yaroslav-epam committed Dec 7, 2023
1 parent 8797b3b commit 75b433f
Showing 5 changed files with 354 additions and 15 deletions.
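For context: MARC control field 005 holds the date and time of the latest transaction on a record. This commit stamps that field with the current timestamp before the MARC Bib record is mapped and saved to SRS, unless field protection settings mark 005 as protected. A minimal, hypothetical sketch of the value being written, assuming the system default time zone (the pattern matches dateTime005Formatter in the new AdditionalFieldsUtil class below):

import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

class Field005ValueSketch {
  public static void main(String[] args) {
    // Same pattern as AdditionalFieldsUtil.dateTime005Formatter introduced in this commit
    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmss.S");
    String field005 = formatter.format(ZonedDateTime.ofInstant(Instant.now(), ZoneId.systemDefault()));
    System.out.println(field005); // e.g. 20231207153045.1
  }
}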
1 change: 1 addition & 0 deletions NEWS.md
@@ -1,5 +1,6 @@
 ## 20.2.0-SNAPSHOT 2023-XX-XX
 * [MODSOURMAN-1022](https://issues.folio.org/browse/MODSOURMAN-1022) Remove step of initial saving of incoming records to SRS
+* [MODINV-921](https://issues.folio.org/browse/MODINV-921) Apply 005 logic before saving MARC Bib in SRS

 ## 20.1.0 2023-10-13
 * Update status when user attempts to update shared auth record from member tenant ([MODDATAIMP-926](https://issues.folio.org/browse/MODDATAIMP-926))
@@ -12,6 +12,7 @@
 import org.folio.inventory.dataimport.cache.MappingMetadataCache;
 import org.folio.inventory.dataimport.handlers.matching.util.EventHandlingUtil;
 import org.folio.inventory.dataimport.services.OrderHelperService;
+import org.folio.inventory.dataimport.util.AdditionalFieldsUtil;
 import org.folio.inventory.dataimport.util.ParsedRecordUtil;
 import org.folio.inventory.domain.instances.Instance;
 import org.folio.inventory.domain.instances.InstanceCollection;
@@ -100,8 +101,12 @@ public CompletableFuture<DataImportEventPayload> handle(DataImportEventPayload d
     String instanceId = res.getEntityId();
     mappingMetadataCache.get(jobExecutionId, context)
       .compose(parametersOptional -> parametersOptional
-        .map(mappingMetadata -> prepareAndExecuteMapping(dataImportEventPayload, new JsonObject(mappingMetadata.getMappingRules()),
-          Json.decodeValue(mappingMetadata.getMappingParams(), MappingParameters.class)))
+        .map(mappingMetadata -> {
+          MappingParameters mappingParameters = Json.decodeValue(mappingMetadata.getMappingParams(), MappingParameters.class);
+          AdditionalFieldsUtil.updateLatestTransactionDate(targetRecord, mappingParameters);
+          payloadContext.put(EntityType.MARC_BIBLIOGRAPHIC.value(), Json.encode(targetRecord));
+          return prepareAndExecuteMapping(dataImportEventPayload, new JsonObject(mappingMetadata.getMappingRules()), mappingParameters);
+        })
         .orElseGet(() -> Future.failedFuture(format(MAPPING_PARAMETERS_NOT_FOUND_MSG, jobExecutionId, recordId, chunkId))))
       .compose(v -> {
         InstanceCollection instanceCollection = storage.getInstanceCollection(context);
@@ -121,10 +126,7 @@ public CompletableFuture<DataImportEventPayload> handle(DataImportEventPayload d
       .onSuccess(ar -> {
         dataImportEventPayload.getContext().put(INSTANCE.value(), Json.encode(ar));
         orderHelperService.fillPayloadForOrderPostProcessingIfNeeded(dataImportEventPayload, DI_INVENTORY_INSTANCE_CREATED, context)
-          .onComplete(result -> {
-            future.complete(dataImportEventPayload);
-          }
-          );
+          .onComplete(result -> future.complete(dataImportEventPayload));
       })
       .onFailure(e -> {
         if (!(e instanceof DuplicateEventException)) {
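A minimal usage sketch of the new AdditionalFieldsUtil introduced below, wired in by the handler change above. The class name, record id, and MARC-in-JSON content are hypothetical, and the sketch assumes the generated fluent with* setters on Record/ParsedRecord and a no-argument MappingParameters constructor (no MARC field protection settings):

import java.util.UUID;
import org.folio.inventory.dataimport.util.AdditionalFieldsUtil;
import org.folio.processing.mapping.defaultmapper.processor.parameters.MappingParameters;
import org.folio.rest.jaxrs.model.ParsedRecord;
import org.folio.rest.jaxrs.model.Record;

class UpdateLatestTransactionDateSketch {
  public static void main(String[] args) {
    // Hypothetical MARC-in-JSON content; an existing 005 field is required for the in-place update.
    String content = "{\"leader\":\"00000nam a2200000 a 4500\",\"fields\":["
      + "{\"001\":\"in00000000001\"},"
      + "{\"005\":\"20230101120000.0\"}]}";
    Record targetRecord = new Record()
      .withId(UUID.randomUUID().toString())
      .withParsedRecord(new ParsedRecord().withContent(content));

    // Empty mapping parameters mean no field protection settings, so 005 gets refreshed.
    MappingParameters mappingParameters = new MappingParameters();
    AdditionalFieldsUtil.updateLatestTransactionDate(targetRecord, mappingParameters);

    // Reads back the refreshed value, e.g. "20231207153045.1".
    System.out.println(AdditionalFieldsUtil.getValueFromControlledField(targetRecord, "005"));
  }
}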
New file: AdditionalFieldsUtil.java (243 additions & 0 deletions)
@@ -0,0 +1,243 @@
package org.folio.inventory.dataimport.util;

import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonObject;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.processing.exceptions.EventProcessingException;
import org.folio.processing.mapping.defaultmapper.processor.parameters.MappingParameters;
import org.folio.rest.jaxrs.model.MarcFieldProtectionSetting;
import org.folio.rest.jaxrs.model.ParsedRecord;
import org.folio.rest.jaxrs.model.Record;
import org.marc4j.MarcJsonReader;
import org.marc4j.MarcJsonWriter;
import org.marc4j.MarcReader;
import org.marc4j.MarcStreamWriter;
import org.marc4j.MarcWriter;
import org.marc4j.marc.ControlField;
import org.marc4j.marc.MarcFactory;
import org.marc4j.marc.VariableField;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ForkJoinPool;

import static java.lang.String.format;
import static org.apache.commons.lang3.StringUtils.isNotBlank;

/**
* Utility methods for working with additional MARC fields
*/
public final class AdditionalFieldsUtil {

private static final Logger LOGGER = LogManager.getLogger();
public static final String TAG_005 = "005";
public static final DateTimeFormatter dateTime005Formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmss.S");
private static final String ANY_STRING = "*";
private static final CacheLoader<String, org.marc4j.marc.Record> parsedRecordContentCacheLoader;
private static final LoadingCache<String, org.marc4j.marc.Record> parsedRecordContentCache;

static {
// this function is executed when creating a new item to be saved in the cache.
// In this case, this is a MARC4J Record
parsedRecordContentCacheLoader =
parsedRecordContent -> {
MarcJsonReader marcJsonReader =
new MarcJsonReader(
new ByteArrayInputStream(
parsedRecordContent.getBytes(StandardCharsets.UTF_8)));
if (marcJsonReader.hasNext()) {
return marcJsonReader.next();
}
return null;
};

parsedRecordContentCache =
Caffeine.newBuilder().maximumSize(2000)
// weak keys allow parsed content strings that are used as keys to be garbage
// collected, even if they are still
// referenced by the cache.
.weakKeys()
.recordStats()
.executor(
serviceExecutor -> {
// Due to the static nature and the API of this AdditionalFieldsUtil class, it is difficult to
// pass a Vertx instance or to know whether a call to any of its static methods is made from a Vertx
// thread or a regular thread. The logic below discerns the type of thread and executes
// cache operations using the appropriate threading model.
Context context = Vertx.currentContext();
if (context != null) {
context.runOnContext(ar -> serviceExecutor.run());
} else {
// The common pool below is used because it is the default executor for caffeine
ForkJoinPool.commonPool().execute(serviceExecutor);
}
})
.build(parsedRecordContentCacheLoader);
}

private AdditionalFieldsUtil() {
}

/**
* Updates field 005 when the field is not protected.
*
* @param record record to update
* @param mappingParameters mapping parameters
*/
public static void updateLatestTransactionDate(Record record, MappingParameters mappingParameters) {
if (isField005NeedToUpdate(record, mappingParameters)) {
String date = dateTime005Formatter.format(ZonedDateTime.ofInstant(Instant.now(), ZoneId.systemDefault()));
boolean isLatestTransactionDateUpdated = addControlledFieldToMarcRecord(record, TAG_005, date);
if (!isLatestTransactionDateUpdated) {
throw new EventProcessingException(format("Failed to update field '005' to record with id '%s'",
record != null ? record.getId() : "null"));
}
}
}

private static boolean addControlledFieldToMarcRecord(Record record, String field, String value) {
boolean result = false;
try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
if (record != null && record.getParsedRecord() != null && record.getParsedRecord().getContent() != null) {
MarcWriter streamWriter = new MarcStreamWriter(new ByteArrayOutputStream());
MarcJsonWriter jsonWriter = new MarcJsonWriter(os);

org.marc4j.marc.Record marcRecord = computeMarcRecord(record);
if (marcRecord != null) {
ControlField currentField = (ControlField) marcRecord.getVariableField(field);
if (currentField != null) {
ControlField newControlField = MarcFactory.newInstance().newControlField(field, value);
marcRecord.getControlFields().set(marcRecord.getControlFields().indexOf(currentField), newControlField);
}
// use stream writer to recalculate leader
streamWriter.write(marcRecord);
jsonWriter.write(marcRecord);

String parsedContentString = new JsonObject(os.toString()).encode();
// save parsed content string to cache then set it on the record
parsedRecordContentCache.put(parsedContentString, marcRecord);
record.setParsedRecord(record.getParsedRecord().withContent(parsedContentString));
result = true;
}
}
} catch (Exception e) {
LOGGER.warn("addControlledFieldToMarcRecord:: Failed to add additional controlled field {} to record {}", field, record.getId(), e);
}
return result;
}

/**
* Read value from controlled field in marc record
*
* @param record marc record
* @param tag tag to read
* @return value from field
*/
public static String getValueFromControlledField(Record record, String tag) {
try {
org.marc4j.marc.Record marcRecord = computeMarcRecord(record);
if (marcRecord != null) {
Optional<ControlField> controlField = marcRecord.getControlFields()
.stream()
.filter(field -> field.getTag().equals(tag))
.findFirst();
if (controlField.isPresent()) {
return controlField.get().getData();
}
}
} catch (Exception e) {
LOGGER.warn("getValueFromControlledField:: Failed to read controlled field {} from record {}", tag, record.getId(), e);
return null;
}
return null;
}

private static MarcReader buildMarcReader(Record record) {
String content = normalizeContent(record.getParsedRecord());
return new MarcJsonReader(new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)));
}

/**
* Checks whether field 005 needs to be updated, i.e. whether the field is not protected.
*
* @param record record to check
* @param mappingParameters mapping parameters
* @return true when field 005 has to be updated
*/
private static boolean isField005NeedToUpdate(Record record, MappingParameters mappingParameters) {
boolean needToUpdate = true;
List<MarcFieldProtectionSetting> fieldProtectionSettings = mappingParameters.getMarcFieldProtectionSettings();
if (CollectionUtils.isNotEmpty(fieldProtectionSettings)) {
MarcReader reader = new MarcJsonReader(new ByteArrayInputStream(record.getParsedRecord().getContent().toString().getBytes()));
if (reader.hasNext()) {
org.marc4j.marc.Record marcRecord = reader.next();
VariableField field = marcRecord.getVariableFields(TAG_005).get(0);
needToUpdate = isNotProtected(fieldProtectionSettings, (ControlField) field);
}
}
return needToUpdate;
}

/**
* Checks whether the control field is protected or not.
*
* @param fieldProtectionSettings List of MarcFieldProtectionSettings
* @param field Control field that is being checked
* @return true when the control field is not protected
*/
private static boolean isNotProtected(List<MarcFieldProtectionSetting> fieldProtectionSettings, ControlField field) {
return fieldProtectionSettings.stream()
.filter(setting -> setting.getField().equals(ANY_STRING) || setting.getField().equals(field.getTag()))
.noneMatch(setting -> setting.getData().equals(ANY_STRING) || setting.getData().equals(field.getData()));
}

private static org.marc4j.marc.Record computeMarcRecord(Record record) {
if (record != null && record.getParsedRecord() != null && isNotBlank(record.getParsedRecord().getContent().toString())) {
try {
var content = normalizeContent(record.getParsedRecord().getContent());
return parsedRecordContentCache.get(content);
} catch (Exception e) {
LOGGER.warn("computeMarcRecord:: Error during the transformation to marc record", e);
try {
MarcReader reader = buildMarcReader(record);
if (reader.hasNext()) {
return reader.next();
}
} catch (Exception ex) {
LOGGER.warn("computeMarcRecord:: Error during the building of MarcReader", ex);
}
return null;
}
}
return null;
}

/**
* Normalizes the parsed record content of a {@link ParsedRecord} when it is of type {@link String}
*
* @param parsedRecord parsed record
* @return parsed record normalized content
*/
private static String normalizeContent(ParsedRecord parsedRecord) {
Object content = parsedRecord.getContent();
return (content instanceof String contentStr ? new JsonObject(contentStr) : JsonObject.mapFrom(content)).encode();
}

private static String normalizeContent(Object content) {
return content instanceof String contentStr ? contentStr : Json.encode(content);
}
}
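For comparison, continuing the usage sketch above, a hedged sketch of how field protection suppresses the update. It assumes fluent with* setters on MappingParameters and MarcFieldProtectionSetting (both types appear in the imports above), plus java.util.List:

// Hypothetical setting: field 005 is protected regardless of its current value ("*"),
// so isField005NeedToUpdate(...) returns false and the existing 005 value is left untouched.
List<MarcFieldProtectionSetting> settings = List.of(
  new MarcFieldProtectionSetting().withField("005").withData("*"));
MappingParameters protectedParams = new MappingParameters()
  .withMarcFieldProtectionSettings(settings);
AdditionalFieldsUtil.updateLatestTransactionDate(targetRecord, protectedParams);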