-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
331 lines (290 loc) · 9 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
/**
* @file
*/
'use strict';
// Required Node.js modules.
const Pool = require('pg').Pool;
const Promise = require('bluebird');
const sequence = require('seqnce');
/**
* Using the specified connection pool, performs the specified function using
* a series of queries and a user-defined callback function.
* @param {BoundPool} pool
* @param {Function} fn
* @param {Array.<Object>} queries
* @param {Function} callback
* @return {Promise}
* @private
*/
function multiple(pool, fn, queries, callback) {
let queryObjects = queries.map(query => {
if (typeof query === 'string') {
return { text: query, values: [] };
}
return Object.assign({ values: [] }, query);
});
if (callback && typeof callback === 'function') {
return fn(pool, queryObjects, callback);
}
return new Promise((resolve, reject) => {
fn(pool, queryObjects, (error, result) => {
if (error) {
return reject(error);
}
resolve(result);
});
});
};
/**
* Normalizes arguments passed to any of the pga methods that handle multiple
* queries- parallel or transact.
* @param {Array} args [description]
* @return {Object}
* @private
*/
function normalize(args) {
let result = {
queries: null,
callback: null
};
if (args.length === 1 && Array.isArray(args[0])) {
result.queries = args[0];
} else if (args.length === 2 && Array.isArray(args[0])) {
result.queries = args[0];
if (typeof args[1] === 'function') {
result.callback = args[1];
}
} else {
result.queries = args;
if (typeof result.queries[result.queries.length - 1] === 'function') {
result.callback = result.queries.pop();
}
}
return result;
};
/**
* Using the specified connection pool, performs a series of specified queries
* in parallel, executing a specified callback function once all queries have
* successfully completed.
* @param {BoundPool} pool
* @param {Array.<Object>} queries
* @param {Function} callback
* @private
*/
function performParallel(pool, queries, callback) {
let count = 0,
results = new Array(queries.length);
queries.forEach((query, index) => {
pool.query(query, (error, result) => {
if (error) {
return callback(error, results);
}
results[index] = result;
if (++count === queries.length) {
return callback(null, results);
}
});
});
};
/**
* Using the specified connection pool, performs a series of specified queries
* using any specified parameters in sequence, finally executing a specified
* callback function with any error or result. Will automatically rollback the
* transaction if it fails and commit if it succeeds.
* @param {BoundPool} pool
* @param {Array.<Object>} queries
* @param {Function} callback
* @private
*/
function performTransaction(pool, queries, callback) {
pool.connect((error, client, done) => {
if (error) {
done(client);
return callback(error, null);
}
client.query('BEGIN', error => {
if (error) {
return rollback(client, done, error, callback);
}
sequence(queries, (results, current, next) => {
let query = current.text,
params = current.values;
client.query(query, params, (error, result) => {
if (error) {
return rollback(client, done, error, callback);
}
results.push(result);
next();
});
}, (results) => {
client.query('COMMIT', error => {
if (error) {
return rollback(client, done, error, callback);
}
done(client);
return callback(null, results);
});
});
});
});
};
/**
* Rolls back any failed transaction.
* @param {Client} client
* @param {Function} done
* @param {Error} error
* @param {Function} callback
* @private
*/
function rollback(client, done, error, callback) {
client.query('ROLLBACK', rollbackError => {
done(rollbackError);
return callback(rollbackError || error, null);
});
};
module.exports = function makeAdapter(config) {
let pool = new Pool(config);
/**
* @type {PostgreSQLAdapter}
*/
return {
/**
* Closes the connection pool.
* @return {null}
* @public
*/
close: function close() {
return pool.end.apply(pool, arguments);
},
/**
* Performs a series of database queries in parallel over a multiple client
* connections to optimize performance, returning results after all the
* queries have finished execution. The callback is optional, and if no
* callback is provided, #parallel will return a Promise object. An error
* in any one of the queries will result in the immediate termination of
* the function, yielding the execution of the callback with an error and
* a potential partial array of results from other successful queries. When
* used to return a Promise object, the Promise will be rejected on the
* first error without exposing any completed results.
*
* It is not safe to use #parallel with queries that may have an impact on
* the database.
* @param {Array.<Object>} queries
* @param {Function} [callback] Optional.
* @return {*}
* @public
* @example
* const pga = require('pga');
* let db = pga(config);
*
* db.parallel([
* { text: 'SELECT COUNT(*) FROM test;' },
* { text: 'SELECT * FROM test WHERE id = $1::int;', values: [ 1 ] },
* { text: 'SELECT * FROM test;' }
* ], function(error, results) {
* if (error) {
* return console.error(error);
* }
* console.log(results);
* });
*
* db.parallel([
* { text: 'SELECT COUNT(*) FROM test;' },
* { text: 'SELECT * FROM test WHERE id = $1::int;', values: [ 1 ] },
* { text: 'SELECT * FROM test;' }
* ]).then(function(results) {
* console.log(results);
* }).catch(function(error) {
* console.error(error);
* });
*/
parallel(...args) {
let { queries, callback } = normalize(args);
return multiple(pool, performParallel, queries, callback);
},
/**
* Exported for the sake of unit testing, primarily.
* @type {Pool}
*/
pool: pool,
/**
* Performs a basic query using the pg-pool module's #query method.
* @param {String} query
* @param {Array} [params] Optional.
* @param {Function} [callback] Optional.
* @return {*}
* @public
* @example
* const pga = require('pga');
* let db = pga(config);
*
* db.query('SELECT * FROM test;', function(error, result) {
* if (error) {
* return console.error(error);
* }
* console.log(result);
* });
*
* db.query('SELECT * FROM test;').then(function(result) {
* console.log(result);
* }).catch(
* console.error(error);
* });
*
* db.query('SELECT * FROM test WHERE name = $1::text;', ['testing'], function(error, result) {
* if (error) {
* return console.error(error);
* }
* console.log(result);
* });
*
* db.query('SELECT * FROM test WHERE name = $1::text;', ['testing']).then(function(result) {
* console.log(result);
* }).catch(function(error) {
* console.error(error);
* });
*/
query() {
return pool.query.apply(pool, arguments);
},
/**
* Performs a database transaction, or a sequential set of SQL queries. The
* callback is optional, and if no callback is provided, #transact will
* return a Promise object.
* @param {Array.<Object>} queries
* @param {Function} [callback] Optional.
* @return {*}
* @public
* @example
* const pga = require('pga');
* let db = pga(config);
*
* db.transact([
* { text: 'SELECT COUNT(*) FROM test;' },
* { text: 'SELECT * FROM test WHERE id = $1::int;', values: [ 1 ] },
* { text: 'INSERT INTO test (name) VALUES ($1:text);', values: [ 'Name!' ] },
* { text: 'SELECT COUNT(*) FROM test;' }
* ], function(error, results) {
* if (error) {
* return console.error(error);
* }
* console.log(results);
* });
*
* db.transact([
* { text: 'SELECT COUNT(*) FROM test;' },
* { text: 'SELECT * FROM test WHERE id = $1::int;', values: [ 1 ] },
* { text: 'INSERT INTO test (name) VALUES ($1:text);', values: [ 'Name!' ] },
* { text: 'SELECT COUNT(*) FROM test;' }
* ]).then(function(results) {
* console.log(results);
* }).catch(function(error) {
* console.error(error);
* });
*/
transact(...args) {
let { queries, callback } = normalize(args);
return multiple(pool, performTransaction, queries, callback);
}
};
};