From cdcb39da5c93f74fa5ea3f3a129698b4b0475d4e Mon Sep 17 00:00:00 2001 From: Pavel Bobylev <131748689+PBobylev@users.noreply.github.com> Date: Fri, 31 May 2024 17:58:45 +0500 Subject: [PATCH] MODINV-986: InstanceIngressEventConsumer and CreateInstanceIngressEventHandler --- pom.xml | 1 + ramls/instance-ingress-event.json | 92 +++++ ramls/instance-ingress-payload.json | 25 ++ .../inventory/DataImportConsumerVerticle.java | 6 +- .../InstanceIngressConsumerVerticle.java | 4 +- .../MarcBibInstanceHridSetKafkaHandler.java | 6 +- .../consumers/MarcBibUpdateKafkaHandler.java | 2 +- .../actions/AbstractInstanceEventHandler.java | 21 +- .../actions/CreateInstanceEventHandler.java | 48 +-- .../actions/InstanceUpdateDelegate.java | 4 +- .../actions/ReplaceInstanceEventHandler.java | 61 ++-- .../dataimport/util/MappingConstants.java | 19 + .../inventory/domain/instances/Instance.java | 7 +- .../handler/InstanceIngressEventHandler.java | 22 -- .../InstanceIngressEventConsumer.java | 88 +++++ .../CreateInstanceIngressEventHandler.java | 331 ++++++++++++++++++ .../handler/InstanceIngressEventHandler.java | 11 + .../InstanceIngressUpdateEventHandler.java | 13 + .../support/KafkaConsumerVerticle.java | 11 + .../cache/MappingMetadataCacheTest.java | 23 +- .../InstanceIngressConsumerVerticleTest.java | 62 ++++ .../InstanceIngressEventConsumerTest.java | 136 +++++++ ...arcBibInstanceHridSetKafkaHandlerTest.java | 5 +- ...dateInstanceQuickMarcEventHandlerTest.java | 21 +- .../consumers/QuickMarcKafkaHandlerTest.java | 8 +- 25 files changed, 896 insertions(+), 131 deletions(-) create mode 100644 ramls/instance-ingress-event.json create mode 100644 ramls/instance-ingress-payload.json create mode 100644 src/main/java/org/folio/inventory/dataimport/util/MappingConstants.java delete mode 100644 src/main/java/org/folio/inventory/handler/InstanceIngressEventHandler.java create mode 100644 src/main/java/org/folio/inventory/instanceingress/InstanceIngressEventConsumer.java create mode 100644 src/main/java/org/folio/inventory/instanceingress/handler/CreateInstanceIngressEventHandler.java create mode 100644 src/main/java/org/folio/inventory/instanceingress/handler/InstanceIngressEventHandler.java create mode 100644 src/main/java/org/folio/inventory/instanceingress/handler/InstanceIngressUpdateEventHandler.java create mode 100644 src/test/java/org/folio/inventory/dataimport/consumers/InstanceIngressConsumerVerticleTest.java create mode 100644 src/test/java/org/folio/inventory/dataimport/consumers/InstanceIngressEventConsumerTest.java diff --git a/pom.xml b/pom.xml index bfa3c239a..8dc9a5f1d 100644 --- a/pom.xml +++ b/pom.xml @@ -445,6 +445,7 @@ ${basedir}/ramls/holdings-record.json ${basedir}/ramls/holdings-records-source.json ${basedir}/ramls/mappingMetadataDto.json + ${basedir}/ramls/instance-ingress-event.json org.folio true diff --git a/ramls/instance-ingress-event.json b/ramls/instance-ingress-event.json new file mode 100644 index 000000000..867cb07d2 --- /dev/null +++ b/ramls/instance-ingress-event.json @@ -0,0 +1,92 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + "description": "Instance ingress event data model", + "javaType": "org.folio.rest.jaxrs.model.InstanceIngressEvent", + "type": "object", + "additionalProperties": false, + "properties": { + "id": { + "description": "UUID", + "$ref": "uuid.json" + }, + "eventType": { + "type": "string", + "enum": ["CREATE_INSTANCE", "UPDATE_INSTANCE"], + "description": "Instance ingress event type" + }, + "eventMetadata": { + "description": "Event metadata", + "type": "object", + "additionalProperties": false, + "properties": { + "tenantId": { + "description": "Tenant id", + "type": "string" + }, + "eventTTL": { + "description": "Time-to-live (TTL) for event in minutes", + "type": "integer" + }, + "correlationId": { + "description": "Id to track related events, can be a meaningful string or a UUID", + "type": "string" + }, + "originalEventId": { + "description": "Id of the event that started the sequence of related events", + "$ref": "uuid.json" + }, + "publisherCallback": { + "description": "Allows a publisher to provide a callback endpoint or an error Event Type to be notified that despite the fact that there are subscribers for such an event type no one has received the event within the specified period of time", + "type": "object", + "properties": { + "endpoint": { + "description": "Callback endpoint", + "type": "string" + }, + "eventType": { + "description": "Error Event Type", + "type": "string" + } + } + }, + "createdDate": { + "description": "Timestamp when event was created", + "type": "string", + "format": "date-time" + }, + "publishedDate": { + "description": "Timestamp when event was initially published to the underlying topic", + "type": "string", + "format": "date-time" + }, + "createdBy": { + "description": "Username of the user whose action caused an event", + "type": "string" + }, + "publishedBy": { + "description": "Name and version of the module that published an event", + "type": "string" + } + }, + "required": [ + "tenantId", + "eventTTL", + "publishedBy" + ] + }, + "eventPayload": { + "type": "object", + "description": "An instance source record container", + "$ref": "instance-ingress-payload.json" + } + }, + "excludedFromEqualsAndHashCode": [ + "eventMetadata", + "eventPayload" + ], + "required": [ + "id", + "eventType", + "eventMetadata" + ] +} diff --git a/ramls/instance-ingress-payload.json b/ramls/instance-ingress-payload.json new file mode 100644 index 000000000..55ce36217 --- /dev/null +++ b/ramls/instance-ingress-payload.json @@ -0,0 +1,25 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + "description": "An instance source record container", + "type": "object", + "properties": { + "sourceRecordIdentifier": { + "type": "string", + "description": "The source record identifier" + }, + "sourceRecordObject": { + "type": "string", + "description": "The source record JSON object" + }, + "sourceType": { + "type": "string", + "enum": ["BIBFRAME", "MARC", "FOLIO"], + "description": "Source type" + } + }, + "additionalProperties": true, + "required": [ + "sourceRecordObject", + "sourceType" + ] +} diff --git a/src/main/java/org/folio/inventory/DataImportConsumerVerticle.java b/src/main/java/org/folio/inventory/DataImportConsumerVerticle.java index 118f39484..4e6df5b7d 100644 --- a/src/main/java/org/folio/inventory/DataImportConsumerVerticle.java +++ b/src/main/java/org/folio/inventory/DataImportConsumerVerticle.java @@ -78,13 +78,9 @@ public class DataImportConsumerVerticle extends KafkaConsumerVerticle { @Override public void start(Promise startPromise) { EventManager.registerKafkaEventPublisher(getKafkaConfig(), vertx, getMaxDistributionNumber()); - - var profileSnapshotExpirationTime = getCacheEnvVariable("inventory.profile-snapshot-cache.expiration.time.seconds"); - - var profileSnapshotCache = new ProfileSnapshotCache(vertx, getHttpClient(), Long.parseLong(profileSnapshotExpirationTime)); var consortiumDataCache = new ConsortiumDataCache(vertx, getHttpClient()); - var dataImportKafkaHandler = new DataImportKafkaHandler(vertx, getStorage(), getHttpClient(), profileSnapshotCache, + var dataImportKafkaHandler = new DataImportKafkaHandler(vertx, getStorage(), getHttpClient(), getProfileSnapshotCache(), getKafkaConfig(), getMappingMetadataCache(), consortiumDataCache); var futures = EVENT_TYPES.stream() diff --git a/src/main/java/org/folio/inventory/InstanceIngressConsumerVerticle.java b/src/main/java/org/folio/inventory/InstanceIngressConsumerVerticle.java index b3aec2fc4..b758f9dea 100644 --- a/src/main/java/org/folio/inventory/InstanceIngressConsumerVerticle.java +++ b/src/main/java/org/folio/inventory/InstanceIngressConsumerVerticle.java @@ -5,7 +5,7 @@ import io.vertx.core.Promise; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.folio.inventory.handler.InstanceIngressEventHandler; +import org.folio.inventory.instanceingress.InstanceIngressEventConsumer; import org.folio.inventory.support.KafkaConsumerVerticle; public class InstanceIngressConsumerVerticle extends KafkaConsumerVerticle { @@ -15,7 +15,7 @@ public class InstanceIngressConsumerVerticle extends KafkaConsumerVerticle { @Override public void start(Promise startPromise) { - var instanceIngressEventHandler = new InstanceIngressEventHandler(); + var instanceIngressEventHandler = new InstanceIngressEventConsumer(getStorage(), getHttpClient(), getMappingMetadataCache()); var consumerWrapper = createConsumer(INSTANCE_INGRESS_TOPIC); diff --git a/src/main/java/org/folio/inventory/dataimport/consumers/MarcBibInstanceHridSetKafkaHandler.java b/src/main/java/org/folio/inventory/dataimport/consumers/MarcBibInstanceHridSetKafkaHandler.java index 3a4058b96..a5195b888 100644 --- a/src/main/java/org/folio/inventory/dataimport/consumers/MarcBibInstanceHridSetKafkaHandler.java +++ b/src/main/java/org/folio/inventory/dataimport/consumers/MarcBibInstanceHridSetKafkaHandler.java @@ -2,6 +2,7 @@ import static java.lang.String.format; import static org.apache.commons.lang3.StringUtils.isEmpty; +import static org.folio.inventory.dataimport.util.MappingConstants.MARC_BIB_RECORD_FORMAT; import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TENANT_HEADER; import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TOKEN_HEADER; import static org.folio.rest.util.OkapiConnectionParams.OKAPI_URL_HEADER; @@ -37,7 +38,6 @@ public class MarcBibInstanceHridSetKafkaHandler implements AsyncRecordHandler handle(KafkaConsumerRecord record) { String jobExecutionId = eventPayload.get(JOB_EXECUTION_ID_HEADER); LOGGER.info("Event payload has been received with event type: {}, recordId: {} by jobExecution: {} and chunkId: {}", event.getEventType(), recordId, jobExecutionId, chunkId); - if (isEmpty(eventPayload.get(MARC_KEY))) { + if (isEmpty(eventPayload.get(MARC_BIB_RECORD_FORMAT))) { String message = format("Event payload does not contain required data to update Instance with event type: '%s', recordId: '%s' by jobExecution: '%s' and chunkId: '%s'", event.getEventType(), recordId, jobExecutionId, chunkId); LOGGER.error(message); return Future.failedFuture(message); } Context context = EventHandlingUtil.constructContext(headersMap.get(OKAPI_TENANT_HEADER), headersMap.get(OKAPI_TOKEN_HEADER), headersMap.get(OKAPI_URL_HEADER)); - Record marcRecord = new JsonObject(eventPayload.get(MARC_KEY)).mapTo(Record.class); + Record marcRecord = new JsonObject(eventPayload.get(MARC_BIB_RECORD_FORMAT)).mapTo(Record.class); mappingMetadataCache.get(jobExecutionId, context) .map(metadataOptional -> metadataOptional.orElseThrow(() -> diff --git a/src/main/java/org/folio/inventory/dataimport/consumers/MarcBibUpdateKafkaHandler.java b/src/main/java/org/folio/inventory/dataimport/consumers/MarcBibUpdateKafkaHandler.java index 6f6324687..f50d8d6a1 100644 --- a/src/main/java/org/folio/inventory/dataimport/consumers/MarcBibUpdateKafkaHandler.java +++ b/src/main/java/org/folio/inventory/dataimport/consumers/MarcBibUpdateKafkaHandler.java @@ -3,6 +3,7 @@ import static java.lang.String.format; import static java.util.Objects.isNull; import static org.folio.inventory.EntityLinksKafkaTopic.LINKS_STATS; +import static org.folio.inventory.dataimport.util.MappingConstants.MARC_BIB_RECORD_TYPE; import static org.folio.rest.jaxrs.model.LinkUpdateReport.Status.FAIL; import static org.folio.rest.jaxrs.model.LinkUpdateReport.Status.SUCCESS; import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TOKEN_HEADER; @@ -48,7 +49,6 @@ public class MarcBibUpdateKafkaHandler implements AsyncRecordHandler requiredFields = Arrays.asList("source", "title", "instanceTypeId"); - private static final String ID_FIELD = "id"; - private static final boolean IS_HRID_FILLING_NEEDED_FOR_INSTANCE = true; + public static final boolean IS_HRID_FILLING_NEEDED_FOR_INSTANCE = true; protected final Storage storage; @Getter @@ -230,7 +225,7 @@ protected void setSuppressFormDiscovery(Record srcRecord, boolean suppressFromDi } private void executeHrIdManipulation(Record srcRecord, JsonObject externalEntity) { - var externalId = externalEntity.getString(ID_FIELD); + var externalId = externalEntity.getString(ID); var externalHrId = extractHridForInstance(externalEntity); var externalIdsHolder = srcRecord.getExternalIdsHolder(); setExternalIdsForInstance(externalIdsHolder, externalId, externalHrId); diff --git a/src/main/java/org/folio/inventory/dataimport/handlers/actions/CreateInstanceEventHandler.java b/src/main/java/org/folio/inventory/dataimport/handlers/actions/CreateInstanceEventHandler.java index 3f2092c2f..89acb9488 100644 --- a/src/main/java/org/folio/inventory/dataimport/handlers/actions/CreateInstanceEventHandler.java +++ b/src/main/java/org/folio/inventory/dataimport/handlers/actions/CreateInstanceEventHandler.java @@ -1,10 +1,34 @@ package org.folio.inventory.dataimport.handlers.actions; +import static java.lang.String.format; +import static org.apache.commons.lang.StringUtils.isNotBlank; +import static org.apache.commons.lang3.StringUtils.isEmpty; +import static org.folio.ActionProfile.Action.CREATE; +import static org.folio.ActionProfile.FolioRecord.INSTANCE; +import static org.folio.ActionProfile.FolioRecord.MARC_BIBLIOGRAPHIC; +import static org.folio.DataImportEventTypes.DI_INVENTORY_INSTANCE_CREATED; +import static org.folio.DataImportEventTypes.DI_INVENTORY_INSTANCE_CREATED_READY_FOR_POST_PROCESSING; +import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.SUBFIELD_I; +import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.TAG_999; +import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.reorderMarcRecordFields; +import static org.folio.inventory.dataimport.util.DataImportConstants.UNIQUE_ID_ERROR_MESSAGE; +import static org.folio.inventory.dataimport.util.LoggerUtil.logParametersEventHandler; +import static org.folio.inventory.dataimport.util.MappingConstants.INSTANCE_PATH; +import static org.folio.inventory.dataimport.util.MappingConstants.INSTANCE_REQUIRED_FIELDS; +import static org.folio.inventory.domain.instances.Instance.HRID_KEY; +import static org.folio.inventory.domain.instances.Instance.ID; +import static org.folio.inventory.domain.instances.Instance.SOURCE_KEY; +import static org.folio.rest.jaxrs.model.ProfileSnapshotWrapper.ContentType.ACTION_PROFILE; + import io.vertx.core.Future; import io.vertx.core.Promise; import io.vertx.core.http.HttpClient; import io.vertx.core.json.Json; import io.vertx.core.json.JsonObject; +import java.util.HashMap; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.folio.ActionProfile; @@ -29,26 +53,6 @@ import org.folio.rest.jaxrs.model.EntityType; import org.folio.rest.jaxrs.model.Record; -import java.util.HashMap; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.CompletableFuture; - -import static java.lang.String.format; -import static org.apache.commons.lang.StringUtils.isNotBlank; -import static org.apache.commons.lang3.StringUtils.isEmpty; -import static org.folio.ActionProfile.Action.CREATE; -import static org.folio.ActionProfile.FolioRecord.INSTANCE; -import static org.folio.ActionProfile.FolioRecord.MARC_BIBLIOGRAPHIC; -import static org.folio.DataImportEventTypes.DI_INVENTORY_INSTANCE_CREATED; -import static org.folio.DataImportEventTypes.DI_INVENTORY_INSTANCE_CREATED_READY_FOR_POST_PROCESSING; -import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.*; -import static org.folio.inventory.dataimport.util.DataImportConstants.UNIQUE_ID_ERROR_MESSAGE; -import static org.folio.inventory.dataimport.util.LoggerUtil.logParametersEventHandler; -import static org.folio.inventory.domain.instances.Instance.HRID_KEY; -import static org.folio.inventory.domain.instances.Instance.SOURCE_KEY; -import static org.folio.rest.jaxrs.model.ProfileSnapshotWrapper.ContentType.ACTION_PROFILE; - public class CreateInstanceEventHandler extends AbstractInstanceEventHandler { private static final Logger LOGGER = LogManager.getLogger(CreateInstanceEventHandler.class); @@ -120,7 +124,7 @@ public CompletableFuture handle(DataImportEventPayload d .compose(v -> { InstanceCollection instanceCollection = storage.getInstanceCollection(context); JsonObject instanceAsJson = prepareInstance(dataImportEventPayload, instanceId, jobExecutionId); - List requiredFieldsErrors = EventHandlingUtil.validateJsonByRequiredFields(instanceAsJson, requiredFields); + List requiredFieldsErrors = EventHandlingUtil.validateJsonByRequiredFields(instanceAsJson, INSTANCE_REQUIRED_FIELDS); if (!requiredFieldsErrors.isEmpty()) { String msg = format("Mapped Instance is invalid: %s, by jobExecutionId: '%s' and recordId: '%s' and chunkId: '%s' ", requiredFieldsErrors, jobExecutionId, recordId, chunkId); @@ -183,7 +187,7 @@ private JsonObject prepareInstance(DataImportEventPayload dataImportEventPayload if (instanceAsJson.getJsonObject(INSTANCE_PATH) != null) { instanceAsJson = instanceAsJson.getJsonObject(INSTANCE_PATH); } - instanceAsJson.put("id", instanceId); + instanceAsJson.put(ID, instanceId); instanceAsJson.put(SOURCE_KEY, MARC_FORMAT); instanceAsJson.remove(HRID_KEY); diff --git a/src/main/java/org/folio/inventory/dataimport/handlers/actions/InstanceUpdateDelegate.java b/src/main/java/org/folio/inventory/dataimport/handlers/actions/InstanceUpdateDelegate.java index d0ea74252..fb6cb5345 100644 --- a/src/main/java/org/folio/inventory/dataimport/handlers/actions/InstanceUpdateDelegate.java +++ b/src/main/java/org/folio/inventory/dataimport/handlers/actions/InstanceUpdateDelegate.java @@ -22,6 +22,7 @@ import static java.lang.String.format; import static org.folio.inventory.dataimport.util.LoggerUtil.logParametersUpdateDelegate; +import static org.folio.inventory.dataimport.util.MappingConstants.MARC_BIB_RECORD_FORMAT; public class InstanceUpdateDelegate { @@ -30,7 +31,6 @@ public class InstanceUpdateDelegate { private static final String MAPPING_RULES_KEY = "MAPPING_RULES"; private static final String MAPPING_PARAMS_KEY = "MAPPING_PARAMS"; private static final String QM_RELATED_RECORD_VERSION_KEY = "RELATED_RECORD_VERSION"; - private static final String MARC_FORMAT = "MARC_BIB"; private final Storage storage; @@ -46,7 +46,7 @@ public Future handle(Map eventPayload, Record marcReco JsonObject parsedRecord = retrieveParsedContent(marcRecord.getParsedRecord()); String instanceId = marcRecord.getExternalIdsHolder().getInstanceId(); LOGGER.info("Instance update with instanceId: {}", instanceId); - RecordMapper recordMapper = RecordMapperBuilder.buildMapper(MARC_FORMAT); + RecordMapper recordMapper = RecordMapperBuilder.buildMapper(MARC_BIB_RECORD_FORMAT); var mappedInstance = recordMapper.mapRecord(parsedRecord, mappingParameters, mappingRules); InstanceCollection instanceCollection = storage.getInstanceCollection(context); diff --git a/src/main/java/org/folio/inventory/dataimport/handlers/actions/ReplaceInstanceEventHandler.java b/src/main/java/org/folio/inventory/dataimport/handlers/actions/ReplaceInstanceEventHandler.java index 4cf33f8f0..b35110e8d 100644 --- a/src/main/java/org/folio/inventory/dataimport/handlers/actions/ReplaceInstanceEventHandler.java +++ b/src/main/java/org/folio/inventory/dataimport/handlers/actions/ReplaceInstanceEventHandler.java @@ -1,10 +1,40 @@ package org.folio.inventory.dataimport.handlers.actions; +import static java.lang.String.format; +import static java.util.Objects.nonNull; +import static org.apache.commons.lang3.StringUtils.isEmpty; +import static org.apache.commons.lang3.StringUtils.isNotEmpty; +import static org.folio.ActionProfile.Action.UPDATE; +import static org.folio.ActionProfile.FolioRecord.INSTANCE; +import static org.folio.ActionProfile.FolioRecord.MARC_BIBLIOGRAPHIC; +import static org.folio.DataImportEventTypes.DI_INVENTORY_INSTANCE_UPDATED; +import static org.folio.DataImportEventTypes.DI_INVENTORY_INSTANCE_UPDATED_READY_FOR_POST_PROCESSING; +import static org.folio.inventory.dataimport.util.LoggerUtil.INCOMING_RECORD_ID; +import static org.folio.inventory.dataimport.util.LoggerUtil.logParametersEventHandler; +import static org.folio.inventory.dataimport.util.MappingConstants.INSTANCE_PATH; +import static org.folio.inventory.dataimport.util.MappingConstants.INSTANCE_REQUIRED_FIELDS; +import static org.folio.inventory.domain.instances.Instance.DISCOVERY_SUPPRESS_KEY; +import static org.folio.inventory.domain.instances.Instance.HRID_KEY; +import static org.folio.inventory.domain.instances.Instance.METADATA_KEY; +import static org.folio.inventory.domain.instances.Instance.SOURCE_KEY; +import static org.folio.inventory.domain.instances.InstanceSource.CONSORTIUM_FOLIO; +import static org.folio.inventory.domain.instances.InstanceSource.CONSORTIUM_MARC; +import static org.folio.inventory.domain.instances.InstanceSource.FOLIO; +import static org.folio.inventory.domain.instances.InstanceSource.MARC; +import static org.folio.rest.jaxrs.model.ProfileSnapshotWrapper.ContentType.ACTION_PROFILE; + import io.vertx.core.Future; import io.vertx.core.Promise; import io.vertx.core.http.HttpClient; import io.vertx.core.json.Json; import io.vertx.core.json.JsonObject; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; import org.apache.http.HttpStatus; import org.folio.ActionProfile; import org.folio.DataImportEventPayload; @@ -34,35 +64,6 @@ import org.folio.rest.jaxrs.model.ProfileSnapshotWrapper; import org.folio.rest.jaxrs.model.Snapshot; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; - -import static java.lang.String.format; -import static java.util.Objects.nonNull; -import static org.apache.commons.lang3.StringUtils.isEmpty; -import static org.apache.commons.lang3.StringUtils.isNotEmpty; -import static org.folio.ActionProfile.Action.UPDATE; -import static org.folio.ActionProfile.FolioRecord.INSTANCE; -import static org.folio.ActionProfile.FolioRecord.MARC_BIBLIOGRAPHIC; -import static org.folio.DataImportEventTypes.DI_INVENTORY_INSTANCE_UPDATED; -import static org.folio.DataImportEventTypes.DI_INVENTORY_INSTANCE_UPDATED_READY_FOR_POST_PROCESSING; -import static org.folio.inventory.dataimport.util.LoggerUtil.INCOMING_RECORD_ID; -import static org.folio.inventory.dataimport.util.LoggerUtil.logParametersEventHandler; -import static org.folio.inventory.domain.instances.Instance.DISCOVERY_SUPPRESS_KEY; -import static org.folio.inventory.domain.instances.Instance.HRID_KEY; -import static org.folio.inventory.domain.instances.Instance.METADATA_KEY; -import static org.folio.inventory.domain.instances.Instance.SOURCE_KEY; -import static org.folio.inventory.domain.instances.InstanceSource.CONSORTIUM_FOLIO; -import static org.folio.inventory.domain.instances.InstanceSource.CONSORTIUM_MARC; -import static org.folio.inventory.domain.instances.InstanceSource.FOLIO; -import static org.folio.inventory.domain.instances.InstanceSource.MARC; -import static org.folio.rest.jaxrs.model.ProfileSnapshotWrapper.ContentType.ACTION_PROFILE; - public class ReplaceInstanceEventHandler extends AbstractInstanceEventHandler { // NOSONAR @@ -183,7 +184,7 @@ private void processInstanceUpdate(DataImportEventPayload dataImportEventPayload recordId, chunkId)))) .compose(e -> { JsonObject instanceAsJson = prepareTargetInstance(dataImportEventPayload, instanceToUpdate); - List errors = EventHandlingUtil.validateJsonByRequiredFields(instanceAsJson, requiredFields); + List errors = EventHandlingUtil.validateJsonByRequiredFields(instanceAsJson, INSTANCE_REQUIRED_FIELDS); if (!errors.isEmpty()) { String msg = format("Mapped Instance is invalid: %s, by jobExecutionId: '%s' and recordId: '%s' and chunkId: '%s' ", errors, diff --git a/src/main/java/org/folio/inventory/dataimport/util/MappingConstants.java b/src/main/java/org/folio/inventory/dataimport/util/MappingConstants.java new file mode 100644 index 000000000..59b751bc1 --- /dev/null +++ b/src/main/java/org/folio/inventory/dataimport/util/MappingConstants.java @@ -0,0 +1,19 @@ +package org.folio.inventory.dataimport.util; + +import static org.folio.inventory.domain.instances.Instance.INSTANCE_TYPE_ID_KEY; +import static org.folio.inventory.domain.instances.Instance.SOURCE_KEY; +import static org.folio.inventory.domain.instances.Instance.TITLE_KEY; + +import java.util.Arrays; +import java.util.List; +import lombok.experimental.UtilityClass; + +@UtilityClass +public final class MappingConstants { + + public static final String MARC_BIB_RECORD_TYPE = "marc-bib"; + public static final String MARC_BIB_RECORD_FORMAT = "MARC_BIB"; + public static final String INSTANCE_PATH = "instance"; + public static final List INSTANCE_REQUIRED_FIELDS = Arrays.asList(SOURCE_KEY, TITLE_KEY, INSTANCE_TYPE_ID_KEY); + +} diff --git a/src/main/java/org/folio/inventory/domain/instances/Instance.java b/src/main/java/org/folio/inventory/domain/instances/Instance.java index 4fa289808..f3737fe26 100644 --- a/src/main/java/org/folio/inventory/domain/instances/Instance.java +++ b/src/main/java/org/folio/inventory/domain/instances/Instance.java @@ -29,6 +29,7 @@ public class Instance { // JSON property names + public static final String ID = "id"; public static final String VERSION_KEY = "_version"; public static final String HRID_KEY = "hrid"; public static final String MATCH_KEY_KEY = "matchKey"; @@ -146,7 +147,7 @@ public Instance( public static Instance fromJson(JsonObject instanceJson) { return new Instance( - instanceJson.getString("id"), + instanceJson.getString(ID), instanceJson.getString(VERSION_KEY), instanceJson.getString("hrid"), instanceJson.getString(SOURCE_KEY), @@ -197,7 +198,7 @@ public static Instance fromJson(JsonObject instanceJson) { public JsonObject getJsonForStorage() { JsonObject json = new JsonObject(); //TODO: Review if this shouldn't be defaulting here - json.put("id", getId() != null + json.put(ID, getId() != null ? getId() : UUID.randomUUID().toString()); putIfNotNull(json, VERSION_KEY, version); @@ -248,7 +249,7 @@ public JsonObject getJsonForStorage() { public JsonObject getJsonForResponse(WebContext context) { JsonObject json = new JsonObject(); - json.put("id", getId()); + json.put(ID, getId()); putIfNotNull(json, VERSION_KEY, version); json.put("hrid", getHrid()); json.put(SOURCE_KEY, getSource()); diff --git a/src/main/java/org/folio/inventory/handler/InstanceIngressEventHandler.java b/src/main/java/org/folio/inventory/handler/InstanceIngressEventHandler.java deleted file mode 100644 index 0074c254a..000000000 --- a/src/main/java/org/folio/inventory/handler/InstanceIngressEventHandler.java +++ /dev/null @@ -1,22 +0,0 @@ -package org.folio.inventory.handler; - -import io.vertx.core.Future; -import io.vertx.kafka.client.consumer.KafkaConsumerRecord; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.folio.kafka.AsyncRecordHandler; - -public class InstanceIngressEventHandler implements AsyncRecordHandler { - - private static final Logger LOGGER = LogManager.getLogger( - InstanceIngressEventHandler.class); - - @Override - public Future handle(KafkaConsumerRecord kafkaConsumerRecord) { - // to extract and re-use common logic from CreateInstanceEventHandler - // 1. Change event; 2. Re-use all except: source type to be changed to BIBFRAME, DI event not to be sent - - return null; - } - -} diff --git a/src/main/java/org/folio/inventory/instanceingress/InstanceIngressEventConsumer.java b/src/main/java/org/folio/inventory/instanceingress/InstanceIngressEventConsumer.java new file mode 100644 index 000000000..bcd15033b --- /dev/null +++ b/src/main/java/org/folio/inventory/instanceingress/InstanceIngressEventConsumer.java @@ -0,0 +1,88 @@ +package org.folio.inventory.instanceingress; + +import static org.folio.inventory.dataimport.handlers.matching.util.EventHandlingUtil.constructContext; +import static org.folio.rest.jaxrs.model.InstanceIngressEvent.EventType.CREATE_INSTANCE; +import static org.folio.rest.jaxrs.model.InstanceIngressEvent.EventType.UPDATE_INSTANCE; +import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TOKEN_HEADER; +import static org.folio.rest.util.OkapiConnectionParams.OKAPI_URL_HEADER; + +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.http.HttpClient; +import io.vertx.core.json.Json; +import io.vertx.ext.web.client.WebClient; +import io.vertx.kafka.client.consumer.KafkaConsumerRecord; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.folio.inventory.common.Context; +import org.folio.inventory.dataimport.cache.MappingMetadataCache; +import org.folio.inventory.dataimport.handlers.actions.PrecedingSucceedingTitlesHelper; +import org.folio.inventory.instanceingress.handler.CreateInstanceIngressEventHandler; +import org.folio.inventory.instanceingress.handler.InstanceIngressEventHandler; +import org.folio.inventory.instanceingress.handler.InstanceIngressUpdateEventHandler; +import org.folio.inventory.storage.Storage; +import org.folio.kafka.AsyncRecordHandler; +import org.folio.kafka.KafkaHeaderUtils; +import org.folio.processing.exceptions.EventProcessingException; +import org.folio.rest.jaxrs.model.InstanceIngressEvent; + +public class InstanceIngressEventConsumer implements AsyncRecordHandler { + + private static final Logger LOGGER = LogManager.getLogger(InstanceIngressEventConsumer.class); + private final Storage storage; + private final HttpClient client; + private final MappingMetadataCache mappingMetadataCache; + + public InstanceIngressEventConsumer(Storage storage, + HttpClient client, + MappingMetadataCache mappingMetadataCache) { + this.storage = storage; + this.client = client; + this.mappingMetadataCache = mappingMetadataCache; + } + + @Override + public Future handle(KafkaConsumerRecord consumerRecord) { + var kafkaHeaders = KafkaHeaderUtils.kafkaHeadersToMap(consumerRecord.headers()); + var event = Json.decodeValue(consumerRecord.value(), InstanceIngressEvent.class); + var context = constructContext(event.getEventMetadata().getTenantId(), + kafkaHeaders.get(OKAPI_TOKEN_HEADER), kafkaHeaders.get(OKAPI_URL_HEADER)); + LOGGER.info("Instance ingress event has been received with event type: {}", event.getEventType()); + return Future.succeededFuture(event.getEventPayload()) + .compose(eventPayload -> processEvent(event, context) + .map(ar -> consumerRecord.key()), th -> { + LOGGER.error("Update record state was failed while handle event, {}", th.getMessage()); + return Future.failedFuture(th.getMessage()); + }); + } + + private Future processEvent(InstanceIngressEvent event, Context context) { + try { + Promise promise = Promise.promise(); + getInstanceIngressEventHandler(event.getEventType(), context).handle(event) + .whenComplete((res, ex) -> { + if (ex != null) { + promise.fail(ex); + } else { + promise.complete(event.getEventType()); + } + }); + return promise.future(); + } catch (Exception e) { + LOGGER.warn("Error during processPayload: ", e); + return Future.failedFuture(e); + } + } + + private InstanceIngressEventHandler getInstanceIngressEventHandler(InstanceIngressEvent.EventType eventType, Context context) { + if (eventType == CREATE_INSTANCE) { + return new CreateInstanceIngressEventHandler(new PrecedingSucceedingTitlesHelper(WebClient.wrap(client)), mappingMetadataCache, client, context, storage); + } else if (eventType == UPDATE_INSTANCE) { + return new InstanceIngressUpdateEventHandler(); + } else { + LOGGER.warn("Can't process eventType {}", eventType); + throw new EventProcessingException("Can't process eventType " + eventType); + } + } + +} diff --git a/src/main/java/org/folio/inventory/instanceingress/handler/CreateInstanceIngressEventHandler.java b/src/main/java/org/folio/inventory/instanceingress/handler/CreateInstanceIngressEventHandler.java new file mode 100644 index 000000000..d6e07ba51 --- /dev/null +++ b/src/main/java/org/folio/inventory/instanceingress/handler/CreateInstanceIngressEventHandler.java @@ -0,0 +1,331 @@ +package org.folio.inventory.instanceingress.handler; + +import static java.lang.String.format; +import static java.util.Objects.isNull; +import static org.apache.commons.lang.StringUtils.isNotBlank; +import static org.codehaus.plexus.util.StringUtils.isNotEmpty; +import static org.folio.inventory.dataimport.handlers.actions.AbstractInstanceEventHandler.IS_HRID_FILLING_NEEDED_FOR_INSTANCE; +import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.TAG_999; +import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.reorderMarcRecordFields; +import static org.folio.inventory.dataimport.util.DataImportConstants.UNIQUE_ID_ERROR_MESSAGE; +import static org.folio.inventory.dataimport.util.MappingConstants.INSTANCE_REQUIRED_FIELDS; +import static org.folio.inventory.dataimport.util.MappingConstants.MARC_BIB_RECORD_FORMAT; +import static org.folio.inventory.dataimport.util.MappingConstants.MARC_BIB_RECORD_TYPE; +import static org.folio.inventory.domain.instances.Instance.ID; +import static org.folio.inventory.domain.instances.Instance.INSTANCE_TYPE_ID_KEY; +import static org.folio.inventory.domain.instances.Instance.SOURCE_KEY; +import static org.folio.rest.jaxrs.model.EntityType.MARC_BIBLIOGRAPHIC; +import static org.folio.rest.jaxrs.model.InstanceIngressPayload.SourceType; +import static org.folio.rest.jaxrs.model.Record.RecordType.MARC_BIB; +import static org.folio.rest.jaxrs.model.Snapshot.Status.PROCESSING_FINISHED; + +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.http.HttpClient; +import io.vertx.core.json.Json; +import io.vertx.core.json.JsonObject; +import java.util.Date; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.folio.HttpStatus; +import org.folio.MappingMetadataDto; +import org.folio.inventory.common.Context; +import org.folio.inventory.dataimport.cache.MappingMetadataCache; +import org.folio.inventory.dataimport.handlers.actions.PrecedingSucceedingTitlesHelper; +import org.folio.inventory.dataimport.handlers.matching.util.EventHandlingUtil; +import org.folio.inventory.dataimport.util.AdditionalFieldsUtil; +import org.folio.inventory.dataimport.util.ValidationUtil; +import org.folio.inventory.domain.instances.Instance; +import org.folio.inventory.domain.instances.InstanceCollection; +import org.folio.inventory.instanceingress.InstanceIngressEventConsumer; +import org.folio.inventory.storage.Storage; +import org.folio.kafka.exception.DuplicateEventException; +import org.folio.processing.exceptions.EventProcessingException; +import org.folio.processing.mapping.defaultmapper.RecordMapper; +import org.folio.processing.mapping.defaultmapper.RecordMapperBuilder; +import org.folio.processing.mapping.defaultmapper.processor.parameters.MappingParameters; +import org.folio.rest.client.SourceStorageRecordsClient; +import org.folio.rest.client.SourceStorageSnapshotsClient; +import org.folio.rest.jaxrs.model.ExternalIdsHolder; +import org.folio.rest.jaxrs.model.InstanceIngressEvent; +import org.folio.rest.jaxrs.model.ParsedRecord; +import org.folio.rest.jaxrs.model.RawRecord; +import org.folio.rest.jaxrs.model.Record; +import org.folio.rest.jaxrs.model.Snapshot; + +public class CreateInstanceIngressEventHandler implements InstanceIngressEventHandler { + + private static final Logger LOGGER = LogManager.getLogger(CreateInstanceIngressEventHandler.class); + private final PrecedingSucceedingTitlesHelper precedingSucceedingTitlesHelper; + private final MappingMetadataCache mappingMetadataCache; + private final HttpClient httpClient; + private final Context context; + private final InstanceCollection instanceCollection; + + public CreateInstanceIngressEventHandler(PrecedingSucceedingTitlesHelper precedingSucceedingTitlesHelper, + MappingMetadataCache mappingMetadataCache, + HttpClient httpClient, + Context context, + Storage storage) { + this.precedingSucceedingTitlesHelper = precedingSucceedingTitlesHelper; + this.mappingMetadataCache = mappingMetadataCache; + this.httpClient = httpClient; + this.context = context; + this.instanceCollection = storage.getInstanceCollection(context); + } + + @Override + public CompletableFuture handle(InstanceIngressEvent event) { + try { + LOGGER.info("Processing InstanceIngressEvent with id '{}' for instance creation", event.getId()); + if (isNull(event.getEventPayload()) || isNull(event.getEventPayload().getSourceRecordObject())) { + var message = format("InstanceIngressEvent message does not contain required data to create Instance for eventId: '%s'", + event.getId()); + LOGGER.error(message); + return CompletableFuture.failedFuture(new EventProcessingException(message)); + } + //? no IdStorageService interaction? + mappingMetadataCache.getByRecordType(InstanceIngressEventConsumer.class.getSimpleName(), context, MARC_BIB_RECORD_TYPE) + .map(metadataOptional -> metadataOptional.orElseThrow(() -> new EventProcessingException("MappingMetadata was not found for marc-bib record type"))) + .compose(mappingMetadataDto -> prepareAndExecuteMapping(mappingMetadataDto, event)) + .compose(instance -> validateInstance(instance, event)) + .compose(instance -> saveInstance(instance, event)); + + return CompletableFuture.completedFuture(event); + } catch (Exception e) { + LOGGER.error("Failed to process InstanceIngressEvent with id {}", event.getId(), e); + return CompletableFuture.failedFuture(e); + } + } + + private Future prepareAndExecuteMapping(MappingMetadataDto mappingMetadata, InstanceIngressEvent event) { + return postSnapshotInSrsAndHandleResponse() + .compose(snapshot -> { + try { + LOGGER.debug("Constructing a Record from InstanceIngressEvent with id '{}'", event.getId()); + var marcBibRecord = new org.folio.rest.jaxrs.model.Record() + .withId(event.getId()) //? + .withRecordType(MARC_BIB) + .withSnapshotId(snapshot.getJobExecutionId()) //? + .withRawRecord(new RawRecord() //? + .withId(event.getId()) + .withContent(event.getEventPayload().getSourceRecordObject()) + ) + .withParsedRecord(new ParsedRecord() + .withId(event.getId()) //? + .withContent(event.getEventPayload().getSourceRecordObject()) + ); + event.getEventPayload() + .withAdditionalProperty(MARC_BIBLIOGRAPHIC.value(), marcBibRecord); + + LOGGER.debug("Manipulating fields of a Record from InstanceIngressEvent with id '{}'", event.getId()); + var mappingParameters = Json.decodeValue(mappingMetadata.getMappingParams(), MappingParameters.class); + AdditionalFieldsUtil.updateLatestTransactionDate(marcBibRecord, mappingParameters); + AdditionalFieldsUtil.move001To035(marcBibRecord); + AdditionalFieldsUtil.normalize035(marcBibRecord); + //? other manipulations how to + + LOGGER.debug("Mapping a Record from InstanceIngressEvent with id '{}' into an Instance", event.getId()); + var parsedRecord = new JsonObject((String) marcBibRecord.getParsedRecord().getContent()); + RecordMapper recordMapper = RecordMapperBuilder.buildMapper(MARC_BIB_RECORD_FORMAT); + var instance = recordMapper.mapRecord(parsedRecord, mappingParameters, new JsonObject(mappingMetadata.getMappingRules())); + LOGGER.debug("Mapped Instance from InstanceIngressEvent with id '{}': {}", event.getId(), instance); + return Future.succeededFuture(instance); + } catch (Exception e) { + LOGGER.warn("Error during preparing and executing mapping:", e); + return Future.failedFuture(e); + } + }); + } + + private Future validateInstance(org.folio.Instance instance, InstanceIngressEvent event) { + try { + LOGGER.debug("Validating Instance from InstanceIngressEvent with id '{}':", event.getId()); + var instanceAsJson = prepareInstance(instance, event.getEventPayload().getSourceType()); + var errors = EventHandlingUtil.validateJsonByRequiredFields(instanceAsJson, INSTANCE_REQUIRED_FIELDS); + return failIfErrors(errors, event.getId()) + .orElseGet(() -> { + var mappedInstance = Instance.fromJson(instanceAsJson); + var uuidErrors = ValidationUtil.validateUUIDs(mappedInstance); + return failIfErrors(uuidErrors, event.getId()) + .orElseGet(() -> Future.succeededFuture(mappedInstance)); + }); + } catch (Exception e) { + return Future.failedFuture(e); + } + } + + private static Optional> failIfErrors(List errors, String eventId) { + if (!errors.isEmpty()) { + var msg = format("Mapped Instance is invalid: %s, from InstanceIngressEvent with id '%s' ", errors, eventId); + LOGGER.warn(msg); + return Optional.of(Future.failedFuture(msg)); + } + return Optional.empty(); + } + + private JsonObject prepareInstance(org.folio.Instance instance, SourceType sourceType) { + var instanceAsJson = JsonObject.mapFrom(instance); + instanceAsJson.put(SOURCE_KEY, sourceType.value()); + if (isNull(instanceAsJson.getString(INSTANCE_TYPE_ID_KEY))) { + instanceAsJson.put(INSTANCE_TYPE_ID_KEY, "30fffe0e-e985-4144-b2e2-1e8179bdb41f"); + } + return instanceAsJson; + } + + private Future saveInstance(Instance instance, InstanceIngressEvent event) { + LOGGER.debug("Saving Instance from InstanceIngressEvent with id '{}':", event.getId()); + var targetRecord = (Record) event.getEventPayload().getAdditionalProperties().get(MARC_BIBLIOGRAPHIC.value()); + var sourceContent = targetRecord.getParsedRecord().getContent().toString(); + addInstance(instance) + .compose(createdInstance -> precedingSucceedingTitlesHelper.createPrecedingSucceedingTitles(instance, context).map(createdInstance)) + .compose(createdInstance -> executeFieldsManipulation(createdInstance, targetRecord)) + .compose(createdInstance -> { + var targetContent = targetRecord.getParsedRecord().getContent().toString(); + var content = reorderMarcRecordFields(sourceContent, targetContent); + targetRecord.setParsedRecord(targetRecord.getParsedRecord().withContent(content)); + return saveRecordInSrsAndHandleResponse(targetRecord, createdInstance); + }); + return Future.succeededFuture(); + } + + private Future addInstance(Instance instance) { + Promise promise = Promise.promise(); + instanceCollection.add(instance, success -> promise.complete(success.getResult()), + failure -> { + //This is temporary solution (verify by error message). It will be improved via another solution by https://issues.folio.org/browse/RMB-899. + if (isNotBlank(failure.getReason()) && failure.getReason().contains(UNIQUE_ID_ERROR_MESSAGE)) { + LOGGER.info("Duplicated event received by InstanceId: {}. Ignoring...", instance.getId()); + promise.fail(new DuplicateEventException(format("Duplicated event by Instance id: %s", instance.getId()))); + } else { + LOGGER.error(format("Error posting Instance by instanceId:'%s' cause %s, status code %s", instance.getId(), failure.getReason(), failure.getStatusCode())); + promise.fail(failure.getReason()); + } + }); + return promise.future(); + } + + private Future executeFieldsManipulation(Instance instance, Record srcRecord) { + LOGGER.debug("executeFieldsManipulation for an Instance with id '{}':", instance.getId()); + AdditionalFieldsUtil.fill001FieldInMarcRecord(srcRecord, instance.getHrid()); + if (StringUtils.isBlank(srcRecord.getMatchedId())) { + srcRecord.setMatchedId(srcRecord.getId()); + } + setExternalIds(srcRecord, instance); + return AdditionalFieldsUtil.addFieldToMarcRecord(srcRecord, TAG_999, 'i', instance.getId()) + ? Future.succeededFuture(instance) + : Future.failedFuture(format("Failed to add instance id '%s' to record with id '%s'", instance.getId(), srcRecord.getId())); + } + + protected void setExternalIds(Record srcRecord, Instance instance) { + if (srcRecord.getExternalIdsHolder() == null) { + srcRecord.setExternalIdsHolder(new ExternalIdsHolder()); + } + var externalId = srcRecord.getExternalIdsHolder().getInstanceId(); + var externalHrid = srcRecord.getExternalIdsHolder().getInstanceHrid(); + if (isNotEmpty(externalId) || isNotEmpty(externalHrid)) { + if (AdditionalFieldsUtil.isFieldsFillingNeeded(srcRecord, instance)) { + executeHrIdManipulation(srcRecord, instance.getJsonForStorage()); + } + } else { + executeHrIdManipulation(srcRecord, instance.getJsonForStorage()); + } + } + + private void executeHrIdManipulation(Record srcRecord, JsonObject externalEntity) { + var externalId = externalEntity.getString(ID); + var externalHrId = extractHridForInstance(externalEntity); + var externalIdsHolder = srcRecord.getExternalIdsHolder(); + setExternalIdsForInstance(externalIdsHolder, externalId, externalHrId); + boolean isAddedField = AdditionalFieldsUtil.addFieldToMarcRecord(srcRecord, TAG_999, 'i', externalId); + if (IS_HRID_FILLING_NEEDED_FOR_INSTANCE) { + AdditionalFieldsUtil.fillHrIdFieldInMarcRecord(Pair.of(srcRecord, externalEntity)); + } + if (!isAddedField) { + throw new EventProcessingException( + format("Failed to add externalEntity id '%s' to record with id '%s'", externalId, srcRecord.getId())); + } + } + + private String extractHridForInstance(JsonObject externalEntity) { + return externalEntity.getString("hrid"); + } + + private void setExternalIdsForInstance(ExternalIdsHolder externalIdsHolder, String externalId, String externalHrid) { + externalIdsHolder.setInstanceId(externalId); + externalIdsHolder.setInstanceHrid(externalHrid); + } + + protected Future saveRecordInSrsAndHandleResponse(Record srcRecord, Instance instance) { + LOGGER.debug("Saving record in SRS and handling a response for an Instance with id '{}':", instance.getId()); + Promise promise = Promise.promise(); + getSourceStorageRecordsClient().postSourceStorageRecords(srcRecord) + .onComplete(ar -> { + var result = ar.result(); + if (ar.succeeded() && result.statusCode() == HttpStatus.HTTP_CREATED.toInt()) { + LOGGER.info("Created MARC record in SRS with id: '{}', instanceId: '{}', from tenant: {}", + srcRecord.getId(), instance.getId(), context.getTenantId()); + promise.complete(instance); + } else { + String msg = format("Failed to create MARC record in SRS, instanceId: '%s', status code: %s, Record: %s", + instance.getId(), result != null ? result.statusCode() : "", result != null ? result.bodyAsString() : ""); + LOGGER.warn(msg); + deleteInstance(instance.getId()); + promise.fail(msg); + } + }); + return promise.future(); + } + + public SourceStorageRecordsClient getSourceStorageRecordsClient() { + return new SourceStorageRecordsClient(context.getOkapiLocation(), context.getTenantId(), context.getToken(), httpClient); + } + + private void deleteInstance(String id) { + Promise promise = Promise.promise(); + instanceCollection.delete(id, success -> { + LOGGER.info("deleteInstance:: Instance was deleted by id: '{}'", id); + promise.complete(success.getResult()); + }, + failure -> { + LOGGER.warn("deleteInstance:: Error deleting Instance by id: '{}', cause: {}, status code: {}", + id, failure.getReason(), failure.getStatusCode()); + promise.fail(failure.getReason()); + }); + promise.future(); + } + + private Future postSnapshotInSrsAndHandleResponse() { + Promise promise = Promise.promise(); + var snapshot = new Snapshot() + .withJobExecutionId(UUID.randomUUID().toString()) + .withProcessingStartedDate(new Date()) + .withStatus(PROCESSING_FINISHED); + getSourceStorageSnapshotsClient().postSourceStorageSnapshots(snapshot) + .onComplete(ar -> { + var result = ar.result(); + if (ar.succeeded() && result.statusCode() == HttpStatus.HTTP_CREATED.toInt()) { + LOGGER.info("postSnapshotInSrsAndHandleResponse:: Posted snapshot " + + "with id: {} to tenant: {}", snapshot.getJobExecutionId(), context.getTenantId()); + promise.complete(result.bodyAsJson(Snapshot.class)); + } else { + String msg = format("Failed to create snapshot in SRS, snapshot id: %s, tenant id: %s, status code: %s, snapshot: %s", + snapshot.getJobExecutionId(), context.getTenantId(), result != null ? result.statusCode() : "", result != null ? result.bodyAsString() : ""); + LOGGER.warn(msg); + promise.fail(msg); + } + }); + return promise.future(); + } + + public SourceStorageSnapshotsClient getSourceStorageSnapshotsClient() { + return new SourceStorageSnapshotsClient(context.getOkapiLocation(), context.getTenantId(), context.getToken(), httpClient); + } + +} diff --git a/src/main/java/org/folio/inventory/instanceingress/handler/InstanceIngressEventHandler.java b/src/main/java/org/folio/inventory/instanceingress/handler/InstanceIngressEventHandler.java new file mode 100644 index 000000000..85187d9c0 --- /dev/null +++ b/src/main/java/org/folio/inventory/instanceingress/handler/InstanceIngressEventHandler.java @@ -0,0 +1,11 @@ +package org.folio.inventory.instanceingress.handler; + +import java.util.concurrent.CompletableFuture; +import org.folio.rest.jaxrs.model.InstanceIngressEvent; +import org.folio.rest.jaxrs.model.InstanceIngressPayload; + +public interface InstanceIngressEventHandler { + + CompletableFuture handle(InstanceIngressEvent instanceIngressEvent); + +} diff --git a/src/main/java/org/folio/inventory/instanceingress/handler/InstanceIngressUpdateEventHandler.java b/src/main/java/org/folio/inventory/instanceingress/handler/InstanceIngressUpdateEventHandler.java new file mode 100644 index 000000000..4c5e93f9a --- /dev/null +++ b/src/main/java/org/folio/inventory/instanceingress/handler/InstanceIngressUpdateEventHandler.java @@ -0,0 +1,13 @@ +package org.folio.inventory.instanceingress.handler; + +import java.util.concurrent.CompletableFuture; +import org.folio.rest.jaxrs.model.InstanceIngressEvent; + +public class InstanceIngressUpdateEventHandler implements InstanceIngressEventHandler { + + @Override + public CompletableFuture handle(InstanceIngressEvent instanceIngressEvent) { + // to be implemented in MODINV-1008 + return null; + } +} diff --git a/src/main/java/org/folio/inventory/support/KafkaConsumerVerticle.java b/src/main/java/org/folio/inventory/support/KafkaConsumerVerticle.java index 07a2580c7..700b3a44c 100644 --- a/src/main/java/org/folio/inventory/support/KafkaConsumerVerticle.java +++ b/src/main/java/org/folio/inventory/support/KafkaConsumerVerticle.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.logging.log4j.Logger; import org.folio.inventory.dataimport.cache.MappingMetadataCache; +import org.folio.inventory.dataimport.cache.ProfileSnapshotCache; import org.folio.inventory.storage.Storage; import org.folio.kafka.GlobalLoadSensor; import org.folio.kafka.KafkaConfig; @@ -38,12 +39,14 @@ public abstract class KafkaConsumerVerticle extends AbstractVerticle { private static final String MAX_DISTRIBUTION_NUMBER_DEFAULT = "100"; private static final String CACHE_EXPIRATION_DEFAULT = "3600"; private static final String METADATA_EXPIRATION_TIME = "inventory.mapping-metadata-cache.expiration.time.seconds"; + private static final String PROFILE_SNAPSHOT_CACHE_EXPIRATION_TIME = "inventory.profile-snapshot-cache.expiration.time.seconds"; private final List> consumerWrappers = new ArrayList<>(); private KafkaConfig kafkaConfig; private JsonObject config; private HttpClient httpClient; private Storage storage; private MappingMetadataCache mappingMetadataCache; + private ProfileSnapshotCache profileSnapshotCache; @Override public void stop(Promise stopPromise) { @@ -107,6 +110,14 @@ protected MappingMetadataCache getMappingMetadataCache() { return mappingMetadataCache; } + protected ProfileSnapshotCache getProfileSnapshotCache() { + if (isNull(profileSnapshotCache)) { + var profileSnapshotExpirationTime = getCacheEnvVariable(PROFILE_SNAPSHOT_CACHE_EXPIRATION_TIME); + profileSnapshotCache = new ProfileSnapshotCache(vertx, getHttpClient(), Long.parseLong(profileSnapshotExpirationTime)); + } + return profileSnapshotCache; + } + protected String getCacheEnvVariable(String variableName) { var cacheExpirationTime = getConfig().getString(variableName); if (isBlank(cacheExpirationTime)) { diff --git a/src/test/java/org/folio/inventory/dataimport/cache/MappingMetadataCacheTest.java b/src/test/java/org/folio/inventory/dataimport/cache/MappingMetadataCacheTest.java index 6ca70cd8b..52e2520ca 100644 --- a/src/test/java/org/folio/inventory/dataimport/cache/MappingMetadataCacheTest.java +++ b/src/test/java/org/folio/inventory/dataimport/cache/MappingMetadataCacheTest.java @@ -1,17 +1,7 @@ package org.folio.inventory.dataimport.cache; import static com.github.tomakehurst.wiremock.client.WireMock.get; - -import java.util.Optional; -import java.util.UUID; - -import org.folio.inventory.common.Context; -import org.folio.inventory.dataimport.handlers.matching.util.EventHandlingUtil; -import org.folio.MappingMetadataDto; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; +import static org.folio.inventory.dataimport.util.MappingConstants.MARC_BIB_RECORD_TYPE; import com.github.tomakehurst.wiremock.client.WireMock; import com.github.tomakehurst.wiremock.common.Slf4jNotifier; @@ -19,20 +9,27 @@ import com.github.tomakehurst.wiremock.junit.WireMockRule; import com.github.tomakehurst.wiremock.matching.RegexPattern; import com.github.tomakehurst.wiremock.matching.UrlPathPattern; - import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.core.json.Json; import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.VertxUnitRunner; +import java.util.Optional; +import java.util.UUID; +import org.folio.MappingMetadataDto; +import org.folio.inventory.common.Context; +import org.folio.inventory.dataimport.handlers.matching.util.EventHandlingUtil; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; @RunWith(VertxUnitRunner.class) public class MappingMetadataCacheTest { private static final String TENANT_ID = "diku"; private static final String MAPPING_METADATA_URL = "/mapping-metadata"; - private static final String MARC_BIB_RECORD_TYPE = "marc-bib"; private final Vertx vertx = Vertx.vertx(); private final MappingMetadataCache mappingMetadataCache = new MappingMetadataCache(vertx, diff --git a/src/test/java/org/folio/inventory/dataimport/consumers/InstanceIngressConsumerVerticleTest.java b/src/test/java/org/folio/inventory/dataimport/consumers/InstanceIngressConsumerVerticleTest.java new file mode 100644 index 000000000..ce707dd0c --- /dev/null +++ b/src/test/java/org/folio/inventory/dataimport/consumers/InstanceIngressConsumerVerticleTest.java @@ -0,0 +1,62 @@ +package org.folio.inventory.dataimport.consumers; + +import io.vertx.core.DeploymentOptions; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import net.mguenther.kafka.junit.EmbeddedKafkaCluster; +import org.folio.inventory.InstanceIngressConsumerVerticle; +import org.junit.AfterClass; +import org.junit.Test; +import org.junit.runner.RunWith; + +import static net.mguenther.kafka.junit.EmbeddedKafkaCluster.provisionWith; +import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.defaultClusterConfig; +import static org.folio.inventory.dataimport.util.KafkaConfigConstants.*; + +@RunWith(VertxUnitRunner.class) +public class InstanceIngressConsumerVerticleTest { + + private static final String KAFKA_ENV_NAME = "test-env"; + private static Vertx vertx = Vertx.vertx(); + + public static EmbeddedKafkaCluster cluster; + + @Test + public void shouldDeployVerticle(TestContext context) { + Async async = context.async(); + cluster = provisionWith(defaultClusterConfig()); + cluster.start(); + String[] hostAndPort = cluster.getBrokerList().split(":"); + DeploymentOptions options = new DeploymentOptions() + .setConfig(new JsonObject() + .put(KAFKA_HOST, hostAndPort[0]) + .put(KAFKA_PORT, hostAndPort[1]) + .put(KAFKA_REPLICATION_FACTOR, "1") + .put(KAFKA_ENV, KAFKA_ENV_NAME) + .put(KAFKA_MAX_REQUEST_SIZE, "1048576")) + .setWorker(true); + + Promise promise = Promise.promise(); + vertx.deployVerticle(InstanceIngressConsumerVerticle.class.getName(), options, promise); + + promise.future().onComplete(ar -> { + context.assertTrue(ar.succeeded()); + async.complete(); + }); + + } + + @AfterClass + public static void tearDownClass(TestContext context) { + Async async = context.async(); + vertx.close(ar -> { + cluster.stop(); + async.complete(); + }); + } + +} diff --git a/src/test/java/org/folio/inventory/dataimport/consumers/InstanceIngressEventConsumerTest.java b/src/test/java/org/folio/inventory/dataimport/consumers/InstanceIngressEventConsumerTest.java new file mode 100644 index 000000000..f86f9e04b --- /dev/null +++ b/src/test/java/org/folio/inventory/dataimport/consumers/InstanceIngressEventConsumerTest.java @@ -0,0 +1,136 @@ +package org.folio.inventory.dataimport.consumers; + +import static com.github.tomakehurst.wiremock.client.WireMock.get; +import static com.github.tomakehurst.wiremock.client.WireMock.post; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.when; + +import com.github.tomakehurst.wiremock.client.WireMock; +import com.github.tomakehurst.wiremock.common.Slf4jNotifier; +import com.github.tomakehurst.wiremock.core.WireMockConfiguration; +import com.github.tomakehurst.wiremock.junit.WireMockRule; +import com.github.tomakehurst.wiremock.matching.RegexPattern; +import com.github.tomakehurst.wiremock.matching.UrlPathPattern; +import com.github.tomakehurst.wiremock.matching.UrlPattern; +import io.vertx.core.Vertx; +import io.vertx.core.json.Json; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import io.vertx.kafka.client.consumer.KafkaConsumerRecord; +import io.vertx.kafka.client.producer.KafkaHeader; +import java.io.IOException; +import java.util.List; +import java.util.UUID; +import java.util.function.Consumer; +import org.folio.MappingMetadataDto; +import org.folio.inventory.TestUtil; +import org.folio.inventory.common.Context; +import org.folio.inventory.common.domain.Success; +import org.folio.inventory.dataimport.cache.MappingMetadataCache; +import org.folio.inventory.domain.instances.Instance; +import org.folio.inventory.domain.instances.InstanceCollection; +import org.folio.inventory.instanceingress.InstanceIngressEventConsumer; +import org.folio.inventory.storage.Storage; +import org.folio.okapi.common.XOkapiHeaders; +import org.folio.processing.mapping.MappingManager; +import org.folio.processing.mapping.defaultmapper.processor.parameters.MappingParameters; +import org.folio.rest.jaxrs.model.EventMetadata; +import org.folio.rest.jaxrs.model.InstanceIngressEvent; +import org.folio.rest.jaxrs.model.InstanceIngressPayload; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +@RunWith(VertxUnitRunner.class) +public class InstanceIngressEventConsumerTest { + + private static final String MAPPING_RULES_PATH = "src/test/resources/handlers/bib-rules.json"; + private static final String MAPPING_METADATA_URL = "/mapping-metadata"; + private static final String SRS_URL = "/source-storage/records"; + private static final String TENANT_ID = "test_tenant"; + private static final String MARC_RECORD = "src/test/resources/marc/parsedRecord.json"; + private static final String BIB_RECORD = "src/test/resources/handlers/bib-record.json"; + + @Mock + private Storage storage; + @Mock + InstanceCollection instanceRecordCollection; + @Mock + private KafkaConsumerRecord kafkaRecord; + + @Rule + public WireMockRule mockServer = new WireMockRule( + WireMockConfiguration.wireMockConfig() + .dynamicPort() + .notifier(new Slf4jNotifier(true))); + + private InstanceIngressEventConsumer instanceIngressEventConsumer; + + @Before + public void setUp() throws IOException { + MockitoAnnotations.openMocks(this); + MappingManager.clearReaderFactories(); + + var mappingRules = new JsonObject(TestUtil.readFileFromPath(MAPPING_RULES_PATH)); + + WireMock.stubFor(get(new UrlPathPattern(new RegexPattern(MAPPING_METADATA_URL + "/.*"), true)) + .willReturn(WireMock.ok().withBody(Json.encode(new MappingMetadataDto() + .withMappingParams(Json.encode(new MappingParameters())) + .withMappingRules(mappingRules.toString()))))); + WireMock.stubFor(post(new UrlPattern(new RegexPattern(SRS_URL), true)) + .willReturn(WireMock.created().withBody(TestUtil.readFileFromPath(BIB_RECORD)))); + + doAnswer(invocationOnMock -> { + Instance instanceRecord = invocationOnMock.getArgument(0); + Consumer> successHandler = invocationOnMock.getArgument(1); + successHandler.accept(new Success<>(instanceRecord)); + return null; + }).when(instanceRecordCollection).add(any(), any(Consumer.class), any(Consumer.class)); + + when(storage.getInstanceCollection(any(Context.class))).thenReturn(instanceRecordCollection); + + var vertx = Vertx.vertx(); + var httpClient = vertx.createHttpClient(); + instanceIngressEventConsumer = new InstanceIngressEventConsumer(storage, httpClient, new MappingMetadataCache(vertx, httpClient, 3600)); + } + + @Test + public void shouldReturnSucceededFutureWithObtainedRecordKey(TestContext context) throws IOException { + // given + var async = context.async(); + + var payload = new InstanceIngressPayload() + .withSourceType(InstanceIngressPayload.SourceType.BIBFRAME) + .withSourceRecordIdentifier(UUID.randomUUID().toString()) + .withSourceRecordObject(TestUtil.readFileFromPath(MARC_RECORD)); + var event = new InstanceIngressEvent() + .withId(UUID.randomUUID().toString()) + .withEventType(InstanceIngressEvent.EventType.CREATE_INSTANCE) + .withEventPayload(payload) + .withEventMetadata(new EventMetadata().withTenantId(TENANT_ID)); + + var expectedKafkaRecordKey = "test_key"; + when(kafkaRecord.key()).thenReturn(expectedKafkaRecordKey); + when(kafkaRecord.value()).thenReturn(Json.encode(event)); + when(kafkaRecord.headers()).thenReturn(List.of( + KafkaHeader.header(XOkapiHeaders.URL.toLowerCase(), mockServer.baseUrl()) + ) + ); + + // when + var future = instanceIngressEventConsumer.handle(kafkaRecord); + + // then + future.onComplete(ar -> { + context.assertTrue(ar.succeeded()); + context.assertEquals(expectedKafkaRecordKey, ar.result()); + async.complete(); + }); + } + +} diff --git a/src/test/java/org/folio/inventory/dataimport/consumers/MarcBibInstanceHridSetKafkaHandlerTest.java b/src/test/java/org/folio/inventory/dataimport/consumers/MarcBibInstanceHridSetKafkaHandlerTest.java index faed10c4f..902fa2411 100644 --- a/src/test/java/org/folio/inventory/dataimport/consumers/MarcBibInstanceHridSetKafkaHandlerTest.java +++ b/src/test/java/org/folio/inventory/dataimport/consumers/MarcBibInstanceHridSetKafkaHandlerTest.java @@ -38,6 +38,7 @@ import static org.folio.Record.RecordType.MARC_BIB; import static org.folio.inventory.dataimport.consumers.MarcHoldingsRecordHridSetKafkaHandler.JOB_EXECUTION_ID_KEY; +import static org.folio.inventory.dataimport.util.MappingConstants.MARC_BIB_RECORD_FORMAT; import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; @@ -110,7 +111,7 @@ public void shouldReturnSucceededFutureWithObtainedRecordKey(TestContext context // given Async async = context.async(); Map payload = new HashMap<>(); - payload.put("MARC_BIB", Json.encode(record)); + payload.put(MARC_BIB_RECORD_FORMAT, Json.encode(record)); payload.put(JOB_EXECUTION_ID_KEY, UUID.randomUUID().toString()); Event event = new Event().withId("01").withEventPayload(Json.encode(payload)); @@ -154,7 +155,7 @@ public void shouldReturnFailedIfOLErrorExist(TestContext context) { // given Async async = context.async(); Map payload = new HashMap<>(); - payload.put("MARC_BIB", Json.encode(record)); + payload.put(MARC_BIB_RECORD_FORMAT, Json.encode(record)); payload.put(JOB_EXECUTION_ID_KEY, UUID.randomUUID().toString()); payload.put("CURRENT_RETRY_NUMBER", "1"); diff --git a/src/test/java/org/folio/inventory/eventhandlers/UpdateInstanceQuickMarcEventHandlerTest.java b/src/test/java/org/folio/inventory/eventhandlers/UpdateInstanceQuickMarcEventHandlerTest.java index f8917c449..a2b32951c 100644 --- a/src/test/java/org/folio/inventory/eventhandlers/UpdateInstanceQuickMarcEventHandlerTest.java +++ b/src/test/java/org/folio/inventory/eventhandlers/UpdateInstanceQuickMarcEventHandlerTest.java @@ -1,5 +1,6 @@ package org.folio.inventory.eventhandlers; +import static org.folio.inventory.dataimport.util.MappingConstants.MARC_BIB_RECORD_FORMAT; import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; @@ -107,8 +108,8 @@ record = new JsonObject(TestUtil.readFileFromPath(RECORD_PATH)); @Test public void shouldProcessEvent() { HashMap eventPayload = new HashMap<>(); - eventPayload.put("RECORD_TYPE", "MARC_BIB"); - eventPayload.put("MARC_BIB", record.encode()); + eventPayload.put("RECORD_TYPE", MARC_BIB_RECORD_FORMAT); + eventPayload.put(MARC_BIB_RECORD_FORMAT, record.encode()); eventPayload.put("MAPPING_RULES", mappingRules.encode()); eventPayload.put("MAPPING_PARAMS", new JsonObject().encode()); eventPayload.put("RELATED_RECORD_VERSION", INSTANCE_VERSION); @@ -141,8 +142,8 @@ public void shouldProcessEvent() { @Test public void shouldCompleteExceptionallyOnOLNumberExceeded() { HashMap eventPayload = new HashMap<>(); - eventPayload.put("RECORD_TYPE", "MARC_BIB"); - eventPayload.put("MARC_BIB", record.encode()); + eventPayload.put("RECORD_TYPE", MARC_BIB_RECORD_FORMAT); + eventPayload.put(MARC_BIB_RECORD_FORMAT, record.encode()); eventPayload.put("MAPPING_RULES", mappingRules.encode()); eventPayload.put("MAPPING_PARAMS", new JsonObject().encode()); eventPayload.put("RELATED_RECORD_VERSION", INSTANCE_VERSION); @@ -165,8 +166,8 @@ public void shouldAddPrecedingAndSucceedingTitlesFromIncomingRecord() throws IOE record.getParsedRecord().withContent(PARSED_CONTENT_WITH_PRECEDING_SUCCEEDING_TITLES); HashMap eventPayload = new HashMap<>(); - eventPayload.put("RECORD_TYPE", "MARC_BIB"); - eventPayload.put("MARC_BIB", Json.encode(record)); + eventPayload.put("RECORD_TYPE", MARC_BIB_RECORD_FORMAT); + eventPayload.put(MARC_BIB_RECORD_FORMAT, Json.encode(record)); eventPayload.put("MAPPING_RULES", mappingRules.encode()); eventPayload.put("MAPPING_PARAMS", new JsonObject().encode()); eventPayload.put("RELATED_RECORD_VERSION", INSTANCE_VERSION); @@ -191,8 +192,8 @@ public void shouldAddPrecedingAndSucceedingTitlesFromIncomingRecord() throws IOE @Test public void shouldCompleteExceptionally_whenRecordIsEmpty() { HashMap eventPayload = new HashMap<>(); - eventPayload.put("RECORD_TYPE", "MARC_BIB"); - eventPayload.put("MARC_BIB", ""); + eventPayload.put("RECORD_TYPE", MARC_BIB_RECORD_FORMAT); + eventPayload.put(MARC_BIB_RECORD_FORMAT, ""); eventPayload.put("MAPPING_RULES", mappingRules.encode()); eventPayload.put("MAPPING_PARAMS", new JsonObject().encode()); @@ -211,8 +212,8 @@ public void shouldSendError() { }).when(instanceRecordCollection).update(any(), any(), any()); HashMap eventPayload = new HashMap<>(); - eventPayload.put("RECORD_TYPE", "MARC_BIB"); - eventPayload.put("MARC_BIB", record.encode()); + eventPayload.put("RECORD_TYPE", MARC_BIB_RECORD_FORMAT); + eventPayload.put(MARC_BIB_RECORD_FORMAT, record.encode()); eventPayload.put("MAPPING_RULES", mappingRules.encode()); eventPayload.put("MAPPING_PARAMS", new JsonObject().encode()); diff --git a/src/test/java/org/folio/inventory/quickmarc/consumers/QuickMarcKafkaHandlerTest.java b/src/test/java/org/folio/inventory/quickmarc/consumers/QuickMarcKafkaHandlerTest.java index a686ba444..eaee1a6dc 100644 --- a/src/test/java/org/folio/inventory/quickmarc/consumers/QuickMarcKafkaHandlerTest.java +++ b/src/test/java/org/folio/inventory/quickmarc/consumers/QuickMarcKafkaHandlerTest.java @@ -7,6 +7,8 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; + +import static org.folio.inventory.dataimport.util.MappingConstants.MARC_BIB_RECORD_FORMAT; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doAnswer; @@ -229,8 +231,8 @@ public void shouldSendInstanceUpdatedEvent(TestContext context) throws Interrupt // given Async async = context.async(); Map payload = new HashMap<>(); - payload.put("RECORD_TYPE", "MARC_BIB"); - payload.put("MARC_BIB", Json.encode(bibRecord)); + payload.put("RECORD_TYPE", MARC_BIB_RECORD_FORMAT); + payload.put(MARC_BIB_RECORD_FORMAT, Json.encode(bibRecord)); payload.put("MAPPING_RULES", bibMappingRules.encode()); payload.put("MAPPING_PARAMS", new JsonObject().encode()); payload.put("PARSED_RECORD_DTO", Json.encode(new ParsedRecordDto() @@ -422,7 +424,7 @@ public void shouldSendErrorEventWhenPayloadHasNoMarcRecord(TestContext context) // given Async async = context.async(); Map payload = new HashMap<>(); - payload.put("RECORD_TYPE", "MARC_BIB"); + payload.put("RECORD_TYPE", MARC_BIB_RECORD_FORMAT); payload.put("MAPPING_RULES", bibMappingRules.encode()); payload.put("MAPPING_PARAMS", new JsonObject().encode());