Skip to content

Commit

Permalink
feat: Adding Open telemetry to publisher (#1427)
Browse files Browse the repository at this point in the history
* adding open telemetry dependencies to project and publisher

* added open telemetry test to publisher

* work in progress

* tests are working but not finished

* added parameterized tests

* removed unused dependency

* work in progress

* refactored to use Optional for tracer and spans

* pulling in updates from other branch

* publisher changes mostly done. Need to add tests

* work in progress

* tests are almost done

* added depedency

* fixed merge issue

* finished updating tests

* cleanup tests

* cleanup
  • Loading branch information
mmicatka authored Sep 12, 2023
1 parent 85e8996 commit dbe80d4
Show file tree
Hide file tree
Showing 4 changed files with 428 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -127,9 +128,7 @@ public class Publisher implements PublisherInterface {

private final GrpcCallContext publishContext;
private final GrpcCallContext publishContextWithCompression;

private OpenTelemetry openTelemetry;
private Tracer openTelemetryTracer;
private Optional<Tracer> tracer = Optional.empty();

/** The maximum number of messages in one request. Defined by the API. */
public static long getApiMaxRequestElementCount() {
Expand Down Expand Up @@ -214,10 +213,9 @@ private Publisher(Builder builder) throws IOException {
this.publishContextWithCompression =
GrpcCallContext.createDefault()
.withCallOptions(CallOptions.DEFAULT.withCompression(GZIP_COMPRESSION));
this.openTelemetry = builder.openTelemetry;
if (this.openTelemetry != null) {
if (builder.openTelemetry.isPresent()) {
// Create a tracer if we have an instance of OpenTelemetry
this.openTelemetryTracer = this.openTelemetry.getTracer(OPEN_TELEMETRY_TRACER_NAME);
this.tracer = Optional.of(builder.openTelemetry.get().getTracer(OPEN_TELEMETRY_TRACER_NAME));
}
}

Expand Down Expand Up @@ -262,35 +260,45 @@ public String getTopicNameString() {
@Override
public ApiFuture<String> publish(PubsubMessage message) {
Preconditions.checkState(!shutdown.get(), "Cannot publish on a shut-down publisher.");

final String orderingKey = message.getOrderingKey();
Preconditions.checkState(
orderingKey.isEmpty() || enableMessageOrdering,
"Cannot publish a message with an ordering key when message ordering is not enabled in the "
+ "Publisher client. Please create a Publisher client with "
+ "setEnableMessageOrdering(true) in the builder.");

final OutstandingPublish outstandingPublish =
new OutstandingPublish(messageTransform.apply(message));
PubsubMessageWrapper pubsubMessageWrapper =
PubsubMessageWrapper.newBuilder(this.messageTransform.apply(message))
.setTopicName(this.topicName)
.build();
pubsubMessageWrapper.startPublishSpan(this.tracer);

final OutstandingPublish outstandingPublish = new OutstandingPublish(pubsubMessageWrapper);

if (flowController != null) {
outstandingPublish.pubsubMessageWrapper.startPublishFlowControlSpan(this.tracer);
try {
flowController.acquire(outstandingPublish.messageSize);
outstandingPublish.pubsubMessageWrapper.endPublishFlowControlSpan();
} catch (FlowController.FlowControlException e) {
if (!orderingKey.isEmpty()) {
sequentialExecutor.stopPublish(orderingKey);
}
outstandingPublish.publishResult.setException(e);
outstandingPublish.pubsubMessageWrapper.setPublishFlowControlSpanException(e);
return outstandingPublish.publishResult;
}
}

List<OutstandingBatch> batchesToSend;
messagesBatchLock.lock();
try {
outstandingPublish.pubsubMessageWrapper.startPublishSchedulerSpan(this.tracer);
if (!orderingKey.isEmpty() && sequentialExecutor.keyHasError(orderingKey)) {
outstandingPublish.publishResult.setException(
SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION);
outstandingPublish.pubsubMessageWrapper.setPublishSchedulerException(
SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION);
return outstandingPublish.publishResult;
}
MessagesBatch messagesBatch = messagesBatches.get(orderingKey);
Expand Down Expand Up @@ -463,12 +471,22 @@ private ApiFuture<PublishResponse> publishCall(OutstandingBatch outstandingBatch
if (enableCompression && outstandingBatch.batchSizeBytes >= compressionBytesThreshold) {
context = publishContextWithCompression;
}

int numMessagesInBatch = outstandingBatch.size();
List<PubsubMessage> pubsubMessageList = new ArrayList<PubsubMessage>(numMessagesInBatch);

for (PubsubMessageWrapper pubsubMessageWrapper : outstandingBatch.getMessageWrappers()) {
pubsubMessageWrapper.endPublishSchedulerSpan();
pubsubMessageWrapper.startPublishRpcSpan(this.tracer, numMessagesInBatch);
pubsubMessageList.add(pubsubMessageWrapper.getPubsubMessage());
}

return publisherStub
.publishCallable()
.futureCall(
PublishRequest.newBuilder()
.setTopic(topicName)
.addAllMessages(outstandingBatch.getMessages())
.addAllMessages(pubsubMessageList)
.build(),
context);
}
Expand All @@ -478,6 +496,7 @@ private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) {
logger.log(Level.WARNING, "Attempted to publish batch with zero messages.");
return;
}

final ApiFutureCallback<PublishResponse> futureCallback =
new ApiFutureCallback<PublishResponse>() {
@Override
Expand Down Expand Up @@ -566,10 +585,10 @@ int size() {
return outstandingPublishes.size();
}

private List<PubsubMessage> getMessages() {
List<PubsubMessage> results = new ArrayList<>(outstandingPublishes.size());
private List<PubsubMessageWrapper> getMessageWrappers() {
List<PubsubMessageWrapper> results = new ArrayList<>(outstandingPublishes.size());
for (OutstandingPublish outstandingPublish : outstandingPublishes) {
results.add(outstandingPublish.message);
results.add(outstandingPublish.pubsubMessageWrapper);
}
return results;
}
Expand All @@ -579,31 +598,36 @@ private void onFailure(Throwable t) {
if (flowController != null) {
flowController.release(outstandingPublish.messageSize);
}
outstandingPublish.pubsubMessageWrapper.setPublishRpcSpanException(t);
outstandingPublish.publishResult.setException(t);
}
}

private void onSuccess(Iterable<String> results) {
Iterator<OutstandingPublish> messagesResultsIt = outstandingPublishes.iterator();

for (String messageId : results) {
OutstandingPublish nextPublish = messagesResultsIt.next();
if (flowController != null) {
flowController.release(nextPublish.messageSize);
}
nextPublish.publishResult.set(messageId);
nextPublish.pubsubMessageWrapper.setPublishSpanMessageIdAttribute(messageId);
nextPublish.pubsubMessageWrapper.endPublishRpcSpan();
nextPublish.pubsubMessageWrapper.endPublishSpan();
}
}
}

private static final class OutstandingPublish {
final SettableApiFuture<String> publishResult;
final PubsubMessage message;
final PubsubMessageWrapper pubsubMessageWrapper;
final int messageSize;

OutstandingPublish(PubsubMessage message) {
OutstandingPublish(PubsubMessageWrapper pubsubMessageWrapper) {
this.publishResult = SettableApiFuture.create();
this.message = message;
this.messageSize = message.getSerializedSize();
this.pubsubMessageWrapper = pubsubMessageWrapper;
this.messageSize = pubsubMessageWrapper.getPubsubMessage().getSerializedSize();
}
}

Expand Down Expand Up @@ -759,7 +783,7 @@ public PubsubMessage apply(PubsubMessage input) {
private boolean enableCompression = DEFAULT_ENABLE_COMPRESSION;
private long compressionBytesThreshold = DEFAULT_COMPRESSION_BYTES_THRESHOLD;

private OpenTelemetry openTelemetry;
private Optional<OpenTelemetry> openTelemetry = Optional.empty();

private Builder(String topic) {
this.topicName = Preconditions.checkNotNull(topic);
Expand Down Expand Up @@ -893,7 +917,7 @@ public static BatchingSettings getDefaultBatchingSettings() {

/** Sets the Open Telemetry Instance to allow for tracing. */
public Builder setOpenTelemetry(OpenTelemetry openTelemetry) {
this.openTelemetry = openTelemetry;
this.openTelemetry = Optional.of(openTelemetry);
return this;
}

Expand Down
Loading

0 comments on commit dbe80d4

Please sign in to comment.