Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(broker): refactor opentelemertry exporter #842

Merged
merged 1 commit into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.automq.rocketmq.common.api.DataStore;
import com.automq.rocketmq.common.config.BrokerConfig;
import com.automq.rocketmq.common.config.ProfilerConfig;
import com.automq.rocketmq.common.config.TraceConfig;
import com.automq.rocketmq.common.util.Lifecycle;
import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.controller.server.ControllerServiceImpl;
Expand All @@ -47,23 +46,12 @@
import com.automq.rocketmq.store.MessageStoreBuilder;
import com.automq.rocketmq.store.MessageStoreImpl;
import com.automq.rocketmq.store.api.MessageStore;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.SpanProcessor;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.pyroscope.http.Format;
import io.pyroscope.javaagent.EventType;
import io.pyroscope.javaagent.PyroscopeAgent;
import io.pyroscope.javaagent.config.Config;
import io.pyroscope.labels.Pyroscope;
import java.io.InputStream;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.client.ConsumerManager;
import org.apache.rocketmq.broker.client.ProducerManager;
import org.apache.rocketmq.common.constant.LoggerName;
Expand All @@ -73,10 +61,6 @@
import org.apache.rocketmq.proxy.service.ServiceManager;
import org.apache.rocketmq.proxy.service.message.MessageService;

import static io.opentelemetry.semconv.ResourceAttributes.SERVICE_INSTANCE_ID;
import static io.opentelemetry.semconv.ResourceAttributes.SERVICE_NAME;
import static io.opentelemetry.semconv.ResourceAttributes.SERVICE_VERSION;

public class BrokerController implements Lifecycle {
private static final Logger LOGGER = LoggerFactory.getLogger(MetricsExporter.class);

Expand All @@ -89,6 +73,7 @@ public class BrokerController implements Lifecycle {
private final StoreMetadataService storeMetadataService;
private final ProxyMetadataService proxyMetadataService;
private final ExtendMessagingProcessor messagingProcessor;
private final TelemetryExporter telemetryExporter;
private final MetricsExporter metricsExporter;
private final DeadLetterService dlqService;
private final MessageService messageService;
Expand Down Expand Up @@ -132,56 +117,9 @@ public BrokerController(BrokerConfig brokerConfig) throws Exception {

messagingProcessor = ExtendMessagingProcessor.createForS3RocketMQ(serviceManager, brokerConfig.proxy());

// Build resource.
Properties gitProperties;
try {
ClassLoader classLoader = getClass().getClassLoader();
InputStream inputStream = classLoader.getResourceAsStream("git.properties");
gitProperties = new Properties();
gitProperties.load(inputStream);
} catch (Exception e) {
LOGGER.warn("read project version failed", e);
throw new RuntimeException(e);
}

AttributesBuilder builder = Attributes.builder();
builder.put(SERVICE_NAME, brokerConfig.name());
builder.put(SERVICE_VERSION, (String) gitProperties.get("git.build.version"));
builder.put("git.hash", (String) gitProperties.get("git.commit.id.describe"));
builder.put(SERVICE_INSTANCE_ID, brokerConfig.instanceId());

Resource resource = Resource.create(builder.build());

// Build trace provider.
TraceConfig traceConfig = brokerConfig.trace();
SdkTracerProvider sdkTracerProvider = null;
if (traceConfig.enabled()) {
OtlpGrpcSpanExporter spanExporter = OtlpGrpcSpanExporter.builder()
.setEndpoint(traceConfig.grpcExporterTarget())
.setTimeout(traceConfig.grpcExporterTimeOutInMills(), TimeUnit.MILLISECONDS)
.build();

SpanProcessor spanProcessor;
if (traceConfig.batchSize() == 0) {
spanProcessor = SimpleSpanProcessor.create(spanExporter);
} else {
spanProcessor = BatchSpanProcessor.builder(spanExporter)
.setExporterTimeout(traceConfig.grpcExporterTimeOutInMills(), TimeUnit.MILLISECONDS)
.setScheduleDelay(traceConfig.periodicExporterIntervalInMills(), TimeUnit.MILLISECONDS)
.setMaxExportBatchSize(traceConfig.batchSize())
.setMaxQueueSize(traceConfig.maxCachedSize())
.build();
}

sdkTracerProvider = SdkTracerProvider.builder()
.addSpanProcessor(spanProcessor)
.setResource(resource)
.build();
}

// Init the metrics exporter before accept requests.
metricsExporter = new MetricsExporter(brokerConfig, messageStore, messagingProcessor, resource,
sdkTracerProvider, metadataStore, s3MetadataService);
telemetryExporter = new TelemetryExporter(brokerConfig);
metricsExporter = new MetricsExporter(brokerConfig, telemetryExporter, messageStore, messagingProcessor, metadataStore, s3MetadataService);

// Init the profiler agent.
ProfilerConfig profilerConfig = brokerConfig.profiler();
Expand Down Expand Up @@ -228,6 +166,7 @@ public void shutdown() throws Exception {
messageStore.shutdown();
metadataStore.close();
metricsExporter.shutdown();
telemetryExporter.close();

// Shutdown the thread pool monitor.
ThreadPoolMonitor.shutdown();
Expand Down
160 changes: 18 additions & 142 deletions broker/src/main/java/com/automq/rocketmq/broker/MetricsExporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,41 +29,24 @@
import com.automq.rocketmq.store.metrics.StoreMetricsManager;
import com.automq.rocketmq.store.metrics.StreamMetricsManager;
import com.google.common.base.Splitter;
import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.exporter.logging.LoggingMetricExporter;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder;
import io.opentelemetry.exporter.prometheus.PrometheusHttpServer;
import io.opentelemetry.instrumentation.oshi.SystemMetrics;
import io.opentelemetry.instrumentation.runtimemetrics.java17.RuntimeMetrics;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.OpenTelemetrySdkBuilder;
import io.opentelemetry.sdk.metrics.InstrumentSelector;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
import io.opentelemetry.sdk.metrics.View;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.common.metrics.MetricsExporterType;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.slf4j.bridge.SLF4JBridgeHandler;

import static com.automq.rocketmq.broker.MetricsConstant.AGGREGATION_CUMULATIVE;
import static com.automq.rocketmq.broker.MetricsConstant.AGGREGATION_DELTA;
Expand All @@ -73,16 +56,12 @@

public class MetricsExporter implements Lifecycle {
private static final Logger LOGGER = LoggerFactory.getLogger(MetricsExporter.class);
private static final Map<String, String> LABEL_MAP = new HashMap<>();
private volatile boolean started = false;
private final BrokerConfig brokerConfig;
private final MetricsConfig metricsConfig;
private final static Map<String, String> LABEL_MAP = new HashMap<>();
private OtlpGrpcMetricExporter metricExporter;
private PeriodicMetricReader periodicMetricReader;
private PrometheusHttpServer prometheusHttpServer;
private LoggingMetricExporter loggingMetricExporter;
private final TelemetryExporter telemetryExporter;
private Meter brokerMeter;
private OpenTelemetrySdk openTelemetrySdk;
private RuntimeMetrics runtimeMetrics;

private final ProxyMetricsManager proxyMetricsManager;
Expand All @@ -93,16 +72,17 @@ public class MetricsExporter implements Lifecycle {

public static Supplier<AttributesBuilder> attributesBuilderSupplier = Attributes::builder;

public MetricsExporter(BrokerConfig brokerConfig, MessageStoreImpl messageStore,
ExtendMessagingProcessor messagingProcessor, Resource resource, SdkTracerProvider tracerProvider,
MetadataStore metadataStore, S3MetadataService s3MetadataService) {
public MetricsExporter(BrokerConfig brokerConfig, TelemetryExporter telemetryExporter,
MessageStoreImpl messageStore, ExtendMessagingProcessor messagingProcessor, MetadataStore metadataStore,
S3MetadataService s3MetadataService) {
this.brokerConfig = brokerConfig;
this.metricsConfig = brokerConfig.metrics();
this.telemetryExporter = telemetryExporter;
this.proxyMetricsManager = new ProxyMetricsManager(messagingProcessor);
this.storeMetricsManager = new StoreMetricsManager(metricsConfig, messageStore);
this.streamMetricsManager = new StreamMetricsManager();
this.topicMetricsManager = new TopicMetricsManager(metadataStore, s3MetadataService);
init(resource, tracerProvider);
init();
}

public static AttributesBuilder newAttributesBuilder() {
Expand All @@ -115,30 +95,9 @@ public static AttributesBuilder newAttributesBuilder() {
return attributesBuilder;
}

private boolean checkConfig() {
if (metricsConfig == null) {
return false;
}
MetricsExporterType exporterType = MetricsExporterType.valueOf(metricsConfig.exporterType());
if (!exporterType.isEnable()) {
return false;
}

return switch (exporterType) {
case OTLP_GRPC -> StringUtils.isNotBlank(metricsConfig.grpcExporterTarget());
case PROM, LOG -> true;
default -> false;
};
}

private void init(Resource resource, SdkTracerProvider tracerProvider) {
MetricsExporterType metricsExporterType = MetricsExporterType.valueOf(metricsConfig.exporterType());
if (metricsExporterType == MetricsExporterType.DISABLE) {
return;
}

if (!checkConfig()) {
LOGGER.error("check metrics config failed, will not export metrics");
private void init() {
if (!telemetryExporter.isMetricsEnabled()) {
LOGGER.info("MetricsExporter is disabled");
return;
}

Expand All @@ -163,81 +122,13 @@ private void init(Resource resource, SdkTracerProvider tracerProvider) {
LABEL_MAP.put(LABEL_NODE_NAME, brokerConfig.name());
LABEL_MAP.put(LABEL_INSTANCE_ID, brokerConfig.instanceId());

SdkMeterProviderBuilder providerBuilder = SdkMeterProvider.builder()
.setResource(resource);

if (metricsExporterType == MetricsExporterType.OTLP_GRPC) {
String endpoint = metricsConfig.grpcExporterTarget();
if (!endpoint.startsWith("http")) {
endpoint = "https://" + endpoint;
}
OtlpGrpcMetricExporterBuilder metricExporterBuilder = OtlpGrpcMetricExporter.builder()
.setEndpoint(endpoint)
.setTimeout(metricsConfig.grpcExporterTimeOutInMills(), TimeUnit.MILLISECONDS)
.setAggregationTemporalitySelector(type -> {
if (metricsConfig.exportInDelta() &&
(type == InstrumentType.COUNTER || type == InstrumentType.OBSERVABLE_COUNTER || type == InstrumentType.HISTOGRAM)) {
return AggregationTemporality.DELTA;
}
return AggregationTemporality.CUMULATIVE;
});

String headers = metricsConfig.grpcExporterHeader();
if (StringUtils.isNotBlank(headers)) {
Map<String, String> headerMap = new HashMap<>();
List<String> kvPairs = Splitter.on(',').omitEmptyStrings().splitToList(headers);
for (String item : kvPairs) {
String[] split = item.split(":");
if (split.length != 2) {
LOGGER.warn("metricsGrpcExporterHeader is not valid: {}", headers);
continue;
}
headerMap.put(split[0], split[1]);
}
headerMap.forEach(metricExporterBuilder::addHeader);
}

metricExporter = metricExporterBuilder.build();

periodicMetricReader = PeriodicMetricReader.builder(metricExporter)
.setInterval(metricsConfig.periodicExporterIntervalInMills(), TimeUnit.MILLISECONDS)
.build();

providerBuilder.registerMetricReader(periodicMetricReader);
} else if (metricsExporterType == MetricsExporterType.PROM) {
String promExporterHost = metricsConfig.promExporterHost();
if (StringUtils.isBlank(promExporterHost)) {
throw new IllegalArgumentException("Config item promExporterHost is blank");
}
prometheusHttpServer = PrometheusHttpServer.builder()
.setHost(promExporterHost)
.setPort(metricsConfig.promExporterPort())
.build();
providerBuilder.registerMetricReader(prometheusHttpServer);
} else if (metricsExporterType == MetricsExporterType.LOG) {
SLF4JBridgeHandler.removeHandlersForRootLogger();
SLF4JBridgeHandler.install();
loggingMetricExporter = LoggingMetricExporter.create(metricsConfig.exportInDelta() ? AggregationTemporality.DELTA : AggregationTemporality.CUMULATIVE);
java.util.logging.Logger.getLogger(LoggingMetricExporter.class.getName()).setLevel(java.util.logging.Level.FINEST);
periodicMetricReader = PeriodicMetricReader.builder(loggingMetricExporter)
.setInterval(metricsConfig.periodicExporterIntervalInMills(), TimeUnit.MILLISECONDS)
.build();
providerBuilder.registerMetricReader(periodicMetricReader);
}

registerMetricsView(providerBuilder);

OpenTelemetrySdkBuilder telemetrySdkBuilder = OpenTelemetrySdk.builder();

if (tracerProvider != null) {
telemetrySdkBuilder.setTracerProvider(tracerProvider);
Optional<OpenTelemetrySdk> optional = telemetryExporter.openTelemetrySdk();
if (optional.isEmpty()) {
LOGGER.warn("OpenTelemetrySdk is not initialized");
return;
}

openTelemetrySdk = telemetrySdkBuilder
.setPropagators(ContextPropagators.create(TextMapPropagator.composite(W3CTraceContextPropagator.getInstance(), W3CBaggagePropagator.getInstance())))
.setMeterProvider(providerBuilder.build())
.buildAndRegisterGlobal();

OpenTelemetrySdk openTelemetrySdk = optional.get();
brokerMeter = openTelemetrySdk.getMeter("automq-for-rocketmq");

// JVM metrics
Expand Down Expand Up @@ -281,14 +172,13 @@ private void initDynamicMetrics() {

@Override
public void start() {
MetricsExporterType metricsExporterType = MetricsExporterType.valueOf(metricsConfig.exporterType());
if (metricsExporterType != MetricsExporterType.DISABLE) {
if (telemetryExporter.isMetricsEnabled()) {
initDynamicMetrics();
this.started = true;
}
}

private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) {
protected static void registerMetricsView(SdkMeterProviderBuilder providerBuilder) {
for (Pair<InstrumentSelector, View> selectorViewPair : ProxyMetricsManager.getMetricsView()) {
providerBuilder.registerView(selectorViewPair.getLeft(), selectorViewPair.getRight());
}
Expand All @@ -307,24 +197,10 @@ public void shutdown() {
if (!started) {
return;
}
MetricsExporterType exporterType = MetricsExporterType.valueOf(metricsConfig.exporterType());
if (exporterType == MetricsExporterType.OTLP_GRPC) {
periodicMetricReader.forceFlush();
periodicMetricReader.shutdown();
metricExporter.shutdown();
} else if (exporterType == MetricsExporterType.PROM) {
prometheusHttpServer.forceFlush();
prometheusHttpServer.shutdown();
} else if (exporterType == MetricsExporterType.LOG) {
periodicMetricReader.forceFlush();
periodicMetricReader.shutdown();
loggingMetricExporter.shutdown();
}
if (runtimeMetrics != null) {
runtimeMetrics.close();
}
storeMetricsManager.shutdown();
openTelemetrySdk.shutdown();
}
}

Loading
Loading