-
Notifications
You must be signed in to change notification settings - Fork 14
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
MODINV-986: step 1: prepare InstanceIngestConsumerVerticle re-using other consumer verticles logic
- Loading branch information
Showing
8 changed files
with
262 additions
and
341 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
31 changes: 31 additions & 0 deletions
31
src/main/java/org/folio/inventory/InstanceIngestConsumerVerticle.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
package org.folio.inventory; | ||
|
||
import static org.folio.inventory.dataimport.util.ConsumerWrapperUtil.constructModuleName; | ||
|
||
import io.vertx.core.Promise; | ||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.folio.inventory.handler.InstanceIngestEventHandler; | ||
import org.folio.inventory.support.KafkaConsumerVerticle; | ||
|
||
public class InstanceIngestConsumerVerticle extends KafkaConsumerVerticle { | ||
|
||
private static final String INSTANCE_INGEST_TOPIC = "inventory.instance_ingest"; | ||
private static final Logger LOGGER = LogManager.getLogger(InstanceIngestConsumerVerticle.class); | ||
|
||
@Override | ||
public void start(Promise<Void> startPromise) { | ||
var instanceIngestEventHandler = new InstanceIngestEventHandler(); | ||
|
||
var consumerWrapper = createConsumer(INSTANCE_INGEST_TOPIC); | ||
|
||
consumerWrapper.start(instanceIngestEventHandler, constructModuleName()) | ||
.onFailure(startPromise::fail) | ||
.onSuccess(ar -> startPromise.complete()); | ||
} | ||
|
||
@Override | ||
protected Logger getLogger() { | ||
return LOGGER; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
94 changes: 10 additions & 84 deletions
94
src/main/java/org/folio/inventory/MarcBibUpdateConsumerVerticle.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,109 +1,35 @@ | ||
package org.folio.inventory; | ||
|
||
import static org.folio.inventory.dataimport.util.ConsumerWrapperUtil.constructModuleName; | ||
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_ENV; | ||
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_HOST; | ||
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_MAX_REQUEST_SIZE; | ||
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_PORT; | ||
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_REPLICATION_FACTOR; | ||
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.OKAPI_URL; | ||
import io.vertx.core.AbstractVerticle; | ||
|
||
import io.vertx.core.Promise; | ||
import io.vertx.core.http.HttpClient; | ||
import io.vertx.core.json.JsonObject; | ||
import org.apache.commons.lang.StringUtils; | ||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.folio.inventory.dataimport.cache.MappingMetadataCache; | ||
import org.folio.inventory.dataimport.consumers.MarcBibUpdateKafkaHandler; | ||
import org.folio.inventory.dataimport.handlers.actions.InstanceUpdateDelegate; | ||
import org.folio.inventory.storage.Storage; | ||
import org.folio.kafka.GlobalLoadSensor; | ||
import org.folio.kafka.KafkaConfig; | ||
import org.folio.kafka.KafkaConsumerWrapper; | ||
import org.folio.kafka.SubscriptionDefinition; | ||
import org.folio.inventory.support.KafkaConsumerVerticle; | ||
|
||
public class MarcBibUpdateConsumerVerticle extends AbstractVerticle { | ||
public class MarcBibUpdateConsumerVerticle extends KafkaConsumerVerticle { | ||
private static final Logger LOGGER = LogManager.getLogger(MarcBibUpdateConsumerVerticle.class); | ||
private static final GlobalLoadSensor GLOBAL_LOAD_SENSOR = new GlobalLoadSensor(); | ||
private static final String SRS_MARC_BIB_TOPIC_NAME = "srs.marc-bib"; | ||
private static final String METADATA_EXPIRATION_TIME = "inventory.mapping-metadata-cache.expiration.time.seconds"; | ||
private final int loadLimit = getLoadLimit(); | ||
private KafkaConsumerWrapper<String, String> marcBibUpdateConsumerWrapper; | ||
private static final String SRS_MARC_BIB_EVENT = "srs.marc-bib"; | ||
|
||
@Override | ||
public void start(Promise<Void> startPromise) { | ||
JsonObject config = vertx.getOrCreateContext().config(); | ||
KafkaConfig kafkaConfig = getKafkaConfig(config); | ||
|
||
HttpClient client = vertx.createHttpClient(); | ||
Storage storage = Storage.basedUpon(config, client); | ||
InstanceUpdateDelegate instanceUpdateDelegate = new InstanceUpdateDelegate(storage); | ||
var instanceUpdateDelegate = new InstanceUpdateDelegate(getStorage()); | ||
|
||
var mappingMetadataExpirationTime = getCacheEnvVariable(config, METADATA_EXPIRATION_TIME); | ||
MappingMetadataCache mappingMetadataCache = new MappingMetadataCache(vertx, client, Long.parseLong(mappingMetadataExpirationTime)); | ||
var marcBibUpdateKafkaHandler = new MarcBibUpdateKafkaHandler(vertx, | ||
getMaxDistributionNumber(), getKafkaConfig(), instanceUpdateDelegate, getMappingMetadataCache()); | ||
|
||
MarcBibUpdateKafkaHandler marcBibUpdateKafkaHandler = new MarcBibUpdateKafkaHandler(vertx, | ||
getMaxDistributionNumber(), kafkaConfig, instanceUpdateDelegate, mappingMetadataCache); | ||
var marcBibUpdateConsumerWrapper = createConsumer(SRS_MARC_BIB_EVENT); | ||
|
||
marcBibUpdateConsumerWrapper = createConsumer(kafkaConfig, SRS_MARC_BIB_TOPIC_NAME); | ||
marcBibUpdateConsumerWrapper.start(marcBibUpdateKafkaHandler, constructModuleName()) | ||
.onFailure(startPromise::fail) | ||
.onSuccess(ar -> startPromise.complete()); | ||
} | ||
|
||
private KafkaConsumerWrapper<String, String> createConsumer(KafkaConfig kafkaConfig, String topicEventType) { | ||
SubscriptionDefinition subscriptionDefinition = SubscriptionDefinition.builder() | ||
.eventType(topicEventType) | ||
.subscriptionPattern(formatSubscriptionPattern(kafkaConfig.getEnvId(), topicEventType)) | ||
.build(); | ||
|
||
return KafkaConsumerWrapper.<String, String>builder() | ||
.context(context) | ||
.vertx(vertx) | ||
.kafkaConfig(kafkaConfig) | ||
.loadLimit(loadLimit) | ||
.globalLoadSensor(GLOBAL_LOAD_SENSOR) | ||
.subscriptionDefinition(subscriptionDefinition) | ||
.build(); | ||
} | ||
|
||
@Override | ||
public void stop(Promise<Void> stopPromise) { | ||
marcBibUpdateConsumerWrapper.stop() | ||
.onComplete(ar -> stopPromise.complete()); | ||
} | ||
|
||
private int getLoadLimit() { | ||
return Integer.parseInt(System.getProperty("inventory.kafka.MarcBibUpdateConsumer.loadLimit","5")); | ||
protected Logger getLogger() { | ||
return LOGGER; | ||
} | ||
|
||
private int getMaxDistributionNumber() { | ||
return Integer.parseInt(System.getProperty("inventory.kafka.MarcBibUpdateConsumer.maxDistributionNumber", "100")); | ||
} | ||
|
||
private KafkaConfig getKafkaConfig(JsonObject config) { | ||
KafkaConfig kafkaConfig = KafkaConfig.builder() | ||
.envId(config.getString(KAFKA_ENV)) | ||
.kafkaHost(config.getString(KAFKA_HOST)) | ||
.kafkaPort(config.getString(KAFKA_PORT)) | ||
.okapiUrl(config.getString(OKAPI_URL)) | ||
.replicationFactor(Integer.parseInt(config.getString(KAFKA_REPLICATION_FACTOR))) | ||
.maxRequestSize(Integer.parseInt(config.getString(KAFKA_MAX_REQUEST_SIZE))) | ||
.build(); | ||
LOGGER.info("kafkaConfig: {}", kafkaConfig); | ||
return kafkaConfig; | ||
} | ||
|
||
private String getCacheEnvVariable(JsonObject config, String variableName) { | ||
String cacheExpirationTime = config.getString(variableName); | ||
if (StringUtils.isBlank(cacheExpirationTime)) { | ||
cacheExpirationTime = "3600"; | ||
} | ||
return cacheExpirationTime; | ||
} | ||
|
||
public static String formatSubscriptionPattern(String env, String eventType) { | ||
return String.join("\\.", env, "\\w{1,}", eventType); | ||
} | ||
} |
Oops, something went wrong.