diff --git a/.gitignore b/.gitignore index 6f5c656..142245f 100644 --- a/.gitignore +++ b/.gitignore @@ -33,10 +33,12 @@ _testmain.go .DS_Store /var /graph -/falcon-graph +/falcon-graph* /cfg.json -/test +/test/*.go +/test/build -tmp/ +tmp +cfg.json gitversion -*.tar.gz +g/git.go diff --git a/README.md b/README.md index 1dbfd6f..98d4919 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,8 @@ graph所做的事情,就是把用户每次push上来的数据,进行采样 ## Installation +源码安装过程,如下 + ```bash # set $GOPATH and $GOROOT @@ -26,16 +28,12 @@ go get ./... ./control start ``` -## Configuration - - pid: 绝对路径,进程启动后的pid文件,这对于graph的平滑重启很重要(如有必要,请修改) +你可以从[这里](https://github.com/open-falcon/graph/releases),下载最新的release安装包,避免源码编译的种种问题。 - log: error/warn/info/debug/trace, 默认为info - - debug: true/false, 如果为true,配合debugChecksum一起使用 - - debugChecksum: 如果debug为true,那么符合该checksum的counter,整个处理过程会在日志文件中详细打印,主要用于debug和排错 +## Configuration + debug: true/false, 是否开启debug日志 + http - enable: true/false, 表示是否开启该http端口,该端口为控制端口,主要用来对graph发送控制命令、统计命令、debug命令等 - listen: 表示监听的http端口 diff --git a/api/graph.go b/api/graph.go index 559ce35..36f5e79 100644 --- a/api/graph.go +++ b/api/graph.go @@ -2,8 +2,8 @@ package api import ( "fmt" - "log" "math" + "time" cmodel "github.com/open-falcon/common/model" cutils "github.com/open-falcon/common/utils" @@ -14,8 +14,6 @@ import ( "github.com/open-falcon/graph/store" ) -//var DropCounter int64 - type Graph int func (this *Graph) Ping(req cmodel.NullRpcRequest, resp *cmodel.SimpleRpcResponse) error { @@ -47,19 +45,20 @@ func handleItems(items []*cmodel.GraphItem) { if items[i] == nil { continue } + dsType := items[i].DsType + step := items[i].Step checksum := items[i].Checksum() + ckey := g.FormRrdCacheKey(checksum, dsType, step) //statistics proc.GraphRpcRecvCnt.Incr() - proc.RecvDataTrace.Trace(checksum, items[i]) - proc.RecvDataFilter.Filter(checksum, items[i].Value, items[i]) // To Graph - first := store.GraphItems.First(checksum) + first := store.GraphItems.First(ckey) if first != nil && items[i].Timestamp <= first.Timestamp { continue } - store.GraphItems.PushFront(checksum, items[i]) + store.GraphItems.PushFront(ckey, items[i]) // To Index index.ReceiveItem(items[i], checksum) @@ -73,140 +72,155 @@ func (this *Graph) Query(param cmodel.GraphQueryParam, resp *cmodel.GraphQueryRe // statistics proc.GraphQueryCnt.Incr() + // form empty response resp.Values = []*cmodel.RRDData{} - dsType, step, exists := index.GetTypeAndStep(param.Endpoint, param.Counter) + resp.Endpoint = param.Endpoint + resp.Counter = param.Counter + dsType, step, exists := index.GetTypeAndStep(param.Endpoint, param.Counter) // complete dsType and step if !exists { return nil } + resp.DsType = dsType + resp.Step = step - md5 := cutils.Md5(param.Endpoint + "/" + param.Counter) - filename := fmt.Sprintf("%s/%s/%s_%s_%d.rrd", g.Config().RRD.Storage, md5[0:2], md5, dsType, step) - datas, err := rrdtool.Fetch(filename, param.ConsolFun, param.Start, param.End, step) - if err != nil { - if store.GraphItems.LenOf(md5) <= 2 { - return nil - } - // TODO not atomic, fix me - items := store.GraphItems.PopAll(md5) - size := len(items) - if size > 2 { - filename := fmt.Sprintf("%s/%s/%s_%s_%d.rrd", g.Config().RRD.Storage, md5[0:2], - md5, items[0].DsType, items[0].Step) - err := rrdtool.Flush(filename, items) - if err != nil && g.Config().Debug && g.Config().DebugChecksum == md5 { - log.Println("flush fail:", err, "filename:", filename) - } - } else { - return nil - } + start_ts := param.Start - param.Start%int64(step) + end_ts := param.End - param.End%int64(step) + int64(step) + if end_ts-start_ts-int64(step) < 1 { + return nil } - items := store.GraphItems.FetchAll(md5) - // merge - items_size := len(items) + md5 := cutils.Md5(param.Endpoint + "/" + param.Counter) + ckey := g.FormRrdCacheKey(md5, dsType, step) + filename := g.RrdFileName(g.Config().RRD.Storage, md5, dsType, step) + // read data from rrd file + datas, _ := rrdtool.Fetch(filename, param.ConsolFun, start_ts, end_ts, step) datas_size := len(datas) - if items_size > 1 && datas_size > 2 && - int(datas[1].Timestamp-datas[0].Timestamp) == step && - items[items_size-1].Timestamp > datas[0].Timestamp { + // read cached items + items := store.GraphItems.FetchAll(ckey) + items_size := len(items) + + nowTs := time.Now().Unix() + lastUpTs := nowTs - nowTs%int64(step) + rra1StartTs := lastUpTs - int64(rrdtool.RRA1PointCnt*step) + // consolidated, do not merge + if start_ts < rra1StartTs { + resp.Values = datas + goto _RETURN_OK + } + // no cached items, do not merge + if items_size < 1 { + resp.Values = datas + goto _RETURN_OK + } + + // merge + { + // fmt cached items var val cmodel.JsonFloat - cache_size := int(items[items_size-1].Timestamp-items[0].Timestamp)/step + 1 - cache := make([]*cmodel.RRDData, cache_size, cache_size) + cache := make([]*cmodel.RRDData, 0) - //fix items - items_idx := 0 ts := items[0].Timestamp + itemEndTs := items[items_size-1].Timestamp + itemIdx := 0 if dsType == g.DERIVE || dsType == g.COUNTER { - for i := 0; i < cache_size; i++ { - if items_idx < items_size-1 && - ts == items[items_idx].Timestamp && - ts != items[items_idx+1].Timestamp { - val = cmodel.JsonFloat(items[items_idx+1].Value-items[items_idx].Value) / - cmodel.JsonFloat(items[items_idx+1].Timestamp-items[items_idx].Timestamp) + for ts < itemEndTs { + if itemIdx < items_size-1 && ts == items[itemIdx].Timestamp && + ts == items[itemIdx+1].Timestamp-int64(step) { + val = cmodel.JsonFloat(items[itemIdx+1].Value-items[itemIdx].Value) / cmodel.JsonFloat(step) if val < 0 { val = cmodel.JsonFloat(math.NaN()) } - items_idx++ + itemIdx++ } else { - // miss + // missing val = cmodel.JsonFloat(math.NaN()) } - cache[i] = &cmodel.RRDData{ - Timestamp: ts, - Value: val, + + if ts >= start_ts && ts <= end_ts { + cache = append(cache, &cmodel.RRDData{Timestamp: ts, Value: val}) } ts = ts + int64(step) } } else if dsType == g.GAUGE { - for i := 0; i < cache_size; i++ { - if items_idx < items_size && ts == items[items_idx].Timestamp { - val = cmodel.JsonFloat(items[items_idx].Value) - items_idx++ + for ts <= itemEndTs { + if itemIdx < items_size && ts == items[itemIdx].Timestamp { + val = cmodel.JsonFloat(items[itemIdx].Value) + itemIdx++ } else { - // miss + // missing val = cmodel.JsonFloat(math.NaN()) } - cache[i] = &cmodel.RRDData{ - Timestamp: ts, - Value: val, + + if ts >= start_ts && ts <= end_ts { + cache = append(cache, &cmodel.RRDData{Timestamp: ts, Value: val}) } ts = ts + int64(step) } - } else { - log.Println("not support dstype") - return nil + } + cache_size := len(cache) + + // do merging + merged := make([]*cmodel.RRDData, 0) + if datas_size > 0 { + for _, val := range datas { + if val.Timestamp >= start_ts && val.Timestamp <= end_ts { + merged = append(merged, val) //rrdtool返回的数据,时间戳是连续的、不会有跳点的情况 + } + } } - size := int(items[items_size-1].Timestamp-datas[0].Timestamp)/step + 1 - ret := make([]*cmodel.RRDData, size, size) - cache_idx := 0 - ts = datas[0].Timestamp + if cache_size > 0 { + rrdDataSize := len(merged) + lastTs := cache[0].Timestamp - if g.Config().Debug && g.Config().DebugChecksum == md5 { - log.Println("param.start", param.Start, "param.End:", param.End, - "items:", items, "datas:", datas) - } + // find junction + rrdDataIdx := 0 + for rrdDataIdx = rrdDataSize - 1; rrdDataIdx >= 0; rrdDataIdx-- { + if merged[rrdDataIdx].Timestamp < cache[0].Timestamp { + lastTs = merged[rrdDataIdx].Timestamp + break + } + } - for i := 0; i < size; i++ { - if g.Config().Debug && g.Config().DebugChecksum == md5 { - log.Println("i", i, "size:", size, "items_idx:", items_idx, "ts:", ts) + // fix missing + for ts := lastTs + int64(step); ts < cache[0].Timestamp; ts += int64(step) { + merged = append(merged, &cmodel.RRDData{Timestamp: ts, Value: cmodel.JsonFloat(math.NaN())}) } - if i < datas_size { - if ts == cache[cache_idx].Timestamp { - if math.IsNaN(float64(cache[cache_idx].Value)) { - val = datas[i].Value - } else { - val = cache[cache_idx].Value + + // merge cached items to result + rrdDataIdx += 1 + for cacheIdx := 0; cacheIdx < cache_size; cacheIdx++ { + if rrdDataIdx < rrdDataSize { + if !math.IsNaN(float64(cache[cacheIdx].Value)) { + merged[rrdDataIdx] = cache[cacheIdx] } - cache_idx++ } else { - val = datas[i].Value - } - } else { - if cache_idx < cache_size && ts == cache[cache_idx].Timestamp { - val = cache[cache_idx].Value - cache_idx++ - } else { - //miss - val = cmodel.JsonFloat(math.NaN()) + merged = append(merged, cache[cacheIdx]) } + rrdDataIdx++ } - ret[i] = &cmodel.RRDData{ - Timestamp: ts, - Value: val, + } + mergedSize := len(merged) + + // fmt result + ret_size := int((end_ts - start_ts) / int64(step)) + ret := make([]*cmodel.RRDData, ret_size, ret_size) + mergedIdx := 0 + ts = start_ts + for i := 0; i < ret_size; i++ { + if mergedIdx < mergedSize && ts == merged[mergedIdx].Timestamp { + ret[i] = merged[mergedIdx] + mergedIdx++ + } else { + ret[i] = &cmodel.RRDData{Timestamp: ts, Value: cmodel.JsonFloat(math.NaN())} } - ts = ts + int64(step) + ts += int64(step) } resp.Values = ret - } else { - resp.Values = datas } - resp.Endpoint = param.Endpoint - resp.Counter = param.Counter - resp.DsType = dsType - resp.Step = step - +_RETURN_OK: // statistics proc.GraphQueryItemCnt.IncrBy(int64(len(resp.Values))) return nil @@ -237,9 +251,61 @@ func (this *Graph) Last(param cmodel.GraphLastParam, resp *cmodel.GraphLastResp) resp.Endpoint = param.Endpoint resp.Counter = param.Counter + resp.Value = GetLast(param.Endpoint, param.Counter) + + return nil +} + +func (this *Graph) LastRaw(param cmodel.GraphLastParam, resp *cmodel.GraphLastResp) error { + // statistics + proc.GraphLastRawCnt.Incr() + + resp.Endpoint = param.Endpoint + resp.Counter = param.Counter + resp.Value = GetLastRaw(param.Endpoint, param.Counter) - md5 := cutils.Md5(param.Endpoint + "/" + param.Counter) - item := store.GetLastItem(md5) - resp.Value = cmodel.NewRRDData(item.Timestamp, item.Value) return nil } + +// 非法值: ts=0,value无意义 +func GetLast(endpoint, counter string) *cmodel.RRDData { + dsType, step, exists := index.GetTypeAndStep(endpoint, counter) + if !exists { + return cmodel.NewRRDData(0, 0.0) + } + + if dsType == g.GAUGE { + return GetLastRaw(endpoint, counter) + } + + if dsType == g.COUNTER || dsType == g.DERIVE { + md5 := cutils.Md5(endpoint + "/" + counter) + items := store.GetAllItems(md5) + if len(items) < 2 { + return cmodel.NewRRDData(0, 0.0) + } + + f0 := items[0] + f1 := items[1] + delta_ts := f0.Timestamp - f1.Timestamp + delta_v := f0.Value - f1.Value + if delta_ts != int64(step) || delta_ts <= 0 { + return cmodel.NewRRDData(0, 0.0) + } + if delta_v < 0 { + // when cnt restarted, new cnt value would be zero, so fix it here + delta_v = 0 + } + + return cmodel.NewRRDData(f0.Timestamp, delta_v/float64(delta_ts)) + } + + return cmodel.NewRRDData(0, 0.0) +} + +// 非法值: ts=0,value无意义 +func GetLastRaw(endpoint, counter string) *cmodel.RRDData { + md5 := cutils.Md5(endpoint + "/" + counter) + item := store.GetLastItem(md5) + return cmodel.NewRRDData(item.Timestamp, item.Value) +} diff --git a/api/rpc.go b/api/rpc.go index aa9e62c..bad5668 100644 --- a/api/rpc.go +++ b/api/rpc.go @@ -37,20 +37,21 @@ func init() { } func Start() { - if !g.Config().Rpc.Enabled { + if !g.Config().Rpc.Enable { + log.Println("rpc.Start warning, not enable") return } addr := g.Config().Rpc.Listen tcpAddr, err := net.ResolveTCPAddr("tcp", addr) if err != nil { - log.Fatalf("net.ResolveTCPAddr fail: %s", err) + log.Fatalf("rpc.Start error, net.ResolveTCPAddr failed, %s", err) } listener, err := net.ListenTCP("tcp", tcpAddr) if err != nil { - log.Fatalf("listen %s fail: %s", addr, err) + log.Fatalf("rpc.Start error, listen %s failed, %s", addr, err) } else { - log.Println("rpc listening", addr) + log.Println("rpc.Start ok, listening on", addr) } rpc.Register(new(Graph)) @@ -82,7 +83,7 @@ func Start() { select { case <-Close_chan: - log.Println("api recv sigout and exit...") + log.Println("rpc, recv sigout and exiting...") listener.Close() Close_done_chan <- 1 diff --git a/cfg.example.json b/cfg.example.json index f5fbdf7..e3afeae 100644 --- a/cfg.example.json +++ b/cfg.example.json @@ -1,14 +1,11 @@ { - "pid": "/home/work/open-falcon/graph/var/app.pid", - "log": "info", "debug": false, - "debugChecksum": "nil", "http": { - "enabled": true, + "enable": true, "listen": "0.0.0.0:6071" }, "rpc": { - "enabled": true, + "enable": true, "listen": "0.0.0.0:6070" }, "rrd": { diff --git a/control b/control index 12cba77..6c0cc39 100755 --- a/control +++ b/control @@ -1,9 +1,6 @@ #!/bin/bash - -WORKSPACE=$(cd $(dirname $0)/; pwd) -cd $WORKSPACE - -mkdir -p var +workspace=$(cd $(dirname $0) && pwd) +cd $workspace module=graph app=falcon-$module @@ -11,144 +8,152 @@ conf=cfg.json pidfile=var/app.pid logfile=var/app.log -[[ -f $module ]] && mv $module $app -[[ -f $conf ]] || cp cfg.example.json $conf +mkdir -p var &>/dev/null -function check_pid() { - if [ -f $pidfile ];then - pid=`cat $pidfile` - if [ -n $pid ]; then - running=`ps -p $pid|grep -v "PID TTY" |wc -l` - return $running - fi + +## build & pack +function build() { + commit=$(git log -1 --pretty=%h) + cat < ./g/git.go +package g +const ( + COMMIT = "$commit" +) +EOF + go build -o $app main.go + sc=$? + if [ $sc -ne 0 ];then + echo "build error" + exit $sc + else + echo -n "build ok, vsn=" + ./$app -v fi - return 0 } +function pack() { + build + git log -1 --pretty=%h > gitversion + version=`./$app -v` + tar zcvf $app-$version.tar.gz control $app cfg.example.json gitversion ./test/debug + rm -f gitversion &>/dev/null +} + +function packbin() { + build + git log -1 --pretty=%h > gitversion + version=`./$app -v` + tar zcvf $app-bin-$version.tar.gz $app gitversion + rm -f gitversion &>/dev/null +} + +## opt function start() { check_pid running=$? if [ $running -gt 0 ];then - echo -n "$app now is running already, pid=" + echo -n "started, pid=" cat $pidfile return 1 fi nohup ./$app -c $conf >>$logfile 2>&1 & echo $! > $pidfile - echo "$app started..., pid=$!" + echo "start ok, pid=$!" } function stop() { pid=`cat $pidfile` kill $pid - echo "$app quit..." + echo "stoped" } function shutdown() { pid=`cat $pidfile` kill -9 $pid - echo "$app kill..." -} - -function kill9() { - pid=`cat $pidfile` - kill -9 $pid - echo "$app stoped..." + echo "stoped" } function restart() { - stop - sleep 2 - start + stop + sleep 1 + start } +## other function status() { check_pid running=$? if [ $running -gt 0 ];then - echo -n "$app now is running, pid=" + echo -n "running, pid=" cat $pidfile - echo else - echo "$app is stoped" + echo "stoped" fi } -function tailf() { - tail -f $logfile -} - -function help() { - echo "$0 start|stop|restart|status|tail|test" -} - -function pack() { - build - git log -1 --pretty=%h > gitversion - version=`./$app -v` - tar zcvf $app-$version.tar.gz control $app cfg.example.json gitversion ./test/debug -} - -function packbin() { - build - git log -1 --pretty=%h > gitversion - version=`./$app -v` - tar zcvf $app-$version.tar.gz control $app gitversion +function version() { + ./$app -vg } - -function build() { - commit=$(git log -1 --pretty=%h) - cat < ./g/git.go -package g -const ( - COMMIT = "$commit" -) -EOF - go build -o $app main.go - show_version +function tailf() { + tail -f $logfile } -function show_version() { - ./$app -vg +## internal +function check_pid() { + if [ -f $pidfile ];then + pid=`cat $pidfile` + if [ -n $pid ]; then + running=`ps -p $pid|grep -v "PID TTY" |wc -l` + return $running + fi + fi + return 0 } -function test() { - pkg=$1 - if [ "X$pkg" != "X" ]; then - go test -v "github.com/open-falcon/$module/$pkg" - else - echo "test: bad args" - fi +## usage +function usage() { + echo "$0 build|pack|packbin|start|stop|restart|status|tail|version" } -if [ "$1" == "" ]; then - help -elif [ "$1" == "stop" ];then - stop -elif [ "$1" == "kill9" ];then - kill9 -elif [ "$1" == "start" ];then - start -elif [ "$1" == "restart" ];then - restart -elif [ "$1" == "status" ];then - status -elif [ "$1" == "tail" ];then - tailf -elif [ "$1" == "pack" ];then - pack -elif [ "$1" == "packbin" ];then - packbin -elif [ "$1" == "build" ];then - build -elif [ "$1" == "version" ];then - show_version -elif [ "$1" == "test" ]; then - test $2 -elif [ "$1" == "shutdown" ]; then - kill -else - help -fi +## main +action=$1 +case $action in + ## build + "build" ) + build + ;; + "pack" ) + pack + ;; + "packbin" ) + packbin + ;; + ## opt + "start" ) + start + ;; + "stop" ) + stop + ;; + "kill" ) + shutdown + ;; + "restart" ) + restart + ;; + ## other + "status" ) + status + ;; + "version" ) + version + ;; + "tail" ) + tailf + ;; + * ) + usage + ;; +esac diff --git a/g/cfg.go b/g/cfg.go index 298ef2e..adc966f 100644 --- a/g/cfg.go +++ b/g/cfg.go @@ -9,13 +9,13 @@ import ( ) type HttpConfig struct { - Enabled bool `json:"enabled"` - Listen string `json:"listen"` + Enable bool `json:"enable"` + Listen string `json:"listen"` } type RpcConfig struct { - Enabled bool `json:"enabled"` - Listen string `json:"listen"` + Enable bool `json:"enable"` + Listen string `json:"listen"` } type RRDConfig struct { @@ -28,14 +28,12 @@ type DBConfig struct { } type GlobalConfig struct { - Pid string `json:"pid"` - Log string `json:"log"` - Debug bool `json:"debug"` - DebugChecksum string `json:"debugChecksum"` - Http *HttpConfig `json:"http"` - Rpc *RpcConfig `json:"rpc"` - RRD *RRDConfig `json:"rrd"` - DB *DBConfig `json:"db"` + Pid string `json:"pid"` + Debug bool `json:"debug"` + Http *HttpConfig `json:"http"` + Rpc *RpcConfig `json:"rpc"` + RRD *RRDConfig `json:"rrd"` + DB *DBConfig `json:"db"` } var ( @@ -52,30 +50,30 @@ func Config() *GlobalConfig { func ParseConfig(cfg string) { if cfg == "" { - log.Fatalln("use -c to specify configuration file") + log.Fatalln("config file not specified: use -c $filename") } if !file.IsExist(cfg) { - log.Fatalln("config file:", cfg, "is not existent") + log.Fatalln("config file specified not found:", cfg) } ConfigFile = cfg configContent, err := file.ToTrimString(cfg) if err != nil { - log.Fatalln("read config file:", cfg, "fail:", err) + log.Fatalln("read config file", cfg, "error:", err.Error()) } var c GlobalConfig err = json.Unmarshal([]byte(configContent), &c) if err != nil { - log.Fatalln("parse config file:", cfg, "fail:", err) + log.Fatalln("parse config file", cfg, "error:", err.Error()) } + // set config configLock.Lock() defer configLock.Unlock() - config = &c - log.Println("read config file:", cfg, "successfully") + log.Println("g.ParseConfig ok, file", cfg) } diff --git a/g/db.go b/g/db.go index a228dad..cadfb87 100644 --- a/g/db.go +++ b/g/db.go @@ -23,7 +23,7 @@ func InitDB() { } dbConnMap = make(map[string]*sql.DB) - log.Println("g.InitDB, ok") + log.Println("g.InitDB ok") } func GetDbConn(connName string) (c *sql.DB, e error) { diff --git a/g/g.go b/g/g.go index 32a0049..d1d8806 100644 --- a/g/g.go +++ b/g/g.go @@ -5,13 +5,24 @@ import ( "runtime" ) -// Change Logs -// 0.5.2 use rrdlite other than rrdtool, fix data lost when query failed -// rollback to central lib, add filer for debug -// 0.5.3 mv db back to g, add rpc.last +// TODO +// change graph.store cache struct(key: md5->uuid) +// flush when query happens seems unreasonable +// shrink packages + +// CHANGE LOGS +// 0.4.8 fix filename bug emporarily, fix dirty-index-cache bug of query, +// add filter for debug +// 0.4.9 mv db back to g, add rpc.last +// 0.5.0 rm trace, add history&last api +// 0.5.1 add http interface v2, using form args +// 0.5.2 add last_raw +// 0.5.3 fix bug of last&last_raw +// 0.5.4 fix bug of Query.merge +// 0.5.5 use commom(rm model), fix sync disk const ( - VERSION = "0.5.3" + VERSION = "0.5.5" GAUGE = "GAUGE" DERIVE = "DERIVE" COUNTER = "COUNTER" diff --git a/g/git.go b/g/git.go index 9aafa9c..5353d39 100644 --- a/g/git.go +++ b/g/git.go @@ -1,4 +1,4 @@ package g const ( - COMMIT = "76cc7ec" + COMMIT = "044feaa" ) diff --git a/g/utils.go b/g/utils.go index 93e5e93..53327f9 100644 --- a/g/utils.go +++ b/g/utils.go @@ -2,9 +2,10 @@ package g import ( "fmt" - "github.com/toolkits/file" "strconv" "strings" + + "github.com/toolkits/file" ) // RRDTOOL UTILS diff --git a/http/common.go b/http/common.go index 33f22c3..ce4618e 100644 --- a/http/common.go +++ b/http/common.go @@ -1,10 +1,12 @@ package http import ( - "github.com/open-falcon/graph/g" - "github.com/toolkits/file" "net/http" "strings" + + "github.com/toolkits/file" + + "github.com/open-falcon/graph/g" ) func configCommonRoutes() { diff --git a/http/debug_http.go b/http/debug_http.go index e07ef4f..3321c64 100644 --- a/http/debug_http.go +++ b/http/debug_http.go @@ -2,15 +2,16 @@ package http import ( "fmt" + "net/http" + "strconv" + "strings" + "time" + cmodel "github.com/open-falcon/common/model" cutils "github.com/open-falcon/common/utils" "github.com/open-falcon/graph/api" "github.com/open-falcon/graph/g" "github.com/open-falcon/graph/store" - "net/http" - "strconv" - "strings" - "time" ) func configDebugRoutes() { @@ -25,8 +26,8 @@ func configDebugRoutes() { oneHourAgo := time.Now().Unix() - 3600 count := 0 - for _, checksum := range keys { - item := store.GraphItems.First(checksum) + for _, ckey := range keys { + item := store.GraphItems.First(ckey) if item == nil { continue } @@ -81,6 +82,51 @@ func configDebugRoutes() { RenderDataJson(w, "ok") }) + http.HandleFunc("/v2/api/recv", func(w http.ResponseWriter, r *http.Request) { + r.ParseForm() + + if !(len(r.Form["e"]) > 0 && len(r.Form["m"]) > 0 && len(r.Form["v"]) > 0 && + len(r.Form["ts"]) > 0 && len(r.Form["step"]) > 0 && len(r.Form["type"]) > 0) { + RenderDataJson(w, "bad args") + return + } + endpoint := r.Form["e"][0] + metric := r.Form["m"][0] + value, _ := strconv.ParseFloat(r.Form["v"][0], 64) + ts, _ := strconv.ParseInt(r.Form["ts"][0], 10, 64) + step, _ := strconv.ParseInt(r.Form["step"][0], 10, 32) + dstype := r.Form["type"][0] + + tags := make(map[string]string) + if len(r.Form["t"]) > 0 { + tagstr := r.Form["t"][0] + tagVals := strings.Split(tagstr, ",") + for _, tag := range tagVals { + tagPairs := strings.Split(tag, "=") + if len(tagPairs) == 2 { + tags[tagPairs[0]] = tagPairs[1] + } + } + } + + item := &cmodel.MetaData{ + Endpoint: endpoint, + Metric: metric, + Timestamp: ts, + Step: step, + CounterType: dstype, + Value: value, + Tags: tags, + } + gitem, err := convert2GraphItem(item) + if err != nil { + RenderDataJson(w, err) + return + } + + api.HandleItems([]*cmodel.GraphItem{gitem}) + RenderDataJson(w, "ok") + }) } func convert2GraphItem(d *cmodel.MetaData) (*cmodel.GraphItem, error) { diff --git a/http/http.go b/http/http.go index 77d3cb8..7bccdad 100644 --- a/http/http.go +++ b/http/http.go @@ -72,7 +72,8 @@ func (ln TcpKeepAliveListener) Accept() (c net.Conn, err error) { } func Start() { - if !g.Config().Http.Enabled { + if !g.Config().Http.Enable { + log.Println("http.Start warning, not enable") return } diff --git a/http/index_http.go b/http/index_http.go index 0b661fa..9baa9c8 100644 --- a/http/index_http.go +++ b/http/index_http.go @@ -2,10 +2,11 @@ package http import ( "fmt" - "github.com/open-falcon/graph/index" "net/http" "strconv" "strings" + + "github.com/open-falcon/graph/index" ) func configIndexRoutes() { @@ -52,4 +53,72 @@ func configIndexRoutes() { RenderDataJson(w, "ok") }) + + // index.cached + http.HandleFunc("/index/cache/", func(w http.ResponseWriter, r *http.Request) { + urlParam := r.URL.Path[len("/index/cache/"):] + args := strings.Split(urlParam, "/") + + argsLen := len(args) + if !(argsLen == 4 || argsLen == 5) { + RenderDataJson(w, "bad args") + return + } + endpoint := args[0] + metric := args[1] + step, _ := strconv.ParseInt(args[2], 10, 32) + dstype := args[3] + tags := make(map[string]string) + if argsLen == 5 { + tagVals := strings.Split(args[4], ",") + for _, tag := range tagVals { + tagPairs := strings.Split(tag, "=") + if len(tagPairs) == 2 { + tags[tagPairs[0]] = tagPairs[1] + } + } + } + + item, err := index.GetIndexedItemCache(endpoint, metric, tags, dstype, int(step)) + if err != nil { + RenderDataJson(w, fmt.Sprintf("%v", err)) + return + } + + RenderDataJson(w, item) + }) + + http.HandleFunc("/v2/index/cache", func(w http.ResponseWriter, r *http.Request) { + r.ParseForm() + + if !(len(r.Form["e"]) > 0 && len(r.Form["m"]) > 0 && len(r.Form["step"]) > 0 && len(r.Form["type"]) > 0) { + RenderDataJson(w, "bad args") + return + } + endpoint := r.Form["e"][0] + metric := r.Form["m"][0] + step, _ := strconv.ParseInt(r.Form["step"][0], 10, 32) + dstype := r.Form["type"][0] + + tags := make(map[string]string) + if len(r.Form["t"]) > 0 { + tagstr := r.Form["t"][0] + tagVals := strings.Split(tagstr, ",") + for _, tag := range tagVals { + tagPairs := strings.Split(tag, "=") + if len(tagPairs) == 2 { + tags[tagPairs[0]] = tagPairs[1] + } + } + } + + item, err := index.GetIndexedItemCache(endpoint, metric, tags, dstype, int(step)) + if err != nil { + RenderDataJson(w, fmt.Sprintf("%v", err)) + return + } + + RenderDataJson(w, item) + }) + } diff --git a/http/proc_http.go b/http/proc_http.go index b725d02..e2a37ca 100644 --- a/http/proc_http.go +++ b/http/proc_http.go @@ -1,27 +1,28 @@ package http import ( + "net/http" + "strings" + cutils "github.com/open-falcon/common/utils" "github.com/open-falcon/graph/proc" "github.com/open-falcon/graph/store" - "net/http" - "strconv" - "strings" ) func configProcRoutes() { - // statistics, TO BE DISCARDED - http.HandleFunc("/statistics/all", func(w http.ResponseWriter, r *http.Request) { + // counter + http.HandleFunc("/counter/all", func(w http.ResponseWriter, r *http.Request) { RenderDataJson(w, proc.GetAll()) }) - http.HandleFunc("/counter/all", func(w http.ResponseWriter, r *http.Request) { + // TO BE DISCARDed + http.HandleFunc("/statistics/all", func(w http.ResponseWriter, r *http.Request) { RenderDataJson(w, proc.GetAll()) }) - // trace, TO BE DISCARDED - http.HandleFunc("/trace/", func(w http.ResponseWriter, r *http.Request) { - urlParam := r.URL.Path[len("/trace/"):] + // items.history + http.HandleFunc("/history/", func(w http.ResponseWriter, r *http.Request) { + urlParam := r.URL.Path[len("/history/"):] args := strings.Split(urlParam, "/") argsLen := len(args) @@ -37,30 +38,23 @@ func configProcRoutes() { } } } - proc.RecvDataTrace.SetPK(cutils.Checksum(endpoint, metric, tags)) - RenderDataJson(w, proc.RecvDataTrace.GetAllTraced()) + RenderDataJson(w, store.GetAllItems(cutils.Checksum(endpoint, metric, tags))) }) - // filter - http.HandleFunc("/filter/", func(w http.ResponseWriter, r *http.Request) { - urlParam := r.URL.Path[len("/filter/"):] - args := strings.Split(urlParam, "/") - - argsLen := len(args) - endpoint := args[0] - metric := args[1] - opt := args[2] + http.HandleFunc("/v2/history", func(w http.ResponseWriter, r *http.Request) { + r.ParseForm() - threadholdStr := args[3] - threadhold, err := strconv.ParseFloat(threadholdStr, 64) - if err != nil { - RenderDataJson(w, "bad threadhold") + if !(len(r.Form["e"]) > 0 && len(r.Form["m"]) > 0) { + RenderDataJson(w, "bad args") return } + endpoint := r.Form["e"][0] + metric := r.Form["m"][0] tags := make(map[string]string) - if argsLen > 4 { - tagVals := strings.Split(args[4], ",") + if len(r.Form["t"]) > 0 { + tagstr := r.Form["t"][0] + tagVals := strings.Split(tagstr, ",") for _, tag := range tagVals { tagPairs := strings.Split(tag, "=") if len(tagPairs) == 2 { @@ -69,18 +63,12 @@ func configProcRoutes() { } } - err = proc.RecvDataFilter.SetFilter(cutils.Checksum(endpoint, metric, tags), opt, threadhold) - if err != nil { - RenderDataJson(w, err.Error()) - return - } - - RenderDataJson(w, proc.RecvDataFilter.GetAllFiltered()) + RenderDataJson(w, store.GetAllItems(cutils.Checksum(endpoint, metric, tags))) }) - // history - http.HandleFunc("/history/", func(w http.ResponseWriter, r *http.Request) { - urlParam := r.URL.Path[len("/history/"):] + // items.last + http.HandleFunc("/last/", func(w http.ResponseWriter, r *http.Request) { + urlParam := r.URL.Path[len("/last/"):] args := strings.Split(urlParam, "/") argsLen := len(args) @@ -96,20 +84,23 @@ func configProcRoutes() { } } } - RenderDataJson(w, store.GetAllItems(cutils.Checksum(endpoint, metric, tags))) + RenderDataJson(w, store.GetLastItem(cutils.Checksum(endpoint, metric, tags))) }) - // last - http.HandleFunc("/last/", func(w http.ResponseWriter, r *http.Request) { - urlParam := r.URL.Path[len("/last/"):] - args := strings.Split(urlParam, "/") + http.HandleFunc("/v2/last", func(w http.ResponseWriter, r *http.Request) { + r.ParseForm() + + if !(len(r.Form["e"]) > 0 && len(r.Form["m"]) > 0) { + RenderDataJson(w, "bad args") + return + } + endpoint := r.Form["e"][0] + metric := r.Form["m"][0] - argsLen := len(args) - endpoint := args[0] - metric := args[1] tags := make(map[string]string) - if argsLen > 2 { - tagVals := strings.Split(args[2], ",") + if len(r.Form["t"]) > 0 { + tagstr := r.Form["t"][0] + tagVals := strings.Split(tagstr, ",") for _, tag := range tagVals { tagPairs := strings.Split(tag, "=") if len(tagPairs) == 2 { @@ -117,6 +108,7 @@ func configProcRoutes() { } } } + RenderDataJson(w, store.GetLastItem(cutils.Checksum(endpoint, metric, tags))) }) diff --git a/index/cache.go b/index/cache.go index 6325b77..4180aef 100644 --- a/index/cache.go +++ b/index/cache.go @@ -3,16 +3,18 @@ package index import ( "database/sql" "fmt" - cmodel "github.com/open-falcon/common/model" - cutils "github.com/open-falcon/common/utils" - "github.com/open-falcon/graph/g" - "github.com/open-falcon/graph/proc" - tcache "github.com/toolkits/cache/localcache/timedcache" "log" "strconv" "strings" "sync" "time" + + tcache "github.com/toolkits/cache/localcache/timedcache" + + cmodel "github.com/open-falcon/common/model" + cutils "github.com/open-falcon/common/utils" + "github.com/open-falcon/graph/g" + "github.com/open-falcon/graph/proc" ) const ( diff --git a/index/index.go b/index/index.go index b700dd8..e3e4837 100644 --- a/index/index.go +++ b/index/index.go @@ -1,16 +1,18 @@ package index import ( + "fmt" + "log" + cmodel "github.com/open-falcon/common/model" "github.com/open-falcon/graph/g" - "log" ) // 初始化索引功能模块 func Start() { InitCache() go StartIndexUpdateIncrTask() - log.Println("index:Start, ok") + log.Println("index.Start ok") } // index收到一条新上报的监控数据,尝试用于增量更新索引 @@ -44,3 +46,31 @@ func ReceiveItem(item *cmodel.GraphItem, md5 string) { // 缓存未命中, 放入增量更新队列 unIndexedItemCache.Put(md5, NewIndexCacheItem(uuid, item)) } + +// +func GetIndexedItemCache(endpoint string, metric string, tags map[string]string, dstype string, step int) (r *cmodel.GraphItem, rerr error) { + itemDemo := &cmodel.GraphItem{ + Endpoint: endpoint, + Metric: metric, + Tags: tags, + DsType: dstype, + Step: step, + } + md5 := itemDemo.Checksum() + uuid := itemDemo.UUID() + + cached := indexedItemCache.Get(md5) + if cached == nil { + rerr = fmt.Errorf("not found") + return + } + + icitem := cached.(*IndexCacheItem) + if icitem.UUID != uuid { + rerr = fmt.Errorf("counter found, uuid not found: bad step or type") + return + } + + r = icitem.Item + return +} diff --git a/index/index_update_all_task.go b/index/index_update_all_task.go index 7420dec..b1d3260 100644 --- a/index/index_update_all_task.go +++ b/index/index_update_all_task.go @@ -3,14 +3,16 @@ package index import ( "database/sql" "fmt" + "log" + "time" + + nsema "github.com/toolkits/concurrent/semaphore" + ntime "github.com/toolkits/time" + cmodel "github.com/open-falcon/common/model" cutils "github.com/open-falcon/common/utils" "github.com/open-falcon/graph/g" proc "github.com/open-falcon/graph/proc" - nsema "github.com/toolkits/concurrent/semaphore" - ntime "github.com/toolkits/time" - "log" - "time" ) const ( @@ -25,7 +27,6 @@ var ( // 更新一条监控数据对应的索引. 用于手动添加索引,一般情况下不会使用 func UpdateIndexOne(endpoint string, metric string, tags map[string]string, dstype string, step int) error { - log.Println("1") itemDemo := &cmodel.GraphItem{ Endpoint: endpoint, Metric: metric, @@ -47,16 +48,12 @@ func UpdateIndexOne(endpoint string, metric string, tags map[string]string, dsty } gitem := icitem.Item - log.Println("2") - dbConn, err := g.GetDbConn("UpdateIndexAllTask") if err != nil { log.Println("[ERROR] make dbConn fail", err) return err } - log.Println("3") - return updateIndexFromOneItem(gitem, dbConn) } @@ -151,7 +148,7 @@ func updateIndexFromOneItem(item *cmodel.GraphItem, conn *sql.DB) error { // endpoint表 { - sqlStr := "INSERT INTO endpoint (endpoint, ts, t_create) VALUES (?, ?, now())" + sqlDuplicateString + sqlStr := "INSERT INTO endpoint(endpoint, ts, t_create) VALUES (?, ?, now())" + sqlDuplicateString ret, err := conn.Exec(sqlStr, endpoint, ts) if err != nil { log.Println(err) @@ -167,7 +164,7 @@ func updateIndexFromOneItem(item *cmodel.GraphItem, conn *sql.DB) error { // tag_endpoint表 { - sqlStr := "INSERT INTO tag_endpoint (tag, endpoint_id, ts, t_create) VALUES (?, ?, ?, now())" + sqlDuplicateString + sqlStr := "INSERT INTO tag_endpoint(tag, endpoint_id, ts, t_create) VALUES (?, ?, ?, now())" + sqlDuplicateString for tagKey, tagVal := range item.Tags { tag := fmt.Sprintf("%s=%s", tagKey, tagVal) @@ -192,7 +189,7 @@ func updateIndexFromOneItem(item *cmodel.GraphItem, conn *sql.DB) error { counter = fmt.Sprintf("%s/%s", counter, cutils.SortedTags(item.Tags)) } - sqlStr := "INSERT INTO endpoint_counter (endpoint_id,counter,step,type,ts,t_create) VALUES (?,?,?,?,?,now())" + sqlDuplicateString + sqlStr := "INSERT INTO endpoint_counter(endpoint_id,counter,step,type,ts,t_create) VALUES (?,?,?,?,?,now())" + sqlDuplicateString ret, err := conn.Exec(sqlStr, endpointId, counter, item.Step, item.DsType, ts) if err != nil { log.Println(err) diff --git a/index/index_update_incr_task.go b/index/index_update_incr_task.go index c6ae69b..2e30105 100644 --- a/index/index_update_incr_task.go +++ b/index/index_update_incr_task.go @@ -3,14 +3,16 @@ package index import ( "database/sql" "fmt" + "log" + "time" + + nsema "github.com/toolkits/concurrent/semaphore" + ntime "github.com/toolkits/time" + cmodel "github.com/open-falcon/common/model" cutils "github.com/open-falcon/common/utils" "github.com/open-falcon/graph/g" proc "github.com/open-falcon/graph/proc" - nsema "github.com/toolkits/concurrent/semaphore" - ntime "github.com/toolkits/time" - "log" - "time" ) const ( @@ -93,7 +95,7 @@ func maybeUpdateIndexFromOneItem(item *cmodel.GraphItem, conn *sql.DB) error { proc.IndexUpdateIncrDbEndpointSelectCnt.Incr() if err == sql.ErrNoRows || endpointId <= 0 { // 数据库中也没有, insert - sqlStr := "INSERT INTO endpoint (endpoint, ts, t_create) VALUES (?, ?, now())" + sqlDuplicateString + sqlStr := "INSERT INTO endpoint(endpoint, ts, t_create) VALUES (?, ?, now())" + sqlDuplicateString ret, err := conn.Exec(sqlStr, endpoint, ts) if err != nil { log.Println(err) @@ -127,7 +129,7 @@ func maybeUpdateIndexFromOneItem(item *cmodel.GraphItem, conn *sql.DB) error { proc.IndexUpdateIncrDbTagEndpointSelectCnt.Incr() if err == sql.ErrNoRows || tagEndpointId <= 0 { - sqlStr := "INSERT INTO tag_endpoint (tag, endpoint_id, ts, t_create) VALUES (?, ?, ?, now())" + sqlDuplicateString + sqlStr := "INSERT INTO tag_endpoint(tag, endpoint_id, ts, t_create) VALUES (?, ?, ?, now())" + sqlDuplicateString ret, err := conn.Exec(sqlStr, tag, endpointId, ts) if err != nil { log.Println(err) @@ -164,7 +166,7 @@ func maybeUpdateIndexFromOneItem(item *cmodel.GraphItem, conn *sql.DB) error { proc.IndexUpdateIncrDbEndpointCounterSelectCnt.Incr() if err == sql.ErrNoRows || endpointCounterId <= 0 { - sqlStr := "INSERT INTO endpoint_counter (endpoint_id,counter,step,type,ts,t_create) VALUES (?,?,?,?,?,now())" + + sqlStr := "INSERT INTO endpoint_counter(endpoint_id,counter,step,type,ts,t_create) VALUES (?,?,?,?,?,now())" + " ON DUPLICATE KEY UPDATE id=LAST_INSERT_ID(id),ts=VALUES(ts), step=VALUES(step),type=VALUES(type)" ret, err := conn.Exec(sqlStr, endpointId, counter, item.Step, item.DsType, ts) if err != nil { diff --git a/main.go b/main.go index 64ace4e..6109332 100644 --- a/main.go +++ b/main.go @@ -9,14 +9,13 @@ import ( "syscall" "github.com/open-falcon/graph/api" - "github.com/open-falcon/graph/cron" "github.com/open-falcon/graph/g" "github.com/open-falcon/graph/http" "github.com/open-falcon/graph/index" "github.com/open-falcon/graph/rrdtool" ) -func start_signal(pid int, conf g.GlobalConfig) { +func start_signal(pid int, cfg *g.GlobalConfig) { sigs := make(chan os.Signal, 1) log.Println(pid, "register signal notify") signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) @@ -28,32 +27,38 @@ func start_signal(pid int, conf g.GlobalConfig) { switch s { case syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT: log.Println("gracefull shut down") - http.Close_chan <- 1 - api.Close_chan <- 1 - cron.Out_done_chan <- 1 - <-http.Close_done_chan - <-api.Close_done_chan - log.Println(pid, "remove ", conf.Pid) - os.Remove(conf.Pid) + if cfg.Http.Enable { + http.Close_chan <- 1 + <-http.Close_done_chan + } + log.Println("http stop ok") + + if cfg.Rpc.Enable { + api.Close_chan <- 1 + <-api.Close_done_chan + } + log.Println("rpc stop ok") + + rrdtool.Out_done_chan <- 1 rrdtool.FlushAll() - log.Println(pid, "flush done, exiting") + log.Println("rrdtool stop ok") + + log.Println(pid, "exit") os.Exit(0) } } } func main() { - - cfg := flag.String("c", "cfg.json", "configuration file") + cfg := flag.String("c", "cfg.json", "specify config file") version := flag.Bool("v", false, "show version") - versionGit := flag.Bool("vg", false, "show version") + versionGit := flag.Bool("vg", false, "show version and git commit log") flag.Parse() if *version { fmt.Println(g.VERSION) os.Exit(0) } - if *versionGit { fmt.Println(g.VERSION, g.COMMIT) os.Exit(0) @@ -61,23 +66,17 @@ func main() { // global config g.ParseConfig(*cfg) - // init db g.InitDB() + // start rrdtool rrdtool.Start() - + // start api go api.Start() - - // 刷硬盘 - go cron.SyncDisk() - - // 索引更新2.0 + // start indexing index.Start() - - // http + // start http server go http.Start() - start_signal(os.Getpid(), *g.Config()) - + start_signal(os.Getpid(), g.Config()) } diff --git a/proc/proc.go b/proc/proc.go index a7115fa..0c4171e 100644 --- a/proc/proc.go +++ b/proc/proc.go @@ -4,16 +4,6 @@ import ( nproc "github.com/toolkits/proc" ) -// trace -var ( - RecvDataTrace = nproc.NewDataTrace("RecvDataTrace", 3) -) - -// filter -var ( - RecvDataFilter = nproc.NewDataFilter("RecvDataFilter", 3) -) - // 索引增量更新 var ( IndexUpdateIncr = nproc.NewSCounterQps("IndexUpdateIncr") @@ -57,6 +47,7 @@ var ( GraphQueryItemCnt = nproc.NewSCounterQps("GraphQueryItemCnt") GraphInfoCnt = nproc.NewSCounterQps("GraphInfoCnt") GraphLastCnt = nproc.NewSCounterQps("GraphLastCnt") + GraphLastRawCnt = nproc.NewSCounterQps("GraphLastRawCnt") GraphLoadDbCnt = nproc.NewSCounterQps("GraphLoadDbCnt") // load sth from db when query/info, tmp ) @@ -71,6 +62,7 @@ func GetAll() []interface{} { ret = append(ret, GraphQueryItemCnt.Get()) ret = append(ret, GraphInfoCnt.Get()) ret = append(ret, GraphLastCnt.Get()) + ret = append(ret, GraphLastRawCnt.Get()) ret = append(ret, GraphLoadDbCnt.Get()) // index update all diff --git a/rrdtool/rrdtool.go b/rrdtool/rrdtool.go index f0c5335..9bbab41 100644 --- a/rrdtool/rrdtool.go +++ b/rrdtool/rrdtool.go @@ -2,17 +2,17 @@ package rrdtool import ( "errors" - "fmt" "log" "math" "sync" "time" cmodel "github.com/open-falcon/common/model" - "github.com/open-falcon/graph/g" - "github.com/open-falcon/graph/store" "github.com/open-falcon/rrdlite" "github.com/toolkits/file" + + "github.com/open-falcon/graph/g" + "github.com/open-falcon/graph/store" ) var Counter uint64 @@ -23,7 +23,10 @@ func Start() { if err := file.EnsureDirRW(dataDir); err != nil { log.Fatalln("rrdtool.Start error, bad data dir", dataDir+",", err) } - log.Println("rrdtool.Start, ok") + + // sync disk + go syncDisk() + log.Println("rrdtool.Start ok") } // RRD Files' Lock @@ -50,6 +53,15 @@ var ( } ) +// RRA.Point.Size +const ( + RRA1PointCnt = 720 // 1m一个点存12h + RRA5PointCnt = 576 // 5m一个点存2d + RRA20PointCnt = 504 // 20m一个点存7d + RRA180PointCnt = 766 // 3h一个点存3month + RRA720PointCnt = 730 // 12h一个点存1year +) + func create(filename string, item *cmodel.GraphItem) error { now := time.Now() start := now.Add(time.Duration(-24) * time.Hour) @@ -60,27 +72,27 @@ func create(filename string, item *cmodel.GraphItem) error { // 设置各种归档策略 // 1分钟一个点存 12小时 - c.RRA("AVERAGE", 0.5, 1, 720) + c.RRA("AVERAGE", 0.5, 1, RRA1PointCnt) // 5m一个点存2d - c.RRA("AVERAGE", 0.5, 5, 576) - c.RRA("MAX", 0.5, 5, 576) - c.RRA("MIN", 0.5, 5, 576) + c.RRA("AVERAGE", 0.5, 5, RRA5PointCnt) + c.RRA("MAX", 0.5, 5, RRA5PointCnt) + c.RRA("MIN", 0.5, 5, RRA5PointCnt) // 20m一个点存7d - c.RRA("AVERAGE", 0.5, 20, 504) - c.RRA("MAX", 0.5, 20, 504) - c.RRA("MIN", 0.5, 20, 504) + c.RRA("AVERAGE", 0.5, 20, RRA20PointCnt) + c.RRA("MAX", 0.5, 20, RRA20PointCnt) + c.RRA("MIN", 0.5, 20, RRA20PointCnt) // 3小时一个点存3个月 - c.RRA("AVERAGE", 0.5, 180, 766) - c.RRA("MAX", 0.5, 180, 766) - c.RRA("MIN", 0.5, 180, 766) + c.RRA("AVERAGE", 0.5, 180, RRA180PointCnt) + c.RRA("MAX", 0.5, 180, RRA180PointCnt) + c.RRA("MIN", 0.5, 180, RRA180PointCnt) - // 1天一个点存5year - c.RRA("AVERAGE", 0.5, 720, 730) - c.RRA("MAX", 0.5, 720, 730) - c.RRA("MIN", 0.5, 720, 730) + // 12小时一个点存1year + c.RRA("AVERAGE", 0.5, 720, RRA720PointCnt) + c.RRA("MAX", 0.5, 720, RRA720PointCnt) + c.RRA("MIN", 0.5, 720, RRA720PointCnt) return c.Create(true) } @@ -105,8 +117,8 @@ func update(filename string, items []*cmodel.GraphItem) error { // flush to disk from memory // 最新的数据在列表的最后面 +// TODO fix me, filename fmt from item[0], it's hard to keep consistent func Flush(filename string, items []*cmodel.GraphItem) error { - if items == nil || len(items) == 0 { return errors.New("empty items") } @@ -179,50 +191,28 @@ func FlushAll() { } func FlushRRD(idx int) { - var debug_checksum string - var debug bool - storageDir := g.Config().RRD.Storage - if g.Config().Debug { - debug = true - debug_checksum = g.Config().DebugChecksum - } else { - debug = false - } keys := store.GraphItems.KeysByIndex(idx) if len(keys) == 0 { return } - for _, checksum := range keys { + for _, ckey := range keys { + // get md5, dstype, step + checksum, dsType, step, err := g.SplitRrdCacheKey(ckey) + if err != nil { + continue + } - items := store.GraphItems.PopAll(checksum) + items := store.GraphItems.PopAll(ckey) size := len(items) if size == 0 { continue } - first := items[0] - filename := fmt.Sprintf("%s/%s/%s_%s_%d.rrd", storageDir, checksum[0:2], checksum, first.DsType, first.Step) - if debug && debug_checksum == checksum { - for _, item := range items { - log.Printf( - "2-flush:%d:%s:%lf", - item.Timestamp, - time.Unix(item.Timestamp, 0).Format("2006-01-02 15:04:05"), - item.Value, - ) - } - } - - err := Flush(filename, items) - if err != nil && debug && debug_checksum == checksum { - log.Println("flush fail:", err, "filename:", filename) - } + filename := g.RrdFileName(storageDir, checksum, dsType, step) + Flush(filename, items) Counter += 1 } - if debug { - log.Println("flushrrd counter:", Counter) - } } diff --git a/cron/sync_disk.go b/rrdtool/sync_disk.go similarity index 78% rename from cron/sync_disk.go rename to rrdtool/sync_disk.go index 2f4f768..31a79de 100644 --- a/cron/sync_disk.go +++ b/rrdtool/sync_disk.go @@ -1,25 +1,22 @@ -package cron +package rrdtool import ( "log" "time" "github.com/open-falcon/graph/g" - "github.com/open-falcon/graph/rrdtool" "github.com/open-falcon/graph/store" ) var ( Out_done_chan chan int - Counter uint64 ) func init() { - Counter = 0 Out_done_chan = make(chan int, 1) } -func SyncDisk() { +func syncDisk() { time.Sleep(time.Second * 300) ticker := time.NewTicker(time.Millisecond * g.FLUSH_DISK_STEP).C var idx int = 0 @@ -28,7 +25,7 @@ func SyncDisk() { select { case <-ticker: idx = idx % store.GraphItems.Size - rrdtool.FlushRRD(idx) + FlushRRD(idx) idx += 1 case <-Out_done_chan: log.Println("cron recv sigout and exit...") diff --git a/store/history.go b/store/history.go index 6c8dbb6..4ce8aad 100644 --- a/store/history.go +++ b/store/history.go @@ -1,9 +1,10 @@ package store import ( - "github.com/open-falcon/common/model" tlist "github.com/toolkits/container/list" tmap "github.com/toolkits/container/nmap" + + cmodel "github.com/open-falcon/common/model" ) const ( @@ -11,25 +12,27 @@ const ( ) var ( + // mem: front = = = back + // time: latest ... old HistoryCache = tmap.NewSafeMap() ) -func GetLastItem(key string) *model.GraphItem { +func GetLastItem(key string) *cmodel.GraphItem { itemlist, found := HistoryCache.Get(key) if !found || itemlist == nil { - return &model.GraphItem{} + return &cmodel.GraphItem{} } first := itemlist.(*tlist.SafeListLimited).Front() if first == nil { - return &model.GraphItem{} + return &cmodel.GraphItem{} } - return first.(*model.GraphItem) + return first.(*cmodel.GraphItem) } -func GetAllItems(key string) []*model.GraphItem { - ret := make([]*model.GraphItem, 0) +func GetAllItems(key string) []*cmodel.GraphItem { + ret := make([]*cmodel.GraphItem, 0) itemlist, found := HistoryCache.Get(key) if !found || itemlist == nil { return ret @@ -40,12 +43,12 @@ func GetAllItems(key string) []*model.GraphItem { if item == nil { continue } - ret = append(ret, item.(*model.GraphItem)) + ret = append(ret, item.(*cmodel.GraphItem)) } return ret } -func AddItem(key string, val *model.GraphItem) { +func AddItem(key string, val *cmodel.GraphItem) { itemlist, found := HistoryCache.Get(key) var slist *tlist.SafeListLimited if !found { @@ -54,5 +57,10 @@ func AddItem(key string, val *model.GraphItem) { } else { slist = itemlist.(*tlist.SafeListLimited) } - slist.PushFrontViolently(val) + + // old item should be drop + first := slist.Front() + if first == nil || first.(*cmodel.GraphItem).Timestamp < val.Timestamp { // first item or latest one + slist.PushFrontViolently(val) + } } diff --git a/store/store.go b/store/store.go index f162a08..18cac47 100644 --- a/store/store.go +++ b/store/store.go @@ -3,11 +3,12 @@ package store import ( "container/list" "hash/crc32" + "log" + "sync" cmodel "github.com/open-falcon/common/model" + "github.com/open-falcon/graph/g" - "log" - "sync" ) var GraphItems *GraphItemMap @@ -51,18 +52,6 @@ func (this *GraphItemMap) Len() int { return l } -func (this *GraphItemMap) LenOf(key string) int { - this.RLock() - defer this.RUnlock() - - idx := hashKey(key) % uint32(this.Size) - L, ok := this.A[idx][key] - if !ok { - return 0 - } - return L.Len() -} - func (this *GraphItemMap) First(key string) *cmodel.GraphItem { this.RLock() defer this.RUnlock() @@ -147,12 +136,11 @@ func (this *GraphItemMap) KeysByIndex(idx int) []string { } func init() { - size32 := int32(uint32(g.CACHE_TIME / g.FLUSH_DISK_STEP)) - if size32 < 0 { - log.Panicf("store.init, bad size %d\n", size32) + size := g.CACHE_TIME / g.FLUSH_DISK_STEP + if size < 0 { + log.Panicf("store.init, bad size %d\n", size) } - size := int(size32) GraphItems = &GraphItemMap{ A: make([]map[string]*SafeLinkedList, size), Size: size, diff --git a/test/debug b/test/debug index 89820c2..3b4b742 100755 --- a/test/debug +++ b/test/debug @@ -14,8 +14,8 @@ httpprex="127.0.0.1:6071" rpcprex="127.0.0.1:6070" ## statistics -function statistics(){ - curl -s "$httpprex/statistics/all" | python -m json.tool +function counter(){ + curl -s "$httpprex/counter/all" | python -m json.tool } ## config @@ -31,35 +31,36 @@ function config(){ esac } -function index_update_all(){ - curl -s "$httpprex/index/updateAll" | python -m json.tool -} -function index_update_one(){ +function index(){ e="test.graph.endpoint.niean.1" m="test.graph.metric.1" - tags="tag0=tag0-dummy-1,tag1=tag1-dummy-1,tag2=tag2-dummy-1" + t="tag0=tag0-dummy-1,tag1=tag1-dummy-1,tag2=tag2-dummy-1" dt="GAUGE" step="60" - curl -s "$httpprex/index/updateOne?EndPoint=$e&Metric=$m&Tags=$tags&DsType=$dt&Step=$step" | python -m json.tool + curl -s "$httpprex/index/cache/$e/$m/$step/$dt/$t" | python -m json.tool } -## trace -function trace_recv(){ - e="test.graph.endpoint.niean.1" - m="test.graph.metric.1" - tags="tag0=tag0-dummy-1,tag1=tag1-dummy-1,tag2=tag2-dummy-1" - curl -s "$httpprex/trace/$e/$m/$tags" | python -m json.tool +function index_v2(){ + e="test.graph.endpoint.niean/1" + m="test.graph.metric/1" + t="tag/0=tag0-dummy-1,tag1=tag1-dummy-1,tag2=tag2-dummy-1" + dt="GAUGE" + step="60" + curl -s "$httpprex/v2/index/cache?e=$e&m=$m&t=$t&type=$dt&step=$step" | python -m json.tool } -## filter -function filter_recv(){ +function index_update_all(){ + curl -s "$httpprex/index/updateAll" | python -m json.tool +} + +function index_update_one(){ e="test.graph.endpoint.niean.1" m="test.graph.metric.1" - opt="gt" - threshhold="50.0" tags="tag0=tag0-dummy-1,tag1=tag1-dummy-1,tag2=tag2-dummy-1" - curl -s "$httpprex/filter/$e/$m/$opt/$threshhold/$tags" | python -m json.tool + dt="GAUGE" + step="60" + curl -s "$httpprex/index/updateOne?EndPoint=$e&Metric=$m&Tags=$tags&DsType=$dt&Step=$step" | python -m json.tool } ## history @@ -69,6 +70,12 @@ function fetch_history(){ tags="tag0=tag0-dummy-1,tag1=tag1-dummy-1,tag2=tag2-dummy-1" curl -s "$httpprex/history/$e/$m/$tags" | python -m json.tool } +function fetch_history_v2(){ + e="test.graph.endpoint.niean/1" + m="test.graph.metric/1" + tags="tag/0=tag0-dummy-1,tag1=tag1-dummy-1,tag2=tag2-dummy-1" + curl -s "$httpprex/v2/history?e=$e&m=$m&t=$tags" | python -m json.tool +} ## last function fetch_last(){ @@ -90,6 +97,18 @@ function api_send(){ curl -s "$httpprex/api/recv/$e/$m/$ts/$step/$dsType/$value/$tags" | python -m json.tool } +function api_send_v2(){ + e="test.graph.endpoint.niean/1" + m="test.graph.metric/1" + ts=`date +%s` + dsType="GAUGE" + step=60 + value=`expr $ts % 60` + tags="tag/0=tag0-dummy-1,tag1=tag1-dummy-1,tag2=tag2-dummy-1" + curl -s "$httpprex/v2/api/recv?e=$e&m=$m&ts=$ts&step=$step&type=$dsType&v=$value&t=$tags" | python -m json.tool +} + + ## tail function tail_log(){ $control tail @@ -104,64 +123,9 @@ function start(){ $control start } function stop(){ - $control kill9 + $control stop } -## mock -srcname=mocktransfer -appname=$srcname-debug -builddir=$testdir/build -masrc=$testdir/$srcname.go -matarget=$builddir/$appname.bin - -function buildm(){ - rm -rf $matarget &&\ - go build -o $matarget $masrc &>/dev/null - ec=$? - [ $ec -eq 0 ] && echo -e "build mock, ok" || { echo -e "build mock, error"; exit $ec;} -} - -function cleanm(){ - rm -rf $builddir - ec=$? - [ $ec -eq 0 ] && echo -e "clean mock, ok" || { echo -e "clean mock, error"; exit $ec; } -} - -function killm(){ - pids=`ps -ef | grep $appname.bin | grep -v grep | awk '{print $2}'` - for pid in $pids - do - kill -9 $pid &>/dev/null - echo -e "kill mock, $pid" - sleep 0.01 - done - echo -e "kill mock, ok" -} - -function startm(){ - cnt=$1 - if [ "X$cnt" == "X" ];then - cnt=1 - fi - - rpc=$2 - if [ "X$rpc" == "X" ];then - rpc=$rpcprex - fi - - step=$3 - if [ "X$step" == "X" ];then - step=0 - fi - - for i in `seq 1 $cnt` - do - id=$rpc.$step.`date +%s`.$i - $matarget -h $rpc -s $step -i 100 &> $builddir/malog.$id.log & - echo -e "start mock, $id" - sleep 0.2 - done -} action=$1 case $action in @@ -180,44 +144,41 @@ case $action in "config") config $2 ;; + "index") + index + ;; + "index2") + index_v2 + ;; "index_update_all") index_update_all ;; "index_update_one") index_update_one ;; - "trace") - trace_recv - ;; "filter") filter_recv ;; "history") fetch_history ;; + "history2") + fetch_history_v2 + ;; "last") fetch_last ;; "send") api_send ;; + "send2") + api_send_v2 + ;; "tail") tail_log ;; - "startm") - startm $2 $3 $4 - ;; - "killm") - killm - ;; - "cleanm") - cleanm - ;; - "buildm") - buildm - ;; *) - statistics + counter ;; esac diff --git a/test/graph.list b/test/graph.list new file mode 100644 index 0000000..d7c76d7 --- /dev/null +++ b/test/graph.list @@ -0,0 +1,3 @@ +test.hostname01:6071 +test.hostname02:6071 +test.hostname03:6071 diff --git a/test/http.recv.history b/test/http.recv.history new file mode 100755 index 0000000..46e0fb7 --- /dev/null +++ b/test/http.recv.history @@ -0,0 +1,13 @@ +#!/bin/bash +e=$1 +m=$2 +tags=$3 + +host_file=./graph.list +for i in `cat $host_file`; +do + printf "%s\n" $i + curl -s "$i/history/$e/$m/$tags" + printf "\n" + sleep 0.1 +done