diff --git a/src/main/java/org/folio/circulation/EventConsumerVerticle.java b/src/main/java/org/folio/circulation/EventConsumerVerticle.java index 2d35a4819d..c02648aa72 100644 --- a/src/main/java/org/folio/circulation/EventConsumerVerticle.java +++ b/src/main/java/org/folio/circulation/EventConsumerVerticle.java @@ -85,7 +85,7 @@ private Future createConsumers() { return Future.all(List.of( createConsumer(CIRCULATION_RULES_UPDATED, new CirculationRulesUpdateEventHandler(), // puts consumers into separate groups so that they all receive the same event - new UniqueKafkaModuleIdProvider(vertx, kafkaConfig, CIRCULATION_RULES_UPDATED)) + new UniqueKafkaModuleIdProvider(vertx, CIRCULATION_RULES_UPDATED)) )).mapEmpty(); } diff --git a/src/main/java/org/folio/circulation/services/events/UniqueKafkaModuleIdProvider.java b/src/main/java/org/folio/circulation/services/events/UniqueKafkaModuleIdProvider.java index f4925580ab..4ca312c9ac 100644 --- a/src/main/java/org/folio/circulation/services/events/UniqueKafkaModuleIdProvider.java +++ b/src/main/java/org/folio/circulation/services/events/UniqueKafkaModuleIdProvider.java @@ -2,17 +2,16 @@ import static java.util.Comparator.comparing; import static java.util.stream.Collectors.toMap; -import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG; import static org.folio.kafka.KafkaTopicNameHelper.formatGroupName; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.function.Consumer; import java.util.function.UnaryOperator; import org.folio.circulation.domain.events.DomainEventType; import org.folio.kafka.KafkaConfig; +import org.folio.kafka.services.KafkaEnvironmentProperties; import io.vertx.core.Future; import io.vertx.core.Vertx; @@ -26,9 +25,14 @@ public class UniqueKafkaModuleIdProvider implements ModuleIdProvider { private final KafkaAdminClient kafkaAdminClient; private final DomainEventType eventType; - public UniqueKafkaModuleIdProvider(Vertx vertx, KafkaConfig kafkaConfig, DomainEventType eventType) { - Properties config = new Properties(); - config.put(BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getKafkaUrl()); + public UniqueKafkaModuleIdProvider(Vertx vertx, DomainEventType eventType) { + Map config = KafkaConfig.builder() + .kafkaHost(KafkaEnvironmentProperties.host()) + .kafkaPort(KafkaEnvironmentProperties.port()) + .build() + .getProducerProps(); + + log.info("UniqueKafkaModuleIdProvider:: KafkaAdminClient config: {}", config); this.kafkaAdminClient = KafkaAdminClient.create(vertx, config); this.eventType = eventType;