Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 committed Oct 22, 2024
1 parent a475815 commit a2d829b
Show file tree
Hide file tree
Showing 8 changed files with 181 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.astraea.common.admin.Broker;
import org.astraea.common.metrics.JndiClient;
import org.astraea.common.metrics.collector.MetricFetcher;
import org.astraea.common.metrics.collector.MetricSender;

/** Keep fetching all kinds of metrics and publish to inner topics. */
public class MetricPublisher {
Expand All @@ -44,7 +45,7 @@ public static void main(String[] args) {
// Valid for testing
static void execute(Arguments arguments) {
try (var admin = Admin.of(arguments.bootstrapServers());
var topicSender = MetricFetcher.Sender.topic(arguments.bootstrapServers());
var topicSender = MetricSender.topic(arguments.bootstrapServers());
var metricFetcher =
MetricFetcher.builder()
.clientSupplier(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.astraea.common.Utils;
import org.astraea.common.metrics.BeanObject;

public class LocalSenderReceiver implements MetricFetcher.Sender, MetricStore.Receiver {
public class LocalSenderReceiver implements MetricSender, MetricStore.Receiver {

public static LocalSenderReceiver of() {
return new LocalSenderReceiver();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.function.Supplier;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.astraea.common.FutureUtils;
import org.astraea.common.Utils;
import org.astraea.common.metrics.BeanObject;
import org.astraea.common.metrics.BeanQuery;
Expand All @@ -53,9 +52,6 @@
import org.astraea.common.metrics.connector.ConnectorMetrics;
import org.astraea.common.metrics.platform.HostMetrics;
import org.astraea.common.metrics.stats.Sum;
import org.astraea.common.producer.Producer;
import org.astraea.common.producer.Record;
import org.astraea.common.producer.Serializer;

public interface MetricFetcher extends AutoCloseable {

Expand Down Expand Up @@ -91,54 +87,13 @@ static Builder builder() {
@Override
void close();

interface Sender extends AutoCloseable {

static Sender local() {
return LocalSenderReceiver.of();
}

static Sender topic(String bootstrapServer) {
var producer =
Producer.builder()
.bootstrapServers(bootstrapServer)
.keySerializer(Serializer.INTEGER)
.valueSerializer(Serializer.BEAN_OBJECT)
.build();
String METRIC_TOPIC = "__metrics";
return new Sender() {
@Override
public CompletionStage<Void> send(int id, Collection<BeanObject> beans) {
var records =
beans.stream()
.map(bean -> Record.builder().topic(METRIC_TOPIC).key(id).value(bean).build())
.toList();
return FutureUtils.sequence(
producer.send(records).stream()
.map(CompletionStage::toCompletableFuture)
.toList())
.thenAccept(ignored -> {});
}

@Override
public void close() {
producer.close();
}
};
}

CompletionStage<Void> send(int id, Collection<BeanObject> beans);

@Override
default void close() {}
}

class Builder {

private int threads = 4;

private Duration fetchBeanDelay = Duration.ofSeconds(1);
private Duration fetchMetadataDelay = Duration.ofMinutes(5);
private Sender sender;
private MetricSender sender;
private Supplier<CompletionStage<Map<Integer, MBeanClient>>> clientSupplier;

private Builder() {}
Expand All @@ -158,7 +113,7 @@ public Builder fetchMetadataDelay(Duration fetchMetadataDelay) {
return this;
}

public Builder sender(Sender sender) {
public Builder sender(MetricSender sender) {
this.sender = sender;
return this;
}
Expand Down Expand Up @@ -197,7 +152,7 @@ class MetricFetcherImpl implements MetricFetcher {

private final ReadWriteLock lock = new ReentrantReadWriteLock();

private final Sender sender;
private final MetricSender sender;

private final ExecutorService executor;

Expand All @@ -211,7 +166,7 @@ private MetricFetcherImpl(
int threads,
Duration fetchBeanDelay,
Duration fetchMetadataDelay,
Sender sender,
MetricSender sender,
Supplier<CompletionStage<Map<Integer, MBeanClient>>> clientSupplier) {
this.fetchBeanDelay = fetchBeanDelay;
this.sender = sender;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.common.metrics.collector;

import java.util.Collection;
import java.util.concurrent.CompletionStage;
import org.astraea.common.FutureUtils;
import org.astraea.common.metrics.BeanObject;
import org.astraea.common.producer.Producer;
import org.astraea.common.producer.ProducerConfigs;
import org.astraea.common.producer.Record;
import org.astraea.common.producer.Serializer;

public interface MetricSender extends AutoCloseable {

static MetricSender local() {
return LocalSenderReceiver.of();
}

static MetricSender topic(String bootstrapServer) {
var producer =
Producer.builder()
.bootstrapServers(bootstrapServer)
.keySerializer(Serializer.INTEGER)
.valueSerializer(Serializer.BEAN_OBJECT)
.config(ProducerConfigs.ACKS_CONFIG, "0")
.build();
String METRIC_TOPIC = "__metrics";
return new MetricSender() {
@Override
public CompletionStage<Void> send(int id, Collection<BeanObject> beans) {
var records =
beans.stream()
.map(bean -> Record.builder().topic(METRIC_TOPIC).key(id).value(bean).build())
.toList();
return FutureUtils.sequence(
producer.send(records).stream().map(CompletionStage::toCompletableFuture).toList())
.thenAccept(ignored -> {});
}

@Override
public void close() {
producer.close();
}
};
}

CompletionStage<Void> send(int id, Collection<BeanObject> beans);

@Override
default void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,6 @@ static Builder builder() {
void close();

interface Receiver extends AutoCloseable {

static MetricFetcher.Sender local() {
return LocalSenderReceiver.of();
}

static Receiver fixed(Map<Integer, Collection<BeanObject>> beans) {
return new Receiver() {
private final AtomicBoolean done = new AtomicBoolean(false);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.common.metrics.collector;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.server.telemetry.ClientTelemetry;
import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;
import org.astraea.common.metrics.BeanQuery;
import org.astraea.common.metrics.JndiClient;

public class ServerMetricFetcher implements MetricsReporter, ClientTelemetry {
private static final String BOOTSTRAP_SERVERS = "server.metric.fetcher.bootstrap.servers";
private String bootstrapServers;
private int nodeId = -1;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final BlockingQueue<Boolean> queue = new LinkedBlockingQueue<>(1);

@Override
public void init(List<KafkaMetric> list) {}

@Override
public void metricChange(KafkaMetric kafkaMetric) {}

@Override
public void metricRemoval(KafkaMetric kafkaMetric) {}

@Override
public void close() {
closed.set(true);
queue.offer(true);
}

@Override
public void configure(Map<String, ?> map) {
if (!map.containsKey(BOOTSTRAP_SERVERS))
throw new RuntimeException(BOOTSTRAP_SERVERS + " is required");
if (!map.containsKey("node.id")) throw new RuntimeException("node.id is required");
this.bootstrapServers = map.get(BOOTSTRAP_SERVERS).toString();
nodeId = Integer.parseInt(map.get("node.id").toString());
CompletableFuture.runAsync(
() -> {
MetricSender sender = null;
var lastSent = System.currentTimeMillis();
try {
while (!closed.get()) {
var done = queue.poll(3, TimeUnit.SECONDS);
if (done == null) done = false;
if (done) return;
if (System.currentTimeMillis() - lastSent <= Duration.ofSeconds(3).toMillis())
continue;
if (sender == null) sender = MetricSender.topic(bootstrapServers);
var beans = JndiClient.local().beans(BeanQuery.all());
System.out.println("[CHIA] send " + beans.size() + " beans");
sender.send(nodeId, beans);
lastSent = System.currentTimeMillis();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
if (sender != null) sender.close();
}
});
}

@Override
public ClientTelemetryReceiver clientReceiver() {
System.out.println("[CHIA] create ClientTelemetryReceiver");
return (__, ___) -> {
queue.offer(false);
System.out.println("[CHIA] receive ClientTelemetryReceiver");
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ void testPublishAndClose() {
var beans = List.of(new BeanObject(Utils.randomString(), Map.of(), Map.of()));
var client = Mockito.mock(JndiClient.class);
Mockito.when(client.beans(Mockito.any(), Mockito.any())).thenReturn(beans);
var sender = Mockito.mock(MetricFetcher.Sender.class);
var sender = Mockito.mock(MetricSender.class);
var queue = new ConcurrentHashMap<Integer, Collection<BeanObject>>();
Mockito.when(sender.send(Mockito.anyInt(), Mockito.any()))
.thenAnswer(
Expand Down Expand Up @@ -90,7 +90,7 @@ void testPublishAndClose() {
void testNullCheck() {
var builder = MetricFetcher.builder();
Assertions.assertThrows(NullPointerException.class, builder::build);
builder.sender(MetricFetcher.Sender.local());
builder.sender(MetricSender.local());
Assertions.assertThrows(NullPointerException.class, builder::build);
builder.clientSupplier(() -> CompletableFuture.completedStage(Map.of()));
var fetcher = builder.build();
Expand All @@ -102,7 +102,7 @@ void testFetchBeanDelay() {
var client = Mockito.mock(JndiClient.class);
try (var fetcher =
MetricFetcher.builder()
.sender(MetricFetcher.Sender.local())
.sender(MetricSender.local())
.clientSupplier(() -> CompletableFuture.completedStage(Map.of(-1000, client)))
.fetchBeanDelay(Duration.ofSeconds(1000))
.build()) {
Expand All @@ -122,7 +122,7 @@ void testFetchMetadataDelay() {
.thenReturn(CompletableFuture.completedStage(Map.of(-1000, client)));
try (var fetcher =
MetricFetcher.builder()
.sender(MetricFetcher.Sender.local())
.sender(MetricSender.local())
.clientSupplier(supplier)
.fetchMetadataDelay(Duration.ofSeconds(1000))
.build()) {
Expand All @@ -137,7 +137,7 @@ void testFetchMetadataDelay() {
@Test
void testTopic() throws InterruptedException, ExecutionException {
var testBean = new BeanObject("java.lang", Map.of("name", "n1"), Map.of("value", "v1"));
try (var topicSender = MetricFetcher.Sender.topic(SERVICE.bootstrapServers())) {
try (var topicSender = MetricSender.topic(SERVICE.bootstrapServers())) {
topicSender.send(1, List.of(testBean));

// Test topic creation
Expand Down
7 changes: 7 additions & 0 deletions docker/start_broker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ RUN ./gradlew clean releaseTarGz
RUN mkdir /opt/kafka
RUN tar -zxvf \$(find ./core/build/distributions/ -maxdepth 1 -type f \( -iname \"kafka*tgz\" ! -iname \"*sit*\" \)) -C /opt/kafka --strip-components=1
# build astraea from source code
RUN git clone https://github.com/chia7712/astraea /tmp/astraea
WORKDIR /tmp/astraea
RUN git checkout tmp
RUN ./gradlew clean build -x test
RUN cp /tmp/astraea/common/build/libs/*.jar /opt/kafka/libs/
FROM azul/zulu-openjdk:21-jre
# copy kafka
Expand Down

0 comments on commit a2d829b

Please sign in to comment.