forked from skorpworks/logbus
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathstage.js
77 lines (64 loc) · 1.97 KB
/
stage.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
const _ = require('lodash')
module.exports = (name, props, plugin, logbus) => {
logbus.pipeline.on('READY', () => {
// TODO: an overly convulted way for main() to communicate that stage is ready
logbus.ready = true
})
const inChannels = props.inChannels || []
const outChannels = props.outChannels
const isInput = inChannels.length === 0
const isOutput = props.outChannels.length === 0
const isErrors = props.module === 'errors'
const isStats = props.module === 'stats'
if (plugin.onInput) {
inChannels.forEach(inChannel => logbus.pipeline.on(inChannel, plugin.onInput))
}
const waitingOn = {}
let stopped = false
const start = () => {
if (plugin.start) {
return plugin.start()
}
return new Promise(resolve => {
resolve({stage: logbus.stage})
})
}
async function stop(input) {
delete waitingOn[input]
if (Object.keys(waitingOn).length === 0) {
logbus.log.info('stopping via', input || 'SHUTDOWN')
if (plugin.stop) {
try {
await plugin.stop()
} catch (err) {
logbus.log.error(err, {stage: name}, 'failed to stop')
}
}
logbus.pipeline.emit(name + '.stopped', name)
stopped = true
}
}
function waitOn(stage) {
waitingOn[stage] = true
}
function inputs(stages) {
const matches = []
_.each(stages, (stage, sname) => {
if (_.intersection(stage.outChannels, inChannels).length !== 0) {
matches.push(sname)
}
})
return matches
}
function outputs(stages) {
const matches = []
_.each(stages, (stage, sname) => {
if (_.intersection(stage.inChannels, props.outChannels).length !== 0) {
matches.push(sname)
}
})
return matches
}
// TODO: This sucks - all kinds of odd coupling twix stage, plugin, and logbus instance
return {start, stop, inputs, outputs, inChannels, outChannels, isInput, isOutput, isErrors, isStats, waitOn, waitingOn, stopped: () => stopped}
}