@@ -231,25 +231,26 @@ const pipeline = [
231
231
}
232
232
233
233
doInsertData = false
234
- sqlTransaction = 'START TRANSACTION;\n '
234
+ sqlTransaction = ''
235
235
let cntR = 0
236
+ sqlTransaction =
237
+ sqlTransaction +
238
+ 'INSERT INTO realtime_data (tag, time_tag, json_data) VALUES '
236
239
while ( ! sqlRtDataQueue . isEmpty ( ) ) {
237
240
doInsertData = true
238
241
let sql = sqlRtDataQueue . peek ( )
239
242
sqlRtDataQueue . dequeue ( )
240
- sqlTransaction =
241
- sqlTransaction +
242
- 'INSERT INTO realtime_data (tag, time_tag, json_data) VALUES '
243
- sqlTransaction = sqlTransaction + ' (' + sql + ') '
244
- sqlTransaction =
245
- sqlTransaction +
246
- 'ON CONFLICT (tag) DO UPDATE SET time_tag=EXCLUDED.time_tag, json_data=EXCLUDED.json_data;\n'
243
+ sqlTransaction = sqlTransaction + '\n (' + sql + '),'
247
244
cntR ++
248
245
}
246
+ sqlTransaction = sqlTransaction . substring ( 0 , sqlTransaction . length - 1 ) // remove last comma
247
+ sqlTransaction = sqlTransaction + ' \n'
248
+ sqlTransaction =
249
+ sqlTransaction +
250
+ 'ON CONFLICT (tag) DO UPDATE SET time_tag=EXCLUDED.time_tag, json_data=EXCLUDED.json_data;\n'
249
251
if ( cntR ) Log . log ( 'PGSQL RT updates ' + cntR )
250
252
251
253
if ( doInsertData ) {
252
- sqlTransaction = sqlTransaction + 'COMMIT;\n'
253
254
fs . writeFile (
254
255
sqlFilesPath +
255
256
'pg_rtdata_' +
@@ -450,6 +451,19 @@ const pipeline = [
450
451
try {
451
452
if ( change . operationType === 'delete' ) return
452
453
454
+ // // for older versions of mongodb
455
+ // if (
456
+ // change.operationType === 'replace' &&
457
+ // !change?.updateDescription?.updatedFields &&
458
+ // change.fullDocument.sourceDataUpdate
459
+ // ) {
460
+ // change['updateDescription'] = {
461
+ // updatedFields: {
462
+ // sourceDataUpdate: change.fullDocument.sourceDataUpdate,
463
+ // },
464
+ // }
465
+ // }
466
+
453
467
let isSOE = false
454
468
let alarmRange = 0
455
469
0 commit comments