Skip to content

Commit

Permalink
MODINV-986: unit test + fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
PBobylev committed Jun 7, 2024
1 parent 9e3b8cd commit ec49d42
Show file tree
Hide file tree
Showing 11 changed files with 490 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public CompletableFuture<DataImportEventPayload> handle(DataImportEventPayload d
return future;
}

private String getInstanceId(Record record) {
protected String getInstanceId(Record record) {
String subfield999ffi = ParsedRecordUtil.getAdditionalSubfieldValue(record.getParsedRecord(), ParsedRecordUtil.AdditionalSubfields.I);
return isEmpty(subfield999ffi) ? UUID.randomUUID().toString() : subfield999ffi;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public final class AdditionalFieldsUtil {
private static final char TAG_035_IND = ' ';
private static final String ANY_STRING = "*";
private static final char INDICATOR = 'f';
public static final char SUBFIELD_B = 'b';
public static final char SUBFIELD_I = 'i';
private static final String HR_ID_FIELD = "hrid";
private static final CacheLoader<String, org.marc4j.marc.Record> parsedRecordContentCacheLoader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@

import static java.lang.String.format;
import static java.util.Objects.isNull;
import static java.util.Optional.ofNullable;
import static org.apache.logging.log4j.LogManager.getLogger;
import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.SUBFIELD_B;
import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.TAG_035;
import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.TAG_035_SUB;
import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.TAG_999;
import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.reorderMarcRecordFields;
import static org.folio.inventory.dataimport.util.MappingConstants.INSTANCE_REQUIRED_FIELDS;
import static org.folio.inventory.dataimport.util.MappingConstants.MARC_BIB_RECORD_FORMAT;
Expand All @@ -19,9 +23,10 @@
import io.vertx.core.json.JsonObject;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.HttpStatus;
import org.folio.MappingMetadataDto;
Expand All @@ -37,20 +42,24 @@
import org.folio.inventory.instanceingress.InstanceIngressEventConsumer;
import org.folio.inventory.services.IdStorageService;
import org.folio.inventory.storage.Storage;
import org.folio.kafka.exception.DuplicateEventException;
import org.folio.processing.exceptions.EventProcessingException;
import org.folio.processing.mapping.defaultmapper.RecordMapper;
import org.folio.processing.mapping.defaultmapper.RecordMapperBuilder;
import org.folio.processing.mapping.defaultmapper.processor.parameters.MappingParameters;
import org.folio.rest.jaxrs.model.InstanceIngressEvent;
import org.folio.rest.jaxrs.model.InstanceIngressPayload;
import org.folio.rest.jaxrs.model.ParsedRecord;
import org.folio.rest.jaxrs.model.RawRecord;
import org.folio.rest.jaxrs.model.Record;
import org.folio.rest.jaxrs.model.Snapshot;

public class CreateInstanceIngressEventHandler extends CreateInstanceEventHandler implements InstanceIngressEventHandler {

private static final Logger LOGGER = LogManager.getLogger(CreateInstanceIngressEventHandler.class);
public static final String BIBFRAME_ID = "bibframeId";
private static final Logger LOGGER = getLogger(CreateInstanceIngressEventHandler.class);
private static final String BIBFRAME = "(bibframe) ";
private static final String FAILURE = "Failed to process InstanceIngressEvent with id {}";
private final Context context;
private final InstanceCollection instanceCollection;

Expand All @@ -66,76 +75,91 @@ public CreateInstanceIngressEventHandler(PrecedingSucceedingTitlesHelper precedi
}

@Override
public CompletableFuture<InstanceIngressEvent> handle(InstanceIngressEvent event) {
public CompletableFuture<Instance> handle(InstanceIngressEvent event) {
try {
LOGGER.info("Processing InstanceIngressEvent with id '{}' for instance creation", event.getId());
var future = new CompletableFuture<Instance>();
if (eventContainsNoData(event)) {
var message = format("InstanceIngressEvent message does not contain required data to create Instance for eventId: '%s'", event.getId());
LOGGER.error(message);
return CompletableFuture.failedFuture(new EventProcessingException(message));
}
super.getMappingMetadataCache().getByRecordType(InstanceIngressEventConsumer.class.getSimpleName(), context, MARC_BIB_RECORD_TYPE)
.map(metadataOptional -> metadataOptional.orElseThrow(() -> new EventProcessingException("MappingMetadata was not found for marc-bib record type")))
.compose(mappingMetadataDto -> prepareAndExecuteMapping(mappingMetadataDto, event))
.compose(instance -> validateInstance(instance, event))
.compose(instance -> saveInstance(instance, event));

return CompletableFuture.completedFuture(event);
var targetRecord = constructMarcBibRecord(event.getEventPayload());
var instanceId = ofNullable(event.getEventPayload().getSourceRecordIdentifier()).orElseGet(() -> getInstanceId(targetRecord));
idStorageService.store(targetRecord.getId(), instanceId, context.getTenantId())
.compose(res -> super.getMappingMetadataCache().getByRecordType(InstanceIngressEventConsumer.class.getSimpleName(), context, MARC_BIB_RECORD_TYPE))
.compose(metadataOptional -> metadataOptional.map(metadata -> prepareAndExecuteMapping(metadata, targetRecord, event, instanceId))
.orElseGet(() -> Future.failedFuture("MappingMetadata was not found for marc-bib record type")))
.compose(instance -> validateInstance(instance, event))
.compose(instance -> saveInstance(instance, event))
.onFailure(e -> {
if (!(e instanceof DuplicateEventException)) {
LOGGER.error(FAILURE, event.getId(), e);
}
future.completeExceptionally(e);
})
.onComplete(ar -> future.complete(ar.result()));
return future;
} catch (Exception e) {
LOGGER.error("Failed to process InstanceIngressEvent with id {}", event.getId(), e);
LOGGER.error(FAILURE, event.getId(), e);
return CompletableFuture.failedFuture(e);
}
}

private static boolean eventContainsNoData(InstanceIngressEvent event) {
return isNull(event.getEventPayload()) || isNull(event.getEventPayload().getSourceRecordObject());
private boolean eventContainsNoData(InstanceIngressEvent event) {
return isNull(event.getEventPayload())
|| isNull(event.getEventPayload().getSourceRecordObject())
|| isNull(event.getEventPayload().getSourceType());
}

private Future<org.folio.Instance> prepareAndExecuteMapping(MappingMetadataDto mappingMetadata, InstanceIngressEvent event) {
private Future<org.folio.Instance> prepareAndExecuteMapping(MappingMetadataDto mappingMetadata,
Record targetRecord,
InstanceIngressEvent event,
String instanceId) {
return postSnapshotInSrsAndHandleResponse(event.getId())
.compose(snapshot -> {
try {
var marcBibRecord = constructMarcBibRecord(event);

LOGGER.info("Manipulating fields of a Record from InstanceIngressEvent with id '{}'", event.getId());
var mappingParameters = Json.decodeValue(mappingMetadata.getMappingParams(), MappingParameters.class);
AdditionalFieldsUtil.updateLatestTransactionDate(marcBibRecord, mappingParameters);
AdditionalFieldsUtil.move001To035(marcBibRecord);
AdditionalFieldsUtil.normalize035(marcBibRecord);
AdditionalFieldsUtil.addFieldToMarcRecord(marcBibRecord, TAG_035, TAG_035_SUB,
BIBFRAME + event.getEventPayload().getSourceRecordIdentifier());
AdditionalFieldsUtil.updateLatestTransactionDate(targetRecord, mappingParameters);
AdditionalFieldsUtil.move001To035(targetRecord);
AdditionalFieldsUtil.normalize035(targetRecord);
if (event.getEventPayload().getAdditionalProperties().containsKey(BIBFRAME_ID)) {
AdditionalFieldsUtil.addFieldToMarcRecord(targetRecord, TAG_035, TAG_035_SUB,
BIBFRAME + event.getEventPayload().getAdditionalProperties().get(BIBFRAME_ID));
}

LOGGER.info("Mapping a Record from InstanceIngressEvent with id '{}' into an Instance", event.getId());
var parsedRecord = new JsonObject((String) marcBibRecord.getParsedRecord().getContent());
var parsedRecord = new JsonObject((String) targetRecord.getParsedRecord().getContent());
RecordMapper<org.folio.Instance> recordMapper = RecordMapperBuilder.buildMapper(MARC_BIB_RECORD_FORMAT);
var instance = recordMapper.mapRecord(parsedRecord, mappingParameters, new JsonObject(mappingMetadata.getMappingRules()));
instance.setId(instanceId);
instance.setSource(event.getEventPayload().getSourceType().value());
LOGGER.info("Mapped Instance from InstanceIngressEvent with id '{}': {}", event.getId(), instance);
return super.idStorageService.store(marcBibRecord.getId(), instance.getId(), context.getTenantId())
.map(r -> instance)
.onFailure(e -> LOGGER.error("Error creating relationship of inventory recordId '{} and instanceId '{}'", event.getId(), instance.getId()));
return Future.succeededFuture(instance);
} catch (Exception e) {
LOGGER.warn("Error during preparing and executing mapping:", e);
return Future.failedFuture(e);
}
});
}

private Record constructMarcBibRecord(InstanceIngressEvent event) {
LOGGER.info("Constructing a Record from InstanceIngressEvent with id '{}'", event.getId());
private Record constructMarcBibRecord(InstanceIngressPayload eventPayload) {
var recordId = UUID.randomUUID().toString();
var marcBibRecord = new org.folio.rest.jaxrs.model.Record()
.withId(event.getId())
.withId(recordId)
.withRecordType(MARC_BIB)
.withSnapshotId(event.getId())
.withSnapshotId(recordId)
.withRawRecord(new RawRecord()
.withId(event.getId())
.withContent(event.getEventPayload().getSourceRecordObject())
.withId(recordId)
.withContent(eventPayload.getSourceRecordObject())
)
.withParsedRecord(new ParsedRecord()
.withId(event.getId())
.withContent(event.getEventPayload().getSourceRecordObject())
.withId(recordId)
.withContent(eventPayload.getSourceRecordObject())
);
event.getEventPayload()
eventPayload
.withAdditionalProperty(MARC_BIBLIOGRAPHIC.value(), marcBibRecord);
return marcBibRecord;
}
Expand All @@ -149,40 +173,46 @@ private Future<Instance> validateInstance(org.folio.Instance instance, InstanceI
.orElseGet(() -> {
var mappedInstance = Instance.fromJson(instanceAsJson);
var uuidErrors = ValidationUtil.validateUUIDs(mappedInstance);
return failIfErrors(uuidErrors, event.getId())
.orElseGet(() -> Future.succeededFuture(mappedInstance));
return failIfErrors(uuidErrors, event.getId()).orElseGet(() -> Future.succeededFuture(mappedInstance));
});
} catch (Exception e) {
return Future.failedFuture(e);
}
}

private static Optional<Future<Instance>> failIfErrors(List<String> errors, String eventId) {
private Optional<Future<Instance>> failIfErrors(List<String> errors, String eventId) {
if (errors.isEmpty()) {
return Optional.empty();
}
var msg = format("Mapped Instance is invalid: %s, from InstanceIngressEvent with id '%s' ", errors, eventId);
var msg = format("Mapped Instance is invalid: %s, from InstanceIngressEvent with id '%s'", errors, eventId);
LOGGER.warn(msg);
return Optional.of(Future.failedFuture(msg));
}

private Future<Void> saveInstance(Instance instance, InstanceIngressEvent event) {
private Future<Instance> saveInstance(Instance instance, InstanceIngressEvent event) {
LOGGER.info("Saving Instance from InstanceIngressEvent with id '{}': {}", event.getId(), instance);
var targetRecord = (Record) event.getEventPayload().getAdditionalProperties().get(MARC_BIBLIOGRAPHIC.value());
var sourceContent = targetRecord.getParsedRecord().getContent().toString();
super.addInstance(instance, instanceCollection)
return super.addInstance(instance, instanceCollection)
.compose(createdInstance -> getPrecedingSucceedingTitlesHelper().createPrecedingSucceedingTitles(instance, context).map(createdInstance))
.compose(createdInstance -> executeFieldsManipulation(createdInstance, targetRecord))
.compose(createdInstance -> executeFieldsManipulation(createdInstance, targetRecord, event.getEventPayload().getAdditionalProperties()))
.compose(createdInstance -> {
var targetContent = targetRecord.getParsedRecord().getContent().toString();
var content = reorderMarcRecordFields(sourceContent, targetContent);
targetRecord.setParsedRecord(targetRecord.getParsedRecord().withContent(content));
return saveRecordInSrsAndHandleResponse(event, targetRecord, createdInstance);
});
return Future.succeededFuture();
}

protected Future<Instance> saveRecordInSrsAndHandleResponse(InstanceIngressEvent event, Record srcRecord, Instance instance) {
private Future<Instance> executeFieldsManipulation(Instance instance, Record srcRecord,
Map<String, Object> eventProperties) {
if (eventProperties.containsKey(BIBFRAME_ID)) {
AdditionalFieldsUtil.addFieldToMarcRecord(srcRecord, TAG_999, SUBFIELD_B, (String) eventProperties.get(BIBFRAME_ID));
}
return super.executeFieldsManipulation(instance, srcRecord);
}

private Future<Instance> saveRecordInSrsAndHandleResponse(InstanceIngressEvent event, Record srcRecord, Instance instance) {
LOGGER.info("Saving record in SRS and handling a response for an Instance with id '{}':", instance.getId());
Promise<Instance> promise = Promise.promise();
getSourceStorageRecordsClient(context.getOkapiLocation(), context.getToken(), context.getTenantId())
Expand All @@ -209,7 +239,8 @@ private Future<Snapshot> postSnapshotInSrsAndHandleResponse(String id) {
.withJobExecutionId(id)
.withProcessingStartedDate(new Date())
.withStatus(PROCESSING_IN_PROGRESS);
return super.postSnapshotInSrsAndHandleResponse(context.getOkapiLocation(), context.getToken(), snapshot, context.getTenantId());
return super.postSnapshotInSrsAndHandleResponse(context.getOkapiLocation(),
context.getToken(), snapshot, context.getTenantId());
}

}
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package org.folio.inventory.instanceingress.handler;

import java.util.concurrent.CompletableFuture;
import org.folio.inventory.domain.instances.Instance;
import org.folio.rest.jaxrs.model.InstanceIngressEvent;
import org.folio.rest.jaxrs.model.InstanceIngressPayload;

public interface InstanceIngressEventHandler {

CompletableFuture<InstanceIngressEvent> handle(InstanceIngressEvent instanceIngressEvent);
CompletableFuture<Instance> handle(InstanceIngressEvent instanceIngressEvent);

}
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package org.folio.inventory.instanceingress.handler;

import java.util.concurrent.CompletableFuture;
import org.folio.inventory.domain.instances.Instance;
import org.folio.rest.jaxrs.model.InstanceIngressEvent;

public class InstanceIngressUpdateEventHandler implements InstanceIngressEventHandler {

@Override
public CompletableFuture<InstanceIngressEvent> handle(InstanceIngressEvent instanceIngressEvent) {
public CompletableFuture<Instance> handle(InstanceIngressEvent instanceIngressEvent) {
// to be implemented in MODINV-1008
return CompletableFuture.failedFuture(new UnsupportedOperationException());
}
Expand Down
7 changes: 7 additions & 0 deletions src/test/java/org/folio/inventory/TestUtil.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.folio.inventory;

import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.client.impl.HttpResponseImpl;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
Expand All @@ -12,4 +14,9 @@ public final class TestUtil {
public static String readFileFromPath(String path) throws IOException {
return Files.readString(Path.of(path));
}

public static HttpResponseImpl<Buffer> buildHttpResponseWithBuffer(Buffer buffer, int httpStatus) {
return new HttpResponseImpl<>(null, httpStatus, "",
null, null, null, buffer, null);
}
}
Loading

0 comments on commit ec49d42

Please sign in to comment.