Skip to content

Commit

Permalink
MODINV-986: return consumers base property
Browse files Browse the repository at this point in the history
  • Loading branch information
PBobylev committed Jun 17, 2024
1 parent ebfe393 commit 13b647f
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,19 @@ public class DataImportConsumerVerticle extends KafkaConsumerVerticle {
DI_SRS_MARC_HOLDINGS_RECORD_NOT_MATCHED,
DI_PENDING_ORDER_CREATED
);
private static final String LOAD_LIMIT_PROPERTY = "DataImportConsumer";
private static final String MAX_DISTRIBUTION_PROPERTY = "DataImportConsumerVerticle";

@Override
public void start(Promise<Void> startPromise) {
EventManager.registerKafkaEventPublisher(getKafkaConfig(), vertx, getMaxDistributionNumber());
EventManager.registerKafkaEventPublisher(getKafkaConfig(), vertx, getMaxDistributionNumber(MAX_DISTRIBUTION_PROPERTY));
var consortiumDataCache = new ConsortiumDataCache(vertx, getHttpClient());

var dataImportKafkaHandler = new DataImportKafkaHandler(vertx, getStorage(), getHttpClient(), getProfileSnapshotCache(),
getKafkaConfig(), getMappingMetadataCache(), consortiumDataCache);

var futures = EVENT_TYPES.stream()
.map(type -> super.createConsumer(type.value()))
.map(type -> super.createConsumer(type.value(), LOAD_LIMIT_PROPERTY))
.map(consumerWrapper -> consumerWrapper.start(dataImportKafkaHandler, ConsumerWrapperUtil.constructModuleName())
.map(consumerWrapper)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@

public class InstanceIngressConsumerVerticle extends KafkaConsumerVerticle {

private static final String INSTANCE_INGRESS_TOPIC = "inventory.instance_ingress";
private static final Logger LOGGER = LogManager.getLogger(InstanceIngressConsumerVerticle.class);
private static final String INSTANCE_INGRESS_TOPIC = "inventory.instance_ingress";
private static final String BASE_PROPERTY = "InstanceIngressConsumerVerticle";

@Override
public void start(Promise<Void> startPromise) {
var instanceIngressEventHandler = new InstanceIngressEventConsumer(vertx, getStorage(), getHttpClient(), getMappingMetadataCache());

var consumerWrapper = createConsumer(INSTANCE_INGRESS_TOPIC, false);
var consumerWrapper = createConsumer(INSTANCE_INGRESS_TOPIC, BASE_PROPERTY, false);

consumerWrapper.start(instanceIngressEventHandler, constructModuleName())
.onFailure(startPromise::fail)
Expand All @@ -28,4 +29,5 @@ public void start(Promise<Void> startPromise) {
protected Logger getLogger() {
return LOGGER;
}

}
2 changes: 1 addition & 1 deletion src/main/java/org/folio/inventory/Launcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ private static void startConsumerVerticles(Map<String, Object> consumerVerticles
CompletableFuture<String> future6 = new CompletableFuture<>();
vertxAssistant.deployVerticle(DataImportConsumerVerticle.class.getName(),
consumerVerticlesConfig, dataImportConsumerVerticleNumber, future1);
vertxAssistant.deployVerticle(MarcBibInstanceHridSetConsumerVerticle.class.getName(),
vertxAssistant.deployVerticle(MarcHridSetConsumerVerticle.class.getName(),
consumerVerticlesConfig, instanceHridSetConsumerVerticleNumber, future2);
vertxAssistant.deployVerticle(QuickMarcConsumerVerticle.class.getName(),
consumerVerticlesConfig, quickMarcConsumerVerticleNumber, future3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,16 @@
public class MarcBibUpdateConsumerVerticle extends KafkaConsumerVerticle {
private static final Logger LOGGER = LogManager.getLogger(MarcBibUpdateConsumerVerticle.class);
private static final String SRS_MARC_BIB_EVENT = "srs.marc-bib";
private static final String BASE_PROPERTY = "MarcBibUpdateConsumer";

@Override
public void start(Promise<Void> startPromise) {
var instanceUpdateDelegate = new InstanceUpdateDelegate(getStorage());

var marcBibUpdateKafkaHandler = new MarcBibUpdateKafkaHandler(vertx,
getMaxDistributionNumber(), getKafkaConfig(), instanceUpdateDelegate, getMappingMetadataCache());
var marcBibUpdateKafkaHandler = new MarcBibUpdateKafkaHandler(vertx, getMaxDistributionNumber(BASE_PROPERTY),
getKafkaConfig(), instanceUpdateDelegate, getMappingMetadataCache());

var marcBibUpdateConsumerWrapper = createConsumer(SRS_MARC_BIB_EVENT);
var marcBibUpdateConsumerWrapper = createConsumer(SRS_MARC_BIB_EVENT, BASE_PROPERTY);

marcBibUpdateConsumerWrapper.start(marcBibUpdateKafkaHandler, constructModuleName())
.onFailure(startPromise::fail)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
import org.folio.inventory.services.HoldingsCollectionService;
import org.folio.inventory.support.KafkaConsumerVerticle;

public class MarcBibInstanceHridSetConsumerVerticle extends KafkaConsumerVerticle {
public class MarcHridSetConsumerVerticle extends KafkaConsumerVerticle {

private static final Logger LOGGER = LogManager.getLogger(MarcBibInstanceHridSetConsumerVerticle.class);
private static final Logger LOGGER = LogManager.getLogger(MarcHridSetConsumerVerticle.class);
private static final String BASE_PROPERTY = "MarcBibInstanceHridSetConsumer";

@Override
public void start(Promise<Void> startPromise) {
var marcBibConsumerWrapper = createConsumer(DI_SRS_MARC_BIB_INSTANCE_HRID_SET.value());
var marcHoldingsConsumerWrapper = createConsumer(DI_SRS_MARC_HOLDINGS_HOLDING_HRID_SET.value());
var marcBibConsumerWrapper = createConsumer(DI_SRS_MARC_BIB_INSTANCE_HRID_SET.value(), BASE_PROPERTY);
var marcHoldingsConsumerWrapper = createConsumer(DI_SRS_MARC_HOLDINGS_HOLDING_HRID_SET.value(), BASE_PROPERTY);

var holdingsCollectionService = new HoldingsCollectionService();
var instanceUpdateDelegate = new InstanceUpdateDelegate(getStorage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@
public class QuickMarcConsumerVerticle extends KafkaConsumerVerticle {

private static final Logger LOGGER = LogManager.getLogger(QuickMarcConsumerVerticle.class);
private static final String LOAD_LIMIT_PROPERTY = "QuickMarcConsumer";
private static final String MAX_DISTRIBUTION_PROPERTY = "QuickMarcConsumerVerticle";

@Override
public void start(Promise<Void> startPromise) {
var precedingSucceedingTitlesHelper = new PrecedingSucceedingTitlesHelper(WebClient.wrap(getHttpClient()));
var holdingsCollectionService = new HoldingsCollectionService();
var handler = new QuickMarcKafkaHandler(vertx, getStorage(), getMaxDistributionNumber(),
var handler = new QuickMarcKafkaHandler(vertx, getStorage(), getMaxDistributionNumber(MAX_DISTRIBUTION_PROPERTY),
getKafkaConfig(), precedingSucceedingTitlesHelper, holdingsCollectionService);

var consumer = createConsumer(QMEventTypes.QM_SRS_MARC_RECORD_UPDATED.name());
var consumer = createConsumer(QMEventTypes.QM_SRS_MARC_RECORD_UPDATED.name(), LOAD_LIMIT_PROPERTY);

consumer.start(handler, ConsumerWrapperUtil.constructModuleName())
.map(consumer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.Verticle;
import io.vertx.core.http.HttpClient;
import io.vertx.core.json.JsonObject;
import java.util.ArrayList;
Expand Down Expand Up @@ -58,16 +57,16 @@ public void stop(Promise<Void> stopPromise) {

protected abstract Logger getLogger();

protected KafkaConsumerWrapper<String, String> createConsumer(String eventType) {
return createConsumer(eventType, true);
protected KafkaConsumerWrapper<String, String> createConsumer(String eventType, String loadLimitPropertyKey) {
return createConsumer(eventType, loadLimitPropertyKey, true);
}

protected KafkaConsumerWrapper<String, String> createConsumer(String eventType, boolean namespacedTopic) {
protected KafkaConsumerWrapper<String, String> createConsumer(String eventType, String loadLimitPropertyKey, boolean namespacedTopic) {
var kafkaConsumerWrapper = KafkaConsumerWrapper.<String, String>builder()
.context(context)
.vertx(vertx)
.kafkaConfig(getKafkaConfig())
.loadLimit(getLoadLimit())
.loadLimit(getLoadLimit(loadLimitPropertyKey))
.globalLoadSensor(new GlobalLoadSensor())
.subscriptionDefinition(getSubscriptionDefinition(getKafkaConfig().getEnvId(), eventType, namespacedTopic))
.build();
Expand Down Expand Up @@ -124,8 +123,8 @@ protected String getCacheEnvVariable(String variableName) {
return cacheExpirationTime;
}

protected int getMaxDistributionNumber() {
return getConsumerProperty(MAX_DISTRIBUTION_NUMBER_TEMPLATE, MAX_DISTRIBUTION_NUMBER_DEFAULT);
protected int getMaxDistributionNumber(String property) {
return getConsumerProperty(MAX_DISTRIBUTION_NUMBER_TEMPLATE, property, MAX_DISTRIBUTION_NUMBER_DEFAULT);
}

private JsonObject getConfig() {
Expand All @@ -152,14 +151,12 @@ private String formatSubscriptionPattern(String env, String eventType) {
return join("\\.", env, "\\w{1,}", eventType);
}

private int getLoadLimit() {
return getConsumerProperty(LOAD_LIMIT_TEMPLATE, LOAD_LIMIT_DEFAULT);
private int getLoadLimit(String propertyKey) {
return getConsumerProperty(LOAD_LIMIT_TEMPLATE, propertyKey, LOAD_LIMIT_DEFAULT);
}

private int getConsumerProperty(String nameTemplate, String defaultValue) {
var consumerClassName = getClass().getSimpleName();
var cleanConsumerName = consumerClassName.substring(0, consumerClassName.indexOf(Verticle.class.getSimpleName()));
return parseInt(getProperty(format(nameTemplate, cleanConsumerName), defaultValue));
private int getConsumerProperty(String nameTemplate, String propertyKey, String defaultValue) {
return parseInt(getProperty(format(nameTemplate, propertyKey), defaultValue));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
import org.folio.inventory.MarcBibInstanceHridSetConsumerVerticle;
import org.folio.inventory.MarcHridSetConsumerVerticle;
import org.junit.AfterClass;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -22,7 +22,7 @@
import static org.folio.inventory.dataimport.util.KafkaConfigConstants.KAFKA_REPLICATION_FACTOR;

@RunWith(VertxUnitRunner.class)
public class MarcBibInstanceHridSetConsumerVerticleTest {
public class MarcHridSetConsumerVerticleTest {

private static final String TENANT_ID = "diku";
private static final String KAFKA_ENV_NAME = "test-env";
Expand All @@ -46,7 +46,7 @@ public void shouldDeployVerticle(TestContext context) {
.setWorker(true);

Promise<String> promise = Promise.promise();
vertx.deployVerticle(MarcBibInstanceHridSetConsumerVerticle.class.getName(), options, promise);
vertx.deployVerticle(MarcHridSetConsumerVerticle.class.getName(), options, promise);

promise.future().onComplete(ar -> {
context.assertTrue(ar.succeeded());
Expand Down

0 comments on commit 13b647f

Please sign in to comment.