diff --git a/example.js b/example.js index bbe1f24..8a79ae2 100644 --- a/example.js +++ b/example.js @@ -1,31 +1,24 @@ 'use strict' -const pino = require('pino')() - -pino.info('hello world') -pino.error('this is at error level') -pino.info('the answer is %d', 42) -pino.info({ obj: 42 }, 'hello world') -pino.info({ obj: 42, b: 2 }, 'hello world') -pino.info({ obj2: { aa: 'bbb' } }, 'another') -setImmediate(function () { - pino.info('after setImmediate') +const pinoJs = require('pino') +const elastic = require('./lib')({ + index: 'pinotest', + node: 'http://localhost:9200', + auth: { + apiKey: 'someKey' + }, + consistency: 'one', + 'es-version': 7, + 'flush-bytes': 1000 }) -pino.error(new Error('an error')) - -const child = pino.child({ a: 'property' }) -child.info('hello child!') - -const childsChild = child.child({ another: 'property' }) -childsChild.info('hello baby..') - -pino.debug('this should be mute') - -pino.level = 'trace' - -pino.debug('this is a debug statement') -pino.child({ another: 'property' }).debug('this is a debug statement via child') -pino.trace('this is a trace statement') +const level = 'trace' +const pino = pinoJs({ level }, pinoJs.multistream([ + { level, stream: elastic }, + { level, stream: process.stdout } +])) -pino.debug('this is a "debug" statement with "') +pino.trace('test') +setInterval(function () { + pino.trace('test') +}, 10000) diff --git a/lib.js b/lib.js index 7217965..8e15f7b 100644 --- a/lib.js +++ b/lib.js @@ -3,7 +3,11 @@ /* eslint no-prototype-builtins: 0 */ const split = require('split2') -const { Client } = require('@elastic/elasticsearch') +const { + Client, + ClusterConnectionPool, + HttpConnection +} = require('@elastic/elasticsearch') function initializeBulkHandler (opts, client, splitter) { const esVersion = Number(opts['es-version']) || 7 @@ -16,7 +20,6 @@ function initializeBulkHandler (opts, client, splitter) { splitter.destroy = () => { if (typeof client.connectionPool.resurrect === 'function') { client.connectionPool.resurrect({ name: 'elasticsearch-js' }) - initializeBulkHandler(opts, client, splitter) } } @@ -112,7 +115,9 @@ function pinoElasticSearch (opts) { node: opts.node, auth: opts.auth, cloud: opts.cloud, - ssl: { rejectUnauthorized: opts.rejectUnauthorized } + ssl: { rejectUnauthorized: opts.rejectUnauthorized }, + Connection: HttpConnection, + ConnectionPool: ClusterConnectionPool } if (opts.tls) { @@ -127,8 +132,16 @@ function pinoElasticSearch (opts) { clientOpts.Connection = opts.Connection } + if (opts.ConnectionPool) { + clientOpts.ConnectionPool = opts.ConnectionPool + } + const client = new Client(clientOpts) + client.diagnostic.on('resurrect', () => { + initializeBulkHandler(opts, client, splitter) + }) + initializeBulkHandler(opts, client, splitter) return splitter diff --git a/test/unit.test.js b/test/unit.test.js index 4a15e32..5f074be 100644 --- a/test/unit.test.js +++ b/test/unit.test.js @@ -15,10 +15,7 @@ const options = { } const dsOptions = { - index: 'logs-pino-test', - type: 'log', - consistency: 'one', - node: 'http://localhost:9200', + ...options, op_type: 'create' } @@ -33,6 +30,9 @@ test('make sure log is a valid json', (t) => { const Client = function (config) { t.equal(config.node, options.node) } + Client.prototype.diagnostic = { + on: () => {} + } Client.prototype.helpers = { async bulk (opts) { for await (const chunk of opts.datasource) { @@ -55,6 +55,9 @@ test('make sure log is a valid json', (t) => { test('date can be a number', (t) => { t.plan(1) const Client = function (config) {} + Client.prototype.diagnostic = { + on: () => {} + } const threeDaysInMillis = 3 * 24 * 60 * 60 * 1000 const time = new Date(Date.now() - threeDaysInMillis) @@ -82,6 +85,9 @@ test('Uses the type parameter only with ES < 7 / 1', (t) => { const Client = function (config) { t.equal(config.node, options.node) } + Client.prototype.diagnostic = { + on: () => {} + } Client.prototype.helpers = { async bulk (opts) { @@ -107,6 +113,9 @@ test('Uses the type parameter only with ES < 7 / 2', (t) => { const Client = function (config) { t.equal(config.node, options.node) } + Client.prototype.diagnostic = { + on: () => {} + } Client.prototype.helpers = { async bulk (opts) { for await (const chunk of opts.datasource) { @@ -131,6 +140,9 @@ test('ecs format', (t) => { const Client = function (config) { t.equal(config.node, options.node) } + Client.prototype.diagnostic = { + on: () => {} + } Client.prototype.helpers = { async bulk (opts) { for await (const chunk of opts.datasource) { @@ -155,10 +167,7 @@ test('ecs format', (t) => { test('auth and cloud parameters are properly passed to client', (t) => { const opts = { - index: 'pinotest', - type: 'log', - consistency: 'one', - node: 'http://localhost:9200', + ...options, auth: { username: 'user', password: 'pass' @@ -174,6 +183,9 @@ test('auth and cloud parameters are properly passed to client', (t) => { t.equal(config.auth, opts.auth) t.equal(config.cloud, opts.cloud) } + Client.prototype.diagnostic = { + on: () => {} + } Client.prototype.helpers = { async bulk (opts) {} } @@ -185,10 +197,7 @@ test('auth and cloud parameters are properly passed to client', (t) => { test('apikey is passed through auth param properly to client', (t) => { const opts = { - index: 'pinotest', - type: 'log', - consistency: 'one', - node: 'http://localhost:9200', + ...options, auth: { apiKey: 'aHR0cHM6Ly9leGFtcGxlLmNvbQ' } @@ -199,6 +208,9 @@ test('apikey is passed through auth param properly to client', (t) => { t.equal(config.node, opts.node) t.equal(config.auth, opts.auth) } + Client.prototype.diagnostic = { + on: () => {} + } Client.prototype.helpers = { async bulk (opts) {} } @@ -213,6 +225,9 @@ test('make sure `flush-interval` is passed to bulk request', (t) => { const Client = function (config) { t.equal(config.node, options.node) } + Client.prototype.diagnostic = { + on: () => {} + } Client.prototype.helpers = { async bulk (opts) { t.equal(opts.flushInterval, 12345) @@ -233,12 +248,15 @@ test('make sure `op_type` is passed to bulk onDocument request', (t) => { t.plan(2) const Client = function (config) {} + Client.prototype.diagnostic = { + on: () => {} + } Client.prototype.helpers = { async bulk (opts) { const result = opts.onDocument({}) - t.equal(result.index._index, 'logs-pino-test', '_index should be correctly set to `logs-pino-test`') - t.equal(result.index.op_type, 'create', '`op_type` should be set to `create`') + t.equal(result.index._index, dsOptions.index, `_index should be correctly set to \`${dsOptions.index}\``) + t.equal(result.index.op_type, dsOptions.op_type, `\`op_type\` should be set to \`${dsOptions.op_type}\``) t.end() } } @@ -258,6 +276,9 @@ test('make sure `@timestamp` is correctly set when `op_type` is `create`', (t) = time: '2021-09-01T01:01:01.038Z' } const Client = function (config) {} + Client.prototype.diagnostic = { + on: () => {} + } Client.prototype.helpers = { async bulk (opts) { @@ -278,6 +299,9 @@ test('make sure `@timestamp` is correctly set when `op_type` is `create`', (t) = test('resurrect client connection pool when datasource split is destroyed', (t) => { let isResurrected = false const Client = function (config) {} + Client.prototype.diagnostic = { + on: () => {} + } Client.prototype.helpers = { bulk: async function (opts) {