forked from dorianj/postgres-csvlog
-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
88 lines (80 loc) · 2.21 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
"use strict";
const _ = require('lodash');
const csv = require('csv');
const multipipe = require('multipipe');
const stream = require('stream');
const _durationPattern = /^duration: (\d+\.\d+)\s*ms\s+plan:\s*([\s\S]*)$/;
const _textPattern = /^Query Text:\s*([\s\S]*)$/;
const _dateFields = ['session_start_time', 'log_time'];
const _numericFields = ['process_id', 'session_line_num'];
class PostgresCSVLog extends stream.Transform {
constructor(options) {
super({
allowHalfOpen: false,
readableObjectMode: true,
writableObjectMode: true
});
}
_transform(record, encoding, callback) {
for (const field of _dateFields) {
record[field] = new Date(record[field]);
}
for (const field of _numericFields) {
record[field] = +record[field];
}
// Attempt to parse log_min_duration_statement lines.
if (record.sql_state_code === '00000') {
const match = record.message.match(_durationPattern);
if (_.size(match)) {
record.duration = +match[1];
const textMatch = match[2].match(_textPattern);
if (_.size(textMatch)) {
record.query = textMatch[1];
}
else {
try {
record.plan = JSON.parse(match[2]);
record.query = record.plan['Query Text'];
}
catch (e) {
// If the query/plan is not in text format or JSON format, we just
// ignore it here since there isn't much else we can do.
}
}
}
}
this.push(record);
setImmediate(callback);
}
}
module.exports = (options) => {
const csvParser = csv.parse({
columns: [
'log_time',
'user_name',
'database_name',
'process_id',
'connection_from',
'session_id',
'session_line_num',
'command_tag',
'session_start_time',
'virtual_transaction_id',
'transaction_id',
'error_severity',
'sql_state_code',
'message',
'detail',
'hint',
'internal_query',
'internal_query_pos',
'context',
'query',
'query_pos',
'location',
'application_name',
],
});
const logParser = new PostgresCSVLog(options);
return multipipe(csvParser, logParser);
};