Skip to content

Commit

Permalink
Add automatic resource providers
Browse files Browse the repository at this point in the history
  • Loading branch information
dragosvictor committed Feb 9, 2024
1 parent fc5ba4f commit 7bfda76
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 34 deletions.
3 changes: 3 additions & 0 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,9 @@ The Apache Software License, Version 2.0
- io.opentelemetry-opentelemetry-sdk-logs-1.34.1.jar
- io.opentelemetry-opentelemetry-sdk-metrics-1.34.1.jar
- io.opentelemetry-opentelemetry-sdk-trace-1.34.1.jar
- io.opentelemetry.instrumentation-opentelemetry-instrumentation-api-1.32.1.jar
- io.opentelemetry.instrumentation-opentelemetry-instrumentation-api-semconv-1.32.1-alpha.jar
- io.opentelemetry.instrumentation-opentelemetry-resources-1.32.1-alpha.jar
- io.opentelemetry.semconv-opentelemetry-semconv-1.23.1-alpha.jar

BSD 3-clause "New" or "Revised" License
Expand Down
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ flexible messaging model and an intuitive client API.</description>
<netty-reactive-streams.version>2.0.6</netty-reactive-streams.version>
<opentelemetry.version>1.34.1</opentelemetry.version>
<opentelemetry.alpha.version>1.34.1-alpha</opentelemetry.alpha.version>
<opentelemetry.instrumentation.version>1.32.1-alpha</opentelemetry.instrumentation.version>
<opentelemetry.semconv.version>1.23.1-alpha</opentelemetry.semconv.version>

<!-- test dependencies -->
Expand Down Expand Up @@ -1464,6 +1465,11 @@ flexible messaging model and an intuitive client API.</description>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-resources</artifactId>
<version>${opentelemetry.instrumentation.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry.semconv</groupId>
<artifactId>opentelemetry-semconv</artifactId>
Expand Down
4 changes: 4 additions & 0 deletions pulsar-opentelemetry/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-resources</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry.semconv</groupId>
<artifactId>opentelemetry-semconv</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
package org.apache.pulsar.opentelemetry;

import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongCounterBuilder;
Expand All @@ -33,11 +32,12 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import lombok.Cleanup;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient;
import org.assertj.core.api.AbstractCharSequenceAssert;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -91,25 +91,7 @@ public void testClusterNameCannotBeEmpty() {
}

@Test
public void testIsClusterNameSet() throws Exception {
@Cleanup
var reader = InMemoryMetricReader.create();

@Cleanup
var ots = OpenTelemetryService.builder().
sdkBuilderConsumer(getSdkBuilderConsumer(reader,
Map.of(OpenTelemetryService.OTEL_SDK_DISABLED_KEY, "false"))).
clusterName("testCluster").
build();

assertThat(reader.collectAllMetrics())
.allSatisfy(metric -> assertThat(metric)
.hasResourceSatisfying(
resource -> resource.hasAttribute(OpenTelemetryAttributes.PULSAR_CLUSTER, "testCluster")));
}

@Test
public void testIsServiceNameAndVersionSet() throws Exception {
public void testResourceAttributesAreSet() throws Exception {
@Cleanup
var reader = InMemoryMetricReader.create();

Expand All @@ -127,7 +109,8 @@ public void testIsServiceNameAndVersionSet() throws Exception {
.hasResourceSatisfying(resource -> resource
.hasAttribute(OpenTelemetryAttributes.PULSAR_CLUSTER, "testServiceNameAndVersion")
.hasAttribute(ResourceAttributes.SERVICE_NAME, "openTelemetryServiceTestService")
.hasAttribute(ResourceAttributes.SERVICE_VERSION, "1.0.0")));
.hasAttribute(ResourceAttributes.SERVICE_VERSION, "1.0.0")
.hasAttribute(satisfies(ResourceAttributes.HOST_NAME, AbstractCharSequenceAssert::isNotBlank))));
}

@Test
Expand Down Expand Up @@ -200,18 +183,17 @@ public void testServiceIsDisabledByDefault() throws Exception {
meter.counterBuilder("dummyCounterD").setUnit("unit")
);

var callbackCount = new AtomicInteger();
var callback = new AtomicBoolean();
// Validate that no matter how the counters are being built, they are all backed by the same underlying object.
// This ensures we conserve memory when the SDK is disabled.
assertEquals(builders.stream().map(LongCounterBuilder::build).distinct().count(), 1);
assertEquals(builders.stream().map(LongCounterBuilder::buildObserver).distinct().count(), 1);
assertEquals(builders.stream().map(b -> b.buildWithCallback(__ -> callbackCount.incrementAndGet()))
.distinct().count(), 1);
assertThat(builders.stream().map(LongCounterBuilder::build).distinct()).hasSize(1);
assertThat(builders.stream().map(LongCounterBuilder::buildObserver).distinct()).hasSize(1);
assertThat(builders.stream().map(b -> b.buildWithCallback(__ -> callback.set(true))).distinct()).hasSize(1);

// Validate that no metrics are being emitted at all.
assertTrue(metricReader.collectAllMetrics().isEmpty());
assertThat(metricReader.collectAllMetrics()).isEmpty();

// Validate that the callback has not being called.
assertEquals(callbackCount.get(), 0);
assertThat(callback).isFalse();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.time.Duration;
import java.util.Objects;
import java.util.UUID;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
Expand Down Expand Up @@ -70,6 +71,7 @@ public abstract class PulsarContainer<SelfT extends PulsarContainer<SelfT>> exte
public static final boolean PULSAR_CONTAINERS_LEAVE_RUNNING =
Boolean.parseBoolean(System.getenv("PULSAR_CONTAINERS_LEAVE_RUNNING"));

@Getter
protected final String hostname;
private final String serviceName;
private final String serviceEntryPoint;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,25 +122,28 @@ public void testOpenTelemetryMetricsPrometheusExport() throws Exception {

var metricName = "target_info"; // Sent automatically by the OpenTelemetry SDK.
Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
var metrics = getMetricsFromPrometheus(pulsarCluster.getAnyBroker(), prometheusExporterPort);
var metrics = getMetricsFromPrometheus(pulsarCluster.getBroker(0), prometheusExporterPort);
return !metrics.findByNameAndLabels(metricName,
Pair.of("pulsar_cluster", clusterName),
Pair.of("service_name", PulsarBrokerOpenTelemetry.SERVICE_NAME),
Pair.of("service_version", PulsarVersion.getVersion())).isEmpty();
Pair.of("service_version", PulsarVersion.getVersion()),
Pair.of("host_name", pulsarCluster.getBroker(0).getHostname())).isEmpty();
});
Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
var metrics = getMetricsFromPrometheus(pulsarCluster.getProxy(), prometheusExporterPort);
return !metrics.findByNameAndLabels(metricName,
Pair.of("pulsar_cluster", clusterName),
Pair.of("service_name", PulsarProxyOpenTelemetry.SERVICE_NAME),
Pair.of("service_version", PulsarVersion.getVersion())).isEmpty();
Pair.of("service_version", PulsarVersion.getVersion()),
Pair.of("host_name", pulsarCluster.getProxy().getHostname())).isEmpty();
});
Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
var metrics = getMetricsFromPrometheus(pulsarCluster.getAnyWorker(), prometheusExporterPort);
return !metrics.findByNameAndLabels(metricName,
Pair.of("pulsar_cluster", clusterName),
Pair.of("service_name", PulsarWorkerOpenTelemetry.SERVICE_NAME),
Pair.of("service_version", PulsarVersion.getVersion())).isEmpty();
Pair.of("service_version", PulsarVersion.getVersion()),
Pair.of("host_name", pulsarCluster.getAnyWorker().getHostname())).isEmpty();
});
}

Expand Down

0 comments on commit 7bfda76

Please sign in to comment.