Skip to content

Commit

Permalink
Datastream support using op_type create config (#101)
Browse files Browse the repository at this point in the history
* Allow indexing to datastreams

* Create unit tests for datastream handling

* Remove node nullable check in datastream unit test

* Acceptance test for datastreams (if supported)
  • Loading branch information
OhSoooLucky authored Sep 29, 2021
1 parent 75a9e8b commit ba8e46a
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 2 deletions.
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,22 @@ const streamToElastic = pinoElastic({

The function **must** be sync, doesn't throw and return a string.

#### Datastreams

Indexing to datastreams requires the `op_type` to be set to `create`:
```js
const pino = require('pino')
const pinoElastic = require('pino-elasticsearch')

const streamToElastic = pinoElastic({
index: "type-dataset-namespace",
consistency: 'one',
node: 'http://localhost:9200',
op_type: 'create'
})
// ...
```

#### Error handling
```js
const pino = require('pino')
Expand Down
9 changes: 9 additions & 0 deletions docker-compose-v7-es-only-datastreams.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
version: '3.6'
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.9.0
environment:
- discovery.type=single-node
container_name: elasticsearch
ports: ['9200:9200']
11 changes: 9 additions & 2 deletions lib.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,23 @@ function pinoElasticSearch (opts) {
const index = opts.index || 'pino'
const buildIndexName = typeof index === 'function' ? index : null
const type = esVersion >= 7 ? undefined : (opts.type || 'log')
const opType = esVersion >= 7 ? opts.op_type : undefined
const b = client.helpers.bulk({
datasource: splitter,
flushBytes: opts['flush-bytes'] || 1000,
flushInterval: opts['flush-interval'] || 30000,
refreshOnCompletion: getIndexName(),
onDocument (doc) {
const date = doc.time || doc['@timestamp']
if (opType === 'create') {
doc['@timestamp'] = date
}

return {
index: {
_index: getIndexName(doc.time || doc['@timestamp']),
_type: type
_index: getIndexName(date),
_type: type,
op_type: opType
}
}
},
Expand Down
54 changes: 54 additions & 0 deletions test/acceptance.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ const test = require('tap').test
const { Client } = require('@elastic/elasticsearch')
const client = new Client({ node: 'http://localhost:9200' })
const EcsFormat = require('@elastic/ecs-pino-format')

const index = 'pinotest'
const streamIndex = 'logs-pino-test'
const type = 'log'
const consistency = 'one'
const node = 'http://localhost:9200'
Expand All @@ -20,8 +22,12 @@ tap.teardown(() => {
})

let esVersion = 7
let esMinor = 15
let es

const supportsDatastreams =
() => esVersion > 7 || (esVersion === 7 && esMinor >= 9)

tap.beforeEach(async () => {
if (es) {
es = IER()
Expand All @@ -31,8 +37,14 @@ tap.beforeEach(async () => {
}
const result = await client.info()
esVersion = Number(result.body.version.number.split('.')[0])
esMinor = Number(result.body.version.number.split('.')[1])
await client.indices.delete({ index }, { ignore: [404] })
await client.indices.create({ index })

if (supportsDatastreams()) {
await client.indices.deleteDataStream({ name: streamIndex }, { ignore: [404] })
await client.indices.createDataStream({ name: streamIndex })
}
})

test('store a log line', { timeout }, async (t) => {
Expand Down Expand Up @@ -298,3 +310,45 @@ test('dynamic index name during bulk insert', { timeout }, async (t) => {
t.equal(doc.msg, 'hello world')
}
})

test('handle datastreams during bulk insert', { timeout }, async (t) => {
if (supportsDatastreams()) {
// Arrange
t.plan(6)

const instance = elastic({ index: streamIndex, type, consistency, node, 'es-version': esVersion, op_type: 'create' })
const log = pino(instance)

// Act
const logEntries = [
{ time: '2021-09-01T01:01:01.732Z' },
{ time: '2021-09-01T01:01:02.400Z' },
{ time: '2021-09-01T01:01:02.948Z' },
{ time: '2021-09-02T01:01:03.731Z' },
{ time: '2021-09-02T03:00:45.704Z' }
]

logEntries.forEach(x => log.info(x, 'Hello world!'))

setImmediate(() => instance.end())

// Assert
const [stats] = await once(instance, 'insert')
t.equal(stats.successful, 5)

const documents = await client.helpers.search({
index: streamIndex,
type: esVersion >= 7 ? undefined : type,
body: {
query: { match_all: {} }
}
})

for (let i = 0; i < documents.length; i++) {
t.equal(documents[i]['@timestamp'], logEntries[i].time)
}
} else {
t.comment('The current elasticsearch version does not support datastreams yet!')
}
t.end()
})
54 changes: 54 additions & 0 deletions test/unit.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ const options = {
node: 'http://localhost:9200'
}

const dsOptions = {
index: 'logs-pino-test',
type: 'log',
consistency: 'one',
node: 'http://localhost:9200',
op_type: 'create'
}

test('make sure date format is valid', (t) => {
t.type(fix.datetime.object, 'string')
t.equal(fix.datetime.object, fix.datetime.string)
Expand Down Expand Up @@ -220,3 +228,49 @@ test('make sure `flush-interval` is passed to bulk request', (t) => {
const log = pino(instance)
log.info(['info'], 'abc')
})

test('make sure `op_type` is passed to bulk onDocument request', (t) => {
t.plan(2)

const Client = function (config) {}

Client.prototype.helpers = {
async bulk (opts) {
const result = opts.onDocument({})
t.equal(result.index._index, 'logs-pino-test', '_index should be correctly set to `logs-pino-test`')
t.equal(result.index.op_type, 'create', '`op_type` should be set to `create`')
t.end()
}
}
const elastic = proxyquire('../', {
'@elastic/elasticsearch': { Client }
})

const instance = elastic(dsOptions)
const log = pino(instance)
log.info(['info'], 'abc')
})

test('make sure `@timestamp` is correctly set when `op_type` is `create`', (t) => {
t.plan(1)

const document = {
time: '2021-09-01T01:01:01.038Z'
}
const Client = function (config) {}

Client.prototype.helpers = {
async bulk (opts) {
opts.onDocument(document)
t.equal(document['@timestamp'], '2021-09-01T01:01:01.038Z', 'Document @timestamp does not equal the provided timestamp')
t.end()
}
}
const elastic = proxyquire('../', {
'@elastic/elasticsearch': { Client }
})

const instance = elastic(dsOptions)
const log = pino(instance)
log.info(['info'], 'abc')
})

0 comments on commit ba8e46a

Please sign in to comment.