diff --git a/lib.js b/lib.js index e7a5a42..d6c6a65 100644 --- a/lib.js +++ b/lib.js @@ -5,6 +5,60 @@ const split = require('split2') const { Client } = require('@elastic/elasticsearch') +function initializeBulkHandler (opts, client, splitter) { + const esVersion = Number(opts['es-version']) || 7 + const index = opts.index || 'pino' + const buildIndexName = typeof index === 'function' ? index : null + const type = esVersion >= 7 ? undefined : (opts.type || 'log') + const opType = esVersion >= 7 ? opts.op_type : undefined + + // Resurrect connection pool on destroy + splitter.destroy = () => { + if (typeof client.connectionPool.resurrect === 'function') { + client.connectionPool.resurrect({ name: 'elasticsearch-js' }) + initializeBulkHandler(opts, client, splitter) + } + } + + const bulkInsert = client.helpers.bulk({ + datasource: splitter, + flushBytes: opts['flush-bytes'] || 1000, + flushInterval: opts['flush-interval'] || 30000, + refreshOnCompletion: getIndexName(), + onDocument (doc) { + const date = doc.time || doc['@timestamp'] + if (opType === 'create') { + doc['@timestamp'] = date + } + + return { + index: { + _index: getIndexName(date), + _type: type, + op_type: opType + } + } + }, + onDrop (doc) { + const error = new Error('Dropped document') + error.document = doc + splitter.emit('insertError', error) + } + }) + + bulkInsert.then( + (stats) => splitter.emit('insert', stats), + (err) => splitter.emit('error', err) + ) + + function getIndexName (time = new Date().toISOString()) { + if (buildIndexName) { + return buildIndexName(time) + } + return index.replace('%{DATE}', time.substring(0, 10)) + } +} + function pinoElasticSearch (opts) { if (opts['bulk-size']) { process.emitWarning('The "bulk-size" option has been deprecated, "flush-bytes" instead') @@ -60,58 +114,14 @@ function pinoElasticSearch (opts) { cloud: opts.cloud, ssl: { rejectUnauthorized: opts.rejectUnauthorized } } + if (opts.Connection) { clientOpts.Connection = opts.Connection } const client = new Client(clientOpts) - const esVersion = Number(opts['es-version']) || 7 - const index = opts.index || 'pino' - const buildIndexName = typeof index === 'function' ? index : null - const type = esVersion >= 7 ? undefined : (opts.type || 'log') - const opType = esVersion >= 7 ? opts.op_type : undefined - const b = client.helpers.bulk({ - datasource: splitter, - flushBytes: opts['flush-bytes'] || 1000, - flushInterval: opts['flush-interval'] || 30000, - refreshOnCompletion: getIndexName(), - onDocument (doc) { - const date = doc.time || doc['@timestamp'] - if (opType === 'create') { - doc['@timestamp'] = date - } - - return { - index: { - _index: getIndexName(date), - _type: type, - op_type: opType - } - } - }, - onDrop (doc) { - const error = new Error('Dropped document') - error.document = doc - splitter.emit('insertError', error) - } - }) - - b.then( - (stats) => splitter.emit('insert', stats), - (err) => splitter.emit('error', err) - ) - - splitter._destroy = function (err, cb) { - b.then(() => cb(err), (e2) => cb(e2 || err)) - } - - function getIndexName (time = new Date().toISOString()) { - if (buildIndexName) { - return buildIndexName(time) - } - return index.replace('%{DATE}', time.substring(0, 10)) - } + initializeBulkHandler(opts, client, splitter) return splitter } diff --git a/test/unit.test.js b/test/unit.test.js index af35ce2..4a15e32 100644 --- a/test/unit.test.js +++ b/test/unit.test.js @@ -274,3 +274,33 @@ test('make sure `@timestamp` is correctly set when `op_type` is `create`', (t) = const log = pino(instance) log.info(['info'], 'abc') }) + +test('resurrect client connection pool when datasource split is destroyed', (t) => { + let isResurrected = false + const Client = function (config) {} + + Client.prototype.helpers = { + bulk: async function (opts) { + if (!isResurrected) { + opts.datasource.destroy() + } + } + } + + Client.prototype.connectionPool = { + resurrect: function () { + isResurrected = true + t.end() + } + } + + const elastic = proxyquire('../', { + '@elastic/elasticsearch': { Client } + }) + + const instance = elastic({ ...options }) + const log = pino(instance) + + const prettyLog = 'Example of a log' + log.info(['info'], prettyLog) +})