Add 4 missing duration and queue size metrics
This adds the following metrics:
```
logstash_node_queue_size_bytes{pipeline="main"} 1.4235844e+07
logstash_node_pipeline_queue_push_duration_seconds_total{pipeline="main"} 382
logstash_node_plugin_queue_push_duration_seconds_total{pipeline="main",plugin="beats",plugin_id="beats",plugin_type="input"} 0
logstash_node_plugin_duration_seconds_total{pipeline="main",plugin="elasticsearch",plugin_id="es",plugin_type="output"} 67
```
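
As a rough sketch of the pattern the diff below follows (not the exporter's actual code; the namespace, subsystem, and sample value are illustrative, inferred from the metric names above), each counter is described once and then emitted per scrape from the millisecond total reported by Logstash:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Describe the metric once; "pipeline" is its only variable label,
	// matching logstash_node_pipeline_queue_push_duration_seconds_total above.
	desc := prometheus.NewDesc(
		prometheus.BuildFQName("logstash", "node", "pipeline_queue_push_duration_seconds_total"),
		"pipeline_queue_push_duration_seconds_total",
		[]string{"pipeline"},
		nil,
	)

	// Logstash reports queue_push_duration_in_millis; the metric is exposed in seconds.
	queuePushMillis := 382000 // illustrative value
	metric := prometheus.MustNewConstMetric(
		desc,
		prometheus.CounterValue,
		float64(queuePushMillis/1000),
		"main",
	)
	fmt.Println(metric.Desc())
}
```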
rifelpet committed Aug 13, 2018
1 parent d039196 commit 682b871
Showing 4 changed files with 80 additions and 19 deletions.
VERSION (2 changes: 1 addition & 1 deletion)
```diff
@@ -1 +1 @@
-0.1.2
+0.1.3
```
collector/nodestats_api.go (20 changes: 12 additions & 8 deletions)
```diff
@@ -3,17 +3,19 @@ package collector
 // Pipeline type
 type Pipeline struct {
 	Events struct {
-		DurationInMillis int `json:"duration_in_millis"`
-		In               int `json:"in"`
-		Filtered         int `json:"filtered"`
-		Out              int `json:"out"`
+		DurationInMillis          int `json:"duration_in_millis"`
+		In                        int `json:"in"`
+		Filtered                  int `json:"filtered"`
+		Out                       int `json:"out"`
+		QueuePushDurationInMillis int `json:"queue_push_duration_in_millis"`
 	} `json:"events"`
 	Plugins struct {
 		Inputs []struct {
 			ID     string `json:"id"`
 			Events struct {
-				In  int `json:"in"`
-				Out int `json:"out"`
+				QueuePushDurationInMillis int `json:"queue_push_duration_in_millis"`
+				In                        int `json:"in"`
+				Out                       int `json:"out"`
 			} `json:"events"`
 			Name string `json:"name"`
 		} `json:"inputs,omitempty"`
@@ -35,8 +37,9 @@ type Pipeline struct {
 		Outputs []struct {
 			ID     string `json:"id"`
 			Events struct {
-				In  int `json:"in"`
-				Out int `json:"out"`
+				DurationInMillis int `json:"duration_in_millis"`
+				In               int `json:"in"`
+				Out              int `json:"out"`
 			} `json:"events"`
 			Name string `json:"name"`
 		} `json:"outputs"`
@@ -52,6 +55,7 @@ type Pipeline struct {
 		Events   int    `json:"events"`
 		Type     string `json:"type"`
 		Capacity struct {
+			QueueSizeInBytes    int   `json:"queue_size_in_bytes"`
 			PageCapacityInBytes int   `json:"page_capacity_in_bytes"`
 			MaxQueueSizeInBytes int64 `json:"max_queue_size_in_bytes"`
 			MaxUnreadEvents     int   `json:"max_unread_events"`
```
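A minimal sketch of how the new struct fields pick up values from a node stats response; the JSON document here is hand-written for illustration (field names follow the tags in the diff above and the fixture added to the test file below), not captured from a real Logstash instance:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// pipelineStats is a trimmed-down mirror of the Pipeline struct above,
// keeping only the fields this commit adds.
type pipelineStats struct {
	Events struct {
		QueuePushDurationInMillis int `json:"queue_push_duration_in_millis"`
	} `json:"events"`
	Queue struct {
		Capacity struct {
			QueueSizeInBytes int `json:"queue_size_in_bytes"`
		} `json:"capacity"`
	} `json:"queue"`
}

func main() {
	raw := []byte(`{
		"events": {"queue_push_duration_in_millis": 382000},
		"queue": {"capacity": {"queue_size_in_bytes": 14235844}}
	}`)

	var p pipelineStats
	if err := json.Unmarshal(raw, &p); err != nil {
		panic(err)
	}
	fmt.Println(p.Events.QueuePushDurationInMillis, p.Queue.Capacity.QueueSizeInBytes)
}
```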
collector/nodestats_collector.go (76 changes: 66 additions & 10 deletions)
```diff
@@ -32,21 +32,24 @@ type NodeStatsCollector struct {
 	ProcessMemTotalVirtualInBytes *prometheus.Desc
 	ProcessCPUTotalInMillis       *prometheus.Desc
 
-	PipelineDuration       *prometheus.Desc
-	PipelineEventsIn       *prometheus.Desc
-	PipelineEventsFiltered *prometheus.Desc
-	PipelineEventsOut      *prometheus.Desc
-
-	PipelinePluginEventsDuration *prometheus.Desc
-	PipelinePluginEventsIn       *prometheus.Desc
-	PipelinePluginEventsOut      *prometheus.Desc
-	PipelinePluginMatches        *prometheus.Desc
-	PipelinePluginFailures       *prometheus.Desc
+	PipelineDuration          *prometheus.Desc
+	PipelineQueuePushDuration *prometheus.Desc
+	PipelineEventsIn          *prometheus.Desc
+	PipelineEventsFiltered    *prometheus.Desc
+	PipelineEventsOut         *prometheus.Desc
+
+	PipelinePluginEventsDuration          *prometheus.Desc
+	PipelinePluginEventsQueuePushDuration *prometheus.Desc
+	PipelinePluginEventsIn                *prometheus.Desc
+	PipelinePluginEventsOut               *prometheus.Desc
+	PipelinePluginMatches                 *prometheus.Desc
+	PipelinePluginFailures                *prometheus.Desc
 
 	PipelineQueueEvents          *prometheus.Desc
 	PipelineQueuePageCapacity    *prometheus.Desc
 	PipelineQueueMaxQueueSize    *prometheus.Desc
 	PipelineQueueMaxUnreadEvents *prometheus.Desc
+	PipelineQueueSizeInBytes     *prometheus.Desc
 
 	PipelineDeadLetterQueueSizeInBytes *prometheus.Desc
 }
@@ -191,6 +194,13 @@ func NewNodeStatsCollector(logstashEndpoint string) (Collector, error) {
 			nil,
 		),
 
+		PipelineQueuePushDuration: prometheus.NewDesc(
+			prometheus.BuildFQName(Namespace, subsystem, "pipeline_queue_push_duration_seconds_total"),
+			"pipeline_queue_push_duration_seconds_total",
+			[]string{"pipeline"},
+			nil,
+		),
+
 		PipelineEventsIn: prometheus.NewDesc(
 			prometheus.BuildFQName(Namespace, subsystem, "pipeline_events_in_total"),
 			"pipeline_events_in_total",
@@ -219,6 +229,13 @@ func NewNodeStatsCollector(logstashEndpoint string) (Collector, error) {
 			nil,
 		),
 
+		PipelinePluginEventsQueuePushDuration: prometheus.NewDesc(
+			prometheus.BuildFQName(Namespace, subsystem, "plugin_queue_push_duration_seconds_total"),
+			"plugin_queue_push_duration_seconds_total",
+			[]string{"pipeline", "plugin", "plugin_id", "plugin_type"},
+			nil,
+		),
+
 		PipelinePluginEventsIn: prometheus.NewDesc(
 			prometheus.BuildFQName(Namespace, subsystem, "plugin_events_in_total"),
 			"plugin_events_in",
@@ -275,6 +292,13 @@ func NewNodeStatsCollector(logstashEndpoint string) (Collector, error) {
 			nil,
 		),
 
+		PipelineQueueSizeInBytes: prometheus.NewDesc(
+			prometheus.BuildFQName(Namespace, subsystem, "queue_size_bytes"),
+			"queue_size_bytes",
+			[]string{"pipeline"},
+			nil,
+		),
+
 		PipelineDeadLetterQueueSizeInBytes: prometheus.NewDesc(
 			prometheus.BuildFQName(Namespace, subsystem, "dead_letter_queue_size_bytes"),
 			"dead_letter_queue_size_bytes",
@@ -514,6 +538,13 @@ func (c *NodeStatsCollector) collect(ch chan<- prometheus.Metric) (*prometheus.D
 			pipelineID,
 		)
 
+		ch <- prometheus.MustNewConstMetric(
+			c.PipelineQueuePushDuration,
+			prometheus.CounterValue,
+			float64(pipeline.Events.QueuePushDurationInMillis/1000),
+			pipelineID,
+		)
+
 		ch <- prometheus.MustNewConstMetric(
 			c.PipelineEventsIn,
 			prometheus.CounterValue,
@@ -554,6 +585,15 @@ func (c *NodeStatsCollector) collect(ch chan<- prometheus.Metric) (*prometheus.D
 				plugin.ID,
 				"input",
 			)
+			ch <- prometheus.MustNewConstMetric(
+				c.PipelinePluginEventsQueuePushDuration,
+				prometheus.CounterValue,
+				float64(plugin.Events.QueuePushDurationInMillis/1000),
+				pipelineID,
+				plugin.Name,
+				plugin.ID,
+				"input",
+			)
 		}
 
 		for _, plugin := range pipeline.Plugins.Filters {
@@ -623,6 +663,15 @@ func (c *NodeStatsCollector) collect(ch chan<- prometheus.Metric) (*prometheus.D
 				plugin.ID,
 				"output",
 			)
+			ch <- prometheus.MustNewConstMetric(
+				c.PipelinePluginEventsDuration,
+				prometheus.CounterValue,
+				float64(plugin.Events.DurationInMillis/1000),
+				pipelineID,
+				plugin.Name,
+				plugin.ID,
+				"output",
+			)
 		}
 
 		if pipeline.Queue.Type != "memory" {
@@ -653,6 +702,13 @@ func (c *NodeStatsCollector) collect(ch chan<- prometheus.Metric) (*prometheus.D
 				float64(pipeline.Queue.Capacity.MaxUnreadEvents),
 				pipelineID,
 			)
+
+			ch <- prometheus.MustNewConstMetric(
+				c.PipelineQueueSizeInBytes,
+				prometheus.CounterValue,
+				float64(pipeline.Queue.Capacity.QueueSizeInBytes),
+				pipelineID,
+			)
 		}
 
 		if pipeline.DeadLetterQueue.QueueSizeInBytes != 0 {
```
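For orientation, collect feeds these values to Prometheus as constant metrics through a Collector. A stripped-down, generic sketch of that wiring is below; it is not the exporter's actual registration code, and the hard-coded value and port are placeholders:

```go
package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// queueSizeCollector publishes a single queue_size_bytes value the same way
// the collect method above publishes pipeline.Queue.Capacity.QueueSizeInBytes.
type queueSizeCollector struct {
	desc *prometheus.Desc
}

func (c *queueSizeCollector) Describe(ch chan<- *prometheus.Desc) {
	ch <- c.desc
}

func (c *queueSizeCollector) Collect(ch chan<- prometheus.Metric) {
	// The real exporter reads this from the Logstash node stats API;
	// the value here is hard-coded for illustration. CounterValue mirrors
	// the value type used in the diff above.
	ch <- prometheus.MustNewConstMetric(c.desc, prometheus.CounterValue, 14235844, "main")
}

func main() {
	prometheus.MustRegister(&queueSizeCollector{
		desc: prometheus.NewDesc("logstash_node_queue_size_bytes", "queue_size_bytes",
			[]string{"pipeline"}, nil),
	})
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":9198", nil)) // port chosen for illustration
}
```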
collector/nodestats_collector_test.go (1 change: 1 addition & 0 deletions)
```diff
@@ -186,6 +186,7 @@ var queueJSON = []byte(`
       "events" : 0,
       "type" : "persisted",
       "capacity" : {
+        "queue_size_in_bytes": 123456,
         "page_capacity_in_bytes" : 262144000,
         "max_queue_size_in_bytes" : 8589934592,
         "max_unread_events" : 12
```
