From 082ec423fa9803719270ec0e2c7ed371941b2b40 Mon Sep 17 00:00:00 2001 From: Ricardo Olsen Date: Mon, 13 Jan 2025 16:03:07 -0300 Subject: [PATCH 1/3] Increase timeout to detect inactive mongodb connection on nodejs projects to avoid unnecessary disconections. --- src/OPC-UA-Server/index.js | 2 +- src/alarm_beep/alarm_beep.js | 2 +- src/cs_custom_processor/cs_custom_processor.js | 2 +- src/cs_data_processor/cs_data_processor.js | 2 +- src/demo_simul/index.js | 2 +- src/grafana_alert2event/grafana_alert2event.js | 2 +- src/mongofw/index.js | 2 +- src/mongowr/index.js | 2 +- src/mqtt-sparkplug/index.js | 2 +- src/server_realtime_auth/index.js | 2 +- src/telegraf-listener/index.js | 2 +- 11 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/OPC-UA-Server/index.js b/src/OPC-UA-Server/index.js index dae964d4..15ed0079 100644 --- a/src/OPC-UA-Server/index.js +++ b/src/OPC-UA-Server/index.js @@ -779,7 +779,7 @@ async function checkConnectedMongo(client) { if (!client) { return false } - const CheckMongoConnectionTimeout = 1000 + const CheckMongoConnectionTimeout = 10000 let tr = setTimeout(() => { Log.log('Mongo ping timeout error!') HintMongoIsConnected = false diff --git a/src/alarm_beep/alarm_beep.js b/src/alarm_beep/alarm_beep.js index 69ec7b69..001304da 100644 --- a/src/alarm_beep/alarm_beep.js +++ b/src/alarm_beep/alarm_beep.js @@ -167,7 +167,7 @@ async function checkConnectedMongo(client) { if (!client) { return false } - const CheckMongoConnectionTimeout = 1000 + const CheckMongoConnectionTimeout = 10000 let tr = setTimeout(() => { Log.log('Mongo ping timeout error!') HintMongoIsConnected = false diff --git a/src/cs_custom_processor/cs_custom_processor.js b/src/cs_custom_processor/cs_custom_processor.js index 83543d1d..b70e872e 100644 --- a/src/cs_custom_processor/cs_custom_processor.js +++ b/src/cs_custom_processor/cs_custom_processor.js @@ -81,7 +81,7 @@ async function checkConnectedMongo(client) { if (!client) { return false } - const CheckMongoConnectionTimeout = 1000 + const CheckMongoConnectionTimeout = 10000 let tr = setTimeout(() => { Log.log('Mongo ping timeout error!') MongoStatus.HintMongoIsConnected = false diff --git a/src/cs_data_processor/cs_data_processor.js b/src/cs_data_processor/cs_data_processor.js index 9df2742d..20d03363 100644 --- a/src/cs_data_processor/cs_data_processor.js +++ b/src/cs_data_processor/cs_data_processor.js @@ -1256,7 +1256,7 @@ async function checkConnectedMongo(client) { if (!client) { return false } - const CheckMongoConnectionTimeout = 1000 + const CheckMongoConnectionTimeout = 10000 let tr = setTimeout(() => { Log.log('Mongo ping timeout error!') MongoStatus.HintMongoIsConnected = false diff --git a/src/demo_simul/index.js b/src/demo_simul/index.js index 91e565f5..61dca947 100644 --- a/src/demo_simul/index.js +++ b/src/demo_simul/index.js @@ -537,7 +537,7 @@ async function checkConnectedMongo(client) { if (!client) { return false } - const CheckMongoConnectionTimeout = 1000 + const CheckMongoConnectionTimeout = 10000 const tr = setTimeout(() => { Log.log('Mongo ping timeout error!') HintMongoIsConnected = false diff --git a/src/grafana_alert2event/grafana_alert2event.js b/src/grafana_alert2event/grafana_alert2event.js index 7816abcd..149b4f12 100644 --- a/src/grafana_alert2event/grafana_alert2event.js +++ b/src/grafana_alert2event/grafana_alert2event.js @@ -307,7 +307,7 @@ if ( })() // test mongoDB connectivity -let CheckMongoConnectionTimeout = 1000 +let CheckMongoConnectionTimeout = 10000 let HintMongoIsConnected = true async function checkConnectedMongo(client) { if (!client) { diff --git a/src/mongofw/index.js b/src/mongofw/index.js index daa8bd22..9f1bfc6c 100644 --- a/src/mongofw/index.js +++ b/src/mongofw/index.js @@ -80,7 +80,7 @@ async function checkConnectedMongo(client) { if (!client) { return false } - const CheckMongoConnectionTimeout = 2000 + const CheckMongoConnectionTimeout = 10000 let tr = setTimeout(() => { Log.log('Mongo ping timeout error!') MongoStatus.HintMongoIsConnected = false diff --git a/src/mongowr/index.js b/src/mongowr/index.js index daa8bd22..9f1bfc6c 100644 --- a/src/mongowr/index.js +++ b/src/mongowr/index.js @@ -80,7 +80,7 @@ async function checkConnectedMongo(client) { if (!client) { return false } - const CheckMongoConnectionTimeout = 2000 + const CheckMongoConnectionTimeout = 10000 let tr = setTimeout(() => { Log.log('Mongo ping timeout error!') MongoStatus.HintMongoIsConnected = false diff --git a/src/mqtt-sparkplug/index.js b/src/mqtt-sparkplug/index.js index e6751c26..77cf4686 100644 --- a/src/mqtt-sparkplug/index.js +++ b/src/mqtt-sparkplug/index.js @@ -2421,7 +2421,7 @@ async function checkConnectedMongo(client) { if (!client) { return false } - const CheckMongoConnectionTimeout = 1000 + const CheckMongoConnectionTimeout = 10000 let tr = setTimeout(() => { Log.log('Mongo ping timeout error!') MongoStatus.HintMongoIsConnected = false diff --git a/src/server_realtime_auth/index.js b/src/server_realtime_auth/index.js index 88d1b1ae..091e1253 100644 --- a/src/server_realtime_auth/index.js +++ b/src/server_realtime_auth/index.js @@ -2374,7 +2374,7 @@ async function checkConnectedMongo(client) { if (!client) { return false } - const CheckMongoConnectionTimeout = 1000 + const CheckMongoConnectionTimeout = 10000 let tr = setTimeout(() => { Log.log('Mongo ping timeout error!') HintMongoIsConnected = false diff --git a/src/telegraf-listener/index.js b/src/telegraf-listener/index.js index 3071c328..715551d6 100644 --- a/src/telegraf-listener/index.js +++ b/src/telegraf-listener/index.js @@ -678,7 +678,7 @@ if ( })() // test mongoDB connectivity -let CheckMongoConnectionTimeout = 1000 +let CheckMongoConnectionTimeout = 10000 let HintMongoIsConnected = true async function checkConnectedMongo(client) { if (!client) { From 52705317b0c7fddfdac29a53645ee6f11c4eadcd Mon Sep 17 00:00:00 2001 From: Ricardo Olsen Date: Tue, 14 Jan 2025 13:10:22 -0300 Subject: [PATCH 2/3] cs_data_processor: avoid uncaught exceptions closing the proceess; make it more responsive; avoid becomming inactive when mongo connection is slow; use resumeToken when reopening change stream. --- src/cs_data_processor/cs_data_processor.js | 193 ++++++++++++--------- 1 file changed, 110 insertions(+), 83 deletions(-) diff --git a/src/cs_data_processor/cs_data_processor.js b/src/cs_data_processor/cs_data_processor.js index 20d03363..599d3eb5 100644 --- a/src/cs_data_processor/cs_data_processor.js +++ b/src/cs_data_processor/cs_data_processor.js @@ -32,6 +32,10 @@ const { setInterval } = require('timers') const MongoStatus = { HintMongoIsConnected: false } const LowestPriorityThatBeeps = 1 // will beep for priorities zero and one +process.on('uncaughtException', (err) => + Log.log('Uncaught Exception:' + JSON.stringify(err)) +) + const args = process.argv.slice(2) var inst = null if (args.length > 0) inst = parseInt(args[0]) @@ -96,10 +100,11 @@ const pipeline = [ let sqlRtDataQueue = new Queue() // queue of realtime values to insert on postgreSQL let mongoRtDataQueue = new Queue() // queue of realtime values to insert on MongoDB let digitalUpdatesCount = 0 + let clientMongo = null // mark as frozen unchanged analog values greater than 1 after timeout setInterval(async function () { - if (collection && MongoStatus.HintMongoIsConnected) { + if (collection && MongoStatus.HintMongoIsConnected && clientMongo) { collection .updateMany( { @@ -133,13 +138,17 @@ const pipeline = [ } }, 17317) - setInterval(async function () { + // process updates to mongo/realtimeData + async function processRtDataMongoUpdates() { if ( !collection || + !clientMongo || !MongoStatus.HintMongoIsConnected || mongoRtDataQueue.isEmpty() - ) + ) { + setTimeout(processRtDataMongoUpdates, 150) return + } let cnt = 0 let updArr = [] while (!mongoRtDataQueue.isEmpty()) { @@ -179,75 +188,81 @@ const pipeline = [ .catch(function (err) { Log.log('Error on Mongodb query!', err) }) - // Log.log(JSON.stringify(res)) if (cnt) Log.log('Mongo Updates ' + cnt) - }, 150) + setTimeout(processRtDataMongoUpdates, 150) + } + processRtDataMongoUpdates() - // write values to sql files for later insertion on postgreSQL - setInterval(async function () { - if (!histCollection || !MongoStatus.HintMongoIsConnected) return - let doInsertData = false - let sqlTransaction = - 'START TRANSACTION;\n' + - 'INSERT INTO hist (tag, time_tag, value, value_json, time_tag_at_source, flags) VALUES ' - - let cntH = 0 - let insertArr = [] - while (!sqlHistQueue.isEmpty()) { - doInsertData = true - let entry = sqlHistQueue.peek() - sqlHistQueue.dequeue() - sqlTransaction = sqlTransaction + '\n(' + entry.sql + '),' - insertArr.push(entry.obj) - cntH++ + // write values to sql files for later insertion on postgreSQL, and mongo hist + async function processSqlAndMongoHistUpdates() { + if (!histCollection || !clientMongo || !MongoStatus.HintMongoIsConnected) { + setTimeout(processSqlAndMongoHistUpdates, 333) + return } - if (cntH) Log.log('PGSQL/Mongo Hist updates ' + cntH) - if (doInsertData) { - histCollection - .insertMany(insertArr, { ordered: false, writeConcern: { w: 0 } }) - .catch(function (err) { - Log.log('Error on Mongodb query!', err) - }) + try { + let doInsertData = false + let sqlTransaction = + 'START TRANSACTION;\n' + + 'INSERT INTO hist (tag, time_tag, value, value_json, time_tag_at_source, flags) VALUES ' + + let cntH = 0 + let insertArr = [] + while (!sqlHistQueue.isEmpty()) { + doInsertData = true + let entry = sqlHistQueue.peek() + sqlHistQueue.dequeue() + sqlTransaction = sqlTransaction + '\n(' + entry.sql + '),' + insertArr.push(entry.obj) + cntH++ + } + if (cntH) Log.log('PGSQL/Mongo Hist updates ' + cntH) + + if (doInsertData) { + histCollection + .insertMany(insertArr, { ordered: false, writeConcern: { w: 0 } }) + .catch(function (err) { + Log.log('Error on Mongodb query!', err) + }) + sqlTransaction = sqlTransaction.substring(0, sqlTransaction.length - 1) // remove last comma + sqlTransaction = sqlTransaction + ' \n' + // this cause problems when tag/time repeated on same transaction + // sqlTransaction = sqlTransaction + "ON CONFLICT (tag, time_tag) DO UPDATE SET value=EXCLUDED.value, value_json=EXCLUDED.value_json, time_tag_at_source=EXCLUDED.time_tag_at_source, flags=EXCLUDED.flags;\n"; + sqlTransaction = + sqlTransaction + 'ON CONFLICT (tag, time_tag) DO NOTHING;\n' + sqlTransaction = sqlTransaction + 'COMMIT;\n' + fs.writeFile( + sqlFilesPath + + 'pg_hist_' + + new Date().getTime() + + '_' + + jsConfig.Instance + + '.sql', + sqlTransaction, + (err) => { + if (err) Log.log('Error writing SQL file!') + } + ) + } + + doInsertData = false + sqlTransaction = '' + let cntR = 0 + sqlTransaction = + sqlTransaction + + 'WITH ordered_values AS ( SELECT DISTINCT ON (tag) tag, time_tag, json_data FROM (VALUES ' + while (!sqlRtDataQueue.isEmpty()) { + doInsertData = true + let sql = sqlRtDataQueue.peek() + sqlRtDataQueue.dequeue() + sqlTransaction = sqlTransaction + '\n (' + sql + '),' + cntR++ + } sqlTransaction = sqlTransaction.substring(0, sqlTransaction.length - 1) // remove last comma sqlTransaction = sqlTransaction + ' \n' - // this cause problems when tag/time repeated on same transaction - // sqlTransaction = sqlTransaction + "ON CONFLICT (tag, time_tag) DO UPDATE SET value=EXCLUDED.value, value_json=EXCLUDED.value_json, time_tag_at_source=EXCLUDED.time_tag_at_source, flags=EXCLUDED.flags;\n"; sqlTransaction = - sqlTransaction + 'ON CONFLICT (tag, time_tag) DO NOTHING;\n' - sqlTransaction = sqlTransaction + 'COMMIT;\n' - fs.writeFile( - sqlFilesPath + - 'pg_hist_' + - new Date().getTime() + - '_' + - jsConfig.Instance + - '.sql', - sqlTransaction, - (err) => { - if (err) Log.log('Error writing SQL file!') - } - ) - } - - doInsertData = false - sqlTransaction = '' - let cntR = 0 - sqlTransaction = - sqlTransaction + - 'WITH ordered_values AS ( SELECT DISTINCT ON (tag) tag, time_tag, json_data FROM (VALUES ' - while (!sqlRtDataQueue.isEmpty()) { - doInsertData = true - let sql = sqlRtDataQueue.peek() - sqlRtDataQueue.dequeue() - sqlTransaction = sqlTransaction + '\n (' + sql + '),' - cntR++ - } - sqlTransaction = sqlTransaction.substring(0, sqlTransaction.length - 1) // remove last comma - sqlTransaction = sqlTransaction + ' \n' - sqlTransaction = - sqlTransaction + - `) AS t(tag, time_tag, json_data) + sqlTransaction + + `) AS t(tag, time_tag, json_data) ORDER BY tag, time_tag DESC ) INSERT INTO realtime_data (tag, time_tag, json_data) @@ -257,27 +272,32 @@ const pipeline = [ SET time_tag = EXCLUDED.time_tag, json_data = EXCLUDED.json_data; ` - if (cntR) Log.log('PGSQL RT updates ' + cntR) - - if (doInsertData) { - fs.writeFile( - sqlFilesPath + - 'pg_rtdata_' + - new Date().getTime() + - '_' + - jsConfig.Instance + - '.sql', - sqlTransaction, - (err) => { - if (err) Log.log('Error writing SQL file!') - } - ) + if (cntR) Log.log('PGSQL RT updates ' + cntR) + + if (doInsertData) { + fs.writeFile( + sqlFilesPath + + 'pg_rtdata_' + + new Date().getTime() + + '_' + + jsConfig.Instance + + '.sql', + sqlTransaction, + (err) => { + if (err) Log.log('Error writing SQL file!') + } + ) + } + } catch (e) { + Log.log('Error in processSqlAndMongoHistUpdates: ' + e) } - }, 1000) + setTimeout(processSqlAndMongoHistUpdates, 333) + } + processSqlAndMongoHistUpdates() - let clientMongo = null let invalidDetectIntervalHandle = null let latencyIntervalHandle = null + let resumeToken = null while (true) { if (clientMongo === null) await MongoClient.connect( @@ -286,12 +306,15 @@ const pipeline = [ ) .then(async (client) => { clientMongo = client - clientMongo.on('topologyClosed', (_) => { + clientMongo.on('topologyClosed', () => { MongoStatus.HintMongoIsConnected = false + clientMongo = null Log.log('MongoDB server topologyClosed') }) MongoStatus.HintMongoIsConnected = true Log.log('Connected correctly to MongoDB server') + if (resumeToken) + Log.log('resumeToken: ' + JSON.stringify(resumeToken)) let latencyAccTotal = 0 let latencyTotalCnt = 0 @@ -310,6 +333,7 @@ const pipeline = [ histCollection = db.collection(jsConfig.HistCollectionName) const changeStream = collection.watch(pipeline, { fullDocument: 'updateLookup', + resumeAfter: resumeToken, }) await createSpecialTags(collection) @@ -458,6 +482,7 @@ const pipeline = [ // start listen to changes changeStream.on('change', (change) => { try { + resumeToken = changeStream.resumeToken if (change.operationType === 'delete') return // // for older versions of mongodb @@ -1274,6 +1299,8 @@ async function checkConnectedMongo(client) { MongoStatus.HintMongoIsConnected = true return true } else { + if (!!client && !!client.topology && client.topology.isConnected()) + return true MongoStatus.HintMongoIsConnected = false return false } From c6377f59f6e5c39a8fd3163dea4441426fe26358 Mon Sep 17 00:00:00 2001 From: Ricardo Olsen Date: Tue, 14 Jan 2025 13:11:56 -0300 Subject: [PATCH 3/3] Avoid uncaught exceptions closing the nodejs processes. --- src/OPC-UA-Server/index.js | 2 ++ src/alarm_beep/alarm_beep.js | 2 ++ src/cs_custom_processor/cs_custom_processor.js | 2 ++ src/demo_simul/index.js | 2 ++ src/grafana_alert2event/grafana_alert2event.js | 2 ++ src/mongofw/index.js | 2 ++ src/mongowr/index.js | 2 ++ src/mqtt-sparkplug/index.js | 2 ++ src/server_realtime_auth/index.js | 2 ++ src/telegraf-listener/index.js | 2 ++ 10 files changed, 20 insertions(+) diff --git a/src/OPC-UA-Server/index.js b/src/OPC-UA-Server/index.js index 15ed0079..bf09d8cc 100644 --- a/src/OPC-UA-Server/index.js +++ b/src/OPC-UA-Server/index.js @@ -32,6 +32,8 @@ const AppDefs = require('./app-defs') const { LoadConfig, getMongoConnectionOptions } = require('./load-config') let HintMongoIsConnected = true +process.on('uncaughtException', err => Log.log('Uncaught Exception:' + JSON.stringify(err))) + ;(async () => { const jsConfig = LoadConfig() // load and parse config file Log.levelCurrent = jsConfig.LogLevel diff --git a/src/alarm_beep/alarm_beep.js b/src/alarm_beep/alarm_beep.js index 001304da..d9703c73 100644 --- a/src/alarm_beep/alarm_beep.js +++ b/src/alarm_beep/alarm_beep.js @@ -27,6 +27,8 @@ const { MongoClient } = require('mongodb') const { setInterval } = require('timers') const sys = require('child_process') +process.on('uncaughtException', err => Log.log('Uncaught Exception:' + JSON.stringify(err))) + const WavFilesWin = [ 'c:\\windows\\media\\Windows Default.wav', 'c:\\windows\\media\\Windows Background.wav', diff --git a/src/cs_custom_processor/cs_custom_processor.js b/src/cs_custom_processor/cs_custom_processor.js index b70e872e..6e4dee60 100644 --- a/src/cs_custom_processor/cs_custom_processor.js +++ b/src/cs_custom_processor/cs_custom_processor.js @@ -25,6 +25,8 @@ const Redundancy = require('./redundancy') const { MongoClient } = require('mongodb') const CustomProcessor = require('./customized_module').CustomProcessor +process.on('uncaughtException', err => Log.log('Uncaught Exception:' + JSON.stringify(err))) + const args = process.argv.slice(2) let inst = null if (args.length > 0) inst = parseInt(args[0]) diff --git a/src/demo_simul/index.js b/src/demo_simul/index.js index 61dca947..cdf05479 100644 --- a/src/demo_simul/index.js +++ b/src/demo_simul/index.js @@ -24,6 +24,8 @@ const AppDefs = require('./app-defs') const LoadConfig = require('./load-config') const { MongoClient, Double } = require('mongodb') +process.on('uncaughtException', err => Log.log('Uncaught Exception:' + JSON.stringify(err))) + const pipeline = [ { $project: { documentKey: false }, diff --git a/src/grafana_alert2event/grafana_alert2event.js b/src/grafana_alert2event/grafana_alert2event.js index 149b4f12..7697b29b 100644 --- a/src/grafana_alert2event/grafana_alert2event.js +++ b/src/grafana_alert2event/grafana_alert2event.js @@ -46,6 +46,8 @@ const app = express() app.use(express.json()) let soeQueue = new Queue() // queue of SOE events +process.on('uncaughtException', err => console.log('Uncaught Exception:' + JSON.stringify(err))) + app.listen(HTTP_PORT, IP_BIND, () => { console.log('listening on ' + IP_BIND + ':' + HTTP_PORT) }) diff --git a/src/mongofw/index.js b/src/mongofw/index.js index 9f1bfc6c..75c06d90 100644 --- a/src/mongofw/index.js +++ b/src/mongofw/index.js @@ -24,6 +24,8 @@ const Redundancy = require('./redundancy') const { MongoClient } = require('mongodb') const CustomProcessor = require('./customized_module').CustomProcessor +process.on('uncaughtException', err => Log.log('Uncaught Exception:' + JSON.stringify(err))) + const args = process.argv.slice(2) let inst = null if (args.length > 0) inst = parseInt(args[0]) diff --git a/src/mongowr/index.js b/src/mongowr/index.js index 9f1bfc6c..75c06d90 100644 --- a/src/mongowr/index.js +++ b/src/mongowr/index.js @@ -24,6 +24,8 @@ const Redundancy = require('./redundancy') const { MongoClient } = require('mongodb') const CustomProcessor = require('./customized_module').CustomProcessor +process.on('uncaughtException', err => Log.log('Uncaught Exception:' + JSON.stringify(err))) + const args = process.argv.slice(2) let inst = null if (args.length > 0) inst = parseInt(args[0]) diff --git a/src/mqtt-sparkplug/index.js b/src/mqtt-sparkplug/index.js index 77cf4686..8f709ac5 100644 --- a/src/mqtt-sparkplug/index.js +++ b/src/mqtt-sparkplug/index.js @@ -36,6 +36,8 @@ const AutoTag = require('./auto-tag') const { castSparkplugValue: castSparkplugValue } = require('./cast') const autoTag = require('./auto-tag') +process.on('uncaughtException', err => Log.log('Uncaught Exception:' + JSON.stringify(err))) + const SparkplugNS = 'spBv1.0' const DevicesList = [] // contains either EoN nodes or devices const ValuesQueue = new Queue() // queue of values to update acquisition diff --git a/src/server_realtime_auth/index.js b/src/server_realtime_auth/index.js index 091e1253..eab9cc1d 100644 --- a/src/server_realtime_auth/index.js +++ b/src/server_realtime_auth/index.js @@ -58,6 +58,8 @@ const dbAuth = require('./app/models') const { authJwt } = require('./app/middlewares') const { canSendCommands } = require('./app/middlewares/authJwt.js') +process.on('uncaughtException', err => Log.log('Uncaught Exception:' + JSON.stringify(err))) + // Argument NOAUTH disables user authentication var args = process.argv.slice(2) if (args.length > 0) if (args[0] === 'NOAUTH') AUTHENTICATION = false diff --git a/src/telegraf-listener/index.js b/src/telegraf-listener/index.js index 715551d6..116ceec6 100644 --- a/src/telegraf-listener/index.js +++ b/src/telegraf-listener/index.js @@ -41,6 +41,8 @@ const serverUdpSocket = dgram.createSocket({ type: 'udp4' }) let bindCount = 0 const grpSep = '~' +process.on('uncaughtException', err => console.log('Uncaught Exception:' + JSON.stringify(err))) + let ListCreatedTags = [] let ValuesQueue = new Queue() // queue of values to update