From 190b66137bcd548c01997cd0b726e7c46519cb3b Mon Sep 17 00:00:00 2001
From: Pavel Bobylev <131748689+PBobylev@users.noreply.github.com>
Date: Thu, 20 Jun 2024 13:03:52 +0500
Subject: [PATCH] MODINV-986: InstanceIngress create events consumption (#729)
* MODINV-986: step 1: prepare InstanceIngestConsumerVerticle re-using other consumer verticles logic
* MODINV-986: code review remark fixes
* MODINV-986: instance_ingest -> instance_ingress
* MODINV-986: InstanceIngressEventConsumer and CreateInstanceIngressEventHandler
* MODINV-986: NEWS.md update
* MODINV-986: re-using code from CreateInstanceEventHandler and AbstractInstanceEventHandler + proper additional fields setting
* MODINV-986: remove a namespace from a topic name
* MODINV-986: fix instance source mapped to MARC, code optimization
* MODINV-986: post-rebase fixes
* MODINV-986: unit test + fixes
* MODINV-986: unit test + fixes
* MODINV-986: return consumers base property
* MODINV-986: rename source type BIBFRAME to LINKED_DATA
* MODINV-986: minor hotfix
* MODINV-986: postSnapshotInSrs just before saving + minor fixes
---
NEWS.md | 1 +
pom.xml | 1 +
ramls/instance-ingress-event.json | 96 +++++
ramls/instance-ingress-payload.json | 25 ++
.../inventory/DataImportConsumerVerticle.java | 119 +-----
.../InstanceIngressConsumerVerticle.java | 33 ++
.../java/org/folio/inventory/Launcher.java | 8 +
.../MarcBibUpdateConsumerVerticle.java | 86 +---
.../MarcHridSetConsumerVerticle.java | 87 +---
.../inventory/QuickMarcConsumerVerticle.java | 93 +----
.../MarcBibInstanceHridSetKafkaHandler.java | 6 +-
.../consumers/MarcBibUpdateKafkaHandler.java | 2 +-
.../actions/AbstractInstanceEventHandler.java | 29 +-
.../actions/CreateInstanceEventHandler.java | 13 +-
.../actions/InstanceUpdateDelegate.java | 4 +-
.../actions/ReplaceInstanceEventHandler.java | 9 +-
.../dataimport/util/AdditionalFieldsUtil.java | 1 +
.../dataimport/util/MappingConstants.java | 19 +
.../inventory/domain/instances/Instance.java | 7 +-
.../InstanceIngressEventConsumer.java | 106 +++++
.../CreateInstanceIngressEventHandler.java | 251 ++++++++++++
.../handler/InstanceIngressEventHandler.java | 11 +
.../InstanceIngressUpdateEventHandler.java | 14 +
.../support/KafkaConsumerVerticle.java | 162 ++++++++
.../java/org/folio/inventory/TestUtil.java | 27 ++
.../MarcInstanceSharingHandlerImplTest.java | 16 +-
.../util/RestDataImportHelperTest.java | 33 +-
.../cache/MappingMetadataCacheTest.java | 1 -
.../InstanceIngressConsumerVerticleTest.java | 62 +++
.../CreateInstanceEventHandlerTest.java | 23 +-
.../ReplaceInstanceEventHandlerTest.java | 22 +-
...teInstanceIngressEventHandlerUnitTest.java | 382 ++++++++++++++++++
32 files changed, 1334 insertions(+), 415 deletions(-)
create mode 100644 ramls/instance-ingress-event.json
create mode 100644 ramls/instance-ingress-payload.json
create mode 100644 src/main/java/org/folio/inventory/InstanceIngressConsumerVerticle.java
create mode 100644 src/main/java/org/folio/inventory/dataimport/util/MappingConstants.java
create mode 100644 src/main/java/org/folio/inventory/instanceingress/InstanceIngressEventConsumer.java
create mode 100644 src/main/java/org/folio/inventory/instanceingress/handler/CreateInstanceIngressEventHandler.java
create mode 100644 src/main/java/org/folio/inventory/instanceingress/handler/InstanceIngressEventHandler.java
create mode 100644 src/main/java/org/folio/inventory/instanceingress/handler/InstanceIngressUpdateEventHandler.java
create mode 100644 src/main/java/org/folio/inventory/support/KafkaConsumerVerticle.java
create mode 100644 src/test/java/org/folio/inventory/dataimport/consumers/InstanceIngressConsumerVerticleTest.java
create mode 100644 src/test/java/org/folio/inventory/instanceingress/handler/CreateInstanceIngressEventHandlerUnitTest.java
diff --git a/NEWS.md b/NEWS.md
index 451cd9405..4a7237367 100644
--- a/NEWS.md
+++ b/NEWS.md
@@ -11,6 +11,7 @@
* Replace GET with POST request for fetching instances and holdings on /items endpoint to omit 414 error [MODINV-943](https://folio-org.atlassian.net/browse/MODINV-943)
* Call suppress-on-discovery for source record on holding update if discoverySuppress is true [MODINV-977](https://folio-org.atlassian.net/browse/MODINV-977)
* Requires `holdings-storage 2.0 3.0 4.0 5.0 6.0 7.0`
+* InstanceIngress create events consumption [MODINV-986](https://folio-org.atlassian.net/browse/MODINV-986)
## 20.2.0 2023-03-20
* Inventory cannot process Holdings with virtual fields ([MODINV-941](https://issues.folio.org/browse/MODINV-941))
diff --git a/pom.xml b/pom.xml
index 8e36cfbb6..4e7b3104d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -445,6 +445,7 @@
${basedir}/ramls/holdings-record.json
${basedir}/ramls/holdings-records-source.json
${basedir}/ramls/mappingMetadataDto.json
+ ${basedir}/ramls/instance-ingress-event.json
org.folio
true
diff --git a/ramls/instance-ingress-event.json b/ramls/instance-ingress-event.json
new file mode 100644
index 000000000..d34e079c9
--- /dev/null
+++ b/ramls/instance-ingress-event.json
@@ -0,0 +1,96 @@
+{
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "description": "Instance ingress event data model",
+ "javaType": "org.folio.rest.jaxrs.model.InstanceIngressEvent",
+ "type": "object",
+ "additionalProperties": false,
+ "properties": {
+ "id": {
+ "description": "UUID",
+ "$ref": "uuid.json"
+ },
+ "eventType": {
+ "type": "string",
+ "enum": ["CREATE_INSTANCE", "UPDATE_INSTANCE"],
+ "description": "Instance ingress event type"
+ },
+ "InstanceIngressEventMetadata": {
+ "description": "Event metadata",
+ "type": "object",
+ "additionalProperties": false,
+ "properties": {
+ "eventTTL": {
+ "description": "Time-to-live (TTL) for event in minutes",
+ "type": "integer"
+ },
+ "correlationId": {
+ "description": "Id to track related events, can be a meaningful string or a UUID",
+ "type": "string"
+ },
+ "originalEventId": {
+ "description": "Id of the event that started the sequence of related events",
+ "$ref": "uuid.json"
+ },
+ "publisherCallback": {
+ "description": "Allows a publisher to provide a callback endpoint or an error Event Type to be notified that despite the fact that there are subscribers for such an event type no one has received the event within the specified period of time",
+ "type": "object",
+ "properties": {
+ "endpoint": {
+ "description": "Callback endpoint",
+ "type": "string"
+ },
+ "eventType": {
+ "description": "Error Event Type",
+ "type": "string"
+ }
+ }
+ },
+ "createdDate": {
+ "description": "Timestamp when event was created",
+ "type": "string",
+ "format": "date-time"
+ },
+ "publishedDate": {
+ "description": "Timestamp when event was initially published to the underlying topic",
+ "type": "string",
+ "format": "date-time"
+ },
+ "createdBy": {
+ "description": "Username of the user whose action caused an event",
+ "type": "string"
+ },
+ "publishedBy": {
+ "description": "Name and version of the module that published an event",
+ "type": "string"
+ }
+ },
+ "required": [
+ "eventTTL",
+ "publishedBy"
+ ]
+ },
+ "eventPayload": {
+ "type": "object",
+ "description": "An instance source record container",
+ "$ref": "instance-ingress-payload.json"
+ },
+ "tenant": {
+ "description": "Tenant id",
+ "type": "string"
+ },
+ "ts": {
+ "description": "Message timestamp",
+ "type": "string",
+ "format": "date-time"
+ }
+ },
+ "excludedFromEqualsAndHashCode": [
+ "eventMetadata",
+ "tenant",
+ "ts"
+ ],
+ "required": [
+ "id",
+ "eventType"
+ ]
+}
diff --git a/ramls/instance-ingress-payload.json b/ramls/instance-ingress-payload.json
new file mode 100644
index 000000000..3533b307a
--- /dev/null
+++ b/ramls/instance-ingress-payload.json
@@ -0,0 +1,25 @@
+{
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "description": "An instance source record container",
+ "type": "object",
+ "properties": {
+ "sourceRecordIdentifier": {
+ "type": "string",
+ "description": "The source record identifier"
+ },
+ "sourceRecordObject": {
+ "type": "string",
+ "description": "The source record JSON object"
+ },
+ "sourceType": {
+ "type": "string",
+ "enum": ["FOLIO", "LINKED_DATA", "MARC"],
+ "description": "Source type"
+ }
+ },
+ "additionalProperties": true,
+ "required": [
+ "sourceRecordObject",
+ "sourceType"
+ ]
+}
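
For orientation, this is roughly what a producer builds against these two schemas. The sketch assumes the jsonschema2pojo-generated models FOLIO modules typically get from RMB: `InstanceIngressEvent` comes from the declared `javaType`, while the `InstanceIngressPayload` class name and the fluent `withX` setters are inferred from the payload file name and the usual generation settings, not copied from this patch:

```java
import java.util.UUID;
import org.folio.rest.jaxrs.model.InstanceIngressEvent;
import org.folio.rest.jaxrs.model.InstanceIngressPayload;

class InstanceIngressEventExample {

  // Minimal CREATE_INSTANCE event: "id" and "eventType" are required by
  // instance-ingress-event.json; the payload requires "sourceRecordObject"
  // and "sourceType" (FOLIO, LINKED_DATA or MARC).
  static InstanceIngressEvent minimalCreateEvent(String sourceRecordJson) {
    return new InstanceIngressEvent()
      .withId(UUID.randomUUID().toString())
      .withEventType(InstanceIngressEvent.EventType.CREATE_INSTANCE)
      .withEventPayload(new InstanceIngressPayload()
        .withSourceRecordIdentifier(UUID.randomUUID().toString())
        .withSourceRecordObject(sourceRecordJson)
        .withSourceType(InstanceIngressPayload.SourceType.LINKED_DATA));
  }
}
```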
diff --git a/src/main/java/org/folio/inventory/DataImportConsumerVerticle.java b/src/main/java/org/folio/inventory/DataImportConsumerVerticle.java
index f98349069..4546b8edc 100644
--- a/src/main/java/org/folio/inventory/DataImportConsumerVerticle.java
+++ b/src/main/java/org/folio/inventory/DataImportConsumerVerticle.java
@@ -1,5 +1,6 @@
package org.folio.inventory;
+import static org.folio.DataImportEventTypes.DI_INCOMING_MARC_BIB_RECORD_PARSED;
import static org.folio.DataImportEventTypes.DI_INVENTORY_HOLDING_CREATED;
import static org.folio.DataImportEventTypes.DI_INVENTORY_HOLDING_MATCHED;
import static org.folio.DataImportEventTypes.DI_INVENTORY_HOLDING_NOT_MATCHED;
@@ -15,6 +16,7 @@
import static org.folio.DataImportEventTypes.DI_MARC_FOR_UPDATE_RECEIVED;
import static org.folio.DataImportEventTypes.DI_PENDING_ORDER_CREATED;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_CREATED;
+import static org.folio.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_DELETED;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_MODIFIED_READY_FOR_POST_PROCESSING;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_NOT_MATCHED;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_BIB_RECORD_MATCHED;
@@ -25,45 +27,20 @@
import static org.folio.DataImportEventTypes.DI_SRS_MARC_HOLDINGS_RECORD_MODIFIED_READY_FOR_POST_PROCESSING;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_HOLDINGS_RECORD_NOT_MATCHED;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_HOLDING_RECORD_CREATED;
-import static org.folio.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_DELETED;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_ENV;
-import static org.folio.DataImportEventTypes.DI_INCOMING_MARC_BIB_RECORD_PARSED;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_HOST;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_MAX_REQUEST_SIZE;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_PORT;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_REPLICATION_FACTOR;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.OKAPI_URL;
-import java.util.ArrayList;
+import io.vertx.core.Promise;
import java.util.List;
-import java.util.stream.Collectors;
-
-import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.DataImportEventTypes;
import org.folio.inventory.consortium.cache.ConsortiumDataCache;
-import org.folio.inventory.dataimport.cache.MappingMetadataCache;
-import org.folio.inventory.dataimport.cache.ProfileSnapshotCache;
import org.folio.inventory.dataimport.consumers.DataImportKafkaHandler;
import org.folio.inventory.dataimport.util.ConsumerWrapperUtil;
-import org.folio.inventory.storage.Storage;
-import org.folio.kafka.AsyncRecordHandler;
-import org.folio.kafka.GlobalLoadSensor;
-import org.folio.kafka.KafkaConfig;
-import org.folio.kafka.KafkaConsumerWrapper;
-import org.folio.kafka.KafkaTopicNameHelper;
-import org.folio.kafka.SubscriptionDefinition;
+import org.folio.inventory.support.KafkaConsumerVerticle;
import org.folio.okapi.common.GenericCompositeFuture;
import org.folio.processing.events.EventManager;
-import io.vertx.core.AbstractVerticle;
-import io.vertx.core.Future;
-import io.vertx.core.Promise;
-import io.vertx.core.http.HttpClient;
-import io.vertx.core.json.JsonObject;
-
-public class DataImportConsumerVerticle extends AbstractVerticle {
+public class DataImportConsumerVerticle extends KafkaConsumerVerticle {
private static final Logger LOGGER = LogManager.getLogger(DataImportConsumerVerticle.class);
@@ -96,88 +73,32 @@ public class DataImportConsumerVerticle extends AbstractVerticle {
DI_SRS_MARC_HOLDINGS_RECORD_NOT_MATCHED,
DI_PENDING_ORDER_CREATED
);
-
- private final int loadLimit = getLoadLimit();
- private final int maxDistributionNumber = getMaxDistributionNumber();
- private final List<KafkaConsumerWrapper<String, String>> consumerWrappers = new ArrayList<>();
+ private static final String LOAD_LIMIT_PROPERTY = "DataImportConsumer";
+ private static final String MAX_DISTRIBUTION_PROPERTY = "DataImportConsumerVerticle";
@Override
public void start(Promise<Void> startPromise) {
- JsonObject config = vertx.getOrCreateContext().config();
- KafkaConfig kafkaConfig = KafkaConfig.builder()
- .envId(config.getString(KAFKA_ENV))
- .kafkaHost(config.getString(KAFKA_HOST))
- .kafkaPort(config.getString(KAFKA_PORT))
- .okapiUrl(config.getString(OKAPI_URL))
- .replicationFactor(Integer.parseInt(config.getString(KAFKA_REPLICATION_FACTOR)))
- .maxRequestSize(Integer.parseInt(config.getString(KAFKA_MAX_REQUEST_SIZE)))
- .build();
- LOGGER.info("kafkaConfig: {}", kafkaConfig);
- EventManager.registerKafkaEventPublisher(kafkaConfig, vertx, maxDistributionNumber);
-
- HttpClient client = vertx.createHttpClient();
- Storage storage = Storage.basedUpon(config, client);
-
- String profileSnapshotExpirationTime = getCacheEnvVariable(config, "inventory.profile-snapshot-cache.expiration.time.seconds");
+ EventManager.registerKafkaEventPublisher(getKafkaConfig(), vertx, getMaxDistributionNumber(MAX_DISTRIBUTION_PROPERTY));
+ var consortiumDataCache = new ConsortiumDataCache(vertx, getHttpClient());
- ProfileSnapshotCache profileSnapshotCache = new ProfileSnapshotCache(vertx, client, Long.parseLong(profileSnapshotExpirationTime));
- ConsortiumDataCache consortiumDataCache = new ConsortiumDataCache(vertx, client);
- MappingMetadataCache mappingMetadataCache = MappingMetadataCache.getInstance(vertx, client);
- DataImportKafkaHandler dataImportKafkaHandler = new DataImportKafkaHandler(
- vertx, storage, client, profileSnapshotCache, kafkaConfig, mappingMetadataCache, consortiumDataCache);
+ var dataImportKafkaHandler = new DataImportKafkaHandler(vertx, getStorage(), getHttpClient(), getProfileSnapshotCache(),
+ getKafkaConfig(), getMappingMetadataCache(), consortiumDataCache);
- List<Future<KafkaConsumerWrapper<String, String>>> futures = EVENT_TYPES.stream()
- .map(eventType -> createKafkaConsumerWrapper(kafkaConfig, eventType, dataImportKafkaHandler))
- .collect(Collectors.toList());
+ var futures = EVENT_TYPES.stream()
+ .map(type -> super.createConsumer(type.value(), LOAD_LIMIT_PROPERTY))
+ .map(consumerWrapper -> consumerWrapper.start(dataImportKafkaHandler, ConsumerWrapperUtil.constructModuleName())
+ .map(consumerWrapper)
+ )
+ .toList();
GenericCompositeFuture.all(futures)
.onFailure(startPromise::fail)
- .onSuccess(ar -> {
- futures.forEach(future -> consumerWrappers.add(future.result()));
- startPromise.complete();
- });
+ .onSuccess(ar -> startPromise.complete());
}
@Override
- public void stop(Promise<Void> stopPromise) {
- List<Future<Void>> stopFutures = consumerWrappers.stream()
- .map(KafkaConsumerWrapper::stop)
- .collect(Collectors.toList());
-
- GenericCompositeFuture.join(stopFutures).onComplete(ar -> stopPromise.complete());
- }
-
- private Future<KafkaConsumerWrapper<String, String>> createKafkaConsumerWrapper(KafkaConfig kafkaConfig, DataImportEventTypes eventType,
- AsyncRecordHandler<String, String> recordHandler) {
- SubscriptionDefinition subscriptionDefinition = KafkaTopicNameHelper.createSubscriptionDefinition(kafkaConfig.getEnvId(),
- KafkaTopicNameHelper.getDefaultNameSpace(), eventType.value());
-
- KafkaConsumerWrapper<String, String> consumerWrapper = KafkaConsumerWrapper.<String, String>builder()
- .context(context)
- .vertx(vertx)
- .kafkaConfig(kafkaConfig)
- .loadLimit(loadLimit)
- .globalLoadSensor(new GlobalLoadSensor())
- .subscriptionDefinition(subscriptionDefinition)
- .build();
-
- return consumerWrapper.start(recordHandler, ConsumerWrapperUtil.constructModuleName())
- .map(consumerWrapper);
+ protected Logger getLogger() {
+ return LOGGER;
}
- private int getLoadLimit() {
- return Integer.parseInt(System.getProperty("inventory.kafka.DataImportConsumer.loadLimit", "5"));
- }
-
- private int getMaxDistributionNumber() {
- return Integer.parseInt(System.getProperty("inventory.kafka.DataImportConsumerVerticle.maxDistributionNumber", "100"));
- }
-
- private String getCacheEnvVariable(JsonObject config, String variableName) {
- String cacheExpirationTime = config.getString(variableName);
- if (StringUtils.isBlank(cacheExpirationTime)) {
- cacheExpirationTime = "3600";
- }
- return cacheExpirationTime;
- }
}
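
The scaffolding removed above (Kafka config assembly, HTTP client and storage setup, load-limit/max-distribution system properties, consumer shutdown) moves into the new src/main/java/org/folio/inventory/support/KafkaConsumerVerticle.java, which the diffstat lists at 162 lines but which sits outside this excerpt. A condensed sketch of what the call sites in this patch imply it provides; field names, lazy-init details, and the exact overloads are assumptions rather than the committed file, and the cache getters the call sites also use (getMappingMetadataCache(), getProfileSnapshotCache()) are omitted for brevity:

```java
package org.folio.inventory.support;

import static org.folio.inventory.dataimport.util.KafkaConfigConstants.*;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.http.HttpClient;
import java.util.ArrayList;
import java.util.List;
import org.apache.logging.log4j.Logger;
import org.folio.inventory.storage.Storage;
import org.folio.kafka.GlobalLoadSensor;
import org.folio.kafka.KafkaConfig;
import org.folio.kafka.KafkaConsumerWrapper;
import org.folio.kafka.KafkaTopicNameHelper;
import org.folio.kafka.SubscriptionDefinition;
import org.folio.okapi.common.GenericCompositeFuture;

public abstract class KafkaConsumerVerticle extends AbstractVerticle {

  private final List<KafkaConsumerWrapper<String, String>> consumers = new ArrayList<>();
  private KafkaConfig kafkaConfig;
  private HttpClient httpClient;
  private Storage storage;

  // Data-import topics keep the default namespace; the instance_ingress topic
  // opts out, which is what the boolean flag at the
  // InstanceIngressConsumerVerticle call site suggests.
  protected KafkaConsumerWrapper<String, String> createConsumer(String eventType, String propertyPrefix) {
    return createConsumer(eventType, propertyPrefix, true);
  }

  protected KafkaConsumerWrapper<String, String> createConsumer(String eventType, String propertyPrefix, boolean namespaced) {
    var subscriptionDefinition = namespaced
      ? KafkaTopicNameHelper.createSubscriptionDefinition(getKafkaConfig().getEnvId(),
          KafkaTopicNameHelper.getDefaultNameSpace(), eventType)
      : SubscriptionDefinition.builder()
          .eventType(eventType)
          .subscriptionPattern(String.join("\\.", getKafkaConfig().getEnvId(), "\\w{1,}", eventType))
          .build();
    var consumer = KafkaConsumerWrapper.<String, String>builder()
      .context(context)
      .vertx(vertx)
      .kafkaConfig(getKafkaConfig())
      .loadLimit(getProperty(propertyPrefix, "loadLimit", "5"))
      .globalLoadSensor(new GlobalLoadSensor())
      .subscriptionDefinition(subscriptionDefinition)
      .build();
    consumers.add(consumer);
    return consumer;
  }

  // The stop() every verticle used to duplicate now lives in one place.
  @Override
  public void stop(Promise<Void> stopPromise) {
    var stopFutures = consumers.stream().map(KafkaConsumerWrapper::stop).toList();
    GenericCompositeFuture.join(stopFutures).onComplete(ar -> stopPromise.complete());
  }

  protected abstract Logger getLogger();

  protected KafkaConfig getKafkaConfig() {
    if (kafkaConfig == null) {
      var config = vertx.getOrCreateContext().config();
      kafkaConfig = KafkaConfig.builder()
        .envId(config.getString(KAFKA_ENV))
        .kafkaHost(config.getString(KAFKA_HOST))
        .kafkaPort(config.getString(KAFKA_PORT))
        .okapiUrl(config.getString(OKAPI_URL))
        .replicationFactor(Integer.parseInt(config.getString(KAFKA_REPLICATION_FACTOR)))
        .maxRequestSize(Integer.parseInt(config.getString(KAFKA_MAX_REQUEST_SIZE)))
        .build();
      getLogger().info("kafkaConfig: {}", kafkaConfig);
    }
    return kafkaConfig;
  }

  protected HttpClient getHttpClient() {
    return httpClient == null ? httpClient = vertx.createHttpClient() : httpClient;
  }

  protected Storage getStorage() {
    return storage == null ? storage = Storage.basedUpon(vertx.getOrCreateContext().config(), getHttpClient()) : storage;
  }

  protected int getMaxDistributionNumber(String propertyPrefix) {
    return getProperty(propertyPrefix, "maxDistributionNumber", "100");
  }

  private int getProperty(String prefix, String name, String defaultValue) {
    return Integer.parseInt(System.getProperty("inventory.kafka." + prefix + "." + name, defaultValue));
  }
}
```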
diff --git a/src/main/java/org/folio/inventory/InstanceIngressConsumerVerticle.java b/src/main/java/org/folio/inventory/InstanceIngressConsumerVerticle.java
new file mode 100644
index 000000000..be82c7d95
--- /dev/null
+++ b/src/main/java/org/folio/inventory/InstanceIngressConsumerVerticle.java
@@ -0,0 +1,33 @@
+package org.folio.inventory;
+
+import static org.folio.inventory.dataimport.util.ConsumerWrapperUtil.constructModuleName;
+
+import io.vertx.core.Promise;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.folio.inventory.instanceingress.InstanceIngressEventConsumer;
+import org.folio.inventory.support.KafkaConsumerVerticle;
+
+public class InstanceIngressConsumerVerticle extends KafkaConsumerVerticle {
+
+ private static final Logger LOGGER = LogManager.getLogger(InstanceIngressConsumerVerticle.class);
+ private static final String INSTANCE_INGRESS_TOPIC = "inventory.instance_ingress";
+ private static final String BASE_PROPERTY = "InstanceIngressConsumerVerticle";
+
+ @Override
+ public void start(Promise<Void> startPromise) {
+ var instanceIngressEventHandler = new InstanceIngressEventConsumer(vertx, getStorage(), getHttpClient(), getMappingMetadataCache());
+
+ var consumerWrapper = createConsumer(INSTANCE_INGRESS_TOPIC, BASE_PROPERTY, false);
+
+ consumerWrapper.start(instanceIngressEventHandler, constructModuleName())
+ .onFailure(startPromise::fail)
+ .onSuccess(ar -> startPromise.complete());
+ }
+
+ @Override
+ protected Logger getLogger() {
+ return LOGGER;
+ }
+
+}
diff --git a/src/main/java/org/folio/inventory/Launcher.java b/src/main/java/org/folio/inventory/Launcher.java
index 0746df875..f4251e0fa 100644
--- a/src/main/java/org/folio/inventory/Launcher.java
+++ b/src/main/java/org/folio/inventory/Launcher.java
@@ -25,6 +25,7 @@ public class Launcher {
private static final String QUICK_MARC_CONSUMER_VERTICLE_INSTANCES_NUMBER_CONFIG = "inventory.kafka.QuickMarcConsumerVerticle.instancesNumber";
private static final String MARC_BIB_UPDATE_CONSUMER_VERTICLE_INSTANCES_NUMBER_CONFIG = "inventory.kafka.MarcBibUpdateConsumerVerticle.instancesNumber";
private static final String CONSORTIUM_INSTANCE_SHARING_CONSUMER_VERTICLE_NUMBER_CONFIG = "inventory.kafka.ConsortiumInstanceSharingConsumerVerticle.instancesNumber";
+ private static final String INSTANCE_INGRESS_VERTICLE_NUMBER_CONFIG = "inventory.kafka.InstanceIngressConsumerVerticle.instancesNumber";
private static final VertxAssistant vertxAssistant = new VertxAssistant();
private static String inventoryModuleDeploymentId;
@@ -33,6 +34,7 @@ public class Launcher {
private static String quickMarcConsumerVerticleDeploymentId;
private static String marcBibUpdateConsumerVerticleDeploymentId;
private static String consortiumInstanceSharingVerticleDeploymentId;
+ private static String instanceIngressConsumerVerticleDeploymentId;
public static void main(String[] args)
throws InterruptedException, ExecutionException, TimeoutException {
@@ -96,12 +98,14 @@ private static void startConsumerVerticles(Map<String, Object> consumerVerticles
int quickMarcConsumerVerticleNumber = Integer.parseInt(System.getenv().getOrDefault(QUICK_MARC_CONSUMER_VERTICLE_INSTANCES_NUMBER_CONFIG, "1"));
int marcBibUpdateConsumerVerticleNumber = Integer.parseInt(System.getenv().getOrDefault(MARC_BIB_UPDATE_CONSUMER_VERTICLE_INSTANCES_NUMBER_CONFIG, "3"));
int consortiumInstanceSharingVerticleNumber = Integer.parseInt(System.getenv().getOrDefault(CONSORTIUM_INSTANCE_SHARING_CONSUMER_VERTICLE_NUMBER_CONFIG, "3"));
+ int instanceIngressConsumerVerticleNumber = Integer.parseInt(System.getenv().getOrDefault(INSTANCE_INGRESS_VERTICLE_NUMBER_CONFIG, "3"));
CompletableFuture<String> future1 = new CompletableFuture<>();
CompletableFuture<String> future2 = new CompletableFuture<>();
CompletableFuture<String> future3 = new CompletableFuture<>();
CompletableFuture<String> future4 = new CompletableFuture<>();
CompletableFuture<String> future5 = new CompletableFuture<>();
+ CompletableFuture<String> future6 = new CompletableFuture<>();
vertxAssistant.deployVerticle(DataImportConsumerVerticle.class.getName(),
consumerVerticlesConfig, dataImportConsumerVerticleNumber, future1);
vertxAssistant.deployVerticle(MarcHridSetConsumerVerticle.class.getName(),
@@ -112,12 +116,15 @@ private static void startConsumerVerticles(Map<String, Object> consumerVerticles
consumerVerticlesConfig, marcBibUpdateConsumerVerticleNumber, future4);
vertxAssistant.deployVerticle(ConsortiumInstanceSharingConsumerVerticle.class.getName(),
consumerVerticlesConfig, consortiumInstanceSharingVerticleNumber, future5);
+ vertxAssistant.deployVerticle(InstanceIngressConsumerVerticle.class.getName(),
+ consumerVerticlesConfig, instanceIngressConsumerVerticleNumber, future6);
consumerVerticleDeploymentId = future1.get(20, TimeUnit.SECONDS);
marcInstHridSetConsumerVerticleDeploymentId = future2.get(20, TimeUnit.SECONDS);
quickMarcConsumerVerticleDeploymentId = future3.get(20, TimeUnit.SECONDS);
marcBibUpdateConsumerVerticleDeploymentId = future4.get(20, TimeUnit.SECONDS);
consortiumInstanceSharingVerticleDeploymentId = future5.get(20, TimeUnit.SECONDS);
+ instanceIngressConsumerVerticleDeploymentId = future6.get(20, TimeUnit.SECONDS);
}
private static void stop() {
@@ -133,6 +140,7 @@ private static void stop() {
.thenCompose(v -> vertxAssistant.undeployVerticle(quickMarcConsumerVerticleDeploymentId))
.thenCompose(v -> vertxAssistant.undeployVerticle(marcBibUpdateConsumerVerticleDeploymentId))
.thenCompose(v -> vertxAssistant.undeployVerticle(consortiumInstanceSharingVerticleDeploymentId))
+ .thenCompose(v -> vertxAssistant.undeployVerticle(instanceIngressConsumerVerticleDeploymentId))
.thenAccept(v -> vertxAssistant.stop(stopped));
stopped.thenAccept(v -> log.info("Server Stopped"));
diff --git a/src/main/java/org/folio/inventory/MarcBibUpdateConsumerVerticle.java b/src/main/java/org/folio/inventory/MarcBibUpdateConsumerVerticle.java
index 7754f1036..9c355c769 100644
--- a/src/main/java/org/folio/inventory/MarcBibUpdateConsumerVerticle.java
+++ b/src/main/java/org/folio/inventory/MarcBibUpdateConsumerVerticle.java
@@ -1,100 +1,36 @@
package org.folio.inventory;
import static org.folio.inventory.dataimport.util.ConsumerWrapperUtil.constructModuleName;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_ENV;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_HOST;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_MAX_REQUEST_SIZE;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_PORT;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_REPLICATION_FACTOR;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.OKAPI_URL;
-import io.vertx.core.AbstractVerticle;
+
import io.vertx.core.Promise;
-import io.vertx.core.http.HttpClient;
-import io.vertx.core.json.JsonObject;
-import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import org.folio.inventory.dataimport.cache.MappingMetadataCache;
import org.folio.inventory.dataimport.consumers.MarcBibUpdateKafkaHandler;
import org.folio.inventory.dataimport.handlers.actions.InstanceUpdateDelegate;
-import org.folio.inventory.storage.Storage;
-import org.folio.kafka.GlobalLoadSensor;
-import org.folio.kafka.KafkaConfig;
-import org.folio.kafka.KafkaConsumerWrapper;
-import org.folio.kafka.SubscriptionDefinition;
+import org.folio.inventory.support.KafkaConsumerVerticle;
-public class MarcBibUpdateConsumerVerticle extends AbstractVerticle {
+public class MarcBibUpdateConsumerVerticle extends KafkaConsumerVerticle {
private static final Logger LOGGER = LogManager.getLogger(MarcBibUpdateConsumerVerticle.class);
- private static final GlobalLoadSensor GLOBAL_LOAD_SENSOR = new GlobalLoadSensor();
- private static final String SRS_MARC_BIB_TOPIC_NAME = "srs.marc-bib";
- private static final String METADATA_EXPIRATION_TIME = "inventory.mapping-metadata-cache.expiration.time.seconds";
- private final int loadLimit = getLoadLimit();
- private KafkaConsumerWrapper<String, String> marcBibUpdateConsumerWrapper;
+ private static final String SRS_MARC_BIB_EVENT = "srs.marc-bib";
+ private static final String BASE_PROPERTY = "MarcBibUpdateConsumer";
@Override
public void start(Promise<Void> startPromise) {
- JsonObject config = vertx.getOrCreateContext().config();
- KafkaConfig kafkaConfig = getKafkaConfig(config);
-
- HttpClient client = vertx.createHttpClient();
- Storage storage = Storage.basedUpon(config, client);
- InstanceUpdateDelegate instanceUpdateDelegate = new InstanceUpdateDelegate(storage);
+ var instanceUpdateDelegate = new InstanceUpdateDelegate(getStorage());
- MappingMetadataCache mappingMetadataCache = MappingMetadataCache.getInstance(vertx, client);
+ var marcBibUpdateKafkaHandler = new MarcBibUpdateKafkaHandler(vertx, getMaxDistributionNumber(BASE_PROPERTY),
+ getKafkaConfig(), instanceUpdateDelegate, getMappingMetadataCache());
- MarcBibUpdateKafkaHandler marcBibUpdateKafkaHandler = new MarcBibUpdateKafkaHandler(vertx,
- getMaxDistributionNumber(), kafkaConfig, instanceUpdateDelegate, mappingMetadataCache);
+ var marcBibUpdateConsumerWrapper = createConsumer(SRS_MARC_BIB_EVENT, BASE_PROPERTY);
- marcBibUpdateConsumerWrapper = createConsumer(kafkaConfig, SRS_MARC_BIB_TOPIC_NAME);
marcBibUpdateConsumerWrapper.start(marcBibUpdateKafkaHandler, constructModuleName())
.onFailure(startPromise::fail)
.onSuccess(ar -> startPromise.complete());
}
- private KafkaConsumerWrapper<String, String> createConsumer(KafkaConfig kafkaConfig, String topicEventType) {
- SubscriptionDefinition subscriptionDefinition = SubscriptionDefinition.builder()
- .eventType(topicEventType)
- .subscriptionPattern(formatSubscriptionPattern(kafkaConfig.getEnvId(), topicEventType))
- .build();
-
- return KafkaConsumerWrapper.<String, String>builder()
- .context(context)
- .vertx(vertx)
- .kafkaConfig(kafkaConfig)
- .loadLimit(loadLimit)
- .globalLoadSensor(GLOBAL_LOAD_SENSOR)
- .subscriptionDefinition(subscriptionDefinition)
- .build();
- }
-
@Override
- public void stop(Promise<Void> stopPromise) {
- marcBibUpdateConsumerWrapper.stop()
- .onComplete(ar -> stopPromise.complete());
- }
-
- private int getLoadLimit() {
- return Integer.parseInt(System.getProperty("inventory.kafka.MarcBibUpdateConsumer.loadLimit","5"));
- }
-
- private int getMaxDistributionNumber() {
- return Integer.parseInt(System.getProperty("inventory.kafka.MarcBibUpdateConsumer.maxDistributionNumber", "100"));
+ protected Logger getLogger() {
+ return LOGGER;
}
- private KafkaConfig getKafkaConfig(JsonObject config) {
- KafkaConfig kafkaConfig = KafkaConfig.builder()
- .envId(config.getString(KAFKA_ENV))
- .kafkaHost(config.getString(KAFKA_HOST))
- .kafkaPort(config.getString(KAFKA_PORT))
- .okapiUrl(config.getString(OKAPI_URL))
- .replicationFactor(Integer.parseInt(config.getString(KAFKA_REPLICATION_FACTOR)))
- .maxRequestSize(Integer.parseInt(config.getString(KAFKA_MAX_REQUEST_SIZE)))
- .build();
- LOGGER.info("kafkaConfig: {}", kafkaConfig);
- return kafkaConfig;
- }
-
- public static String formatSubscriptionPattern(String env, String eventType) {
- return String.join("\\.", env, "\\w{1,}", eventType);
- }
}
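
The removed formatSubscriptionPattern helper is the piece that subscribes across every tenant collection of a non-namespaced topic; presumably it now sits behind the shared createConsumer(...) in KafkaConsumerVerticle (an assumption based on the call sites in this patch). A quick self-contained illustration of what the pattern matches:

```java
import java.util.regex.Pattern;

class SubscriptionPatternDemo {

  // Same logic as the helper removed above: env prefix, any single tenant
  // segment, then the event type.
  static String formatSubscriptionPattern(String env, String eventType) {
    return String.join("\\.", env, "\\w{1,}", eventType);
  }

  public static void main(String[] args) {
    var pattern = Pattern.compile(formatSubscriptionPattern("folio", "srs.marc-bib"));
    // Matches any tenant between env and event type:
    System.out.println(pattern.matcher("folio.diku.srs.marc-bib").matches()); // true
    // But not a topic that carries a namespace segment:
    System.out.println(pattern.matcher("folio.Default.diku.srs.marc-bib").matches()); // false
  }
}
```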
diff --git a/src/main/java/org/folio/inventory/MarcHridSetConsumerVerticle.java b/src/main/java/org/folio/inventory/MarcHridSetConsumerVerticle.java
index 12af1cbac..947a694ea 100644
--- a/src/main/java/org/folio/inventory/MarcHridSetConsumerVerticle.java
+++ b/src/main/java/org/folio/inventory/MarcHridSetConsumerVerticle.java
@@ -1,72 +1,36 @@
package org.folio.inventory;
-import io.vertx.core.AbstractVerticle;
+import static org.folio.DataImportEventTypes.DI_SRS_MARC_BIB_INSTANCE_HRID_SET;
+import static org.folio.DataImportEventTypes.DI_SRS_MARC_HOLDINGS_HOLDING_HRID_SET;
+import static org.folio.inventory.dataimport.util.ConsumerWrapperUtil.constructModuleName;
+
import io.vertx.core.CompositeFuture;
import io.vertx.core.Promise;
-import io.vertx.core.http.HttpClient;
-import io.vertx.core.json.JsonObject;
-import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import org.folio.DataImportEventTypes;
-import org.folio.inventory.dataimport.cache.MappingMetadataCache;
import org.folio.inventory.dataimport.consumers.MarcBibInstanceHridSetKafkaHandler;
import org.folio.inventory.dataimport.consumers.MarcHoldingsRecordHridSetKafkaHandler;
import org.folio.inventory.dataimport.handlers.actions.HoldingsUpdateDelegate;
import org.folio.inventory.dataimport.handlers.actions.InstanceUpdateDelegate;
import org.folio.inventory.services.HoldingsCollectionService;
-import org.folio.inventory.storage.Storage;
-import org.folio.kafka.GlobalLoadSensor;
-import org.folio.kafka.KafkaConfig;
-import org.folio.kafka.KafkaConsumerWrapper;
-import org.folio.kafka.KafkaTopicNameHelper;
-import org.folio.kafka.SubscriptionDefinition;
-
-import static org.folio.DataImportEventTypes.DI_SRS_MARC_BIB_INSTANCE_HRID_SET;
-import static org.folio.DataImportEventTypes.DI_SRS_MARC_HOLDINGS_HOLDING_HRID_SET;
-import static org.folio.inventory.dataimport.util.ConsumerWrapperUtil.constructModuleName;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_ENV;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_HOST;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_MAX_REQUEST_SIZE;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_PORT;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_REPLICATION_FACTOR;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.OKAPI_URL;
+import org.folio.inventory.support.KafkaConsumerVerticle;
-public class MarcHridSetConsumerVerticle extends AbstractVerticle {
+public class MarcHridSetConsumerVerticle extends KafkaConsumerVerticle {
private static final Logger LOGGER = LogManager.getLogger(MarcHridSetConsumerVerticle.class);
- private static final GlobalLoadSensor GLOBAL_LOAD_SENSOR = new GlobalLoadSensor();
-
- private final int loadLimit = getLoadLimit();
- private KafkaConsumerWrapper<String, String> marcBibConsumerWrapper;
- private KafkaConsumerWrapper<String, String> marcHoldingsConsumerWrapper;
+ private static final String BASE_PROPERTY = "MarcBibInstanceHridSetConsumer";
@Override
public void start(Promise<Void> startPromise) {
- JsonObject config = vertx.getOrCreateContext().config();
- KafkaConfig kafkaConfig = KafkaConfig.builder()
- .envId(config.getString(KAFKA_ENV))
- .kafkaHost(config.getString(KAFKA_HOST))
- .kafkaPort(config.getString(KAFKA_PORT))
- .okapiUrl(config.getString(OKAPI_URL))
- .replicationFactor(Integer.parseInt(config.getString(KAFKA_REPLICATION_FACTOR)))
- .maxRequestSize(Integer.parseInt(config.getString(KAFKA_MAX_REQUEST_SIZE)))
- .build();
- LOGGER.info("kafkaConfig: {}", kafkaConfig);
-
- marcBibConsumerWrapper = createConsumerByEvent(kafkaConfig, DI_SRS_MARC_BIB_INSTANCE_HRID_SET);
- marcHoldingsConsumerWrapper = createConsumerByEvent(kafkaConfig, DI_SRS_MARC_HOLDINGS_HOLDING_HRID_SET);
-
- HttpClient client = vertx.createHttpClient();
- Storage storage = Storage.basedUpon(config, client);
- HoldingsCollectionService holdingsCollectionService = new HoldingsCollectionService();
- InstanceUpdateDelegate instanceUpdateDelegate = new InstanceUpdateDelegate(storage);
- HoldingsUpdateDelegate holdingsRecordUpdateDelegate = new HoldingsUpdateDelegate(storage, holdingsCollectionService);
+ var marcBibConsumerWrapper = createConsumer(DI_SRS_MARC_BIB_INSTANCE_HRID_SET.value(), BASE_PROPERTY);
+ var marcHoldingsConsumerWrapper = createConsumer(DI_SRS_MARC_HOLDINGS_HOLDING_HRID_SET.value(), BASE_PROPERTY);
- MappingMetadataCache mappingMetadataCache = MappingMetadataCache.getInstance(vertx, client);
+ var holdingsCollectionService = new HoldingsCollectionService();
+ var instanceUpdateDelegate = new InstanceUpdateDelegate(getStorage());
+ var holdingsRecordUpdateDelegate = new HoldingsUpdateDelegate(getStorage(), holdingsCollectionService);
- MarcBibInstanceHridSetKafkaHandler marcBibInstanceHridSetKafkaHandler = new MarcBibInstanceHridSetKafkaHandler(instanceUpdateDelegate, mappingMetadataCache);
- MarcHoldingsRecordHridSetKafkaHandler marcHoldingsRecordHridSetKafkaHandler = new MarcHoldingsRecordHridSetKafkaHandler(holdingsRecordUpdateDelegate, mappingMetadataCache);
+ var marcBibInstanceHridSetKafkaHandler = new MarcBibInstanceHridSetKafkaHandler(instanceUpdateDelegate, getMappingMetadataCache());
+ var marcHoldingsRecordHridSetKafkaHandler = new MarcHoldingsRecordHridSetKafkaHandler(holdingsRecordUpdateDelegate, getMappingMetadataCache());
CompositeFuture.all(
marcBibConsumerWrapper.start(marcBibInstanceHridSetKafkaHandler, constructModuleName()),
@@ -77,27 +41,8 @@ public void start(Promise<Void> startPromise) {
}
@Override
- public void stop(Promise<Void> stopPromise) {
- CompositeFuture.join(marcBibConsumerWrapper.stop(), marcHoldingsConsumerWrapper.stop())
- .onComplete(ar -> stopPromise.complete());
+ protected Logger getLogger() {
+ return LOGGER;
}
- private int getLoadLimit() {
- return Integer.parseInt(System.getProperty("inventory.kafka.MarcBibInstanceHridSetConsumer.loadLimit", "5"));
- }
-
- private KafkaConsumerWrapper<String, String> createConsumerByEvent(KafkaConfig kafkaConfig, DataImportEventTypes event) {
- SubscriptionDefinition subscriptionDefinition = KafkaTopicNameHelper.createSubscriptionDefinition(
- kafkaConfig.getEnvId(),
- KafkaTopicNameHelper.getDefaultNameSpace(), event.value()
- );
- return KafkaConsumerWrapper.<String, String>builder()
- .context(context)
- .vertx(vertx)
- .kafkaConfig(kafkaConfig)
- .loadLimit(loadLimit)
- .globalLoadSensor(GLOBAL_LOAD_SENSOR)
- .subscriptionDefinition(subscriptionDefinition)
- .build();
- }
}
diff --git a/src/main/java/org/folio/inventory/QuickMarcConsumerVerticle.java b/src/main/java/org/folio/inventory/QuickMarcConsumerVerticle.java
index e4ebdb934..b0414f310 100644
--- a/src/main/java/org/folio/inventory/QuickMarcConsumerVerticle.java
+++ b/src/main/java/org/folio/inventory/QuickMarcConsumerVerticle.java
@@ -1,10 +1,6 @@
package org.folio.inventory;
-import io.vertx.core.AbstractVerticle;
-import io.vertx.core.Future;
import io.vertx.core.Promise;
-import io.vertx.core.http.HttpClient;
-import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -13,93 +9,32 @@
import org.folio.inventory.dataimport.handlers.actions.PrecedingSucceedingTitlesHelper;
import org.folio.inventory.dataimport.util.ConsumerWrapperUtil;
import org.folio.inventory.services.HoldingsCollectionService;
-import org.folio.inventory.storage.Storage;
-import org.folio.kafka.AsyncRecordHandler;
-import org.folio.kafka.GlobalLoadSensor;
-import org.folio.kafka.KafkaConfig;
-import org.folio.kafka.KafkaConsumerWrapper;
+import org.folio.inventory.support.KafkaConsumerVerticle;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_ENV;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_HOST;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_MAX_REQUEST_SIZE;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_PORT;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_REPLICATION_FACTOR;
-import static org.folio.inventory.dataimport.util.KafkaConfigConstants.OKAPI_URL;
-import static org.folio.kafka.KafkaTopicNameHelper.createSubscriptionDefinition;
-import static org.folio.kafka.KafkaTopicNameHelper.getDefaultNameSpace;
-
-public class QuickMarcConsumerVerticle extends AbstractVerticle {
+public class QuickMarcConsumerVerticle extends KafkaConsumerVerticle {
private static final Logger LOGGER = LogManager.getLogger(QuickMarcConsumerVerticle.class);
-
- private final int loadLimit = getLoadLimit();
- private final int maxDistributionNumber = getMaxDistributionNumber();
- private KafkaConsumerWrapper<String, String> consumer;
+ private static final String LOAD_LIMIT_PROPERTY = "QuickMarcConsumer";
+ private static final String MAX_DISTRIBUTION_PROPERTY = "QuickMarcConsumerVerticle";
@Override
public void start(Promise<Void> startPromise) {
- JsonObject config = vertx.getOrCreateContext().config();
- KafkaConfig kafkaConfig = getKafkaConfig(config);
-
- HttpClient client = vertx.createHttpClient();
- Storage storage = Storage.basedUpon(config, client);
-
- var precedingSucceedingTitlesHelper = new PrecedingSucceedingTitlesHelper(WebClient.wrap(client));
- HoldingsCollectionService holdingsCollectionService = new HoldingsCollectionService();
- QuickMarcKafkaHandler handler = new QuickMarcKafkaHandler(vertx, storage, maxDistributionNumber, kafkaConfig,
- precedingSucceedingTitlesHelper, holdingsCollectionService);
+ var precedingSucceedingTitlesHelper = new PrecedingSucceedingTitlesHelper(WebClient.wrap(getHttpClient()));
+ var holdingsCollectionService = new HoldingsCollectionService();
+ var handler = new QuickMarcKafkaHandler(vertx, getStorage(), getMaxDistributionNumber(MAX_DISTRIBUTION_PROPERTY),
+ getKafkaConfig(), precedingSucceedingTitlesHelper, holdingsCollectionService);
- var kafkaConsumerFuture = createKafkaConsumer(kafkaConfig, QMEventTypes.QM_SRS_MARC_RECORD_UPDATED, handler);
+ var consumer = createConsumer(QMEventTypes.QM_SRS_MARC_RECORD_UPDATED.name(), LOAD_LIMIT_PROPERTY);
- kafkaConsumerFuture
+ consumer.start(handler, ConsumerWrapperUtil.constructModuleName())
+ .map(consumer)
.onFailure(startPromise::fail)
- .onSuccess(ar -> {
- consumer = ar;
- startPromise.complete();
- });
- }
-
- private KafkaConfig getKafkaConfig(JsonObject config) {
- KafkaConfig kafkaConfig = KafkaConfig.builder()
- .envId(config.getString(KAFKA_ENV))
- .kafkaHost(config.getString(KAFKA_HOST))
- .kafkaPort(config.getString(KAFKA_PORT))
- .okapiUrl(config.getString(OKAPI_URL))
- .replicationFactor(Integer.parseInt(config.getString(KAFKA_REPLICATION_FACTOR)))
- .maxRequestSize(Integer.parseInt(config.getString(KAFKA_MAX_REQUEST_SIZE)))
- .build();
- LOGGER.info("kafkaConfig: {}", kafkaConfig);
- return kafkaConfig;
+ .onSuccess(ar -> startPromise.complete());
}
@Override
- public void stop(Promise<Void> stopPromise) {
- consumer.stop().onComplete(ar -> stopPromise.complete());
+ protected Logger getLogger() {
+ return LOGGER;
}
- private Future<KafkaConsumerWrapper<String, String>> createKafkaConsumer(KafkaConfig kafkaConfig, QMEventTypes eventType,
- AsyncRecordHandler<String, String> recordHandler) {
- var subscriptionDefinition = createSubscriptionDefinition(kafkaConfig.getEnvId(),
- getDefaultNameSpace(), eventType.name());
-
- KafkaConsumerWrapper<String, String> consumerWrapper = KafkaConsumerWrapper.<String, String>builder()
- .context(context)
- .vertx(vertx)
- .kafkaConfig(kafkaConfig)
- .loadLimit(loadLimit)
- .globalLoadSensor(new GlobalLoadSensor())
- .subscriptionDefinition(subscriptionDefinition)
- .build();
-
- return consumerWrapper.start(recordHandler, ConsumerWrapperUtil.constructModuleName())
- .map(consumerWrapper);
- }
-
- private int getLoadLimit() {
- return Integer.parseInt(System.getProperty("inventory.kafka.QuickMarcConsumer.loadLimit", "5"));
- }
-
- private int getMaxDistributionNumber() {
- return Integer.parseInt(System.getProperty("inventory.kafka.QuickMarcConsumerVerticle.maxDistributionNumber", "100"));
- }
}
diff --git a/src/main/java/org/folio/inventory/dataimport/consumers/MarcBibInstanceHridSetKafkaHandler.java b/src/main/java/org/folio/inventory/dataimport/consumers/MarcBibInstanceHridSetKafkaHandler.java
index 3a4058b96..a5195b888 100644
--- a/src/main/java/org/folio/inventory/dataimport/consumers/MarcBibInstanceHridSetKafkaHandler.java
+++ b/src/main/java/org/folio/inventory/dataimport/consumers/MarcBibInstanceHridSetKafkaHandler.java
@@ -2,6 +2,7 @@
import static java.lang.String.format;
import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.folio.inventory.dataimport.util.MappingConstants.MARC_BIB_RECORD_FORMAT;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TENANT_HEADER;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TOKEN_HEADER;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_URL_HEADER;
@@ -37,7 +38,6 @@ public class MarcBibInstanceHridSetKafkaHandler implements AsyncRecordHandler<String, String> {
- private static final String MARC_KEY = "MARC_BIB";
@@ … @@ public Future<String> handle(KafkaConsumerRecord<String, String> record) {
String jobExecutionId = eventPayload.get(JOB_EXECUTION_ID_HEADER);
LOGGER.info("Event payload has been received with event type: {}, recordId: {} by jobExecution: {} and chunkId: {}", event.getEventType(), recordId, jobExecutionId, chunkId);
- if (isEmpty(eventPayload.get(MARC_KEY))) {
+ if (isEmpty(eventPayload.get(MARC_BIB_RECORD_FORMAT))) {
String message = format("Event payload does not contain required data to update Instance with event type: '%s', recordId: '%s' by jobExecution: '%s' and chunkId: '%s'", event.getEventType(), recordId, jobExecutionId, chunkId);
LOGGER.error(message);
return Future.failedFuture(message);
}
Context context = EventHandlingUtil.constructContext(headersMap.get(OKAPI_TENANT_HEADER), headersMap.get(OKAPI_TOKEN_HEADER), headersMap.get(OKAPI_URL_HEADER));
- Record marcRecord = new JsonObject(eventPayload.get(MARC_KEY)).mapTo(Record.class);
+ Record marcRecord = new JsonObject(eventPayload.get(MARC_BIB_RECORD_FORMAT)).mapTo(Record.class);
mappingMetadataCache.get(jobExecutionId, context)
.map(metadataOptional -> metadataOptional.orElseThrow(() ->
diff --git a/src/main/java/org/folio/inventory/dataimport/consumers/MarcBibUpdateKafkaHandler.java b/src/main/java/org/folio/inventory/dataimport/consumers/MarcBibUpdateKafkaHandler.java
index 6f6324687..f50d8d6a1 100644
--- a/src/main/java/org/folio/inventory/dataimport/consumers/MarcBibUpdateKafkaHandler.java
+++ b/src/main/java/org/folio/inventory/dataimport/consumers/MarcBibUpdateKafkaHandler.java
@@ -3,6 +3,7 @@
import static java.lang.String.format;
import static java.util.Objects.isNull;
import static org.folio.inventory.EntityLinksKafkaTopic.LINKS_STATS;
+import static org.folio.inventory.dataimport.util.MappingConstants.MARC_BIB_RECORD_TYPE;
import static org.folio.rest.jaxrs.model.LinkUpdateReport.Status.FAIL;
import static org.folio.rest.jaxrs.model.LinkUpdateReport.Status.SUCCESS;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TOKEN_HEADER;
@@ -48,7 +49,6 @@ public class MarcBibUpdateKafkaHandler implements AsyncRecordHandler<String, String> {
- private static final String MARC_BIB_RECORD_TYPE = "marc-bib";
diff --git a/src/main/java/org/folio/inventory/dataimport/handlers/actions/AbstractInstanceEventHandler.java b/src/main/java/org/folio/inventory/dataimport/handlers/actions/AbstractInstanceEventHandler.java
--- a/src/main/java/org/folio/inventory/dataimport/handlers/actions/AbstractInstanceEventHandler.java
+++ b/src/main/java/org/folio/inventory/dataimport/handlers/actions/AbstractInstanceEventHandler.java
@@ … @@
- private static final List<String> requiredFields = Arrays.asList("source", "title", "instanceTypeId");
- private static final String ID_FIELD = "id";
private static final boolean IS_HRID_FILLING_NEEDED_FOR_INSTANCE = true;
protected final Storage storage;
@@ -97,7 +94,7 @@ protected org.folio.Instance defaultMapRecordToInstance(DataImportEventPayload d
protected Future saveRecordInSrsAndHandleResponse(DataImportEventPayload payload, Record srcRecord,
Instance instance, InstanceCollection instanceCollection, String tenantId) {
Promise promise = Promise.promise();
- getSourceStorageRecordsClient(payload, tenantId).postSourceStorageRecords(srcRecord)
+ getSourceStorageRecordsClient(payload.getOkapiUrl(), payload.getToken(), tenantId).postSourceStorageRecords(srcRecord)
.onComplete(ar -> {
var result = ar.result();
if (ar.succeeded() && result.statusCode() == HttpStatus.HTTP_CREATED.toInt()) {
@@ -120,7 +117,7 @@ protected Future saveRecordInSrsAndHandleResponse(DataImportEventPaylo
protected Future putRecordInSrsAndHandleResponse(DataImportEventPayload payload, Record srcRecord,
Instance instance, String matchedId, String tenantId) {
Promise promise = Promise.promise();
- getSourceStorageRecordsClient(payload, tenantId).putSourceStorageRecordsGenerationById(matchedId ,srcRecord)
+ getSourceStorageRecordsClient(payload.getOkapiUrl(), payload.getToken(), tenantId).putSourceStorageRecordsGenerationById(matchedId ,srcRecord)
.onComplete(ar -> {
var result = ar.result();
if (ar.succeeded() && result.statusCode() == HttpStatus.HTTP_OK.toInt()) {
@@ -139,9 +136,9 @@ protected Future putRecordInSrsAndHandleResponse(DataImportEventPayloa
return promise.future();
}
- protected Future postSnapshotInSrsAndHandleResponse(DataImportEventPayload payload, Snapshot snapshot, String tenantId) {
+ protected Future postSnapshotInSrsAndHandleResponse(String okapiUrl, String token, Snapshot snapshot, String tenantId) {
Promise promise = Promise.promise();
- getSourceStorageSnapshotsClient(payload, tenantId).postSourceStorageSnapshots(snapshot)
+ getSourceStorageSnapshotsClient(okapiUrl, token, tenantId).postSourceStorageSnapshots(snapshot)
.onComplete(ar -> {
var result = ar.result();
if (ar.succeeded() && result.statusCode() == HttpStatus.HTTP_CREATED.toInt()) {
@@ -189,7 +186,7 @@ protected void setExternalIds(Record srcRecord, Instance instance) {
}
}
- private void deleteInstance(String id, String jobExecutionId, InstanceCollection instanceCollection) {
+ protected void deleteInstance(String id, String jobExecutionId, InstanceCollection instanceCollection) {
Promise promise = Promise.promise();
instanceCollection.delete(id, success -> {
LOGGER.info("deleteInstance:: Instance was deleted by id: '{}', jobExecutionId: '{}'", id, jobExecutionId);
@@ -203,12 +200,12 @@ private void deleteInstance(String id, String jobExecutionId, InstanceCollection
promise.future();
}
- public SourceStorageRecordsClient getSourceStorageRecordsClient(DataImportEventPayload payload, String tenantId) {
- return new SourceStorageRecordsClient(payload.getOkapiUrl(), tenantId, payload.getToken(), getHttpClient());
+ public SourceStorageRecordsClient getSourceStorageRecordsClient(String okapiUrl, String token, String tenantId) {
+ return new SourceStorageRecordsClient(okapiUrl, tenantId, token, getHttpClient());
}
- public SourceStorageSnapshotsClient getSourceStorageSnapshotsClient(DataImportEventPayload payload, String tenantId) {
- return new SourceStorageSnapshotsClient(payload.getOkapiUrl(), tenantId, payload.getToken(), getHttpClient());
+ public SourceStorageSnapshotsClient getSourceStorageSnapshotsClient(String okapiUrl, String token, String tenantId) {
+ return new SourceStorageSnapshotsClient(okapiUrl, tenantId, token, getHttpClient());
}
private Record encodeParsedRecordContent(Record srcRecord) {
@@ -230,7 +227,7 @@ protected void setSuppressFormDiscovery(Record srcRecord, boolean suppressFromDi
}
private void executeHrIdManipulation(Record srcRecord, JsonObject externalEntity) {
- var externalId = externalEntity.getString(ID_FIELD);
+ var externalId = externalEntity.getString(ID);
var externalHrId = extractHridForInstance(externalEntity);
var externalIdsHolder = srcRecord.getExternalIdsHolder();
setExternalIdsForInstance(externalIdsHolder, externalId, externalHrId);
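
Dropping the DataImportEventPayload parameter from the SRS client factories and from postSnapshotInSrsAndHandleResponse is what lets the new instance-ingress handlers, which have no data-import payload, share this plumbing. A hedged sketch of a call in that style, written as a method of a hypothetical AbstractInstanceEventHandler subclass and assuming the method completes with the posted Snapshot and that org.folio.inventory.common.Context exposes getOkapiLocation()/getToken()/getTenantId():

```java
import java.util.Date;
import java.util.UUID;
import io.vertx.core.Future;
import org.folio.inventory.common.Context;
import org.folio.rest.jaxrs.model.Snapshot;

// Illustrative only: an ingress handler carries a Context (okapi URL, token,
// tenant) rather than a DataImportEventPayload, so it posts the snapshot
// "just before saving" (per the commit log) via the raw-parameter overload.
Future<Snapshot> postSnapshotForIngress(Context context) {
  var snapshot = new Snapshot()
    .withJobExecutionId(UUID.randomUUID().toString())
    .withStatus(Snapshot.Status.COMMITTED)
    .withProcessingStartedDate(new Date());
  return postSnapshotInSrsAndHandleResponse(context.getOkapiLocation(),
    context.getToken(), snapshot, context.getTenantId());
}
```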
diff --git a/src/main/java/org/folio/inventory/dataimport/handlers/actions/CreateInstanceEventHandler.java b/src/main/java/org/folio/inventory/dataimport/handlers/actions/CreateInstanceEventHandler.java
index ed73ee426..ad0f5b1dd 100644
--- a/src/main/java/org/folio/inventory/dataimport/handlers/actions/CreateInstanceEventHandler.java
+++ b/src/main/java/org/folio/inventory/dataimport/handlers/actions/CreateInstanceEventHandler.java
@@ -45,7 +45,10 @@
import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.*;
import static org.folio.inventory.dataimport.util.DataImportConstants.UNIQUE_ID_ERROR_MESSAGE;
import static org.folio.inventory.dataimport.util.LoggerUtil.logParametersEventHandler;
+import static org.folio.inventory.dataimport.util.MappingConstants.INSTANCE_PATH;
+import static org.folio.inventory.dataimport.util.MappingConstants.INSTANCE_REQUIRED_FIELDS;
import static org.folio.inventory.domain.instances.Instance.HRID_KEY;
+import static org.folio.inventory.domain.instances.Instance.ID;
import static org.folio.inventory.domain.instances.Instance.SOURCE_KEY;
import static org.folio.rest.jaxrs.model.ProfileType.ACTION_PROFILE;
@@ -59,7 +62,7 @@ public class CreateInstanceEventHandler extends AbstractInstanceEventHandler {
private static final String INSTANCE_CREATION_999_ERROR_MESSAGE = "A new Instance was not created because the incoming record already contained a 999ff$s or 999ff$i field";
private static final String RECORD_ID_HEADER = "recordId";
private static final String CHUNK_ID_HEADER = "chunkId";
- private final IdStorageService idStorageService;
+ protected final IdStorageService idStorageService;
private final OrderHelperService orderHelperService;
public CreateInstanceEventHandler(Storage storage, PrecedingSucceedingTitlesHelper precedingSucceedingTitlesHelper,
@@ -120,7 +123,7 @@ public CompletableFuture<DataImportEventPayload> handle(DataImportEventPayload d
.compose(v -> {
InstanceCollection instanceCollection = storage.getInstanceCollection(context);
JsonObject instanceAsJson = prepareInstance(dataImportEventPayload, instanceId, jobExecutionId);
- List<String> requiredFieldsErrors = EventHandlingUtil.validateJsonByRequiredFields(instanceAsJson, requiredFields);
+ List<String> requiredFieldsErrors = EventHandlingUtil.validateJsonByRequiredFields(instanceAsJson, INSTANCE_REQUIRED_FIELDS);
if (!requiredFieldsErrors.isEmpty()) {
String msg = format("Mapped Instance is invalid: %s, by jobExecutionId: '%s' and recordId: '%s' and chunkId: '%s' ", requiredFieldsErrors,
jobExecutionId, recordId, chunkId);
@@ -173,7 +176,7 @@ public CompletableFuture<DataImportEventPayload> handle(DataImportEventPayload d
return future;
}
- private String getInstanceId(Record record) {
+ protected String getInstanceId(Record record) {
String subfield999ffi = ParsedRecordUtil.getAdditionalSubfieldValue(record.getParsedRecord(), ParsedRecordUtil.AdditionalSubfields.I);
return isEmpty(subfield999ffi) ? UUID.randomUUID().toString() : subfield999ffi;
}
@@ -183,7 +186,7 @@ private JsonObject prepareInstance(DataImportEventPayload dataImportEventPayload
if (instanceAsJson.getJsonObject(INSTANCE_PATH) != null) {
instanceAsJson = instanceAsJson.getJsonObject(INSTANCE_PATH);
}
- instanceAsJson.put("id", instanceId);
+ instanceAsJson.put(ID, instanceId);
instanceAsJson.put(SOURCE_KEY, MARC_FORMAT);
instanceAsJson.remove(HRID_KEY);
@@ -222,7 +225,7 @@ private Future prepareAndExecuteMapping(DataImportEventPayload dataImportE
}
}
- private Future<Instance> addInstance(Instance instance, InstanceCollection instanceCollection) {
+ protected Future<Instance> addInstance(Instance instance, InstanceCollection instanceCollection) {
Promise<Instance> promise = Promise.promise();
instanceCollection.add(instance, success -> promise.complete(success.getResult()),
failure -> {
diff --git a/src/main/java/org/folio/inventory/dataimport/handlers/actions/InstanceUpdateDelegate.java b/src/main/java/org/folio/inventory/dataimport/handlers/actions/InstanceUpdateDelegate.java
index d0ea74252..fb6cb5345 100644
--- a/src/main/java/org/folio/inventory/dataimport/handlers/actions/InstanceUpdateDelegate.java
+++ b/src/main/java/org/folio/inventory/dataimport/handlers/actions/InstanceUpdateDelegate.java
@@ -22,6 +22,7 @@
import static java.lang.String.format;
import static org.folio.inventory.dataimport.util.LoggerUtil.logParametersUpdateDelegate;
+import static org.folio.inventory.dataimport.util.MappingConstants.MARC_BIB_RECORD_FORMAT;
public class InstanceUpdateDelegate {
@@ -30,7 +31,6 @@ public class InstanceUpdateDelegate {
private static final String MAPPING_RULES_KEY = "MAPPING_RULES";
private static final String MAPPING_PARAMS_KEY = "MAPPING_PARAMS";
private static final String QM_RELATED_RECORD_VERSION_KEY = "RELATED_RECORD_VERSION";
- private static final String MARC_FORMAT = "MARC_BIB";
private final Storage storage;
@@ -46,7 +46,7 @@ public Future<Instance> handle(Map<String, String> eventPayload, Record marcReco
JsonObject parsedRecord = retrieveParsedContent(marcRecord.getParsedRecord());
String instanceId = marcRecord.getExternalIdsHolder().getInstanceId();
LOGGER.info("Instance update with instanceId: {}", instanceId);
- RecordMapper recordMapper = RecordMapperBuilder.buildMapper(MARC_FORMAT);
+ RecordMapper recordMapper = RecordMapperBuilder.buildMapper(MARC_BIB_RECORD_FORMAT);
var mappedInstance = recordMapper.mapRecord(parsedRecord, mappingParameters, mappingRules);
InstanceCollection instanceCollection = storage.getInstanceCollection(context);
diff --git a/src/main/java/org/folio/inventory/dataimport/handlers/actions/ReplaceInstanceEventHandler.java b/src/main/java/org/folio/inventory/dataimport/handlers/actions/ReplaceInstanceEventHandler.java
index 3312c40b7..0b13f2fe1 100644
--- a/src/main/java/org/folio/inventory/dataimport/handlers/actions/ReplaceInstanceEventHandler.java
+++ b/src/main/java/org/folio/inventory/dataimport/handlers/actions/ReplaceInstanceEventHandler.java
@@ -53,6 +53,8 @@
import static org.folio.DataImportEventTypes.DI_INVENTORY_INSTANCE_UPDATED_READY_FOR_POST_PROCESSING;
import static org.folio.inventory.dataimport.util.LoggerUtil.INCOMING_RECORD_ID;
import static org.folio.inventory.dataimport.util.LoggerUtil.logParametersEventHandler;
+import static org.folio.inventory.dataimport.util.MappingConstants.INSTANCE_PATH;
+import static org.folio.inventory.dataimport.util.MappingConstants.INSTANCE_REQUIRED_FIELDS;
import static org.folio.inventory.domain.instances.Instance.DISCOVERY_SUPPRESS_KEY;
import static org.folio.inventory.domain.instances.Instance.HRID_KEY;
import static org.folio.inventory.domain.instances.Instance.METADATA_KEY;
@@ -63,7 +65,6 @@
import static org.folio.inventory.domain.instances.InstanceSource.MARC;
import static org.folio.rest.jaxrs.model.ProfileType.ACTION_PROFILE;
-
public class ReplaceInstanceEventHandler extends AbstractInstanceEventHandler { // NOSONAR
private static final String PAYLOAD_HAS_NO_DATA_MSG = "Failed to handle event payload, cause event payload context does not contain MARC_BIBLIOGRAPHIC or INSTANCE data";
@@ -183,7 +184,7 @@ private void processInstanceUpdate(DataImportEventPayload dataImportEventPayload
recordId, chunkId))))
.compose(e -> {
JsonObject instanceAsJson = prepareTargetInstance(dataImportEventPayload, instanceToUpdate);
- List<String> errors = EventHandlingUtil.validateJsonByRequiredFields(instanceAsJson, requiredFields);
+ List<String> errors = EventHandlingUtil.validateJsonByRequiredFields(instanceAsJson, INSTANCE_REQUIRED_FIELDS);
if (!errors.isEmpty()) {
String msg = format("Mapped Instance is invalid: %s, by jobExecutionId: '%s' and recordId: '%s' and chunkId: '%s' ", errors,
@@ -253,7 +254,7 @@ private Future<Snapshot> copySnapshotToOtherTenant(String snapshotId, DataImport
.withStatus(Snapshot.Status.COMMITTED)
.withProcessingStartedDate(new Date());
- return postSnapshotInSrsAndHandleResponse(dataImportEventPayload, snapshot, tenantId);
+ return postSnapshotInSrsAndHandleResponse(dataImportEventPayload.getOkapiUrl(), dataImportEventPayload.getToken(), snapshot, tenantId);
}
@Override
@@ -356,7 +357,7 @@ private Future prepareRecordForMapping(DataImportEventPayload dataImportEv
}
private Future<Record> getRecordByInstanceId(DataImportEventPayload dataImportEventPayload, String instanceId, String tenantId) {
- SourceStorageRecordsClient client = getSourceStorageRecordsClient(dataImportEventPayload, tenantId);
+ SourceStorageRecordsClient client = getSourceStorageRecordsClient(dataImportEventPayload.getOkapiUrl(), dataImportEventPayload.getToken(), tenantId);
return client.getSourceStorageRecordsFormattedById(instanceId, INSTANCE_ID_TYPE).compose(resp -> {
if (resp.statusCode() != 200) {
LOGGER.warn(format("Failed to retrieve MARC record by instance id: '%s', status code: %s",
diff --git a/src/main/java/org/folio/inventory/dataimport/util/AdditionalFieldsUtil.java b/src/main/java/org/folio/inventory/dataimport/util/AdditionalFieldsUtil.java
index 568657bdc..4e39017dc 100644
--- a/src/main/java/org/folio/inventory/dataimport/util/AdditionalFieldsUtil.java
+++ b/src/main/java/org/folio/inventory/dataimport/util/AdditionalFieldsUtil.java
@@ -74,6 +74,7 @@ public final class AdditionalFieldsUtil {
private static final String ANY_STRING = "*";
private static final char INDICATOR = 'f';
public static final char SUBFIELD_I = 'i';
+ public static final char SUBFIELD_L = 'l';
private static final String HR_ID_FIELD = "hrid";
private static final CacheLoader parsedRecordContentCacheLoader;
private static final LoadingCache parsedRecordContentCache;
diff --git a/src/main/java/org/folio/inventory/dataimport/util/MappingConstants.java b/src/main/java/org/folio/inventory/dataimport/util/MappingConstants.java
new file mode 100644
index 000000000..59b751bc1
--- /dev/null
+++ b/src/main/java/org/folio/inventory/dataimport/util/MappingConstants.java
@@ -0,0 +1,19 @@
+package org.folio.inventory.dataimport.util;
+
+import static org.folio.inventory.domain.instances.Instance.INSTANCE_TYPE_ID_KEY;
+import static org.folio.inventory.domain.instances.Instance.SOURCE_KEY;
+import static org.folio.inventory.domain.instances.Instance.TITLE_KEY;
+
+import java.util.Arrays;
+import java.util.List;
+import lombok.experimental.UtilityClass;
+
+@UtilityClass
+public final class MappingConstants {
+
+ public static final String MARC_BIB_RECORD_TYPE = "marc-bib";
+ public static final String MARC_BIB_RECORD_FORMAT = "MARC_BIB";
+ public static final String INSTANCE_PATH = "instance";
+ public static final List<String> INSTANCE_REQUIRED_FIELDS = Arrays.asList(SOURCE_KEY, TITLE_KEY, INSTANCE_TYPE_ID_KEY);
+
+}
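For context, a minimal sketch of how these shared constants are meant to be consumed; the JSON field values below are invented for illustration, and EventHandlingUtil.validateJsonByRequiredFields is the validator already used elsewhere in this patch:

// Sketch only: validate a mapped instance against the shared required-fields
// list before persisting; the values here are illustrative, not from the patch.
JsonObject instanceAsJson = new JsonObject()
  .put("source", "LINKED_DATA")
  .put("title", "Sample title")
  .put("instanceTypeId", UUID.randomUUID().toString());
List<String> errors = EventHandlingUtil.validateJsonByRequiredFields(instanceAsJson, MappingConstants.INSTANCE_REQUIRED_FIELDS);
if (!errors.isEmpty()) {
  throw new EventProcessingException("Mapped Instance is invalid: " + errors);
}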
diff --git a/src/main/java/org/folio/inventory/domain/instances/Instance.java b/src/main/java/org/folio/inventory/domain/instances/Instance.java
index e04026364..bd92b3508 100644
--- a/src/main/java/org/folio/inventory/domain/instances/Instance.java
+++ b/src/main/java/org/folio/inventory/domain/instances/Instance.java
@@ -30,6 +30,7 @@
public class Instance {
// JSON property names
+ public static final String ID = "id";
public static final String VERSION_KEY = "_version";
public static final String HRID_KEY = "hrid";
public static final String MATCH_KEY_KEY = "matchKey";
@@ -147,7 +148,7 @@ public Instance(
public static Instance fromJson(JsonObject instanceJson) {
return new Instance(
- instanceJson.getString("id"),
+ instanceJson.getString(ID),
instanceJson.getString(VERSION_KEY),
instanceJson.getString("hrid"),
instanceJson.getString(SOURCE_KEY),
@@ -198,7 +199,7 @@ public static Instance fromJson(JsonObject instanceJson) {
public JsonObject getJsonForStorage() {
JsonObject json = new JsonObject();
//TODO: Review if this shouldn't be defaulting here
- json.put("id", getId() != null
+ json.put(ID, getId() != null
? getId()
: UUID.randomUUID().toString());
putIfNotNull(json, VERSION_KEY, version);
@@ -249,7 +250,7 @@ public JsonObject getJsonForStorage() {
public JsonObject getJsonForResponse(WebContext context) {
JsonObject json = new JsonObject();
- json.put("id", getId());
+ json.put(ID, getId());
putIfNotNull(json, VERSION_KEY, version);
json.put("hrid", getHrid());
json.put(SOURCE_KEY, getSource());
diff --git a/src/main/java/org/folio/inventory/instanceingress/InstanceIngressEventConsumer.java b/src/main/java/org/folio/inventory/instanceingress/InstanceIngressEventConsumer.java
new file mode 100644
index 000000000..b4a8e1dcc
--- /dev/null
+++ b/src/main/java/org/folio/inventory/instanceingress/InstanceIngressEventConsumer.java
@@ -0,0 +1,106 @@
+package org.folio.inventory.instanceingress;
+
+import static org.folio.inventory.dataimport.handlers.matching.util.EventHandlingUtil.constructContext;
+import static org.folio.rest.jaxrs.model.InstanceIngressEvent.EventType.CREATE_INSTANCE;
+import static org.folio.rest.jaxrs.model.InstanceIngressEvent.EventType.UPDATE_INSTANCE;
+import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TENANT_HEADER;
+import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TOKEN_HEADER;
+import static org.folio.rest.util.OkapiConnectionParams.OKAPI_URL_HEADER;
+
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpClient;
+import io.vertx.core.json.Json;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.folio.inventory.common.Context;
+import org.folio.inventory.common.dao.EntityIdStorageDaoImpl;
+import org.folio.inventory.common.dao.PostgresClientFactory;
+import org.folio.inventory.dataimport.cache.MappingMetadataCache;
+import org.folio.inventory.dataimport.handlers.actions.PrecedingSucceedingTitlesHelper;
+import org.folio.inventory.instanceingress.handler.CreateInstanceIngressEventHandler;
+import org.folio.inventory.instanceingress.handler.InstanceIngressEventHandler;
+import org.folio.inventory.instanceingress.handler.InstanceIngressUpdateEventHandler;
+import org.folio.inventory.services.InstanceIdStorageService;
+import org.folio.inventory.storage.Storage;
+import org.folio.kafka.AsyncRecordHandler;
+import org.folio.kafka.KafkaHeaderUtils;
+import org.folio.processing.exceptions.EventProcessingException;
+import org.folio.rest.jaxrs.model.InstanceIngressEvent;
+
+public class InstanceIngressEventConsumer implements AsyncRecordHandler<String, String> {
+
+ private static final Logger LOGGER = LogManager.getLogger(InstanceIngressEventConsumer.class);
+ private final Vertx vertx;
+ private final Storage storage;
+ private final HttpClient client;
+ private final MappingMetadataCache mappingMetadataCache;
+
+ public InstanceIngressEventConsumer(Vertx vertx,
+ Storage storage,
+ HttpClient client,
+ MappingMetadataCache mappingMetadataCache) {
+ this.vertx = vertx;
+ this.storage = storage;
+ this.client = client;
+ this.mappingMetadataCache = mappingMetadataCache;
+ }
+
+ @Override
+ public Future<String> handle(KafkaConsumerRecord<String, String> consumerRecord) {
+ var kafkaHeaders = KafkaHeaderUtils.kafkaHeadersToMap(consumerRecord.headers());
+ var event = Json.decodeValue(consumerRecord.value(), InstanceIngressEvent.class);
+ var context = constructContext(getTenantId(event, kafkaHeaders),
+ kafkaHeaders.get(OKAPI_TOKEN_HEADER), kafkaHeaders.get(OKAPI_URL_HEADER));
+ LOGGER.info("Instance ingress event has been received with event type: {}", event.getEventType());
+ return Future.succeededFuture(event.getEventPayload())
+ .compose(eventPayload -> processEvent(event, context)
+ .map(ar -> consumerRecord.key()), th -> {
+ LOGGER.error("Update record state was failed while handle event, {}", th.getMessage());
+ return Future.failedFuture(th.getMessage());
+ });
+ }
+
+ private static String getTenantId(InstanceIngressEvent event,
+ Map<String, String> kafkaHeaders) {
+ return Optional.ofNullable(event.getTenant())
+ .orElseGet(() -> kafkaHeaders.get(OKAPI_TENANT_HEADER));
+ }
+
+ private Future<InstanceIngressEvent.EventType> processEvent(InstanceIngressEvent event, Context context) {
+ try {
+ Promise<InstanceIngressEvent.EventType> promise = Promise.promise();
+ getInstanceIngressEventHandler(event.getEventType(), context).handle(event)
+ .whenComplete((res, ex) -> {
+ if (ex != null) {
+ promise.fail(ex);
+ } else {
+ promise.complete(event.getEventType());
+ }
+ });
+ return promise.future();
+ } catch (Exception e) {
+ LOGGER.warn("Error during processPayload: ", e);
+ return Future.failedFuture(e);
+ }
+ }
+
+ private InstanceIngressEventHandler getInstanceIngressEventHandler(InstanceIngressEvent.EventType eventType, Context context) {
+ if (eventType == CREATE_INSTANCE) {
+ var precedingSucceedingTitlesHelper = new PrecedingSucceedingTitlesHelper(WebClient.wrap(client));
+ var idStorageService = new InstanceIdStorageService(new EntityIdStorageDaoImpl(new PostgresClientFactory(vertx)));
+ return new CreateInstanceIngressEventHandler(precedingSucceedingTitlesHelper, mappingMetadataCache, idStorageService, client, context, storage);
+ } else if (eventType == UPDATE_INSTANCE) {
+ return new InstanceIngressUpdateEventHandler();
+ } else {
+ LOGGER.warn("Can't process eventType {}", eventType);
+ throw new EventProcessingException("Can't process eventType " + eventType);
+ }
+ }
+
+}
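For reference, a sketch of the kind of message body this consumer expects, assembled only from fields the code above reads (id, eventType, tenant, eventPayload.sourceType, eventPayload.sourceRecordObject, plus the optional linkedDataId additional property); all concrete values are invented:

// Illustrative event body; the consumer decodes this into InstanceIngressEvent
// and routes CREATE_INSTANCE to CreateInstanceIngressEventHandler below.
var eventJson = new JsonObject()
  .put("id", UUID.randomUUID().toString())
  .put("eventType", "CREATE_INSTANCE")
  .put("tenant", "diku")
  .put("eventPayload", new JsonObject()
    .put("sourceType", "LINKED_DATA")
    .put("sourceRecordIdentifier", UUID.randomUUID().toString())
    .put("sourceRecordObject", "{\"leader\":\"...\",\"fields\":[]}") // placeholder MARC-in-JSON
    .put("linkedDataId", "12345"));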
diff --git a/src/main/java/org/folio/inventory/instanceingress/handler/CreateInstanceIngressEventHandler.java b/src/main/java/org/folio/inventory/instanceingress/handler/CreateInstanceIngressEventHandler.java
new file mode 100644
index 000000000..cf2e107b2
--- /dev/null
+++ b/src/main/java/org/folio/inventory/instanceingress/handler/CreateInstanceIngressEventHandler.java
@@ -0,0 +1,251 @@
+package org.folio.inventory.instanceingress.handler;
+
+import static java.lang.String.format;
+import static java.util.Objects.isNull;
+import static java.util.Optional.ofNullable;
+import static org.apache.logging.log4j.LogManager.getLogger;
+import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.SUBFIELD_L;
+import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.TAG_035;
+import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.TAG_035_SUB;
+import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.TAG_999;
+import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.reorderMarcRecordFields;
+import static org.folio.inventory.dataimport.util.MappingConstants.INSTANCE_REQUIRED_FIELDS;
+import static org.folio.inventory.dataimport.util.MappingConstants.MARC_BIB_RECORD_FORMAT;
+import static org.folio.inventory.dataimport.util.MappingConstants.MARC_BIB_RECORD_TYPE;
+import static org.folio.rest.jaxrs.model.EntityType.MARC_BIBLIOGRAPHIC;
+import static org.folio.rest.jaxrs.model.Record.RecordType.MARC_BIB;
+import static org.folio.rest.jaxrs.model.Snapshot.Status.PROCESSING_FINISHED;
+
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.http.HttpClient;
+import io.vertx.core.json.Json;
+import io.vertx.core.json.JsonObject;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.logging.log4j.Logger;
+import org.folio.HttpStatus;
+import org.folio.MappingMetadataDto;
+import org.folio.inventory.common.Context;
+import org.folio.inventory.dataimport.cache.MappingMetadataCache;
+import org.folio.inventory.dataimport.handlers.actions.CreateInstanceEventHandler;
+import org.folio.inventory.dataimport.handlers.actions.PrecedingSucceedingTitlesHelper;
+import org.folio.inventory.dataimport.handlers.matching.util.EventHandlingUtil;
+import org.folio.inventory.dataimport.util.AdditionalFieldsUtil;
+import org.folio.inventory.dataimport.util.ValidationUtil;
+import org.folio.inventory.domain.instances.Instance;
+import org.folio.inventory.domain.instances.InstanceCollection;
+import org.folio.inventory.instanceingress.InstanceIngressEventConsumer;
+import org.folio.inventory.services.IdStorageService;
+import org.folio.inventory.storage.Storage;
+import org.folio.kafka.exception.DuplicateEventException;
+import org.folio.processing.exceptions.EventProcessingException;
+import org.folio.processing.mapping.defaultmapper.RecordMapper;
+import org.folio.processing.mapping.defaultmapper.RecordMapperBuilder;
+import org.folio.processing.mapping.defaultmapper.processor.parameters.MappingParameters;
+import org.folio.rest.jaxrs.model.InstanceIngressEvent;
+import org.folio.rest.jaxrs.model.InstanceIngressPayload;
+import org.folio.rest.jaxrs.model.ParsedRecord;
+import org.folio.rest.jaxrs.model.RawRecord;
+import org.folio.rest.jaxrs.model.Record;
+import org.folio.rest.jaxrs.model.Snapshot;
+
+public class CreateInstanceIngressEventHandler extends CreateInstanceEventHandler implements InstanceIngressEventHandler {
+
+ private static final String LINKED_DATA_ID = "linkedDataId";
+ private static final Logger LOGGER = getLogger(CreateInstanceIngressEventHandler.class);
+ private static final String LD = "(ld) ";
+ private static final String FAILURE = "Failed to process InstanceIngressEvent with id {}";
+ private final Context context;
+ private final InstanceCollection instanceCollection;
+
+ public CreateInstanceIngressEventHandler(PrecedingSucceedingTitlesHelper precedingSucceedingTitlesHelper,
+ MappingMetadataCache mappingMetadataCache,
+ IdStorageService idStorageService,
+ HttpClient httpClient,
+ Context context,
+ Storage storage) {
+ super(storage, precedingSucceedingTitlesHelper, mappingMetadataCache, idStorageService, null, httpClient);
+ this.context = context;
+ this.instanceCollection = storage.getInstanceCollection(context);
+ }
+
+ @Override
+ public CompletableFuture<Instance> handle(InstanceIngressEvent event) {
+ try {
+ LOGGER.info("Processing InstanceIngressEvent with id '{}' for instance creation", event.getId());
+ var future = new CompletableFuture<Instance>();
+ if (eventContainsNoData(event)) {
+ var message = format("InstanceIngressEvent message does not contain required data to create Instance for eventId: '%s'", event.getId());
+ LOGGER.error(message);
+ return CompletableFuture.failedFuture(new EventProcessingException(message));
+ }
+
+ var targetRecord = constructMarcBibRecord(event.getEventPayload());
+ var instanceId = ofNullable(event.getEventPayload().getSourceRecordIdentifier()).orElseGet(() -> getInstanceId(targetRecord));
+ idStorageService.store(targetRecord.getId(), instanceId, context.getTenantId())
+ .compose(res -> super.getMappingMetadataCache().getByRecordType(InstanceIngressEventConsumer.class.getSimpleName(), context, MARC_BIB_RECORD_TYPE))
+ .compose(metadataOptional -> metadataOptional.map(metadata -> prepareAndExecuteMapping(metadata, targetRecord, event, instanceId))
+ .orElseGet(() -> Future.failedFuture("MappingMetadata was not found for marc-bib record type")))
+ .compose(instance -> validateInstance(instance, event))
+ .compose(instance -> saveInstance(instance, event))
+ .onFailure(e -> {
+ if (!(e instanceof DuplicateEventException)) {
+ LOGGER.error(FAILURE, event.getId(), e);
+ }
+ future.completeExceptionally(e);
+ })
+ .onComplete(ar -> future.complete(ar.result()));
+ return future;
+ } catch (Exception e) {
+ LOGGER.error(FAILURE, event.getId(), e);
+ return CompletableFuture.failedFuture(e);
+ }
+ }
+
+ private boolean eventContainsNoData(InstanceIngressEvent event) {
+ return isNull(event.getEventPayload())
+ || isNull(event.getEventPayload().getSourceRecordObject())
+ || isNull(event.getEventPayload().getSourceType());
+ }
+
+ private Future<org.folio.Instance> prepareAndExecuteMapping(MappingMetadataDto mappingMetadata,
+ Record targetRecord,
+ InstanceIngressEvent event,
+ String instanceId) {
+ try {
+ LOGGER.info("Manipulating fields of a Record from InstanceIngressEvent with id '{}'", event.getId());
+ var mappingParameters = Json.decodeValue(mappingMetadata.getMappingParams(), MappingParameters.class);
+ AdditionalFieldsUtil.updateLatestTransactionDate(targetRecord, mappingParameters);
+ AdditionalFieldsUtil.move001To035(targetRecord);
+ AdditionalFieldsUtil.normalize035(targetRecord);
+ if (event.getEventPayload().getAdditionalProperties().containsKey(LINKED_DATA_ID)) {
+ AdditionalFieldsUtil.addFieldToMarcRecord(targetRecord, TAG_035, TAG_035_SUB,
+ LD + event.getEventPayload().getAdditionalProperties().get(LINKED_DATA_ID));
+ }
+
+ LOGGER.info("Mapping a Record from InstanceIngressEvent with id '{}' into an Instance", event.getId());
+ var parsedRecord = new JsonObject((String) targetRecord.getParsedRecord().getContent());
+ RecordMapper<org.folio.Instance> recordMapper = RecordMapperBuilder.buildMapper(MARC_BIB_RECORD_FORMAT);
+ var instance = recordMapper.mapRecord(parsedRecord, mappingParameters, new JsonObject(mappingMetadata.getMappingRules()));
+ instance.setId(instanceId);
+ instance.setSource(event.getEventPayload().getSourceType().value());
+ LOGGER.info("Mapped Instance from InstanceIngressEvent with id '{}': {}", event.getId(), instance);
+ return Future.succeededFuture(instance);
+ } catch (Exception e) {
+ LOGGER.warn("Error during preparing and executing mapping:", e);
+ return Future.failedFuture(e);
+ }
+ }
+
+ private Record constructMarcBibRecord(InstanceIngressPayload eventPayload) {
+ var recordId = UUID.randomUUID().toString();
+ var marcBibRecord = new org.folio.rest.jaxrs.model.Record()
+ .withId(recordId)
+ .withRecordType(MARC_BIB)
+ .withSnapshotId(recordId)
+ .withRawRecord(new RawRecord()
+ .withId(recordId)
+ .withContent(eventPayload.getSourceRecordObject())
+ )
+ .withParsedRecord(new ParsedRecord()
+ .withId(recordId)
+ .withContent(eventPayload.getSourceRecordObject())
+ );
+ eventPayload
+ .withAdditionalProperty(MARC_BIBLIOGRAPHIC.value(), marcBibRecord);
+ return marcBibRecord;
+ }
+
+ private Future<Instance> validateInstance(org.folio.Instance instance, InstanceIngressEvent event) {
+ try {
+ LOGGER.info("Validating Instance from InstanceIngressEvent with id '{}':", event.getId());
+ var instanceAsJson = JsonObject.mapFrom(instance);
+ var errors = EventHandlingUtil.validateJsonByRequiredFields(instanceAsJson, INSTANCE_REQUIRED_FIELDS);
+ return failIfErrors(errors, event.getId())
+ .orElseGet(() -> {
+ var mappedInstance = Instance.fromJson(instanceAsJson);
+ var uuidErrors = ValidationUtil.validateUUIDs(mappedInstance);
+ return failIfErrors(uuidErrors, event.getId()).orElseGet(() -> Future.succeededFuture(mappedInstance));
+ });
+ } catch (Exception e) {
+ return Future.failedFuture(e);
+ }
+ }
+
+ private Optional<Future<Instance>> failIfErrors(List<String> errors, String eventId) {
+ if (errors.isEmpty()) {
+ return Optional.empty();
+ }
+ var msg = format("Mapped Instance is invalid: %s, from InstanceIngressEvent with id '%s'", errors, eventId);
+ LOGGER.warn(msg);
+ return Optional.of(Future.failedFuture(msg));
+ }
+
+ private Future<Instance> saveInstance(Instance instance, InstanceIngressEvent event) {
+ LOGGER.info("Saving Instance from InstanceIngressEvent with id '{}': {}", event.getId(), instance);
+ var targetRecord = (Record) event.getEventPayload().getAdditionalProperties().get(MARC_BIBLIOGRAPHIC.value());
+ var sourceContent = targetRecord.getParsedRecord().getContent().toString();
+ return super.addInstance(instance, instanceCollection)
+ .compose(createdInstance -> getPrecedingSucceedingTitlesHelper().createPrecedingSucceedingTitles(instance, context).map(createdInstance))
+ .compose(createdInstance -> executeFieldsManipulation(createdInstance, targetRecord, event.getEventPayload().getAdditionalProperties()))
+ .compose(createdInstance -> {
+ var targetContent = targetRecord.getParsedRecord().getContent().toString();
+ var content = reorderMarcRecordFields(sourceContent, targetContent);
+ targetRecord.setParsedRecord(targetRecord.getParsedRecord().withContent(content));
+ return saveRecordInSrsAndHandleResponse(event, targetRecord, createdInstance);
+ });
+ }
+
+ private Future<Instance> executeFieldsManipulation(Instance instance, Record srcRecord,
+ Map<String, Object> eventProperties) {
+ if (eventProperties.containsKey(LINKED_DATA_ID)) {
+ AdditionalFieldsUtil.addFieldToMarcRecord(srcRecord, TAG_999, SUBFIELD_L, String.valueOf(eventProperties.get(LINKED_DATA_ID)));
+ }
+ return super.executeFieldsManipulation(instance, srcRecord);
+ }
+
+ private Future<Instance> saveRecordInSrsAndHandleResponse(InstanceIngressEvent event, Record srcRecord, Instance instance) {
+ LOGGER.info("Saving record in SRS and handling a response for an Instance with id '{}':", instance.getId());
+ Promise<Instance> promise = Promise.promise();
+ postSnapshotInSrsAndHandleResponse(srcRecord.getSnapshotId())
+ .onFailure(promise::fail)
+ .compose(snapshot -> {
+ getSourceStorageRecordsClient(context.getOkapiLocation(), context.getToken(), context.getTenantId())
+ .postSourceStorageRecords(srcRecord)
+ .onComplete(ar -> {
+ var result = ar.result();
+ if (ar.succeeded() &&
+ result.statusCode() == HttpStatus.HTTP_CREATED.toInt()) {
+ LOGGER.info("Created MARC record in SRS with id: '{}', instanceId: '{}', from tenant: {}",
+ srcRecord.getId(), instance.getId(), context.getTenantId());
+ promise.complete(instance);
+ } else {
+ String msg = format(
+ "Failed to create MARC record in SRS, instanceId: '%s', status code: %s, Record: %s",
+ instance.getId(), result != null ? result.statusCode() : "", result != null ? result.bodyAsString() : "");
+ LOGGER.warn(msg);
+ super.deleteInstance(instance.getId(), event.getId(),
+ instanceCollection);
+ promise.fail(msg);
+ }
+ });
+ return promise.future();
+ });
+ return promise.future();
+ }
+
+ private Future<Snapshot> postSnapshotInSrsAndHandleResponse(String id) {
+ var snapshot = new Snapshot()
+ .withJobExecutionId(id)
+ .withProcessingStartedDate(new Date())
+ .withStatus(PROCESSING_FINISHED);
+ return super.postSnapshotInSrsAndHandleResponse(context.getOkapiLocation(),
+ context.getToken(), snapshot, context.getTenantId());
+ }
+
+}
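A minimal invocation sketch, assuming the collaborators are already wired exactly as in InstanceIngressEventConsumer.getInstanceIngressEventHandler above:

// Sketch: handle() returns a CompletableFuture<Instance> that completes once
// the instance is saved in inventory and the MARC record is saved in SRS.
var handler = new CreateInstanceIngressEventHandler(
  precedingSucceedingTitlesHelper, mappingMetadataCache, idStorageService,
  httpClient, context, storage);
handler.handle(event)
  .thenAccept(instance -> LOGGER.info("Created Instance with id '{}'", instance.getId()))
  .exceptionally(e -> {
    LOGGER.error("Instance ingress create failed", e);
    return null;
  });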
diff --git a/src/main/java/org/folio/inventory/instanceingress/handler/InstanceIngressEventHandler.java b/src/main/java/org/folio/inventory/instanceingress/handler/InstanceIngressEventHandler.java
new file mode 100644
index 000000000..5d15b87da
--- /dev/null
+++ b/src/main/java/org/folio/inventory/instanceingress/handler/InstanceIngressEventHandler.java
@@ -0,0 +1,11 @@
+package org.folio.inventory.instanceingress.handler;
+
+import java.util.concurrent.CompletableFuture;
+import org.folio.inventory.domain.instances.Instance;
+import org.folio.rest.jaxrs.model.InstanceIngressEvent;
+
+public interface InstanceIngressEventHandler {
+
+ CompletableFuture<Instance> handle(InstanceIngressEvent instanceIngressEvent);
+
+}
diff --git a/src/main/java/org/folio/inventory/instanceingress/handler/InstanceIngressUpdateEventHandler.java b/src/main/java/org/folio/inventory/instanceingress/handler/InstanceIngressUpdateEventHandler.java
new file mode 100644
index 000000000..f871ab228
--- /dev/null
+++ b/src/main/java/org/folio/inventory/instanceingress/handler/InstanceIngressUpdateEventHandler.java
@@ -0,0 +1,14 @@
+package org.folio.inventory.instanceingress.handler;
+
+import java.util.concurrent.CompletableFuture;
+import org.folio.inventory.domain.instances.Instance;
+import org.folio.rest.jaxrs.model.InstanceIngressEvent;
+
+public class InstanceIngressUpdateEventHandler implements InstanceIngressEventHandler {
+
+ @Override
+ public CompletableFuture<Instance> handle(InstanceIngressEvent instanceIngressEvent) {
+ // to be implemented in MODINV-1008
+ return CompletableFuture.failedFuture(new UnsupportedOperationException());
+ }
+}
diff --git a/src/main/java/org/folio/inventory/support/KafkaConsumerVerticle.java b/src/main/java/org/folio/inventory/support/KafkaConsumerVerticle.java
new file mode 100644
index 000000000..2e0211aeb
--- /dev/null
+++ b/src/main/java/org/folio/inventory/support/KafkaConsumerVerticle.java
@@ -0,0 +1,162 @@
+package org.folio.inventory.support;
+
+import static java.lang.Integer.parseInt;
+import static java.lang.String.format;
+import static java.lang.String.join;
+import static java.lang.System.getProperty;
+import static java.util.Objects.isNull;
+import static org.apache.commons.lang.StringUtils.isBlank;
+import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_ENV;
+import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_HOST;
+import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_MAX_REQUEST_SIZE;
+import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_PORT;
+import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_REPLICATION_FACTOR;
+import static org.folio.inventory.dataimport.util.KafkaConfigConstants.OKAPI_URL;
+import static org.folio.kafka.KafkaTopicNameHelper.getDefaultNameSpace;
+
+import io.vertx.core.AbstractVerticle;
+import io.vertx.core.Promise;
+import io.vertx.core.http.HttpClient;
+import io.vertx.core.json.JsonObject;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.logging.log4j.Logger;
+import org.folio.inventory.dataimport.cache.MappingMetadataCache;
+import org.folio.inventory.dataimport.cache.ProfileSnapshotCache;
+import org.folio.inventory.storage.Storage;
+import org.folio.kafka.GlobalLoadSensor;
+import org.folio.kafka.KafkaConfig;
+import org.folio.kafka.KafkaConsumerWrapper;
+import org.folio.kafka.KafkaTopicNameHelper;
+import org.folio.kafka.SubscriptionDefinition;
+import org.folio.okapi.common.GenericCompositeFuture;
+
+public abstract class KafkaConsumerVerticle extends AbstractVerticle {
+ private static final String LOAD_LIMIT_TEMPLATE = "inventory.kafka.%s.loadLimit";
+ private static final String LOAD_LIMIT_DEFAULT = "5";
+ private static final String MAX_DISTRIBUTION_NUMBER_TEMPLATE = "inventory.kafka.%s.maxDistributionNumber";
+ private static final String MAX_DISTRIBUTION_NUMBER_DEFAULT = "100";
+ private static final String CACHE_EXPIRATION_DEFAULT = "3600";
+ private static final String PROFILE_SNAPSHOT_CACHE_EXPIRATION_TIME = "inventory.profile-snapshot-cache.expiration.time.seconds";
+ private final List<KafkaConsumerWrapper<String, String>> consumerWrappers = new ArrayList<>();
+ private ProfileSnapshotCache profileSnapshotCache;
+ private KafkaConfig kafkaConfig;
+ private JsonObject config;
+ private HttpClient httpClient;
+ private Storage storage;
+
+ @Override
+ public void stop(Promise<Void> stopPromise) {
+ var stopFutures = consumerWrappers.stream()
+ .map(KafkaConsumerWrapper::stop)
+ .toList();
+
+ GenericCompositeFuture.join(stopFutures)
+ .onComplete(ar -> stopPromise.complete());
+ }
+
+ protected abstract Logger getLogger();
+
+ protected KafkaConsumerWrapper<String, String> createConsumer(String eventType, String loadLimitPropertyKey) {
+ return createConsumer(eventType, loadLimitPropertyKey, true);
+ }
+
+ protected KafkaConsumerWrapper<String, String> createConsumer(String eventType, String loadLimitPropertyKey, boolean namespacedTopic) {
+ var kafkaConsumerWrapper = KafkaConsumerWrapper.<String, String>builder()
+ .context(context)
+ .vertx(vertx)
+ .kafkaConfig(getKafkaConfig())
+ .loadLimit(getLoadLimit(loadLimitPropertyKey))
+ .globalLoadSensor(new GlobalLoadSensor())
+ .subscriptionDefinition(getSubscriptionDefinition(getKafkaConfig().getEnvId(), eventType, namespacedTopic))
+ .build();
+ consumerWrappers.add(kafkaConsumerWrapper);
+ return kafkaConsumerWrapper;
+ }
+
+ protected KafkaConfig getKafkaConfig() {
+ if (isNull(kafkaConfig)) {
+ kafkaConfig = KafkaConfig.builder()
+ .envId(getConfig().getString(KAFKA_ENV))
+ .kafkaHost(getConfig().getString(KAFKA_HOST))
+ .kafkaPort(getConfig().getString(KAFKA_PORT))
+ .okapiUrl(getConfig().getString(OKAPI_URL))
+ .replicationFactor(parseInt(getConfig().getString(KAFKA_REPLICATION_FACTOR)))
+ .maxRequestSize(parseInt(getConfig().getString(KAFKA_MAX_REQUEST_SIZE)))
+ .build();
+ getLogger().info("kafkaConfig: {}", kafkaConfig);
+ }
+ return kafkaConfig;
+ }
+
+ protected HttpClient getHttpClient() {
+ if (isNull(httpClient)) {
+ httpClient = vertx.createHttpClient();
+ }
+ return httpClient;
+ }
+
+ protected Storage getStorage() {
+ if (isNull(storage)) {
+ storage = Storage.basedUpon(getConfig(), vertx.createHttpClient());
+ }
+ return storage;
+ }
+
+ protected MappingMetadataCache getMappingMetadataCache() {
+ return MappingMetadataCache.getInstance(vertx, getHttpClient());
+ }
+
+ protected ProfileSnapshotCache getProfileSnapshotCache() {
+ if (isNull(profileSnapshotCache)) {
+ var profileSnapshotExpirationTime = getCacheEnvVariable(PROFILE_SNAPSHOT_CACHE_EXPIRATION_TIME);
+ profileSnapshotCache = new ProfileSnapshotCache(vertx, getHttpClient(), Long.parseLong(profileSnapshotExpirationTime));
+ }
+ return profileSnapshotCache;
+ }
+
+ protected String getCacheEnvVariable(String variableName) {
+ var cacheExpirationTime = getConfig().getString(variableName);
+ if (isBlank(cacheExpirationTime)) {
+ cacheExpirationTime = CACHE_EXPIRATION_DEFAULT;
+ }
+ return cacheExpirationTime;
+ }
+
+ protected int getMaxDistributionNumber(String property) {
+ return getConsumerProperty(MAX_DISTRIBUTION_NUMBER_TEMPLATE, property, MAX_DISTRIBUTION_NUMBER_DEFAULT);
+ }
+
+ private JsonObject getConfig() {
+ if (isNull(config)) {
+ config = vertx.getOrCreateContext().config();
+ }
+ return config;
+ }
+
+ private SubscriptionDefinition getSubscriptionDefinition(String envId, String eventType, boolean namespacedTopic) {
+ return namespacedTopic
+ ? KafkaTopicNameHelper.createSubscriptionDefinition(envId, getDefaultNameSpace(), eventType)
+ : createSubscriptionDefinition(envId, eventType);
+ }
+
+ private SubscriptionDefinition createSubscriptionDefinition(String env, String eventType) {
+ return SubscriptionDefinition.builder()
+ .eventType(eventType)
+ .subscriptionPattern(formatSubscriptionPattern(env, eventType))
+ .build();
+ }
+
+ private String formatSubscriptionPattern(String env, String eventType) {
+ return join("\\.", env, "\\w{1,}", eventType);
+ }
+
+ private int getLoadLimit(String propertyKey) {
+ return getConsumerProperty(LOAD_LIMIT_TEMPLATE, propertyKey, LOAD_LIMIT_DEFAULT);
+ }
+
+ private int getConsumerProperty(String nameTemplate, String propertyKey, String defaultValue) {
+ return parseInt(getProperty(format(nameTemplate, propertyKey), defaultValue));
+ }
+
+}
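The intended extension pattern, as a sketch; SampleConsumerVerticle, SampleKafkaHandler, the event type, and the property key are hypothetical stand-ins for a concrete consumer such as InstanceIngressConsumerVerticle:

// Hypothetical subclass: create a wrapped consumer, start it with a business
// handler, and let the base class stop all registered consumers on undeploy.
public class SampleConsumerVerticle extends KafkaConsumerVerticle {

  private static final Logger LOGGER = LogManager.getLogger(SampleConsumerVerticle.class);

  @Override
  public void start(Promise<Void> startPromise) {
    var consumer = createConsumer("SAMPLE_EVENT_TYPE", "SampleConsumer");
    consumer.start(new SampleKafkaHandler(getStorage(), getHttpClient()), "mod-inventory")
      .onSuccess(v -> startPromise.complete())
      .onFailure(startPromise::fail);
  }

  @Override
  protected Logger getLogger() {
    return LOGGER;
  }
}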
diff --git a/src/test/java/org/folio/inventory/TestUtil.java b/src/test/java/org/folio/inventory/TestUtil.java
index 50e576f4e..eed95c3cf 100644
--- a/src/test/java/org/folio/inventory/TestUtil.java
+++ b/src/test/java/org/folio/inventory/TestUtil.java
@@ -1,8 +1,11 @@
package org.folio.inventory;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.ext.web.client.impl.HttpResponseImpl;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
+import org.folio.HttpStatus;
/**
* Util class contains helper methods for unit testing needs
@@ -12,4 +15,28 @@ public final class TestUtil {
public static String readFileFromPath(String path) throws IOException {
return Files.readString(Path.of(path));
}
+
+ public static HttpResponseImpl<Buffer> buildHttpResponseWithBuffer(int httpStatus) {
+ return buildHttpResponseWithBuffer(null, httpStatus, "");
+ }
+
+ public static HttpResponseImpl<Buffer> buildHttpResponseWithBuffer(HttpStatus httpStatus) {
+ return buildHttpResponseWithBuffer(null, httpStatus.toInt(), "");
+ }
+
+ public static HttpResponseImpl<Buffer> buildHttpResponseWithBuffer(HttpStatus httpStatus, String statusMessage) {
+ return buildHttpResponseWithBuffer(null, httpStatus.toInt(), statusMessage);
+ }
+
+ public static HttpResponseImpl<Buffer> buildHttpResponseWithBuffer(Buffer buffer, int httpStatus) {
+ return buildHttpResponseWithBuffer(buffer, httpStatus, "");
+ }
+
+ public static HttpResponseImpl<Buffer> buildHttpResponseWithBuffer(Buffer buffer, HttpStatus httpStatus) {
+ return buildHttpResponseWithBuffer(buffer, httpStatus.toInt(), "");
+ }
+
+ public static HttpResponseImpl<Buffer> buildHttpResponseWithBuffer(Buffer buffer, int httpStatus, String statusMessage) {
+ return new HttpResponseImpl<>(null, httpStatus, statusMessage, null, null, null, buffer, null);
+ }
}
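These overloads replace the per-test helpers deleted further below; a typical usage sketch in a test, assuming a mocked sourceStorageClient:

// Stub a successful SRS response (201) and a failure response (500):
HttpResponse<Buffer> created =
  buildHttpResponseWithBuffer(BufferImpl.buffer("{}"), HttpStatus.HTTP_CREATED);
HttpResponse<Buffer> serverError =
  buildHttpResponseWithBuffer(HttpStatus.HTTP_INTERNAL_SERVER_ERROR, "Internal Server Error");
when(sourceStorageClient.postSourceStorageRecords(any()))
  .thenReturn(Future.succeededFuture(created));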
diff --git a/src/test/java/org/folio/inventory/consortium/handlers/MarcInstanceSharingHandlerImplTest.java b/src/test/java/org/folio/inventory/consortium/handlers/MarcInstanceSharingHandlerImplTest.java
index a6ecf5024..0db8c75f1 100644
--- a/src/test/java/org/folio/inventory/consortium/handlers/MarcInstanceSharingHandlerImplTest.java
+++ b/src/test/java/org/folio/inventory/consortium/handlers/MarcInstanceSharingHandlerImplTest.java
@@ -52,8 +52,8 @@
import static org.folio.HttpStatus.HTTP_INTERNAL_SERVER_ERROR;
import static org.folio.HttpStatus.HTTP_NO_CONTENT;
+import static org.folio.inventory.TestUtil.buildHttpResponseWithBuffer;
import static org.folio.inventory.consortium.handlers.MarcInstanceSharingHandlerImpl.SRS_RECORD_ID_TYPE;
-import static org.folio.inventory.consortium.util.RestDataImportHelperTest.buildHttpResponseWithBuffer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
@@ -138,7 +138,7 @@ public void setUp() throws IOException {
}
private final HttpResponse<Buffer> sourceStorageRecordsResponseBuffer =
- buildHttpResponseWithBuffer(HttpStatus.HTTP_OK, BufferImpl.buffer(recordJson));
+ buildHttpResponseWithBuffer(BufferImpl.buffer(recordJson), HttpStatus.HTTP_OK);
@Test
public void publishInstanceTest(TestContext testContext) {
@@ -192,8 +192,8 @@ public void publishInstanceAndRelinkAuthoritiesTest(TestContext testContext) thr
String instanceId = "eb89b292-d2b7-4c36-9bfc-f816d6f96418";
String targetInstanceHrid = "consin0000000000101";
- Record record = buildHttpResponseWithBuffer(HttpStatus.HTTP_OK,
- BufferImpl.buffer(recordJsonWithLinkedAuthorities)).bodyAsJson(Record.class);
+ Record record = buildHttpResponseWithBuffer(BufferImpl.buffer(recordJsonWithLinkedAuthorities), HttpStatus.HTTP_OK)
+ .bodyAsJson(Record.class);
Authority authority1 = new Authority().withId(AUTHORITY_ID_1).withSource(Authority.Source.MARC);
Authority authority2 = new Authority().withId(AUTHORITY_ID_2).withSource(Authority.Source.CONSORTIUM_MARC);
@@ -267,7 +267,7 @@ public void shouldReturnFailedFutureIfErrorDuringRetrievingAuthoritiesDuringUnli
String instanceId = "eb89b292-d2b7-4c36-9bfc-f816d6f96418";
- Record record = buildHttpResponseWithBuffer(HttpStatus.HTTP_OK, BufferImpl.buffer(recordJsonWithLinkedAuthorities)).bodyAsJson(Record.class);
+ Record record = buildHttpResponseWithBuffer(BufferImpl.buffer(recordJsonWithLinkedAuthorities), HttpStatus.HTTP_OK).bodyAsJson(Record.class);
//given
marcHandler = spy(new MarcInstanceSharingHandlerImpl(instanceOperationsHelper, storage, vertx, httpClient));
@@ -311,7 +311,7 @@ public void shouldNotPutLinkInstanceAuthoritiesIfInstanceNotLinkedToSharedAuthor
String instanceId = "eb89b292-d2b7-4c36-9bfc-f816d6f96418";
String targetInstanceHrid = "consin0000000000101";
- Record record = buildHttpResponseWithBuffer(HttpStatus.HTTP_OK, BufferImpl.buffer(recordJsonWithLinkedAuthorities)).bodyAsJson(Record.class);
+ Record record = buildHttpResponseWithBuffer(BufferImpl.buffer(recordJsonWithLinkedAuthorities), HttpStatus.HTTP_OK).bodyAsJson(Record.class);
Authority authority1 = new Authority().withId(AUTHORITY_ID_1).withSource(Authority.Source.MARC);
Authority authority2 = new Authority().withId(AUTHORITY_ID_2).withSource(Authority.Source.MARC);
@@ -380,7 +380,7 @@ public void publishInstanceAndNotModifyMarcRecordIfLocalAuthoritiesNotLinkedToMa
String instanceId = "eb89b292-d2b7-4c36-9bfc-f816d6f96418";
String targetInstanceHrid = "consin0000000000101";
- Record record = buildHttpResponseWithBuffer(HttpStatus.HTTP_OK, BufferImpl.buffer(recordJsonWithLinkedAuthorities)).bodyAsJson(Record.class);
+ Record record = buildHttpResponseWithBuffer(BufferImpl.buffer(recordJsonWithLinkedAuthorities), HttpStatus.HTTP_OK).bodyAsJson(Record.class);
Authority authority1 = new Authority().withId(AUTHORITY_ID_1).withSource(Authority.Source.CONSORTIUM_MARC);
Authority authority2 = new Authority().withId(AUTHORITY_ID_2).withSource(Authority.Source.CONSORTIUM_MARC);
@@ -523,7 +523,7 @@ public void deleteSourceRecordByInstanceIdFailedTest() {
MarcInstanceSharingHandlerImpl handler = new MarcInstanceSharingHandlerImpl(instanceOperationsHelper, null, vertx, httpClient);
handler.deleteSourceRecordByRecordId(recordId, instanceId, tenant, sourceStorageClient)
.onComplete(result -> assertTrue(result.failed()));
-
+
verify(sourceStorageClient, times(1)).deleteSourceStorageRecordsById(recordId, SRS_RECORD_ID_TYPE);
}
diff --git a/src/test/java/org/folio/inventory/consortium/util/RestDataImportHelperTest.java b/src/test/java/org/folio/inventory/consortium/util/RestDataImportHelperTest.java
index 680cfc533..f5d315480 100644
--- a/src/test/java/org/folio/inventory/consortium/util/RestDataImportHelperTest.java
+++ b/src/test/java/org/folio/inventory/consortium/util/RestDataImportHelperTest.java
@@ -20,11 +20,11 @@
import org.junit.Test;
import org.junit.runner.RunWith;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import static org.folio.inventory.TestUtil.buildHttpResponseWithBuffer;
import static org.folio.inventory.consortium.util.RestDataImportHelper.FIELD_JOB_EXECUTIONS;
import static org.folio.inventory.consortium.util.RestDataImportHelper.STATUS_COMMITTED;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -63,7 +63,7 @@ public void initJobExecutionTest() {
.put(FIELD_JOB_EXECUTIONS, new JsonArray().add(new JsonObject().put("id", expectedJobExecutionId)));
HttpResponseImpl<Buffer> jobExecutionResponse =
- buildHttpResponseWithBuffer(HttpStatus.HTTP_CREATED, BufferImpl.buffer(responseBody.encode()));
+ buildHttpResponseWithBuffer(BufferImpl.buffer(responseBody.encode()), HttpStatus.HTTP_CREATED);
Future<HttpResponse<Buffer>> futureResponse = Future.succeededFuture(jobExecutionResponse);
doAnswer(invocation -> {
@@ -88,7 +88,7 @@ public void initJobExecutionFailedInternalServerErrorTest() {
String expectedJobExecutionId = UUID.randomUUID().toString();
Map kafkaHeaders = new HashMap<>();
HttpResponseImpl<Buffer> jobExecutionResponse =
- buildHttpResponseWithBuffer(HttpStatus.HTTP_INTERNAL_SERVER_ERROR, null);
+ buildHttpResponseWithBuffer(HttpStatus.HTTP_INTERNAL_SERVER_ERROR, "Ok");
Future<HttpResponse<Buffer>> futureResponse = Future.succeededFuture(jobExecutionResponse);
doAnswer(invocation -> {
@@ -115,7 +115,7 @@ public void initJobExecutionFailedWithoutJobExecutionsArrayTest() {
JsonObject responseBody = new JsonObject().put("jobExecutions", new JsonArray().add(""));
HttpResponseImpl<Buffer> jobExecutionResponse =
- buildHttpResponseWithBuffer(HttpStatus.HTTP_CREATED, BufferImpl.buffer(responseBody.encode()));
+ buildHttpResponseWithBuffer(BufferImpl.buffer(responseBody.encode()), HttpStatus.HTTP_CREATED);
Future<HttpResponse<Buffer>> futureResponse = Future.succeededFuture(jobExecutionResponse);
doAnswer(invocation -> {
@@ -140,7 +140,7 @@ public void initJobExecutionFailedWithJobExecutionsEmptyArrayTest() {
String expectedJobExecutionId = UUID.randomUUID().toString();
Map kafkaHeaders = new HashMap<>();
HttpResponseImpl<Buffer> jobExecutionResponse =
- buildHttpResponseWithBuffer(HttpStatus.HTTP_CREATED, BufferImpl.buffer("{\"jobExecutions\":[]}"));
+ buildHttpResponseWithBuffer(BufferImpl.buffer("{\"jobExecutions\":[]}"), HttpStatus.HTTP_CREATED);
Future<HttpResponse<Buffer>> futureResponse = Future.succeededFuture(jobExecutionResponse);
doAnswer(invocation -> {
@@ -166,7 +166,7 @@ public void setDefaultJobProfileToJobExecutionTest() {
String expectedJobExecutionId = UUID.randomUUID().toString();
HttpResponseImpl<Buffer> jobExecutionResponse =
- buildHttpResponseWithBuffer(HttpStatus.HTTP_OK, null);
+ buildHttpResponseWithBuffer(HttpStatus.HTTP_OK);
Future<HttpResponse<Buffer>> futureResponse = Future.succeededFuture(jobExecutionResponse);
doAnswer(invocation -> {
@@ -191,7 +191,7 @@ public void setDefaultJobProfileToJobExecutionFailedInternalServerErrorTest() {
String expectedJobExecutionId = UUID.randomUUID().toString();
HttpResponseImpl<Buffer> jobExecutionResponse =
- buildHttpResponseWithBuffer(HttpStatus.HTTP_INTERNAL_SERVER_ERROR, null);
+ buildHttpResponseWithBuffer(HttpStatus.HTTP_INTERNAL_SERVER_ERROR, "Ok");
Future<HttpResponse<Buffer>> futureResponse = Future.succeededFuture(jobExecutionResponse);
doAnswer(invocation -> {
@@ -217,7 +217,7 @@ public void postChunkTest() {
String expectedJobExecutionId = UUID.randomUUID().toString();
HttpResponseImpl<Buffer> jobExecutionResponse =
- buildHttpResponseWithBuffer(HttpStatus.HTTP_NO_CONTENT, null);
+ buildHttpResponseWithBuffer(HttpStatus.HTTP_NO_CONTENT);
Future<HttpResponse<Buffer>> futureResponse = Future.succeededFuture(jobExecutionResponse);
RawRecordsDto rawRecordsDto = new RawRecordsDto()
@@ -252,7 +252,7 @@ public void getJobExecutionStatusByJobExecutionId() {
String expectedJobExecutionId = UUID.randomUUID().toString();
HttpResponseImpl<Buffer> jobExecutionResponse =
- buildHttpResponseWithBuffer(HttpStatus.HTTP_OK, BufferImpl.buffer("{\"status\":\"" + STATUS_COMMITTED + "\"}"));
+ buildHttpResponseWithBuffer(BufferImpl.buffer("{\"status\":\"" + STATUS_COMMITTED + "\"}"), HttpStatus.HTTP_OK);
Future<HttpResponse<Buffer>> futureResponse = Future.succeededFuture(jobExecutionResponse);
doAnswer(invocation -> {
@@ -277,7 +277,7 @@ public void getJobExecutionStatusByJobExecutionIdFailedWithEmptyResponseBodyTest
String expectedJobExecutionId = UUID.randomUUID().toString();
HttpResponseImpl<Buffer> jobExecutionResponse =
- buildHttpResponseWithBuffer(HttpStatus.HTTP_OK, null);
+ buildHttpResponseWithBuffer(HttpStatus.HTTP_OK);
Future<HttpResponse<Buffer>> futureResponse = Future.succeededFuture(jobExecutionResponse);
doAnswer(invocation -> {
@@ -303,7 +303,7 @@ public void getJobExecutionStatusByJobExecutionIdFailedInternalServerErrorTest()
String expectedJobExecutionId = UUID.randomUUID().toString();
HttpResponseImpl<Buffer> jobExecutionResponse =
- buildHttpResponseWithBuffer(HttpStatus.HTTP_INTERNAL_SERVER_ERROR, null);
+ buildHttpResponseWithBuffer(HttpStatus.HTTP_INTERNAL_SERVER_ERROR, "Ok");
Future<HttpResponse<Buffer>> futureResponse = Future.succeededFuture(jobExecutionResponse);
doAnswer(invocation -> {
@@ -322,15 +322,4 @@ public void getJobExecutionStatusByJobExecutionIdFailedInternalServerErrorTest()
});
}
- public static HttpResponseImpl<Buffer> buildHttpResponseWithBuffer(HttpStatus httpStatus, Buffer buffer) {
- return new HttpResponseImpl<>(
- null,
- httpStatus.toInt(),
- "Ok",
- null,
- null,
- null,
- buffer,
- new ArrayList<>());
- }
}
diff --git a/src/test/java/org/folio/inventory/dataimport/cache/MappingMetadataCacheTest.java b/src/test/java/org/folio/inventory/dataimport/cache/MappingMetadataCacheTest.java
index dc7175889..d4a897617 100644
--- a/src/test/java/org/folio/inventory/dataimport/cache/MappingMetadataCacheTest.java
+++ b/src/test/java/org/folio/inventory/dataimport/cache/MappingMetadataCacheTest.java
@@ -5,7 +5,6 @@
import java.util.Optional;
import java.util.UUID;
-import io.vertx.core.json.JsonObject;
import org.folio.inventory.common.Context;
import org.folio.inventory.dataimport.handlers.matching.util.EventHandlingUtil;
import org.folio.MappingMetadataDto;
diff --git a/src/test/java/org/folio/inventory/dataimport/consumers/InstanceIngressConsumerVerticleTest.java b/src/test/java/org/folio/inventory/dataimport/consumers/InstanceIngressConsumerVerticleTest.java
new file mode 100644
index 000000000..ce707dd0c
--- /dev/null
+++ b/src/test/java/org/folio/inventory/dataimport/consumers/InstanceIngressConsumerVerticleTest.java
@@ -0,0 +1,62 @@
+package org.folio.inventory.dataimport.consumers;
+
+import io.vertx.core.DeploymentOptions;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.unit.Async;
+import io.vertx.ext.unit.TestContext;
+import io.vertx.ext.unit.junit.VertxUnitRunner;
+import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
+import org.folio.inventory.InstanceIngressConsumerVerticle;
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import static net.mguenther.kafka.junit.EmbeddedKafkaCluster.provisionWith;
+import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.defaultClusterConfig;
+import static org.folio.inventory.dataimport.util.KafkaConfigConstants.*;
+
+@RunWith(VertxUnitRunner.class)
+public class InstanceIngressConsumerVerticleTest {
+
+ private static final String KAFKA_ENV_NAME = "test-env";
+ private static Vertx vertx = Vertx.vertx();
+
+ public static EmbeddedKafkaCluster cluster;
+
+ @Test
+ public void shouldDeployVerticle(TestContext context) {
+ Async async = context.async();
+ cluster = provisionWith(defaultClusterConfig());
+ cluster.start();
+ String[] hostAndPort = cluster.getBrokerList().split(":");
+ DeploymentOptions options = new DeploymentOptions()
+ .setConfig(new JsonObject()
+ .put(KAFKA_HOST, hostAndPort[0])
+ .put(KAFKA_PORT, hostAndPort[1])
+ .put(KAFKA_REPLICATION_FACTOR, "1")
+ .put(KAFKA_ENV, KAFKA_ENV_NAME)
+ .put(KAFKA_MAX_REQUEST_SIZE, "1048576"))
+ .setWorker(true);
+
+ Promise<String> promise = Promise.promise();
+ vertx.deployVerticle(InstanceIngressConsumerVerticle.class.getName(), options, promise);
+
+ promise.future().onComplete(ar -> {
+ context.assertTrue(ar.succeeded());
+ async.complete();
+ });
+
+ }
+
+ @AfterClass
+ public static void tearDownClass(TestContext context) {
+ Async async = context.async();
+ vertx.close(ar -> {
+ cluster.stop();
+ async.complete();
+ });
+ }
+
+}
diff --git a/src/test/java/org/folio/inventory/dataimport/handlers/actions/CreateInstanceEventHandlerTest.java b/src/test/java/org/folio/inventory/dataimport/handlers/actions/CreateInstanceEventHandlerTest.java
index 88b50d70b..4ecf2d953 100644
--- a/src/test/java/org/folio/inventory/dataimport/handlers/actions/CreateInstanceEventHandlerTest.java
+++ b/src/test/java/org/folio/inventory/dataimport/handlers/actions/CreateInstanceEventHandlerTest.java
@@ -78,11 +78,13 @@
import static com.github.tomakehurst.wiremock.client.WireMock.get;
import static java.util.concurrent.CompletableFuture.completedStage;
+import static org.apache.http.HttpStatus.SC_CREATED;
import static org.folio.ActionProfile.FolioRecord.INSTANCE;
import static org.folio.ActionProfile.FolioRecord.MARC_BIBLIOGRAPHIC;
import static org.folio.DataImportEventTypes.DI_INVENTORY_INSTANCE_CREATED;
import static org.folio.DataImportEventTypes.DI_INVENTORY_INSTANCE_CREATED_READY_FOR_POST_PROCESSING;
import static org.folio.DataImportEventTypes.DI_INCOMING_MARC_BIB_RECORD_PARSED;
+import static org.folio.inventory.TestUtil.buildHttpResponseWithBuffer;
import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.TAG_005;
import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.dateTime005Formatter;
import static org.folio.inventory.dataimport.util.DataImportConstants.UNIQUE_ID_ERROR_MESSAGE;
@@ -232,7 +234,7 @@ public void setUp() throws IOException {
new PrecedingSucceedingTitlesHelper(context -> mockedClient), MappingMetadataCache.getInstance(vertx,
httpClient), instanceIdStorageService, orderHelperService, httpClient));
- doReturn(sourceStorageClient).when(createInstanceEventHandler).getSourceStorageRecordsClient(any(), any());
+ doReturn(sourceStorageClient).when(createInstanceEventHandler).getSourceStorageRecordsClient(any(), any(), any());
doAnswer(invocationOnMock -> {
Instance instanceRecord = invocationOnMock.getArgument(0);
Consumer<Success<Instance>> successHandler = invocationOnMock.getArgument(1);
@@ -287,7 +289,7 @@ public void shouldProcessEvent(String content, String acceptInstanceId) throws I
"\"id\":\"990fad8b-64ec-4de4-978c-9f8bbed4c6d3\"," +
"\"content\":\"{\\\"leader\\\":\\\"00574nam 22001211a 4500\\\",\\\"fields\\\":[{\\\"035\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"(in001)ybp7406411\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"245\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"titleValue\\\"}],\\\"ind1\\\":\\\"1\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"336\\\":{\\\"subfields\\\":[{\\\"b\\\":\\\"b6698d38-149f-11ec-82a8-0242ac130003\\\"}],\\\"ind1\\\":\\\"1\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"780\\\":{\\\"subfields\\\":[{\\\"t\\\":\\\"Houston oil directory\\\"}],\\\"ind1\\\":\\\"0\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"785\\\":{\\\"subfields\\\":[{\\\"t\\\":\\\"SAIS review of international affairs\\\"},{\\\"x\\\":\\\"1945-4724\\\"}],\\\"ind1\\\":\\\"0\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"500\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"Adaptation of Xi xiang ji by Wang Shifu.\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"520\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"Ben shu miao shu le cui ying ying he zhang sheng wei zheng qu hun yin zi you li jin qu zhe jian xin zhi hou, zhong cheng juan shu de ai qing gu shi. jie lu le bao ban hun yin he feng jian li jiao de zui e.\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"999\\\":{\\\"subfields\\\":[{\\\"i\\\":\\\"4d4545df-b5ba-4031-a031-70b1c1b2fc5d\\\"}],\\\"ind1\\\":\\\"f\\\",\\\"ind2\\\":\\\"f\\\"}}]}\"" +
"}}");
- HttpResponse<Buffer> resp = buildHttpResponseWithBuffer(buffer);
+ HttpResponse<Buffer> resp = buildHttpResponseWithBuffer(buffer, SC_CREATED);
when(sourceStorageClient.postSourceStorageRecords(any())).thenReturn(Future.succeededFuture(resp));
DataImportEventPayload dataImportEventPayload = new DataImportEventPayload()
@@ -318,7 +320,7 @@ public void shouldProcessEvent(String content, String acceptInstanceId) throws I
assertThat(createdInstance.getJsonArray("notes").getJsonObject(0).getString("instanceNoteTypeId"), notNullValue());
assertThat(createdInstance.getJsonArray("notes").getJsonObject(1).getString("instanceNoteTypeId"), notNullValue());
verify(mockedClient, times(2)).post(any(URL.class), any(JsonObject.class));
- verify(createInstanceEventHandler).getSourceStorageRecordsClient(any(), argThat(tenantId -> tenantId.equals(TENANT_ID)));
+ verify(createInstanceEventHandler).getSourceStorageRecordsClient(any(), any(), argThat(tenantId -> tenantId.equals(TENANT_ID)));
}
@Test
@@ -340,7 +342,7 @@ public void shouldProcessEventAndUpdate005Field() throws InterruptedException, E
MappingManager.registerReaderFactory(fakeReaderFactory);
MappingManager.registerWriterFactory(new InstanceWriterFactory());
- HttpResponse<Buffer> resp = buildHttpResponseWithBuffer(BufferImpl.buffer("{}"));
+ HttpResponse<Buffer> resp = buildHttpResponseWithBuffer(BufferImpl.buffer("{}"), SC_CREATED);
ArgumentCaptor<Record> recordCaptor = ArgumentCaptor.forClass(Record.class);
when(sourceStorageClient.postSourceStorageRecords(any())).thenReturn(Future.succeededFuture(resp));
@@ -461,7 +463,7 @@ public void shouldProcessConsortiumEvent() throws InterruptedException, Executio
context.put(MARC_BIBLIOGRAPHIC.value(), Json.encode(record));
Buffer buffer = BufferImpl.buffer("{\"id\": \"567859ad-505a-400d-a699-0028a1fdbf84\",\"parsedRecord\": {\"content\": \"{\\\"leader\\\":\\\"00567nam 22001211a 4500\\\",\\\"fields\\\":[{\\\"035\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"ybp7406411\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"245\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"titleValue\\\"}],\\\"ind1\\\":\\\"1\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"336\\\":{\\\"subfields\\\":[{\\\"b\\\":\\\"b6698d38-149f-11ec-82a8-0242ac130003\\\"}],\\\"ind1\\\":\\\"1\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"780\\\":{\\\"subfields\\\":[{\\\"t\\\":\\\"Houston oil directory\\\"}],\\\"ind1\\\":\\\"0\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"785\\\":{\\\"subfields\\\":[{\\\"t\\\":\\\"SAIS review of international affairs\\\"},{\\\"x\\\":\\\"1945-4724\\\"}],\\\"ind1\\\":\\\"0\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"500\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"Adaptation of Xi xiang ji by Wang Shifu.\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"520\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"Ben shu miao shu le cui ying ying he zhang sheng wei zheng qu hun yin zi you li jin qu zhe jian xin zhi hou, zhong cheng juan shu de ai qing gu shi. jie lu le bao ban hun yin he feng jian li jiao de zui e.\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"999\\\":{\\\"subfields\\\":[{\\\"i\\\":\\\"957985c6-97e3-4038-b0e7-343ecd0b8120\\\"}],\\\"ind1\\\":\\\"f\\\",\\\"ind2\\\":\\\"f\\\"}}]}\"},\"deleted\": false,\"order\": 0,\"externalIdsHolder\": {\"instanceId\": \"b5e25bc3-a5a5-474a-8333-4a728d2f3485\",\"instanceHrid\": \"in00000000028\"},\"state\": \"ACTUAL\"}");
- HttpResponse<Buffer> resp = buildHttpResponseWithBuffer(buffer);
+ HttpResponse<Buffer> resp = buildHttpResponseWithBuffer(buffer, SC_CREATED);
when(sourceStorageClient.postSourceStorageRecords(any())).thenReturn(Future.succeededFuture(resp));
DataImportEventPayload dataImportEventPayload = new DataImportEventPayload()
@@ -677,7 +679,7 @@ public void shouldNotProcessEventIfNatureContentFieldIsNotUUID() throws Interrup
"\"id\":\"990fad8b-64ec-4de4-978c-9f8bbed4c6d3\"," +
"\"content\":\"{\\\"leader\\\":\\\"00574nam 22001211a 4500\\\",\\\"fields\\\":[{\\\"035\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"(in001)ybp7406411\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"245\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"titleValue\\\"}],\\\"ind1\\\":\\\"1\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"336\\\":{\\\"subfields\\\":[{\\\"b\\\":\\\"b6698d38-149f-11ec-82a8-0242ac130003\\\"}],\\\"ind1\\\":\\\"1\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"780\\\":{\\\"subfields\\\":[{\\\"t\\\":\\\"Houston oil directory\\\"}],\\\"ind1\\\":\\\"0\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"785\\\":{\\\"subfields\\\":[{\\\"t\\\":\\\"SAIS review of international affairs\\\"},{\\\"x\\\":\\\"1945-4724\\\"}],\\\"ind1\\\":\\\"0\\\",\\\"ind2\\\":\\\"0\\\"}},{\\\"500\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"Adaptation of Xi xiang ji by Wang Shifu.\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"520\\\":{\\\"subfields\\\":[{\\\"a\\\":\\\"Ben shu miao shu le cui ying ying he zhang sheng wei zheng qu hun yin zi you li jin qu zhe jian xin zhi hou, zhong cheng juan shu de ai qing gu shi. jie lu le bao ban hun yin he feng jian li jiao de zui e.\\\"}],\\\"ind1\\\":\\\" \\\",\\\"ind2\\\":\\\" \\\"}},{\\\"999\\\":{\\\"subfields\\\":[{\\\"i\\\":\\\"4d4545df-b5ba-4031-a031-70b1c1b2fc5d\\\"}],\\\"ind1\\\":\\\"f\\\",\\\"ind2\\\":\\\"f\\\"}}]}\"" +
"}}");
- HttpResponse<Buffer> resp = buildHttpResponseWithBuffer(buffer);
+ HttpResponse<Buffer> resp = buildHttpResponseWithBuffer(buffer, SC_CREATED);
when(sourceStorageClient.postSourceStorageRecords(any())).thenReturn(Future.succeededFuture(resp));
DataImportEventPayload dataImportEventPayload = new DataImportEventPayload()
@@ -698,7 +700,7 @@ public void shouldNotProcessEventIfNatureContentFieldIsNotUUID() throws Interrup
public void shouldNotProcessEventIfRecordContains999field() throws InterruptedException, ExecutionException, TimeoutException {
var recordId = UUID.randomUUID().toString();
- HttpResponse<Buffer> resp = buildHttpResponseWithBuffer(BufferImpl.buffer("{}"));
+ HttpResponse<Buffer> resp = buildHttpResponseWithBuffer(BufferImpl.buffer("{}"), SC_CREATED);
when(sourceStorageClient.postSourceStorageRecords(any())).thenReturn(Future.succeededFuture(resp));
var context = new HashMap<String, String>();
@@ -920,12 +922,7 @@ public void shouldNotProcessEventEvenIfInventoryStorageErrorExists() throws Inte
future.get(5, TimeUnit.SECONDS);
}
- private static HttpResponseImpl<Buffer> buildHttpResponseWithBuffer(Buffer buffer) {
- return new HttpResponseImpl<>(null, HttpStatus.SC_CREATED, "",
- null, null, null, buffer, null);
- }
-
private Response createdResponse() {
- return new Response(HttpStatus.SC_CREATED, null, null, null);
+ return new Response(SC_CREATED, null, null, null);
}
}
diff --git a/src/test/java/org/folio/inventory/dataimport/handlers/actions/ReplaceInstanceEventHandlerTest.java b/src/test/java/org/folio/inventory/dataimport/handlers/actions/ReplaceInstanceEventHandlerTest.java
index d2d5abdaa..0e55ab79b 100644
--- a/src/test/java/org/folio/inventory/dataimport/handlers/actions/ReplaceInstanceEventHandlerTest.java
+++ b/src/test/java/org/folio/inventory/dataimport/handlers/actions/ReplaceInstanceEventHandlerTest.java
@@ -15,7 +15,6 @@
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.HttpResponse;
-import io.vertx.ext.web.client.impl.HttpResponseImpl;
import org.apache.http.HttpStatus;
import org.folio.ActionProfile;
import org.folio.DataImportEventPayload;
@@ -89,6 +88,7 @@
import static org.folio.DataImportEventTypes.DI_INVENTORY_INSTANCE_MATCHED;
import static org.folio.DataImportEventTypes.DI_INVENTORY_INSTANCE_UPDATED;
import static org.folio.DataImportEventTypes.DI_INVENTORY_INSTANCE_UPDATED_READY_FOR_POST_PROCESSING;
+import static org.folio.inventory.TestUtil.buildHttpResponseWithBuffer;
import static org.folio.inventory.dataimport.handlers.actions.ReplaceInstanceEventHandler.ACTION_HAS_NO_MAPPING_MSG;
import static org.folio.inventory.dataimport.handlers.actions.ReplaceInstanceEventHandler.MARC_BIB_RECORD_CREATED;
import static org.folio.inventory.domain.instances.InstanceSource.CONSORTIUM_MARC;
@@ -273,8 +273,8 @@ public void setUp() throws IOException {
return null;
}).when(instanceRecordCollection).update(any(), any(Consumer.class), any(Consumer.class));
- doReturn(sourceStorageClient).when(replaceInstanceEventHandler).getSourceStorageRecordsClient(any(), any());
- doReturn(sourceStorageSnapshotsClient).when(replaceInstanceEventHandler).getSourceStorageSnapshotsClient(any(), any());
+ doReturn(sourceStorageClient).when(replaceInstanceEventHandler).getSourceStorageRecordsClient(any(), any(), any());
+ doReturn(sourceStorageSnapshotsClient).when(replaceInstanceEventHandler).getSourceStorageSnapshotsClient(any(), any(), any());
doAnswer(invocationOnMock -> completedStage(createResponse(201, null)))
.when(mockedClient).post(any(URL.class), any(JsonObject.class));
@@ -501,9 +501,9 @@ public void shouldProcessEventIfConsortiumInstance() throws InterruptedException
assertThat(createdInstance.getString("_version"), is(INSTANCE_VERSION_AS_STRING));
verify(mockedClient, times(2)).post(any(URL.class), any(JsonObject.class));
verify(sourceStorageClient).getSourceStorageRecordsFormattedById(anyString(),eq(INSTANCE.value()));
- verify(replaceInstanceEventHandler).getSourceStorageSnapshotsClient(any(), argThat(tenantId -> tenantId.equals(consortiumTenant)));
+ verify(replaceInstanceEventHandler).getSourceStorageSnapshotsClient(any(), any(), argThat(tenantId -> tenantId.equals(consortiumTenant)));
verify(sourceStorageSnapshotsClient).postSourceStorageSnapshots(argThat(snapshot -> snapshot.getJobExecutionId().equals(record.getSnapshotId())));
- verify(replaceInstanceEventHandler).getSourceStorageRecordsClient(any(), argThat(tenantId -> tenantId.equals(consortiumTenant)));
+ verify(replaceInstanceEventHandler).getSourceStorageRecordsClient(any(), any(), argThat(tenantId -> tenantId.equals(consortiumTenant)));
verify(sourceStorageClient).getSourceStorageRecordsFormattedById(anyString(), eq(INSTANCE.value()));
verify(1, getRequestedFor(new UrlPathPattern(new RegexPattern(MAPPING_METADATA_URL + "/.*"), true)));
}
@@ -640,8 +640,8 @@ public void shouldUpdateSharedFolioInstanceOnCentralTenantIfPayloadContainsCentr
ArgumentCaptor<Record> recordCaptor = ArgumentCaptor.forClass(Record.class);
verify(sourceStorageClient).postSourceStorageRecords(recordCaptor.capture());
- verify(replaceInstanceEventHandler).getSourceStorageRecordsClient(any(), argThat(tenantId -> tenantId.equals(consortiumTenant)));
- verify(replaceInstanceEventHandler).getSourceStorageSnapshotsClient(any(), argThat(tenantId -> tenantId.equals(consortiumTenant)));
+ verify(replaceInstanceEventHandler).getSourceStorageRecordsClient(any(), any(), argThat(tenantId -> tenantId.equals(consortiumTenant)));
+ verify(replaceInstanceEventHandler).getSourceStorageSnapshotsClient(any(), any(), argThat(tenantId -> tenantId.equals(consortiumTenant)));
verify(sourceStorageSnapshotsClient).postSourceStorageSnapshots(argThat(snapshot -> snapshot.getJobExecutionId().equals(record.getSnapshotId())));
assertNotNull(recordId, recordCaptor.getValue().getMatchedId());
}
@@ -709,8 +709,8 @@ public void shouldUpdateSharedMarcInstanceOnCentralTenantIfPayloadContainsCentra
ArgumentCaptor recordCaptor = ArgumentCaptor.forClass(Record.class);
verify(sourceStorageClient).putSourceStorageRecordsGenerationById(any(), recordCaptor.capture());
- verify(replaceInstanceEventHandler, times(2)).getSourceStorageRecordsClient(any(), argThat(tenantId -> tenantId.equals(consortiumTenant)));
- verify(replaceInstanceEventHandler).getSourceStorageSnapshotsClient(any(), argThat(tenantId -> tenantId.equals(consortiumTenant)));
+ verify(replaceInstanceEventHandler, times(2)).getSourceStorageRecordsClient(any(), any(), argThat(tenantId -> tenantId.equals(consortiumTenant)));
+ verify(replaceInstanceEventHandler).getSourceStorageSnapshotsClient(any(), any(), argThat(tenantId -> tenantId.equals(consortiumTenant)));
verify(sourceStorageSnapshotsClient).postSourceStorageSnapshots(argThat(snapshot -> snapshot.getJobExecutionId().equals(record.getSnapshotId())));
}
@@ -1309,8 +1309,4 @@ private void mockInstance(String sourceType) {
}).when(instanceRecordCollection).findById(anyString(), any(Consumer.class), any(Consumer.class));
}
- private static HttpResponseImpl buildHttpResponseWithBuffer(Buffer buffer, int httpStatus) {
- return new HttpResponseImpl<>(null, httpStatus, "",
- null, null, null, buffer, null);
- }
}
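The private buildHttpResponseWithBuffer helper deleted above now lives in org.folio.inventory.TestUtil, which both this test and the new ingress test import statically. Judging from the removed body and the two call shapes used in the new test (buffer plus status, and status only), the shared helper presumably reads roughly like the sketch below; the status-only overload and the exact class layout are assumptions, not part of this diff:

    public static HttpResponseImpl<Buffer> buildHttpResponseWithBuffer(Buffer buffer, int httpStatus) {
      // mirrors the removed private helper: only status code and body are populated
      return new HttpResponseImpl<>(null, httpStatus, "", null, null, null, buffer, null);
    }

    // assumed overload covering call sites like buildHttpResponseWithBuffer(HttpStatus.SC_BAD_REQUEST)
    public static HttpResponseImpl<Buffer> buildHttpResponseWithBuffer(int httpStatus) {
      return buildHttpResponseWithBuffer(null, httpStatus);
    }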
diff --git a/src/test/java/org/folio/inventory/instanceingress/handler/CreateInstanceIngressEventHandlerUnitTest.java b/src/test/java/org/folio/inventory/instanceingress/handler/CreateInstanceIngressEventHandlerUnitTest.java
new file mode 100644
index 000000000..b95728a74
--- /dev/null
+++ b/src/test/java/org/folio/inventory/instanceingress/handler/CreateInstanceIngressEventHandlerUnitTest.java
@@ -0,0 +1,382 @@
+package org.folio.inventory.instanceingress.handler;
+
+import static io.vertx.core.Future.failedFuture;
+import static io.vertx.core.Future.succeededFuture;
+import static io.vertx.core.buffer.impl.BufferImpl.buffer;
+import static java.lang.String.format;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.folio.inventory.TestUtil.buildHttpResponseWithBuffer;
+import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.SUBFIELD_I;
+import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.SUBFIELD_L;
+import static org.folio.inventory.dataimport.util.AdditionalFieldsUtil.TAG_999;
+import static org.folio.inventory.dataimport.util.MappingConstants.MARC_BIB_RECORD_TYPE;
+import static org.folio.rest.jaxrs.model.InstanceIngressPayload.SourceType.LINKED_DATA;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+import io.vertx.core.http.HttpClient;
+import io.vertx.core.json.Json;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.unit.junit.VertxUnitRunner;
+import java.io.IOException;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+import org.apache.http.HttpStatus;
+import org.folio.MappingMetadataDto;
+import org.folio.inventory.TestUtil;
+import org.folio.inventory.common.Context;
+import org.folio.inventory.common.domain.Failure;
+import org.folio.inventory.common.domain.Success;
+import org.folio.inventory.dataimport.cache.MappingMetadataCache;
+import org.folio.inventory.dataimport.handlers.actions.PrecedingSucceedingTitlesHelper;
+import org.folio.inventory.dataimport.util.AdditionalFieldsUtil;
+import org.folio.inventory.domain.instances.Instance;
+import org.folio.inventory.domain.instances.InstanceCollection;
+import org.folio.inventory.instanceingress.InstanceIngressEventConsumer;
+import org.folio.inventory.services.IdStorageService;
+import org.folio.inventory.storage.Storage;
+import org.folio.processing.mapping.defaultmapper.processor.parameters.MappingParameters;
+import org.folio.rest.client.SourceStorageRecordsClient;
+import org.folio.rest.client.SourceStorageSnapshotsClient;
+import org.folio.rest.jaxrs.model.InstanceIngressEvent;
+import org.folio.rest.jaxrs.model.InstanceIngressPayload;
+import org.folio.rest.jaxrs.model.Record;
+import org.folio.rest.jaxrs.model.Snapshot;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+@RunWith(VertxUnitRunner.class)
+public class CreateInstanceIngressEventHandlerUnitTest {
+ private static final String MAPPING_RULES_PATH = "src/test/resources/handlers/bib-rules.json";
+ private static final String BIB_RECORD_PATH = "src/test/resources/handlers/bib-record.json";
+
+ @Rule
+ public MockitoRule initRule = MockitoJUnit.rule();
+ @Mock
+ private SourceStorageRecordsClient sourceStorageClient;
+ @Mock
+ private SourceStorageSnapshotsClient sourceStorageSnapshotsClient;
+ @Mock
+ private PrecedingSucceedingTitlesHelper precedingSucceedingTitlesHelper;
+ @Mock
+ private MappingMetadataCache mappingMetadataCache;
+ @Mock
+ private IdStorageService idStorageService;
+ @Mock
+ private HttpClient httpClient;
+ @Mock
+ private Context context;
+ @Mock
+ private Storage storage;
+ @Mock
+ private InstanceCollection instanceCollection;
+ private CreateInstanceIngressEventHandler handler;
+
+ @Before
+ public void setUp() {
+ doReturn("tenant").when(context).getTenantId();
+ doReturn("okapiUrl").when(context).getOkapiLocation();
+ doReturn("token").when(context).getToken();
+ doReturn(instanceCollection).when(storage).getInstanceCollection(context);
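+ // the handler is wrapped in a spy so individual tests can stub its SRS records and snapshots client getters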
+ handler = spy(new CreateInstanceIngressEventHandler(precedingSucceedingTitlesHelper,
+ mappingMetadataCache, idStorageService, httpClient, context, storage));
+ }
+
+ @Test
+ public void shouldReturnFailedFuture_ifEventDoesNotContainData() {
+ // given
+ var event = new InstanceIngressEvent()
+ .withId(UUID.randomUUID().toString());
+ var expectedMessage = format("InstanceIngressEvent message does not contain " +
+ "required data to create Instance for eventId: '%s'", event.getId());
+
+ // when
+ var future = handler.handle(event);
+
+ // then
+ var exception = Assert.assertThrows(ExecutionException.class, future::get);
+ assertEquals(expectedMessage, exception.getCause().getMessage());
+ }
+
+ @Test
+ public void shouldReturnFailedFuture_ifIdStorageServiceStoreFails() {
+ // given
+ var event = new InstanceIngressEvent()
+ .withId(UUID.randomUUID().toString())
+ .withEventPayload(new InstanceIngressPayload()
+ .withSourceRecordObject("{}")
+ .withSourceType(LINKED_DATA)
+ );
+ var expectedMessage = "idStorageService failure";
+ doReturn(failedFuture(expectedMessage)).when(idStorageService).store(anyString(), anyString(), anyString());
+
+ // when
+ var future = handler.handle(event);
+
+ // then
+ var exception = Assert.assertThrows(ExecutionException.class, future::get);
+ assertThat(exception.getCause().getMessage()).startsWith(expectedMessage);
+ }
+
+ @Test
+ public void shouldReturnFailedFuture_ifMappingMetadataWasNotFound() {
+ // given
+ var event = new InstanceIngressEvent()
+ .withId(UUID.randomUUID().toString())
+ .withEventPayload(new InstanceIngressPayload()
+ .withSourceRecordObject("{}")
+ .withSourceType(LINKED_DATA)
+ );
+ doReturn(succeededFuture(null)).when(idStorageService).store(anyString(), anyString(), anyString());
+ doReturn(succeededFuture(Optional.empty())).when(mappingMetadataCache)
+ .getByRecordType(InstanceIngressEventConsumer.class.getSimpleName(), context, MARC_BIB_RECORD_TYPE);
+ var expectedMessage = "MappingMetadata was not found for marc-bib record type";
+
+ // when
+ var future = handler.handle(event);
+
+ // then
+ var exception = Assert.assertThrows(ExecutionException.class, future::get);
+ assertThat(exception.getCause().getMessage()).startsWith(expectedMessage);
+ }
+
+ @Test
+ public void shouldReturnFailedFuture_ifInstanceValidationFails() throws IOException {
+ // given
+ var event = new InstanceIngressEvent()
+ .withId(UUID.randomUUID().toString())
+ .withEventPayload(new InstanceIngressPayload()
+ .withSourceRecordObject("{}")
+ .withSourceType(LINKED_DATA)
+ );
+ doReturn(succeededFuture(null)).when(idStorageService).store(anyString(), anyString(), anyString());
+ var mappingRules = new JsonObject(TestUtil.readFileFromPath(MAPPING_RULES_PATH));
+ doReturn(succeededFuture(Optional.of(new MappingMetadataDto()
+ .withMappingRules(mappingRules.encode())
+ .withMappingParams(Json.encode(new MappingParameters())))))
+ .when(mappingMetadataCache).getByRecordType(InstanceIngressEventConsumer.class.getSimpleName(), context, MARC_BIB_RECORD_TYPE);
+
+ doReturn(sourceStorageSnapshotsClient).when(handler).getSourceStorageSnapshotsClient(any(), any(), any());
+ var snapshotHttpResponse = buildHttpResponseWithBuffer(buffer(Json.encode(new Snapshot())), HttpStatus.SC_CREATED);
+ doReturn(succeededFuture(snapshotHttpResponse)).when(sourceStorageSnapshotsClient).postSourceStorageSnapshots(any());
+
+ var expectedMessage = "Mapped Instance is invalid: [Field 'title' is a required field and can not be null, "
+ + "Field 'instanceTypeId' is a required field and can not be null], from InstanceIngressEvent with id '" + event.getId() + "'";
+
+ // when
+ var future = handler.handle(event);
+
+ // then
+ var exception = Assert.assertThrows(ExecutionException.class, future::get);
+ assertThat(exception.getCause().getMessage()).isEqualTo(expectedMessage);
+ }
+
+ @Test
+ public void shouldReturnFailedFuture_ifInstanceSavingFailed() throws IOException {
+ // given
+ var event = new InstanceIngressEvent()
+ .withId(UUID.randomUUID().toString())
+ .withEventPayload(new InstanceIngressPayload()
+ .withSourceRecordObject(TestUtil.readFileFromPath(BIB_RECORD_PATH))
+ .withSourceType(LINKED_DATA)
+ );
+ doReturn(succeededFuture(null)).when(idStorageService).store(anyString(), anyString(), anyString());
+ var mappingRules = new JsonObject(TestUtil.readFileFromPath(MAPPING_RULES_PATH));
+ doReturn(succeededFuture(Optional.of(new MappingMetadataDto()
+ .withMappingRules(mappingRules.encode())
+ .withMappingParams(Json.encode(new MappingParameters())))))
+ .when(mappingMetadataCache).getByRecordType(InstanceIngressEventConsumer.class.getSimpleName(), context, MARC_BIB_RECORD_TYPE);
+ doReturn(sourceStorageSnapshotsClient).when(handler).getSourceStorageSnapshotsClient(any(), any(), any());
+ var snapshotHttpResponse = buildHttpResponseWithBuffer(buffer(Json.encode(new Snapshot())), HttpStatus.SC_CREATED);
+ doReturn(succeededFuture(snapshotHttpResponse)).when(sourceStorageSnapshotsClient).postSourceStorageSnapshots(any());
+
+ var expectedMessage = "Some failure";
+ doAnswer(i -> {
+ Consumer<Failure> failureHandler = i.getArgument(2);
+ failureHandler.accept(new Failure(expectedMessage, 400));
+ return null;
+ }).when(instanceCollection).add(any(), any(), any());
+
+ // when
+ var future = handler.handle(event);
+
+ // then
+ var exception = Assert.assertThrows(ExecutionException.class, future::get);
+ assertThat(exception.getCause().getMessage()).isEqualTo(expectedMessage);
+ }
+
+ @Test
+ public void shouldReturnFailedFuture_ifCreatePrecedingSucceedingTitlesFailed() throws IOException {
+ // given
+ var event = new InstanceIngressEvent()
+ .withId(UUID.randomUUID().toString())
+ .withEventPayload(new InstanceIngressPayload()
+ .withSourceRecordObject(TestUtil.readFileFromPath(BIB_RECORD_PATH))
+ .withSourceType(LINKED_DATA)
+ );
+ doReturn(succeededFuture(null)).when(idStorageService).store(anyString(), anyString(), anyString());
+ var mappingRules = new JsonObject(TestUtil.readFileFromPath(MAPPING_RULES_PATH));
+ doReturn(succeededFuture(Optional.of(new MappingMetadataDto()
+ .withMappingRules(mappingRules.encode())
+ .withMappingParams(Json.encode(new MappingParameters())))))
+ .when(mappingMetadataCache).getByRecordType(InstanceIngressEventConsumer.class.getSimpleName(), context, MARC_BIB_RECORD_TYPE);
+ doReturn(sourceStorageSnapshotsClient).when(handler).getSourceStorageSnapshotsClient(any(), any(), any());
+ var snapshotHttpResponse = buildHttpResponseWithBuffer(buffer(Json.encode(new Snapshot())), HttpStatus.SC_CREATED);
+ doReturn(succeededFuture(snapshotHttpResponse)).when(sourceStorageSnapshotsClient).postSourceStorageSnapshots(any());
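+ // stub InstanceCollection.add to invoke the success callback (argument index 1) with the instance being saved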
+ doAnswer(i -> {
+ Consumer<Success<Instance>> successHandler = i.getArgument(1);
+ successHandler.accept(new Success<>(i.getArgument(0)));
+ return null;
+ }).when(instanceCollection).add(any(), any(), any());
+ var expectedMessage = "CreatePrecedingSucceedingTitles failure";
+ doReturn(failedFuture(expectedMessage)).when(precedingSucceedingTitlesHelper).createPrecedingSucceedingTitles(any(), any());
+
+ // when
+ var future = handler.handle(event);
+
+ // then
+ var exception = Assert.assertThrows(ExecutionException.class, future::get);
+ assertThat(exception.getCause().getMessage()).isEqualTo(expectedMessage);
+ }
+
+ @Test
+ public void shouldReturnFailedFuture_ifSourceStorageSnapshotsClientReturnsError() throws IOException {
+ // given
+ var event = new InstanceIngressEvent()
+ .withId(UUID.randomUUID().toString())
+ .withEventPayload(new InstanceIngressPayload()
+ .withSourceRecordObject(TestUtil.readFileFromPath(BIB_RECORD_PATH))
+ .withSourceType(LINKED_DATA)
+ );
+ doReturn(succeededFuture(null)).when(idStorageService).store(anyString(), anyString(), anyString());
+ var mappingRules = new JsonObject(TestUtil.readFileFromPath(MAPPING_RULES_PATH));
+ doReturn(succeededFuture(Optional.of(new MappingMetadataDto()
+ .withMappingRules(mappingRules.encode())
+ .withMappingParams(Json.encode(new MappingParameters())))))
+ .when(mappingMetadataCache).getByRecordType(InstanceIngressEventConsumer.class.getSimpleName(), context, MARC_BIB_RECORD_TYPE);
+ doReturn(sourceStorageSnapshotsClient).when(handler).getSourceStorageSnapshotsClient(any(), any(), any());
+ doAnswer(i -> {
+ Consumer<Success<Instance>> successHandler = i.getArgument(1);
+ successHandler.accept(new Success<>(i.getArgument(0)));
+ return null;
+ }).when(instanceCollection).add(any(), any(), any());
+ doReturn(succeededFuture()).when(precedingSucceedingTitlesHelper).createPrecedingSucceedingTitles(any(), any());
+ var snapshotHttpResponse = buildHttpResponseWithBuffer(HttpStatus.SC_BAD_REQUEST);
+ doReturn(succeededFuture(snapshotHttpResponse)).when(sourceStorageSnapshotsClient).postSourceStorageSnapshots(any());
+
+ var expectedMessage = "Failed to create snapshot in SRS, snapshot id: ";
+
+ // when
+ var future = handler.handle(event);
+
+ // then
+ var exception = Assert.assertThrows(ExecutionException.class, future::get);
+ assertThat(exception.getCause().getMessage()).startsWith(expectedMessage);
+ }
+
+ @Test
+ public void shouldReturnFailedFuture_ifItsFailedToCreateMarcRecordInSrs() throws IOException {
+ // given
+ var event = new InstanceIngressEvent()
+ .withId(UUID.randomUUID().toString())
+ .withEventPayload(new InstanceIngressPayload()
+ .withSourceRecordObject(TestUtil.readFileFromPath(BIB_RECORD_PATH))
+ .withSourceType(LINKED_DATA)
+ );
+ doReturn(succeededFuture(null)).when(idStorageService).store(anyString(), anyString(), anyString());
+ var mappingRules = new JsonObject(TestUtil.readFileFromPath(MAPPING_RULES_PATH));
+ doReturn(succeededFuture(Optional.of(new MappingMetadataDto()
+ .withMappingRules(mappingRules.encode())
+ .withMappingParams(Json.encode(new MappingParameters())))))
+ .when(mappingMetadataCache).getByRecordType(InstanceIngressEventConsumer.class.getSimpleName(), context, MARC_BIB_RECORD_TYPE);
+ doReturn(sourceStorageSnapshotsClient).when(handler).getSourceStorageSnapshotsClient(any(), any(), any());
+ var snapshotHttpResponse = buildHttpResponseWithBuffer(buffer(Json.encode(new Snapshot())), HttpStatus.SC_CREATED);
+ doReturn(succeededFuture(snapshotHttpResponse)).when(sourceStorageSnapshotsClient).postSourceStorageSnapshots(any());
+ doAnswer(i -> {
+ Consumer<Success<Instance>> successHandler = i.getArgument(1);
+ successHandler.accept(new Success<>(i.getArgument(0)));
+ return null;
+ }).when(instanceCollection).add(any(), any(), any());
+ doReturn(succeededFuture()).when(precedingSucceedingTitlesHelper).createPrecedingSucceedingTitles(any(), any());
+ doReturn(sourceStorageClient).when(handler).getSourceStorageRecordsClient(any(), any(), any());
+ var sourceStorageHttpResponse = buildHttpResponseWithBuffer(HttpStatus.SC_BAD_REQUEST);
+ doReturn(succeededFuture(sourceStorageHttpResponse)).when(sourceStorageClient).postSourceStorageRecords(any());
+
+ var expectedMessage = "Failed to create MARC record in SRS, instanceId: ";
+
+ // when
+ var future = handler.handle(event);
+
+ // then
+ var exception = Assert.assertThrows(ExecutionException.class, future::get);
+ assertThat(exception.getCause().getMessage()).startsWith(expectedMessage);
+ }
+
+ @Test
+ public void shouldReturnSucceededFuture_ifProcessFinishedCorrectly() throws IOException, ExecutionException, InterruptedException {
+ // given
+ var linkedDataId = "someLinkedDataId";
+ var event = new InstanceIngressEvent()
+ .withId(UUID.randomUUID().toString())
+ .withEventPayload(new InstanceIngressPayload()
+ .withSourceRecordObject(TestUtil.readFileFromPath(BIB_RECORD_PATH))
+ .withSourceType(LINKED_DATA)
+ .withSourceRecordIdentifier(UUID.randomUUID().toString())
+ .withAdditionalProperty("linkedDataId", linkedDataIdId)
+ );
+ doReturn(succeededFuture(null)).when(idStorageService).store(anyString(), anyString(), anyString());
+ var mappingRules = new JsonObject(TestUtil.readFileFromPath(MAPPING_RULES_PATH));
+ doReturn(succeededFuture(Optional.of(new MappingMetadataDto()
+ .withMappingRules(mappingRules.encode())
+ .withMappingParams(Json.encode(new MappingParameters())))))
+ .when(mappingMetadataCache).getByRecordType(InstanceIngressEventConsumer.class.getSimpleName(), context, MARC_BIB_RECORD_TYPE);
+ doReturn(sourceStorageSnapshotsClient).when(handler).getSourceStorageSnapshotsClient(any(), any(), any());
+ var snapshotHttpResponse = buildHttpResponseWithBuffer(buffer(Json.encode(new Snapshot())), HttpStatus.SC_CREATED);
+ doReturn(succeededFuture(snapshotHttpResponse)).when(sourceStorageSnapshotsClient).postSourceStorageSnapshots(any());
+ doAnswer(i -> {
+ Consumer<Success<Instance>> successHandler = i.getArgument(1);
+ successHandler.accept(new Success<>(i.getArgument(0)));
+ return null;
+ }).when(instanceCollection).add(any(), any(), any());
+ doReturn(succeededFuture()).when(precedingSucceedingTitlesHelper).createPrecedingSucceedingTitles(any(), any());
+ doReturn(sourceStorageClient).when(handler).getSourceStorageRecordsClient(any(), any(), any());
+ var sourceStorageHttpResponse = buildHttpResponseWithBuffer(buffer(Json.encode(new Record())), HttpStatus.SC_CREATED);
+ doReturn(succeededFuture(sourceStorageHttpResponse)).when(sourceStorageClient).postSourceStorageRecords(any());
+
+ // when
+ var future = handler.handle(event);
+
+ // then
+ assertThat(future.isDone()).isTrue();
+
+ var instance = future.get();
+ assertThat(instance.getId()).isEqualTo(event.getEventPayload().getSourceRecordIdentifier());
+ assertThat(instance.getSource()).isEqualTo("LINKED_DATA");
+ assertThat(instance.getIdentifiers().stream().anyMatch(i -> i.value.equals("(ld) " + linkedDataId))).isTrue();
+
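+ // the record posted to SRS must get its own id, while 999 $i carries the instance id and 999 $l the linked data id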
+ var recordCaptor = ArgumentCaptor.forClass(Record.class);
+ verify(sourceStorageClient).postSourceStorageRecords(recordCaptor.capture());
+ var recordSentToSRS = recordCaptor.getValue();
+ assertThat(recordSentToSRS.getId()).isNotNull();
+ assertThat(recordSentToSRS.getId()).isNotEqualTo(event.getEventPayload().getSourceRecordIdentifier());
+ assertThat(recordSentToSRS.getId()).doesNotContain(linkedDataId);
+ assertThat(recordSentToSRS.getRecordType()).isEqualTo(Record.RecordType.MARC_BIB);
+ assertThat(AdditionalFieldsUtil.getValue(recordSentToSRS, TAG_999, SUBFIELD_I)).hasValue(instance.getId());
+ assertThat(AdditionalFieldsUtil.getValue(recordSentToSRS, TAG_999, SUBFIELD_L)).hasValue(linkedDataId);
+ }
+}
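The success-path stub for instanceCollection.add is repeated verbatim in four of the tests above; if the duplication grows, a private helper along these lines (a hypothetical refactor sketch, not part of this patch) would keep each test to a single call:

    private void mockSuccessfulInstanceAdd() {
      // invoke the success callback (argument index 1) with the instance passed as argument 0
      doAnswer(i -> {
        Consumer<Success<Instance>> successHandler = i.getArgument(1);
        successHandler.accept(new Success<>(i.getArgument(0)));
        return null;
      }).when(instanceCollection).add(any(), any(), any());
    }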