Skip to content

Commit

Permalink
MODINV-986: InstanceIngressEventConsumer and CreateInstanceIngressEventHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
PBobylev authored May 31, 2024
1 parent 6d6b7c0 commit cdcb39d
Show file tree
Hide file tree
Showing 25 changed files with 896 additions and 131 deletions.
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@
<path>${basedir}/ramls/holdings-record.json</path>
<path>${basedir}/ramls/holdings-records-source.json</path>
<path>${basedir}/ramls/mappingMetadataDto.json</path>
<path>${basedir}/ramls/instance-ingress-event.json</path>
</sourcePaths>
<targetPackage>org.folio</targetPackage>
<generateBuilders>true</generateBuilders>
Expand Down
92 changes: 92 additions & 0 deletions ramls/instance-ingress-event.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
{
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "Instance ingress event data model",
"javaType": "org.folio.rest.jaxrs.model.InstanceIngressEvent",
"type": "object",
"additionalProperties": false,
"properties": {
"id": {
"description": "UUID",
"$ref": "uuid.json"
},
"eventType": {
"type": "string",
"enum": ["CREATE_INSTANCE", "UPDATE_INSTANCE"],
"description": "Instance ingress event type"
},
"eventMetadata": {
"description": "Event metadata",
"type": "object",
"additionalProperties": false,
"properties": {
"tenantId": {
"description": "Tenant id",
"type": "string"
},
"eventTTL": {
"description": "Time-to-live (TTL) for event in minutes",
"type": "integer"
},
"correlationId": {
"description": "Id to track related events, can be a meaningful string or a UUID",
"type": "string"
},
"originalEventId": {
"description": "Id of the event that started the sequence of related events",
"$ref": "uuid.json"
},
"publisherCallback": {
          "description": "Allows a publisher to provide a callback endpoint or an error Event Type, so it can be notified when, despite subscribers existing for the event type, no one has received the event within the specified period of time",
"type": "object",
"properties": {
"endpoint": {
"description": "Callback endpoint",
"type": "string"
},
"eventType": {
"description": "Error Event Type",
"type": "string"
}
}
},
"createdDate": {
"description": "Timestamp when event was created",
"type": "string",
"format": "date-time"
},
"publishedDate": {
"description": "Timestamp when event was initially published to the underlying topic",
"type": "string",
"format": "date-time"
},
"createdBy": {
"description": "Username of the user whose action caused an event",
"type": "string"
},
"publishedBy": {
"description": "Name and version of the module that published an event",
"type": "string"
}
},
"required": [
"tenantId",
"eventTTL",
"publishedBy"
]
},
    "eventPayload": {
      "description": "An instance source record container",
      "$ref": "instance-ingress-payload.json"
    }
},
"excludedFromEqualsAndHashCode": [
"eventMetadata",
"eventPayload"
],
"required": [
"id",
"eventType",
"eventMetadata"
]
}
25 changes: 25 additions & 0 deletions ramls/instance-ingress-payload.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "An instance source record container",
"type": "object",
"properties": {
"sourceRecordIdentifier": {
"type": "string",
"description": "The source record identifier"
},
"sourceRecordObject": {
"type": "string",
"description": "The source record JSON object"
},
"sourceType": {
"type": "string",
"enum": ["BIBFRAME", "MARC", "FOLIO"],
"description": "Source type"
}
},
"additionalProperties": true,
"required": [
"sourceRecordObject",
"sourceType"
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,9 @@ public class DataImportConsumerVerticle extends KafkaConsumerVerticle {
@Override
public void start(Promise<Void> startPromise) {
EventManager.registerKafkaEventPublisher(getKafkaConfig(), vertx, getMaxDistributionNumber());

var profileSnapshotExpirationTime = getCacheEnvVariable("inventory.profile-snapshot-cache.expiration.time.seconds");

var profileSnapshotCache = new ProfileSnapshotCache(vertx, getHttpClient(), Long.parseLong(profileSnapshotExpirationTime));
var consortiumDataCache = new ConsortiumDataCache(vertx, getHttpClient());

var dataImportKafkaHandler = new DataImportKafkaHandler(vertx, getStorage(), getHttpClient(), profileSnapshotCache,
var dataImportKafkaHandler = new DataImportKafkaHandler(vertx, getStorage(), getHttpClient(), getProfileSnapshotCache(),
getKafkaConfig(), getMappingMetadataCache(), consortiumDataCache);

var futures = EVENT_TYPES.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import io.vertx.core.Promise;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.inventory.handler.InstanceIngressEventHandler;
import org.folio.inventory.instanceingress.InstanceIngressEventConsumer;
import org.folio.inventory.support.KafkaConsumerVerticle;

public class InstanceIngressConsumerVerticle extends KafkaConsumerVerticle {
Expand All @@ -15,7 +15,7 @@ public class InstanceIngressConsumerVerticle extends KafkaConsumerVerticle {

@Override
public void start(Promise<Void> startPromise) {
var instanceIngressEventHandler = new InstanceIngressEventHandler();
var instanceIngressEventHandler = new InstanceIngressEventConsumer(getStorage(), getHttpClient(), getMappingMetadataCache());

var consumerWrapper = createConsumer(INSTANCE_INGRESS_TOPIC);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static java.lang.String.format;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.folio.inventory.dataimport.util.MappingConstants.MARC_BIB_RECORD_FORMAT;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TENANT_HEADER;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TOKEN_HEADER;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_URL_HEADER;
Expand Down Expand Up @@ -37,7 +38,6 @@ public class MarcBibInstanceHridSetKafkaHandler implements AsyncRecordHandler<St

private static final Logger LOGGER = LogManager.getLogger(MarcBibInstanceHridSetKafkaHandler.class);
private static final String MAPPING_METADATA_NOT_FOUND_MSG = "MappingParameters and mapping rules snapshots were not found by jobExecutionId '%s'";
private static final String MARC_KEY = "MARC_BIB";
private static final String MAPPING_RULES_KEY = "MAPPING_RULES";
private static final String MAPPING_PARAMS_KEY = "MAPPING_PARAMS";
private static final ObjectMapper OBJECT_MAPPER = ObjectMapperTool.getMapper();
Expand Down Expand Up @@ -69,14 +69,14 @@ public Future<String> handle(KafkaConsumerRecord<String, String> record) {
String jobExecutionId = eventPayload.get(JOB_EXECUTION_ID_HEADER);
LOGGER.info("Event payload has been received with event type: {}, recordId: {} by jobExecution: {} and chunkId: {}", event.getEventType(), recordId, jobExecutionId, chunkId);

if (isEmpty(eventPayload.get(MARC_KEY))) {
if (isEmpty(eventPayload.get(MARC_BIB_RECORD_FORMAT))) {
String message = format("Event payload does not contain required data to update Instance with event type: '%s', recordId: '%s' by jobExecution: '%s' and chunkId: '%s'", event.getEventType(), recordId, jobExecutionId, chunkId);
LOGGER.error(message);
return Future.failedFuture(message);
}

Context context = EventHandlingUtil.constructContext(headersMap.get(OKAPI_TENANT_HEADER), headersMap.get(OKAPI_TOKEN_HEADER), headersMap.get(OKAPI_URL_HEADER));
Record marcRecord = new JsonObject(eventPayload.get(MARC_KEY)).mapTo(Record.class);
Record marcRecord = new JsonObject(eventPayload.get(MARC_BIB_RECORD_FORMAT)).mapTo(Record.class);

mappingMetadataCache.get(jobExecutionId, context)
.map(metadataOptional -> metadataOptional.orElseThrow(() ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static java.lang.String.format;
import static java.util.Objects.isNull;
import static org.folio.inventory.EntityLinksKafkaTopic.LINKS_STATS;
import static org.folio.inventory.dataimport.util.MappingConstants.MARC_BIB_RECORD_TYPE;
import static org.folio.rest.jaxrs.model.LinkUpdateReport.Status.FAIL;
import static org.folio.rest.jaxrs.model.LinkUpdateReport.Status.SUCCESS;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TOKEN_HEADER;
Expand Down Expand Up @@ -48,7 +49,6 @@ public class MarcBibUpdateKafkaHandler implements AsyncRecordHandler<String, Str
private static final AtomicLong INDEXER = new AtomicLong();
private static final String MAPPING_RULES_KEY = "MAPPING_RULES";
private static final String MAPPING_PARAMS_KEY = "MAPPING_PARAMS";
private static final String MARC_BIB_RECORD_TYPE = "marc-bib";

private final InstanceUpdateDelegate instanceUpdateDelegate;
private final MappingMetadataCache mappingMetadataCache;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@
import static org.folio.ActionProfile.FolioRecord.INSTANCE;
import static org.folio.ActionProfile.FolioRecord.MARC_BIBLIOGRAPHIC;
import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.TAG_999;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import static org.folio.inventory.dataimport.util.MappingConstants.INSTANCE_PATH;
import static org.folio.inventory.dataimport.util.MappingConstants.MARC_BIB_RECORD_FORMAT;
import static org.folio.inventory.domain.instances.Instance.ID;

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.http.HttpClient;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonObject;
import java.util.HashMap;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
Expand All @@ -32,9 +34,6 @@
import org.folio.processing.mapping.defaultmapper.RecordMapperBuilder;
import org.folio.processing.mapping.defaultmapper.processor.parameters.MappingParameters;
import org.folio.rest.client.SourceStorageRecordsClient;

import io.vertx.core.json.Json;
import io.vertx.core.json.JsonObject;
import org.folio.rest.client.SourceStorageSnapshotsClient;
import org.folio.rest.jaxrs.model.AdditionalInfo;
import org.folio.rest.jaxrs.model.EntityType;
Expand All @@ -46,11 +45,7 @@
public abstract class AbstractInstanceEventHandler implements EventHandler {
protected static final Logger LOGGER = LogManager.getLogger(AbstractInstanceEventHandler.class);
protected static final String MARC_FORMAT = "MARC";
protected static final String MARC_BIB_RECORD_FORMAT = "MARC_BIB";
protected static final String INSTANCE_PATH = "instance";
protected static final List<String> requiredFields = Arrays.asList("source", "title", "instanceTypeId");
private static final String ID_FIELD = "id";
private static final boolean IS_HRID_FILLING_NEEDED_FOR_INSTANCE = true;
public static final boolean IS_HRID_FILLING_NEEDED_FOR_INSTANCE = true;

protected final Storage storage;
@Getter
Expand Down Expand Up @@ -230,7 +225,7 @@ protected void setSuppressFormDiscovery(Record srcRecord, boolean suppressFromDi
}

private void executeHrIdManipulation(Record srcRecord, JsonObject externalEntity) {
var externalId = externalEntity.getString(ID_FIELD);
var externalId = externalEntity.getString(ID);
var externalHrId = extractHridForInstance(externalEntity);
var externalIdsHolder = srcRecord.getExternalIdsHolder();
setExternalIdsForInstance(externalIdsHolder, externalId, externalHrId);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,34 @@
package org.folio.inventory.dataimport.handlers.actions;

import static java.lang.String.format;
import static org.apache.commons.lang.StringUtils.isNotBlank;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.folio.ActionProfile.Action.CREATE;
import static org.folio.ActionProfile.FolioRecord.INSTANCE;
import static org.folio.ActionProfile.FolioRecord.MARC_BIBLIOGRAPHIC;
import static org.folio.DataImportEventTypes.DI_INVENTORY_INSTANCE_CREATED;
import static org.folio.DataImportEventTypes.DI_INVENTORY_INSTANCE_CREATED_READY_FOR_POST_PROCESSING;
import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.SUBFIELD_I;
import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.TAG_999;
import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.reorderMarcRecordFields;
import static org.folio.inventory.dataimport.util.DataImportConstants.UNIQUE_ID_ERROR_MESSAGE;
import static org.folio.inventory.dataimport.util.LoggerUtil.logParametersEventHandler;
import static org.folio.inventory.dataimport.util.MappingConstants.INSTANCE_PATH;
import static org.folio.inventory.dataimport.util.MappingConstants.INSTANCE_REQUIRED_FIELDS;
import static org.folio.inventory.domain.instances.Instance.HRID_KEY;
import static org.folio.inventory.domain.instances.Instance.ID;
import static org.folio.inventory.domain.instances.Instance.SOURCE_KEY;
import static org.folio.rest.jaxrs.model.ProfileSnapshotWrapper.ContentType.ACTION_PROFILE;

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.http.HttpClient;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonObject;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.ActionProfile;
Expand All @@ -29,26 +53,6 @@
import org.folio.rest.jaxrs.model.EntityType;
import org.folio.rest.jaxrs.model.Record;

import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

import static java.lang.String.format;
import static org.apache.commons.lang.StringUtils.isNotBlank;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.folio.ActionProfile.Action.CREATE;
import static org.folio.ActionProfile.FolioRecord.INSTANCE;
import static org.folio.ActionProfile.FolioRecord.MARC_BIBLIOGRAPHIC;
import static org.folio.DataImportEventTypes.DI_INVENTORY_INSTANCE_CREATED;
import static org.folio.DataImportEventTypes.DI_INVENTORY_INSTANCE_CREATED_READY_FOR_POST_PROCESSING;
import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.*;
import static org.folio.inventory.dataimport.util.DataImportConstants.UNIQUE_ID_ERROR_MESSAGE;
import static org.folio.inventory.dataimport.util.LoggerUtil.logParametersEventHandler;
import static org.folio.inventory.domain.instances.Instance.HRID_KEY;
import static org.folio.inventory.domain.instances.Instance.SOURCE_KEY;
import static org.folio.rest.jaxrs.model.ProfileSnapshotWrapper.ContentType.ACTION_PROFILE;

public class CreateInstanceEventHandler extends AbstractInstanceEventHandler {

private static final Logger LOGGER = LogManager.getLogger(CreateInstanceEventHandler.class);
Expand Down Expand Up @@ -120,7 +124,7 @@ public CompletableFuture<DataImportEventPayload> handle(DataImportEventPayload d
.compose(v -> {
InstanceCollection instanceCollection = storage.getInstanceCollection(context);
JsonObject instanceAsJson = prepareInstance(dataImportEventPayload, instanceId, jobExecutionId);
List<String> requiredFieldsErrors = EventHandlingUtil.validateJsonByRequiredFields(instanceAsJson, requiredFields);
List<String> requiredFieldsErrors = EventHandlingUtil.validateJsonByRequiredFields(instanceAsJson, INSTANCE_REQUIRED_FIELDS);
if (!requiredFieldsErrors.isEmpty()) {
String msg = format("Mapped Instance is invalid: %s, by jobExecutionId: '%s' and recordId: '%s' and chunkId: '%s' ", requiredFieldsErrors,
jobExecutionId, recordId, chunkId);
Expand Down Expand Up @@ -183,7 +187,7 @@ private JsonObject prepareInstance(DataImportEventPayload dataImportEventPayload
if (instanceAsJson.getJsonObject(INSTANCE_PATH) != null) {
instanceAsJson = instanceAsJson.getJsonObject(INSTANCE_PATH);
}
instanceAsJson.put("id", instanceId);
instanceAsJson.put(ID, instanceId);
instanceAsJson.put(SOURCE_KEY, MARC_FORMAT);
instanceAsJson.remove(HRID_KEY);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import static java.lang.String.format;
import static org.folio.inventory.dataimport.util.LoggerUtil.logParametersUpdateDelegate;
import static org.folio.inventory.dataimport.util.MappingConstants.MARC_BIB_RECORD_FORMAT;

public class InstanceUpdateDelegate {

Expand All @@ -30,7 +31,6 @@ public class InstanceUpdateDelegate {
private static final String MAPPING_RULES_KEY = "MAPPING_RULES";
private static final String MAPPING_PARAMS_KEY = "MAPPING_PARAMS";
private static final String QM_RELATED_RECORD_VERSION_KEY = "RELATED_RECORD_VERSION";
private static final String MARC_FORMAT = "MARC_BIB";

private final Storage storage;

Expand All @@ -46,7 +46,7 @@ public Future<Instance> handle(Map<String, String> eventPayload, Record marcReco
JsonObject parsedRecord = retrieveParsedContent(marcRecord.getParsedRecord());
String instanceId = marcRecord.getExternalIdsHolder().getInstanceId();
LOGGER.info("Instance update with instanceId: {}", instanceId);
RecordMapper<org.folio.Instance> recordMapper = RecordMapperBuilder.buildMapper(MARC_FORMAT);
RecordMapper<org.folio.Instance> recordMapper = RecordMapperBuilder.buildMapper(MARC_BIB_RECORD_FORMAT);
var mappedInstance = recordMapper.mapRecord(parsedRecord, mappingParameters, mappingRules);
InstanceCollection instanceCollection = storage.getInstanceCollection(context);

Expand Down
Loading

0 comments on commit cdcb39d

Please sign in to comment.