Skip to content

Commit

Permalink
[COMMON] use MetricsReporter to collect metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 committed Oct 22, 2024
1 parent a475815 commit 520225e
Show file tree
Hide file tree
Showing 13 changed files with 233 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ static List<ConsumerThread> create(
return IntStream.range(0, consumers)
.mapToObj(
index -> {
var clientId = "consumer-" + index;
var clientId = Performance.CLIENT_ID_PREFIX + "-consumer-" + index;
var consumer = consumerSupplier.apply(clientId, new PartitionRatioListener(clientId));
var closed = new AtomicBoolean(false);
var closeLatch = closeLatches.get(index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@

/** see docs/performance_benchmark.md for man page */
public class Performance {
static final String CLIENT_ID_PREFIX = "Performance";

/** Used in Automation, to achieve the end of one Performance and then start another. */
public static void main(String[] args) {
execute(Performance.Argument.parse(new Argument(), args));
Expand Down Expand Up @@ -268,17 +270,19 @@ String partitioner() {
validateWith = PositiveLongField.class)
int transactionSize = 1;

Producer<byte[], byte[]> createProducer() {
Producer<byte[], byte[]> createProducer(String clientId) {
return transactionSize > 1
? Producer.builder()
.configs(configs())
.bootstrapServers(bootstrapServers())
.config(ProducerConfigs.PARTITIONER_CLASS_CONFIG, partitioner())
.config(ProducerConfigs.CLIENT_ID_CONFIG, clientId)
.buildTransactional()
: Producer.builder()
.configs(configs())
.bootstrapServers(bootstrapServers())
.config(ProducerConfigs.PARTITIONER_CLASS_CONFIG, partitioner())
.config(ProducerConfigs.CLIENT_ID_CONFIG, clientId)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.astraea.common.Utils;
Expand All @@ -46,7 +46,7 @@ public interface ProducerThread extends AbstractThread {

static List<ProducerThread> create(
List<ArrayBlockingQueue<List<Record<byte[], byte[]>>>> queues,
Supplier<Producer<byte[], byte[]>> producerSupplier,
Function<String, Producer<byte[], byte[]>> producerSupplier,
int interdependent) {
var producers = queues.size();
if (producers <= 0) return List.of();
Expand All @@ -66,9 +66,10 @@ static List<ProducerThread> create(
return IntStream.range(0, producers)
.mapToObj(
index -> {
var clientId = Performance.CLIENT_ID_PREFIX + "-producer-" + index;
var closeLatch = closeLatches.get(index);
var closed = new AtomicBoolean(false);
var producer = producerSupplier.get();
var producer = producerSupplier.apply(clientId);
var queue = queues.get(index);
var sensor = Sensor.builder().addStat(AVG_PROPERTY, Avg.of()).build();
// export the custom jmx for report thread
Expand Down
2 changes: 2 additions & 0 deletions app/src/main/java/org/astraea/app/performance/Report.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ static long recordsConsumedTotal() {
static List<Report> consumers() {

return ConsumerMetrics.fetch(JndiClient.local()).stream()
.filter(m -> m.clientId().startsWith(Performance.CLIENT_ID_PREFIX))
.map(
m ->
new Report() {
Expand Down Expand Up @@ -92,6 +93,7 @@ public Optional<Double> e2eLatency() {

static List<Report> producers() {
return ProducerMetrics.producer(JndiClient.local()).stream()
.filter(m -> m.clientId().startsWith(Performance.CLIENT_ID_PREFIX))
.map(
m ->
new Report() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.astraea.common.admin.Broker;
import org.astraea.common.metrics.JndiClient;
import org.astraea.common.metrics.collector.MetricFetcher;
import org.astraea.common.metrics.collector.MetricSender;

/** Keep fetching all kinds of metrics and publish to inner topics. */
public class MetricPublisher {
Expand All @@ -44,7 +45,7 @@ public static void main(String[] args) {
// Valid for testing
static void execute(Arguments arguments) {
try (var admin = Admin.of(arguments.bootstrapServers());
var topicSender = MetricFetcher.Sender.topic(arguments.bootstrapServers());
var topicSender = MetricSender.topic(arguments.bootstrapServers());
var metricFetcher =
MetricFetcher.builder()
.clientSupplier(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.astraea.common.Utils;
import org.astraea.common.metrics.BeanObject;

public class LocalSenderReceiver implements MetricFetcher.Sender, MetricStore.Receiver {
public class LocalSenderReceiver implements MetricSender, MetricStore.Receiver {

public static LocalSenderReceiver of() {
return new LocalSenderReceiver();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.function.Supplier;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.astraea.common.FutureUtils;
import org.astraea.common.Utils;
import org.astraea.common.metrics.BeanObject;
import org.astraea.common.metrics.BeanQuery;
Expand All @@ -53,9 +52,6 @@
import org.astraea.common.metrics.connector.ConnectorMetrics;
import org.astraea.common.metrics.platform.HostMetrics;
import org.astraea.common.metrics.stats.Sum;
import org.astraea.common.producer.Producer;
import org.astraea.common.producer.Record;
import org.astraea.common.producer.Serializer;

public interface MetricFetcher extends AutoCloseable {

Expand Down Expand Up @@ -91,54 +87,13 @@ static Builder builder() {
@Override
void close();

interface Sender extends AutoCloseable {

static Sender local() {
return LocalSenderReceiver.of();
}

static Sender topic(String bootstrapServer) {
var producer =
Producer.builder()
.bootstrapServers(bootstrapServer)
.keySerializer(Serializer.INTEGER)
.valueSerializer(Serializer.BEAN_OBJECT)
.build();
String METRIC_TOPIC = "__metrics";
return new Sender() {
@Override
public CompletionStage<Void> send(int id, Collection<BeanObject> beans) {
var records =
beans.stream()
.map(bean -> Record.builder().topic(METRIC_TOPIC).key(id).value(bean).build())
.toList();
return FutureUtils.sequence(
producer.send(records).stream()
.map(CompletionStage::toCompletableFuture)
.toList())
.thenAccept(ignored -> {});
}

@Override
public void close() {
producer.close();
}
};
}

CompletionStage<Void> send(int id, Collection<BeanObject> beans);

@Override
default void close() {}
}

class Builder {

private int threads = 4;

private Duration fetchBeanDelay = Duration.ofSeconds(1);
private Duration fetchMetadataDelay = Duration.ofMinutes(5);
private Sender sender;
private MetricSender sender;
private Supplier<CompletionStage<Map<Integer, MBeanClient>>> clientSupplier;

private Builder() {}
Expand All @@ -158,7 +113,7 @@ public Builder fetchMetadataDelay(Duration fetchMetadataDelay) {
return this;
}

public Builder sender(Sender sender) {
public Builder sender(MetricSender sender) {
this.sender = sender;
return this;
}
Expand Down Expand Up @@ -197,7 +152,7 @@ class MetricFetcherImpl implements MetricFetcher {

private final ReadWriteLock lock = new ReentrantReadWriteLock();

private final Sender sender;
private final MetricSender sender;

private final ExecutorService executor;

Expand All @@ -211,7 +166,7 @@ private MetricFetcherImpl(
int threads,
Duration fetchBeanDelay,
Duration fetchMetadataDelay,
Sender sender,
MetricSender sender,
Supplier<CompletionStage<Map<Integer, MBeanClient>>> clientSupplier) {
this.fetchBeanDelay = fetchBeanDelay;
this.sender = sender;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.common.metrics.collector;

import java.util.Collection;
import java.util.concurrent.CompletionStage;
import org.astraea.common.FutureUtils;
import org.astraea.common.metrics.BeanObject;
import org.astraea.common.producer.Producer;
import org.astraea.common.producer.ProducerConfigs;
import org.astraea.common.producer.Record;
import org.astraea.common.producer.Serializer;

public interface MetricSender extends AutoCloseable {

static MetricSender local() {
return LocalSenderReceiver.of();
}

static MetricSender topic(String bootstrapServer) {
var producer =
Producer.builder()
.bootstrapServers(bootstrapServer)
.keySerializer(Serializer.INTEGER)
.valueSerializer(Serializer.BEAN_OBJECT)
.config(ProducerConfigs.ACKS_CONFIG, "0")
.config(ProducerConfigs.ENABLE_METRICS_PUSH_CONFIG, "false")
.config(ProducerConfigs.COMPRESSION_TYPE_CONFIG, "gzip")
.build();
String METRIC_TOPIC = "__metrics";
return new MetricSender() {
@Override
public CompletionStage<Void> send(int id, Collection<BeanObject> beans) {
var records =
beans.stream()
.map(bean -> Record.builder().topic(METRIC_TOPIC).key(id).value(bean).build())
.toList();
return FutureUtils.sequence(
producer.send(records).stream().map(CompletionStage::toCompletableFuture).toList())
.thenAccept(ignored -> {});
}

@Override
public void close() {
producer.close();
}
};
}

CompletionStage<Void> send(int id, Collection<BeanObject> beans);

@Override
default void close() {}
}
Loading

0 comments on commit 520225e

Please sign in to comment.