From 28ec2c6882c2fad0fffc7367d16e458dbd2e091c Mon Sep 17 00:00:00 2001 From: Ching-Hong Fang Date: Tue, 23 May 2023 19:40:02 +0800 Subject: [PATCH] [METRICS] Migrate topic collector to balancer, partitioiner (#1765) --- .../java/org/astraea/app/web/WebService.java | 71 ++++++++++----- .../org/astraea/app/web/TopicHandlerTest.java | 4 +- .../org/astraea/app/web/WebServiceTest.java | 86 ++++++++++++++++++- .../partitioner/StrictCostPartitioner.java | 57 ++++++++---- .../StrictCostPartitionerTest.java | 79 +++++++++++++++++ 5 files changed, 257 insertions(+), 40 deletions(-) diff --git a/app/src/main/java/org/astraea/app/web/WebService.java b/app/src/main/java/org/astraea/app/web/WebService.java index c81f2a7b36..354bb95cf1 100644 --- a/app/src/main/java/org/astraea/app/web/WebService.java +++ b/app/src/main/java/org/astraea/app/web/WebService.java @@ -24,17 +24,20 @@ import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.concurrent.CompletionStage; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; import org.astraea.app.argument.DurationField; import org.astraea.app.argument.IntegerMapField; import org.astraea.app.argument.NonNegativeIntegerField; +import org.astraea.common.Configuration; import org.astraea.common.Utils; import org.astraea.common.admin.Admin; +import org.astraea.common.admin.Broker; import org.astraea.common.admin.NodeInfo; import org.astraea.common.metrics.JndiClient; import org.astraea.common.metrics.MBeanClient; @@ -42,6 +45,10 @@ import org.astraea.common.metrics.collector.MetricStore; public class WebService implements AutoCloseable { + public static final String METRIC_STORE_KEY = "metric.store"; + public static final String METRIC_STORE_LOCAL = "local"; + public static final String METRIC_STORE_TOPIC = "topic"; + public static final String BOOTSTRAP_SERVERS_KEY = "bootstrap.servers"; private final HttpServer server; private final Admin admin; @@ -51,32 +58,48 @@ public WebService( Admin admin, int port, Function brokerIdToJmxPort, - Duration beanExpiration) { + Duration beanExpiration, + Configuration config) { this.admin = admin; - Supplier>> clientSupplier = + Supplier>> sensorsSupplier = () -> - admin - .brokers() - .thenApply( - brokers -> - brokers.stream() - .collect( - Collectors.toUnmodifiableMap( - NodeInfo::id, - b -> - JndiClient.of(b.host(), brokerIdToJmxPort.apply(b.id()))))); + sensors.metricSensors().stream() + .distinct() + .collect( + Collectors.toUnmodifiableMap(Function.identity(), ignored -> (id, ee) -> {})); + + List receivers = + switch (config.string(METRIC_STORE_KEY).orElse(METRIC_STORE_LOCAL)) { + case METRIC_STORE_LOCAL -> { + Function, Map> asBeanClientMap = + brokers -> + brokers.stream() + .collect( + Collectors.toUnmodifiableMap( + NodeInfo::id, + b -> JndiClient.of(b.host(), brokerIdToJmxPort.apply(b.id())))); + yield List.of( + MetricStore.Receiver.local(() -> admin.brokers().thenApply(asBeanClientMap))); + } + case METRIC_STORE_TOPIC -> List.of( + MetricStore.Receiver.topic(config.requireString(BOOTSTRAP_SERVERS_KEY)), + MetricStore.Receiver.local( + () -> CompletableFuture.completedStage(Map.of(-1, JndiClient.local())))); + default -> throw new IllegalArgumentException( + "unknown metric store type: " + + config.string(METRIC_STORE_KEY) + + ". use " + + METRIC_STORE_LOCAL + + " or " + + METRIC_STORE_TOPIC); + }; var metricStore = MetricStore.builder() .beanExpiration(beanExpiration) - .receivers(List.of(MetricStore.Receiver.local(clientSupplier))) - .sensorsSupplier( - () -> - sensors.metricSensors().stream() - .distinct() - .collect( - Collectors.toUnmodifiableMap( - Function.identity(), ignored -> (id, ee) -> {}))) + .receivers(receivers) + .sensorsSupplier(sensorsSupplier) .build(); + server = Utils.packException(() -> HttpServer.create(new InetSocketAddress(port), 0)); server.createContext("/topics", to(new TopicHandler(admin))); server.createContext("/groups", to(new GroupHandler(admin))); @@ -109,7 +132,11 @@ public static void main(String[] args) throws Exception { throw new IllegalArgumentException("you must define either --jmx.port or --jmx.ports"); try (var service = new WebService( - Admin.of(arg.configs()), arg.port, arg::jmxPortMapping, arg.beanExpiration)) { + Admin.of(arg.configs()), + arg.port, + arg::jmxPortMapping, + arg.beanExpiration, + new Configuration(arg.configs()))) { if (arg.ttl == null) { System.out.println("enter ctrl + c to terminate web service"); TimeUnit.MILLISECONDS.sleep(Long.MAX_VALUE); diff --git a/app/src/test/java/org/astraea/app/web/TopicHandlerTest.java b/app/src/test/java/org/astraea/app/web/TopicHandlerTest.java index 972394489e..ce8569ee66 100644 --- a/app/src/test/java/org/astraea/app/web/TopicHandlerTest.java +++ b/app/src/test/java/org/astraea/app/web/TopicHandlerTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.astraea.common.Configuration; import org.astraea.common.Utils; import org.astraea.common.admin.Admin; import org.astraea.common.admin.TopicPartition; @@ -69,7 +70,8 @@ void testWithWebService() { Admin.of(SERVICE.bootstrapServers()), 0, id -> SERVICE.jmxServiceURL().getPort(), - Duration.ofMillis(5))) { + Duration.ofMillis(5), + Configuration.EMPTY)) { Response response = HttpExecutor.builder() .build() diff --git a/app/src/test/java/org/astraea/app/web/WebServiceTest.java b/app/src/test/java/org/astraea/app/web/WebServiceTest.java index d40700a0b8..5ba689465a 100644 --- a/app/src/test/java/org/astraea/app/web/WebServiceTest.java +++ b/app/src/test/java/org/astraea/app/web/WebServiceTest.java @@ -17,13 +17,18 @@ package org.astraea.app.web; import java.time.Duration; +import java.util.Map; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; import org.astraea.app.argument.Argument; +import org.astraea.common.Configuration; import org.astraea.common.admin.Admin; +import org.astraea.common.metrics.collector.MetricStore; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.mockito.Mockito; +import org.mockito.stubbing.Answer; public class WebServiceTest { @@ -40,7 +45,9 @@ void testArgument() { @Timeout(10) @Test void testClose() { - var web = new WebService(Mockito.mock(Admin.class), 0, id -> -1, Duration.ofMillis(5)); + var web = + new WebService( + Mockito.mock(Admin.class), 0, id -> -1, Duration.ofMillis(5), Configuration.EMPTY); web.close(); } @@ -84,4 +91,81 @@ void testJmxPort() { Assertions.assertThrows( IllegalArgumentException.class, () -> noDefaultArgument.jmxPortMapping(4)); } + + @Test + void testMetricStoreConfiguration() { + try (var mockedReceiver = Mockito.mockStatic(MetricStore.Receiver.class)) { + var topicReceiverCount = new AtomicInteger(0); + var localReceiverCount = new AtomicInteger(0); + mockedReceiver + .when(() -> MetricStore.Receiver.topic(Mockito.any())) + .then( + (Answer) + invocation -> { + topicReceiverCount.incrementAndGet(); + return Mockito.mock(MetricStore.Receiver.class); + }); + mockedReceiver + .when(() -> MetricStore.Receiver.local(Mockito.any())) + .then( + (Answer) + invocation -> { + localReceiverCount.incrementAndGet(); + return Mockito.mock(MetricStore.Receiver.class); + }); + // Test default metric store configuration + try (var web = + new WebService( + Mockito.mock(Admin.class), 0, id -> -1, Duration.ofMillis(5), Configuration.EMPTY)) { + + Assertions.assertEquals(1, localReceiverCount.get()); + Assertions.assertEquals(0, topicReceiverCount.get()); + } + localReceiverCount.set(0); + topicReceiverCount.set(0); + // Test local metric store configuration + try (var web = + new WebService( + Mockito.mock(Admin.class), + 0, + id -> -1, + Duration.ofMillis(5), + new Configuration( + Map.of(WebService.METRIC_STORE_KEY, WebService.METRIC_STORE_LOCAL)))) { + + Assertions.assertEquals(1, localReceiverCount.get()); + Assertions.assertEquals(0, topicReceiverCount.get()); + } + localReceiverCount.set(0); + topicReceiverCount.set(0); + // Test topic metric store configuration + try (var web = + new WebService( + Mockito.mock(Admin.class), + 0, + id -> -1, + Duration.ofMillis(5), + new Configuration( + Map.of( + WebService.METRIC_STORE_KEY, + WebService.METRIC_STORE_TOPIC, + WebService.BOOTSTRAP_SERVERS_KEY, + "ignore")))) { + + // topic collector may create local receiver to receive local jmx metric + Assertions.assertEquals(1, topicReceiverCount.get()); + } + + // Test invalid metric store configuration + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + new WebService( + Mockito.mock(Admin.class), + 0, + id -> -1, + Duration.ofMillis(5), + new Configuration(Map.of(WebService.METRIC_STORE_KEY, "unknown")))); + } + } } diff --git a/common/src/main/java/org/astraea/common/partitioner/StrictCostPartitioner.java b/common/src/main/java/org/astraea/common/partitioner/StrictCostPartitioner.java index 8b9fa570db..e06a2983fe 100644 --- a/common/src/main/java/org/astraea/common/partitioner/StrictCostPartitioner.java +++ b/common/src/main/java/org/astraea/common/partitioner/StrictCostPartitioner.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.function.Function; import java.util.function.Supplier; @@ -38,6 +39,7 @@ import org.astraea.common.metrics.JndiClient; import org.astraea.common.metrics.MBeanClient; import org.astraea.common.metrics.collector.MetricStore; +import org.astraea.common.producer.ProducerConfigs; /** * this partitioner scores the nodes by multiples cost functions. Each function evaluate the target @@ -55,6 +57,9 @@ * `org.astraea.cost.ThroughputCost=1,org.astraea.cost.broker.BrokerOutputCost=1`. */ public class StrictCostPartitioner extends Partitioner { + public static final String METRIC_STORE_KEY = "metric.store"; + public static final String METRIC_STORE_TOPIC = "topic"; + public static final String METRIC_STORE_LOCAL = "local"; static final int ROUND_ROBIN_LENGTH = 400; static final String JMX_PORT = "jmx.port"; static final String ROUND_ROBIN_LEASE_KEY = "round.robin.lease"; @@ -140,27 +145,47 @@ public void configure(Configuration config) { .string(ROUND_ROBIN_LEASE_KEY) .map(Utils::toDuration) .ifPresent(d -> this.roundRobinLease = d); - Supplier>> clientSupplier = - () -> - admin - .brokers() - .thenApply( - brokers -> { - var map = new HashMap(); - brokers.forEach( - b -> - map.put( - b.id(), JndiClient.of(b.host(), jmxPortGetter.apply(b.id())))); - // add local client to fetch consumer metrics - map.put(-1, JndiClient.local()); - return Collections.unmodifiableMap(map); - }); + List receivers = + switch (config.string(METRIC_STORE_KEY).orElse(METRIC_STORE_LOCAL)) { + case METRIC_STORE_TOPIC -> List.of( + MetricStore.Receiver.topic( + config.requireString(ProducerConfigs.BOOTSTRAP_SERVERS_CONFIG)), + MetricStore.Receiver.local( + () -> CompletableFuture.completedStage(Map.of(-1, JndiClient.local())))); + case METRIC_STORE_LOCAL -> { + Supplier>> clientSupplier = + () -> + admin + .brokers() + .thenApply( + brokers -> { + var map = new HashMap(); + brokers.forEach( + b -> + map.put( + b.id(), + JndiClient.of(b.host(), jmxPortGetter.apply(b.id())))); + // add local client to fetch consumer metrics + map.put(-1, JndiClient.local()); + return Collections.unmodifiableMap(map); + }); + yield List.of(MetricStore.Receiver.local(clientSupplier)); + } + default -> throw new IllegalArgumentException( + "unknown metric store type: " + + config.string(METRIC_STORE_KEY) + + ". Use " + + METRIC_STORE_TOPIC + + " or " + + METRIC_STORE_LOCAL); + }; metricStore = MetricStore.builder() - .receivers(List.of(MetricStore.Receiver.local(clientSupplier))) + .receivers(receivers) .sensorsSupplier(() -> Map.of(this.costFunction.metricSensor(), (integer, e) -> {})) .build(); + this.roundRobinKeeper = RoundRobinKeeper.of(ROUND_ROBIN_LENGTH, roundRobinLease); } diff --git a/common/src/test/java/org/astraea/common/partitioner/StrictCostPartitionerTest.java b/common/src/test/java/org/astraea/common/partitioner/StrictCostPartitionerTest.java index 4912bb8517..233be18563 100644 --- a/common/src/test/java/org/astraea/common/partitioner/StrictCostPartitionerTest.java +++ b/common/src/test/java/org/astraea/common/partitioner/StrictCostPartitionerTest.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.astraea.common.Configuration; @@ -36,9 +37,12 @@ import org.astraea.common.cost.NodeThroughputCost; import org.astraea.common.cost.ReplicaLeaderCost; import org.astraea.common.metrics.ClusterBean; +import org.astraea.common.metrics.collector.MetricStore; +import org.astraea.common.producer.ProducerConfigs; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.mockito.Mockito; +import org.mockito.stubbing.Answer; public class StrictCostPartitionerTest { @@ -291,4 +295,79 @@ public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) { } } } + + /** Test if the partitioner use correct metric store */ + @Test + void testMetricStoreConfigure() { + try (var mockedReceiver = Mockito.mockStatic(MetricStore.Receiver.class)) { + var topicReceiverCount = new AtomicInteger(0); + var localReceiverCount = new AtomicInteger(0); + mockedReceiver + .when(() -> MetricStore.Receiver.topic(Mockito.any())) + .then( + (Answer) + invocation -> { + topicReceiverCount.incrementAndGet(); + return Mockito.mock(MetricStore.Receiver.class); + }); + mockedReceiver + .when(() -> MetricStore.Receiver.local(Mockito.any())) + .then( + (Answer) + invocation -> { + localReceiverCount.incrementAndGet(); + return Mockito.mock(MetricStore.Receiver.class); + }); + + try (var partitioner = new StrictCostPartitioner()) { + // Check default metric store + var config = + new Configuration(Map.of(ProducerConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")); + partitioner.configure(config); + Assertions.assertNotEquals(0, localReceiverCount.get()); + Assertions.assertEquals(0, topicReceiverCount.get()); + + // Check topic metric store + localReceiverCount.set(0); + topicReceiverCount.set(0); + config = + new Configuration( + Map.of( + ProducerConfigs.BOOTSTRAP_SERVERS_CONFIG, + "localhost:9092", + StrictCostPartitioner.METRIC_STORE_KEY, + StrictCostPartitioner.METRIC_STORE_TOPIC)); + partitioner.configure(config); + Assertions.assertNotEquals(0, topicReceiverCount.get()); + // topic collector may use local receiver to get local jmx metric + + // Check local metric store + topicReceiverCount.set(0); + localReceiverCount.set(0); + config = + new Configuration( + Map.of( + ProducerConfigs.BOOTSTRAP_SERVERS_CONFIG, + "localhost:9092", + StrictCostPartitioner.METRIC_STORE_KEY, + StrictCostPartitioner.METRIC_STORE_LOCAL)); + partitioner.configure(config); + Assertions.assertNotEquals(0, localReceiverCount.get()); + Assertions.assertEquals(0, topicReceiverCount.get()); + + // Check unknown metric store + localReceiverCount.set(0); + topicReceiverCount.set(0); + var config2 = + new Configuration( + Map.of( + ProducerConfigs.BOOTSTRAP_SERVERS_CONFIG, + "localhost:9092", + StrictCostPartitioner.METRIC_STORE_KEY, + "unknown")); + Assertions.assertThrows( + IllegalArgumentException.class, () -> partitioner.configure(config2)); + } + } + } }