diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index eb7c1287d000f..7c95811faf7de 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -527,6 +527,9 @@ The Apache Software License, Version 2.0
- io.opentelemetry-opentelemetry-sdk-logs-1.34.1.jar
- io.opentelemetry-opentelemetry-sdk-metrics-1.34.1.jar
- io.opentelemetry-opentelemetry-sdk-trace-1.34.1.jar
+ - io.opentelemetry.instrumentation-opentelemetry-instrumentation-api-1.32.1.jar
+ - io.opentelemetry.instrumentation-opentelemetry-instrumentation-api-semconv-1.32.1-alpha.jar
+ - io.opentelemetry.instrumentation-opentelemetry-resources-1.32.1-alpha.jar
- io.opentelemetry.semconv-opentelemetry-semconv-1.23.1-alpha.jar
BSD 3-clause "New" or "Revised" License
diff --git a/pom.xml b/pom.xml
index 1486c24bb18d1..52a638ac09f3e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -251,6 +251,7 @@ flexible messaging model and an intuitive client API.
2.0.6
1.34.1
1.34.1-alpha
+ 1.32.1-alpha
1.23.1-alpha
@@ -1464,6 +1465,11 @@ flexible messaging model and an intuitive client API.
pom
import
+
+ io.opentelemetry.instrumentation
+ opentelemetry-resources
+ ${opentelemetry.instrumentation.version}
+
io.opentelemetry.semconv
opentelemetry-semconv
diff --git a/pulsar-opentelemetry/pom.xml b/pulsar-opentelemetry/pom.xml
index d8d1f6952af0c..82a9658cc9d31 100644
--- a/pulsar-opentelemetry/pom.xml
+++ b/pulsar-opentelemetry/pom.xml
@@ -50,6 +50,10 @@
io.opentelemetry
opentelemetry-sdk-extension-autoconfigure
+
+ io.opentelemetry.instrumentation
+ opentelemetry-resources
+
io.opentelemetry.semconv
opentelemetry-semconv
diff --git a/pulsar-opentelemetry/src/test/java/org/apache/pulsar/opentelemetry/OpenTelemetryServiceTest.java b/pulsar-opentelemetry/src/test/java/org/apache/pulsar/opentelemetry/OpenTelemetryServiceTest.java
index 558d8b12502f5..093947b02be0f 100644
--- a/pulsar-opentelemetry/src/test/java/org/apache/pulsar/opentelemetry/OpenTelemetryServiceTest.java
+++ b/pulsar-opentelemetry/src/test/java/org/apache/pulsar/opentelemetry/OpenTelemetryServiceTest.java
@@ -19,8 +19,7 @@
package org.apache.pulsar.opentelemetry;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
+import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongCounterBuilder;
@@ -33,11 +32,12 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import lombok.Cleanup;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient;
+import org.assertj.core.api.AbstractCharSequenceAssert;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -91,25 +91,7 @@ public void testClusterNameCannotBeEmpty() {
}
@Test
- public void testIsClusterNameSet() throws Exception {
- @Cleanup
- var reader = InMemoryMetricReader.create();
-
- @Cleanup
- var ots = OpenTelemetryService.builder().
- sdkBuilderConsumer(getSdkBuilderConsumer(reader,
- Map.of(OpenTelemetryService.OTEL_SDK_DISABLED_KEY, "false"))).
- clusterName("testCluster").
- build();
-
- assertThat(reader.collectAllMetrics())
- .allSatisfy(metric -> assertThat(metric)
- .hasResourceSatisfying(
- resource -> resource.hasAttribute(OpenTelemetryAttributes.PULSAR_CLUSTER, "testCluster")));
- }
-
- @Test
- public void testIsServiceNameAndVersionSet() throws Exception {
+ public void testResourceAttributesAreSet() throws Exception {
@Cleanup
var reader = InMemoryMetricReader.create();
@@ -127,7 +109,8 @@ public void testIsServiceNameAndVersionSet() throws Exception {
.hasResourceSatisfying(resource -> resource
.hasAttribute(OpenTelemetryAttributes.PULSAR_CLUSTER, "testServiceNameAndVersion")
.hasAttribute(ResourceAttributes.SERVICE_NAME, "openTelemetryServiceTestService")
- .hasAttribute(ResourceAttributes.SERVICE_VERSION, "1.0.0")));
+ .hasAttribute(ResourceAttributes.SERVICE_VERSION, "1.0.0")
+ .hasAttribute(satisfies(ResourceAttributes.HOST_NAME, AbstractCharSequenceAssert::isNotBlank))));
}
@Test
@@ -200,18 +183,17 @@ public void testServiceIsDisabledByDefault() throws Exception {
meter.counterBuilder("dummyCounterD").setUnit("unit")
);
- var callbackCount = new AtomicInteger();
+ var callback = new AtomicBoolean();
// Validate that no matter how the counters are being built, they are all backed by the same underlying object.
// This ensures we conserve memory when the SDK is disabled.
- assertEquals(builders.stream().map(LongCounterBuilder::build).distinct().count(), 1);
- assertEquals(builders.stream().map(LongCounterBuilder::buildObserver).distinct().count(), 1);
- assertEquals(builders.stream().map(b -> b.buildWithCallback(__ -> callbackCount.incrementAndGet()))
- .distinct().count(), 1);
+ assertThat(builders.stream().map(LongCounterBuilder::build).distinct()).hasSize(1);
+ assertThat(builders.stream().map(LongCounterBuilder::buildObserver).distinct()).hasSize(1);
+ assertThat(builders.stream().map(b -> b.buildWithCallback(__ -> callback.set(true))).distinct()).hasSize(1);
// Validate that no metrics are being emitted at all.
- assertTrue(metricReader.collectAllMetrics().isEmpty());
+ assertThat(metricReader.collectAllMetrics()).isEmpty();
// Validate that the callback has not being called.
- assertEquals(callbackCount.get(), 0);
+ assertThat(callback).isFalse();
}
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
index 56d64ce5b2c8e..77cdc1bfd28a9 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
@@ -26,6 +26,7 @@
import java.time.Duration;
import java.util.Objects;
import java.util.UUID;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
@@ -70,6 +71,7 @@ public abstract class PulsarContainer> exte
public static final boolean PULSAR_CONTAINERS_LEAVE_RUNNING =
Boolean.parseBoolean(System.getenv("PULSAR_CONTAINERS_LEAVE_RUNNING"));
+ @Getter
protected final String hostname;
private final String serviceName;
private final String serviceEntryPoint;
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/metrics/OpenTelemetrySanityTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/metrics/OpenTelemetrySanityTest.java
index 2965606ccca78..38afc1f127d18 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/metrics/OpenTelemetrySanityTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/metrics/OpenTelemetrySanityTest.java
@@ -122,25 +122,28 @@ public void testOpenTelemetryMetricsPrometheusExport() throws Exception {
var metricName = "target_info"; // Sent automatically by the OpenTelemetry SDK.
Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
- var metrics = getMetricsFromPrometheus(pulsarCluster.getAnyBroker(), prometheusExporterPort);
+ var metrics = getMetricsFromPrometheus(pulsarCluster.getBroker(0), prometheusExporterPort);
return !metrics.findByNameAndLabels(metricName,
Pair.of("pulsar_cluster", clusterName),
Pair.of("service_name", PulsarBrokerOpenTelemetry.SERVICE_NAME),
- Pair.of("service_version", PulsarVersion.getVersion())).isEmpty();
+ Pair.of("service_version", PulsarVersion.getVersion()),
+ Pair.of("host_name", pulsarCluster.getBroker(0).getHostname())).isEmpty();
});
Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
var metrics = getMetricsFromPrometheus(pulsarCluster.getProxy(), prometheusExporterPort);
return !metrics.findByNameAndLabels(metricName,
Pair.of("pulsar_cluster", clusterName),
Pair.of("service_name", PulsarProxyOpenTelemetry.SERVICE_NAME),
- Pair.of("service_version", PulsarVersion.getVersion())).isEmpty();
+ Pair.of("service_version", PulsarVersion.getVersion()),
+ Pair.of("host_name", pulsarCluster.getProxy().getHostname())).isEmpty();
});
Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
var metrics = getMetricsFromPrometheus(pulsarCluster.getAnyWorker(), prometheusExporterPort);
return !metrics.findByNameAndLabels(metricName,
Pair.of("pulsar_cluster", clusterName),
Pair.of("service_name", PulsarWorkerOpenTelemetry.SERVICE_NAME),
- Pair.of("service_version", PulsarVersion.getVersion())).isEmpty();
+ Pair.of("service_version", PulsarVersion.getVersion()),
+ Pair.of("host_name", pulsarCluster.getAnyWorker().getHostname())).isEmpty();
});
}