Skip to content

Commit

Permalink
MODINV-986: instance_ingest -> instance_ingress
Browse files Browse the repository at this point in the history
  • Loading branch information
PBobylev committed Apr 24, 2024
1 parent 85cf374 commit 07d83c8
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,21 @@
import io.vertx.core.Promise;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.inventory.handler.InstanceIngestEventHandler;
import org.folio.inventory.handler.InstanceIngressEventHandler;
import org.folio.inventory.support.KafkaConsumerVerticle;

public class InstanceIngestConsumerVerticle extends KafkaConsumerVerticle {
public class InstanceIngressConsumerVerticle extends KafkaConsumerVerticle {

private static final String INSTANCE_INGEST_TOPIC = "inventory.instance_ingest";
private static final Logger LOGGER = LogManager.getLogger(InstanceIngestConsumerVerticle.class);
private static final String INSTANCE_INGRESS_TOPIC = "inventory.instance_ingress";
private static final Logger LOGGER = LogManager.getLogger(InstanceIngressConsumerVerticle.class);

@Override
public void start(Promise<Void> startPromise) {
var instanceIngestEventHandler = new InstanceIngestEventHandler();
var instanceIngressEventHandler = new InstanceIngressEventHandler();

var consumerWrapper = createConsumer(INSTANCE_INGEST_TOPIC);
var consumerWrapper = createConsumer(INSTANCE_INGRESS_TOPIC);

consumerWrapper.start(instanceIngestEventHandler, constructModuleName())
consumerWrapper.start(instanceIngressEventHandler, constructModuleName())
.onFailure(startPromise::fail)
.onSuccess(ar -> startPromise.complete());
}
Expand Down
15 changes: 8 additions & 7 deletions src/main/java/org/folio/inventory/Launcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class Launcher {
private static final String QUICK_MARC_CONSUMER_VERTICLE_INSTANCES_NUMBER_CONFIG = "inventory.kafka.QuickMarcConsumerVerticle.instancesNumber";
private static final String MARC_BIB_UPDATE_CONSUMER_VERTICLE_INSTANCES_NUMBER_CONFIG = "inventory.kafka.MarcBibUpdateConsumerVerticle.instancesNumber";
private static final String CONSORTIUM_INSTANCE_SHARING_CONSUMER_VERTICLE_NUMBER_CONFIG = "inventory.kafka.ConsortiumInstanceSharingConsumerVerticle.instancesNumber";
private static final String INSTANCE_INGEST_VERTICLE_NUMBER_CONFIG = "inventory.kafka.InstanceIngestConsumerVerticle.instancesNumber";
private static final String INSTANCE_INGRESS_VERTICLE_NUMBER_CONFIG = "inventory.kafka.InstanceIngressConsumerVerticle.instancesNumber";
private static final VertxAssistant vertxAssistant = new VertxAssistant();

private static String inventoryModuleDeploymentId;
Expand All @@ -34,7 +34,7 @@ public class Launcher {
private static String quickMarcConsumerVerticleDeploymentId;
private static String marcBibUpdateConsumerVerticleDeploymentId;
private static String consortiumInstanceSharingVerticleDeploymentId;
private static String instanceIngestConsumerVerticleDeploymentId;
private static String instanceIngressConsumerVerticleDeploymentId;

public static void main(String[] args)
throws InterruptedException, ExecutionException, TimeoutException {
Expand Down Expand Up @@ -98,7 +98,7 @@ private static void startConsumerVerticles(Map<String, Object> consumerVerticles
int quickMarcConsumerVerticleNumber = Integer.parseInt(System.getenv().getOrDefault(QUICK_MARC_CONSUMER_VERTICLE_INSTANCES_NUMBER_CONFIG, "1"));
int marcBibUpdateConsumerVerticleNumber = Integer.parseInt(System.getenv().getOrDefault(MARC_BIB_UPDATE_CONSUMER_VERTICLE_INSTANCES_NUMBER_CONFIG, "3"));
int consortiumInstanceSharingVerticleNumber = Integer.parseInt(System.getenv().getOrDefault(CONSORTIUM_INSTANCE_SHARING_CONSUMER_VERTICLE_NUMBER_CONFIG, "3"));
int instanceIngestConsumerVerticleNumber = Integer.parseInt(System.getenv().getOrDefault(INSTANCE_INGEST_VERTICLE_NUMBER_CONFIG, "3"));
int instanceIngressConsumerVerticleNumber = Integer.parseInt(System.getenv().getOrDefault(INSTANCE_INGRESS_VERTICLE_NUMBER_CONFIG, "3"));

CompletableFuture<String> future1 = new CompletableFuture<>();
CompletableFuture<String> future2 = new CompletableFuture<>();
Expand All @@ -116,15 +116,15 @@ private static void startConsumerVerticles(Map<String, Object> consumerVerticles
consumerVerticlesConfig, marcBibUpdateConsumerVerticleNumber, future4);
vertxAssistant.deployVerticle(ConsortiumInstanceSharingConsumerVerticle.class.getName(),
consumerVerticlesConfig, consortiumInstanceSharingVerticleNumber, future5);
vertxAssistant.deployVerticle(InstanceIngestConsumerVerticle.class.getName(),
consumerVerticlesConfig, instanceIngestConsumerVerticleNumber, future6);
vertxAssistant.deployVerticle(InstanceIngressConsumerVerticle.class.getName(),
consumerVerticlesConfig, instanceIngressConsumerVerticleNumber, future6);

consumerVerticleDeploymentId = future1.get(20, TimeUnit.SECONDS);
marcInstHridSetConsumerVerticleDeploymentId = future2.get(20, TimeUnit.SECONDS);
quickMarcConsumerVerticleDeploymentId = future3.get(20, TimeUnit.SECONDS);
marcBibUpdateConsumerVerticleDeploymentId = future4.get(20, TimeUnit.SECONDS);
consortiumInstanceSharingVerticleDeploymentId = future5.get(20, TimeUnit.SECONDS);
instanceIngestConsumerVerticleDeploymentId = future6.get(20, TimeUnit.SECONDS);
instanceIngressConsumerVerticleDeploymentId = future6.get(20, TimeUnit.SECONDS);
}

private static void stop() {
Expand All @@ -140,7 +140,8 @@ private static void stop() {
.thenCompose(v -> vertxAssistant.undeployVerticle(quickMarcConsumerVerticleDeploymentId))
.thenCompose(v -> vertxAssistant.undeployVerticle(marcBibUpdateConsumerVerticleDeploymentId))
.thenCompose(v -> vertxAssistant.undeployVerticle(consortiumInstanceSharingVerticleDeploymentId))
.thenCompose(v -> vertxAssistant.undeployVerticle(instanceIngestConsumerVerticleDeploymentId))
.thenCompose(v -> vertxAssistant.undeployVerticle(
instanceIngressConsumerVerticleDeploymentId))
.thenAccept(v -> vertxAssistant.stop(stopped));

stopped.thenAccept(v -> log.info("Server Stopped"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@
import org.apache.logging.log4j.Logger;
import org.folio.kafka.AsyncRecordHandler;

public class InstanceIngestEventHandler implements AsyncRecordHandler<String, String> {
public class InstanceIngressEventHandler implements AsyncRecordHandler<String, String> {

private static final Logger LOGGER = LogManager.getLogger(InstanceIngestEventHandler.class);
private static final Logger LOGGER = LogManager.getLogger(
InstanceIngressEventHandler.class);

@Override
public Future<String> handle(KafkaConsumerRecord<String, String> kafkaConsumerRecord) {
// to extract and re-use common logic from CreateInstanceEventHandler
// 1. Change event; 2. Re-use all except: source type to be changed to BIBFRAME, DI event not to be sent
LOGGER.info("to be replaced with actual code in Step 2 of MODINV-986");

return null;
}

Expand Down

0 comments on commit 07d83c8

Please sign in to comment.