MODINV-986: InstanceIngress events consumption #716

Merged 4 commits on May 31, 2024
Changes from 2 commits
119 changes: 21 additions & 98 deletions src/main/java/org/folio/inventory/DataImportConsumerVerticle.java
@@ -1,5 +1,6 @@
package org.folio.inventory;

import static org.folio.DataImportEventTypes.DI_INCOMING_MARC_BIB_RECORD_PARSED;
import static org.folio.DataImportEventTypes.DI_INVENTORY_HOLDING_CREATED;
import static org.folio.DataImportEventTypes.DI_INVENTORY_HOLDING_MATCHED;
import static org.folio.DataImportEventTypes.DI_INVENTORY_HOLDING_NOT_MATCHED;
@@ -15,6 +16,7 @@
import static org.folio.DataImportEventTypes.DI_MARC_FOR_UPDATE_RECEIVED;
import static org.folio.DataImportEventTypes.DI_PENDING_ORDER_CREATED;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_CREATED;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_DELETED;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_MODIFIED_READY_FOR_POST_PROCESSING;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_NOT_MATCHED;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_BIB_RECORD_MATCHED;
@@ -25,45 +27,21 @@
import static org.folio.DataImportEventTypes.DI_SRS_MARC_HOLDINGS_RECORD_MODIFIED_READY_FOR_POST_PROCESSING;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_HOLDINGS_RECORD_NOT_MATCHED;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_HOLDING_RECORD_CREATED;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_AUTHORITY_RECORD_DELETED;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_ENV;
import static org.folio.DataImportEventTypes.DI_INCOMING_MARC_BIB_RECORD_PARSED;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_HOST;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_MAX_REQUEST_SIZE;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_PORT;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_REPLICATION_FACTOR;
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.OKAPI_URL;

import java.util.ArrayList;
import io.vertx.core.Promise;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.DataImportEventTypes;
import org.folio.inventory.consortium.cache.ConsortiumDataCache;
import org.folio.inventory.dataimport.cache.MappingMetadataCache;
import org.folio.inventory.dataimport.cache.ProfileSnapshotCache;
import org.folio.inventory.dataimport.consumers.DataImportKafkaHandler;
import org.folio.inventory.dataimport.util.ConsumerWrapperUtil;
import org.folio.inventory.storage.Storage;
import org.folio.kafka.AsyncRecordHandler;
import org.folio.kafka.GlobalLoadSensor;
import org.folio.kafka.KafkaConfig;
import org.folio.kafka.KafkaConsumerWrapper;
import org.folio.kafka.KafkaTopicNameHelper;
import org.folio.kafka.SubscriptionDefinition;
import org.folio.inventory.support.KafkaConsumerVerticle;
import org.folio.okapi.common.GenericCompositeFuture;
import org.folio.processing.events.EventManager;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.http.HttpClient;
import io.vertx.core.json.JsonObject;

public class DataImportConsumerVerticle extends AbstractVerticle {
public class DataImportConsumerVerticle extends KafkaConsumerVerticle {

private static final Logger LOGGER = LogManager.getLogger(DataImportConsumerVerticle.class);

@@ -97,89 +75,34 @@ public class DataImportConsumerVerticle extends AbstractVerticle {
DI_PENDING_ORDER_CREATED
);

private final int loadLimit = getLoadLimit();
private final int maxDistributionNumber = getMaxDistributionNumber();
private final List<KafkaConsumerWrapper<String, String>> consumerWrappers = new ArrayList<>();

@Override
public void start(Promise<Void> startPromise) {
JsonObject config = vertx.getOrCreateContext().config();
KafkaConfig kafkaConfig = KafkaConfig.builder()
.envId(config.getString(KAFKA_ENV))
.kafkaHost(config.getString(KAFKA_HOST))
.kafkaPort(config.getString(KAFKA_PORT))
.okapiUrl(config.getString(OKAPI_URL))
.replicationFactor(Integer.parseInt(config.getString(KAFKA_REPLICATION_FACTOR)))
.maxRequestSize(Integer.parseInt(config.getString(KAFKA_MAX_REQUEST_SIZE)))
.build();
LOGGER.info("kafkaConfig: {}", kafkaConfig);
EventManager.registerKafkaEventPublisher(kafkaConfig, vertx, maxDistributionNumber);

HttpClient client = vertx.createHttpClient();
Storage storage = Storage.basedUpon(config, client);
EventManager.registerKafkaEventPublisher(getKafkaConfig(), vertx, getMaxDistributionNumber());

String profileSnapshotExpirationTime = getCacheEnvVariable(config, "inventory.profile-snapshot-cache.expiration.time.seconds");
String mappingMetadataExpirationTime = getCacheEnvVariable(config, "inventory.mapping-metadata-cache.expiration.time.seconds");
var profileSnapshotExpirationTime = getCacheEnvVariable("inventory.profile-snapshot-cache.expiration.time.seconds");

ProfileSnapshotCache profileSnapshotCache = new ProfileSnapshotCache(vertx, client, Long.parseLong(profileSnapshotExpirationTime));
MappingMetadataCache mappingMetadataCache = new MappingMetadataCache(vertx, client, Long.parseLong(mappingMetadataExpirationTime));
ConsortiumDataCache consortiumDataCache = new ConsortiumDataCache(vertx, client);
var profileSnapshotCache = new ProfileSnapshotCache(vertx, getHttpClient(), Long.parseLong(profileSnapshotExpirationTime));
var consortiumDataCache = new ConsortiumDataCache(vertx, getHttpClient());

DataImportKafkaHandler dataImportKafkaHandler = new DataImportKafkaHandler(
vertx, storage, client, profileSnapshotCache, kafkaConfig, mappingMetadataCache, consortiumDataCache);
var dataImportKafkaHandler = new DataImportKafkaHandler(vertx, getStorage(), getHttpClient(), profileSnapshotCache,
getKafkaConfig(), getMappingMetadataCache(), consortiumDataCache);

List<Future<KafkaConsumerWrapper<String, String>>> futures = EVENT_TYPES.stream()
.map(eventType -> createKafkaConsumerWrapper(kafkaConfig, eventType, dataImportKafkaHandler))
.collect(Collectors.toList());
var futures = EVENT_TYPES.stream()
.map(eventType -> {
var consumerWrapper = super.createConsumer(eventType.value());
return consumerWrapper.start(dataImportKafkaHandler, ConsumerWrapperUtil.constructModuleName())
.map(consumerWrapper);
})
.toList();

GenericCompositeFuture.all(futures)
.onFailure(startPromise::fail)
.onSuccess(ar -> {
futures.forEach(future -> consumerWrappers.add(future.result()));
startPromise.complete();
});
.onSuccess(ar -> startPromise.complete());
}

@Override
public void stop(Promise<Void> stopPromise) {
List<Future<Void>> stopFutures = consumerWrappers.stream()
.map(KafkaConsumerWrapper::stop)
.collect(Collectors.toList());

GenericCompositeFuture.join(stopFutures).onComplete(ar -> stopPromise.complete());
}

private Future<KafkaConsumerWrapper<String, String>> createKafkaConsumerWrapper(KafkaConfig kafkaConfig, DataImportEventTypes eventType,
AsyncRecordHandler<String, String> recordHandler) {
SubscriptionDefinition subscriptionDefinition = KafkaTopicNameHelper.createSubscriptionDefinition(kafkaConfig.getEnvId(),
KafkaTopicNameHelper.getDefaultNameSpace(), eventType.value());

KafkaConsumerWrapper<String, String> consumerWrapper = KafkaConsumerWrapper.<String, String>builder()
.context(context)
.vertx(vertx)
.kafkaConfig(kafkaConfig)
.loadLimit(loadLimit)
.globalLoadSensor(new GlobalLoadSensor())
.subscriptionDefinition(subscriptionDefinition)
.build();

return consumerWrapper.start(recordHandler, ConsumerWrapperUtil.constructModuleName())
.map(consumerWrapper);
protected Logger getLogger() {
return LOGGER;
}

private int getLoadLimit() {
return Integer.parseInt(System.getProperty("inventory.kafka.DataImportConsumer.loadLimit", "5"));
}

private int getMaxDistributionNumber() {
return Integer.parseInt(System.getProperty("inventory.kafka.DataImportConsumerVerticle.maxDistributionNumber", "100"));
}

private String getCacheEnvVariable(JsonObject config, String variableName) {
String cacheExpirationTime = config.getString(variableName);
if (StringUtils.isBlank(cacheExpirationTime)) {
cacheExpirationTime = "3600";
}
return cacheExpirationTime;
}
}
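
Note: the shared base class these verticles now extend, org.folio.inventory.support.KafkaConsumerVerticle, is not included in this 2-commit view. The sketch below is reconstructed from the calls the subclasses make and from the code removed above; method bodies, lazy-initialization details, and defaults are assumptions, not the actual implementation.

package org.folio.inventory.support;

import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.Logger;
import org.folio.inventory.dataimport.cache.MappingMetadataCache;
import org.folio.inventory.dataimport.util.KafkaConfigConstants;
import org.folio.inventory.storage.Storage;
import org.folio.kafka.GlobalLoadSensor;
import org.folio.kafka.KafkaConfig;
import org.folio.kafka.KafkaConsumerWrapper;
import org.folio.kafka.KafkaTopicNameHelper;
import org.folio.kafka.SubscriptionDefinition;
import org.folio.okapi.common.GenericCompositeFuture;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.http.HttpClient;
import io.vertx.core.json.JsonObject;

public abstract class KafkaConsumerVerticle extends AbstractVerticle {

  private final List<KafkaConsumerWrapper<String, String>> consumers = new ArrayList<>();
  private KafkaConfig kafkaConfig;
  private HttpClient httpClient;
  private Storage storage;
  private MappingMetadataCache mappingMetadataCache;

  protected abstract Logger getLogger(); // each subclass supplies its own logger

  // Builds and tracks a consumer wrapper subscribed to the given event type or topic.
  protected KafkaConsumerWrapper<String, String> createConsumer(String eventType) {
    SubscriptionDefinition subscriptionDefinition = KafkaTopicNameHelper.createSubscriptionDefinition(
      getKafkaConfig().getEnvId(), KafkaTopicNameHelper.getDefaultNameSpace(), eventType);
    KafkaConsumerWrapper<String, String> consumer = KafkaConsumerWrapper.<String, String>builder()
      .context(context)
      .vertx(vertx)
      .kafkaConfig(getKafkaConfig())
      .loadLimit(5) // assumed default; the real class likely reads a per-verticle system property
      .globalLoadSensor(new GlobalLoadSensor())
      .subscriptionDefinition(subscriptionDefinition)
      .build();
    consumers.add(consumer);
    return consumer;
  }

  // Stopping every tracked consumer replaces the per-verticle stop() removed from DataImportConsumerVerticle.
  @Override
  public void stop(Promise<Void> stopPromise) {
    List<Future<Void>> stopFutures = consumers.stream().map(KafkaConsumerWrapper::stop).toList();
    GenericCompositeFuture.join(stopFutures).onComplete(ar -> stopPromise.complete());
  }

  protected KafkaConfig getKafkaConfig() {
    if (kafkaConfig == null) {
      JsonObject config = vertx.getOrCreateContext().config();
      kafkaConfig = KafkaConfig.builder()
        .envId(config.getString(KafkaConfigConstants.KAFKA_ENV))
        .kafkaHost(config.getString(KafkaConfigConstants.KAFKA_HOST))
        .kafkaPort(config.getString(KafkaConfigConstants.KAFKA_PORT))
        .okapiUrl(config.getString(KafkaConfigConstants.OKAPI_URL))
        .replicationFactor(Integer.parseInt(config.getString(KafkaConfigConstants.KAFKA_REPLICATION_FACTOR)))
        .maxRequestSize(Integer.parseInt(config.getString(KafkaConfigConstants.KAFKA_MAX_REQUEST_SIZE)))
        .build();
    }
    return kafkaConfig;
  }

  protected HttpClient getHttpClient() {
    return httpClient != null ? httpClient : (httpClient = vertx.createHttpClient());
  }

  protected Storage getStorage() {
    return storage != null ? storage : (storage = Storage.basedUpon(vertx.getOrCreateContext().config(), getHttpClient()));
  }

  protected MappingMetadataCache getMappingMetadataCache() {
    if (mappingMetadataCache == null) {
      var expirationTime = getCacheEnvVariable("inventory.mapping-metadata-cache.expiration.time.seconds");
      mappingMetadataCache = new MappingMetadataCache(vertx, getHttpClient(), Long.parseLong(expirationTime));
    }
    return mappingMetadataCache;
  }

  protected String getCacheEnvVariable(String variableName) {
    String value = vertx.getOrCreateContext().config().getString(variableName);
    return StringUtils.isBlank(value) ? "3600" : value; // 3600 s default, matching the removed helper
  }

  protected int getMaxDistributionNumber() {
    return Integer.parseInt(System.getProperty("inventory.kafka.DataImportConsumerVerticle.maxDistributionNumber", "100"));
  }
}
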
31 changes: 31 additions & 0 deletions src/main/java/org/folio/inventory/InstanceIngestConsumerVerticle.java
@@ -0,0 +1,31 @@
package org.folio.inventory;

import static org.folio.inventory.dataimport.util.ConsumerWrapperUtil.constructModuleName;

import io.vertx.core.Promise;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.inventory.handler.InstanceIngestEventHandler;
import org.folio.inventory.support.KafkaConsumerVerticle;

public class InstanceIngestConsumerVerticle extends KafkaConsumerVerticle {

private static final String INSTANCE_INGEST_TOPIC = "inventory.instance_ingest";
private static final Logger LOGGER = LogManager.getLogger(InstanceIngestConsumerVerticle.class);

@Override
public void start(Promise<Void> startPromise) {
var instanceIngestEventHandler = new InstanceIngestEventHandler();

var consumerWrapper = createConsumer(INSTANCE_INGEST_TOPIC);

consumerWrapper.start(instanceIngestEventHandler, constructModuleName())
.onFailure(startPromise::fail)
.onSuccess(ar -> startPromise.complete());
}

@Override
protected Logger getLogger() {
return LOGGER;
}
}
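
Note: the InstanceIngestEventHandler this verticle registers is also outside the displayed hunks. Only its contract can be read off the call site — KafkaConsumerWrapper.start expects an AsyncRecordHandler<String, String> — so the skeleton below is a hypothetical placeholder, not the handler actually shipped in this PR.

package org.folio.inventory.handler;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.kafka.AsyncRecordHandler;

import io.vertx.core.Future;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;

public class InstanceIngestEventHandler implements AsyncRecordHandler<String, String> {

  private static final Logger LOGGER = LogManager.getLogger(InstanceIngestEventHandler.class);

  @Override
  public Future<String> handle(KafkaConsumerRecord<String, String> consumerRecord) {
    // Placeholder body: the real handler would parse the InstanceIngress event payload
    // and create or update the corresponding Instance before acknowledging the record.
    LOGGER.info("handle:: received InstanceIngress event, key: {}", consumerRecord.key());
    return Future.succeededFuture(consumerRecord.key());
  }
}
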
10 changes: 9 additions & 1 deletion src/main/java/org/folio/inventory/Launcher.java
@@ -25,6 +25,7 @@ public class Launcher {
private static final String QUICK_MARC_CONSUMER_VERTICLE_INSTANCES_NUMBER_CONFIG = "inventory.kafka.QuickMarcConsumerVerticle.instancesNumber";
private static final String MARC_BIB_UPDATE_CONSUMER_VERTICLE_INSTANCES_NUMBER_CONFIG = "inventory.kafka.MarcBibUpdateConsumerVerticle.instancesNumber";
private static final String CONSORTIUM_INSTANCE_SHARING_CONSUMER_VERTICLE_NUMBER_CONFIG = "inventory.kafka.ConsortiumInstanceSharingConsumerVerticle.instancesNumber";
private static final String INSTANCE_INGEST_VERTICLE_NUMBER_CONFIG = "inventory.kafka.InstanceIngestConsumerVerticle.instancesNumber";
private static final VertxAssistant vertxAssistant = new VertxAssistant();

private static String inventoryModuleDeploymentId;
@@ -33,6 +34,7 @@ public class Launcher {
private static String quickMarcConsumerVerticleDeploymentId;
private static String marcBibUpdateConsumerVerticleDeploymentId;
private static String consortiumInstanceSharingVerticleDeploymentId;
private static String instanceIngestConsumerVerticleDeploymentId;

public static void main(String[] args)
throws InterruptedException, ExecutionException, TimeoutException {
@@ -96,28 +98,33 @@ private static void startConsumerVerticles(Map<String, Object> consumerVerticles
int quickMarcConsumerVerticleNumber = Integer.parseInt(System.getenv().getOrDefault(QUICK_MARC_CONSUMER_VERTICLE_INSTANCES_NUMBER_CONFIG, "1"));
int marcBibUpdateConsumerVerticleNumber = Integer.parseInt(System.getenv().getOrDefault(MARC_BIB_UPDATE_CONSUMER_VERTICLE_INSTANCES_NUMBER_CONFIG, "3"));
int consortiumInstanceSharingVerticleNumber = Integer.parseInt(System.getenv().getOrDefault(CONSORTIUM_INSTANCE_SHARING_CONSUMER_VERTICLE_NUMBER_CONFIG, "3"));
int instanceIngestConsumerVerticleNumber = Integer.parseInt(System.getenv().getOrDefault(INSTANCE_INGEST_VERTICLE_NUMBER_CONFIG, "3"));

CompletableFuture<String> future1 = new CompletableFuture<>();
CompletableFuture<String> future2 = new CompletableFuture<>();
CompletableFuture<String> future3 = new CompletableFuture<>();
CompletableFuture<String> future4 = new CompletableFuture<>();
CompletableFuture<String> future5 = new CompletableFuture<>();
CompletableFuture<String> future6 = new CompletableFuture<>();
vertxAssistant.deployVerticle(DataImportConsumerVerticle.class.getName(),
consumerVerticlesConfig, dataImportConsumerVerticleNumber, future1);
vertxAssistant.deployVerticle(MarcHridSetConsumerVerticle.class.getName(),
vertxAssistant.deployVerticle(MarcBibInstanceHridSetConsumerVerticle.class.getName(),
consumerVerticlesConfig, instanceHridSetConsumerVerticleNumber, future2);
vertxAssistant.deployVerticle(QuickMarcConsumerVerticle.class.getName(),
consumerVerticlesConfig, quickMarcConsumerVerticleNumber, future3);
vertxAssistant.deployVerticle(MarcBibUpdateConsumerVerticle.class.getName(),
consumerVerticlesConfig, marcBibUpdateConsumerVerticleNumber, future4);
vertxAssistant.deployVerticle(ConsortiumInstanceSharingConsumerVerticle.class.getName(),
consumerVerticlesConfig, consortiumInstanceSharingVerticleNumber, future5);
vertxAssistant.deployVerticle(InstanceIngestConsumerVerticle.class.getName(),
consumerVerticlesConfig, instanceIngestConsumerVerticleNumber, future6);

consumerVerticleDeploymentId = future1.get(20, TimeUnit.SECONDS);
marcInstHridSetConsumerVerticleDeploymentId = future2.get(20, TimeUnit.SECONDS);
quickMarcConsumerVerticleDeploymentId = future3.get(20, TimeUnit.SECONDS);
marcBibUpdateConsumerVerticleDeploymentId = future4.get(20, TimeUnit.SECONDS);
consortiumInstanceSharingVerticleDeploymentId = future5.get(20, TimeUnit.SECONDS);
instanceIngestConsumerVerticleDeploymentId = future6.get(20, TimeUnit.SECONDS);
}

private static void stop() {
@@ -133,6 +140,7 @@ private static void stop() {
.thenCompose(v -> vertxAssistant.undeployVerticle(quickMarcConsumerVerticleDeploymentId))
.thenCompose(v -> vertxAssistant.undeployVerticle(marcBibUpdateConsumerVerticleDeploymentId))
.thenCompose(v -> vertxAssistant.undeployVerticle(consortiumInstanceSharingVerticleDeploymentId))
.thenCompose(v -> vertxAssistant.undeployVerticle(instanceIngestConsumerVerticleDeploymentId))
.thenAccept(v -> vertxAssistant.stop(stopped));

stopped.thenAccept(v -> log.info("Server Stopped"));
47 changes: 47 additions & 0 deletions src/main/java/org/folio/inventory/MarcBibInstanceHridSetConsumerVerticle.java
@@ -0,0 +1,47 @@
package org.folio.inventory;

import static org.folio.DataImportEventTypes.DI_SRS_MARC_BIB_INSTANCE_HRID_SET;
import static org.folio.DataImportEventTypes.DI_SRS_MARC_HOLDINGS_HOLDING_HRID_SET;
import static org.folio.inventory.dataimport.util.ConsumerWrapperUtil.constructModuleName;

import io.vertx.core.CompositeFuture;
import io.vertx.core.Promise;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.inventory.dataimport.consumers.MarcBibInstanceHridSetKafkaHandler;
import org.folio.inventory.dataimport.consumers.MarcHoldingsRecordHridSetKafkaHandler;
import org.folio.inventory.dataimport.handlers.actions.HoldingsUpdateDelegate;
import org.folio.inventory.dataimport.handlers.actions.InstanceUpdateDelegate;
import org.folio.inventory.services.HoldingsCollectionService;
import org.folio.inventory.support.KafkaConsumerVerticle;

public class MarcBibInstanceHridSetConsumerVerticle extends KafkaConsumerVerticle {

private static final Logger LOGGER = LogManager.getLogger(MarcBibInstanceHridSetConsumerVerticle.class);

@Override
public void start(Promise<Void> startPromise) {
var marcBibConsumerWrapper = createConsumer(DI_SRS_MARC_BIB_INSTANCE_HRID_SET.value());
var marcHoldingsConsumerWrapper = createConsumer(DI_SRS_MARC_HOLDINGS_HOLDING_HRID_SET.value());

var holdingsCollectionService = new HoldingsCollectionService();
var instanceUpdateDelegate = new InstanceUpdateDelegate(getStorage());
var holdingsRecordUpdateDelegate = new HoldingsUpdateDelegate(getStorage(), holdingsCollectionService);

var marcBibInstanceHridSetKafkaHandler = new MarcBibInstanceHridSetKafkaHandler(instanceUpdateDelegate, getMappingMetadataCache());
var marcHoldingsRecordHridSetKafkaHandler = new MarcHoldingsRecordHridSetKafkaHandler(holdingsRecordUpdateDelegate, getMappingMetadataCache());

CompositeFuture.all(
marcBibConsumerWrapper.start(marcBibInstanceHridSetKafkaHandler, constructModuleName()),
marcHoldingsConsumerWrapper.start(marcHoldingsRecordHridSetKafkaHandler, constructModuleName())
)
.onFailure(startPromise::fail)
.onSuccess(ar -> startPromise.complete());
}

@Override
protected Logger getLogger() {
return LOGGER;
}

}