From e4beebb30484d8b989e604d19ff01616b4fe8e05 Mon Sep 17 00:00:00 2001 From: Roman Untilov Date: Fri, 18 Aug 2023 17:31:53 +0300 Subject: [PATCH] fix support for elasticsearch v7 + add types (#169) * fix support for elasticsearch v7 + add types * add tests for types + minor improvements * add back tls option --------- Co-authored-by: Roman UNTILOV --- README.md | 67 ++++------ cli.js | 3 +- example.js | 5 +- lib.d.ts | 56 +++++++++ lib.js | 40 +++--- package.json | 12 +- test-d/lib.test-d.ts | 27 ++++ test/acceptance.test.js | 86 ++++++++----- test/unit.test.js | 273 +++++++++++++++++++++++++++++++++------- usage.txt | 1 - 10 files changed, 422 insertions(+), 148 deletions(-) create mode 100644 lib.d.ts create mode 100644 test-d/lib.test-d.ts diff --git a/README.md b/README.md index 2ff0455..4c5691f 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,6 @@ npm install pino-elasticsearch -g -t | --type the name of the type to use; default: log -f | --flush-bytes the number of bytes for each bulk insert; default: 1000 -t | --flush-interval time that the helper will wait before flushing; default: 30000 - -b | --bulk-size the number of documents for each bulk insert [DEPRECATED] -l | --trace-level trace level for the elasticsearch client, default 'error' (info, debug, trace). | --es-version specify the major version number of Elasticsearch (eg: 5, 6, 7) (this is needed only if you are using Elasticsearch <= 7) @@ -50,10 +49,9 @@ const pinoElastic = require('pino-elasticsearch') const streamToElastic = pinoElastic({ index: 'an-index', - consistency: 'one', node: 'http://localhost:9200', - 'es-version': 7, - 'flush-bytes': 1000 + esVersion: 7, + flushBytes: 1000 }) const logger = pino({ level: 'info' }, streamToElastic) @@ -72,10 +70,9 @@ const pinoMultiStream = require('pino-multi-stream').multistream; const streamToElastic = pinoElastic({ index: 'an-index', - consistency: 'one', node: 'http://localhost:9200', - 'es-version': 7, - 'flush-bytes': 1000 + esVersion: 7, + flushBytes: 1000 }); const pinoOptions = {}; @@ -100,10 +97,9 @@ const Connection = const streamToElastic = pinoElastic({ index: 'an-index', - consistency: 'one', node: 'http://localhost:9200', - 'es-version': 7, - 'flush-bytes': 1000, + esVersion: 7, + flushBytes: 1000, Connection }) @@ -124,10 +120,9 @@ const pinoElastic = require('pino-elasticsearch'); const streamToElastic = pinoElastic({ index: 'an-index', - consistency: 'one', node: 'http://localhost:9200', - 'es-version': 7, - 'flush-bytes': 1000 + esVersion: 7, + flushBytes: 1000 }) streamToElastic.on('', (error) => console.log(event)); @@ -137,7 +132,7 @@ The following table lists the events emitted by the stream handler: | Event | Callback Signature | Description | | ----- | ------------------ | ----------- | -| `unknown` | `(line: string, error: string) => void` | Event received by `pino-elasticsearch` is unparseable (via `JSON.parse`) | +| `unknown` | `(line: string, error: string) => void` | Event received by `pino-elasticsearch` is unparsable (via `JSON.parse`) | | `insertError` | `(error: Error & { document: Record }) => void` | The bulk insert request to Elasticsearch failed (records dropped). | | `insert` | `(stats: Record) => void` | Called when an insert was successfully performed | | `error` | `(error: Error) => void` | Called when the Elasticsearch client fails for some other reason | @@ -154,10 +149,9 @@ const pinoElastic = require('pino-elasticsearch'); const streamToElastic = pinoElastic({ index: 'an-index', - consistency: 'one', node: 'http://localhost:9200', - 'es-version': 7, - 'flush-bytes': 1000 + esVersion: 7, + flushBytes: 1000 }) streamToElastic.on( @@ -198,10 +192,9 @@ const pinoElastic = require('pino-elasticsearch') const streamToElastic = pinoElastic({ index: 'an-index', - consistency: 'one', node: 'http://localhost:9200', - 'es-version': 7, - 'flush-bytes': 1000 + esVersion: 7, + flushBytes: 1000 }) const logger = pino({ level: 'info', ...ecsFormat }, streamToElastic) @@ -227,7 +220,6 @@ const streamToElastic = pinoElastic({ // the logTime is a ISO 8601 formatted string of the log line return `awesome-app-${logTime.substring(5, 10)}` }, - consistency: 'one', node: 'http://localhost:9200' }) // ... @@ -237,16 +229,15 @@ The function **must** be sync, doesn't throw and return a string. #### Datastreams -Indexing to datastreams requires the `op_type` to be set to `create`: +Indexing to datastreams requires the `opType` to be set to `create`: ```js const pino = require('pino') const pinoElastic = require('pino-elasticsearch') const streamToElastic = pinoElastic({ index: "type-dataset-namespace", - consistency: 'one', node: 'http://localhost:9200', - op_type: 'create' + opType: 'create' }) // ... ``` @@ -254,27 +245,26 @@ const streamToElastic = pinoElastic({ #### Error handling ```js const pino = require('pino') -const ecsFormat = require('@elastic/ecs-pino-format')() +const ecsFormat = require('@elastic/ecs-pino-format') const pinoElastic = require('pino-elasticsearch') const streamToElastic = pinoElastic({ index: 'an-index', - consistency: 'one', node: 'http://localhost:9200', - 'es-version': 7, - 'flush-bytes': 1000 + esVersion: 7, + flushBytes: 1000 }) // Capture errors like unable to connect Elasticsearch instance. streamToElastic.on('error', (error) => { console.error('Elasticsearch client error:', error); }) -// Capture errors returned from Elasticsearch, "it will be called for everytime a document can't be indexed". +// Capture errors returned from Elasticsearch, "it will be called every time a document can't be indexed". streamToElastic.on('insertError', (error) => { console.error('Elasticsearch server error:', error); }) -const logger = pino({ level: 'info', ...ecsFormat }, streamToElastic) +const logger = pino({ level: 'info', ...ecsFormat() }, streamToElastic) logger.info('hello world') ``` @@ -306,14 +296,13 @@ const pinoElastic = require('pino-elasticsearch') const streamToElastic = pinoElastic({ index: 'an-index', - consistency: 'one', node: 'http://localhost:9200', auth: { username: 'user', password: 'pwd' }, - 'es-version': 7, - 'flush-bytes': 1000 + esVersion: 7, + flushBytes: 1000 }) ``` @@ -323,7 +312,6 @@ const pinoElastic = require('pino-elasticsearch') const streamToElastic = pinoElastic({ index: 'an-index', - consistency: 'one', node: 'http://localhost:9200', cloud: { id: 'name:bG9jYWxob3N0JGFiY2QkZWZnaA==' @@ -331,8 +319,8 @@ const streamToElastic = pinoElastic({ auth: { apiKey: 'apikey123' }, - 'es-version': 7, - 'flush-bytes': 1000 + esVersion: 7, + flushBytes: 1000 }) ``` @@ -345,9 +333,8 @@ use pino-elasticsearch as a module is simple, use [pino-multi-stream](https://ww ```js const pinoms = require('pino-multi-stream') const pinoEs = require('pino-elasticsearch')({ - host: '192.168.1.220', - index: 'zb', - port: '9200' + node: 'http://192.168.1.220:9200', + index: 'zb' }) const logger = pinoms({ @@ -366,8 +353,6 @@ logger.error('error') ``` -*** Notice, the `host` and `port` parameters of `pino-elasticsearch` are required *** - ## Setup and Testing Setting up pino-elasticsearch is easy, and you can use the bundled diff --git a/cli.js b/cli.js index b93aedf..149a302 100755 --- a/cli.js +++ b/cli.js @@ -57,7 +57,6 @@ const flags = minimist(process.argv.slice(2), { help: 'h', node: 'n', index: 'i', - 'bulk-size': 'b', 'flush-bytes': 'f', 'flush-interval': 't', 'trace-level': 'l', @@ -72,7 +71,7 @@ const flags = minimist(process.argv.slice(2), { } }) -const allowedProps = ['node', 'index', 'bulk-size', 'flush-btyes', 'flush-interval', 'trace-level', 'username', 'password', 'api-key', 'cloud', 'es-version', 'rejectUnauthorized'] +const allowedProps = ['node', 'index', 'flush-bytes', 'flush-interval', 'trace-level', 'username', 'password', 'api-key', 'cloud', 'es-version', 'rejectUnauthorized'] if (flags['read-config']) { if (flags['read-config'].match(/.*\.json$/) !== null) { diff --git a/example.js b/example.js index 8a79ae2..f54e0f9 100644 --- a/example.js +++ b/example.js @@ -7,9 +7,8 @@ const elastic = require('./lib')({ auth: { apiKey: 'someKey' }, - consistency: 'one', - 'es-version': 7, - 'flush-bytes': 1000 + esVersion: 7, + flushBytes: 1000 }) const level = 'trace' diff --git a/lib.d.ts b/lib.d.ts new file mode 100644 index 0000000..053f48a --- /dev/null +++ b/lib.d.ts @@ -0,0 +1,56 @@ +import type { Transform } from 'stream' +import type { ClientOptions } from '@elastic/elasticsearch' + +export default pinoElasticsearch + +declare function pinoElasticsearch(options?: Options): DestinationStream + +export type DestinationStream = Transform & { + /** + * when something, that cannot be parsed, is encountered + */ + on(event: 'unknown', handler: (line: string, error: string) => void): void + /** + * when a bulk insert request failed which resulted in logs being dropped + */ + on(event: 'insertError', handler: (error: Error & { document: Record }) => void): void + /** + * when a batch of logs was sent successfully + */ + on(event: 'insert', handler: (stats: Record) => void): void + /** + * when some other kind of error happened, e.g. connection issues + */ + on(event: 'error', handler: (error: Error) => void): void +} + +export type Options = Pick & { + index?: Index + + type?: string + + /** @deprecated use `opType` instead */ + op_type?: OpType; + opType?: OpType; + + /** @deprecated use `flushBytes` instead */ + 'flush-bytes'?: number | undefined + flushBytes?: number | undefined + + /** @deprecated use `flushInterval` instead */ + 'flush-interval'?: number | undefined + flushInterval?: number | undefined + + /** @deprecated use `esVersion` instead */ + 'es-version'?: number | undefined + esVersion?: number | undefined + + /** @deprecated use `tls.rejectUnauthorized` instead */ + rejectUnauthorized?: boolean + + tls?: ClientOptions['ssl']; +} + +export type Index = string | `${string | ''}%{DATE}${string | ''}` | ((logTime: string) => string) + +export type OpType = 'create' | 'index' \ No newline at end of file diff --git a/lib.js b/lib.js index 8e15f7b..4f221ad 100644 --- a/lib.js +++ b/lib.js @@ -3,18 +3,14 @@ /* eslint no-prototype-builtins: 0 */ const split = require('split2') -const { - Client, - ClusterConnectionPool, - HttpConnection -} = require('@elastic/elasticsearch') +const { Client } = require('@elastic/elasticsearch') function initializeBulkHandler (opts, client, splitter) { - const esVersion = Number(opts['es-version']) || 7 + const esVersion = Number(opts.esVersion || opts['es-version']) || 7 const index = opts.index || 'pino' const buildIndexName = typeof index === 'function' ? index : null const type = esVersion >= 7 ? undefined : (opts.type || 'log') - const opType = esVersion >= 7 ? opts.op_type : undefined + const opType = esVersion >= 7 ? (opts.opType || opts.op_type) : undefined // Resurrect connection pool on destroy splitter.destroy = () => { @@ -25,8 +21,8 @@ function initializeBulkHandler (opts, client, splitter) { const bulkInsert = client.helpers.bulk({ datasource: splitter, - flushBytes: opts['flush-bytes'] || 1000, - flushInterval: opts['flush-interval'] || 30000, + flushBytes: opts.flushBytes || opts['flush-bytes'] || 1000, + flushInterval: opts.flushInterval || opts['flush-interval'] || 30000, refreshOnCompletion: getIndexName(), onDocument (doc) { const date = doc.time || doc['@timestamp'] @@ -62,9 +58,21 @@ function initializeBulkHandler (opts, client, splitter) { } } -function pinoElasticSearch (opts) { +function pinoElasticSearch (opts = {}) { + if (opts['flush-bytes']) { + process.emitWarning('The "flush-bytes" option has been deprecated, use "flushBytes" instead') + } + + if (opts['flush-interval']) { + process.emitWarning('The "flush-interval" option has been deprecated, use "flushInterval" instead') + } + + if (opts['es-version']) { + process.emitWarning('The "es-version" option has been deprecated, use "esVersion" instead') + } + if (opts['bulk-size']) { - process.emitWarning('The "bulk-size" option has been deprecated, "flush-bytes" instead') + process.emitWarning('The "bulk-size" option has been removed, use "flushBytes" instead') delete opts['bulk-size'] } @@ -115,13 +123,7 @@ function pinoElasticSearch (opts) { node: opts.node, auth: opts.auth, cloud: opts.cloud, - ssl: { rejectUnauthorized: opts.rejectUnauthorized }, - Connection: HttpConnection, - ConnectionPool: ClusterConnectionPool - } - - if (opts.tls) { - clientOpts.tls = opts.tls + ssl: { rejectUnauthorized: opts.rejectUnauthorized, ...opts.tls } } if (opts.caFingerprint) { @@ -138,7 +140,7 @@ function pinoElasticSearch (opts) { const client = new Client(clientOpts) - client.diagnostic.on('resurrect', () => { + client.on('resurrect', () => { initializeBulkHandler(opts, client, splitter) }) diff --git a/package.json b/package.json index e943724..0247a22 100644 --- a/package.json +++ b/package.json @@ -3,11 +3,12 @@ "version": "6.5.0", "description": "Load pino logs into ElasticSearch", "main": "./lib.js", + "types": "./lib.d.ts", "bin": { "pino-elasticsearch": "./cli.js" }, "scripts": { - "test": "standard && tap test/*.test.js -j 1 --no-coverage --timeout 120" + "test": "standard && tsd && tap test/*.test.js -j 1 --no-coverage --timeout 120" }, "pre-commit": "test", "keywords": [ @@ -23,15 +24,16 @@ "license": "MIT", "devDependencies": { "@elastic/ecs-pino-format": "^1.1.1", - "is-elasticsearch-running": "^0.1.0", + "@types/node": "^20.5.0", "pino": "^8.4.0", "pre-commit": "^1.2.2", "proxyquire": "^2.1.3", "standard": "^17.0.0", - "tap": "^16.0.0" + "tap": "^16.0.0", + "tsd": "^0.28.1" }, "dependencies": { - "@elastic/elasticsearch": "^8.2.1", + "@elastic/elasticsearch": "^7.17.12", "minimist": "^1.2.5", "pump": "^3.0.0", "split2": "^4.0.0" @@ -47,4 +49,4 @@ "directories": { "test": "test" } -} +} \ No newline at end of file diff --git a/test-d/lib.test-d.ts b/test-d/lib.test-d.ts new file mode 100644 index 0000000..1a8f5a4 --- /dev/null +++ b/test-d/lib.test-d.ts @@ -0,0 +1,27 @@ +import { expectType, expectDeprecated, expectNotDeprecated, expectDocCommentIncludes } from 'tsd' +import pinoElasticSearch, { DestinationStream, Options } from '../'; + +const options = {} as Options; + +expectType(pinoElasticSearch()) +expectType(pinoElasticSearch(options)); + +expectDeprecated(options['flush-bytes']) +expectNotDeprecated(options.flushBytes) +expectType(options.flushBytes) + +expectDeprecated(options['flush-interval']) +expectNotDeprecated(options.flushInterval) +expectType(options.flushInterval) + +expectDeprecated(options['op_type']) +expectNotDeprecated(options.opType) +expectType(options.opType) + +expectDeprecated(options['es-version']) +expectNotDeprecated(options.esVersion) +expectType(options.esVersion) + +expectDeprecated(options.rejectUnauthorized) +expectNotDeprecated(options.tls?.rejectUnauthorized) +expectType(options.tls?.rejectUnauthorized) diff --git a/test/acceptance.test.js b/test/acceptance.test.js index 625dbfc..94f2380 100644 --- a/test/acceptance.test.js +++ b/test/acceptance.test.js @@ -1,43 +1,64 @@ 'use strict' const { once } = require('events') -const pino = require('pino') -const IER = require('is-elasticsearch-running') +const { pino } = require('pino') const elastic = require('../') -const tap = require('tap') -const test = require('tap').test +const { teardown, beforeEach, test } = require('tap') const { Client } = require('@elastic/elasticsearch') -const client = new Client({ node: 'http://localhost:9200' }) -const EcsFormat = require('@elastic/ecs-pino-format') +const ecsFormat = require('@elastic/ecs-pino-format') const index = 'pinotest' const streamIndex = 'logs-pino-test' const type = 'log' -const consistency = 'one' const node = 'http://localhost:9200' const timeout = 5000 +const auth = { + apiKey: process.env.ELASTICSEARCH_API_KEY, + bearer: process.env.ELASTICSEARCH_BEARER, + username: process.env.ELASTICSEARCH_USERNAME, + password: process.env.ELASTICSEARCH_PASSWORD +} + +const client = new Client({ node, auth }) + +function esIsRunning () { + return client.ping() + .then(() => true) + .catch(() => false) +} + +async function esWaitCluster () { + const ATTEMPTS_LIMIT = 10 + + for (let i = 0; i <= ATTEMPTS_LIMIT; i += 1) { + try { + await client.cluster.health({ wait_for_status: 'green', timeout: '60s' }) + } catch (error) { + if (i === ATTEMPTS_LIMIT) { + throw error + } + } + } +} -tap.teardown(() => { +teardown(() => { client.close() }) let esVersion = 7 let esMinor = 15 -let es const supportsDatastreams = () => esVersion > 7 || (esVersion === 7 && esMinor >= 9) -tap.beforeEach(async () => { - if (es) { - es = IER() - if (!await es.isRunning()) { - await es.waitCluster() - } +beforeEach(async () => { + if (!await esIsRunning()) { + await esWaitCluster() } - const result = await client.info() - esVersion = Number(result.version.number.split('.')[0]) - esMinor = Number(result.version.number.split('.')[1]) + + const { body: { version } } = await client.info() + esVersion = Number(version.number.split('.')[0]) + esMinor = Number(version.number.split('.')[1]) await client.indices.delete({ index }, { ignore: [404] }) await client.indices.create({ index }) @@ -50,7 +71,7 @@ tap.beforeEach(async () => { test('store a log line', { timeout }, async (t) => { t.plan(2) - const instance = elastic({ index, type, consistency, node, 'es-version': esVersion }) + const instance = elastic({ index, type, node, esVersion, auth }) const log = pino(instance) log.info('hello world') @@ -69,10 +90,10 @@ test('store a log line', { timeout }, async (t) => { t.equal(documents[0].msg, 'hello world') }) -test('Ignores a boolean line even though it is JSON-parseable', { timeout }, (t) => { +test('Ignores a boolean line even though it is JSON-parsable', { timeout }, (t) => { t.plan(2) - const instance = elastic({ index, type, consistency, node }) + const instance = elastic({ index, type, node, auth }) instance.on('unknown', (obj, body) => { t.equal(obj, 'true', 'Object is parsed') @@ -85,7 +106,7 @@ test('Ignores a boolean line even though it is JSON-parseable', { timeout }, (t) test('Ignores "null" being parsed as json', { timeout }, (t) => { t.plan(2) - const instance = elastic({ index, type, consistency, node }) + const instance = elastic({ index, type, node, auth }) instance.on('unknown', (obj, body) => { t.equal(obj, 'null', 'Object is parsed') @@ -98,7 +119,7 @@ test('Ignores "null" being parsed as json', { timeout }, (t) => { test('Can process number being parsed as json', { timeout }, (t) => { t.plan(0) - const instance = elastic({ index, type, consistency, node }) + const instance = elastic({ index, type, node, auth }) instance.on('unknown', (obj, body) => { t.error(obj, body) @@ -110,7 +131,7 @@ test('Can process number being parsed as json', { timeout }, (t) => { test('store an deeply nested log line', { timeout }, async (t) => { t.plan(2) - const instance = elastic({ index, type, consistency, node, 'es-version': esVersion }) + const instance = elastic({ index, type, node, esVersion, auth }) const log = pino(instance) log.info({ @@ -138,7 +159,7 @@ test('store an deeply nested log line', { timeout }, async (t) => { test('store lines in bulk', { timeout }, async (t) => { t.plan(6) - const instance = elastic({ index, type, consistency, node, 'es-version': esVersion }) + const instance = elastic({ index, type, node, esVersion, auth }) const log = pino(instance) log.info('hello world') @@ -167,7 +188,7 @@ test('replaces date in index', { timeout }, async (t) => { t.plan(2) const index = 'pinotest-%{DATE}' - const instance = elastic({ index, type, consistency, node, 'es-version': esVersion }) + const instance = elastic({ index, type, node, esVersion, auth }) const log = pino(instance) await client.indices.delete( @@ -194,7 +215,7 @@ test('replaces date in index during bulk insert', { timeout }, async (t) => { t.plan(6) const index = 'pinotest-%{DATE}' - const instance = elastic({ index, type, consistency, node, 'es-version': esVersion }) + const instance = elastic({ index, type, node, esVersion, auth }) const log = pino(instance) await client.indices.delete( @@ -227,9 +248,8 @@ test('replaces date in index during bulk insert', { timeout }, async (t) => { test('Use ecs format', { timeout }, async (t) => { t.plan(2) - const instance = elastic({ index, type, consistency, node, 'es-version': esVersion }) - const ecsFormat = EcsFormat() - const log = pino({ ...ecsFormat }, instance) + const instance = elastic({ index, type, node, esVersion, auth }) + const log = pino({ ...ecsFormat() }, instance) log.info('hello world') @@ -257,7 +277,7 @@ test('dynamic index name', { timeout }, async (t) => { return indexNameGenerated } - const instance = elastic({ index, type, consistency, node, 'es-version': esVersion }) + const instance = elastic({ index, type, node, esVersion, auth }) const log = pino(instance) log.info('hello world') @@ -286,7 +306,7 @@ test('dynamic index name during bulk insert', { timeout }, async (t) => { return indexNameGenerated } - const instance = elastic({ index, type, consistency, node, 'es-version': esVersion }) + const instance = elastic({ index, type, node, esVersion, auth }) const log = pino(instance) log.info('hello world') @@ -316,7 +336,7 @@ test('handle datastreams during bulk insert', { timeout }, async (t) => { // Arrange t.plan(6) - const instance = elastic({ index: streamIndex, type, consistency, node, 'es-version': esVersion, op_type: 'create' }) + const instance = elastic({ index: streamIndex, type, node, esVersion, opType: 'create', auth }) const log = pino(instance) // Act diff --git a/test/unit.test.js b/test/unit.test.js index 5f074be..1c6a96b 100644 --- a/test/unit.test.js +++ b/test/unit.test.js @@ -10,13 +10,12 @@ const matchISOString = /\d{4}-[01]\d-[0-3]\dT[0-2]\d:[0-5]\d:[0-5]\d\.\d+([+-][0 const options = { index: 'pinotest', type: 'log', - consistency: 'one', node: 'http://localhost:9200' } const dsOptions = { ...options, - op_type: 'create' + opType: 'create' } test('make sure date format is valid', (t) => { @@ -30,9 +29,7 @@ test('make sure log is a valid json', (t) => { const Client = function (config) { t.equal(config.node, options.node) } - Client.prototype.diagnostic = { - on: () => {} - } + Client.prototype.on = () => {} Client.prototype.helpers = { async bulk (opts) { for await (const chunk of opts.datasource) { @@ -55,9 +52,7 @@ test('make sure log is a valid json', (t) => { test('date can be a number', (t) => { t.plan(1) const Client = function (config) {} - Client.prototype.diagnostic = { - on: () => {} - } + Client.prototype.on = () => {} const threeDaysInMillis = 3 * 24 * 60 * 60 * 1000 const time = new Date(Date.now() - threeDaysInMillis) @@ -82,13 +77,39 @@ test('date can be a number', (t) => { test('Uses the type parameter only with ES < 7 / 1', (t) => { t.plan(2) + const Client = function (config) { t.equal(config.node, options.node) } - Client.prototype.diagnostic = { - on: () => {} + Client.prototype.on = () => {} + + Client.prototype.helpers = { + async bulk (opts) { + for await (const chunk of opts.datasource) { + const action = opts.onDocument(chunk) + t.equal(action.index._type, 'log') + } + } } + const elastic = proxyquire('../', { + '@elastic/elasticsearch': { Client } + }) + const instance = elastic(Object.assign(options, { esVersion: 6 })) + const log = pino(instance) + const prettyLog = `some logs goes here. + another log...` + log.info(['info'], prettyLog) +}) + +test('Uses the type parameter only with ES < 7 / 1, even with the deprecated `es-version` option', (t) => { + t.plan(2) + + const Client = function (config) { + t.equal(config.node, options.node) + } + Client.prototype.on = () => {} + Client.prototype.helpers = { async bulk (opts) { for await (const chunk of opts.datasource) { @@ -110,12 +131,36 @@ test('Uses the type parameter only with ES < 7 / 1', (t) => { test('Uses the type parameter only with ES < 7 / 2', (t) => { t.plan(2) + const Client = function (config) { t.equal(config.node, options.node) } - Client.prototype.diagnostic = { - on: () => {} + Client.prototype.on = () => {} + Client.prototype.helpers = { + async bulk (opts) { + for await (const chunk of opts.datasource) { + const action = opts.onDocument(chunk) + t.equal(action.index._type, undefined) + } + } + } + + const elastic = proxyquire('../', { + '@elastic/elasticsearch': { Client } + }) + const instance = elastic(Object.assign(options, { esVersion: 7 })) + const log = pino(instance) + const prettyLog = `some logs goes here. + another log...` + log.info(['info'], prettyLog) +}) + +test('Uses the type parameter only with ES < 7 / 2, even with the deprecate `es-version` option', (t) => { + t.plan(2) + const Client = function (config) { + t.equal(config.node, options.node) } + Client.prototype.on = () => {} Client.prototype.helpers = { async bulk (opts) { for await (const chunk of opts.datasource) { @@ -140,9 +185,7 @@ test('ecs format', (t) => { const Client = function (config) { t.equal(config.node, options.node) } - Client.prototype.diagnostic = { - on: () => {} - } + Client.prototype.on = () => {} Client.prototype.helpers = { async bulk (opts) { for await (const chunk of opts.datasource) { @@ -183,9 +226,7 @@ test('auth and cloud parameters are properly passed to client', (t) => { t.equal(config.auth, opts.auth) t.equal(config.cloud, opts.cloud) } - Client.prototype.diagnostic = { - on: () => {} - } + Client.prototype.on = () => {} Client.prototype.helpers = { async bulk (opts) {} } @@ -195,7 +236,7 @@ test('auth and cloud parameters are properly passed to client', (t) => { elastic(opts) }) -test('apikey is passed through auth param properly to client', (t) => { +test('apiKey is passed through auth param properly to client', (t) => { const opts = { ...options, auth: { @@ -208,9 +249,7 @@ test('apikey is passed through auth param properly to client', (t) => { t.equal(config.node, opts.node) t.equal(config.auth, opts.auth) } - Client.prototype.diagnostic = { - on: () => {} - } + Client.prototype.on = () => {} Client.prototype.helpers = { async bulk (opts) {} } @@ -220,43 +259,104 @@ test('apikey is passed through auth param properly to client', (t) => { elastic(opts) }) -test('make sure `flush-interval` is passed to bulk request', (t) => { - t.plan(2) - const Client = function (config) { - t.equal(config.node, options.node) - } - Client.prototype.diagnostic = { - on: () => {} - } +test('make sure `flushInterval` is passed to bulk request', (t) => { + t.plan(1) + + const Client = function (config) {} + Client.prototype.on = () => {} + Client.prototype.helpers = { async bulk (opts) { t.equal(opts.flushInterval, 12345) - t.end() } } const elastic = proxyquire('../', { '@elastic/elasticsearch': { Client } }) - options['flush-interval'] = 12345 + options.flushInterval = 12345 const instance = elastic(options) const log = pino(instance) log.info(['info'], 'abc') }) -test('make sure `op_type` is passed to bulk onDocument request', (t) => { - t.plan(2) +test('make sure deprecated `flush-interval` is passed to bulk request', (t) => { + t.plan(1) + + const flushInterval = 12345 + + const Client = function (config) {} + Client.prototype.on = () => {} + + Client.prototype.helpers = { + async bulk (opts) { + t.equal(opts.flushInterval, flushInterval) + } + } + const elastic = proxyquire('../', { + '@elastic/elasticsearch': { Client } + }) + + const instance = elastic({ ...options, 'flush-interval': flushInterval }) + const log = pino(instance) + log.info(['info'], 'abc') +}) + +test('make sure `flushBytes` is passed to bulk request', (t) => { + t.plan(1) + + const flushBytes = true + + const Client = function (config) {} + Client.prototype.on = () => {} + + Client.prototype.helpers = { + async bulk (opts) { + t.equal(opts.flushBytes, flushBytes) + } + } + const elastic = proxyquire('../', { + '@elastic/elasticsearch': { Client } + }) + + const instance = elastic({ ...options, flushBytes }) + const log = pino(instance) + log.info(['info'], 'abc') +}) + +test('make sure deprecated `flush-bytes` is passed to bulk request', (t) => { + t.plan(1) + + const flushBytes = true const Client = function (config) {} - Client.prototype.diagnostic = { - on: () => {} + Client.prototype.on = () => {} + + Client.prototype.helpers = { + async bulk (opts) { + t.equal(opts.flushBytes, flushBytes) + } } + const elastic = proxyquire('../', { + '@elastic/elasticsearch': { Client } + }) + + const instance = elastic({ ...options, 'flush-bytes': flushBytes }) + const log = pino(instance) + log.info(['info'], 'abc') +}) + +test('make sure `opType` is passed to bulk onDocument request', (t) => { + t.plan(2) + + const Client = function (config) {} + Client.prototype.on = () => {} Client.prototype.helpers = { async bulk (opts) { const result = opts.onDocument({}) t.equal(result.index._index, dsOptions.index, `_index should be correctly set to \`${dsOptions.index}\``) - t.equal(result.index.op_type, dsOptions.op_type, `\`op_type\` should be set to \`${dsOptions.op_type}\``) + t.equal(result.index.op_type, dsOptions.opType, `\`op_type\` should be set to \`${dsOptions.opType}\``) t.end() } } @@ -269,16 +369,39 @@ test('make sure `op_type` is passed to bulk onDocument request', (t) => { log.info(['info'], 'abc') }) -test('make sure `@timestamp` is correctly set when `op_type` is `create`', (t) => { +test('make sure deprecated `op_type` is passed to bulk onDocument request', (t) => { + t.plan(2) + + const Client = function (config) {} + Client.prototype.on = () => {} + + Client.prototype.helpers = { + async bulk (opts) { + const result = opts.onDocument({}) + t.equal(result.index._index, dsOptions.index, `_index should be correctly set to \`${dsOptions.index}\``) + t.equal(result.index.op_type, dsOptions.opType, `\`op_type\` should be set to \`${dsOptions.opType}\``) + t.end() + } + } + const elastic = proxyquire('../', { + '@elastic/elasticsearch': { Client } + }) + + const { opType, ...rest } = dsOptions + + const instance = elastic({ ...rest, op_type: opType }) + const log = pino(instance) + log.info(['info'], 'abc') +}) + +test('make sure `@timestamp` is correctly set when `opType` is `create`', (t) => { t.plan(1) const document = { time: '2021-09-01T01:01:01.038Z' } const Client = function (config) {} - Client.prototype.diagnostic = { - on: () => {} - } + Client.prototype.on = () => {} Client.prototype.helpers = { async bulk (opts) { @@ -299,9 +422,7 @@ test('make sure `@timestamp` is correctly set when `op_type` is `create`', (t) = test('resurrect client connection pool when datasource split is destroyed', (t) => { let isResurrected = false const Client = function (config) {} - Client.prototype.diagnostic = { - on: () => {} - } + Client.prototype.on = () => {} Client.prototype.helpers = { bulk: async function (opts) { @@ -328,3 +449,67 @@ test('resurrect client connection pool when datasource split is destroyed', (t) const prettyLog = 'Example of a log' log.info(['info'], prettyLog) }) + +test('make sure deprecated `rejectUnauthorized` is passed to client constructor', (t) => { + t.plan(1) + + const rejectUnauthorized = true + + const Client = function (config) { + t.equal(config.ssl.rejectUnauthorized, rejectUnauthorized) + } + + Client.prototype.on = () => {} + Client.prototype.helpers = { async bulk () {} } + + const elastic = proxyquire('../', { + '@elastic/elasticsearch': { Client } + }) + + const instance = elastic({ ...options, rejectUnauthorized }) + const log = pino(instance) + log.info(['info'], 'abc') +}) + +test('make sure `tls.rejectUnauthorized` is passed to client constructor', (t) => { + t.plan(1) + + const tls = { rejectUnauthorized: true } + + const Client = function (config) { + t.equal(config.ssl.rejectUnauthorized, tls.rejectUnauthorized) + } + + Client.prototype.on = () => {} + Client.prototype.helpers = { async bulk () {} } + + const elastic = proxyquire('../', { + '@elastic/elasticsearch': { Client } + }) + + const instance = elastic({ ...options, tls }) + const log = pino(instance) + log.info(['info'], 'abc') +}) + +test('make sure `tls.rejectUnauthorized` overrides deprecated `rejectUnauthorized`', (t) => { + t.plan(1) + + const rejectUnauthorized = true + const tls = { rejectUnauthorized: false } + + const Client = function (config) { + t.equal(config.ssl.rejectUnauthorized, tls.rejectUnauthorized) + } + + Client.prototype.on = () => {} + Client.prototype.helpers = { async bulk () {} } + + const elastic = proxyquire('../', { + '@elastic/elasticsearch': { Client } + }) + + const instance = elastic({ ...options, rejectUnauthorized, tls }) + const log = pino(instance) + log.info(['info'], 'abc') +}) diff --git a/usage.txt b/usage.txt index 57053d9..81e22de 100644 --- a/usage.txt +++ b/usage.txt @@ -15,7 +15,6 @@ -t | --type the name of the type to use; default: log -f | --flush-bytes the number of bytes for each bulk insert; default: 1000 -t | --flush-interval time that the helper will wait before flushing; default: 30000 - -b | --bulk-size the number of documents for each bulk insert [DEPERCATED] -l | --trace-level trace level for the elasticsearch client, default 'error' (info, debug, trace). --es-version specify the major version number of Elasticsearch (eg: 5, 6, 7) (this is needed only if you are using Elasticsearch <= 7)