Skip to content

Commit

Permalink
[INLONG-9902][Manager] Data preview supports pulsar multi cluster (#9903
Browse files Browse the repository at this point in the history
)
  • Loading branch information
fuweng11 authored Mar 31, 2024
1 parent 5bc955d commit c62bfff
Showing 1 changed file with 13 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;

/**
Expand Down Expand Up @@ -304,11 +305,11 @@ public List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo,
InlongStreamInfo streamInfo, Integer messageCount) throws Exception {
String groupId = streamInfo.getInlongGroupId();
InlongPulsarInfo inlongPulsarInfo = ((InlongPulsarInfo) groupInfo);
PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterService.getOne(groupInfo.getInlongClusterTag(),
null, ClusterType.PULSAR);
List<ClusterInfo> pulsarClusterList =
clusterService.listByTagAndType(groupInfo.getInlongClusterTag(), ClusterType.PULSAR);
String tenant = inlongPulsarInfo.getPulsarTenant();
if (StringUtils.isBlank(tenant)) {
tenant = pulsarCluster.getPulsarTenant();
if (StringUtils.isBlank(tenant) && CollectionUtils.isNotEmpty(pulsarClusterList)) {
tenant = ((PulsarClusterInfo) pulsarClusterList.get(0)).getPulsarTenant();
}

String namespace = groupInfo.getMqResource();
Expand All @@ -317,8 +318,14 @@ public List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo,
String clusterTag = inlongPulsarInfo.getInlongClusterTag();
String subs = String.format(PULSAR_SUBSCRIPTION_REALTIME_REVIEW, clusterTag, topicName);
boolean serial = InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(inlongPulsarInfo.getQueueModule());
List<BriefMQMessage> briefMQMessages = pulsarOperator.queryLatestMessage(pulsarCluster, fullTopicName, subs,
messageCount, streamInfo, serial);
List<BriefMQMessage> briefMQMessages = new ArrayList<>();
for (ClusterInfo clusterInfo : pulsarClusterList) {
briefMQMessages = pulsarOperator.queryLatestMessage((PulsarClusterInfo) clusterInfo, fullTopicName, subs,
messageCount, streamInfo, serial);
if (CollectionUtils.isNotEmpty(briefMQMessages)) {
break;
}
}

// insert the consumer group info into the inlong_consume table
Integer id = consumeService.saveBySystem(groupInfo, topicName, subs);
Expand Down

0 comments on commit c62bfff

Please sign in to comment.