-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathinflux-sink.js
124 lines (96 loc) · 2.6 KB
/
influx-sink.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
'use strict'
var influx = require('influx')
var _ = require('lodash')
var bole = require('bole')
var through = require('through2')
// Module-level plugin state, seeded with the default options. The plugin
// initialiser merges caller options over these values and later attaches
// runtime members (logger, client, batch bookkeeping) to the same object.
var locals = {
  // Name returned to seneca and used as the logger name.
  plugin: 'vidi-influx-sink',

  // Value of the `role` property in the patterns this plugin registers.
  role: 'metrics',

  // When false, incoming metrics are not queued.
  enabled: true,

  // Flush thresholds: number of queued metrics, and elapsed milliseconds.
  batch: { max: 5, timeout: 500 },

  // Options object handed to the influx client factory.
  influx: {
    host: 'localhost',
    port: '8086',
    username: 'metrics',
    password: 'metrics',
    database: 'vidi_metrics'
  }
}
module.exports = function (options) {
var seneca = this
var extend = seneca.util.deepextend
locals = extend(locals, options)
locals.log = bole(locals.plugin)
locals.client = influx(locals.influx)
locals.batch.list = {}
locals.batch.count = 0
locals.batch.stream = through.obj(on_write)
locals.batch.stream.on('error', on_stream_err)
locals.batch.next = Date.now() + locals.batch.timeout
seneca.add({role: locals.role, hook: 'sink'}, sink)
seneca.add({role: locals.role, enabled: '*'}, enable_disable)
locals.log.info(`batch size: ${locals.batch.max}`)
locals.log.info(`batch timeout: ${locals.batch.timeout}`)
return locals.plugin
}
/**
 * 'error' event handler for the batching stream: logs the failure.
 *
 * Uses `log.error` (the same method `write_batch` uses); the logger has no
 * `err` method, so calling it would throw from inside the error handler.
 *
 * @param {Error} err - the error emitted by the stream
 */
function on_stream_err (err) {
  locals.log.error('write stream error:')
  locals.log.error(err)
}
/**
 * Seneca action for `hook: 'sink'`: queues the message's metric for batching.
 *
 * The metric is written to the batch stream only while the plugin is enabled
 * and the message actually carries a `metric`; anything else is ignored.
 * `done` is always called, with no arguments.
 *
 * @param {Object} msg - seneca message; `msg.metric` is the metric to queue
 * @param {Function} done - seneca completion callback
 */
function sink (msg, done) {
  if (locals.enabled && msg && msg.metric) {
    locals.batch.stream.write(msg.metric)
  }

  done()
}
/**
 * Seneca action for the `enabled: '*'` pattern: stores the message's
 * `enabled` value as the plugin's enabled flag, then completes.
 *
 * @param {Object} msg - seneca message carrying the new `enabled` value
 * @param {Function} done - seneca completion callback
 */
function enable_disable (msg, done) {
  var requested = msg.enabled
  locals.enabled = requested

  done()
}
/**
 * Transform callback of the batch stream; runs once per metric written.
 *
 * Appends `[values, tags]` to the accumulator entry keyed by the metric's
 * name, bumps the queued-metric counter, and triggers a flush when either
 * the counter has reached `batch.max` or the flush deadline has passed.
 *
 * @param {Object} metric - object with `name`, `values` and `tags`
 * @param {string} enc - stream encoding argument (unused)
 * @param {Function} done - stream callback signalling the chunk was handled
 */
function on_write (metric, enc, done) {
  var batch = locals.batch
  var series_name = metric.name
  var point = [metric.values, metric.tags]

  // Create the per-series bucket on first use.
  var bucket = batch.list[series_name]
  if (!bucket) {
    bucket = batch.list[series_name] = []
  }
  bucket.push(point)
  batch.count += 1

  var is_full = batch.count >= batch.max
  var is_overdue = Date.now() > batch.next

  if (is_full || is_overdue) {
    write_batch()
  }

  done()
}
/**
 * Flushes the queued metrics to InfluxDB and re-arms the flush timer.
 *
 * The accumulator and counter are swapped for fresh ones before the write is
 * issued, so metrics arriving while the request is in flight land in the
 * next batch. When nothing is queued (the usual case for a timer-driven
 * call on an idle plugin) the timer is still re-armed but no request is
 * sent and nothing is logged.
 *
 * The outcome of the write is only logged; a failed batch is not retried.
 */
function write_batch () {
  var written = locals.batch.count
  var batches = locals.batch.list

  locals.batch.list = {}
  locals.batch.count = 0
  reset_timeout()

  // Nothing queued: skip the empty request and the "0 metric(s)" log line.
  if (written === 0) {
    return
  }

  // Label used in the success log line.
  var db = `${locals.influx.database}:${locals.influx.port}`

  // Completion callback for the write: handles both failure and success.
  function on_written (err) {
    if (err) {
      locals.log.error('error writing to influx:')
      locals.log.error(err)
      return
    }

    locals.log.info(`${written} metric(s) written to ${db}`)
  }

  locals.client.writeSeries(batches, on_written)
}
/**
 * Re-arms the periodic flush: cancels any pending flush timer, schedules a
 * new `write_batch` call `batch.timeout` milliseconds from now, and moves
 * the flush deadline (`batch.next`) to that same moment.
 */
function reset_timeout () {
  var timeout = locals.batch.timeout

  // Always drop the previous timer so only one flush is ever pending.
  // clearTimeout ignores an undefined handle, so the first call is safe.
  clearTimeout(locals.batch.timer)

  locals.batch.timer = setTimeout(() => { write_batch() }, timeout)

  // The deadline is "now + interval". Adding the previous deadline (an
  // absolute timestamp) would push it so far ahead that the deadline check
  // done on each stream write could never fire again.
  locals.batch.next = Date.now() + timeout
}