diff --git a/src/main/java/org/folio/inventory/DataImportConsumerVerticle.java b/src/main/java/org/folio/inventory/DataImportConsumerVerticle.java index 9fee9626a..4546b8edc 100644 --- a/src/main/java/org/folio/inventory/DataImportConsumerVerticle.java +++ b/src/main/java/org/folio/inventory/DataImportConsumerVerticle.java @@ -73,17 +73,19 @@ public class DataImportConsumerVerticle extends KafkaConsumerVerticle { DI_SRS_MARC_HOLDINGS_RECORD_NOT_MATCHED, DI_PENDING_ORDER_CREATED ); + private static final String LOAD_LIMIT_PROPERTY = "DataImportConsumer"; + private static final String MAX_DISTRIBUTION_PROPERTY = "DataImportConsumerVerticle"; @Override public void start(Promise startPromise) { - EventManager.registerKafkaEventPublisher(getKafkaConfig(), vertx, getMaxDistributionNumber()); + EventManager.registerKafkaEventPublisher(getKafkaConfig(), vertx, getMaxDistributionNumber(MAX_DISTRIBUTION_PROPERTY)); var consortiumDataCache = new ConsortiumDataCache(vertx, getHttpClient()); var dataImportKafkaHandler = new DataImportKafkaHandler(vertx, getStorage(), getHttpClient(), getProfileSnapshotCache(), getKafkaConfig(), getMappingMetadataCache(), consortiumDataCache); var futures = EVENT_TYPES.stream() - .map(type -> super.createConsumer(type.value())) + .map(type -> super.createConsumer(type.value(), LOAD_LIMIT_PROPERTY)) .map(consumerWrapper -> consumerWrapper.start(dataImportKafkaHandler, ConsumerWrapperUtil.constructModuleName()) .map(consumerWrapper) ) diff --git a/src/main/java/org/folio/inventory/InstanceIngressConsumerVerticle.java b/src/main/java/org/folio/inventory/InstanceIngressConsumerVerticle.java index fb1835463..be82c7d95 100644 --- a/src/main/java/org/folio/inventory/InstanceIngressConsumerVerticle.java +++ b/src/main/java/org/folio/inventory/InstanceIngressConsumerVerticle.java @@ -10,14 +10,15 @@ public class InstanceIngressConsumerVerticle extends KafkaConsumerVerticle { - private static final String INSTANCE_INGRESS_TOPIC = "inventory.instance_ingress"; private static final Logger LOGGER = LogManager.getLogger(InstanceIngressConsumerVerticle.class); + private static final String INSTANCE_INGRESS_TOPIC = "inventory.instance_ingress"; + private static final String BASE_PROPERTY = "InstanceIngressConsumerVerticle"; @Override public void start(Promise startPromise) { var instanceIngressEventHandler = new InstanceIngressEventConsumer(vertx, getStorage(), getHttpClient(), getMappingMetadataCache()); - var consumerWrapper = createConsumer(INSTANCE_INGRESS_TOPIC, false); + var consumerWrapper = createConsumer(INSTANCE_INGRESS_TOPIC, BASE_PROPERTY, false); consumerWrapper.start(instanceIngressEventHandler, constructModuleName()) .onFailure(startPromise::fail) @@ -28,4 +29,5 @@ public void start(Promise startPromise) { protected Logger getLogger() { return LOGGER; } + } diff --git a/src/main/java/org/folio/inventory/Launcher.java b/src/main/java/org/folio/inventory/Launcher.java index 5eb03e397..f4251e0fa 100644 --- a/src/main/java/org/folio/inventory/Launcher.java +++ b/src/main/java/org/folio/inventory/Launcher.java @@ -108,7 +108,7 @@ private static void startConsumerVerticles(Map consumerVerticles CompletableFuture future6 = new CompletableFuture<>(); vertxAssistant.deployVerticle(DataImportConsumerVerticle.class.getName(), consumerVerticlesConfig, dataImportConsumerVerticleNumber, future1); - vertxAssistant.deployVerticle(MarcBibInstanceHridSetConsumerVerticle.class.getName(), + vertxAssistant.deployVerticle(MarcHridSetConsumerVerticle.class.getName(), consumerVerticlesConfig, instanceHridSetConsumerVerticleNumber, future2); vertxAssistant.deployVerticle(QuickMarcConsumerVerticle.class.getName(), consumerVerticlesConfig, quickMarcConsumerVerticleNumber, future3); diff --git a/src/main/java/org/folio/inventory/MarcBibUpdateConsumerVerticle.java b/src/main/java/org/folio/inventory/MarcBibUpdateConsumerVerticle.java index 34a25e742..9c355c769 100644 --- a/src/main/java/org/folio/inventory/MarcBibUpdateConsumerVerticle.java +++ b/src/main/java/org/folio/inventory/MarcBibUpdateConsumerVerticle.java @@ -12,15 +12,16 @@ public class MarcBibUpdateConsumerVerticle extends KafkaConsumerVerticle { private static final Logger LOGGER = LogManager.getLogger(MarcBibUpdateConsumerVerticle.class); private static final String SRS_MARC_BIB_EVENT = "srs.marc-bib"; + private static final String BASE_PROPERTY = "MarcBibUpdateConsumer"; @Override public void start(Promise startPromise) { var instanceUpdateDelegate = new InstanceUpdateDelegate(getStorage()); - var marcBibUpdateKafkaHandler = new MarcBibUpdateKafkaHandler(vertx, - getMaxDistributionNumber(), getKafkaConfig(), instanceUpdateDelegate, getMappingMetadataCache()); + var marcBibUpdateKafkaHandler = new MarcBibUpdateKafkaHandler(vertx, getMaxDistributionNumber(BASE_PROPERTY), + getKafkaConfig(), instanceUpdateDelegate, getMappingMetadataCache()); - var marcBibUpdateConsumerWrapper = createConsumer(SRS_MARC_BIB_EVENT); + var marcBibUpdateConsumerWrapper = createConsumer(SRS_MARC_BIB_EVENT, BASE_PROPERTY); marcBibUpdateConsumerWrapper.start(marcBibUpdateKafkaHandler, constructModuleName()) .onFailure(startPromise::fail) diff --git a/src/main/java/org/folio/inventory/MarcBibInstanceHridSetConsumerVerticle.java b/src/main/java/org/folio/inventory/MarcHridSetConsumerVerticle.java similarity index 88% rename from src/main/java/org/folio/inventory/MarcBibInstanceHridSetConsumerVerticle.java rename to src/main/java/org/folio/inventory/MarcHridSetConsumerVerticle.java index 9150836eb..947a694ea 100644 --- a/src/main/java/org/folio/inventory/MarcBibInstanceHridSetConsumerVerticle.java +++ b/src/main/java/org/folio/inventory/MarcHridSetConsumerVerticle.java @@ -15,14 +15,15 @@ import org.folio.inventory.services.HoldingsCollectionService; import org.folio.inventory.support.KafkaConsumerVerticle; -public class MarcBibInstanceHridSetConsumerVerticle extends KafkaConsumerVerticle { +public class MarcHridSetConsumerVerticle extends KafkaConsumerVerticle { - private static final Logger LOGGER = LogManager.getLogger(MarcBibInstanceHridSetConsumerVerticle.class); + private static final Logger LOGGER = LogManager.getLogger(MarcHridSetConsumerVerticle.class); + private static final String BASE_PROPERTY = "MarcBibInstanceHridSetConsumer"; @Override public void start(Promise startPromise) { - var marcBibConsumerWrapper = createConsumer(DI_SRS_MARC_BIB_INSTANCE_HRID_SET.value()); - var marcHoldingsConsumerWrapper = createConsumer(DI_SRS_MARC_HOLDINGS_HOLDING_HRID_SET.value()); + var marcBibConsumerWrapper = createConsumer(DI_SRS_MARC_BIB_INSTANCE_HRID_SET.value(), BASE_PROPERTY); + var marcHoldingsConsumerWrapper = createConsumer(DI_SRS_MARC_HOLDINGS_HOLDING_HRID_SET.value(), BASE_PROPERTY); var holdingsCollectionService = new HoldingsCollectionService(); var instanceUpdateDelegate = new InstanceUpdateDelegate(getStorage()); diff --git a/src/main/java/org/folio/inventory/QuickMarcConsumerVerticle.java b/src/main/java/org/folio/inventory/QuickMarcConsumerVerticle.java index b2b2a92ad..b0414f310 100644 --- a/src/main/java/org/folio/inventory/QuickMarcConsumerVerticle.java +++ b/src/main/java/org/folio/inventory/QuickMarcConsumerVerticle.java @@ -14,15 +14,17 @@ public class QuickMarcConsumerVerticle extends KafkaConsumerVerticle { private static final Logger LOGGER = LogManager.getLogger(QuickMarcConsumerVerticle.class); + private static final String LOAD_LIMIT_PROPERTY = "QuickMarcConsumer"; + private static final String MAX_DISTRIBUTION_PROPERTY = "QuickMarcConsumerVerticle"; @Override public void start(Promise startPromise) { var precedingSucceedingTitlesHelper = new PrecedingSucceedingTitlesHelper(WebClient.wrap(getHttpClient())); var holdingsCollectionService = new HoldingsCollectionService(); - var handler = new QuickMarcKafkaHandler(vertx, getStorage(), getMaxDistributionNumber(), + var handler = new QuickMarcKafkaHandler(vertx, getStorage(), getMaxDistributionNumber(MAX_DISTRIBUTION_PROPERTY), getKafkaConfig(), precedingSucceedingTitlesHelper, holdingsCollectionService); - var consumer = createConsumer(QMEventTypes.QM_SRS_MARC_RECORD_UPDATED.name()); + var consumer = createConsumer(QMEventTypes.QM_SRS_MARC_RECORD_UPDATED.name(), LOAD_LIMIT_PROPERTY); consumer.start(handler, ConsumerWrapperUtil.constructModuleName()) .map(consumer) diff --git a/src/main/java/org/folio/inventory/support/KafkaConsumerVerticle.java b/src/main/java/org/folio/inventory/support/KafkaConsumerVerticle.java index dad4234a5..2e0211aeb 100644 --- a/src/main/java/org/folio/inventory/support/KafkaConsumerVerticle.java +++ b/src/main/java/org/folio/inventory/support/KafkaConsumerVerticle.java @@ -16,7 +16,6 @@ import io.vertx.core.AbstractVerticle; import io.vertx.core.Promise; -import io.vertx.core.Verticle; import io.vertx.core.http.HttpClient; import io.vertx.core.json.JsonObject; import java.util.ArrayList; @@ -58,16 +57,16 @@ public void stop(Promise stopPromise) { protected abstract Logger getLogger(); - protected KafkaConsumerWrapper createConsumer(String eventType) { - return createConsumer(eventType, true); + protected KafkaConsumerWrapper createConsumer(String eventType, String loadLimitPropertyKey) { + return createConsumer(eventType, loadLimitPropertyKey, true); } - protected KafkaConsumerWrapper createConsumer(String eventType, boolean namespacedTopic) { + protected KafkaConsumerWrapper createConsumer(String eventType, String loadLimitPropertyKey, boolean namespacedTopic) { var kafkaConsumerWrapper = KafkaConsumerWrapper.builder() .context(context) .vertx(vertx) .kafkaConfig(getKafkaConfig()) - .loadLimit(getLoadLimit()) + .loadLimit(getLoadLimit(loadLimitPropertyKey)) .globalLoadSensor(new GlobalLoadSensor()) .subscriptionDefinition(getSubscriptionDefinition(getKafkaConfig().getEnvId(), eventType, namespacedTopic)) .build(); @@ -124,8 +123,8 @@ protected String getCacheEnvVariable(String variableName) { return cacheExpirationTime; } - protected int getMaxDistributionNumber() { - return getConsumerProperty(MAX_DISTRIBUTION_NUMBER_TEMPLATE, MAX_DISTRIBUTION_NUMBER_DEFAULT); + protected int getMaxDistributionNumber(String property) { + return getConsumerProperty(MAX_DISTRIBUTION_NUMBER_TEMPLATE, property, MAX_DISTRIBUTION_NUMBER_DEFAULT); } private JsonObject getConfig() { @@ -152,14 +151,12 @@ private String formatSubscriptionPattern(String env, String eventType) { return join("\\.", env, "\\w{1,}", eventType); } - private int getLoadLimit() { - return getConsumerProperty(LOAD_LIMIT_TEMPLATE, LOAD_LIMIT_DEFAULT); + private int getLoadLimit(String propertyKey) { + return getConsumerProperty(LOAD_LIMIT_TEMPLATE, propertyKey, LOAD_LIMIT_DEFAULT); } - private int getConsumerProperty(String nameTemplate, String defaultValue) { - var consumerClassName = getClass().getSimpleName(); - var cleanConsumerName = consumerClassName.substring(0, consumerClassName.indexOf(Verticle.class.getSimpleName())); - return parseInt(getProperty(format(nameTemplate, cleanConsumerName), defaultValue)); + private int getConsumerProperty(String nameTemplate, String propertyKey, String defaultValue) { + return parseInt(getProperty(format(nameTemplate, propertyKey), defaultValue)); } } diff --git a/src/test/java/org/folio/inventory/dataimport/consumers/MarcBibInstanceHridSetConsumerVerticleTest.java b/src/test/java/org/folio/inventory/dataimport/consumers/MarcHridSetConsumerVerticleTest.java similarity index 90% rename from src/test/java/org/folio/inventory/dataimport/consumers/MarcBibInstanceHridSetConsumerVerticleTest.java rename to src/test/java/org/folio/inventory/dataimport/consumers/MarcHridSetConsumerVerticleTest.java index 519035bfc..61843652f 100644 --- a/src/test/java/org/folio/inventory/dataimport/consumers/MarcBibInstanceHridSetConsumerVerticleTest.java +++ b/src/test/java/org/folio/inventory/dataimport/consumers/MarcHridSetConsumerVerticleTest.java @@ -8,7 +8,7 @@ import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.VertxUnitRunner; import net.mguenther.kafka.junit.EmbeddedKafkaCluster; -import org.folio.inventory.MarcBibInstanceHridSetConsumerVerticle; +import org.folio.inventory.MarcHridSetConsumerVerticle; import org.junit.AfterClass; import org.junit.Test; import org.junit.runner.RunWith; @@ -22,7 +22,7 @@ import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_REPLICATION_FACTOR; @RunWith(VertxUnitRunner.class) -public class MarcBibInstanceHridSetConsumerVerticleTest { +public class MarcHridSetConsumerVerticleTest { private static final String TENANT_ID = "diku"; private static final String KAFKA_ENV_NAME = "test-env"; @@ -46,7 +46,7 @@ public void shouldDeployVerticle(TestContext context) { .setWorker(true); Promise promise = Promise.promise(); - vertx.deployVerticle(MarcBibInstanceHridSetConsumerVerticle.class.getName(), options, promise); + vertx.deployVerticle(MarcHridSetConsumerVerticle.class.getName(), options, promise); promise.future().onComplete(ar -> { context.assertTrue(ar.succeeded());