@@ -32,6 +32,10 @@ const { setInterval } = require('timers')
32
32
const MongoStatus = { HintMongoIsConnected : false }
33
33
const LowestPriorityThatBeeps = 1 // will beep for priorities zero and one
34
34
35
+ process . on ( 'uncaughtException' , ( err ) =>
36
+ Log . log ( 'Uncaught Exception:' + JSON . stringify ( err ) )
37
+ )
38
+
35
39
const args = process . argv . slice ( 2 )
36
40
var inst = null
37
41
if ( args . length > 0 ) inst = parseInt ( args [ 0 ] )
@@ -96,10 +100,11 @@ const pipeline = [
96
100
let sqlRtDataQueue = new Queue ( ) // queue of realtime values to insert on postgreSQL
97
101
let mongoRtDataQueue = new Queue ( ) // queue of realtime values to insert on MongoDB
98
102
let digitalUpdatesCount = 0
103
+ let clientMongo = null
99
104
100
105
// mark as frozen unchanged analog values greater than 1 after timeout
101
106
setInterval ( async function ( ) {
102
- if ( collection && MongoStatus . HintMongoIsConnected ) {
107
+ if ( collection && MongoStatus . HintMongoIsConnected && clientMongo ) {
103
108
collection
104
109
. updateMany (
105
110
{
@@ -133,13 +138,17 @@ const pipeline = [
133
138
}
134
139
} , 17317 )
135
140
136
- setInterval ( async function ( ) {
141
+ // process updates to mongo/realtimeData
142
+ async function processRtDataMongoUpdates ( ) {
137
143
if (
138
144
! collection ||
145
+ ! clientMongo ||
139
146
! MongoStatus . HintMongoIsConnected ||
140
147
mongoRtDataQueue . isEmpty ( )
141
- )
148
+ ) {
149
+ setTimeout ( processRtDataMongoUpdates , 150 )
142
150
return
151
+ }
143
152
let cnt = 0
144
153
let updArr = [ ]
145
154
while ( ! mongoRtDataQueue . isEmpty ( ) ) {
@@ -179,75 +188,81 @@ const pipeline = [
179
188
. catch ( function ( err ) {
180
189
Log . log ( 'Error on Mongodb query!' , err )
181
190
} )
182
- // Log.log(JSON.stringify(res))
183
191
if ( cnt ) Log . log ( 'Mongo Updates ' + cnt )
184
- } , 150 )
192
+ setTimeout ( processRtDataMongoUpdates , 150 )
193
+ }
194
+ processRtDataMongoUpdates ( )
185
195
186
- // write values to sql files for later insertion on postgreSQL
187
- setInterval ( async function ( ) {
188
- if ( ! histCollection || ! MongoStatus . HintMongoIsConnected ) return
189
- let doInsertData = false
190
- let sqlTransaction =
191
- 'START TRANSACTION;\n' +
192
- 'INSERT INTO hist (tag, time_tag, value, value_json, time_tag_at_source, flags) VALUES '
193
-
194
- let cntH = 0
195
- let insertArr = [ ]
196
- while ( ! sqlHistQueue . isEmpty ( ) ) {
197
- doInsertData = true
198
- let entry = sqlHistQueue . peek ( )
199
- sqlHistQueue . dequeue ( )
200
- sqlTransaction = sqlTransaction + '\n(' + entry . sql + '),'
201
- insertArr . push ( entry . obj )
202
- cntH ++
196
+ // write values to sql files for later insertion on postgreSQL, and mongo hist
197
+ async function processSqlAndMongoHistUpdates ( ) {
198
+ if ( ! histCollection || ! clientMongo || ! MongoStatus . HintMongoIsConnected ) {
199
+ setTimeout ( processSqlAndMongoHistUpdates , 333 )
200
+ return
203
201
}
204
- if ( cntH ) Log . log ( 'PGSQL/Mongo Hist updates ' + cntH )
205
202
206
- if ( doInsertData ) {
207
- histCollection
208
- . insertMany ( insertArr , { ordered : false , writeConcern : { w : 0 } } )
209
- . catch ( function ( err ) {
210
- Log . log ( 'Error on Mongodb query!' , err )
211
- } )
203
+ try {
204
+ let doInsertData = false
205
+ let sqlTransaction =
206
+ 'START TRANSACTION;\n' +
207
+ 'INSERT INTO hist (tag, time_tag, value, value_json, time_tag_at_source, flags) VALUES '
208
+
209
+ let cntH = 0
210
+ let insertArr = [ ]
211
+ while ( ! sqlHistQueue . isEmpty ( ) ) {
212
+ doInsertData = true
213
+ let entry = sqlHistQueue . peek ( )
214
+ sqlHistQueue . dequeue ( )
215
+ sqlTransaction = sqlTransaction + '\n(' + entry . sql + '),'
216
+ insertArr . push ( entry . obj )
217
+ cntH ++
218
+ }
219
+ if ( cntH ) Log . log ( 'PGSQL/Mongo Hist updates ' + cntH )
220
+
221
+ if ( doInsertData ) {
222
+ histCollection
223
+ . insertMany ( insertArr , { ordered : false , writeConcern : { w : 0 } } )
224
+ . catch ( function ( err ) {
225
+ Log . log ( 'Error on Mongodb query!' , err )
226
+ } )
227
+ sqlTransaction = sqlTransaction . substring ( 0 , sqlTransaction . length - 1 ) // remove last comma
228
+ sqlTransaction = sqlTransaction + ' \n'
229
+ // this cause problems when tag/time repeated on same transaction
230
+ // sqlTransaction = sqlTransaction + "ON CONFLICT (tag, time_tag) DO UPDATE SET value=EXCLUDED.value, value_json=EXCLUDED.value_json, time_tag_at_source=EXCLUDED.time_tag_at_source, flags=EXCLUDED.flags;\n";
231
+ sqlTransaction =
232
+ sqlTransaction + 'ON CONFLICT (tag, time_tag) DO NOTHING;\n'
233
+ sqlTransaction = sqlTransaction + 'COMMIT;\n'
234
+ fs . writeFile (
235
+ sqlFilesPath +
236
+ 'pg_hist_' +
237
+ new Date ( ) . getTime ( ) +
238
+ '_' +
239
+ jsConfig . Instance +
240
+ '.sql' ,
241
+ sqlTransaction ,
242
+ ( err ) => {
243
+ if ( err ) Log . log ( 'Error writing SQL file!' )
244
+ }
245
+ )
246
+ }
247
+
248
+ doInsertData = false
249
+ sqlTransaction = ''
250
+ let cntR = 0
251
+ sqlTransaction =
252
+ sqlTransaction +
253
+ 'WITH ordered_values AS ( SELECT DISTINCT ON (tag) tag, time_tag, json_data FROM (VALUES '
254
+ while ( ! sqlRtDataQueue . isEmpty ( ) ) {
255
+ doInsertData = true
256
+ let sql = sqlRtDataQueue . peek ( )
257
+ sqlRtDataQueue . dequeue ( )
258
+ sqlTransaction = sqlTransaction + '\n (' + sql + '),'
259
+ cntR ++
260
+ }
212
261
sqlTransaction = sqlTransaction . substring ( 0 , sqlTransaction . length - 1 ) // remove last comma
213
262
sqlTransaction = sqlTransaction + ' \n'
214
- // this cause problems when tag/time repeated on same transaction
215
- // sqlTransaction = sqlTransaction + "ON CONFLICT (tag, time_tag) DO UPDATE SET value=EXCLUDED.value, value_json=EXCLUDED.value_json, time_tag_at_source=EXCLUDED.time_tag_at_source, flags=EXCLUDED.flags;\n";
216
263
sqlTransaction =
217
- sqlTransaction + 'ON CONFLICT (tag, time_tag) DO NOTHING;\n'
218
- sqlTransaction = sqlTransaction + 'COMMIT;\n'
219
- fs . writeFile (
220
- sqlFilesPath +
221
- 'pg_hist_' +
222
- new Date ( ) . getTime ( ) +
223
- '_' +
224
- jsConfig . Instance +
225
- '.sql' ,
226
- sqlTransaction ,
227
- ( err ) => {
228
- if ( err ) Log . log ( 'Error writing SQL file!' )
229
- }
230
- )
231
- }
232
-
233
- doInsertData = false
234
- sqlTransaction = ''
235
- let cntR = 0
236
- sqlTransaction =
237
- sqlTransaction +
238
- 'WITH ordered_values AS ( SELECT DISTINCT ON (tag) tag, time_tag, json_data FROM (VALUES '
239
- while ( ! sqlRtDataQueue . isEmpty ( ) ) {
240
- doInsertData = true
241
- let sql = sqlRtDataQueue . peek ( )
242
- sqlRtDataQueue . dequeue ( )
243
- sqlTransaction = sqlTransaction + '\n (' + sql + '),'
244
- cntR ++
245
- }
246
- sqlTransaction = sqlTransaction . substring ( 0 , sqlTransaction . length - 1 ) // remove last comma
247
- sqlTransaction = sqlTransaction + ' \n'
248
- sqlTransaction =
249
- sqlTransaction +
250
- `) AS t(tag, time_tag, json_data)
264
+ sqlTransaction +
265
+ `) AS t(tag, time_tag, json_data)
251
266
ORDER BY tag, time_tag DESC
252
267
)
253
268
INSERT INTO realtime_data (tag, time_tag, json_data)
@@ -257,27 +272,32 @@ const pipeline = [
257
272
SET time_tag = EXCLUDED.time_tag,
258
273
json_data = EXCLUDED.json_data;
259
274
`
260
- if ( cntR ) Log . log ( 'PGSQL RT updates ' + cntR )
261
-
262
- if ( doInsertData ) {
263
- fs . writeFile (
264
- sqlFilesPath +
265
- 'pg_rtdata_' +
266
- new Date ( ) . getTime ( ) +
267
- '_' +
268
- jsConfig . Instance +
269
- '.sql' ,
270
- sqlTransaction ,
271
- ( err ) => {
272
- if ( err ) Log . log ( 'Error writing SQL file!' )
273
- }
274
- )
275
+ if ( cntR ) Log . log ( 'PGSQL RT updates ' + cntR )
276
+
277
+ if ( doInsertData ) {
278
+ fs . writeFile (
279
+ sqlFilesPath +
280
+ 'pg_rtdata_' +
281
+ new Date ( ) . getTime ( ) +
282
+ '_' +
283
+ jsConfig . Instance +
284
+ '.sql' ,
285
+ sqlTransaction ,
286
+ ( err ) => {
287
+ if ( err ) Log . log ( 'Error writing SQL file!' )
288
+ }
289
+ )
290
+ }
291
+ } catch ( e ) {
292
+ Log . log ( 'Error in processSqlAndMongoHistUpdates: ' + e )
275
293
}
276
- } , 1000 )
294
+ setTimeout ( processSqlAndMongoHistUpdates , 333 )
295
+ }
296
+ processSqlAndMongoHistUpdates ( )
277
297
278
- let clientMongo = null
279
298
let invalidDetectIntervalHandle = null
280
299
let latencyIntervalHandle = null
300
+ let resumeToken = null
281
301
while ( true ) {
282
302
if ( clientMongo === null )
283
303
await MongoClient . connect (
@@ -286,12 +306,15 @@ const pipeline = [
286
306
)
287
307
. then ( async ( client ) => {
288
308
clientMongo = client
289
- clientMongo . on ( 'topologyClosed' , ( _ ) => {
309
+ clientMongo . on ( 'topologyClosed' , ( ) => {
290
310
MongoStatus . HintMongoIsConnected = false
311
+ clientMongo = null
291
312
Log . log ( 'MongoDB server topologyClosed' )
292
313
} )
293
314
MongoStatus . HintMongoIsConnected = true
294
315
Log . log ( 'Connected correctly to MongoDB server' )
316
+ if ( resumeToken )
317
+ Log . log ( 'resumeToken: ' + JSON . stringify ( resumeToken ) )
295
318
296
319
let latencyAccTotal = 0
297
320
let latencyTotalCnt = 0
@@ -310,6 +333,7 @@ const pipeline = [
310
333
histCollection = db . collection ( jsConfig . HistCollectionName )
311
334
const changeStream = collection . watch ( pipeline , {
312
335
fullDocument : 'updateLookup' ,
336
+ resumeAfter : resumeToken ,
313
337
} )
314
338
315
339
await createSpecialTags ( collection )
@@ -458,6 +482,7 @@ const pipeline = [
458
482
// start listen to changes
459
483
changeStream . on ( 'change' , ( change ) => {
460
484
try {
485
+ resumeToken = changeStream . resumeToken
461
486
if ( change . operationType === 'delete' ) return
462
487
463
488
// // for older versions of mongodb
@@ -1256,7 +1281,7 @@ async function checkConnectedMongo(client) {
1256
1281
if ( ! client ) {
1257
1282
return false
1258
1283
}
1259
- const CheckMongoConnectionTimeout = 1000
1284
+ const CheckMongoConnectionTimeout = 10000
1260
1285
let tr = setTimeout ( ( ) => {
1261
1286
Log . log ( 'Mongo ping timeout error!' )
1262
1287
MongoStatus . HintMongoIsConnected = false
@@ -1274,6 +1299,8 @@ async function checkConnectedMongo(client) {
1274
1299
MongoStatus . HintMongoIsConnected = true
1275
1300
return true
1276
1301
} else {
1302
+ if ( ! ! client && ! ! client . topology && client . topology . isConnected ( ) )
1303
+ return true
1277
1304
MongoStatus . HintMongoIsConnected = false
1278
1305
return false
1279
1306
}
0 commit comments