Skip to content

Commit

Permalink
refactor(broker): refactor opentelemertry exporter (#842)
Browse files Browse the repository at this point in the history
Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits authored Dec 19, 2023
1 parent eaf816f commit e268e19
Show file tree
Hide file tree
Showing 3 changed files with 307 additions and 207 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.automq.rocketmq.common.api.DataStore;
import com.automq.rocketmq.common.config.BrokerConfig;
import com.automq.rocketmq.common.config.ProfilerConfig;
import com.automq.rocketmq.common.config.TraceConfig;
import com.automq.rocketmq.common.util.Lifecycle;
import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.controller.server.ControllerServiceImpl;
Expand All @@ -47,23 +46,12 @@
import com.automq.rocketmq.store.MessageStoreBuilder;
import com.automq.rocketmq.store.MessageStoreImpl;
import com.automq.rocketmq.store.api.MessageStore;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.SpanProcessor;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.pyroscope.http.Format;
import io.pyroscope.javaagent.EventType;
import io.pyroscope.javaagent.PyroscopeAgent;
import io.pyroscope.javaagent.config.Config;
import io.pyroscope.labels.Pyroscope;
import java.io.InputStream;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.client.ConsumerManager;
import org.apache.rocketmq.broker.client.ProducerManager;
import org.apache.rocketmq.common.constant.LoggerName;
Expand All @@ -73,10 +61,6 @@
import org.apache.rocketmq.proxy.service.ServiceManager;
import org.apache.rocketmq.proxy.service.message.MessageService;

import static io.opentelemetry.semconv.ResourceAttributes.SERVICE_INSTANCE_ID;
import static io.opentelemetry.semconv.ResourceAttributes.SERVICE_NAME;
import static io.opentelemetry.semconv.ResourceAttributes.SERVICE_VERSION;

public class BrokerController implements Lifecycle {
private static final Logger LOGGER = LoggerFactory.getLogger(MetricsExporter.class);

Expand All @@ -89,6 +73,7 @@ public class BrokerController implements Lifecycle {
private final StoreMetadataService storeMetadataService;
private final ProxyMetadataService proxyMetadataService;
private final ExtendMessagingProcessor messagingProcessor;
private final TelemetryExporter telemetryExporter;
private final MetricsExporter metricsExporter;
private final DeadLetterService dlqService;
private final MessageService messageService;
Expand Down Expand Up @@ -132,56 +117,9 @@ public BrokerController(BrokerConfig brokerConfig) throws Exception {

messagingProcessor = ExtendMessagingProcessor.createForS3RocketMQ(serviceManager, brokerConfig.proxy());

// Build resource.
Properties gitProperties;
try {
ClassLoader classLoader = getClass().getClassLoader();
InputStream inputStream = classLoader.getResourceAsStream("git.properties");
gitProperties = new Properties();
gitProperties.load(inputStream);
} catch (Exception e) {
LOGGER.warn("read project version failed", e);
throw new RuntimeException(e);
}

AttributesBuilder builder = Attributes.builder();
builder.put(SERVICE_NAME, brokerConfig.name());
builder.put(SERVICE_VERSION, (String) gitProperties.get("git.build.version"));
builder.put("git.hash", (String) gitProperties.get("git.commit.id.describe"));
builder.put(SERVICE_INSTANCE_ID, brokerConfig.instanceId());

Resource resource = Resource.create(builder.build());

// Build trace provider.
TraceConfig traceConfig = brokerConfig.trace();
SdkTracerProvider sdkTracerProvider = null;
if (traceConfig.enabled()) {
OtlpGrpcSpanExporter spanExporter = OtlpGrpcSpanExporter.builder()
.setEndpoint(traceConfig.grpcExporterTarget())
.setTimeout(traceConfig.grpcExporterTimeOutInMills(), TimeUnit.MILLISECONDS)
.build();

SpanProcessor spanProcessor;
if (traceConfig.batchSize() == 0) {
spanProcessor = SimpleSpanProcessor.create(spanExporter);
} else {
spanProcessor = BatchSpanProcessor.builder(spanExporter)
.setExporterTimeout(traceConfig.grpcExporterTimeOutInMills(), TimeUnit.MILLISECONDS)
.setScheduleDelay(traceConfig.periodicExporterIntervalInMills(), TimeUnit.MILLISECONDS)
.setMaxExportBatchSize(traceConfig.batchSize())
.setMaxQueueSize(traceConfig.maxCachedSize())
.build();
}

sdkTracerProvider = SdkTracerProvider.builder()
.addSpanProcessor(spanProcessor)
.setResource(resource)
.build();
}

// Init the metrics exporter before accept requests.
metricsExporter = new MetricsExporter(brokerConfig, messageStore, messagingProcessor, resource,
sdkTracerProvider, metadataStore, s3MetadataService);
telemetryExporter = new TelemetryExporter(brokerConfig);
metricsExporter = new MetricsExporter(brokerConfig, telemetryExporter, messageStore, messagingProcessor, metadataStore, s3MetadataService);

// Init the profiler agent.
ProfilerConfig profilerConfig = brokerConfig.profiler();
Expand Down Expand Up @@ -228,6 +166,7 @@ public void shutdown() throws Exception {
messageStore.shutdown();
metadataStore.close();
metricsExporter.shutdown();
telemetryExporter.close();

// Shutdown the thread pool monitor.
ThreadPoolMonitor.shutdown();
Expand Down
160 changes: 18 additions & 142 deletions broker/src/main/java/com/automq/rocketmq/broker/MetricsExporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,41 +29,24 @@
import com.automq.rocketmq.store.metrics.StoreMetricsManager;
import com.automq.rocketmq.store.metrics.StreamMetricsManager;
import com.google.common.base.Splitter;
import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.exporter.logging.LoggingMetricExporter;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder;
import io.opentelemetry.exporter.prometheus.PrometheusHttpServer;
import io.opentelemetry.instrumentation.oshi.SystemMetrics;
import io.opentelemetry.instrumentation.runtimemetrics.java17.RuntimeMetrics;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.OpenTelemetrySdkBuilder;
import io.opentelemetry.sdk.metrics.InstrumentSelector;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
import io.opentelemetry.sdk.metrics.View;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.common.metrics.MetricsExporterType;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.slf4j.bridge.SLF4JBridgeHandler;

import static com.automq.rocketmq.broker.MetricsConstant.AGGREGATION_CUMULATIVE;
import static com.automq.rocketmq.broker.MetricsConstant.AGGREGATION_DELTA;
Expand All @@ -73,16 +56,12 @@

public class MetricsExporter implements Lifecycle {
private static final Logger LOGGER = LoggerFactory.getLogger(MetricsExporter.class);
private static final Map<String, String> LABEL_MAP = new HashMap<>();
private volatile boolean started = false;
private final BrokerConfig brokerConfig;
private final MetricsConfig metricsConfig;
private final static Map<String, String> LABEL_MAP = new HashMap<>();
private OtlpGrpcMetricExporter metricExporter;
private PeriodicMetricReader periodicMetricReader;
private PrometheusHttpServer prometheusHttpServer;
private LoggingMetricExporter loggingMetricExporter;
private final TelemetryExporter telemetryExporter;
private Meter brokerMeter;
private OpenTelemetrySdk openTelemetrySdk;
private RuntimeMetrics runtimeMetrics;

private final ProxyMetricsManager proxyMetricsManager;
Expand All @@ -93,16 +72,17 @@ public class MetricsExporter implements Lifecycle {

public static Supplier<AttributesBuilder> attributesBuilderSupplier = Attributes::builder;

public MetricsExporter(BrokerConfig brokerConfig, MessageStoreImpl messageStore,
ExtendMessagingProcessor messagingProcessor, Resource resource, SdkTracerProvider tracerProvider,
MetadataStore metadataStore, S3MetadataService s3MetadataService) {
public MetricsExporter(BrokerConfig brokerConfig, TelemetryExporter telemetryExporter,
MessageStoreImpl messageStore, ExtendMessagingProcessor messagingProcessor, MetadataStore metadataStore,
S3MetadataService s3MetadataService) {
this.brokerConfig = brokerConfig;
this.metricsConfig = brokerConfig.metrics();
this.telemetryExporter = telemetryExporter;
this.proxyMetricsManager = new ProxyMetricsManager(messagingProcessor);
this.storeMetricsManager = new StoreMetricsManager(metricsConfig, messageStore);
this.streamMetricsManager = new StreamMetricsManager();
this.topicMetricsManager = new TopicMetricsManager(metadataStore, s3MetadataService);
init(resource, tracerProvider);
init();
}

public static AttributesBuilder newAttributesBuilder() {
Expand All @@ -115,30 +95,9 @@ public static AttributesBuilder newAttributesBuilder() {
return attributesBuilder;
}

private boolean checkConfig() {
if (metricsConfig == null) {
return false;
}
MetricsExporterType exporterType = MetricsExporterType.valueOf(metricsConfig.exporterType());
if (!exporterType.isEnable()) {
return false;
}

return switch (exporterType) {
case OTLP_GRPC -> StringUtils.isNotBlank(metricsConfig.grpcExporterTarget());
case PROM, LOG -> true;
default -> false;
};
}

private void init(Resource resource, SdkTracerProvider tracerProvider) {
MetricsExporterType metricsExporterType = MetricsExporterType.valueOf(metricsConfig.exporterType());
if (metricsExporterType == MetricsExporterType.DISABLE) {
return;
}

if (!checkConfig()) {
LOGGER.error("check metrics config failed, will not export metrics");
private void init() {
if (!telemetryExporter.isMetricsEnabled()) {
LOGGER.info("MetricsExporter is disabled");
return;
}

Expand All @@ -163,81 +122,13 @@ private void init(Resource resource, SdkTracerProvider tracerProvider) {
LABEL_MAP.put(LABEL_NODE_NAME, brokerConfig.name());
LABEL_MAP.put(LABEL_INSTANCE_ID, brokerConfig.instanceId());

SdkMeterProviderBuilder providerBuilder = SdkMeterProvider.builder()
.setResource(resource);

if (metricsExporterType == MetricsExporterType.OTLP_GRPC) {
String endpoint = metricsConfig.grpcExporterTarget();
if (!endpoint.startsWith("http")) {
endpoint = "https://" + endpoint;
}
OtlpGrpcMetricExporterBuilder metricExporterBuilder = OtlpGrpcMetricExporter.builder()
.setEndpoint(endpoint)
.setTimeout(metricsConfig.grpcExporterTimeOutInMills(), TimeUnit.MILLISECONDS)
.setAggregationTemporalitySelector(type -> {
if (metricsConfig.exportInDelta() &&
(type == InstrumentType.COUNTER || type == InstrumentType.OBSERVABLE_COUNTER || type == InstrumentType.HISTOGRAM)) {
return AggregationTemporality.DELTA;
}
return AggregationTemporality.CUMULATIVE;
});

String headers = metricsConfig.grpcExporterHeader();
if (StringUtils.isNotBlank(headers)) {
Map<String, String> headerMap = new HashMap<>();
List<String> kvPairs = Splitter.on(',').omitEmptyStrings().splitToList(headers);
for (String item : kvPairs) {
String[] split = item.split(":");
if (split.length != 2) {
LOGGER.warn("metricsGrpcExporterHeader is not valid: {}", headers);
continue;
}
headerMap.put(split[0], split[1]);
}
headerMap.forEach(metricExporterBuilder::addHeader);
}

metricExporter = metricExporterBuilder.build();

periodicMetricReader = PeriodicMetricReader.builder(metricExporter)
.setInterval(metricsConfig.periodicExporterIntervalInMills(), TimeUnit.MILLISECONDS)
.build();

providerBuilder.registerMetricReader(periodicMetricReader);
} else if (metricsExporterType == MetricsExporterType.PROM) {
String promExporterHost = metricsConfig.promExporterHost();
if (StringUtils.isBlank(promExporterHost)) {
throw new IllegalArgumentException("Config item promExporterHost is blank");
}
prometheusHttpServer = PrometheusHttpServer.builder()
.setHost(promExporterHost)
.setPort(metricsConfig.promExporterPort())
.build();
providerBuilder.registerMetricReader(prometheusHttpServer);
} else if (metricsExporterType == MetricsExporterType.LOG) {
SLF4JBridgeHandler.removeHandlersForRootLogger();
SLF4JBridgeHandler.install();
loggingMetricExporter = LoggingMetricExporter.create(metricsConfig.exportInDelta() ? AggregationTemporality.DELTA : AggregationTemporality.CUMULATIVE);
java.util.logging.Logger.getLogger(LoggingMetricExporter.class.getName()).setLevel(java.util.logging.Level.FINEST);
periodicMetricReader = PeriodicMetricReader.builder(loggingMetricExporter)
.setInterval(metricsConfig.periodicExporterIntervalInMills(), TimeUnit.MILLISECONDS)
.build();
providerBuilder.registerMetricReader(periodicMetricReader);
}

registerMetricsView(providerBuilder);

OpenTelemetrySdkBuilder telemetrySdkBuilder = OpenTelemetrySdk.builder();

if (tracerProvider != null) {
telemetrySdkBuilder.setTracerProvider(tracerProvider);
Optional<OpenTelemetrySdk> optional = telemetryExporter.openTelemetrySdk();
if (optional.isEmpty()) {
LOGGER.warn("OpenTelemetrySdk is not initialized");
return;
}

openTelemetrySdk = telemetrySdkBuilder
.setPropagators(ContextPropagators.create(TextMapPropagator.composite(W3CTraceContextPropagator.getInstance(), W3CBaggagePropagator.getInstance())))
.setMeterProvider(providerBuilder.build())
.buildAndRegisterGlobal();

OpenTelemetrySdk openTelemetrySdk = optional.get();
brokerMeter = openTelemetrySdk.getMeter("automq-for-rocketmq");

// JVM metrics
Expand Down Expand Up @@ -281,14 +172,13 @@ private void initDynamicMetrics() {

@Override
public void start() {
MetricsExporterType metricsExporterType = MetricsExporterType.valueOf(metricsConfig.exporterType());
if (metricsExporterType != MetricsExporterType.DISABLE) {
if (telemetryExporter.isMetricsEnabled()) {
initDynamicMetrics();
this.started = true;
}
}

private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) {
protected static void registerMetricsView(SdkMeterProviderBuilder providerBuilder) {
for (Pair<InstrumentSelector, View> selectorViewPair : ProxyMetricsManager.getMetricsView()) {
providerBuilder.registerView(selectorViewPair.getLeft(), selectorViewPair.getRight());
}
Expand All @@ -307,24 +197,10 @@ public void shutdown() {
if (!started) {
return;
}
MetricsExporterType exporterType = MetricsExporterType.valueOf(metricsConfig.exporterType());
if (exporterType == MetricsExporterType.OTLP_GRPC) {
periodicMetricReader.forceFlush();
periodicMetricReader.shutdown();
metricExporter.shutdown();
} else if (exporterType == MetricsExporterType.PROM) {
prometheusHttpServer.forceFlush();
prometheusHttpServer.shutdown();
} else if (exporterType == MetricsExporterType.LOG) {
periodicMetricReader.forceFlush();
periodicMetricReader.shutdown();
loggingMetricExporter.shutdown();
}
if (runtimeMetrics != null) {
runtimeMetrics.close();
}
storeMetricsManager.shutdown();
openTelemetrySdk.shutdown();
}
}

Loading

0 comments on commit e268e19

Please sign in to comment.