Skip to content

Commit

Permalink
fix codestyle
Browse files Browse the repository at this point in the history
  • Loading branch information
guyinyou committed Nov 16, 2023
1 parent 71b6ff2 commit d0c0277
Showing 1 changed file with 28 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ public String getTopicNamePrefix() {
int fetchCnt = 0;

private synchronized Set<String> fetchMasterAndSlaveAddrByClusterName(final MQAdminExt adminExt,
final String clusterName) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException {
final String clusterName) throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, MQBrokerException, InterruptedException {
Set<String> brokerList = cachedBrokerAddr.get(clusterName);
if (brokerList == null) {
brokerList =
Expand All @@ -113,35 +114,35 @@ private synchronized Set<String> fetchMasterAndSlaveAddrByClusterName(final MQAd
@Override
public CompletableFuture<Void> createTopic(final String topic, final int partitions) {
return CompletableFuture.runAsync(
() -> {
TopicConfig topicConfig = new TopicConfig();
topicConfig.setOrder(false);
topicConfig.setPerm(6);
topicConfig.setReadQueueNums(partitions);
topicConfig.setWriteQueueNums(partitions);
topicConfig.setTopicName(topic);
if (Boolean.TRUE.equals(this.rmqClientConfig.batchCQ)) {
topicConfig.getAttributes().put("+" + TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName(), "BatchCQ");
}
() -> {
TopicConfig topicConfig = new TopicConfig();
topicConfig.setOrder(false);
topicConfig.setPerm(6);
topicConfig.setReadQueueNums(partitions);
topicConfig.setWriteQueueNums(partitions);
topicConfig.setTopicName(topic);
if (Boolean.TRUE.equals(this.rmqClientConfig.batchCQ)) {
topicConfig.getAttributes().put("+" + TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName(), "BatchCQ");
}

try {
Set<String> brokerList =
fetchMasterAndSlaveAddrByClusterName(
this.rmqAdmin, this.rmqClientConfig.clusterName);
topicConfig.setReadQueueNums(Math.max(1, partitions / brokerList.size()));
topicConfig.setWriteQueueNums(Math.max(1, partitions / brokerList.size()));
try {
Set<String> brokerList =
fetchMasterAndSlaveAddrByClusterName(
this.rmqAdmin, this.rmqClientConfig.clusterName);
topicConfig.setReadQueueNums(Math.max(1, partitions / brokerList.size()));
topicConfig.setWriteQueueNums(Math.max(1, partitions / brokerList.size()));

for (String brokerAddr : brokerList) {
this.rmqAdmin.createAndUpdateTopicConfig(brokerAddr, topicConfig);
}
} catch (Exception e) {
throw new RuntimeException(
String.format(
"Failed to create topic [%s] to cluster [%s]",
topic, this.rmqClientConfig.clusterName),
e);
for (String brokerAddr : brokerList) {
this.rmqAdmin.createAndUpdateTopicConfig(brokerAddr, topicConfig);
}
});
} catch (Exception e) {
throw new RuntimeException(
String.format(
"Failed to create topic [%s] to cluster [%s]",
topic, this.rmqClientConfig.clusterName),
e);
}
});
}

@Override
Expand Down

0 comments on commit d0c0277

Please sign in to comment.