Skip to content

Commit

Permalink
fix reconnection (#168)
Browse files Browse the repository at this point in the history
* fix reconnection

* ConnectionPool typo

---------

Co-authored-by: sheldhur <[email protected]>
  • Loading branch information
sheldhur and sheldhur authored Jul 21, 2023
1 parent 69c2ada commit 61d9f51
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 43 deletions.
45 changes: 19 additions & 26 deletions example.js
Original file line number Diff line number Diff line change
@@ -1,31 +1,24 @@
'use strict'

const pino = require('pino')()

pino.info('hello world')
pino.error('this is at error level')
pino.info('the answer is %d', 42)
pino.info({ obj: 42 }, 'hello world')
pino.info({ obj: 42, b: 2 }, 'hello world')
pino.info({ obj2: { aa: 'bbb' } }, 'another')
setImmediate(function () {
pino.info('after setImmediate')
const pinoJs = require('pino')
const elastic = require('./lib')({
index: 'pinotest',
node: 'http://localhost:9200',
auth: {
apiKey: 'someKey'
},
consistency: 'one',
'es-version': 7,
'flush-bytes': 1000
})
pino.error(new Error('an error'))

const child = pino.child({ a: 'property' })
child.info('hello child!')

const childsChild = child.child({ another: 'property' })
childsChild.info('hello baby..')

pino.debug('this should be mute')

pino.level = 'trace'

pino.debug('this is a debug statement')

pino.child({ another: 'property' }).debug('this is a debug statement via child')
pino.trace('this is a trace statement')
const level = 'trace'
const pino = pinoJs({ level }, pinoJs.multistream([
{ level, stream: elastic },
{ level, stream: process.stdout }
]))

pino.debug('this is a "debug" statement with "')
pino.trace('test')
setInterval(function () {
pino.trace('test')
}, 10000)
19 changes: 16 additions & 3 deletions lib.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
/* eslint no-prototype-builtins: 0 */

const split = require('split2')
const { Client } = require('@elastic/elasticsearch')
const {
Client,
ClusterConnectionPool,
HttpConnection
} = require('@elastic/elasticsearch')

function initializeBulkHandler (opts, client, splitter) {
const esVersion = Number(opts['es-version']) || 7
Expand All @@ -16,7 +20,6 @@ function initializeBulkHandler (opts, client, splitter) {
splitter.destroy = () => {
if (typeof client.connectionPool.resurrect === 'function') {
client.connectionPool.resurrect({ name: 'elasticsearch-js' })
initializeBulkHandler(opts, client, splitter)
}
}

Expand Down Expand Up @@ -112,7 +115,9 @@ function pinoElasticSearch (opts) {
node: opts.node,
auth: opts.auth,
cloud: opts.cloud,
ssl: { rejectUnauthorized: opts.rejectUnauthorized }
ssl: { rejectUnauthorized: opts.rejectUnauthorized },
Connection: HttpConnection,
ConnectionPool: ClusterConnectionPool
}

if (opts.tls) {
Expand All @@ -127,8 +132,16 @@ function pinoElasticSearch (opts) {
clientOpts.Connection = opts.Connection
}

if (opts.ConnectionPool) {
clientOpts.ConnectionPool = opts.ConnectionPool
}

const client = new Client(clientOpts)

client.diagnostic.on('resurrect', () => {
initializeBulkHandler(opts, client, splitter)
})

initializeBulkHandler(opts, client, splitter)

return splitter
Expand Down
52 changes: 38 additions & 14 deletions test/unit.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@ const options = {
}

const dsOptions = {
index: 'logs-pino-test',
type: 'log',
consistency: 'one',
node: 'http://localhost:9200',
...options,
op_type: 'create'
}

Expand All @@ -33,6 +30,9 @@ test('make sure log is a valid json', (t) => {
const Client = function (config) {
t.equal(config.node, options.node)
}
Client.prototype.diagnostic = {
on: () => {}
}
Client.prototype.helpers = {
async bulk (opts) {
for await (const chunk of opts.datasource) {
Expand All @@ -55,6 +55,9 @@ test('make sure log is a valid json', (t) => {
test('date can be a number', (t) => {
t.plan(1)
const Client = function (config) {}
Client.prototype.diagnostic = {
on: () => {}
}

const threeDaysInMillis = 3 * 24 * 60 * 60 * 1000
const time = new Date(Date.now() - threeDaysInMillis)
Expand Down Expand Up @@ -82,6 +85,9 @@ test('Uses the type parameter only with ES < 7 / 1', (t) => {
const Client = function (config) {
t.equal(config.node, options.node)
}
Client.prototype.diagnostic = {
on: () => {}
}

Client.prototype.helpers = {
async bulk (opts) {
Expand All @@ -107,6 +113,9 @@ test('Uses the type parameter only with ES < 7 / 2', (t) => {
const Client = function (config) {
t.equal(config.node, options.node)
}
Client.prototype.diagnostic = {
on: () => {}
}
Client.prototype.helpers = {
async bulk (opts) {
for await (const chunk of opts.datasource) {
Expand All @@ -131,6 +140,9 @@ test('ecs format', (t) => {
const Client = function (config) {
t.equal(config.node, options.node)
}
Client.prototype.diagnostic = {
on: () => {}
}
Client.prototype.helpers = {
async bulk (opts) {
for await (const chunk of opts.datasource) {
Expand All @@ -155,10 +167,7 @@ test('ecs format', (t) => {

test('auth and cloud parameters are properly passed to client', (t) => {
const opts = {
index: 'pinotest',
type: 'log',
consistency: 'one',
node: 'http://localhost:9200',
...options,
auth: {
username: 'user',
password: 'pass'
Expand All @@ -174,6 +183,9 @@ test('auth and cloud parameters are properly passed to client', (t) => {
t.equal(config.auth, opts.auth)
t.equal(config.cloud, opts.cloud)
}
Client.prototype.diagnostic = {
on: () => {}
}
Client.prototype.helpers = {
async bulk (opts) {}
}
Expand All @@ -185,10 +197,7 @@ test('auth and cloud parameters are properly passed to client', (t) => {

test('apikey is passed through auth param properly to client', (t) => {
const opts = {
index: 'pinotest',
type: 'log',
consistency: 'one',
node: 'http://localhost:9200',
...options,
auth: {
apiKey: 'aHR0cHM6Ly9leGFtcGxlLmNvbQ'
}
Expand All @@ -199,6 +208,9 @@ test('apikey is passed through auth param properly to client', (t) => {
t.equal(config.node, opts.node)
t.equal(config.auth, opts.auth)
}
Client.prototype.diagnostic = {
on: () => {}
}
Client.prototype.helpers = {
async bulk (opts) {}
}
Expand All @@ -213,6 +225,9 @@ test('make sure `flush-interval` is passed to bulk request', (t) => {
const Client = function (config) {
t.equal(config.node, options.node)
}
Client.prototype.diagnostic = {
on: () => {}
}
Client.prototype.helpers = {
async bulk (opts) {
t.equal(opts.flushInterval, 12345)
Expand All @@ -233,12 +248,15 @@ test('make sure `op_type` is passed to bulk onDocument request', (t) => {
t.plan(2)

const Client = function (config) {}
Client.prototype.diagnostic = {
on: () => {}
}

Client.prototype.helpers = {
async bulk (opts) {
const result = opts.onDocument({})
t.equal(result.index._index, 'logs-pino-test', '_index should be correctly set to `logs-pino-test`')
t.equal(result.index.op_type, 'create', '`op_type` should be set to `create`')
t.equal(result.index._index, dsOptions.index, `_index should be correctly set to \`${dsOptions.index}\``)
t.equal(result.index.op_type, dsOptions.op_type, `\`op_type\` should be set to \`${dsOptions.op_type}\``)
t.end()
}
}
Expand All @@ -258,6 +276,9 @@ test('make sure `@timestamp` is correctly set when `op_type` is `create`', (t) =
time: '2021-09-01T01:01:01.038Z'
}
const Client = function (config) {}
Client.prototype.diagnostic = {
on: () => {}
}

Client.prototype.helpers = {
async bulk (opts) {
Expand All @@ -278,6 +299,9 @@ test('make sure `@timestamp` is correctly set when `op_type` is `create`', (t) =
test('resurrect client connection pool when datasource split is destroyed', (t) => {
let isResurrected = false
const Client = function (config) {}
Client.prototype.diagnostic = {
on: () => {}
}

Client.prototype.helpers = {
bulk: async function (opts) {
Expand Down

0 comments on commit 61d9f51

Please sign in to comment.