Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds RocketMQ plugin #1449

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions brave-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,11 @@
<artifactId>brave-spring-beans</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>brave-instrumentation-rocketmq-client</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
1 change: 1 addition & 0 deletions instrumentation/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Here's a brief overview of what's packaged here:
* [spring-web](spring-web/README.md) - Tracing interceptor for [Spring RestTemplate](https://spring.io/guides/gs/consuming-rest/)
* [spring-webmvc](spring-webmvc/README.md) - Tracing filter and span customizing interceptors for [Spring WebMVC](https://docs.spring.io/spring/docs/current/spring-framework-reference/html/mvc.html)
* [vertx-web](vertx-web/README.md) - Tracing routing context handler for [Vert.x Web](http://vertx.io/docs/vertx-web/js/)
* [rocketmq-client](rocketmq-client/README.md) - Tracing Producer, MessageListenerConcurrently and MessageListenerOrderly for [Apache RocketMQ](https://github.com/apache/rocketmq/)

Here are other tools we provide for configuring or testing instrumentation:
* [http](http/README.md) - `HttpTracing` that allows portable configuration of HTTP instrumentation
Expand Down
12 changes: 12 additions & 0 deletions instrumentation/benchmarks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,18 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>brave-instrumentation-rocketmq-client</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq-client.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright The OpenZipkin Authors
* SPDX-License-Identifier: Apache-2.0
*/
package brave.rocketmq.client;

import brave.Tracing;
import brave.kafka.clients.TracingProducerBenchmarks;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

import java.util.concurrent.TimeUnit;

import static org.apache.rocketmq.client.producer.SendStatus.SEND_OK;

@Measurement(iterations = 5, time = 1)
@Warmup(iterations = 10, time = 1)
@Fork(3)
@BenchmarkMode(Mode.SampleTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Thread)
public class RocketMQProducerBenchmarks {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add an example of this run in the PR desc in triple backtick

Message message;
DefaultMQProducer producer, tracingProducer;

@Setup(Level.Trial) public void init() {
message = new Message("zipkin", "zipkin".getBytes());
Tracing tracing = Tracing.newBuilder().build();
producer = new FakeProducer();
tracingProducer = new FakeProducer();
tracingProducer.getDefaultMQProducerImpl().registerSendMessageHook(
new TracingSendMessageHook(RocketMQTracing.newBuilder(tracing).build())
);
}

@TearDown(Level.Trial) public void close() {
Tracing.current().close();
}

@Benchmark public SendResult send_baseCase() throws Exception {
return producer.send(message);
}

@Benchmark public void send_traced() throws Exception {
tracingProducer.send(message);
}

// Convenience main entry-point
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.addProfiler("gc")
.include(".*" + TracingProducerBenchmarks.class.getSimpleName())
.build();

new Runner(opt).run();
}

static final class FakeProducer extends DefaultMQProducer {
@Override public SendResult send(Message msg) {
SendResult sendResult = new SendResult();
sendResult.setMsgId("zipkin");
sendResult.setSendStatus(SEND_OK);
return sendResult;
}
}
}
1 change: 1 addition & 0 deletions instrumentation/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
<module>kafka-streams</module>
<module>netty-codec-http</module>
<module>vertx-web</module>
<module>rocketmq-client</module>
</modules>

<!-- ${project.groupId}:brave version is set in the root pom.
Expand Down
74 changes: 74 additions & 0 deletions instrumentation/rocketmq-client/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# Brave RocketMQ Client instrumentation
This module provides instrumentation for rocketmq-client 5.x+ consumers and
producers.

## Setup
Setup the generic RocketMQ component like this:
```java
rocketmqTracing = RocketMQTracing.newBuilder(messagingTracing)
.remoteServiceName("my-broker")
.build();
```

## Sampling Policy
The default sampling policy is to use the default (trace ID) sampler for
producer and consumer requests.

You can use an [MessagingRuleSampler](../messaging/README.md) to override this
based on RocketMQ topic names.

Ex. Here's a sampler that traces 100 consumer requests per second, except for
the "alerts" topic. Other requests will use a global rate provided by the
`Tracing` component.

```java
import brave.sampler.Matchers;

import static brave.messaging.MessagingRequestMatchers.channelNameEquals;

messagingTracingBuilder.consumerSampler(MessagingRuleSampler.newBuilder()
.putRule(channelNameEquals("alerts"), Sampler.NEVER_SAMPLE)
.putRule(Matchers.alwaysMatch(), RateLimitingSampler.create(100))
.build());

rocketmqTracing = RocketMQTracing.create(messagingTracing);
```

## Producer

Register `brave.rocketmq.client.RocketMQTracing.newSendMessageHook()` to trace the message.

```java
Message message = new Message("zipkin", "zipkin", "zipkin".getBytes());
DefaultMQProducer producer = new DefaultMQProducer("testSend");
producer.getDefaultMQProducerImpl()
.registerSendMessageHook(producerTracing.newSendMessageHook());
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
producer.send(message);

producer.shutdown();
```

## Consumer

Wrap `org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly`
using `brave.rocketmq.client.RocketMQTracing.messageListenerOrderly(org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly)`,
or alternatively, wrap `org.apache.rocketmq.client.consumer.listener.messageListenerConcurrently`
using `brave.rocketmq.client.RocketMQTracing.messageListenerConcurrently(org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently)`;

```java
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testPushConsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("zipkin", "*");
MessageListenerConcurrently messageListenerConcurrently = rocketMQTracing.messageListenerConcurrently(
new MessageListenerConcurrently() {
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
// do something
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.registerMessageListener(messageListenerConcurrently);

consumer.start();
```
6 changes: 6 additions & 0 deletions instrumentation/rocketmq-client/bnd.bnd
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# We use brave.internal.Nullable, but it is not used at runtime.
Import-Package: \
!brave.internal*,\
*
Export-Package: \
brave.rocketmq.client
51 changes: 51 additions & 0 deletions instrumentation/rocketmq-client/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
<?xml version="1.0"?>
<!--
Copyright The OpenZipkin Authors
SPDX-License-Identifier: Apache-2.0
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.zipkin.brave</groupId>
<artifactId>brave-instrumentation-parent</artifactId>
<version>6.0.4-SNAPSHOT</version>
</parent>

<artifactId>brave-instrumentation-rocketmq-client</artifactId>
<name>Brave Instrumentation: RocketMQ Client</name>

<properties>
<!-- Matches Export-Package in bnd.bnd -->
<module.name>brave.rocketmq.client</module.name>

<main.basedir>${project.basedir}/../..</main.basedir>
</properties>

<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>brave-instrumentation-messaging</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq-client.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>brave-tests</artifactId>
<scope>test</scope>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright The OpenZipkin Authors
* SPDX-License-Identifier: Apache-2.0
*/
package brave.rocketmq.client;

import brave.Span;
import brave.Tracer;
import brave.Tracing;
import brave.internal.Nullable;
import brave.messaging.MessagingRequest;
import brave.propagation.TraceContext;
import brave.propagation.TraceContextOrSamplingFlags;
import brave.sampler.SamplerFunction;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

import static brave.Span.Kind.CONSUMER;
import static brave.internal.Throwables.propagateIfFatal;
import static brave.rocketmq.client.RocketMQTracing.ROCKETMQ_TOPIC;

/**
* Read records headers to create and complete a child of the incoming
* producers span if possible.
* The spans are modeled as a duration 1 {@link Span.Kind#CONSUMER} span to represent consuming the
* message from the rocketmq broker with a child span representing the processing of the message.
*/
abstract class AbstractMessageListener {
final RocketMQTracing rocketMQTracing;
final Tracing tracing;
final Tracer tracer;
final TraceContext.Extractor<MessageConsumerRequest> extractor;
final SamplerFunction<MessagingRequest> sampler;
@Nullable final String remoteServiceName;

AbstractMessageListener(RocketMQTracing rocketMQTracing) {
this.rocketMQTracing = rocketMQTracing;
this.tracing = rocketMQTracing.messagingTracing.tracing();
this.tracer = tracing.tracer();
this.extractor = rocketMQTracing.consumerExtractor;
this.sampler = rocketMQTracing.consumerSampler;
this.remoteServiceName = rocketMQTracing.remoteServiceName;
}

<T> T processConsumeMessage(
List<MessageExt> msgs,
Function<List<MessageExt>, T> consumerFunc,
BiFunction<T, T, Boolean> successFunc,
T successStatus
) {
for (MessageExt message : msgs) {
MessageConsumerRequest request = new MessageConsumerRequest(message);
TraceContextOrSamplingFlags extracted =
rocketMQTracing.extractAndClearTraceIdHeaders(extractor, request, message.getProperties());
Span consumerSpan = rocketMQTracing.nextMessagingSpan(sampler, request, extracted);
Span listenerSpan = tracer.newChild(consumerSpan.context());

if (!consumerSpan.isNoop()) {
setConsumerSpan(consumerSpan, message.getTopic());
// incur timestamp overhead only once
long timestamp = tracing.clock(consumerSpan.context()).currentTimeMicroseconds();
consumerSpan.start(timestamp);
long consumerFinish = timestamp + 1L; // save a clock reading
consumerSpan.finish(consumerFinish);
// not using scoped span as we want to start with a pre-configured time
listenerSpan.name("on-message").start(consumerFinish);
}

Tracer.SpanInScope scope = tracer.withSpanInScope(listenerSpan);
Throwable error = null;
T result;

try {
result = consumerFunc.apply(msgs);
} catch (Throwable t) {
propagateIfFatal(t);
error = t;
throw t;
} finally {
if (error != null) listenerSpan.error(error);
listenerSpan.finish();
scope.close();
}

if (!successFunc.apply(result, successStatus)) {
return result;
}
}
return successStatus;
}

void setConsumerSpan(Span span, String topic) {
span.name("receive").kind(CONSUMER);
span.tag(ROCKETMQ_TOPIC, topic);
if (remoteServiceName != null) span.remoteServiceName(remoteServiceName);
}
}
Loading