Skip to content

Commit

Permalink
Adds RocketMQ plugin (#1449)
Browse files Browse the repository at this point in the history
Fixes 1043
  • Loading branch information
CodePrometheus authored Feb 12, 2025
1 parent 06b47b1 commit ec007eb
Show file tree
Hide file tree
Showing 20 changed files with 1,077 additions and 0 deletions.
5 changes: 5 additions & 0 deletions brave-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,11 @@
<artifactId>brave-spring-beans</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>brave-instrumentation-rocketmq-client</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
1 change: 1 addition & 0 deletions instrumentation/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Here's a brief overview of what's packaged here:
* [spring-web](spring-web/README.md) - Tracing interceptor for [Spring RestTemplate](https://spring.io/guides/gs/consuming-rest/)
* [spring-webmvc](spring-webmvc/README.md) - Tracing filter and span customizing interceptors for [Spring WebMVC](https://docs.spring.io/spring/docs/current/spring-framework-reference/html/mvc.html)
* [vertx-web](vertx-web/README.md) - Tracing routing context handler for [Vert.x Web](http://vertx.io/docs/vertx-web/js/)
* [rocketmq-client](rocketmq-client/README.md) - Tracing Producer, MessageListenerConcurrently and MessageListenerOrderly for [Apache RocketMQ](https://github.com/apache/rocketmq/)

Here are other tools we provide for configuring or testing instrumentation:
* [http](http/README.md) - `HttpTracing` that allows portable configuration of HTTP instrumentation
Expand Down
12 changes: 12 additions & 0 deletions instrumentation/benchmarks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,18 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>brave-instrumentation-rocketmq-client</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq-client.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright The OpenZipkin Authors
* SPDX-License-Identifier: Apache-2.0
*/
package brave.rocketmq.client;

import brave.Tracing;
import brave.kafka.clients.TracingProducerBenchmarks;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

import java.util.concurrent.TimeUnit;

import static org.apache.rocketmq.client.producer.SendStatus.SEND_OK;

@Measurement(iterations = 5, time = 1)
@Warmup(iterations = 10, time = 1)
@Fork(3)
@BenchmarkMode(Mode.SampleTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Thread)
public class RocketMQProducerBenchmarks {
Message message;
DefaultMQProducer producer, tracingProducer;

@Setup(Level.Trial) public void init() {
message = new Message("zipkin", "zipkin".getBytes());
Tracing tracing = Tracing.newBuilder().build();
producer = new FakeProducer();
tracingProducer = new FakeProducer();
tracingProducer.getDefaultMQProducerImpl().registerSendMessageHook(
new TracingSendMessageHook(RocketMQTracing.newBuilder(tracing).build())
);
}

@TearDown(Level.Trial) public void close() {
Tracing.current().close();
}

@Benchmark public SendResult send_baseCase() throws Exception {
return producer.send(message);
}

@Benchmark public void send_traced() throws Exception {
tracingProducer.send(message);
}

// Convenience main entry-point
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.addProfiler("gc")
.include(".*" + TracingProducerBenchmarks.class.getSimpleName())
.build();

new Runner(opt).run();
}

static final class FakeProducer extends DefaultMQProducer {
@Override public SendResult send(Message msg) {
SendResult sendResult = new SendResult();
sendResult.setMsgId("zipkin");
sendResult.setSendStatus(SEND_OK);
return sendResult;
}
}
}
1 change: 1 addition & 0 deletions instrumentation/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
<module>kafka-streams</module>
<module>netty-codec-http</module>
<module>vertx-web</module>
<module>rocketmq-client</module>
</modules>

<!-- ${project.groupId}:brave version is set in the root pom.
Expand Down
74 changes: 74 additions & 0 deletions instrumentation/rocketmq-client/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# Brave RocketMQ Client instrumentation
This module provides instrumentation for rocketmq-client 5.x+ consumers and
producers.

## Setup
Setup the generic RocketMQ component like this:
```java
rocketmqTracing = RocketMQTracing.newBuilder(messagingTracing)
.remoteServiceName("my-broker")
.build();
```

## Sampling Policy
The default sampling policy is to use the default (trace ID) sampler for
producer and consumer requests.

You can use an [MessagingRuleSampler](../messaging/README.md) to override this
based on RocketMQ topic names.

Ex. Here's a sampler that traces 100 consumer requests per second, except for
the "alerts" topic. Other requests will use a global rate provided by the
`Tracing` component.

```java
import brave.sampler.Matchers;

import static brave.messaging.MessagingRequestMatchers.channelNameEquals;

messagingTracingBuilder.consumerSampler(MessagingRuleSampler.newBuilder()
.putRule(channelNameEquals("alerts"), Sampler.NEVER_SAMPLE)
.putRule(Matchers.alwaysMatch(), RateLimitingSampler.create(100))
.build());

rocketmqTracing = RocketMQTracing.create(messagingTracing);
```

## Producer

Register `brave.rocketmq.client.RocketMQTracing.newSendMessageHook()` to trace the message.

```java
Message message = new Message("zipkin", "zipkin", "zipkin".getBytes());
DefaultMQProducer producer = new DefaultMQProducer("testSend");
producer.getDefaultMQProducerImpl()
.registerSendMessageHook(producerTracing.newSendMessageHook());
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
producer.send(message);

producer.shutdown();
```

## Consumer

Wrap `org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly`
using `brave.rocketmq.client.RocketMQTracing.messageListenerOrderly(org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly)`,
or alternatively, wrap `org.apache.rocketmq.client.consumer.listener.messageListenerConcurrently`
using `brave.rocketmq.client.RocketMQTracing.messageListenerConcurrently(org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently)`;

```java
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testPushConsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("zipkin", "*");
MessageListenerConcurrently messageListenerConcurrently = rocketMQTracing.messageListenerConcurrently(
new MessageListenerConcurrently() {
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
// do something
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.registerMessageListener(messageListenerConcurrently);

consumer.start();
```
6 changes: 6 additions & 0 deletions instrumentation/rocketmq-client/bnd.bnd
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# We use brave.internal.Nullable, but it is not used at runtime.
Import-Package: \
!brave.internal*,\
*
Export-Package: \
brave.rocketmq.client
51 changes: 51 additions & 0 deletions instrumentation/rocketmq-client/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
<?xml version="1.0"?>
<!--
Copyright The OpenZipkin Authors
SPDX-License-Identifier: Apache-2.0
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.zipkin.brave</groupId>
<artifactId>brave-instrumentation-parent</artifactId>
<version>6.0.4-SNAPSHOT</version>
</parent>

<artifactId>brave-instrumentation-rocketmq-client</artifactId>
<name>Brave Instrumentation: RocketMQ Client</name>

<properties>
<!-- Matches Export-Package in bnd.bnd -->
<module.name>brave.rocketmq.client</module.name>

<main.basedir>${project.basedir}/../..</main.basedir>
</properties>

<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>brave-instrumentation-messaging</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq-client.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>brave-tests</artifactId>
<scope>test</scope>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright The OpenZipkin Authors
* SPDX-License-Identifier: Apache-2.0
*/
package brave.rocketmq.client;

import brave.Span;
import brave.Tracer;
import brave.Tracing;
import brave.internal.Nullable;
import brave.messaging.MessagingRequest;
import brave.propagation.TraceContext;
import brave.propagation.TraceContextOrSamplingFlags;
import brave.sampler.SamplerFunction;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

import static brave.Span.Kind.CONSUMER;
import static brave.internal.Throwables.propagateIfFatal;
import static brave.rocketmq.client.RocketMQTracing.ROCKETMQ_TOPIC;

/**
* Read records headers to create and complete a child of the incoming
* producers span if possible.
* The spans are modeled as a duration 1 {@link Span.Kind#CONSUMER} span to represent consuming the
* message from the rocketmq broker with a child span representing the processing of the message.
*/
abstract class AbstractMessageListener {
final RocketMQTracing rocketMQTracing;
final Tracing tracing;
final Tracer tracer;
final TraceContext.Extractor<MessageConsumerRequest> extractor;
final SamplerFunction<MessagingRequest> sampler;
@Nullable final String remoteServiceName;

AbstractMessageListener(RocketMQTracing rocketMQTracing) {
this.rocketMQTracing = rocketMQTracing;
this.tracing = rocketMQTracing.messagingTracing.tracing();
this.tracer = tracing.tracer();
this.extractor = rocketMQTracing.consumerExtractor;
this.sampler = rocketMQTracing.consumerSampler;
this.remoteServiceName = rocketMQTracing.remoteServiceName;
}

<T> T processConsumeMessage(
List<MessageExt> msgs,
Function<List<MessageExt>, T> consumerFunc,
BiFunction<T, T, Boolean> successFunc,
T successStatus
) {
for (MessageExt message : msgs) {
MessageConsumerRequest request = new MessageConsumerRequest(message);
TraceContextOrSamplingFlags extracted =
rocketMQTracing.extractAndClearTraceIdHeaders(extractor, request, message.getProperties());
Span consumerSpan = rocketMQTracing.nextMessagingSpan(sampler, request, extracted);
Span listenerSpan = tracer.newChild(consumerSpan.context());

if (!consumerSpan.isNoop()) {
setConsumerSpan(consumerSpan, message.getTopic());
// incur timestamp overhead only once
long timestamp = tracing.clock(consumerSpan.context()).currentTimeMicroseconds();
consumerSpan.start(timestamp);
long consumerFinish = timestamp + 1L; // save a clock reading
consumerSpan.finish(consumerFinish);
// not using scoped span as we want to start with a pre-configured time
listenerSpan.name("on-message").start(consumerFinish);
}

Tracer.SpanInScope scope = tracer.withSpanInScope(listenerSpan);
Throwable error = null;
T result;

try {
result = consumerFunc.apply(msgs);
} catch (Throwable t) {
propagateIfFatal(t);
error = t;
throw t;
} finally {
if (error != null) listenerSpan.error(error);
listenerSpan.finish();
scope.close();
}

if (!successFunc.apply(result, successStatus)) {
return result;
}
}
return successStatus;
}

void setConsumerSpan(Span span, String topic) {
span.name("receive").kind(CONSUMER);
span.tag(ROCKETMQ_TOPIC, topic);
if (remoteServiceName != null) span.remoteServiceName(remoteServiceName);
}
}
Loading

0 comments on commit ec007eb

Please sign in to comment.