Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix proc_num monitor #77

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions cfg.example.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
"enabled": true,
"addr": "127.0.0.1:6030",
"interval": 60,
"timeout": 1000
"timeout": 1000,
"waitTimeout": 5000
},
"transfer": {
"enabled": true,
Expand All @@ -21,7 +22,8 @@
"127.0.0.1:8433"
],
"interval": 60,
"timeout": 1000
"timeout": 1000,
"waitTimeout": 5000
},
"http": {
"enabled": true,
Expand Down
13 changes: 5 additions & 8 deletions cron/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func syncBuiltinMetrics() {

var ports = []int64{}
var paths = []string{}
var procs = make(map[string]map[int]string)
var procs = make(map[string]*g.CacheProc)
var urls = make(map[string]string)

hostname, err := g.Hostname()
Expand Down Expand Up @@ -108,24 +108,21 @@ func syncBuiltinMetrics() {
if metric.Metric == g.PROC_NUM {
arr := strings.Split(metric.Tags, ",")

tmpMap := make(map[int]string)
proc := g.GetProc(metric.Tags)

for i := 0; i < len(arr); i++ {
if strings.HasPrefix(arr[i], "name=") {
tmpMap[1] = strings.TrimSpace(arr[i][5:])
proc.Name = strings.TrimSpace(arr[i][5:])
} else if strings.HasPrefix(arr[i], "cmdline=") {
tmpMap[2] = strings.TrimSpace(arr[i][8:])
proc.Cmdline = strings.TrimSpace(arr[i][8:])
}
}

procs[metric.Tags] = tmpMap
procs[metric.Tags] = proc
}
}

g.SetReportUrls(urls)
g.SetReportPorts(ports)
g.SetReportProcs(procs)
g.SetDuPaths(paths)

}
}
63 changes: 45 additions & 18 deletions funcs/procs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"github.com/open-falcon/common/model"
"github.com/toolkits/nux"
"log"
"strings"
)

func ProcMetrics() (L []*model.MetricValue) {
Expand All @@ -24,33 +23,61 @@ func ProcMetrics() (L []*model.MetricValue) {

pslen := len(ps)

for tags, m := range reportProcs {
for tags, preProc := range reportProcs {
cnt := 0
pids := map[int]struct{}{}
for i := 0; i < pslen; i++ {
if is_a(ps[i], m) {
if is_a(ps[i], preProc) {
cnt++
pids[ps[i].Pid] = struct{}{}
}
}

if cnt > 0 && is_restart_proc(pids, preProc) {
//此进程发生了重启,将进程数置为0
cnt = 0
}
L = append(L, GaugeValue(g.PROC_NUM, cnt, tags))
}

return
}

func is_a(p *nux.Proc, m map[int]string) bool {
// only one kv pair
for key, val := range m {
if key == 1 {
// name
if val != p.Name {
return false
}
} else if key == 2 {
// cmdline
if !strings.Contains(p.Cmdline, val) {
return false
}
func is_restart_proc(pids map[int]struct{}, preProc *g.CacheProc) bool {
if len(pids) != len(preProc.Pids) {
//进程数目前后不一致,则认为发生重启
return true
}
if len(preProc.Pids) == 1 {
if _, ok := preProc.Pids[-1]; ok {
//此进程是新加的进程监控,无需对比两次采集的进程号
preProc.Pids = pids
return false
}
}
flag := false
for pid, _ := range pids {
if _, ok := preProc.Pids[pid]; !ok {
//由tag name、cmdline标识的同一个进程在两次采集的过程中发生了重启
flag = true
break
}
}
preProc.Pids = pids
return flag
}

func is_a(p *nux.Proc, preProc *g.CacheProc) bool {
// name
if len(preProc.Name) > 0 {
//进程监控设置了name tag
if p.Name != preProc.Name {
return false
}
}
// cmdline
if len(preProc.Cmdline) > 0 {
//进程监控设置了cmdline tag
if p.Cmdline != preProc.Cmdline {
return false
}
}
return true
Expand Down
18 changes: 10 additions & 8 deletions g/cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,19 @@ type PluginConfig struct {
}

type HeartbeatConfig struct {
Enabled bool `json:"enabled"`
Addr string `json:"addr"`
Interval int `json:"interval"`
Timeout int `json:"timeout"`
Enabled bool `json:"enabled"`
Addr string `json:"addr"`
Interval int `json:"interval"`
Timeout int `json:"timeout"`
WaitTimeout int `json:"waitTimeout"`
}

type TransferConfig struct {
Enabled bool `json:"enabled"`
Addrs []string `json:"addrs"`
Interval int `json:"interval"`
Timeout int `json:"timeout"`
Enabled bool `json:"enabled"`
Addrs []string `json:"addrs"`
Interval int `json:"interval"`
Timeout int `json:"timeout"`
WaitTimeout int `json:"waitTimeout"`
}

type HttpConfig struct {
Expand Down
3 changes: 2 additions & 1 deletion g/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ import (
// 5.0.0: 支持通过配置控制是否开启/run接口;收集udp流量数据;du某个目录的大小
// 5.1.0: 同步插件的时候不再使用checksum机制
// 5.1.1: 修复往多个transfer发送数据的时候crash的问题
// 5.1.2: 修复进程监控过程中,进程在采集周期内无法发现的问题,完善发送失败重试机制
const (
VERSION = "5.1.1"
VERSION = "5.1.2"
COLLECT_INTERVAL = time.Second
URL_CHECK_HEALTH = "url.check.health"
NET_PORT_LISTEN = "net.port.listen"
Expand Down
14 changes: 8 additions & 6 deletions g/rpc.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package g

import (
"errors"
"github.com/toolkits/net"
"log"
"math"
Expand All @@ -11,9 +12,10 @@ import (

type SingleConnRpcClient struct {
sync.Mutex
rpcClient *rpc.Client
RpcServer string
Timeout time.Duration
rpcClient *rpc.Client
RpcServer string
Timeout time.Duration
WaitTimeout time.Duration
}

func (this *SingleConnRpcClient) close() {
Expand Down Expand Up @@ -60,7 +62,6 @@ func (this *SingleConnRpcClient) Call(method string, args interface{}, reply int
return err
}

timeout := time.Duration(50 * time.Second)
done := make(chan error, 1)

go func() {
Expand All @@ -69,9 +70,10 @@ func (this *SingleConnRpcClient) Call(method string, args interface{}, reply int
}()

select {
case <-time.After(timeout):
log.Printf("[WARN] rpc call timeout %v => %v", this.rpcClient, this.RpcServer)
case <-time.After(this.WaitTimeout):
log.Printf("[WARN] rpc call timeout %v => %v, waitTimeout [%d]", this.rpcClient, this.RpcServer, this.WaitTimeout)
this.close()
return errors.New("rpc call timeout")
case err := <-done:
if err != nil {
this.close()
Expand Down
8 changes: 5 additions & 3 deletions g/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,19 @@ func SendMetrics(metrics []*model.MetricValue, resp *model.TransferResponse) {
initTransferClient(addr)
}
if updateMetrics(addr, metrics, resp) {
break
return
}
}
log.Printf("[ERROR] send metrics to transfer FAILED.retry time [%d]", len(Config().Transfer.Addrs))
}

func initTransferClient(addr string) {
TransferClientsLock.Lock()
defer TransferClientsLock.Unlock()
TransferClients[addr] = &SingleConnRpcClient{
RpcServer: addr,
Timeout: time.Duration(Config().Transfer.Timeout) * time.Millisecond,
RpcServer: addr,
Timeout: time.Duration(Config().Transfer.Timeout) * time.Millisecond,
WaitTimeout: time.Duration(Config().Transfer.WaitTimeout) * time.Millisecond,
}
}

Expand Down
45 changes: 32 additions & 13 deletions g/var.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package g

import (
"github.com/open-falcon/common/model"
"github.com/toolkits/slice"
"log"
"net"
"os"
"strings"
"sync"
"time"
"net"
"github.com/open-falcon/common/model"
"github.com/toolkits/slice"
)

var Root string
Expand All @@ -24,15 +24,15 @@ func InitRootDir() {
var LocalIp string

func InitLocalIp() {
if Config().Heartbeat.Enabled {
conn, err := net.DialTimeout("tcp",Config().Heartbeat.Addr,time.Second*10)
if Config().Heartbeat.Enabled {
conn, err := net.DialTimeout("tcp", Config().Heartbeat.Addr, time.Second*10)
if err != nil {
log.Println("get local addr failed !")
}else{
LocalIp = strings.Split(conn.LocalAddr().String(),":")[0]
} else {
LocalIp = strings.Split(conn.LocalAddr().String(), ":")[0]
conn.Close()
}
}else{
} else {
log.Println("hearbeat is not enabled, can't get localip")
}
}
Expand All @@ -44,8 +44,9 @@ var (
func InitRpcClients() {
if Config().Heartbeat.Enabled {
HbsClient = &SingleConnRpcClient{
RpcServer: Config().Heartbeat.Addr,
Timeout: time.Duration(Config().Heartbeat.Timeout) * time.Millisecond,
RpcServer: Config().Heartbeat.Addr,
Timeout: time.Duration(Config().Heartbeat.Timeout) * time.Millisecond,
WaitTimeout: time.Duration(Config().Heartbeat.WaitTimeout) * time.Millisecond,
}
}
}
Expand Down Expand Up @@ -120,21 +121,39 @@ func SetDuPaths(paths []string) {
duPaths = paths
}

type CacheProc struct {
Pids map[int]struct{}
Name string
Cmdline string
}

var (
// tags => {1=>name, 2=>cmdline}
// e.g. 'name=falcon-agent'=>{1=>falcon-agent}
// e.g. 'cmdline=xx'=>{2=>xx}
reportProcs map[string]map[int]string
reportProcs map[string]*CacheProc
reportProcsLock = new(sync.RWMutex)
)

func ReportProcs() map[string]map[int]string {
func GetProc(tag string) *CacheProc {
reportProcsLock.RLock()
defer reportProcsLock.RUnlock()
if proc, ok := reportProcs[tag]; ok {
return proc
}
pids := map[int]struct{}{
-1: struct{}{},
}
return &CacheProc{Pids: pids}
}

func ReportProcs() map[string]*CacheProc {
reportProcsLock.RLock()
defer reportProcsLock.RUnlock()
return reportProcs
}

func SetReportProcs(procs map[string]map[int]string) {
func SetReportProcs(procs map[string]*CacheProc) {
reportProcsLock.Lock()
defer reportProcsLock.Unlock()
reportProcs = procs
Expand Down