Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MODINV-986: InstanceIngress events consumption #716

Merged
merged 4 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 21 additions & 98 deletions src/main/java/org/folio/inventory/DataImportConsumerVerticle.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.folio.inventory;

import static org.folio.DataImportEventTypes.DI_INCOMING_MARC_BIB_RECORD_PARSED;
import static org.folio.DataImportEventTypes.DI_INVENTORY_HOLDING_CREATED;
import static org.folio.DataImportEventTypes.DI_INVENTORY_HOLDING_MATCHED;
import static org.folio.DataImportEventTypes.DI_INVENTORY_HOLDING_NOT_MATCHED;
Expand All @@ -15,6 +16,7 @@
import static org.folio.DataImportEventTypes.DI_MARC_FOR_UPDATE_RECEIVED;
import static org.folio.DataImportEventTypes.DI_PENDING_ORDER_CREATED;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_CREATED;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_DELETED;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_MODIFIED_READY_FOR_POST_PROCESSING;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_NOT_MATCHED;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_BIB_RECORD_MATCHED;
Expand All @@ -25,45 +27,21 @@
import static org.folio.DataImportEventTypes.DI_SRS_MARC_HOLDINGS_RECORD_MODIFIED_READY_FOR_POST_PROCESSING;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_HOLDINGS_RECORD_NOT_MATCHED;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_HOLDING_RECORD_CREATED;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_DELETED;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_ENV;
import static org.folio.DataImportEventTypes.DI_INCOMING_MARC_BIB_RECORD_PARSED;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_HOST;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_MAX_REQUEST_SIZE;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_PORT;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_REPLICATION_FACTOR;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.OKAPI_URL;

import java.util.ArrayList;
import io.vertx.core.Promise;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.DataImportEventTypes;
import org.folio.inventory.consortium.cache.ConsortiumDataCache;
import org.folio.inventory.dataimport.cache.MappingMetadataCache;
import org.folio.inventory.dataimport.cache.ProfileSnapshotCache;
import org.folio.inventory.dataimport.consumers.DataImportKafkaHandler;
import org.folio.inventory.dataimport.util.ConsumerWrapperUtil;
import org.folio.inventory.storage.Storage;
import org.folio.kafka.AsyncRecordHandler;
import org.folio.kafka.GlobalLoadSensor;
import org.folio.kafka.KafkaConfig;
import org.folio.kafka.KafkaConsumerWrapper;
import org.folio.kafka.KafkaTopicNameHelper;
import org.folio.kafka.SubscriptionDefinition;
import org.folio.inventory.support.KafkaConsumerVerticle;
import org.folio.okapi.common.GenericCompositeFuture;
import org.folio.processing.events.EventManager;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.http.HttpClient;
import io.vertx.core.json.JsonObject;

public class DataImportConsumerVerticle extends AbstractVerticle {
public class DataImportConsumerVerticle extends KafkaConsumerVerticle {

private static final Logger LOGGER = LogManager.getLogger(DataImportConsumerVerticle.class);

Expand Down Expand Up @@ -97,89 +75,34 @@ public class DataImportConsumerVerticle extends AbstractVerticle {
DI_PENDING_ORDER_CREATED
);

private final int loadLimit = getLoadLimit();
private final int maxDistributionNumber = getMaxDistributionNumber();
private final List<KafkaConsumerWrapper<String, String>> consumerWrappers = new ArrayList<>();

@Override
public void start(Promise<Void> startPromise) {
JsonObject config = vertx.getOrCreateContext().config();
KafkaConfig kafkaConfig = KafkaConfig.builder()
.envId(config.getString(KAFKA_ENV))
.kafkaHost(config.getString(KAFKA_HOST))
.kafkaPort(config.getString(KAFKA_PORT))
.okapiUrl(config.getString(OKAPI_URL))
.replicationFactor(Integer.parseInt(config.getString(KAFKA_REPLICATION_FACTOR)))
.maxRequestSize(Integer.parseInt(config.getString(KAFKA_MAX_REQUEST_SIZE)))
.build();
LOGGER.info("kafkaConfig: {}", kafkaConfig);
EventManager.registerKafkaEventPublisher(kafkaConfig, vertx, maxDistributionNumber);

HttpClient client = vertx.createHttpClient();
Storage storage = Storage.basedUpon(config, client);
EventManager.registerKafkaEventPublisher(getKafkaConfig(), vertx, getMaxDistributionNumber());

String profileSnapshotExpirationTime = getCacheEnvVariable(config, "inventory.profile-snapshot-cache.expiration.time.seconds");
String mappingMetadataExpirationTime = getCacheEnvVariable(config, "inventory.mapping-metadata-cache.expiration.time.seconds");
var profileSnapshotExpirationTime = getCacheEnvVariable(getConfig(), "inventory.profile-snapshot-cache.expiration.time.seconds");

ProfileSnapshotCache profileSnapshotCache = new ProfileSnapshotCache(vertx, client, Long.parseLong(profileSnapshotExpirationTime));
MappingMetadataCache mappingMetadataCache = new MappingMetadataCache(vertx, client, Long.parseLong(mappingMetadataExpirationTime));
ConsortiumDataCache consortiumDataCache = new ConsortiumDataCache(vertx, client);
var profileSnapshotCache = new ProfileSnapshotCache(vertx, getHttpClient(), Long.parseLong(profileSnapshotExpirationTime));
var consortiumDataCache = new ConsortiumDataCache(vertx, getHttpClient());

DataImportKafkaHandler dataImportKafkaHandler = new DataImportKafkaHandler(
vertx, storage, client, profileSnapshotCache, kafkaConfig, mappingMetadataCache, consortiumDataCache);
var dataImportKafkaHandler = new DataImportKafkaHandler(vertx, getStorage(), getHttpClient(), profileSnapshotCache,
getKafkaConfig(), getMappingMetadataCache(), consortiumDataCache);

List<Future<KafkaConsumerWrapper<String, String>>> futures = EVENT_TYPES.stream()
.map(eventType -> createKafkaConsumerWrapper(kafkaConfig, eventType, dataImportKafkaHandler))
.collect(Collectors.toList());
var futures = EVENT_TYPES.stream()
.map(eventType -> {
var consumerWrapper = super.createConsumer(eventType.value());
return consumerWrapper.start(dataImportKafkaHandler, ConsumerWrapperUtil.constructModuleName())
.map(consumerWrapper);
})
.toList();

GenericCompositeFuture.all(futures)
.onFailure(startPromise::fail)
.onSuccess(ar -> {
futures.forEach(future -> consumerWrappers.add(future.result()));
startPromise.complete();
});
.onSuccess(ar -> startPromise.complete());
}

@Override
public void stop(Promise<Void> stopPromise) {
List<Future<Void>> stopFutures = consumerWrappers.stream()
.map(KafkaConsumerWrapper::stop)
.collect(Collectors.toList());

GenericCompositeFuture.join(stopFutures).onComplete(ar -> stopPromise.complete());
}

private Future<KafkaConsumerWrapper<String, String>> createKafkaConsumerWrapper(KafkaConfig kafkaConfig, DataImportEventTypes eventType,
AsyncRecordHandler<String, String> recordHandler) {
SubscriptionDefinition subscriptionDefinition = KafkaTopicNameHelper.createSubscriptionDefinition(kafkaConfig.getEnvId(),
KafkaTopicNameHelper.getDefaultNameSpace(), eventType.value());

KafkaConsumerWrapper<String, String> consumerWrapper = KafkaConsumerWrapper.<String, String>builder()
.context(context)
.vertx(vertx)
.kafkaConfig(kafkaConfig)
.loadLimit(loadLimit)
.globalLoadSensor(new GlobalLoadSensor())
.subscriptionDefinition(subscriptionDefinition)
.build();

return consumerWrapper.start(recordHandler, ConsumerWrapperUtil.constructModuleName())
.map(consumerWrapper);
protected Logger getLogger() {
return LOGGER;
}

private int getLoadLimit() {
return Integer.parseInt(System.getProperty("inventory.kafka.DataImportConsumer.loadLimit", "5"));
}

private int getMaxDistributionNumber() {
return Integer.parseInt(System.getProperty("inventory.kafka.DataImportConsumerVerticle.maxDistributionNumber", "100"));
}

private String getCacheEnvVariable(JsonObject config, String variableName) {
String cacheExpirationTime = config.getString(variableName);
if (StringUtils.isBlank(cacheExpirationTime)) {
cacheExpirationTime = "3600";
}
return cacheExpirationTime;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.folio.inventory;

import static org.folio.inventory.dataimport.util.ConsumerWrapperUtil.constructModuleName;

import io.vertx.core.Promise;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.inventory.handler.InstanceIngestEventHandler;
import org.folio.inventory.support.KafkaConsumerVerticle;

public class InstanceIngestConsumerVerticle extends KafkaConsumerVerticle {

private static final String INSTANCE_INGEST_TOPIC = "inventory.instance_ingest";
private static final Logger LOGGER = LogManager.getLogger(InstanceIngestConsumerVerticle.class);

@Override
public void start(Promise<Void> startPromise) {
var instanceIngestEventHandler = new InstanceIngestEventHandler();

var consumerWrapper = createConsumer(INSTANCE_INGEST_TOPIC);

consumerWrapper.start(instanceIngestEventHandler, constructModuleName())
.onFailure(startPromise::fail)
.onSuccess(ar -> startPromise.complete());
}

@Override
protected Logger getLogger() {
return LOGGER;
}
}
8 changes: 8 additions & 0 deletions src/main/java/org/folio/inventory/Launcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class Launcher {
private static final String QUICK_MARC_CONSUMER_VERTICLE_INSTANCES_NUMBER_CONFIG = "inventory.kafka.QuickMarcConsumerVerticle.instancesNumber";
private static final String MARC_BIB_UPDATE_CONSUMER_VERTICLE_INSTANCES_NUMBER_CONFIG = "inventory.kafka.MarcBibUpdateConsumerVerticle.instancesNumber";
private static final String CONSORTIUM_INSTANCE_SHARING_CONSUMER_VERTICLE_NUMBER_CONFIG = "inventory.kafka.ConsortiumInstanceSharingConsumerVerticle.instancesNumber";
private static final String INSTANCE_INGEST_VERTICLE_NUMBER_CONFIG = "inventory.kafka.InstanceIngestConsumerVerticle.instancesNumber";
private static final VertxAssistant vertxAssistant = new VertxAssistant();

private static String inventoryModuleDeploymentId;
Expand All @@ -33,6 +34,7 @@ public class Launcher {
private static String quickMarcConsumerVerticleDeploymentId;
private static String marcBibUpdateConsumerVerticleDeploymentId;
private static String consortiumInstanceSharingVerticleDeploymentId;
private static String instanceIngestConsumerVerticleDeploymentId;

public static void main(String[] args)
throws InterruptedException, ExecutionException, TimeoutException {
Expand Down Expand Up @@ -96,12 +98,14 @@ private static void startConsumerVerticles(Map<String, Object> consumerVerticles
int quickMarcConsumerVerticleNumber = Integer.parseInt(System.getenv().getOrDefault(QUICK_MARC_CONSUMER_VERTICLE_INSTANCES_NUMBER_CONFIG, "1"));
int marcBibUpdateConsumerVerticleNumber = Integer.parseInt(System.getenv().getOrDefault(MARC_BIB_UPDATE_CONSUMER_VERTICLE_INSTANCES_NUMBER_CONFIG, "3"));
int consortiumInstanceSharingVerticleNumber = Integer.parseInt(System.getenv().getOrDefault(CONSORTIUM_INSTANCE_SHARING_CONSUMER_VERTICLE_NUMBER_CONFIG, "3"));
int instanceIngestConsumerVerticleNumber = Integer.parseInt(System.getenv().getOrDefault(INSTANCE_INGEST_VERTICLE_NUMBER_CONFIG, "3"));

CompletableFuture<String> future1 = new CompletableFuture<>();
CompletableFuture<String> future2 = new CompletableFuture<>();
CompletableFuture<String> future3 = new CompletableFuture<>();
CompletableFuture<String> future4 = new CompletableFuture<>();
CompletableFuture<String> future5 = new CompletableFuture<>();
CompletableFuture<String> future6 = new CompletableFuture<>();
vertxAssistant.deployVerticle(DataImportConsumerVerticle.class.getName(),
consumerVerticlesConfig, dataImportConsumerVerticleNumber, future1);
vertxAssistant.deployVerticle(MarcHridSetConsumerVerticle.class.getName(),
Expand All @@ -112,12 +116,15 @@ private static void startConsumerVerticles(Map<String, Object> consumerVerticles
consumerVerticlesConfig, marcBibUpdateConsumerVerticleNumber, future4);
vertxAssistant.deployVerticle(ConsortiumInstanceSharingConsumerVerticle.class.getName(),
consumerVerticlesConfig, consortiumInstanceSharingVerticleNumber, future5);
vertxAssistant.deployVerticle(InstanceIngestConsumerVerticle.class.getName(),
consumerVerticlesConfig, instanceIngestConsumerVerticleNumber, future6);

consumerVerticleDeploymentId = future1.get(20, TimeUnit.SECONDS);
marcInstHridSetConsumerVerticleDeploymentId = future2.get(20, TimeUnit.SECONDS);
quickMarcConsumerVerticleDeploymentId = future3.get(20, TimeUnit.SECONDS);
marcBibUpdateConsumerVerticleDeploymentId = future4.get(20, TimeUnit.SECONDS);
consortiumInstanceSharingVerticleDeploymentId = future5.get(20, TimeUnit.SECONDS);
instanceIngestConsumerVerticleDeploymentId = future6.get(20, TimeUnit.SECONDS);
}

private static void stop() {
Expand All @@ -133,6 +140,7 @@ private static void stop() {
.thenCompose(v -> vertxAssistant.undeployVerticle(quickMarcConsumerVerticleDeploymentId))
.thenCompose(v -> vertxAssistant.undeployVerticle(marcBibUpdateConsumerVerticleDeploymentId))
.thenCompose(v -> vertxAssistant.undeployVerticle(consortiumInstanceSharingVerticleDeploymentId))
.thenCompose(v -> vertxAssistant.undeployVerticle(instanceIngestConsumerVerticleDeploymentId))
.thenAccept(v -> vertxAssistant.stop(stopped));

stopped.thenAccept(v -> log.info("Server Stopped"));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,109 +1,35 @@
package org.folio.inventory;

import static org.folio.inventory.dataimport.util.ConsumerWrapperUtil.constructModuleName;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_ENV;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_HOST;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_MAX_REQUEST_SIZE;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_PORT;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_REPLICATION_FACTOR;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.OKAPI_URL;
import io.vertx.core.AbstractVerticle;

import io.vertx.core.Promise;
import io.vertx.core.http.HttpClient;
import io.vertx.core.json.JsonObject;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.inventory.dataimport.cache.MappingMetadataCache;
import org.folio.inventory.dataimport.consumers.MarcBibUpdateKafkaHandler;
import org.folio.inventory.dataimport.handlers.actions.InstanceUpdateDelegate;
import org.folio.inventory.storage.Storage;
import org.folio.kafka.GlobalLoadSensor;
import org.folio.kafka.KafkaConfig;
import org.folio.kafka.KafkaConsumerWrapper;
import org.folio.kafka.SubscriptionDefinition;
import org.folio.inventory.support.KafkaConsumerVerticle;

public class MarcBibUpdateConsumerVerticle extends AbstractVerticle {
public class MarcBibUpdateConsumerVerticle extends KafkaConsumerVerticle {
private static final Logger LOGGER = LogManager.getLogger(MarcBibUpdateConsumerVerticle.class);
private static final GlobalLoadSensor GLOBAL_LOAD_SENSOR = new GlobalLoadSensor();
private static final String SRS_MARC_BIB_TOPIC_NAME = "srs.marc-bib";
private static final String METADATA_EXPIRATION_TIME = "inventory.mapping-metadata-cache.expiration.time.seconds";
private final int loadLimit = getLoadLimit();
private KafkaConsumerWrapper<String, String> marcBibUpdateConsumerWrapper;
private static final String SRS_MARC_BIB_EVENT = "srs.marc-bib";

@Override
public void start(Promise<Void> startPromise) {
JsonObject config = vertx.getOrCreateContext().config();
KafkaConfig kafkaConfig = getKafkaConfig(config);

HttpClient client = vertx.createHttpClient();
Storage storage = Storage.basedUpon(config, client);
InstanceUpdateDelegate instanceUpdateDelegate = new InstanceUpdateDelegate(storage);
var instanceUpdateDelegate = new InstanceUpdateDelegate(getStorage());

var mappingMetadataExpirationTime = getCacheEnvVariable(config, METADATA_EXPIRATION_TIME);
MappingMetadataCache mappingMetadataCache = new MappingMetadataCache(vertx, client, Long.parseLong(mappingMetadataExpirationTime));
var marcBibUpdateKafkaHandler = new MarcBibUpdateKafkaHandler(vertx,
getMaxDistributionNumber(), getKafkaConfig(), instanceUpdateDelegate, getMappingMetadataCache());

MarcBibUpdateKafkaHandler marcBibUpdateKafkaHandler = new MarcBibUpdateKafkaHandler(vertx,
getMaxDistributionNumber(), kafkaConfig, instanceUpdateDelegate, mappingMetadataCache);
var marcBibUpdateConsumerWrapper = createConsumer(SRS_MARC_BIB_EVENT);

marcBibUpdateConsumerWrapper = createConsumer(kafkaConfig, SRS_MARC_BIB_TOPIC_NAME);
marcBibUpdateConsumerWrapper.start(marcBibUpdateKafkaHandler, constructModuleName())
.onFailure(startPromise::fail)
.onSuccess(ar -> startPromise.complete());
}

private KafkaConsumerWrapper<String, String> createConsumer(KafkaConfig kafkaConfig, String topicEventType) {
SubscriptionDefinition subscriptionDefinition = SubscriptionDefinition.builder()
.eventType(topicEventType)
.subscriptionPattern(formatSubscriptionPattern(kafkaConfig.getEnvId(), topicEventType))
.build();

return KafkaConsumerWrapper.<String, String>builder()
.context(context)
.vertx(vertx)
.kafkaConfig(kafkaConfig)
.loadLimit(loadLimit)
.globalLoadSensor(GLOBAL_LOAD_SENSOR)
.subscriptionDefinition(subscriptionDefinition)
.build();
}

@Override
public void stop(Promise<Void> stopPromise) {
marcBibUpdateConsumerWrapper.stop()
.onComplete(ar -> stopPromise.complete());
}

private int getLoadLimit() {
return Integer.parseInt(System.getProperty("inventory.kafka.MarcBibUpdateConsumer.loadLimit","5"));
protected Logger getLogger() {
return LOGGER;
}

private int getMaxDistributionNumber() {
return Integer.parseInt(System.getProperty("inventory.kafka.MarcBibUpdateConsumer.maxDistributionNumber", "100"));
}

private KafkaConfig getKafkaConfig(JsonObject config) {
KafkaConfig kafkaConfig = KafkaConfig.builder()
.envId(config.getString(KAFKA_ENV))
.kafkaHost(config.getString(KAFKA_HOST))
.kafkaPort(config.getString(KAFKA_PORT))
.okapiUrl(config.getString(OKAPI_URL))
.replicationFactor(Integer.parseInt(config.getString(KAFKA_REPLICATION_FACTOR)))
.maxRequestSize(Integer.parseInt(config.getString(KAFKA_MAX_REQUEST_SIZE)))
.build();
LOGGER.info("kafkaConfig: {}", kafkaConfig);
return kafkaConfig;
}

private String getCacheEnvVariable(JsonObject config, String variableName) {
String cacheExpirationTime = config.getString(variableName);
if (StringUtils.isBlank(cacheExpirationTime)) {
cacheExpirationTime = "3600";
}
return cacheExpirationTime;
}

public static String formatSubscriptionPattern(String env, String eventType) {
return String.join("\\.", env, "\\w{1,}", eventType);
}
}
Loading