diff --git a/consistence/coordinator_rpc.go b/consistence/coordinator_rpc.go index 025e65c1..5ac7e88b 100644 --- a/consistence/coordinator_rpc.go +++ b/consistence/coordinator_rpc.go @@ -577,7 +577,7 @@ func (self *NsqdCoordRpcServer) GetTopicStats(topic string) *NodeTopicStats { // all topic status topicStats = self.nsqdCoord.localNsqd.GetStats(false, true) } else { - topicStats = self.nsqdCoord.localNsqd.GetTopicStatsWithFilter(false, topic, true) + topicStats = self.nsqdCoord.localNsqd.GetTopicStatsWithFilter(false, topic, "", true) } stat := NewNodeTopicStats(self.nsqdCoord.myNode.GetID(), len(topicStats)*2, runtime.NumCPU()) for _, ts := range topicStats { diff --git a/internal/clusterinfo/data.go b/internal/clusterinfo/data.go index 2d3b6352..8f8e85cf 100644 --- a/internal/clusterinfo/data.go +++ b/internal/clusterinfo/data.go @@ -977,11 +977,8 @@ func (c *ClusterInfo) getNSQDStats(producers Producers, selectedTopic string, so addr := p.HTTPAddress() endpoint := fmt.Sprintf("http://%s/stats?format=json&leaderOnly=%t&needClients=%t", addr, leaderOnly, needClient) - if !needClient { - endpoint = fmt.Sprintf("http://%s/stats?format=json&leaderOnly=%t", addr, leaderOnly) - } if selectedTopic != "" { - endpoint = fmt.Sprintf("http://%s/stats?format=json&topic=%s&leaderOnly=%t", addr, selectedTopic, leaderOnly) + endpoint = fmt.Sprintf("http://%s/stats?format=json&topic=%s&leaderOnly=%t&needClients=%t", addr, selectedTopic, leaderOnly, needClient) } c.logf("CI: querying nsqd %s", endpoint) diff --git a/internal/clusterinfo/types.go b/internal/clusterinfo/types.go index 5b8ca412..94b98ec2 100644 --- a/internal/clusterinfo/types.go +++ b/internal/clusterinfo/types.go @@ -140,11 +140,13 @@ type TopicStats struct { MessageCount int64 `json:"message_count"` NodeStats []*TopicStats `json:"nodes"` Channels []*ChannelStats `json:"channels"` + ChannelNum int64 `json:"channel_num"` TotalChannelDepth int64 `json:"total_channel_depth"` Paused bool `json:"paused"` HourlyPubSize int64 `json:"hourly_pubsize"` PartitionHourlyPubSize []int64 `json:"partition_hourly_pubsize"` Clients []ClientPubStats `json:"client_pub_stats"` + ClientNum int64 `json:"client_num"` MessageSizeStats [16]int64 `json:"msg_size_stats"` MessageLatencyStats [16]int64 `json:"msg_write_latency_stats"` @@ -224,6 +226,7 @@ type ChannelStats struct { Selected bool `json:"-"` NodeStats []*ChannelStats `json:"nodes"` Clients []*ClientStats `json:"clients"` + ClientNum int64 `json:"client_num"` Paused bool `json:"paused"` Skipped bool `json:"skipped"` ZanTestSkipped bool `json:"zan_test_skipped"` diff --git a/internal/http_api/api_request.go b/internal/http_api/api_request.go index 12f90a6c..c12d9638 100644 --- a/internal/http_api/api_request.go +++ b/internal/http_api/api_request.go @@ -47,7 +47,7 @@ type Client struct { } func NewClient(tlsConfig *tls.Config) *Client { - transport := NewDeadlineTransport(5 * time.Second) + transport := NewDeadlineTransport(15 * time.Second) transport.TLSClientConfig = tlsConfig return &Client{ c: &http.Client{ diff --git a/nsqadmin/http.go b/nsqadmin/http.go index 74fdbb00..a8bf3fc1 100644 --- a/nsqadmin/http.go +++ b/nsqadmin/http.go @@ -495,7 +495,7 @@ func (s *httpServer) topicHandler(w http.ResponseWriter, req *http.Request, ps h s.ctx.nsqadmin.logf("WARNING: %s", err) messages = append(messages, pe.Error()) } - topicStats, _, err := s.ci.GetNSQDStats(producers, topicName, "partition", true) + topicStats, _, err := s.ci.GetNSQDStatsWithClients(producers, topicName, "partition", true) if err != nil { pe, ok := err.(clusterinfo.PartialErr) if !ok { @@ -587,7 +587,7 @@ func (s *httpServer) channelHandler(w http.ResponseWriter, req *http.Request, ps s.ctx.nsqadmin.logf("WARNING: %s", err) messages = append(messages, pe.Error()) } - _, allChannelStats, err := s.ci.GetNSQDStats(producers, topicName, "partition", true) + _, allChannelStats, err := s.ci.GetNSQDStatsWithClients(producers, topicName, "partition", true) if err != nil { pe, ok := err.(clusterinfo.PartialErr) if !ok { @@ -662,7 +662,11 @@ func (s *httpServer) nodeHandler(w http.ResponseWriter, req *http.Request, ps ht var totalMessages int64 for _, ts := range topicStats { for _, cs := range ts.Channels { - totalClients += int64(len(cs.Clients)) + if len(cs.Clients) != 0 { + totalClients += int64(len(cs.Clients)) + } else { + totalClients += int64(cs.ClientNum) + } } totalMessages += ts.MessageCount } diff --git a/nsqd/stats.go b/nsqd/stats.go index a6af0922..8e9b76d6 100644 --- a/nsqd/stats.go +++ b/nsqd/stats.go @@ -27,6 +27,7 @@ type TopicStats struct { TopicFullName string `json:"topic_full_name"` TopicPartition string `json:"topic_partition"` Channels []ChannelStats `json:"channels"` + ChannelNum int64 `json:"channel_num"` Depth int64 `json:"depth"` BackendDepth int64 `json:"backend_depth"` BackendStart int64 `json:"backend_start"` @@ -62,6 +63,7 @@ func NewTopicStats(t *Topic, channels []ChannelStats, filterClients bool) TopicS TopicFullName: t.GetFullName(), TopicPartition: strconv.Itoa(t.GetTopicPart()), Channels: channels, + ChannelNum: int64(len(channels)), Depth: t.TotalDataSize(), BackendDepth: t.TotalDataSize(), BackendStart: t.GetQueueReadStart(), @@ -260,10 +262,10 @@ func (n *NSQD) GetStats(leaderOnly bool, filterClients bool) []TopicStats { } n.RUnlock() - return n.getTopicStats(realTopics, filterClients) + return n.getTopicStats(realTopics, "", filterClients) } -func (n *NSQD) getTopicStats(realTopics []*Topic, filterClients bool) []TopicStats { +func (n *NSQD) getTopicStats(realTopics []*Topic, ch string, filterClients bool) []TopicStats { sort.Sort(TopicsByName{realTopics}) topics := make([]TopicStats, 0, len(realTopics)) for _, t := range realTopics { @@ -282,9 +284,11 @@ func (n *NSQD) getTopicStats(realTopics []*Topic, filterClients bool) []TopicSta if filterClients { clients = nil } else { - clients = make([]ClientStats, 0, len(c.clients)) - for _, client := range c.clients { - clients = append(clients, client.Stats()) + if len(ch) == 0 || c.name == ch { + clients = make([]ClientStats, 0, len(c.clients)) + for _, client := range c.clients { + clients = append(clients, client.Stats()) + } } } c.RUnlock() @@ -295,7 +299,7 @@ func (n *NSQD) getTopicStats(realTopics []*Topic, filterClients bool) []TopicSta return topics } -func (n *NSQD) GetTopicStatsWithFilter(leaderOnly bool, topic string, filterClients bool) []TopicStats { +func (n *NSQD) GetTopicStatsWithFilter(leaderOnly bool, topic string, ch string, filterClients bool) []TopicStats { n.RLock() realTopics := make([]*Topic, 0, len(n.topicMap)) for name, topicParts := range n.topicMap { @@ -310,11 +314,11 @@ func (n *NSQD) GetTopicStatsWithFilter(leaderOnly bool, topic string, filterClie } } n.RUnlock() - return n.getTopicStats(realTopics, filterClients) + return n.getTopicStats(realTopics, ch, filterClients) } func (n *NSQD) GetTopicStats(leaderOnly bool, topic string) []TopicStats { - return n.GetTopicStatsWithFilter(leaderOnly, topic, false) + return n.GetTopicStatsWithFilter(leaderOnly, topic, "", false) } type DetailStatsInfo struct { diff --git a/nsqdserver/context.go b/nsqdserver/context.go index 9aae75b5..d5935e77 100644 --- a/nsqdserver/context.go +++ b/nsqdserver/context.go @@ -93,9 +93,9 @@ func (c *context) setHealth(err error) { c.nsqd.SetHealth(err) } -func (c *context) getStats(leaderOnly bool, selectedTopic string, filterClients bool) []nsqd.TopicStats { +func (c *context) getStats(leaderOnly bool, selectedTopic string, ch string, filterClients bool) []nsqd.TopicStats { if selectedTopic != "" { - return c.nsqd.GetTopicStats(leaderOnly, selectedTopic) + return c.nsqd.GetTopicStatsWithFilter(leaderOnly, selectedTopic, ch, filterClients) } return c.nsqd.GetStats(leaderOnly, filterClients) } diff --git a/nsqdserver/http.go b/nsqdserver/http.go index 72b60f69..f2907594 100644 --- a/nsqdserver/http.go +++ b/nsqdserver/http.go @@ -1062,7 +1062,7 @@ func (s *httpServer) doMessageHistoryStats(w http.ResponseWriter, req *http.Requ } if topicName == "" && topicPartStr == "" { - topicStats := s.ctx.getStats(true, "", true) + topicStats := s.ctx.getStats(true, "", "", true) var topicHourlyPubStatsList []*clusterinfo.NodeHourlyPubsize for _, topicStat := range topicStats { partitionNum, err := strconv.Atoi(topicStat.TopicPartition) @@ -1361,8 +1361,12 @@ func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httpro jsonFormat := formatString == "json" filterClients := needClients != "true" + if topicName != "" && needClients == "" { + // compatible with old, we always return clients if topic is specified and needClients is not specified + filterClients = false + } - stats := s.ctx.getStats(leaderOnly, topicName, filterClients) + stats := s.ctx.getStats(leaderOnly, topicName, channelName, filterClients) health := s.ctx.getHealth() startTime := s.ctx.getStartTime() uptime := time.Since(startTime)