Skip to content

Commit

Permalink
CIRC-2037 Fix KafkaAdminClient config - add properties (#1469)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexanderkurash authored Apr 29, 2024
1 parent 0542fbe commit 6e24f85
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ private Future<Void> createConsumers() {
return Future.all(List.of(
createConsumer(CIRCULATION_RULES_UPDATED, new CirculationRulesUpdateEventHandler(),
// puts consumers into separate groups so that they all receive the same event
new UniqueKafkaModuleIdProvider(vertx, kafkaConfig, CIRCULATION_RULES_UPDATED))
new UniqueKafkaModuleIdProvider(vertx, CIRCULATION_RULES_UPDATED))
)).mapEmpty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@

import static java.util.Comparator.comparing;
import static java.util.stream.Collectors.toMap;
import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.folio.kafka.KafkaTopicNameHelper.formatGroupName;

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

import org.folio.circulation.domain.events.DomainEventType;
import org.folio.kafka.KafkaConfig;
import org.folio.kafka.services.KafkaEnvironmentProperties;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
Expand All @@ -26,9 +25,14 @@ public class UniqueKafkaModuleIdProvider implements ModuleIdProvider {
private final KafkaAdminClient kafkaAdminClient;
private final DomainEventType eventType;

public UniqueKafkaModuleIdProvider(Vertx vertx, KafkaConfig kafkaConfig, DomainEventType eventType) {
Properties config = new Properties();
config.put(BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getKafkaUrl());
public UniqueKafkaModuleIdProvider(Vertx vertx, DomainEventType eventType) {
Map<String, String> config = KafkaConfig.builder()
.kafkaHost(KafkaEnvironmentProperties.host())
.kafkaPort(KafkaEnvironmentProperties.port())
.build()
.getProducerProps();

log.info("UniqueKafkaModuleIdProvider:: KafkaAdminClient config: {}", config);

this.kafkaAdminClient = KafkaAdminClient.create(vertx, config);
this.eventType = eventType;
Expand Down

0 comments on commit 6e24f85

Please sign in to comment.