diff --git a/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQBenchmarkDriver.java b/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQBenchmarkDriver.java index d5e2edeb..cc54c757 100644 --- a/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQBenchmarkDriver.java +++ b/driver-rocketmq/src/main/java/io/openmessaging/benchmark/driver/rocketmq/RocketMQBenchmarkDriver.java @@ -93,7 +93,8 @@ public String getTopicNamePrefix() { int fetchCnt = 0; private synchronized Set fetchMasterAndSlaveAddrByClusterName(final MQAdminExt adminExt, - final String clusterName) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { + final String clusterName) throws RemotingConnectException, RemotingSendRequestException, + RemotingTimeoutException, MQBrokerException, InterruptedException { Set brokerList = cachedBrokerAddr.get(clusterName); if (brokerList == null) { brokerList = @@ -113,35 +114,35 @@ private synchronized Set fetchMasterAndSlaveAddrByClusterName(final MQAd @Override public CompletableFuture createTopic(final String topic, final int partitions) { return CompletableFuture.runAsync( - () -> { - TopicConfig topicConfig = new TopicConfig(); - topicConfig.setOrder(false); - topicConfig.setPerm(6); - topicConfig.setReadQueueNums(partitions); - topicConfig.setWriteQueueNums(partitions); - topicConfig.setTopicName(topic); - if (Boolean.TRUE.equals(this.rmqClientConfig.batchCQ)) { - topicConfig.getAttributes().put("+" + TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName(), "BatchCQ"); - } + () -> { + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setOrder(false); + topicConfig.setPerm(6); + topicConfig.setReadQueueNums(partitions); + topicConfig.setWriteQueueNums(partitions); + topicConfig.setTopicName(topic); + if (Boolean.TRUE.equals(this.rmqClientConfig.batchCQ)) { + topicConfig.getAttributes().put("+" + TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName(), "BatchCQ"); + } - try { - Set brokerList = - fetchMasterAndSlaveAddrByClusterName( - this.rmqAdmin, this.rmqClientConfig.clusterName); - topicConfig.setReadQueueNums(Math.max(1, partitions / brokerList.size())); - topicConfig.setWriteQueueNums(Math.max(1, partitions / brokerList.size())); + try { + Set brokerList = + fetchMasterAndSlaveAddrByClusterName( + this.rmqAdmin, this.rmqClientConfig.clusterName); + topicConfig.setReadQueueNums(Math.max(1, partitions / brokerList.size())); + topicConfig.setWriteQueueNums(Math.max(1, partitions / brokerList.size())); - for (String brokerAddr : brokerList) { - this.rmqAdmin.createAndUpdateTopicConfig(brokerAddr, topicConfig); - } - } catch (Exception e) { - throw new RuntimeException( - String.format( - "Failed to create topic [%s] to cluster [%s]", - topic, this.rmqClientConfig.clusterName), - e); + for (String brokerAddr : brokerList) { + this.rmqAdmin.createAndUpdateTopicConfig(brokerAddr, topicConfig); } - }); + } catch (Exception e) { + throw new RuntimeException( + String.format( + "Failed to create topic [%s] to cluster [%s]", + topic, this.rmqClientConfig.clusterName), + e); + } + }); } @Override