From e268e1982a4900bdf192c22f8e7b581b73c9ab34 Mon Sep 17 00:00:00 2001 From: SSpirits Date: Tue, 19 Dec 2023 12:57:11 +0800 Subject: [PATCH] refactor(broker): refactor opentelemertry exporter (#842) Signed-off-by: SSpirits --- .../rocketmq/broker/BrokerController.java | 69 +---- .../rocketmq/broker/MetricsExporter.java | 160 ++-------- .../rocketmq/broker/TelemetryExporter.java | 285 ++++++++++++++++++ 3 files changed, 307 insertions(+), 207 deletions(-) create mode 100644 broker/src/main/java/com/automq/rocketmq/broker/TelemetryExporter.java diff --git a/broker/src/main/java/com/automq/rocketmq/broker/BrokerController.java b/broker/src/main/java/com/automq/rocketmq/broker/BrokerController.java index d4b233f3a..a28a7f1fe 100644 --- a/broker/src/main/java/com/automq/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/com/automq/rocketmq/broker/BrokerController.java @@ -20,7 +20,6 @@ import com.automq.rocketmq.common.api.DataStore; import com.automq.rocketmq.common.config.BrokerConfig; import com.automq.rocketmq.common.config.ProfilerConfig; -import com.automq.rocketmq.common.config.TraceConfig; import com.automq.rocketmq.common.util.Lifecycle; import com.automq.rocketmq.controller.MetadataStore; import com.automq.rocketmq.controller.server.ControllerServiceImpl; @@ -47,23 +46,12 @@ import com.automq.rocketmq.store.MessageStoreBuilder; import com.automq.rocketmq.store.MessageStoreImpl; import com.automq.rocketmq.store.api.MessageStore; -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.common.AttributesBuilder; -import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter; -import io.opentelemetry.sdk.resources.Resource; -import io.opentelemetry.sdk.trace.SdkTracerProvider; -import io.opentelemetry.sdk.trace.SpanProcessor; -import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; -import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; import io.pyroscope.http.Format; import io.pyroscope.javaagent.EventType; import io.pyroscope.javaagent.PyroscopeAgent; import io.pyroscope.javaagent.config.Config; import io.pyroscope.labels.Pyroscope; -import java.io.InputStream; import java.util.Map; -import java.util.Properties; -import java.util.concurrent.TimeUnit; import org.apache.rocketmq.broker.client.ConsumerManager; import org.apache.rocketmq.broker.client.ProducerManager; import org.apache.rocketmq.common.constant.LoggerName; @@ -73,10 +61,6 @@ import org.apache.rocketmq.proxy.service.ServiceManager; import org.apache.rocketmq.proxy.service.message.MessageService; -import static io.opentelemetry.semconv.ResourceAttributes.SERVICE_INSTANCE_ID; -import static io.opentelemetry.semconv.ResourceAttributes.SERVICE_NAME; -import static io.opentelemetry.semconv.ResourceAttributes.SERVICE_VERSION; - public class BrokerController implements Lifecycle { private static final Logger LOGGER = LoggerFactory.getLogger(MetricsExporter.class); @@ -89,6 +73,7 @@ public class BrokerController implements Lifecycle { private final StoreMetadataService storeMetadataService; private final ProxyMetadataService proxyMetadataService; private final ExtendMessagingProcessor messagingProcessor; + private final TelemetryExporter telemetryExporter; private final MetricsExporter metricsExporter; private final DeadLetterService dlqService; private final MessageService messageService; @@ -132,56 +117,9 @@ public BrokerController(BrokerConfig brokerConfig) throws Exception { messagingProcessor = ExtendMessagingProcessor.createForS3RocketMQ(serviceManager, brokerConfig.proxy()); - // Build resource. - Properties gitProperties; - try { - ClassLoader classLoader = getClass().getClassLoader(); - InputStream inputStream = classLoader.getResourceAsStream("git.properties"); - gitProperties = new Properties(); - gitProperties.load(inputStream); - } catch (Exception e) { - LOGGER.warn("read project version failed", e); - throw new RuntimeException(e); - } - - AttributesBuilder builder = Attributes.builder(); - builder.put(SERVICE_NAME, brokerConfig.name()); - builder.put(SERVICE_VERSION, (String) gitProperties.get("git.build.version")); - builder.put("git.hash", (String) gitProperties.get("git.commit.id.describe")); - builder.put(SERVICE_INSTANCE_ID, brokerConfig.instanceId()); - - Resource resource = Resource.create(builder.build()); - - // Build trace provider. - TraceConfig traceConfig = brokerConfig.trace(); - SdkTracerProvider sdkTracerProvider = null; - if (traceConfig.enabled()) { - OtlpGrpcSpanExporter spanExporter = OtlpGrpcSpanExporter.builder() - .setEndpoint(traceConfig.grpcExporterTarget()) - .setTimeout(traceConfig.grpcExporterTimeOutInMills(), TimeUnit.MILLISECONDS) - .build(); - - SpanProcessor spanProcessor; - if (traceConfig.batchSize() == 0) { - spanProcessor = SimpleSpanProcessor.create(spanExporter); - } else { - spanProcessor = BatchSpanProcessor.builder(spanExporter) - .setExporterTimeout(traceConfig.grpcExporterTimeOutInMills(), TimeUnit.MILLISECONDS) - .setScheduleDelay(traceConfig.periodicExporterIntervalInMills(), TimeUnit.MILLISECONDS) - .setMaxExportBatchSize(traceConfig.batchSize()) - .setMaxQueueSize(traceConfig.maxCachedSize()) - .build(); - } - - sdkTracerProvider = SdkTracerProvider.builder() - .addSpanProcessor(spanProcessor) - .setResource(resource) - .build(); - } - // Init the metrics exporter before accept requests. - metricsExporter = new MetricsExporter(brokerConfig, messageStore, messagingProcessor, resource, - sdkTracerProvider, metadataStore, s3MetadataService); + telemetryExporter = new TelemetryExporter(brokerConfig); + metricsExporter = new MetricsExporter(brokerConfig, telemetryExporter, messageStore, messagingProcessor, metadataStore, s3MetadataService); // Init the profiler agent. ProfilerConfig profilerConfig = brokerConfig.profiler(); @@ -228,6 +166,7 @@ public void shutdown() throws Exception { messageStore.shutdown(); metadataStore.close(); metricsExporter.shutdown(); + telemetryExporter.close(); // Shutdown the thread pool monitor. ThreadPoolMonitor.shutdown(); diff --git a/broker/src/main/java/com/automq/rocketmq/broker/MetricsExporter.java b/broker/src/main/java/com/automq/rocketmq/broker/MetricsExporter.java index ae51b0410..0944791ab 100644 --- a/broker/src/main/java/com/automq/rocketmq/broker/MetricsExporter.java +++ b/broker/src/main/java/com/automq/rocketmq/broker/MetricsExporter.java @@ -29,41 +29,24 @@ import com.automq.rocketmq.store.metrics.StoreMetricsManager; import com.automq.rocketmq.store.metrics.StreamMetricsManager; import com.google.common.base.Splitter; -import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.AttributesBuilder; import io.opentelemetry.api.metrics.Meter; -import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; -import io.opentelemetry.context.propagation.ContextPropagators; -import io.opentelemetry.context.propagation.TextMapPropagator; -import io.opentelemetry.exporter.logging.LoggingMetricExporter; -import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter; -import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder; -import io.opentelemetry.exporter.prometheus.PrometheusHttpServer; import io.opentelemetry.instrumentation.oshi.SystemMetrics; import io.opentelemetry.instrumentation.runtimemetrics.java17.RuntimeMetrics; import io.opentelemetry.sdk.OpenTelemetrySdk; -import io.opentelemetry.sdk.OpenTelemetrySdkBuilder; import io.opentelemetry.sdk.metrics.InstrumentSelector; -import io.opentelemetry.sdk.metrics.InstrumentType; -import io.opentelemetry.sdk.metrics.SdkMeterProvider; import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder; import io.opentelemetry.sdk.metrics.View; -import io.opentelemetry.sdk.metrics.data.AggregationTemporality; -import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; -import io.opentelemetry.sdk.resources.Resource; -import io.opentelemetry.sdk.trace.SdkTracerProvider; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; +import java.util.Optional; import java.util.function.Supplier; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; -import org.apache.rocketmq.common.metrics.MetricsExporterType; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; -import org.slf4j.bridge.SLF4JBridgeHandler; import static com.automq.rocketmq.broker.MetricsConstant.AGGREGATION_CUMULATIVE; import static com.automq.rocketmq.broker.MetricsConstant.AGGREGATION_DELTA; @@ -73,16 +56,12 @@ public class MetricsExporter implements Lifecycle { private static final Logger LOGGER = LoggerFactory.getLogger(MetricsExporter.class); + private static final Map LABEL_MAP = new HashMap<>(); private volatile boolean started = false; private final BrokerConfig brokerConfig; private final MetricsConfig metricsConfig; - private final static Map LABEL_MAP = new HashMap<>(); - private OtlpGrpcMetricExporter metricExporter; - private PeriodicMetricReader periodicMetricReader; - private PrometheusHttpServer prometheusHttpServer; - private LoggingMetricExporter loggingMetricExporter; + private final TelemetryExporter telemetryExporter; private Meter brokerMeter; - private OpenTelemetrySdk openTelemetrySdk; private RuntimeMetrics runtimeMetrics; private final ProxyMetricsManager proxyMetricsManager; @@ -93,16 +72,17 @@ public class MetricsExporter implements Lifecycle { public static Supplier attributesBuilderSupplier = Attributes::builder; - public MetricsExporter(BrokerConfig brokerConfig, MessageStoreImpl messageStore, - ExtendMessagingProcessor messagingProcessor, Resource resource, SdkTracerProvider tracerProvider, - MetadataStore metadataStore, S3MetadataService s3MetadataService) { + public MetricsExporter(BrokerConfig brokerConfig, TelemetryExporter telemetryExporter, + MessageStoreImpl messageStore, ExtendMessagingProcessor messagingProcessor, MetadataStore metadataStore, + S3MetadataService s3MetadataService) { this.brokerConfig = brokerConfig; this.metricsConfig = brokerConfig.metrics(); + this.telemetryExporter = telemetryExporter; this.proxyMetricsManager = new ProxyMetricsManager(messagingProcessor); this.storeMetricsManager = new StoreMetricsManager(metricsConfig, messageStore); this.streamMetricsManager = new StreamMetricsManager(); this.topicMetricsManager = new TopicMetricsManager(metadataStore, s3MetadataService); - init(resource, tracerProvider); + init(); } public static AttributesBuilder newAttributesBuilder() { @@ -115,30 +95,9 @@ public static AttributesBuilder newAttributesBuilder() { return attributesBuilder; } - private boolean checkConfig() { - if (metricsConfig == null) { - return false; - } - MetricsExporterType exporterType = MetricsExporterType.valueOf(metricsConfig.exporterType()); - if (!exporterType.isEnable()) { - return false; - } - - return switch (exporterType) { - case OTLP_GRPC -> StringUtils.isNotBlank(metricsConfig.grpcExporterTarget()); - case PROM, LOG -> true; - default -> false; - }; - } - - private void init(Resource resource, SdkTracerProvider tracerProvider) { - MetricsExporterType metricsExporterType = MetricsExporterType.valueOf(metricsConfig.exporterType()); - if (metricsExporterType == MetricsExporterType.DISABLE) { - return; - } - - if (!checkConfig()) { - LOGGER.error("check metrics config failed, will not export metrics"); + private void init() { + if (!telemetryExporter.isMetricsEnabled()) { + LOGGER.info("MetricsExporter is disabled"); return; } @@ -163,81 +122,13 @@ private void init(Resource resource, SdkTracerProvider tracerProvider) { LABEL_MAP.put(LABEL_NODE_NAME, brokerConfig.name()); LABEL_MAP.put(LABEL_INSTANCE_ID, brokerConfig.instanceId()); - SdkMeterProviderBuilder providerBuilder = SdkMeterProvider.builder() - .setResource(resource); - - if (metricsExporterType == MetricsExporterType.OTLP_GRPC) { - String endpoint = metricsConfig.grpcExporterTarget(); - if (!endpoint.startsWith("http")) { - endpoint = "https://" + endpoint; - } - OtlpGrpcMetricExporterBuilder metricExporterBuilder = OtlpGrpcMetricExporter.builder() - .setEndpoint(endpoint) - .setTimeout(metricsConfig.grpcExporterTimeOutInMills(), TimeUnit.MILLISECONDS) - .setAggregationTemporalitySelector(type -> { - if (metricsConfig.exportInDelta() && - (type == InstrumentType.COUNTER || type == InstrumentType.OBSERVABLE_COUNTER || type == InstrumentType.HISTOGRAM)) { - return AggregationTemporality.DELTA; - } - return AggregationTemporality.CUMULATIVE; - }); - - String headers = metricsConfig.grpcExporterHeader(); - if (StringUtils.isNotBlank(headers)) { - Map headerMap = new HashMap<>(); - List kvPairs = Splitter.on(',').omitEmptyStrings().splitToList(headers); - for (String item : kvPairs) { - String[] split = item.split(":"); - if (split.length != 2) { - LOGGER.warn("metricsGrpcExporterHeader is not valid: {}", headers); - continue; - } - headerMap.put(split[0], split[1]); - } - headerMap.forEach(metricExporterBuilder::addHeader); - } - - metricExporter = metricExporterBuilder.build(); - - periodicMetricReader = PeriodicMetricReader.builder(metricExporter) - .setInterval(metricsConfig.periodicExporterIntervalInMills(), TimeUnit.MILLISECONDS) - .build(); - - providerBuilder.registerMetricReader(periodicMetricReader); - } else if (metricsExporterType == MetricsExporterType.PROM) { - String promExporterHost = metricsConfig.promExporterHost(); - if (StringUtils.isBlank(promExporterHost)) { - throw new IllegalArgumentException("Config item promExporterHost is blank"); - } - prometheusHttpServer = PrometheusHttpServer.builder() - .setHost(promExporterHost) - .setPort(metricsConfig.promExporterPort()) - .build(); - providerBuilder.registerMetricReader(prometheusHttpServer); - } else if (metricsExporterType == MetricsExporterType.LOG) { - SLF4JBridgeHandler.removeHandlersForRootLogger(); - SLF4JBridgeHandler.install(); - loggingMetricExporter = LoggingMetricExporter.create(metricsConfig.exportInDelta() ? AggregationTemporality.DELTA : AggregationTemporality.CUMULATIVE); - java.util.logging.Logger.getLogger(LoggingMetricExporter.class.getName()).setLevel(java.util.logging.Level.FINEST); - periodicMetricReader = PeriodicMetricReader.builder(loggingMetricExporter) - .setInterval(metricsConfig.periodicExporterIntervalInMills(), TimeUnit.MILLISECONDS) - .build(); - providerBuilder.registerMetricReader(periodicMetricReader); - } - - registerMetricsView(providerBuilder); - - OpenTelemetrySdkBuilder telemetrySdkBuilder = OpenTelemetrySdk.builder(); - - if (tracerProvider != null) { - telemetrySdkBuilder.setTracerProvider(tracerProvider); + Optional optional = telemetryExporter.openTelemetrySdk(); + if (optional.isEmpty()) { + LOGGER.warn("OpenTelemetrySdk is not initialized"); + return; } - openTelemetrySdk = telemetrySdkBuilder - .setPropagators(ContextPropagators.create(TextMapPropagator.composite(W3CTraceContextPropagator.getInstance(), W3CBaggagePropagator.getInstance()))) - .setMeterProvider(providerBuilder.build()) - .buildAndRegisterGlobal(); - + OpenTelemetrySdk openTelemetrySdk = optional.get(); brokerMeter = openTelemetrySdk.getMeter("automq-for-rocketmq"); // JVM metrics @@ -281,14 +172,13 @@ private void initDynamicMetrics() { @Override public void start() { - MetricsExporterType metricsExporterType = MetricsExporterType.valueOf(metricsConfig.exporterType()); - if (metricsExporterType != MetricsExporterType.DISABLE) { + if (telemetryExporter.isMetricsEnabled()) { initDynamicMetrics(); this.started = true; } } - private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) { + protected static void registerMetricsView(SdkMeterProviderBuilder providerBuilder) { for (Pair selectorViewPair : ProxyMetricsManager.getMetricsView()) { providerBuilder.registerView(selectorViewPair.getLeft(), selectorViewPair.getRight()); } @@ -307,24 +197,10 @@ public void shutdown() { if (!started) { return; } - MetricsExporterType exporterType = MetricsExporterType.valueOf(metricsConfig.exporterType()); - if (exporterType == MetricsExporterType.OTLP_GRPC) { - periodicMetricReader.forceFlush(); - periodicMetricReader.shutdown(); - metricExporter.shutdown(); - } else if (exporterType == MetricsExporterType.PROM) { - prometheusHttpServer.forceFlush(); - prometheusHttpServer.shutdown(); - } else if (exporterType == MetricsExporterType.LOG) { - periodicMetricReader.forceFlush(); - periodicMetricReader.shutdown(); - loggingMetricExporter.shutdown(); - } if (runtimeMetrics != null) { runtimeMetrics.close(); } storeMetricsManager.shutdown(); - openTelemetrySdk.shutdown(); } } diff --git a/broker/src/main/java/com/automq/rocketmq/broker/TelemetryExporter.java b/broker/src/main/java/com/automq/rocketmq/broker/TelemetryExporter.java new file mode 100644 index 000000000..4207ee204 --- /dev/null +++ b/broker/src/main/java/com/automq/rocketmq/broker/TelemetryExporter.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.rocketmq.broker; + +import com.automq.rocketmq.common.config.BrokerConfig; +import com.automq.rocketmq.common.config.MetricsConfig; +import com.automq.rocketmq.common.config.TraceConfig; +import com.google.common.base.Splitter; +import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.context.propagation.TextMapPropagator; +import io.opentelemetry.exporter.logging.LoggingMetricExporter; +import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter; +import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder; +import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter; +import io.opentelemetry.exporter.prometheus.PrometheusHttpServer; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.OpenTelemetrySdkBuilder; +import io.opentelemetry.sdk.metrics.InstrumentType; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.SpanProcessor; +import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import java.io.Closeable; +import java.io.InputStream; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.metrics.MetricsExporterType; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.slf4j.bridge.SLF4JBridgeHandler; + +import static io.opentelemetry.semconv.ResourceAttributes.SERVICE_INSTANCE_ID; +import static io.opentelemetry.semconv.ResourceAttributes.SERVICE_NAME; +import static io.opentelemetry.semconv.ResourceAttributes.SERVICE_VERSION; + +public class TelemetryExporter implements Closeable { + private static final Logger LOGGER = LoggerFactory.getLogger(TelemetryExporter.class); + + private final BrokerConfig brokerConfig; + private final MetricsConfig metricsConfig; + private final TraceConfig traceConfig; + + private Resource resource; + private OpenTelemetrySdk openTelemetrySdk; + + // Metrics + MetricsExporterType metricsExporterType; + + private OtlpGrpcMetricExporter metricExporter; + private PeriodicMetricReader periodicMetricReader; + private PrometheusHttpServer prometheusHttpServer; + private LoggingMetricExporter loggingMetricExporter; + + // Trace + private SdkTracerProvider tracerProvider; + + public TelemetryExporter(BrokerConfig brokerConfig) { + this.brokerConfig = brokerConfig; + this.metricsConfig = brokerConfig.metrics(); + this.traceConfig = brokerConfig.trace(); + init(); + } + + private boolean isTelemetryEnabled() { + try { + metricsExporterType = MetricsExporterType.valueOf(metricsConfig.exporterType()); + } catch (Exception e) { + LOGGER.warn("invalid metrics exporter type: {}", metricsConfig.exporterType()); + metricsExporterType = MetricsExporterType.DISABLE; + } + + return metricsExporterType.isEnable() || traceConfig.enabled(); + } + + public boolean isMetricsEnabled() { + return metricsExporterType.isEnable(); + } + + public boolean isTraceEnabled() { + return traceConfig.enabled(); + } + + public Optional openTelemetrySdk() { + return Optional.ofNullable(openTelemetrySdk); + } + + public void init() { + if (!isTelemetryEnabled()) { + return; + } + + // Build resource. + Properties gitProperties; + try { + ClassLoader classLoader = getClass().getClassLoader(); + InputStream inputStream = classLoader.getResourceAsStream("git.properties"); + gitProperties = new Properties(); + gitProperties.load(inputStream); + } catch (Exception e) { + LOGGER.warn("read project version failed", e); + throw new RuntimeException(e); + } + + AttributesBuilder builder = Attributes.builder(); + builder.put(SERVICE_NAME, brokerConfig.name()); + builder.put(SERVICE_VERSION, (String) gitProperties.get("git.build.version")); + builder.put("git.hash", (String) gitProperties.get("git.commit.id.describe")); + builder.put(SERVICE_INSTANCE_ID, brokerConfig.instanceId()); + + resource = Resource.create(builder.build()); + + OpenTelemetrySdkBuilder telemetrySdkBuilder = OpenTelemetrySdk.builder(); + + buildMeterProvider().ifPresent(telemetrySdkBuilder::setMeterProvider); + buildTracerProvider().ifPresent(tracerProvider -> { + telemetrySdkBuilder.setTracerProvider(tracerProvider); + telemetrySdkBuilder.setPropagators(ContextPropagators.create(TextMapPropagator.composite(W3CTraceContextPropagator.getInstance(), W3CBaggagePropagator.getInstance()))); + }); + + openTelemetrySdk = telemetrySdkBuilder.buildAndRegisterGlobal(); + } + + private boolean checkMetricsConfig() { + return switch (metricsExporterType) { + case OTLP_GRPC -> StringUtils.isNotBlank(metricsConfig.grpcExporterTarget()); + case PROM, LOG -> true; + default -> false; + }; + } + + private Optional buildMeterProvider() { + if (!checkMetricsConfig()) { + LOGGER.error("check metrics config failed, will not export metrics"); + return Optional.empty(); + } + + SdkMeterProviderBuilder providerBuilder = SdkMeterProvider.builder() + .setResource(resource); + + if (metricsExporterType == MetricsExporterType.OTLP_GRPC) { + String endpoint = metricsConfig.grpcExporterTarget(); + if (!endpoint.startsWith("http")) { + endpoint = "https://" + endpoint; + } + OtlpGrpcMetricExporterBuilder metricExporterBuilder = OtlpGrpcMetricExporter.builder() + .setEndpoint(endpoint) + .setTimeout(metricsConfig.grpcExporterTimeOutInMills(), TimeUnit.MILLISECONDS) + .setAggregationTemporalitySelector(type -> { + if (metricsConfig.exportInDelta() && + (type == InstrumentType.COUNTER || type == InstrumentType.OBSERVABLE_COUNTER || type == InstrumentType.HISTOGRAM)) { + return AggregationTemporality.DELTA; + } + return AggregationTemporality.CUMULATIVE; + }); + + String headers = metricsConfig.grpcExporterHeader(); + if (StringUtils.isNotBlank(headers)) { + Map headerMap = new HashMap<>(); + List kvPairs = Splitter.on(',').omitEmptyStrings().splitToList(headers); + for (String item : kvPairs) { + String[] split = item.split(":"); + if (split.length != 2) { + LOGGER.warn("metricsGrpcExporterHeader is not valid: {}", headers); + continue; + } + headerMap.put(split[0], split[1]); + } + headerMap.forEach(metricExporterBuilder::addHeader); + } + + metricExporter = metricExporterBuilder.build(); + + periodicMetricReader = PeriodicMetricReader.builder(metricExporter) + .setInterval(metricsConfig.periodicExporterIntervalInMills(), TimeUnit.MILLISECONDS) + .build(); + + providerBuilder.registerMetricReader(periodicMetricReader); + } else if (metricsExporterType == MetricsExporterType.PROM) { + String promExporterHost = metricsConfig.promExporterHost(); + if (StringUtils.isBlank(promExporterHost)) { + throw new IllegalArgumentException("Config item promExporterHost is blank"); + } + prometheusHttpServer = PrometheusHttpServer.builder() + .setHost(promExporterHost) + .setPort(metricsConfig.promExporterPort()) + .build(); + providerBuilder.registerMetricReader(prometheusHttpServer); + } else if (metricsExporterType == MetricsExporterType.LOG) { + SLF4JBridgeHandler.removeHandlersForRootLogger(); + SLF4JBridgeHandler.install(); + loggingMetricExporter = LoggingMetricExporter.create(metricsConfig.exportInDelta() ? AggregationTemporality.DELTA : AggregationTemporality.CUMULATIVE); + java.util.logging.Logger.getLogger(LoggingMetricExporter.class.getName()).setLevel(java.util.logging.Level.FINEST); + periodicMetricReader = PeriodicMetricReader.builder(loggingMetricExporter) + .setInterval(metricsConfig.periodicExporterIntervalInMills(), TimeUnit.MILLISECONDS) + .build(); + providerBuilder.registerMetricReader(periodicMetricReader); + } + + MetricsExporter.registerMetricsView(providerBuilder); + + return Optional.of(providerBuilder.build()); + } + + private Optional buildTracerProvider() { + if (traceConfig.enabled()) { + OtlpGrpcSpanExporter spanExporter = OtlpGrpcSpanExporter.builder() + .setEndpoint(traceConfig.grpcExporterTarget()) + .setTimeout(traceConfig.grpcExporterTimeOutInMills(), TimeUnit.MILLISECONDS) + .build(); + + SpanProcessor spanProcessor; + if (traceConfig.batchSize() == 0) { + spanProcessor = SimpleSpanProcessor.create(spanExporter); + } else { + spanProcessor = BatchSpanProcessor.builder(spanExporter) + .setExporterTimeout(traceConfig.grpcExporterTimeOutInMills(), TimeUnit.MILLISECONDS) + .setScheduleDelay(traceConfig.periodicExporterIntervalInMills(), TimeUnit.MILLISECONDS) + .setMaxExportBatchSize(traceConfig.batchSize()) + .setMaxQueueSize(traceConfig.maxCachedSize()) + .build(); + } + + tracerProvider = SdkTracerProvider.builder() + .addSpanProcessor(spanProcessor) + .setResource(resource) + .build(); + return Optional.of(tracerProvider); + } + return Optional.empty(); + } + + @Override + public void close() { + MetricsExporterType exporterType = MetricsExporterType.valueOf(metricsConfig.exporterType()); + if (exporterType == MetricsExporterType.OTLP_GRPC) { + periodicMetricReader.forceFlush(); + periodicMetricReader.shutdown(); + metricExporter.shutdown(); + } else if (exporterType == MetricsExporterType.PROM) { + prometheusHttpServer.forceFlush(); + prometheusHttpServer.shutdown(); + } else if (exporterType == MetricsExporterType.LOG) { + periodicMetricReader.forceFlush(); + periodicMetricReader.shutdown(); + loggingMetricExporter.shutdown(); + } + + if (tracerProvider != null) { + tracerProvider.shutdown(); + } + + if (openTelemetrySdk != null) { + openTelemetrySdk.shutdown(); + } + } +}