Skip to content

add ddagent stream collector #10

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (nodeAgent *NodeAgent) CreateTask(task *common.NodeTask) error {
}
}

newTask, err := NewHyperpilotTask(task, task.Id, metricTypes,
newTask, err := NewTask(task, task.Id, metricTypes,
taskCollector, taskProcessor, taskAnalyzer, nodeAgent)
if err != nil {
return errors.New(fmt.Sprintf("Unable to new agent task {%s}: %s", task.Id, err.Error()))
Expand All @@ -143,7 +143,7 @@ func (nodeAgent *NodeAgent) CreateTask(task *common.NodeTask) error {
log.Warnf("Task id {%s} is duplicated, skip this task", task.Id)
return nil
}
nodeAgent.Tasks[task.Id] = newTask
nodeAgent.Tasks[task.Id] = &newTask
return nil
}

Expand All @@ -167,7 +167,7 @@ func (nodeAgent *NodeAgent) CreatePublisher(p *common.Publish) error {

func (nodeAgent *NodeAgent) Run() {
for _, task := range nodeAgent.Tasks {
task.Run()
(*task).Run()
}
}

Expand Down
107 changes: 93 additions & 14 deletions cmd/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@ type PublishConfig struct {
AnalyzerPublisher []*HyperpilotPublisher
}

type HyperpilotTask struct {
type BaseTask struct {
Task *common.NodeTask
Id string
Collector collector.Collector
Processor processor.Processor
Analyzer analyzer.Analyzer
PublishConfig *PublishConfig
Expand All @@ -32,14 +31,27 @@ type HyperpilotTask struct {
Agent *NodeAgent
}

func NewHyperpilotTask(
task *common.NodeTask,
// HyperpilotTask is the common interface for runnable agent tasks.
// Run starts the task's collection work; concrete behavior (scheduled
// polling vs. asynchronous streaming) depends on the implementation.
type HyperpilotTask interface {
	Run()
}

// NormalTask is a HyperpilotTask backed by a pull-style collector
// that is invoked on the task's schedule interval.
type NormalTask struct {
	BaseTask
	Collector collector.NormalCollector
}

// StreamTask is a HyperpilotTask backed by a push-style collector
// that delivers metrics asynchronously via its Metrics() channel.
type StreamTask struct {
	BaseTask
	Collector collector.StreamCollector
}

func NewTask(task *common.NodeTask,
id string,
allMetricTypes []snap.Metric,
collector collector.Collector,
coll collector.Collector,
processor processor.Processor,
analyzer analyzer.Analyzer,
agent *NodeAgent) (*HyperpilotTask, error) {
agent *NodeAgent) (HyperpilotTask, error) {
var pubs []*HyperpilotPublisher
for _, pubId := range *task.Publish {
p, ok := agent.Publishers[pubId]
Expand Down Expand Up @@ -80,10 +92,9 @@ func NewHyperpilotTask(
return nil, errors.New(errMsg)
}

return &HyperpilotTask{
base := BaseTask{
Task: task,
Id: id,
Collector: collector,
Processor: processor,
Analyzer: analyzer,
PublishConfig: &PublishConfig{
Expand All @@ -92,10 +103,70 @@ func NewHyperpilotTask(
},
CollectMetrics: cmts,
Agent: agent,
}, nil
}

if common.IsInstanceOf(coll, (*collector.NormalCollector)(nil)) {
return &NormalTask{
BaseTask: base,
Collector: coll.(collector.NormalCollector),
}, nil
} else {
return &StreamTask{
BaseTask: base,
Collector: coll.(collector.StreamCollector),
}, nil
}
}

func (task *HyperpilotTask) Run() {
// Run starts the stream collector and spawns a goroutine that drains the
// collector's metric channel, processing, publishing, and analyzing each
// batch as it arrives. Run returns immediately after spawning the drain
// loop. NOTE(review): the loop has no stop signal — consider wiring a
// context or done channel for shutdown.
func (task *StreamTask) Run() {
	if err := task.collect(); err != nil {
		log.Errorf("Unable to start stream collector {%s}: %s", task.Id, err.Error())
		task.FailureCount++
		task.reportError(err)
		return
	}

	go func() {
		log.Infof("wait for processing incoming snap metric")
		for {
			select {
			case metrics := <-task.Collector.Metrics():
				// addTags returns the tagged slice; the original discarded
				// it, so collect tags were never applied to streamed metrics.
				metrics = addTags(task.Task.Collect.Tags, metrics)
				if task.Processor != nil {
					var err error
					metrics, err = task.process(metrics, task.Task.Process.Config)
					if err != nil {
						// Count the failure only when processing actually
						// fails (the original incremented unconditionally,
						// inflating FailureCount on every successful batch).
						task.FailureCount++
						task.reportError(err)
						log.Warnf("process metric fail, skip this time: %s", err.Error())
						continue
					}
				}
				for _, publish := range task.PublishConfig.Publisher {
					publish.Put(metrics)
				}

				if task.Analyzer != nil {
					derivedMetrics, err := task.analyze(metrics, task.Task.Analyze.Config)
					if err != nil {
						task.FailureCount++
						task.reportError(err)
						log.Warnf("analyze metric fail for %s, skip this time: %s", task.Task.Id, err.Error())
						continue
					}
					// Only forward non-empty derived batches to analyzer publishers.
					for _, publish := range task.PublishConfig.AnalyzerPublisher {
						if len(derivedMetrics) > 0 {
							publish.Put(derivedMetrics)
						}
					}
				}
			}
		}
	}()
}

func (task *NormalTask) Run() {
waitTime, err := time.ParseDuration(task.Task.Schedule.Interval)
if err != nil {
log.Warnf("Parse schedule interval {%s} fail, use default interval 5 seconds",
Expand Down Expand Up @@ -204,7 +275,7 @@ func addTags(tags map[string]map[string]string, mts []snap.Metric) []snap.Metric
return newMts
}

func (task *HyperpilotTask) collect() ([]snap.Metric, error) {
func (task *NormalTask) collect() ([]snap.Metric, error) {
collectMetrics, err := task.Collector.CollectMetrics(task.CollectMetrics)
if err != nil {
return nil, fmt.Errorf("Unable to collect metrics for %s: %s", task.Id, err.Error())
Expand All @@ -213,15 +284,23 @@ func (task *HyperpilotTask) collect() ([]snap.Metric, error) {
return addTags(task.Task.Collect.Tags, collectMetrics), nil
}

func (task *HyperpilotTask) process(mts []snap.Metric, cfg snap.Config) ([]snap.Metric, error) {
// collect starts the underlying stream collector. Unlike NormalTask.collect
// it returns no metrics: results arrive asynchronously on
// task.Collector.Metrics().
func (task *StreamTask) collect() error {
	if err := task.Collector.StreamMetrics(); err != nil {
		// The original logged an empty string; include task id and cause.
		log.Warnf("Unable to start streaming metrics for task {%s}: %s", task.Id, err.Error())
		return err
	}
	return nil
}

// process runs the configured Processor plugin over mts with cfg and
// returns the transformed metrics.
func (task *BaseTask) process(mts []snap.Metric, cfg snap.Config) ([]snap.Metric, error) {
	processed, err := task.Processor.Process(mts, cfg)
	return processed, err
}

func (task *HyperpilotTask) analyze(mts []snap.Metric, cfg snap.Config) ([]snap.Metric, error) {
// analyze feeds mts through the configured Analyzer plugin with cfg and
// returns any derived metrics it produces.
func (task *BaseTask) analyze(mts []snap.Metric, cfg snap.Config) ([]snap.Metric, error) {
	derived, err := task.Analyzer.Analyze(mts, cfg)
	return derived, err
}

func (task *HyperpilotTask) reportError(err error) {
func (task *BaseTask) reportError(err error) {
report := common.TaskReport{
Id: task.Id,
Plugin: task.Task.Process.PluginName,
Expand Down
11 changes: 3 additions & 8 deletions conf/task-test.json → conf/ddagent-task.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,13 @@
"tasks": [
{
"id": "task1",
"schedule": {
"interval": "5s"
},
"collect": {
"plugin": "cpu",
"plugin": "ddagent",
"metrics": {
"/intel/procfs/cpu/*": {}
"/ddagent/*": {}
},
"config": {
"/intel/procfs/cpu": {
"proc_path": "/proc"
}
"port": "8877"
}
},
"publish": [
Expand Down
68 changes: 0 additions & 68 deletions conf/tasks.json

This file was deleted.

2 changes: 1 addition & 1 deletion dockerfiles/local/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
FROM ubuntu:xenial
RUN apt-get update && apt-get -y install curl
RUN apt-get update && apt-get -y install curl netcat jq
RUN mkdir -p /etc/node_agent

COPY ./bin/linux/node-agent .
Expand Down
Loading