From c447fe500ba48ba4fde27d97f10ef7664d09363b Mon Sep 17 00:00:00 2001 From: Mike Prieto Date: Mon, 7 Oct 2024 23:43:44 -0400 Subject: [PATCH] docs: Add OpenTelemetry samples (#2208) * docs: Add OpenTelemetry samples * docs: Fix sample file naming --- .../pubsub/OpenTelemetryPublisherExample.java | 99 +++++++++++++++++ .../OpenTelemetrySubscriberExample.java | 100 ++++++++++++++++++ 2 files changed, 199 insertions(+) create mode 100644 samples/snippets/src/main/java/pubsub/OpenTelemetryPublisherExample.java create mode 100644 samples/snippets/src/main/java/pubsub/OpenTelemetrySubscriberExample.java diff --git a/samples/snippets/src/main/java/pubsub/OpenTelemetryPublisherExample.java b/samples/snippets/src/main/java/pubsub/OpenTelemetryPublisherExample.java new file mode 100644 index 000000000..606c6fe36 --- /dev/null +++ b/samples/snippets/src/main/java/pubsub/OpenTelemetryPublisherExample.java @@ -0,0 +1,99 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsub; + +// [START pubsub_publish_otel_tracing] + +import com.google.api.core.ApiFuture; +import com.google.cloud.opentelemetry.trace.TraceConfiguration; +import com.google.cloud.opentelemetry.trace.TraceExporter; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.TopicName; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import io.opentelemetry.sdk.trace.samplers.Sampler; +import io.opentelemetry.semconv.ResourceAttributes; +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +public class OpenTelemetryPublisherExample { + public static void main(String... args) throws Exception { + // TODO(developer): Replace these variables before running the sample. + String projectId = "your-project-id"; + String topicId = "your-topic-id"; + + openTelemetryPublisherExample(projectId, topicId); + } + + public static void openTelemetryPublisherExample(String projectId, String topicId) + throws IOException, ExecutionException, InterruptedException { + Resource resource = + Resource.getDefault().toBuilder() + .put(ResourceAttributes.SERVICE_NAME, "publisher-example") + .build(); + + // Creates a Cloud Trace exporter. + SpanExporter traceExporter = + TraceExporter.createWithConfiguration( + TraceConfiguration.builder().setProjectId(projectId).build()); + + SdkTracerProvider sdkTracerProvider = + SdkTracerProvider.builder() + .setResource(resource) + .addSpanProcessor(SimpleSpanProcessor.create(traceExporter)) + .setSampler(Sampler.alwaysOn()) + .build(); + + OpenTelemetry openTelemetry = + OpenTelemetrySdk.builder().setTracerProvider(sdkTracerProvider).buildAndRegisterGlobal(); + + TopicName topicName = TopicName.of(projectId, topicId); + + Publisher publisher = null; + try { + // Create a publisher instance with the created OpenTelemetry object and enabling tracing. + publisher = + Publisher.newBuilder(topicName) + .setOpenTelemetry(openTelemetry) + .setEnableOpenTelemetryTracing(true) + .build(); + + String message = "Hello World!"; + ByteString data = ByteString.copyFromUtf8(message); + PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build(); + + // Once published, returns a server-assigned message id (unique within the topic) + ApiFuture messageIdFuture = publisher.publish(pubsubMessage); + String messageId = messageIdFuture.get(); + System.out.println("Published message ID: " + messageId); + } finally { + if (publisher != null) { + // When finished with the publisher, shutdown to free up resources. + publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); + } + } + } +} +// [END pubsub_publish_otel_tracing] diff --git a/samples/snippets/src/main/java/pubsub/OpenTelemetrySubscriberExample.java b/samples/snippets/src/main/java/pubsub/OpenTelemetrySubscriberExample.java new file mode 100644 index 000000000..f78c38d19 --- /dev/null +++ b/samples/snippets/src/main/java/pubsub/OpenTelemetrySubscriberExample.java @@ -0,0 +1,100 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsub; + +// [START pubsub_subscribe_otel_tracing] + +import com.google.cloud.opentelemetry.trace.TraceConfiguration; +import com.google.cloud.opentelemetry.trace.TraceExporter; +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import io.opentelemetry.sdk.trace.samplers.Sampler; +import io.opentelemetry.semconv.ResourceAttributes; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class OpenTelemetrySubscriberExample { + public static void main(String... args) throws Exception { + // TODO(developer): Replace these variables before running the sample. + String projectId = "your-project-id"; + String subscriptionId = "your-subscription-id"; + + openTelemetrySubscriberExample(projectId, subscriptionId); + } + + public static void openTelemetrySubscriberExample(String projectId, String subscriptionId) { + Resource resource = + Resource.getDefault().toBuilder() + .put(ResourceAttributes.SERVICE_NAME, "subscriber-example") + .build(); + + // Creates a Cloud Trace exporter. + SpanExporter traceExporter = + TraceExporter.createWithConfiguration( + TraceConfiguration.builder().setProjectId(projectId).build()); + + SdkTracerProvider sdkTracerProvider = + SdkTracerProvider.builder() + .setResource(resource) + .addSpanProcessor(SimpleSpanProcessor.create(traceExporter)) + .setSampler(Sampler.alwaysOn()) + .build(); + + OpenTelemetry openTelemetry = + OpenTelemetrySdk.builder().setTracerProvider(sdkTracerProvider).buildAndRegisterGlobal(); + + ProjectSubscriptionName subscriptionName = + ProjectSubscriptionName.of(projectId, subscriptionId); + + // Instantiate an asynchronous message receiver. + MessageReceiver receiver = + (PubsubMessage message, AckReplyConsumer consumer) -> { + // Handle incoming message, then ack the received message. + System.out.println("Id: " + message.getMessageId()); + System.out.println("Data: " + message.getData().toStringUtf8()); + consumer.ack(); + }; + + Subscriber subscriber = null; + try { + subscriber = + Subscriber.newBuilder(subscriptionName, receiver) + .setOpenTelemetry(openTelemetry) + .setEnableOpenTelemetryTracing(true) + .build(); + + // Start the subscriber. + subscriber.startAsync().awaitRunning(); + System.out.printf("Listening for messages on %s:\n", subscriptionName.toString()); + // Allow the subscriber to run for 30s unless an unrecoverable error occurs. + subscriber.awaitTerminated(30, TimeUnit.SECONDS); + } catch (TimeoutException timeoutException) { + // Shut down the subscriber after 30s. Stop receiving messages. + subscriber.stopAsync(); + } + } +} + // [END pubsub_subscribe_otel_tracing]