-
Notifications
You must be signed in to change notification settings - Fork 67
/
Copy pathlib.js
152 lines (127 loc) · 3.94 KB
/
lib.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
'use strict'
/* eslint no-prototype-builtins: 0 */
const split = require('split2')
const { Client } = require('@elastic/elasticsearch')
/**
 * Connects the log stream to the Elasticsearch bulk helper.
 *
 * Overrides `splitter.destroy`, starts `client.helpers.bulk` with `splitter`
 * as its datasource and reports the outcome back on `splitter`:
 *   - 'insert' (stats) when the bulk promise fulfils,
 *   - 'error' (err) when the bulk promise rejects,
 *   - 'insertError' (Error carrying `.document`) for every dropped document.
 *
 * Options read from `opts` (legacy dashed spellings are still honoured):
 *   - esVersion / 'es-version': major version, defaults to 7
 *   - index: string (may contain '%{DATE}') or function (time) => name,
 *     defaults to 'pino'
 *   - type: document type, only sent when esVersion < 7, defaults to 'log'
 *   - opType / op_type: only sent when esVersion >= 7
 *   - flushBytes / 'flush-bytes': defaults to 1000
 *   - flushInterval / 'flush-interval': defaults to 30000
 *
 * @param {object} opts - transport options (see above)
 * @param {object} client - Elasticsearch client exposing `helpers.bulk` and `connectionPool`
 * @param {object} splitter - stream of parsed log documents; also used as event emitter
 * @returns {void}
 */
function initializeBulkHandler (opts, client, splitter) {
  const esVersion = Number(opts.esVersion || opts['es-version']) || 7
  const index = opts.index || 'pino'
  const buildIndexName = typeof index === 'function' ? index : null
  const type = esVersion >= 7 ? undefined : (opts.type || 'log')
  const opType = esVersion >= 7 ? (opts.opType || opts.op_type) : undefined

  // Resurrect connection pool on destroy
  splitter.destroy = () => {
    if (typeof client.connectionPool.resurrect === 'function') {
      client.connectionPool.resurrect({ name: 'elasticsearch-js' })
    }
  }

  const bulkInsert = client.helpers.bulk({
    datasource: splitter,
    flushBytes: opts.flushBytes || opts['flush-bytes'] || 1000,
    flushInterval: opts.flushInterval || opts['flush-interval'] || 30000,
    // Index name computed for the current time.
    refreshOnCompletion: getIndexName(),
    onDocument (doc) {
      const date = doc.time || doc['@timestamp']
      if (opType === 'create') {
        // With op_type "create" the chosen timestamp is copied to '@timestamp'.
        doc['@timestamp'] = date
      }

      return {
        index: {
          _index: getIndexName(date),
          _type: type,
          op_type: opType
        }
      }
    },
    onDrop (doc) {
      const error = new Error('Dropped document')
      error.document = doc
      splitter.emit('insertError', error)
    }
  })

  bulkInsert.then(
    (stats) => splitter.emit('insert', stats),
    (err) => splitter.emit('error', err)
  )

  /**
   * Resolves the target index for a document timestamp.
   * A user supplied index function receives `time` untouched; otherwise the
   * first '%{DATE}' placeholder is replaced by the YYYY-MM-DD part of `time`.
   */
  function getIndexName (time = new Date().toISOString()) {
    if (buildIndexName) {
      return buildIndexName(time)
    }
    return index.replace('%{DATE}', toDateStamp(time))
  }

  /**
   * Returns the first ten characters (YYYY-MM-DD) used for '%{DATE}'.
   * Strings are sliced as-is. A numeric `time` is converted through Date, and
   * anything else (null, objects, out-of-range numbers) falls back to the
   * current time instead of throwing inside the bulk helper's onDocument.
   */
  function toDateStamp (time) {
    if (typeof time === 'string') {
      return time.substring(0, 10)
    }
    const parsed = typeof time === 'number' ? new Date(time) : new Date()
    const usable = Number.isNaN(parsed.getTime()) ? new Date() : parsed
    return usable.toISOString().substring(0, 10)
  }
}
/**
 * Chooses the ISO-8601 timestamp stored on a parsed log line.
 *
 * Uses the line's own `time` property when it is a non-empty string or a
 * non-negative number AND parses to a valid date; otherwise returns the
 * current time. Unparseable values used to make `toISOString()` throw a
 * RangeError, which surfaced as a stream error.
 *
 * @param {*} value - parsed JSON value
 * @returns {string} ISO-8601 timestamp
 */
function resolveTimeString (value) {
  // hasOwnProperty is borrowed from Object.prototype: log lines are untrusted
  // JSON and may carry their own "hasOwnProperty" key.
  const hasOwnTime = typeof value === 'object' && value !== null &&
    Object.prototype.hasOwnProperty.call(value, 'time')

  if (hasOwnTime) {
    const { time } = value
    const acceptable =
      (typeof time === 'string' && time.length > 0) ||
      (typeof time === 'number' && time >= 0)
    if (acceptable) {
      const parsed = new Date(time)
      if (!Number.isNaN(parsed.getTime())) {
        return parsed.toISOString()
      }
    }
  }
  return new Date().toISOString()
}

/**
 * Creates the stream that forwards newline-delimited JSON logs to Elasticsearch.
 *
 * Each line is parsed as JSON. Unparseable lines, booleans and `null` are
 * reported through an 'unknown' event (line, reason) and skipped. Other
 * primitives are wrapped as `{ data, time }`; objects without '@timestamp'
 * get a normalised ISO `time`.
 *
 * Client options taken from `opts`: node, auth, cloud, rejectUnauthorized,
 * tls, and, when truthy, caFingerprint, Connection, ConnectionPool.
 * Deprecated dashed options ('flush-bytes', 'flush-interval', 'es-version')
 * trigger a process warning; 'bulk-size' triggers a warning and is deleted
 * from `opts`.
 *
 * @param {object} [opts={}] - transport options
 * @returns {object} the split2 stream to write log lines into
 */
function pinoElasticSearch (opts = {}) {
  if (opts['flush-bytes']) {
    process.emitWarning('The "flush-bytes" option has been deprecated, use "flushBytes" instead')
  }

  if (opts['flush-interval']) {
    process.emitWarning('The "flush-interval" option has been deprecated, use "flushInterval" instead')
  }

  if (opts['es-version']) {
    process.emitWarning('The "es-version" option has been deprecated, use "esVersion" instead')
  }

  if (opts['bulk-size']) {
    process.emitWarning('The "bulk-size" option has been removed, use "flushBytes" instead')
    delete opts['bulk-size']
  }

  // A regular function (not an arrow) is required: `this` is used to emit on the stream.
  const splitter = split(function (line) {
    let value

    try {
      value = JSON.parse(line)
    } catch (error) {
      this.emit('unknown', line, error)
      return
    }

    if (typeof value === 'boolean') {
      this.emit('unknown', line, 'Boolean value ignored')
      return
    }

    if (value === null) {
      this.emit('unknown', line, 'Null value ignored')
      return
    }

    if (typeof value !== 'object') {
      // Numbers and strings are wrapped so that a document can be indexed.
      return {
        data: value,
        time: resolveTimeString(value)
      }
    }

    // Lines that already carry '@timestamp' keep their fields untouched.
    if (value['@timestamp'] === undefined) {
      value.time = resolveTimeString(value)
    }

    return value
  }, { autoDestroy: true })

  const clientOpts = {
    node: opts.node,
    auth: opts.auth,
    cloud: opts.cloud,
    tls: { rejectUnauthorized: opts.rejectUnauthorized, ...opts.tls }
  }

  // Optional settings are only forwarded when the caller supplied them.
  for (const key of ['caFingerprint', 'Connection', 'ConnectionPool']) {
    if (opts[key]) {
      clientOpts[key] = opts[key]
    }
  }

  const client = new Client(clientOpts)

  // Set the bulk handler up again whenever the client reports a 'resurrect' event.
  client.diagnostic.on('resurrect', () => {
    initializeBulkHandler(opts, client, splitter)
  })

  initializeBulkHandler(opts, client, splitter)

  return splitter
}
// The stream factory is this module's only export.
module.exports = pinoElasticSearch