Added: reloads (global/pipeline), output pipeline failures, bulk_requests, queue_size metrics; moved collector to vendor
wasilak committed Feb 28, 2019
1 parent d039196 commit aec146e
Showing 8 changed files with 166 additions and 13 deletions.
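All of the metric additions in this commit follow the same client_golang pattern: a single *prometheus.Desc carrying a result-style label, re-emitted on every scrape via prometheus.MustNewConstMetric from whatever the Logstash node stats API returned. The following is a minimal, self-contained sketch of that pattern, not code from the commit; the namespace/subsystem pair ("logstash"/"info"), the listen port, and the hard-coded counts are assumptions for illustration only.

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// reloadsCollector illustrates the Desc-plus-const-metric pattern used by the
// new ReloadsInfos and PipelineReloads descriptors in the diffs below.
type reloadsCollector struct {
	reloads *prometheus.Desc
}

func newReloadsCollector() *reloadsCollector {
	return &reloadsCollector{
		reloads: prometheus.NewDesc(
			prometheus.BuildFQName("logstash", "info", "reloads"), // namespace/subsystem assumed
			"Logstash reloads",
			[]string{"result"},
			nil,
		),
	}
}

func (c *reloadsCollector) Describe(ch chan<- *prometheus.Desc) {
	ch <- c.reloads
}

func (c *reloadsCollector) Collect(ch chan<- prometheus.Metric) {
	// The real exporter reads these counts from /_node/stats; placeholders here.
	ch <- prometheus.MustNewConstMetric(c.reloads, prometheus.CounterValue, 3, "success")
	ch <- prometheus.MustNewConstMetric(c.reloads, prometheus.CounterValue, 1, "failure")
}

func main() {
	reg := prometheus.NewRegistry()
	reg.MustRegister(newReloadsCollector())
	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	log.Fatal(http.ListenAndServe(":9198", nil)) // port is arbitrary for the sketch
}

Scraping /metrics for this sketch yields two logstash_info_reloads series distinguished only by the result label, which is the same shape the diffs below produce for both the global and the per-pipeline reload counters.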
2 changes: 1 addition & 1 deletion logstash_exporter.go
@@ -1,7 +1,7 @@
package main

import (
"github.com/BonnierNews/logstash_exporter/collector"
"collector"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/version"
File renamed without changes.
File renamed without changes.
collector/nodeinfo_api.go → vendor/collector/nodeinfo_api.go
@@ -7,7 +7,11 @@ type NodeInfoResponse struct {
HTTPAddress string `json:"http_address"`
ID string `json:"id"`
Name string `json:"name"`
Pipeline struct {
Reloads struct {
Successes int `json:"successes"`
Failures int `json:"failures"`
} `json:"reloads"`
Pipeline struct {
Workers int `json:"workers"`
BatchSize int `json:"batch_size"`
BatchDelay int `json:"batch_delay"`
@@ -42,7 +46,7 @@ func NodeInfo(endpoint string) (NodeInfoResponse, error) {
var response NodeInfoResponse

handler := &HTTPHandler{
Endpoint: endpoint + "/_node",
Endpoint: endpoint + "/_node/stats",
}

err := getMetrics(handler, &response)
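As context for the struct change above: the added Reloads block decodes directly from the node stats document with encoding/json. A standalone sketch with a hand-written sample payload (the numbers are illustrative; the exporter itself fetches the real document from /_node/stats through its HTTPHandler):

package main

import (
	"encoding/json"
	"fmt"
)

// nodeStats mirrors just the reloads fragment added to NodeInfoResponse.
type nodeStats struct {
	Reloads struct {
		Successes int `json:"successes"`
		Failures  int `json:"failures"`
	} `json:"reloads"`
}

func main() {
	sample := []byte(`{"reloads": {"successes": 3, "failures": 1}}`) // illustrative payload

	var stats nodeStats
	if err := json.Unmarshal(sample, &stats); err != nil {
		panic(err)
	}
	fmt.Println(stats.Reloads.Successes, stats.Reloads.Failures) // prints: 3 1
}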
collector/nodeinfo_collector.go → vendor/collector/nodeinfo_collector.go
@@ -10,9 +10,10 @@ import (
type NodeInfoCollector struct {
endpoint string

NodeInfos *prometheus.Desc
OsInfos *prometheus.Desc
JvmInfos *prometheus.Desc
NodeInfos *prometheus.Desc
OsInfos *prometheus.Desc
JvmInfos *prometheus.Desc
ReloadsInfos *prometheus.Desc
}

// NewNodeInfoCollector function
@@ -42,6 +43,13 @@ func NewNodeInfoCollector(logstashEndpoint string) (Collector, error) {
[]string{"name", "version", "vendor"},
nil,
),

ReloadsInfos: prometheus.NewDesc(
prometheus.BuildFQName(Namespace, subsystem, "reloads"),
"Logstash reloads",
[]string{"result"},
nil,
),
}, nil
}

@@ -86,5 +94,19 @@ func (c *NodeInfoCollector) collect(ch chan<- prometheus.Metric) (*prometheus.De
stats.Jvm.VMVendor,
)

ch <- prometheus.MustNewConstMetric(
c.ReloadsInfos,
prometheus.CounterValue,
float64(stats.Reloads.Successes),
"success",
)

ch <- prometheus.MustNewConstMetric(
c.ReloadsInfos,
prometheus.CounterValue,
float64(stats.Reloads.Failures),
"failure",
)

return nil, nil
}
16 changes: 14 additions & 2 deletions collector/nodestats_api.go → vendor/collector/nodestats_api.go
@@ -35,9 +35,20 @@ type Pipeline struct {
Outputs []struct {
ID string `json:"id"`
Events struct {
In int `json:"in"`
Out int `json:"out"`
DurationInMillis int `json:"duration_in_millis"`
In int `json:"in"`
Out int `json:"out"`
} `json:"events"`
Documents struct {
Successes int `json:"successes"`
RetryableFailures int `json:"retryable_failures"`
NonRetryableFailures int `json:"non_retryable_failures"`
} `json:"documents"`
BulkRequests struct {
Successes int `json:"successes"`
WithErrors int `json:"with_errors"`
Responses interface{} `json:"responses"`
} `json:"bulk_requests"`
Name string `json:"name"`
} `json:"outputs"`
} `json:"plugins"`
@@ -53,6 +64,7 @@ type Pipeline struct {
Type string `json:"type"`
Capacity struct {
PageCapacityInBytes int `json:"page_capacity_in_bytes"`
QueueSizeInBytes int64 `json:"queue_size_in_bytes"`
MaxQueueSizeInBytes int64 `json:"max_queue_size_in_bytes"`
MaxUnreadEvents int `json:"max_unread_events"`
} `json:"capacity"`
collector/nodestats_collector.go → vendor/collector/nodestats_collector.go
@@ -37,14 +37,20 @@ type NodeStatsCollector struct {
PipelineEventsFiltered *prometheus.Desc
PipelineEventsOut *prometheus.Desc

PipelinePluginEventsDuration *prometheus.Desc
PipelinePluginEventsIn *prometheus.Desc
PipelinePluginEventsOut *prometheus.Desc
PipelinePluginMatches *prometheus.Desc
PipelinePluginFailures *prometheus.Desc
PipelinePluginEventsDuration *prometheus.Desc
PipelinePluginEventsIn *prometheus.Desc
PipelinePluginEventsOut *prometheus.Desc
PipelinePluginMatches *prometheus.Desc
PipelinePluginFailures *prometheus.Desc
PipelinePluginSuccesses *prometheus.Desc
PipelinePluginRetryableFailures *prometheus.Desc
PipelinePluginBulkRequests *prometheus.Desc

PipelineReloads *prometheus.Desc

PipelineQueueEvents *prometheus.Desc
PipelineQueuePageCapacity *prometheus.Desc
PipelineQueueQueueSize *prometheus.Desc
PipelineQueueMaxQueueSize *prometheus.Desc
PipelineQueueMaxUnreadEvents *prometheus.Desc

@@ -243,10 +249,31 @@ func NewNodeStatsCollector(logstashEndpoint string) (Collector, error) {
PipelinePluginFailures: prometheus.NewDesc(
prometheus.BuildFQName(Namespace, subsystem, "plugin_failures_total"),
"plugin_failures",
[]string{"pipeline", "plugin", "plugin_id", "plugin_type", "retryable"},
nil,
),

PipelinePluginSuccesses: prometheus.NewDesc(
prometheus.BuildFQName(Namespace, subsystem, "plugin_successes_total"),
"plugin_successes",
[]string{"pipeline", "plugin", "plugin_id", "plugin_type"},
nil,
),

PipelineReloads: prometheus.NewDesc(
prometheus.BuildFQName(Namespace, subsystem, "plugin_reloads_total"),
"plugin_reloads",
[]string{"pipeline", "result"},
nil,
),

PipelinePluginBulkRequests: prometheus.NewDesc(
prometheus.BuildFQName(Namespace, subsystem, "plugin_bulk_requests_total"),
"plugin_bulk_requests",
[]string{"pipeline", "plugin", "plugin_id", "plugin_type", "result"},
nil,
),

PipelineQueueEvents: prometheus.NewDesc(
prometheus.BuildFQName(Namespace, subsystem, "queue_events"),
"queue_events",
@@ -261,6 +288,13 @@ func NewNodeStatsCollector(logstashEndpoint string) (Collector, error) {
nil,
),

PipelineQueueQueueSize: prometheus.NewDesc(
prometheus.BuildFQName(Namespace, subsystem, "queue_size_in_bytes"),
"queue_size_in_bytes",
[]string{"pipeline"},
nil,
),

PipelineQueueMaxQueueSize: prometheus.NewDesc(
prometheus.BuildFQName(Namespace, subsystem, "queue_max_size_bytes"),
"queue_max_size_bytes",
@@ -535,6 +569,21 @@ func (c *NodeStatsCollector) collect(ch chan<- prometheus.Metric) (*prometheus.D
pipelineID,
)

ch <- prometheus.MustNewConstMetric(
c.PipelineReloads,
prometheus.CounterValue,
float64(pipeline.Reloads.Successes),
pipelineID,
"success",
)
ch <- prometheus.MustNewConstMetric(
c.PipelineReloads,
prometheus.CounterValue,
float64(pipeline.Reloads.Failures),
pipelineID,
"failure",
)

for _, plugin := range pipeline.Plugins.Inputs {
ch <- prometheus.MustNewConstMetric(
c.PipelinePluginEventsIn,
@@ -601,6 +650,7 @@ func (c *NodeStatsCollector) collect(ch chan<- prometheus.Metric) (*prometheus.D
plugin.Name,
plugin.ID,
"filter",
"n.a.",
)
}

@@ -623,6 +673,64 @@ func (c *NodeStatsCollector) collect(ch chan<- prometheus.Metric) (*prometheus.D
plugin.ID,
"output",
)
ch <- prometheus.MustNewConstMetric(
c.PipelinePluginEventsDuration,
prometheus.CounterValue,
float64(plugin.Events.DurationInMillis/1000),
pipelineID,
plugin.Name,
plugin.ID,
"output",
)
ch <- prometheus.MustNewConstMetric(
c.PipelinePluginSuccesses,
prometheus.CounterValue,
float64(plugin.Documents.Successes),
pipelineID,
plugin.Name,
plugin.ID,
"output",
)
ch <- prometheus.MustNewConstMetric(
c.PipelinePluginFailures,
prometheus.CounterValue,
float64(plugin.Documents.RetryableFailures),
pipelineID,
plugin.Name,
plugin.ID,
"output",
"yes",
)
ch <- prometheus.MustNewConstMetric(
c.PipelinePluginFailures,
prometheus.CounterValue,
float64(plugin.Documents.NonRetryableFailures),
pipelineID,
plugin.Name,
plugin.ID,
"output",
"no",
)
ch <- prometheus.MustNewConstMetric(
c.PipelinePluginBulkRequests,
prometheus.CounterValue,
float64(plugin.BulkRequests.Successes),
pipelineID,
plugin.Name,
plugin.ID,
"output",
"success",
)
ch <- prometheus.MustNewConstMetric(
c.PipelinePluginBulkRequests,
prometheus.CounterValue,
float64(plugin.BulkRequests.WithErrors),
pipelineID,
plugin.Name,
plugin.ID,
"output",
"error",
)
}

if pipeline.Queue.Type != "memory" {
@@ -647,6 +755,13 @@ func (c *NodeStatsCollector) collect(ch chan<- prometheus.Metric) (*prometheus.D
pipelineID,
)

ch <- prometheus.MustNewConstMetric(
c.PipelineQueueQueueSize,
prometheus.CounterValue,
float64(pipeline.Queue.Capacity.QueueSizeInBytes),
pipelineID,
)

ch <- prometheus.MustNewConstMetric(
c.PipelineQueueMaxUnreadEvents,
prometheus.CounterValue,
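One detail worth calling out from the collect changes above: plugin_failures_total now carries a retryable label, so output document failures split into retryable ("yes") and non-retryable ("no") series while filter failures keep a single "n.a." series. A compact sketch of just that emission side, with placeholder pipeline, plugin, and ID values, and the fully qualified metric name written out by hand (an assumption; the diff builds it via BuildFQName):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// failures mirrors the reworked plugin_failures_total descriptor, whose label
// set now ends with "retryable".
var failures = prometheus.NewDesc(
	"logstash_node_plugin_failures_total", // assumed full name
	"plugin_failures",
	[]string{"pipeline", "plugin", "plugin_id", "plugin_type", "retryable"},
	nil,
)

func main() {
	ch := make(chan prometheus.Metric, 3)

	// Filter plugins: the retryable dimension does not apply, hence "n.a.".
	ch <- prometheus.MustNewConstMetric(failures, prometheus.CounterValue, 0,
		"main", "grok", "filter-id", "filter", "n.a.")

	// Output plugins: document failures split by whether Logstash retries them.
	ch <- prometheus.MustNewConstMetric(failures, prometheus.CounterValue, 2,
		"main", "elasticsearch", "output-id", "output", "yes")
	ch <- prometheus.MustNewConstMetric(failures, prometheus.CounterValue, 1,
		"main", "elasticsearch", "output-id", "output", "no")
	close(ch)

	for m := range ch {
		fmt.Println(m.Desc()) // all three metrics share one Desc and differ only in label values
	}
}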
File renamed without changes.
