-
Notifications
You must be signed in to change notification settings - Fork 3
/
index.js
117 lines (104 loc) · 3.02 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
"use strict";
const _ = require('lodash');
const csv = require('csv');
const multipipe = require('multipipe');
const stream = require('stream');
const _messagePattern = /^duration: (\d+\.\d+)\s*ms\s+(plan|statement):\s*([\s\S]*)$/;
const _textPattern = /^Query Text:\s*([\s\S]*)$/;
const _dateFields = ['session_start_time', 'log_time'];
const _numericFields = ['process_id', 'session_line_num'];
class PostgresCSVLog extends stream.Transform {
constructor(options) {
super({
allowHalfOpen: false,
readableObjectMode: true,
writableObjectMode: true
});
}
static extractAutoExplainMessageFields(messagePlan) {
const textMatch = messagePlan.match(_textPattern);
if (_.size(textMatch)) {
return {
query: textMatch[1]
};
}
else {
let plan;
try {
plan = JSON.parse(messagePlan);
}
catch (e) {
return;
// If the query/plan is not in text format or JSON format, we just
// ignore it here since there isn't much else we can do.
}
return {
plan: plan,
query: plan['Query Text'],
};
}
}
static extractStatementMessageFields(statement) {
return {
query: statement
};
}
_transform(record, encoding, callback) {
for (const field of _dateFields) {
record[field] = new Date(record[field]);
}
for (const field of _numericFields) {
record[field] = +record[field];
}
if (record.sql_state_code === '00000') {
const messageMatches = record.message.match(_messagePattern);
if (!_.size(messageMatches)) {
// If the record comes from a successful query but doesn't match a pattern we can parse duration, query, or plan from, it's dropped.
return setImmediate(callback);
}
record.duration = +messageMatches[1];
if (messageMatches[2] === 'plan') {
record.source = 'from_auto_explain';
_.assign(record, PostgresCSVLog.extractAutoExplainMessageFields(messageMatches[3]));
}
// Parse log_min_duration_statement lines, which start with `statement` instead of `plan`.
if (messageMatches[2] === 'statement') {
record.source = 'from_log_min_duration_statement';
_.assign(record, PostgresCSVLog.extractStatementMessageFields(messageMatches[3]));
}
}
this.push(record);
setImmediate(callback);
}
}
module.exports = (options) => {
const csvParser = csv.parse({
columns: [
'log_time',
'user_name',
'database_name',
'process_id',
'connection_from',
'session_id',
'session_line_num',
'command_tag',
'session_start_time',
'virtual_transaction_id',
'transaction_id',
'error_severity',
'sql_state_code',
'message',
'detail',
'hint',
'internal_query',
'internal_query_pos',
'context',
'query',
'query_pos',
'location',
'application_name',
],
});
const logParser = new PostgresCSVLog(options);
return multipipe(csvParser, logParser);
};