Skip to content

Commit

Permalink
feat(broker): introduce continuous profiling (#592)
Browse files Browse the repository at this point in the history
Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits authored Nov 7, 2023
1 parent d58e555 commit 52a7a0a
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 5 deletions.
6 changes: 5 additions & 1 deletion broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@
<artifactId>log4j-slf4j2-impl</artifactId>
<version>2.20.0</version>
</dependency>
<dependency>

Check failure on line 67 in broker/pom.xml

View workflow job for this annotation

GitHub Actions / qodana

Check dependency licenses

'agent' ('0.12.2') 'MPL-2.0' license is in the prohibited licenses list with project license 'Apache-2.0'
<groupId>io.pyroscope</groupId>
<artifactId>agent</artifactId>
<version>0.12.2</version>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@
import com.automq.rocketmq.store.MessageStoreBuilder;
import com.automq.rocketmq.store.MessageStoreImpl;
import com.automq.rocketmq.store.api.MessageStore;
import io.pyroscope.http.Format;
import io.pyroscope.javaagent.EventType;
import io.pyroscope.javaagent.PyroscopeAgent;
import io.pyroscope.javaagent.config.Config;
import io.pyroscope.labels.Pyroscope;
import java.util.Map;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -78,7 +84,6 @@ public BrokerController(BrokerConfig brokerConfig) throws Exception {
DataStore dataStore = new DataStoreFacade(messageStore.streamStore(), messageStore.s3ObjectOperator(), messageStore.topicQueueManager());
metadataStore.setDataStore(dataStore);


serviceManager = new DefaultServiceManager(brokerConfig, proxyMetadataService, dlqService, messageStore);
messagingProcessor = ExtendMessagingProcessor.createForS3RocketMQ(serviceManager);

Expand All @@ -88,6 +93,16 @@ public BrokerController(BrokerConfig brokerConfig) throws Exception {
remotingServer = new RemotingProtocolServer(messagingProcessor);

metricsExporter = new MetricsExporter(brokerConfig, messageStore, messagingProcessor);

Pyroscope.setStaticLabels(Map.of("broker", brokerConfig.name()));
PyroscopeAgent.start(
new Config.Builder()
.setApplicationName("automq-for-rocketmq")
.setProfilingEvent(EventType.ITIMER)
.setFormat(Format.JFR)
.setServerAddress("http://10.129.193.87:4040")
.build()
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import apache.rocketmq.controller.v1.SubscriptionMode;
import apache.rocketmq.controller.v1.Topic;
import com.automq.rocketmq.common.config.ProxyConfig;
import com.automq.rocketmq.common.exception.ControllerException;
import com.automq.rocketmq.common.model.FlatMessageExt;
import com.automq.rocketmq.common.model.generated.FlatMessage;
import com.automq.rocketmq.common.util.CommonUtil;
import com.automq.rocketmq.common.exception.ControllerException;
import com.automq.rocketmq.metadata.api.ProxyMetadataService;
import com.automq.rocketmq.proxy.exception.ProxyException;
import com.automq.rocketmq.proxy.metrics.ProxyMetricsManager;
Expand Down Expand Up @@ -348,7 +348,7 @@ private CompletableFuture<InnerPopResult> popSpecifiedQueue(ConsumerGroup consum
return popSpecifiedQueueUnsafe(consumerGroup, topic, queueId, filter, batchSize, fifo, invisibleDuration)
.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
.whenComplete((v, throwable) -> {
// TODO: log exception.
LOGGER.error("Error while pop message from topic: {}, queue: {}, batch size: {}.", topic.getName(), queueId, batchSize, throwable);
// Release lock since complete or timeout.
lockService.release(topicId, queueId);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ public static MessageExt convertTo(FlatMessageExt flatMessage, String topicName,
// Re-consume times is the number of delivery attempts minus 1.
messageExt.setReconsumeTimes(systemProperties.deliveryAttempts() - 1);

if (systemProperties.deliveryTimestamp() > 0) {
messageExt.setDeliverTimeMs(systemProperties.deliveryTimestamp());
}

KeyValue.Vector propertiesVector = flatMessage.message().userPropertiesVector();
for (int i = 0; i < propertiesVector.length(); i++) {
KeyValue keyValue = propertiesVector.get(i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ CompletableFuture<Void> takeSnapshot(SnapshotTask task) {
.thenComposeAsync(appendResult -> {
// trim operation stream
return streamStore.trim(operationStreamId, snapshot.getSnapshotEndOffset() + 1);
}, backgroundExecutor).thenAcceptAsync(nil -> {
}, backgroundExecutor)
.thenAcceptAsync(nil -> {
// complete snapshot task
task.completeSuccess(snapshot.getSnapshotEndOffset() + 1);
}, backgroundExecutor);
Expand Down

0 comments on commit 52a7a0a

Please sign in to comment.