Skip to content

Commit

Permalink
MODINV-986: Step 3: CreateInstanceIngressEventHandler implementation (#…
Browse files Browse the repository at this point in the history
…724)

* MODINV-986: InstanceIngressCreateEventHandler implementation
  • Loading branch information
PBobylev authored May 31, 2024
1 parent 17d7598 commit 5b3787b
Show file tree
Hide file tree
Showing 22 changed files with 706 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,9 @@ public class DataImportConsumerVerticle extends KafkaConsumerVerticle {
@Override
public void start(Promise<Void> startPromise) {
EventManager.registerKafkaEventPublisher(getKafkaConfig(), vertx, getMaxDistributionNumber());

var profileSnapshotExpirationTime = getCacheEnvVariable("inventory.profile-snapshot-cache.expiration.time.seconds");

var profileSnapshotCache = new ProfileSnapshotCache(vertx, getHttpClient(), Long.parseLong(profileSnapshotExpirationTime));
var consortiumDataCache = new ConsortiumDataCache(vertx, getHttpClient());

var dataImportKafkaHandler = new DataImportKafkaHandler(vertx, getStorage(), getHttpClient(), profileSnapshotCache,
var dataImportKafkaHandler = new DataImportKafkaHandler(vertx, getStorage(), getHttpClient(), getProfileSnapshotCache(),
getKafkaConfig(), getMappingMetadataCache(), consortiumDataCache);

var futures = EVENT_TYPES.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class InstanceIngressConsumerVerticle extends KafkaConsumerVerticle {

@Override
public void start(Promise<Void> startPromise) {
var instanceIngressEventHandler = new InstanceIngressEventConsumer(vertx);
var instanceIngressEventHandler = new InstanceIngressEventConsumer(getStorage(), getHttpClient(), getMappingMetadataCache());

var consumerWrapper = createConsumer(INSTANCE_INGRESS_TOPIC);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static java.lang.String.format;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.folio.inventory.dataimport.util.MappingConstants.MARC_BIB_RECORD_FORMAT;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TENANT_HEADER;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TOKEN_HEADER;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_URL_HEADER;
Expand Down Expand Up @@ -37,7 +38,6 @@ public class MarcBibInstanceHridSetKafkaHandler implements AsyncRecordHandler<St

private static final Logger LOGGER = LogManager.getLogger(MarcBibInstanceHridSetKafkaHandler.class);
private static final String MAPPING_METADATA_NOT_FOUND_MSG = "MappingParameters and mapping rules snapshots were not found by jobExecutionId '%s'";
private static final String MARC_KEY = "MARC_BIB";
private static final String MAPPING_RULES_KEY = "MAPPING_RULES";
private static final String MAPPING_PARAMS_KEY = "MAPPING_PARAMS";
private static final ObjectMapper OBJECT_MAPPER = ObjectMapperTool.getMapper();
Expand Down Expand Up @@ -69,14 +69,14 @@ public Future<String> handle(KafkaConsumerRecord<String, String> record) {
String jobExecutionId = eventPayload.get(JOB_EXECUTION_ID_HEADER);
LOGGER.info("Event payload has been received with event type: {}, recordId: {} by jobExecution: {} and chunkId: {}", event.getEventType(), recordId, jobExecutionId, chunkId);

if (isEmpty(eventPayload.get(MARC_KEY))) {
if (isEmpty(eventPayload.get(MARC_BIB_RECORD_FORMAT))) {
String message = format("Event payload does not contain required data to update Instance with event type: '%s', recordId: '%s' by jobExecution: '%s' and chunkId: '%s'", event.getEventType(), recordId, jobExecutionId, chunkId);
LOGGER.error(message);
return Future.failedFuture(message);
}

Context context = EventHandlingUtil.constructContext(headersMap.get(OKAPI_TENANT_HEADER), headersMap.get(OKAPI_TOKEN_HEADER), headersMap.get(OKAPI_URL_HEADER));
Record marcRecord = new JsonObject(eventPayload.get(MARC_KEY)).mapTo(Record.class);
Record marcRecord = new JsonObject(eventPayload.get(MARC_BIB_RECORD_FORMAT)).mapTo(Record.class);

mappingMetadataCache.get(jobExecutionId, context)
.map(metadataOptional -> metadataOptional.orElseThrow(() ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static java.lang.String.format;
import static java.util.Objects.isNull;
import static org.folio.inventory.EntityLinksKafkaTopic.LINKS_STATS;
import static org.folio.inventory.dataimport.util.MappingConstants.MARC_BIB_RECORD_TYPE;
import static org.folio.rest.jaxrs.model.LinkUpdateReport.Status.FAIL;
import static org.folio.rest.jaxrs.model.LinkUpdateReport.Status.SUCCESS;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TOKEN_HEADER;
Expand Down Expand Up @@ -48,7 +49,6 @@ public class MarcBibUpdateKafkaHandler implements AsyncRecordHandler<String, Str
private static final AtomicLong INDEXER = new AtomicLong();
private static final String MAPPING_RULES_KEY = "MAPPING_RULES";
private static final String MAPPING_PARAMS_KEY = "MAPPING_PARAMS";
private static final String MARC_BIB_RECORD_TYPE = "marc-bib";

private final InstanceUpdateDelegate instanceUpdateDelegate;
private final MappingMetadataCache mappingMetadataCache;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@
import static org.folio.ActionProfile.FolioRecord.INSTANCE;
import static org.folio.ActionProfile.FolioRecord.MARC_BIBLIOGRAPHIC;
import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.TAG_999;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import static org.folio.inventory.dataimport.util.MappingConstants.INSTANCE_PATH;
import static org.folio.inventory.dataimport.util.MappingConstants.MARC_BIB_RECORD_FORMAT;
import static org.folio.inventory.domain.instances.Instance.ID;

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.http.HttpClient;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonObject;
import java.util.HashMap;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
Expand All @@ -32,9 +34,6 @@
import org.folio.processing.mapping.defaultmapper.RecordMapperBuilder;
import org.folio.processing.mapping.defaultmapper.processor.parameters.MappingParameters;
import org.folio.rest.client.SourceStorageRecordsClient;

import io.vertx.core.json.Json;
import io.vertx.core.json.JsonObject;
import org.folio.rest.client.SourceStorageSnapshotsClient;
import org.folio.rest.jaxrs.model.AdditionalInfo;
import org.folio.rest.jaxrs.model.EntityType;
Expand All @@ -46,11 +45,7 @@
public abstract class AbstractInstanceEventHandler implements EventHandler {
protected static final Logger LOGGER = LogManager.getLogger(AbstractInstanceEventHandler.class);
protected static final String MARC_FORMAT = "MARC";
protected static final String MARC_BIB_RECORD_FORMAT = "MARC_BIB";
protected static final String INSTANCE_PATH = "instance";
protected static final List<String> requiredFields = Arrays.asList("source", "title", "instanceTypeId");
private static final String ID_FIELD = "id";
private static final boolean IS_HRID_FILLING_NEEDED_FOR_INSTANCE = true;
public static final boolean IS_HRID_FILLING_NEEDED_FOR_INSTANCE = true;

protected final Storage storage;
@Getter
Expand Down Expand Up @@ -230,7 +225,7 @@ protected void setSuppressFormDiscovery(Record srcRecord, boolean suppressFromDi
}

private void executeHrIdManipulation(Record srcRecord, JsonObject externalEntity) {
var externalId = externalEntity.getString(ID_FIELD);
var externalId = externalEntity.getString(ID);
var externalHrId = extractHridForInstance(externalEntity);
var externalIdsHolder = srcRecord.getExternalIdsHolder();
setExternalIdsForInstance(externalIdsHolder, externalId, externalHrId);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,34 @@
package org.folio.inventory.dataimport.handlers.actions;

import static java.lang.String.format;
import static org.apache.commons.lang.StringUtils.isNotBlank;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.folio.ActionProfile.Action.CREATE;
import static org.folio.ActionProfile.FolioRecord.INSTANCE;
import static org.folio.ActionProfile.FolioRecord.MARC_BIBLIOGRAPHIC;
import static org.folio.DataImportEventTypes.DI_INVENTORY_INSTANCE_CREATED;
import static org.folio.DataImportEventTypes.DI_INVENTORY_INSTANCE_CREATED_READY_FOR_POST_PROCESSING;
import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.SUBFIELD_I;
import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.TAG_999;
import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.reorderMarcRecordFields;
import static org.folio.inventory.dataimport.util.DataImportConstants.UNIQUE_ID_ERROR_MESSAGE;
import static org.folio.inventory.dataimport.util.LoggerUtil.logParametersEventHandler;
import static org.folio.inventory.dataimport.util.MappingConstants.INSTANCE_PATH;
import static org.folio.inventory.dataimport.util.MappingConstants.INSTANCE_REQUIRED_FIELDS;
import static org.folio.inventory.domain.instances.Instance.HRID_KEY;
import static org.folio.inventory.domain.instances.Instance.ID;
import static org.folio.inventory.domain.instances.Instance.SOURCE_KEY;
import static org.folio.rest.jaxrs.model.ProfileSnapshotWrapper.ContentType.ACTION_PROFILE;

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.http.HttpClient;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonObject;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.ActionProfile;
Expand All @@ -29,26 +53,6 @@
import org.folio.rest.jaxrs.model.EntityType;
import org.folio.rest.jaxrs.model.Record;

import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

import static java.lang.String.format;
import static org.apache.commons.lang.StringUtils.isNotBlank;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.folio.ActionProfile.Action.CREATE;
import static org.folio.ActionProfile.FolioRecord.INSTANCE;
import static org.folio.ActionProfile.FolioRecord.MARC_BIBLIOGRAPHIC;
import static org.folio.DataImportEventTypes.DI_INVENTORY_INSTANCE_CREATED;
import static org.folio.DataImportEventTypes.DI_INVENTORY_INSTANCE_CREATED_READY_FOR_POST_PROCESSING;
import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.*;
import static org.folio.inventory.dataimport.util.DataImportConstants.UNIQUE_ID_ERROR_MESSAGE;
import static org.folio.inventory.dataimport.util.LoggerUtil.logParametersEventHandler;
import static org.folio.inventory.domain.instances.Instance.HRID_KEY;
import static org.folio.inventory.domain.instances.Instance.SOURCE_KEY;
import static org.folio.rest.jaxrs.model.ProfileSnapshotWrapper.ContentType.ACTION_PROFILE;

public class CreateInstanceEventHandler extends AbstractInstanceEventHandler {

private static final Logger LOGGER = LogManager.getLogger(CreateInstanceEventHandler.class);
Expand Down Expand Up @@ -120,7 +124,7 @@ public CompletableFuture<DataImportEventPayload> handle(DataImportEventPayload d
.compose(v -> {
InstanceCollection instanceCollection = storage.getInstanceCollection(context);
JsonObject instanceAsJson = prepareInstance(dataImportEventPayload, instanceId, jobExecutionId);
List<String> requiredFieldsErrors = EventHandlingUtil.validateJsonByRequiredFields(instanceAsJson, requiredFields);
List<String> requiredFieldsErrors = EventHandlingUtil.validateJsonByRequiredFields(instanceAsJson, INSTANCE_REQUIRED_FIELDS);
if (!requiredFieldsErrors.isEmpty()) {
String msg = format("Mapped Instance is invalid: %s, by jobExecutionId: '%s' and recordId: '%s' and chunkId: '%s' ", requiredFieldsErrors,
jobExecutionId, recordId, chunkId);
Expand Down Expand Up @@ -183,7 +187,7 @@ private JsonObject prepareInstance(DataImportEventPayload dataImportEventPayload
if (instanceAsJson.getJsonObject(INSTANCE_PATH) != null) {
instanceAsJson = instanceAsJson.getJsonObject(INSTANCE_PATH);
}
instanceAsJson.put("id", instanceId);
instanceAsJson.put(ID, instanceId);
instanceAsJson.put(SOURCE_KEY, MARC_FORMAT);
instanceAsJson.remove(HRID_KEY);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import static java.lang.String.format;
import static org.folio.inventory.dataimport.util.LoggerUtil.logParametersUpdateDelegate;
import static org.folio.inventory.dataimport.util.MappingConstants.MARC_BIB_RECORD_FORMAT;

public class InstanceUpdateDelegate {

Expand All @@ -30,7 +31,6 @@ public class InstanceUpdateDelegate {
private static final String MAPPING_RULES_KEY = "MAPPING_RULES";
private static final String MAPPING_PARAMS_KEY = "MAPPING_PARAMS";
private static final String QM_RELATED_RECORD_VERSION_KEY = "RELATED_RECORD_VERSION";
private static final String MARC_FORMAT = "MARC_BIB";

private final Storage storage;

Expand All @@ -46,7 +46,7 @@ public Future<Instance> handle(Map<String, String> eventPayload, Record marcReco
JsonObject parsedRecord = retrieveParsedContent(marcRecord.getParsedRecord());
String instanceId = marcRecord.getExternalIdsHolder().getInstanceId();
LOGGER.info("Instance update with instanceId: {}", instanceId);
RecordMapper<org.folio.Instance> recordMapper = RecordMapperBuilder.buildMapper(MARC_FORMAT);
RecordMapper<org.folio.Instance> recordMapper = RecordMapperBuilder.buildMapper(MARC_BIB_RECORD_FORMAT);
var mappedInstance = recordMapper.mapRecord(parsedRecord, mappingParameters, mappingRules);
InstanceCollection instanceCollection = storage.getInstanceCollection(context);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,40 @@
package org.folio.inventory.dataimport.handlers.actions;

import static java.lang.String.format;
import static java.util.Objects.nonNull;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.folio.ActionProfile.Action.UPDATE;
import static org.folio.ActionProfile.FolioRecord.INSTANCE;
import static org.folio.ActionProfile.FolioRecord.MARC_BIBLIOGRAPHIC;
import static org.folio.DataImportEventTypes.DI_INVENTORY_INSTANCE_UPDATED;
import static org.folio.DataImportEventTypes.DI_INVENTORY_INSTANCE_UPDATED_READY_FOR_POST_PROCESSING;
import static org.folio.inventory.dataimport.util.LoggerUtil.INCOMING_RECORD_ID;
import static org.folio.inventory.dataimport.util.LoggerUtil.logParametersEventHandler;
import static org.folio.inventory.dataimport.util.MappingConstants.INSTANCE_PATH;
import static org.folio.inventory.dataimport.util.MappingConstants.INSTANCE_REQUIRED_FIELDS;
import static org.folio.inventory.domain.instances.Instance.DISCOVERY_SUPPRESS_KEY;
import static org.folio.inventory.domain.instances.Instance.HRID_KEY;
import static org.folio.inventory.domain.instances.Instance.METADATA_KEY;
import static org.folio.inventory.domain.instances.Instance.SOURCE_KEY;
import static org.folio.inventory.domain.instances.InstanceSource.CONSORTIUM_FOLIO;
import static org.folio.inventory.domain.instances.InstanceSource.CONSORTIUM_MARC;
import static org.folio.inventory.domain.instances.InstanceSource.FOLIO;
import static org.folio.inventory.domain.instances.InstanceSource.MARC;
import static org.folio.rest.jaxrs.model.ProfileSnapshotWrapper.ContentType.ACTION_PROFILE;

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.http.HttpClient;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonObject;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.http.HttpStatus;
import org.folio.ActionProfile;
import org.folio.DataImportEventPayload;
Expand Down Expand Up @@ -34,35 +64,6 @@
import org.folio.rest.jaxrs.model.ProfileSnapshotWrapper;
import org.folio.rest.jaxrs.model.Snapshot;

import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static java.lang.String.format;
import static java.util.Objects.nonNull;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.folio.ActionProfile.Action.UPDATE;
import static org.folio.ActionProfile.FolioRecord.INSTANCE;
import static org.folio.ActionProfile.FolioRecord.MARC_BIBLIOGRAPHIC;
import static org.folio.DataImportEventTypes.DI_INVENTORY_INSTANCE_UPDATED;
import static org.folio.DataImportEventTypes.DI_INVENTORY_INSTANCE_UPDATED_READY_FOR_POST_PROCESSING;
import static org.folio.inventory.dataimport.util.LoggerUtil.INCOMING_RECORD_ID;
import static org.folio.inventory.dataimport.util.LoggerUtil.logParametersEventHandler;
import static org.folio.inventory.domain.instances.Instance.DISCOVERY_SUPPRESS_KEY;
import static org.folio.inventory.domain.instances.Instance.HRID_KEY;
import static org.folio.inventory.domain.instances.Instance.METADATA_KEY;
import static org.folio.inventory.domain.instances.Instance.SOURCE_KEY;
import static org.folio.inventory.domain.instances.InstanceSource.CONSORTIUM_FOLIO;
import static org.folio.inventory.domain.instances.InstanceSource.CONSORTIUM_MARC;
import static org.folio.inventory.domain.instances.InstanceSource.FOLIO;
import static org.folio.inventory.domain.instances.InstanceSource.MARC;
import static org.folio.rest.jaxrs.model.ProfileSnapshotWrapper.ContentType.ACTION_PROFILE;


public class ReplaceInstanceEventHandler extends AbstractInstanceEventHandler { // NOSONAR

Expand Down Expand Up @@ -183,7 +184,7 @@ private void processInstanceUpdate(DataImportEventPayload dataImportEventPayload
recordId, chunkId))))
.compose(e -> {
JsonObject instanceAsJson = prepareTargetInstance(dataImportEventPayload, instanceToUpdate);
List<String> errors = EventHandlingUtil.validateJsonByRequiredFields(instanceAsJson, requiredFields);
List<String> errors = EventHandlingUtil.validateJsonByRequiredFields(instanceAsJson, INSTANCE_REQUIRED_FIELDS);

if (!errors.isEmpty()) {
String msg = format("Mapped Instance is invalid: %s, by jobExecutionId: '%s' and recordId: '%s' and chunkId: '%s' ", errors,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.folio.inventory.dataimport.util;

import static org.folio.inventory.domain.instances.Instance.INSTANCE_TYPE_ID_KEY;
import static org.folio.inventory.domain.instances.Instance.SOURCE_KEY;
import static org.folio.inventory.domain.instances.Instance.TITLE_KEY;

import java.util.Arrays;
import java.util.List;
import lombok.experimental.UtilityClass;

@UtilityClass
public final class MappingConstants {

public static final String MARC_BIB_RECORD_TYPE = "marc-bib";
public static final String MARC_BIB_RECORD_FORMAT = "MARC_BIB";
public static final String INSTANCE_PATH = "instance";
public static final List<String> INSTANCE_REQUIRED_FIELDS = Arrays.asList(SOURCE_KEY, TITLE_KEY, INSTANCE_TYPE_ID_KEY);

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

public class Instance {
// JSON property names
public static final String ID = "id";
public static final String VERSION_KEY = "_version";
public static final String HRID_KEY = "hrid";
public static final String MATCH_KEY_KEY = "matchKey";
Expand Down Expand Up @@ -146,7 +147,7 @@ public Instance(
public static Instance fromJson(JsonObject instanceJson) {

return new Instance(
instanceJson.getString("id"),
instanceJson.getString(ID),
instanceJson.getString(VERSION_KEY),
instanceJson.getString("hrid"),
instanceJson.getString(SOURCE_KEY),
Expand Down Expand Up @@ -197,7 +198,7 @@ public static Instance fromJson(JsonObject instanceJson) {
public JsonObject getJsonForStorage() {
JsonObject json = new JsonObject();
//TODO: Review if this shouldn't be defaulting here
json.put("id", getId() != null
json.put(ID, getId() != null
? getId()
: UUID.randomUUID().toString());
putIfNotNull(json, VERSION_KEY, version);
Expand Down Expand Up @@ -248,7 +249,7 @@ public JsonObject getJsonForStorage() {
public JsonObject getJsonForResponse(WebContext context) {
JsonObject json = new JsonObject();

json.put("id", getId());
json.put(ID, getId());
putIfNotNull(json, VERSION_KEY, version);
json.put("hrid", getHrid());
json.put(SOURCE_KEY, getSource());
Expand Down
Loading

0 comments on commit 5b3787b

Please sign in to comment.