MODINV-986: InstanceIngress create events consumption (#729)
* MODINV-986: step 1: prepare InstanceIngestConsumerVerticle re-using other consumer verticles logic

* MODINV-986: code review remark fixes

* MODINV-986: instance_ingest -> instance_ingress

* MODINV-986: InstanceIngressEventConsumer and CreateInstanceIngressEventHandler

* MODINV-986: NEWS.md update

* MODINV-986: re-using code from CreateInstanceEventHandler and AbstractInstanceEventHandler + proper additional fields setting

* MODINV-986: remove a namespace from a topic name

* MODINV-986: fix instance source mapped to MARC, code optimization

* MODINV-986: post-rebase fixes

* MODINV-986: unit test + fixes

* MODINV-986: unit test + fixes

* MODINV-986: return consumers base property

* MODINV-986: rename source type BIBFRAME to LINKED_DATA

* MODINV-986: minor hotfix

* MODINV-986: postSnapshotInSrs just before saving + minor fixes
PBobylev authored Jun 20, 2024
1 parent a06ee8e commit 190b661
Showing 32 changed files with 1,334 additions and 415 deletions.
1 change: 1 addition & 0 deletions NEWS.md
@@ -11,6 +11,7 @@
* Replace GET with POST request for fetching instances and holdings on /items endpoint to omit 414 error [MODINV-943](https://folio-org.atlassian.net/browse/MODINV-943)
* Call suppress-on-discovery for source record on holding update if discoverySuppress is true [MODINV-977](https://folio-org.atlassian.net/browse/MODINV-977)
* Requires `holdings-storage 2.0 3.0 4.0 5.0 6.0 7.0`
* InstanceIngress create events consumption [MODINV-986](https://folio-org.atlassian.net/browse/MODINV-986)

## 20.2.0 2023-03-20
* Inventory cannot process Holdings with virtual fields ([MODINV-941](https://issues.folio.org/browse/MODINV-941))
1 change: 1 addition & 0 deletions pom.xml
@@ -445,6 +445,7 @@
<path>${basedir}/ramls/holdings-record.json</path>
<path>${basedir}/ramls/holdings-records-source.json</path>
<path>${basedir}/ramls/mappingMetadataDto.json</path>
<path>${basedir}/ramls/instance-ingress-event.json</path>
</sourcePaths>
<targetPackage>org.folio</targetPackage>
<generateBuilders>true</generateBuilders>
96 changes: 96 additions & 0 deletions ramls/instance-ingress-event.json
@@ -0,0 +1,96 @@
{
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "Instance ingress event data model",
"javaType": "org.folio.rest.jaxrs.model.InstanceIngressEvent",
"type": "object",
"additionalProperties": false,
"properties": {
"id": {
"description": "UUID",
"$ref": "uuid.json"
},
"eventType": {
"type": "string",
"enum": ["CREATE_INSTANCE", "UPDATE_INSTANCE"],
"description": "Instance ingress event type"
},
"InstanceIngressEventMetadata": {
"description": "Event metadata",
"type": "object",
"additionalProperties": false,
"properties": {
"eventTTL": {
"description": "Time-to-live (TTL) for event in minutes",
"type": "integer"
},
"correlationId": {
"description": "Id to track related events, can be a meaningful string or a UUID",
"type": "string"
},
"originalEventId": {
"description": "Id of the event that started the sequence of related events",
"$ref": "uuid.json"
},
"publisherCallback": {
"description": "Allows a publisher to provide a callback endpoint or an error Event Type to be notified that despite the fact that there are subscribers for such an event type no one has received the event within the specified period of time",
"type": "object",
"properties": {
"endpoint": {
"description": "Callback endpoint",
"type": "string"
},
"eventType": {
"description": "Error Event Type",
"type": "string"
}
}
},
"createdDate": {
"description": "Timestamp when event was created",
"type": "string",
"format": "date-time"
},
"publishedDate": {
"description": "Timestamp when event was initially published to the underlying topic",
"type": "string",
"format": "date-time"
},
"createdBy": {
"description": "Username of the user whose action caused an event",
"type": "string"
},
"publishedBy": {
"description": "Name and version of the module that published an event",
"type": "string"
}
},
"required": [
"eventTTL",
"publishedBy"
]
},
"eventPayload": {
"type": "object",
"description": "An instance source record container",
"$ref": "instance-ingress-payload.json"
},
"tenant": {
"description": "Tenant id",
"type": "string"
},
"ts": {
"description": "Message timestamp",
"type": "string",
"format": "date-time"
}
},
"excludedFromEqualsAndHashCode": [
"eventMetadata",
"tenant",
"ts"
],
"required": [
"id",
"eventType"
]
}
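For reference, a hypothetical CREATE_INSTANCE event that validates against this schema — all identifiers and values below are invented for illustration:

{
  "id": "6b4ae089-e1ee-431f-bc1f-8cf2bd22a9ea",
  "eventType": "CREATE_INSTANCE",
  "eventMetadata": {
    "eventTTL": 30,
    "correlationId": "linked-data-import-42",
    "createdDate": "2024-06-20T12:00:00.000Z",
    "createdBy": "diku_admin",
    "publishedBy": "mod-linked-data-1.0.0"
  },
  "eventPayload": {
    "sourceRecordIdentifier": "c56aa8a2-6a47-4b0e-b0b9-48f0d269aa61",
    "sourceRecordObject": "{\"leader\":\"00000nam a2200000 a 4500\",\"fields\":[]}",
    "sourceType": "LINKED_DATA"
  },
  "tenant": "diku",
  "ts": "2024-06-20T12:00:01.000Z"
}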
25 changes: 25 additions & 0 deletions ramls/instance-ingress-payload.json
@@ -0,0 +1,25 @@
{
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "An instance source record container",
"type": "object",
"properties": {
"sourceRecordIdentifier": {
"type": "string",
"description": "The source record identifier"
},
"sourceRecordObject": {
"type": "string",
"description": "The source record JSON object"
},
"sourceType": {
"type": "string",
"enum": ["FOLIO", "LINKED_DATA", "MARC"],
"description": "Source type"
}
},
"additionalProperties": true,
"required": [
"sourceRecordObject",
"sourceType"
]
}
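A minimal payload for a MARC-sourced instance might look like this (identifier and record content invented; sourceRecordObject carries the serialized source record as a string):

{
  "sourceRecordIdentifier": "f7e5c34b-9a0e-4f2c-8d1a-3b6c2e9d4a10",
  "sourceRecordObject": "{\"leader\":\"01422cam a2200349 a 4500\",\"fields\":[{\"001\":\"in00000000001\"}]}",
  "sourceType": "MARC"
}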
119 changes: 20 additions & 99 deletions src/main/java/org/folio/inventory/DataImportConsumerVerticle.java
@@ -1,5 +1,6 @@
package org.folio.inventory;

import static org.folio.DataImportEventTypes.DI_INCOMING_MARC_BIB_RECORD_PARSED;
import static org.folio.DataImportEventTypes.DI_INVENTORY_HOLDING_CREATED;
import static org.folio.DataImportEventTypes.DI_INVENTORY_HOLDING_MATCHED;
import static org.folio.DataImportEventTypes.DI_INVENTORY_HOLDING_NOT_MATCHED;
@@ -15,6 +16,7 @@
import static org.folio.DataImportEventTypes.DI_MARC_FOR_UPDATE_RECEIVED;
import static org.folio.DataImportEventTypes.DI_PENDING_ORDER_CREATED;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_CREATED;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_DELETED;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_MODIFIED_READY_FOR_POST_PROCESSING;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_NOT_MATCHED;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_BIB_RECORD_MATCHED;
@@ -25,45 +27,20 @@
import static org.folio.DataImportEventTypes.DI_SRS_MARC_HOLDINGS_RECORD_MODIFIED_READY_FOR_POST_PROCESSING;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_HOLDINGS_RECORD_NOT_MATCHED;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_HOLDING_RECORD_CREATED;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_DELETED;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_ENV;
import static org.folio.DataImportEventTypes.DI_INCOMING_MARC_BIB_RECORD_PARSED;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_HOST;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_MAX_REQUEST_SIZE;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_PORT;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_REPLICATION_FACTOR;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.OKAPI_URL;

import java.util.ArrayList;
import io.vertx.core.Promise;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.DataImportEventTypes;
import org.folio.inventory.consortium.cache.ConsortiumDataCache;
import org.folio.inventory.dataimport.cache.MappingMetadataCache;
import org.folio.inventory.dataimport.cache.ProfileSnapshotCache;
import org.folio.inventory.dataimport.consumers.DataImportKafkaHandler;
import org.folio.inventory.dataimport.util.ConsumerWrapperUtil;
import org.folio.inventory.storage.Storage;
import org.folio.kafka.AsyncRecordHandler;
import org.folio.kafka.GlobalLoadSensor;
import org.folio.kafka.KafkaConfig;
import org.folio.kafka.KafkaConsumerWrapper;
import org.folio.kafka.KafkaTopicNameHelper;
import org.folio.kafka.SubscriptionDefinition;
import org.folio.inventory.support.KafkaConsumerVerticle;
import org.folio.okapi.common.GenericCompositeFuture;
import org.folio.processing.events.EventManager;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.http.HttpClient;
import io.vertx.core.json.JsonObject;

public class DataImportConsumerVerticle extends AbstractVerticle {
public class DataImportConsumerVerticle extends KafkaConsumerVerticle {

private static final Logger LOGGER = LogManager.getLogger(DataImportConsumerVerticle.class);

@@ -96,88 +73,32 @@ public class DataImportConsumerVerticle extends AbstractVerticle {
DI_SRS_MARC_HOLDINGS_RECORD_NOT_MATCHED,
DI_PENDING_ORDER_CREATED
);

private final int loadLimit = getLoadLimit();
private final int maxDistributionNumber = getMaxDistributionNumber();
private final List<KafkaConsumerWrapper<String, String>> consumerWrappers = new ArrayList<>();
private static final String LOAD_LIMIT_PROPERTY = "DataImportConsumer";
private static final String MAX_DISTRIBUTION_PROPERTY = "DataImportConsumerVerticle";

@Override
public void start(Promise<Void> startPromise) {
JsonObject config = vertx.getOrCreateContext().config();
KafkaConfig kafkaConfig = KafkaConfig.builder()
.envId(config.getString(KAFKA_ENV))
.kafkaHost(config.getString(KAFKA_HOST))
.kafkaPort(config.getString(KAFKA_PORT))
.okapiUrl(config.getString(OKAPI_URL))
.replicationFactor(Integer.parseInt(config.getString(KAFKA_REPLICATION_FACTOR)))
.maxRequestSize(Integer.parseInt(config.getString(KAFKA_MAX_REQUEST_SIZE)))
.build();
LOGGER.info("kafkaConfig: {}", kafkaConfig);
EventManager.registerKafkaEventPublisher(kafkaConfig, vertx, maxDistributionNumber);

HttpClient client = vertx.createHttpClient();
Storage storage = Storage.basedUpon(config, client);

String profileSnapshotExpirationTime = getCacheEnvVariable(config, "inventory.profile-snapshot-cache.expiration.time.seconds");
EventManager.registerKafkaEventPublisher(getKafkaConfig(), vertx, getMaxDistributionNumber(MAX_DISTRIBUTION_PROPERTY));
var consortiumDataCache = new ConsortiumDataCache(vertx, getHttpClient());

ProfileSnapshotCache profileSnapshotCache = new ProfileSnapshotCache(vertx, client, Long.parseLong(profileSnapshotExpirationTime));
ConsortiumDataCache consortiumDataCache = new ConsortiumDataCache(vertx, client);
MappingMetadataCache mappingMetadataCache = MappingMetadataCache.getInstance(vertx, client);
DataImportKafkaHandler dataImportKafkaHandler = new DataImportKafkaHandler(
vertx, storage, client, profileSnapshotCache, kafkaConfig, mappingMetadataCache, consortiumDataCache);
var dataImportKafkaHandler = new DataImportKafkaHandler(vertx, getStorage(), getHttpClient(), getProfileSnapshotCache(),
getKafkaConfig(), getMappingMetadataCache(), consortiumDataCache);

List<Future<KafkaConsumerWrapper<String, String>>> futures = EVENT_TYPES.stream()
.map(eventType -> createKafkaConsumerWrapper(kafkaConfig, eventType, dataImportKafkaHandler))
.collect(Collectors.toList());
var futures = EVENT_TYPES.stream()
.map(type -> super.createConsumer(type.value(), LOAD_LIMIT_PROPERTY))
.map(consumerWrapper -> consumerWrapper.start(dataImportKafkaHandler, ConsumerWrapperUtil.constructModuleName())
.map(consumerWrapper)
)
.toList();

GenericCompositeFuture.all(futures)
.onFailure(startPromise::fail)
.onSuccess(ar -> {
futures.forEach(future -> consumerWrappers.add(future.result()));
startPromise.complete();
});
.onSuccess(ar -> startPromise.complete());
}

@Override
public void stop(Promise<Void> stopPromise) {
List<Future<Void>> stopFutures = consumerWrappers.stream()
.map(KafkaConsumerWrapper::stop)
.collect(Collectors.toList());

GenericCompositeFuture.join(stopFutures).onComplete(ar -> stopPromise.complete());
}

private Future<KafkaConsumerWrapper<String, String>> createKafkaConsumerWrapper(KafkaConfig kafkaConfig, DataImportEventTypes eventType,
AsyncRecordHandler<String, String> recordHandler) {
SubscriptionDefinition subscriptionDefinition = KafkaTopicNameHelper.createSubscriptionDefinition(kafkaConfig.getEnvId(),
KafkaTopicNameHelper.getDefaultNameSpace(), eventType.value());

KafkaConsumerWrapper<String, String> consumerWrapper = KafkaConsumerWrapper.<String, String>builder()
.context(context)
.vertx(vertx)
.kafkaConfig(kafkaConfig)
.loadLimit(loadLimit)
.globalLoadSensor(new GlobalLoadSensor())
.subscriptionDefinition(subscriptionDefinition)
.build();

return consumerWrapper.start(recordHandler, ConsumerWrapperUtil.constructModuleName())
.map(consumerWrapper);
protected Logger getLogger() {
return LOGGER;
}

private int getLoadLimit() {
return Integer.parseInt(System.getProperty("inventory.kafka.DataImportConsumer.loadLimit", "5"));
}

private int getMaxDistributionNumber() {
return Integer.parseInt(System.getProperty("inventory.kafka.DataImportConsumerVerticle.maxDistributionNumber", "100"));
}

private String getCacheEnvVariable(JsonObject config, String variableName) {
String cacheExpirationTime = config.getString(variableName);
if (StringUtils.isBlank(cacheExpirationTime)) {
cacheExpirationTime = "3600";
}
return cacheExpirationTime;
}
}
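The removed wiring (Kafka config parsing, cache construction, load-limit and distribution-number lookups, consumer lifecycle) now lives in the new KafkaConsumerVerticle base class, which this excerpt does not render. A minimal sketch of the property-lookup part of that contract, reconstructed from the call sites and the deleted helpers above — names, bodies, and defaults here are assumptions, not the shipped implementation:

package org.folio.inventory.support;

import io.vertx.core.AbstractVerticle;
import org.apache.logging.log4j.Logger;

// Hypothetical sketch reconstructed from this diff's call sites; the real
// class is part of this commit but is not shown in this excerpt.
public abstract class KafkaConsumerVerticle extends AbstractVerticle {

  // Subclasses pass a base property name (e.g. "DataImportConsumer") so each
  // consumer can be tuned independently, mirroring the deleted getLoadLimit()
  // and getMaxDistributionNumber() helpers of DataImportConsumerVerticle.
  protected int getLoadLimit(String baseProperty) {
    return Integer.parseInt(System.getProperty(
      "inventory.kafka." + baseProperty + ".loadLimit", "5"));
  }

  protected int getMaxDistributionNumber(String baseProperty) {
    return Integer.parseInt(System.getProperty(
      "inventory.kafka." + baseProperty + ".maxDistributionNumber", "100"));
  }

  // Also provided by the base class, per the usages above: getKafkaConfig(),
  // getHttpClient(), getStorage(), getMappingMetadataCache(),
  // getProfileSnapshotCache(), and createConsumer(topic, baseProperty[, useNamespace]).
  protected abstract Logger getLogger();
}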
33 changes: 33 additions & 0 deletions src/main/java/org/folio/inventory/InstanceIngressConsumerVerticle.java
@@ -0,0 +1,33 @@
package org.folio.inventory;

import static org.folio.inventory.dataimport.util.ConsumerWrapperUtil.constructModuleName;

import io.vertx.core.Promise;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.inventory.instanceingress.InstanceIngressEventConsumer;
import org.folio.inventory.support.KafkaConsumerVerticle;

public class InstanceIngressConsumerVerticle extends KafkaConsumerVerticle {

private static final Logger LOGGER = LogManager.getLogger(InstanceIngressConsumerVerticle.class);
private static final String INSTANCE_INGRESS_TOPIC = "inventory.instance_ingress";
private static final String BASE_PROPERTY = "InstanceIngressConsumerVerticle";

@Override
public void start(Promise<Void> startPromise) {
var instanceIngressEventHandler = new InstanceIngressEventConsumer(vertx, getStorage(), getHttpClient(), getMappingMetadataCache());

var consumerWrapper = createConsumer(INSTANCE_INGRESS_TOPIC, BASE_PROPERTY, false);

consumerWrapper.start(instanceIngressEventHandler, constructModuleName())
.onFailure(startPromise::fail)
.onSuccess(ar -> startPromise.complete());
}

@Override
protected Logger getLogger() {
return LOGGER;
}

}
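Unlike the data-import consumers, which subscribe via event-type topics, this verticle subscribes to the plain inventory.instance_ingress topic; the false argument to createConsumer presumably opts out of the default namespace segment in the topic name, matching the "remove a namespace from a topic name" step in the commit message.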
8 changes: 8 additions & 0 deletions src/main/java/org/folio/inventory/Launcher.java
@@ -25,6 +25,7 @@ public class Launcher {
private static final String QUICK_MARC_CONSUMER_VERTICLE_INSTANCES_NUMBER_CONFIG = "inventory.kafka.QuickMarcConsumerVerticle.instancesNumber";
private static final String MARC_BIB_UPDATE_CONSUMER_VERTICLE_INSTANCES_NUMBER_CONFIG = "inventory.kafka.MarcBibUpdateConsumerVerticle.instancesNumber";
private static final String CONSORTIUM_INSTANCE_SHARING_CONSUMER_VERTICLE_NUMBER_CONFIG = "inventory.kafka.ConsortiumInstanceSharingConsumerVerticle.instancesNumber";
private static final String INSTANCE_INGRESS_VERTICLE_NUMBER_CONFIG = "inventory.kafka.InstanceIngressConsumerVerticle.instancesNumber";
private static final VertxAssistant vertxAssistant = new VertxAssistant();

private static String inventoryModuleDeploymentId;
Expand All @@ -33,6 +34,7 @@ public class Launcher {
private static String quickMarcConsumerVerticleDeploymentId;
private static String marcBibUpdateConsumerVerticleDeploymentId;
private static String consortiumInstanceSharingVerticleDeploymentId;
private static String instanceIngressConsumerVerticleDeploymentId;

public static void main(String[] args)
throws InterruptedException, ExecutionException, TimeoutException {
@@ -96,12 +98,14 @@ private static void startConsumerVerticles(Map<String, Object> consumerVerticles
int quickMarcConsumerVerticleNumber = Integer.parseInt(System.getenv().getOrDefault(QUICK_MARC_CONSUMER_VERTICLE_INSTANCES_NUMBER_CONFIG, "1"));
int marcBibUpdateConsumerVerticleNumber = Integer.parseInt(System.getenv().getOrDefault(MARC_BIB_UPDATE_CONSUMER_VERTICLE_INSTANCES_NUMBER_CONFIG, "3"));
int consortiumInstanceSharingVerticleNumber = Integer.parseInt(System.getenv().getOrDefault(CONSORTIUM_INSTANCE_SHARING_CONSUMER_VERTICLE_NUMBER_CONFIG, "3"));
int instanceIngressConsumerVerticleNumber = Integer.parseInt(System.getenv().getOrDefault(INSTANCE_INGRESS_VERTICLE_NUMBER_CONFIG, "3"));

CompletableFuture<String> future1 = new CompletableFuture<>();
CompletableFuture<String> future2 = new CompletableFuture<>();
CompletableFuture<String> future3 = new CompletableFuture<>();
CompletableFuture<String> future4 = new CompletableFuture<>();
CompletableFuture<String> future5 = new CompletableFuture<>();
CompletableFuture<String> future6 = new CompletableFuture<>();
vertxAssistant.deployVerticle(DataImportConsumerVerticle.class.getName(),
consumerVerticlesConfig, dataImportConsumerVerticleNumber, future1);
vertxAssistant.deployVerticle(MarcHridSetConsumerVerticle.class.getName(),
@@ -112,12 +116,15 @@ private static void startConsumerVerticles(Map<String, Object> consumerVerticles
consumerVerticlesConfig, marcBibUpdateConsumerVerticleNumber, future4);
vertxAssistant.deployVerticle(ConsortiumInstanceSharingConsumerVerticle.class.getName(),
consumerVerticlesConfig, consortiumInstanceSharingVerticleNumber, future5);
vertxAssistant.deployVerticle(InstanceIngressConsumerVerticle.class.getName(),
consumerVerticlesConfig, instanceIngressConsumerVerticleNumber, future6);

consumerVerticleDeploymentId = future1.get(20, TimeUnit.SECONDS);
marcInstHridSetConsumerVerticleDeploymentId = future2.get(20, TimeUnit.SECONDS);
quickMarcConsumerVerticleDeploymentId = future3.get(20, TimeUnit.SECONDS);
marcBibUpdateConsumerVerticleDeploymentId = future4.get(20, TimeUnit.SECONDS);
consortiumInstanceSharingVerticleDeploymentId = future5.get(20, TimeUnit.SECONDS);
instanceIngressConsumerVerticleDeploymentId = future6.get(20, TimeUnit.SECONDS);
}

private static void stop() {
Expand All @@ -133,6 +140,7 @@ private static void stop() {
.thenCompose(v -> vertxAssistant.undeployVerticle(quickMarcConsumerVerticleDeploymentId))
.thenCompose(v -> vertxAssistant.undeployVerticle(marcBibUpdateConsumerVerticleDeploymentId))
.thenCompose(v -> vertxAssistant.undeployVerticle(consortiumInstanceSharingVerticleDeploymentId))
.thenCompose(v -> vertxAssistant.undeployVerticle(instanceIngressConsumerVerticleDeploymentId))
.thenAccept(v -> vertxAssistant.stop(stopped));

stopped.thenAccept(v -> log.info("Server Stopped"));
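As with the other consumer verticles, the instance-ingress consumer count can be tuned per deployment by setting the environment variable inventory.kafka.InstanceIngressConsumerVerticle.instancesNumber; the code above defaults it to 3.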