Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements to cs_data_processor and other nodejs processes. #198

Merged
merged 4 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/OPC-UA-Server/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ const AppDefs = require('./app-defs')
const { LoadConfig, getMongoConnectionOptions } = require('./load-config')
let HintMongoIsConnected = true

process.on('uncaughtException', err => Log.log('Uncaught Exception:' + JSON.stringify(err)))

;(async () => {
const jsConfig = LoadConfig() // load and parse config file
Log.levelCurrent = jsConfig.LogLevel
Expand Down Expand Up @@ -779,7 +781,7 @@ async function checkConnectedMongo(client) {
if (!client) {
return false
}
const CheckMongoConnectionTimeout = 1000
const CheckMongoConnectionTimeout = 10000
let tr = setTimeout(() => {
Log.log('Mongo ping timeout error!')
HintMongoIsConnected = false
Expand Down
4 changes: 3 additions & 1 deletion src/alarm_beep/alarm_beep.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ const { MongoClient } = require('mongodb')
const { setInterval } = require('timers')
const sys = require('child_process')

process.on('uncaughtException', err => Log.log('Uncaught Exception:' + JSON.stringify(err)))

const WavFilesWin = [
'c:\\windows\\media\\Windows Default.wav',
'c:\\windows\\media\\Windows Background.wav',
Expand Down Expand Up @@ -167,7 +169,7 @@ async function checkConnectedMongo(client) {
if (!client) {
return false
}
const CheckMongoConnectionTimeout = 1000
const CheckMongoConnectionTimeout = 10000
let tr = setTimeout(() => {
Log.log('Mongo ping timeout error!')
HintMongoIsConnected = false
Expand Down
4 changes: 3 additions & 1 deletion src/cs_custom_processor/cs_custom_processor.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const Redundancy = require('./redundancy')
const { MongoClient } = require('mongodb')
const CustomProcessor = require('./customized_module').CustomProcessor

process.on('uncaughtException', err => Log.log('Uncaught Exception:' + JSON.stringify(err)))

const args = process.argv.slice(2)
let inst = null
if (args.length > 0) inst = parseInt(args[0])
Expand Down Expand Up @@ -81,7 +83,7 @@ async function checkConnectedMongo(client) {
if (!client) {
return false
}
const CheckMongoConnectionTimeout = 1000
const CheckMongoConnectionTimeout = 10000
let tr = setTimeout(() => {
Log.log('Mongo ping timeout error!')
MongoStatus.HintMongoIsConnected = false
Expand Down
195 changes: 111 additions & 84 deletions src/cs_data_processor/cs_data_processor.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ const { setInterval } = require('timers')
const MongoStatus = { HintMongoIsConnected: false }
const LowestPriorityThatBeeps = 1 // will beep for priorities zero and one

process.on('uncaughtException', (err) =>
Log.log('Uncaught Exception:' + JSON.stringify(err))
)

const args = process.argv.slice(2)
var inst = null
if (args.length > 0) inst = parseInt(args[0])
Expand Down Expand Up @@ -96,10 +100,11 @@ const pipeline = [
let sqlRtDataQueue = new Queue() // queue of realtime values to insert on postgreSQL
let mongoRtDataQueue = new Queue() // queue of realtime values to insert on MongoDB
let digitalUpdatesCount = 0
let clientMongo = null

// mark as frozen unchanged analog values greater than 1 after timeout
setInterval(async function () {
if (collection && MongoStatus.HintMongoIsConnected) {
if (collection && MongoStatus.HintMongoIsConnected && clientMongo) {
collection
.updateMany(
{
Expand Down Expand Up @@ -133,13 +138,17 @@ const pipeline = [
}
}, 17317)

setInterval(async function () {
// process updates to mongo/realtimeData
async function processRtDataMongoUpdates() {
if (
!collection ||
!clientMongo ||
!MongoStatus.HintMongoIsConnected ||
mongoRtDataQueue.isEmpty()
)
) {
setTimeout(processRtDataMongoUpdates, 150)
return
}
let cnt = 0
let updArr = []
while (!mongoRtDataQueue.isEmpty()) {
Expand Down Expand Up @@ -179,75 +188,81 @@ const pipeline = [
.catch(function (err) {
Log.log('Error on Mongodb query!', err)
})
// Log.log(JSON.stringify(res))
if (cnt) Log.log('Mongo Updates ' + cnt)
}, 150)
setTimeout(processRtDataMongoUpdates, 150)
}
processRtDataMongoUpdates()

// write values to sql files for later insertion on postgreSQL
setInterval(async function () {
if (!histCollection || !MongoStatus.HintMongoIsConnected) return
let doInsertData = false
let sqlTransaction =
'START TRANSACTION;\n' +
'INSERT INTO hist (tag, time_tag, value, value_json, time_tag_at_source, flags) VALUES '

let cntH = 0
let insertArr = []
while (!sqlHistQueue.isEmpty()) {
doInsertData = true
let entry = sqlHistQueue.peek()
sqlHistQueue.dequeue()
sqlTransaction = sqlTransaction + '\n(' + entry.sql + '),'
insertArr.push(entry.obj)
cntH++
// write values to sql files for later insertion on postgreSQL, and mongo hist
async function processSqlAndMongoHistUpdates() {
if (!histCollection || !clientMongo || !MongoStatus.HintMongoIsConnected) {
setTimeout(processSqlAndMongoHistUpdates, 333)
return
}
if (cntH) Log.log('PGSQL/Mongo Hist updates ' + cntH)

if (doInsertData) {
histCollection
.insertMany(insertArr, { ordered: false, writeConcern: { w: 0 } })
.catch(function (err) {
Log.log('Error on Mongodb query!', err)
})
try {
let doInsertData = false
let sqlTransaction =
'START TRANSACTION;\n' +
'INSERT INTO hist (tag, time_tag, value, value_json, time_tag_at_source, flags) VALUES '

let cntH = 0
let insertArr = []
while (!sqlHistQueue.isEmpty()) {
doInsertData = true
let entry = sqlHistQueue.peek()
sqlHistQueue.dequeue()
sqlTransaction = sqlTransaction + '\n(' + entry.sql + '),'
insertArr.push(entry.obj)
cntH++
}
if (cntH) Log.log('PGSQL/Mongo Hist updates ' + cntH)

if (doInsertData) {
histCollection
.insertMany(insertArr, { ordered: false, writeConcern: { w: 0 } })
.catch(function (err) {
Log.log('Error on Mongodb query!', err)
})
sqlTransaction = sqlTransaction.substring(0, sqlTransaction.length - 1) // remove last comma
sqlTransaction = sqlTransaction + ' \n'
// this cause problems when tag/time repeated on same transaction
// sqlTransaction = sqlTransaction + "ON CONFLICT (tag, time_tag) DO UPDATE SET value=EXCLUDED.value, value_json=EXCLUDED.value_json, time_tag_at_source=EXCLUDED.time_tag_at_source, flags=EXCLUDED.flags;\n";
sqlTransaction =
sqlTransaction + 'ON CONFLICT (tag, time_tag) DO NOTHING;\n'
sqlTransaction = sqlTransaction + 'COMMIT;\n'
fs.writeFile(
sqlFilesPath +
'pg_hist_' +
new Date().getTime() +
'_' +
jsConfig.Instance +
'.sql',
sqlTransaction,
(err) => {
if (err) Log.log('Error writing SQL file!')
}
)
}

doInsertData = false
sqlTransaction = ''
let cntR = 0
sqlTransaction =
sqlTransaction +
'WITH ordered_values AS ( SELECT DISTINCT ON (tag) tag, time_tag, json_data FROM (VALUES '
while (!sqlRtDataQueue.isEmpty()) {
doInsertData = true
let sql = sqlRtDataQueue.peek()
sqlRtDataQueue.dequeue()
sqlTransaction = sqlTransaction + '\n (' + sql + '),'
cntR++
}
sqlTransaction = sqlTransaction.substring(0, sqlTransaction.length - 1) // remove last comma
sqlTransaction = sqlTransaction + ' \n'
// this cause problems when tag/time repeated on same transaction
// sqlTransaction = sqlTransaction + "ON CONFLICT (tag, time_tag) DO UPDATE SET value=EXCLUDED.value, value_json=EXCLUDED.value_json, time_tag_at_source=EXCLUDED.time_tag_at_source, flags=EXCLUDED.flags;\n";
sqlTransaction =
sqlTransaction + 'ON CONFLICT (tag, time_tag) DO NOTHING;\n'
sqlTransaction = sqlTransaction + 'COMMIT;\n'
fs.writeFile(
sqlFilesPath +
'pg_hist_' +
new Date().getTime() +
'_' +
jsConfig.Instance +
'.sql',
sqlTransaction,
(err) => {
if (err) Log.log('Error writing SQL file!')
}
)
}

doInsertData = false
sqlTransaction = ''
let cntR = 0
sqlTransaction =
sqlTransaction +
'WITH ordered_values AS ( SELECT DISTINCT ON (tag) tag, time_tag, json_data FROM (VALUES '
while (!sqlRtDataQueue.isEmpty()) {
doInsertData = true
let sql = sqlRtDataQueue.peek()
sqlRtDataQueue.dequeue()
sqlTransaction = sqlTransaction + '\n (' + sql + '),'
cntR++
}
sqlTransaction = sqlTransaction.substring(0, sqlTransaction.length - 1) // remove last comma
sqlTransaction = sqlTransaction + ' \n'
sqlTransaction =
sqlTransaction +
`) AS t(tag, time_tag, json_data)
sqlTransaction +
`) AS t(tag, time_tag, json_data)
ORDER BY tag, time_tag DESC
)
INSERT INTO realtime_data (tag, time_tag, json_data)
Expand All @@ -257,27 +272,32 @@ const pipeline = [
SET time_tag = EXCLUDED.time_tag,
json_data = EXCLUDED.json_data;
`
if (cntR) Log.log('PGSQL RT updates ' + cntR)

if (doInsertData) {
fs.writeFile(
sqlFilesPath +
'pg_rtdata_' +
new Date().getTime() +
'_' +
jsConfig.Instance +
'.sql',
sqlTransaction,
(err) => {
if (err) Log.log('Error writing SQL file!')
}
)
if (cntR) Log.log('PGSQL RT updates ' + cntR)

if (doInsertData) {
fs.writeFile(
sqlFilesPath +
'pg_rtdata_' +
new Date().getTime() +
'_' +
jsConfig.Instance +
'.sql',
sqlTransaction,
(err) => {
if (err) Log.log('Error writing SQL file!')
}
)
}
} catch (e) {
Log.log('Error in processSqlAndMongoHistUpdates: ' + e)
}
}, 1000)
setTimeout(processSqlAndMongoHistUpdates, 333)
}
processSqlAndMongoHistUpdates()

let clientMongo = null
let invalidDetectIntervalHandle = null
let latencyIntervalHandle = null
let resumeToken = null
while (true) {
if (clientMongo === null)
await MongoClient.connect(
Expand All @@ -286,12 +306,15 @@ const pipeline = [
)
.then(async (client) => {
clientMongo = client
clientMongo.on('topologyClosed', (_) => {
clientMongo.on('topologyClosed', () => {
MongoStatus.HintMongoIsConnected = false
clientMongo = null
Log.log('MongoDB server topologyClosed')
})
MongoStatus.HintMongoIsConnected = true
Log.log('Connected correctly to MongoDB server')
if (resumeToken)
Log.log('resumeToken: ' + JSON.stringify(resumeToken))

let latencyAccTotal = 0
let latencyTotalCnt = 0
Expand All @@ -310,6 +333,7 @@ const pipeline = [
histCollection = db.collection(jsConfig.HistCollectionName)
const changeStream = collection.watch(pipeline, {
fullDocument: 'updateLookup',
resumeAfter: resumeToken,
})

await createSpecialTags(collection)
Expand Down Expand Up @@ -458,6 +482,7 @@ const pipeline = [
// start listen to changes
changeStream.on('change', (change) => {
try {
resumeToken = changeStream.resumeToken
if (change.operationType === 'delete') return

// // for older versions of mongodb
Expand Down Expand Up @@ -1256,7 +1281,7 @@ async function checkConnectedMongo(client) {
if (!client) {
return false
}
const CheckMongoConnectionTimeout = 1000
const CheckMongoConnectionTimeout = 10000
let tr = setTimeout(() => {
Log.log('Mongo ping timeout error!')
MongoStatus.HintMongoIsConnected = false
Expand All @@ -1274,6 +1299,8 @@ async function checkConnectedMongo(client) {
MongoStatus.HintMongoIsConnected = true
return true
} else {
if (!!client && !!client.topology && client.topology.isConnected())
return true
MongoStatus.HintMongoIsConnected = false
return false
}
Expand Down
4 changes: 3 additions & 1 deletion src/demo_simul/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ const AppDefs = require('./app-defs')
const LoadConfig = require('./load-config')
const { MongoClient, Double } = require('mongodb')

process.on('uncaughtException', err => Log.log('Uncaught Exception:' + JSON.stringify(err)))

const pipeline = [
{
$project: { documentKey: false },
Expand Down Expand Up @@ -537,7 +539,7 @@ async function checkConnectedMongo(client) {
if (!client) {
return false
}
const CheckMongoConnectionTimeout = 1000
const CheckMongoConnectionTimeout = 10000
const tr = setTimeout(() => {
Log.log('Mongo ping timeout error!')
HintMongoIsConnected = false
Expand Down
4 changes: 3 additions & 1 deletion src/grafana_alert2event/grafana_alert2event.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ const app = express()
app.use(express.json())
let soeQueue = new Queue() // queue of SOE events

process.on('uncaughtException', err => console.log('Uncaught Exception:' + JSON.stringify(err)))

app.listen(HTTP_PORT, IP_BIND, () => {
console.log('listening on ' + IP_BIND + ':' + HTTP_PORT)
})
Expand Down Expand Up @@ -307,7 +309,7 @@ if (
})()

// test mongoDB connectivity
let CheckMongoConnectionTimeout = 1000
let CheckMongoConnectionTimeout = 10000
let HintMongoIsConnected = true
async function checkConnectedMongo(client) {
if (!client) {
Expand Down
4 changes: 3 additions & 1 deletion src/mongofw/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ const Redundancy = require('./redundancy')
const { MongoClient } = require('mongodb')
const CustomProcessor = require('./customized_module').CustomProcessor

process.on('uncaughtException', err => Log.log('Uncaught Exception:' + JSON.stringify(err)))

const args = process.argv.slice(2)
let inst = null
if (args.length > 0) inst = parseInt(args[0])
Expand Down Expand Up @@ -80,7 +82,7 @@ async function checkConnectedMongo(client) {
if (!client) {
return false
}
const CheckMongoConnectionTimeout = 2000
const CheckMongoConnectionTimeout = 10000
let tr = setTimeout(() => {
Log.log('Mongo ping timeout error!')
MongoStatus.HintMongoIsConnected = false
Expand Down
Loading
Loading