Skip to content

Commit

Permalink
fix support for elasticsearch v7 + add types (#169)
Browse files Browse the repository at this point in the history
* fix support for elasticsearch v7 + add types

* add tests for types + minor improvements

* add back tls option

---------

Co-authored-by: Roman UNTILOV <[email protected]>
  • Loading branch information
robyte-ctrl and Roman UNTILOV authored Aug 18, 2023
1 parent 2610f4d commit e4beebb
Show file tree
Hide file tree
Showing 10 changed files with 422 additions and 148 deletions.
67 changes: 26 additions & 41 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ npm install pino-elasticsearch -g
-t | --type the name of the type to use; default: log
-f | --flush-bytes the number of bytes for each bulk insert; default: 1000
-t | --flush-interval time that the helper will wait before flushing; default: 30000
-b | --bulk-size the number of documents for each bulk insert [DEPRECATED]
-l | --trace-level trace level for the elasticsearch client, default 'error' (info, debug, trace).
| --es-version specify the major version number of Elasticsearch (eg: 5, 6, 7)
(this is needed only if you are using Elasticsearch <= 7)
Expand All @@ -50,10 +49,9 @@ const pinoElastic = require('pino-elasticsearch')

const streamToElastic = pinoElastic({
index: 'an-index',
consistency: 'one',
node: 'http://localhost:9200',
'es-version': 7,
'flush-bytes': 1000
esVersion: 7,
flushBytes: 1000
})

const logger = pino({ level: 'info' }, streamToElastic)
Expand All @@ -72,10 +70,9 @@ const pinoMultiStream = require('pino-multi-stream').multistream;

const streamToElastic = pinoElastic({
index: 'an-index',
consistency: 'one',
node: 'http://localhost:9200',
'es-version': 7,
'flush-bytes': 1000
esVersion: 7,
flushBytes: 1000
});

const pinoOptions = {};
Expand All @@ -100,10 +97,9 @@ const Connection = <custom Connection>

const streamToElastic = pinoElastic({
index: 'an-index',
consistency: 'one',
node: 'http://localhost:9200',
'es-version': 7,
'flush-bytes': 1000,
esVersion: 7,
flushBytes: 1000,
Connection
})

Expand All @@ -124,10 +120,9 @@ const pinoElastic = require('pino-elasticsearch');

const streamToElastic = pinoElastic({
index: 'an-index',
consistency: 'one',
node: 'http://localhost:9200',
'es-version': 7,
'flush-bytes': 1000
esVersion: 7,
flushBytes: 1000
})

streamToElastic.on('<event>', (error) => console.log(event));
Expand All @@ -137,7 +132,7 @@ The following table lists the events emitted by the stream handler:

| Event | Callback Signature | Description |
| ----- | ------------------ | ----------- |
| `unknown` | `(line: string, error: string) => void` | Event received by `pino-elasticsearch` is unparseable (via `JSON.parse`) |
| `unknown` | `(line: string, error: string) => void` | Event received by `pino-elasticsearch` is unparsable (via `JSON.parse`) |
| `insertError` | `(error: Error & { document: Record<string, any> }) => void` | The bulk insert request to Elasticsearch failed (records dropped). |
| `insert` | `(stats: Record<string, any>) => void` | Called when an insert was successfully performed |
| `error` | `(error: Error) => void` | Called when the Elasticsearch client fails for some other reason |
Expand All @@ -154,10 +149,9 @@ const pinoElastic = require('pino-elasticsearch');

const streamToElastic = pinoElastic({
index: 'an-index',
consistency: 'one',
node: 'http://localhost:9200',
'es-version': 7,
'flush-bytes': 1000
esVersion: 7,
flushBytes: 1000
})

streamToElastic.on(
Expand Down Expand Up @@ -198,10 +192,9 @@ const pinoElastic = require('pino-elasticsearch')

const streamToElastic = pinoElastic({
index: 'an-index',
consistency: 'one',
node: 'http://localhost:9200',
'es-version': 7,
'flush-bytes': 1000
esVersion: 7,
flushBytes: 1000
})

const logger = pino({ level: 'info', ...ecsFormat }, streamToElastic)
Expand All @@ -227,7 +220,6 @@ const streamToElastic = pinoElastic({
// the logTime is a ISO 8601 formatted string of the log line
return `awesome-app-${logTime.substring(5, 10)}`
},
consistency: 'one',
node: 'http://localhost:9200'
})
// ...
Expand All @@ -237,44 +229,42 @@ The function **must** be sync, doesn't throw and return a string.

#### Datastreams

Indexing to datastreams requires the `op_type` to be set to `create`:
Indexing to datastreams requires the `opType` to be set to `create`:
```js
const pino = require('pino')
const pinoElastic = require('pino-elasticsearch')

const streamToElastic = pinoElastic({
index: "type-dataset-namespace",
consistency: 'one',
node: 'http://localhost:9200',
op_type: 'create'
opType: 'create'
})
// ...
```

#### Error handling
```js
const pino = require('pino')
const ecsFormat = require('@elastic/ecs-pino-format')()
const ecsFormat = require('@elastic/ecs-pino-format')
const pinoElastic = require('pino-elasticsearch')

const streamToElastic = pinoElastic({
index: 'an-index',
consistency: 'one',
node: 'http://localhost:9200',
'es-version': 7,
'flush-bytes': 1000
esVersion: 7,
flushBytes: 1000
})

// Capture errors like unable to connect Elasticsearch instance.
streamToElastic.on('error', (error) => {
console.error('Elasticsearch client error:', error);
})
// Capture errors returned from Elasticsearch, "it will be called for everytime a document can't be indexed".
// Capture errors returned from Elasticsearch, "it will be called every time a document can't be indexed".
streamToElastic.on('insertError', (error) => {
console.error('Elasticsearch server error:', error);
})

const logger = pino({ level: 'info', ...ecsFormat }, streamToElastic)
const logger = pino({ level: 'info', ...ecsFormat() }, streamToElastic)

logger.info('hello world')
```
Expand Down Expand Up @@ -306,14 +296,13 @@ const pinoElastic = require('pino-elasticsearch')

const streamToElastic = pinoElastic({
index: 'an-index',
consistency: 'one',
node: 'http://localhost:9200',
auth: {
username: 'user',
password: 'pwd'
},
'es-version': 7,
'flush-bytes': 1000
esVersion: 7,
flushBytes: 1000
})
```

Expand All @@ -323,16 +312,15 @@ const pinoElastic = require('pino-elasticsearch')

const streamToElastic = pinoElastic({
index: 'an-index',
consistency: 'one',
node: 'http://localhost:9200',
cloud: {
id: 'name:bG9jYWxob3N0JGFiY2QkZWZnaA=='
},
auth: {
apiKey: 'apikey123'
},
'es-version': 7,
'flush-bytes': 1000
esVersion: 7,
flushBytes: 1000
})
```

Expand All @@ -345,9 +333,8 @@ use pino-elasticsearch as a module is simple, use [pino-multi-stream](https://ww
```js
const pinoms = require('pino-multi-stream')
const pinoEs = require('pino-elasticsearch')({
host: '192.168.1.220',
index: 'zb',
port: '9200'
node: 'http://192.168.1.220:9200',
index: 'zb'
})

const logger = pinoms({
Expand All @@ -366,8 +353,6 @@ logger.error('error')

```

*** Notice, the `host` and `port` parameters of `pino-elasticsearch` are required ***

## Setup and Testing

Setting up pino-elasticsearch is easy, and you can use the bundled
Expand Down
3 changes: 1 addition & 2 deletions cli.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ const flags = minimist(process.argv.slice(2), {
help: 'h',
node: 'n',
index: 'i',
'bulk-size': 'b',
'flush-bytes': 'f',
'flush-interval': 't',
'trace-level': 'l',
Expand All @@ -72,7 +71,7 @@ const flags = minimist(process.argv.slice(2), {
}
})

const allowedProps = ['node', 'index', 'bulk-size', 'flush-btyes', 'flush-interval', 'trace-level', 'username', 'password', 'api-key', 'cloud', 'es-version', 'rejectUnauthorized']
const allowedProps = ['node', 'index', 'flush-bytes', 'flush-interval', 'trace-level', 'username', 'password', 'api-key', 'cloud', 'es-version', 'rejectUnauthorized']

if (flags['read-config']) {
if (flags['read-config'].match(/.*\.json$/) !== null) {
Expand Down
5 changes: 2 additions & 3 deletions example.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ const elastic = require('./lib')({
auth: {
apiKey: 'someKey'
},
consistency: 'one',
'es-version': 7,
'flush-bytes': 1000
esVersion: 7,
flushBytes: 1000
})

const level = 'trace'
Expand Down
56 changes: 56 additions & 0 deletions lib.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import type { Transform } from 'stream'
import type { ClientOptions } from '@elastic/elasticsearch'

export default pinoElasticsearch

declare function pinoElasticsearch(options?: Options): DestinationStream

export type DestinationStream = Transform & {
/**
* when something, that cannot be parsed, is encountered
*/
on(event: 'unknown', handler: (line: string, error: string) => void): void
/**
* when a bulk insert request failed which resulted in logs being dropped
*/
on(event: 'insertError', handler: (error: Error & { document: Record<string, any> }) => void): void
/**
* when a batch of logs was sent successfully
*/
on(event: 'insert', handler: (stats: Record<string, any>) => void): void
/**
* when some other kind of error happened, e.g. connection issues
*/
on(event: 'error', handler: (error: Error) => void): void
}

export type Options = Pick<ClientOptions, 'node' | 'auth' | 'cloud' | 'caFingerprint' | 'Connection' | 'ConnectionPool'> & {
index?: Index

type?: string

/** @deprecated use `opType` instead */
op_type?: OpType;
opType?: OpType;

/** @deprecated use `flushBytes` instead */
'flush-bytes'?: number | undefined
flushBytes?: number | undefined

/** @deprecated use `flushInterval` instead */
'flush-interval'?: number | undefined
flushInterval?: number | undefined

/** @deprecated use `esVersion` instead */
'es-version'?: number | undefined
esVersion?: number | undefined

/** @deprecated use `tls.rejectUnauthorized` instead */
rejectUnauthorized?: boolean

tls?: ClientOptions['ssl'];
}

export type Index = string | `${string | ''}%{DATE}${string | ''}` | ((logTime: string) => string)

export type OpType = 'create' | 'index'
40 changes: 21 additions & 19 deletions lib.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,14 @@
/* eslint no-prototype-builtins: 0 */

const split = require('split2')
const {
Client,
ClusterConnectionPool,
HttpConnection
} = require('@elastic/elasticsearch')
const { Client } = require('@elastic/elasticsearch')

function initializeBulkHandler (opts, client, splitter) {
const esVersion = Number(opts['es-version']) || 7
const esVersion = Number(opts.esVersion || opts['es-version']) || 7
const index = opts.index || 'pino'
const buildIndexName = typeof index === 'function' ? index : null
const type = esVersion >= 7 ? undefined : (opts.type || 'log')
const opType = esVersion >= 7 ? opts.op_type : undefined
const opType = esVersion >= 7 ? (opts.opType || opts.op_type) : undefined

// Resurrect connection pool on destroy
splitter.destroy = () => {
Expand All @@ -25,8 +21,8 @@ function initializeBulkHandler (opts, client, splitter) {

const bulkInsert = client.helpers.bulk({
datasource: splitter,
flushBytes: opts['flush-bytes'] || 1000,
flushInterval: opts['flush-interval'] || 30000,
flushBytes: opts.flushBytes || opts['flush-bytes'] || 1000,
flushInterval: opts.flushInterval || opts['flush-interval'] || 30000,
refreshOnCompletion: getIndexName(),
onDocument (doc) {
const date = doc.time || doc['@timestamp']
Expand Down Expand Up @@ -62,9 +58,21 @@ function initializeBulkHandler (opts, client, splitter) {
}
}

function pinoElasticSearch (opts) {
function pinoElasticSearch (opts = {}) {
if (opts['flush-bytes']) {
process.emitWarning('The "flush-bytes" option has been deprecated, use "flushBytes" instead')
}

if (opts['flush-interval']) {
process.emitWarning('The "flush-interval" option has been deprecated, use "flushInterval" instead')
}

if (opts['es-version']) {
process.emitWarning('The "es-version" option has been deprecated, use "esVersion" instead')
}

if (opts['bulk-size']) {
process.emitWarning('The "bulk-size" option has been deprecated, "flush-bytes" instead')
process.emitWarning('The "bulk-size" option has been removed, use "flushBytes" instead')
delete opts['bulk-size']
}

Expand Down Expand Up @@ -115,13 +123,7 @@ function pinoElasticSearch (opts) {
node: opts.node,
auth: opts.auth,
cloud: opts.cloud,
ssl: { rejectUnauthorized: opts.rejectUnauthorized },
Connection: HttpConnection,
ConnectionPool: ClusterConnectionPool
}

if (opts.tls) {
clientOpts.tls = opts.tls
ssl: { rejectUnauthorized: opts.rejectUnauthorized, ...opts.tls }
}

if (opts.caFingerprint) {
Expand All @@ -138,7 +140,7 @@ function pinoElasticSearch (opts) {

const client = new Client(clientOpts)

client.diagnostic.on('resurrect', () => {
client.on('resurrect', () => {
initializeBulkHandler(opts, client, splitter)
})

Expand Down
Loading

0 comments on commit e4beebb

Please sign in to comment.