Skip to content

Commit

Permalink
add pfc
Browse files Browse the repository at this point in the history
  • Loading branch information
niean committed Jan 26, 2016
1 parent 2d50434 commit c342de9
Show file tree
Hide file tree
Showing 5 changed files with 242 additions and 6 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ _testmain.go

*_test.go
*.bak
*.bin
*.log
/var
/falcon-hbs-proxy*
/cfg.json
/test/bench
/test/bench/build
.gitversion
3 changes: 2 additions & 1 deletion http/proc_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"net/http"

"github.com/open-falcon/hbs-proxy/g"
"github.com/open-falcon/hbs-proxy/proxy"
)

func configProcHttpRoutes() {
Expand All @@ -16,6 +17,6 @@ func configProcHttpRoutes() {
})

http.HandleFunc("/proc/hbs/pools", func(w http.ResponseWriter, r *http.Request) {
RenderDataJson(w, nil)
RenderDataJson(w, proxy.ConnPools.Proc())
})
}
11 changes: 7 additions & 4 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,20 @@ func proxy(method string, args interface{}, reply interface{}) error {
addr := HbsMap[host]

// 过滤掉建连缓慢的host, 否则会严重影响发送速率
cc := pfc.GetCounterCount(host)
key := addr + "." + method
cc := pfc.GetCounterCount(key)
if cc >= HbsMaxConns {
continue
}

pfc.Counter(host, 1)
pfc.Counter(key, 1)
err = ConnPools.Call(addr, method, args, reply)
pfc.Counter(host, -1)
pfc.Counter(key, -1)

if err == nil {
pfc.Meter(key+".ok", 1)
sendOk = true
} else {
pfc.Meter(key+".error", 1)
}
}
return err
Expand Down
51 changes: 51 additions & 0 deletions test/bench/mock
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#!/bin/bash
## test home
testdir=$(cd $(dirname $0)/; pwd)
## word home
workdir=$(dirname $testdir)
cd $workdir

## mockagent
srcname=mockagent
appname=$srcname-bench-test
builddir=$testdir/build
masrc=$testdir/$srcname.go
matarget=$builddir/$appname.bin

function build(){
rm -rf $matarget &&\
go build -o $matarget $masrc
ec=$?
[ $ec -eq 0 ] && echo -e "mockagent build, ok" || { echo -e "mockagent build, error"; exit $ec;}
}

function stop(){
pids=`ps -ef | grep $appname.bin | grep -v grep | awk '{print $2}'`
for pid in $pids
do
kill -9 $pid &>/dev/null
echo -e "kill mockagent, $pid"
sleep 0.01
done
echo -e "kill mockagent ok"
}

function start(){
$matarget "-m=$1"
}

action=$1
case $action in
"build")
build
;;
"start")
start $2
;;
"stop")
stop
;;
*)
build && start $1 && stop
esac

179 changes: 179 additions & 0 deletions test/bench/mockagent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package main

import (
"flag"
"fmt"
"github.com/toolkits/net"
"log"
"math"
"net/rpc"
"sync"
"time"

"github.com/open-falcon/common/model"
)

const (
hbsAddr = "127.0.0.1:6030"
timeout = time.Duration(10) * time.Second
)

var (
hbsClient = &SingleConnRpcClient{RpcServer: hbsAddr, Timeout: timeout}
rpcs = map[string]func(){"TrustableIps": trustable_ips, "BuiltinMetrics": builtin_metrics, "MinePlugins": mine_plugins, "ReportStatus": report_status}
)

func main() {
// flag
method := flag.String("m", "TrustableIps", "ticker interval")
flag.Parse()

if f, ok := rpcs[*method]; ok {
fmt.Println("[mockagent] method:", *method)
f()
return
} else {
fmt.Println("[mockagent] all methods")
for _, f := range rpcs {
f()
time.Sleep(time.Second)
}
}

fmt.Println("[mockagent] done")
}

// rpc
func trustable_ips() {
var ips string
err := hbsClient.Call("Agent.TrustableIps", model.NullRpcRequest{}, &ips)
if err != nil {
fmt.Println("[mockagent] TrustableIps error:", err)
} else {
fmt.Println("[mockagent] TrustableIps:", ips)
}
}

func builtin_metrics() {
hostname := "work"
req := model.AgentHeartbeatRequest{
Hostname: hostname,
Checksum: "abc123",
}

var resp model.BuiltinMetricResponse
err := hbsClient.Call("Agent.BuiltinMetrics", req, &resp)
if err != nil {
fmt.Println("[mockagent] BuiltinMetrics error:", err)
} else {
fmt.Println("[mockagent] BuiltinMetrics:", resp)
}
}

func mine_plugins() {
hostname := "work"
req := model.AgentHeartbeatRequest{
Hostname: hostname,
}

var resp model.AgentPluginsResponse
err := hbsClient.Call("Agent.MinePlugins", req, &resp)
if err != nil {
fmt.Println("[mockagent] MinePlugins error:", err)
} else {
fmt.Println("[mockagent] MinePlugins:", resp)
}
}

func report_status() {
hostname := "work"
req := model.AgentReportRequest{
Hostname: hostname,
IP: "128.1.1.2",
AgentVersion: "v.test",
PluginVersion: "/home/to/plugin",
}

var resp model.SimpleRpcResponse
err := hbsClient.Call("Agent.ReportStatus", req, &resp)

if err != nil || resp.Code != 0 {
fmt.Println("[mockagent] ReportStatus error:", err, resp)
} else {
fmt.Println("[mockagent] ReportStatus:", resp)
}
}

// connection
type SingleConnRpcClient struct {
sync.Mutex
rpcClient *rpc.Client
RpcServer string
Timeout time.Duration
}

func (this *SingleConnRpcClient) close() {
if this.rpcClient != nil {
this.rpcClient.Close()
this.rpcClient = nil
}
}

func (this *SingleConnRpcClient) insureConn() {
if this.rpcClient != nil {
return
}

var err error
var retry int = 1

for {
if this.rpcClient != nil {
return
}

this.rpcClient, err = net.JsonRpcClient("tcp", this.RpcServer, this.Timeout)
if err == nil {
return
}

log.Printf("dial %s fail: %v", this.RpcServer, err)

if retry > 6 {
retry = 1
}

time.Sleep(time.Duration(math.Pow(2.0, float64(retry))) * time.Second)

retry++
}
}

func (this *SingleConnRpcClient) Call(method string, args interface{}, reply interface{}) error {

this.Lock()
defer this.Unlock()

this.insureConn()

timeout := time.Duration(50 * time.Second)
done := make(chan error)

go func() {
err := this.rpcClient.Call(method, args, reply)
done <- err
}()

select {
case <-time.After(timeout):
log.Printf("[WARN] rpc call timeout %v => %v", this.rpcClient, this.RpcServer)
this.close()
case err := <-done:
if err != nil {
this.close()
return err
}
}

return nil
}

0 comments on commit c342de9

Please sign in to comment.