Skip to content

Commit dc6de90

Browse files
committed
Added mongofw and mongowr processes.
1 parent c518bdc commit dc6de90

21 files changed

+1554
-0
lines changed

platform-linux/build.sh

+4
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ cd ../carbone-reports
9090
npm install
9191
cd ../backup-mongo
9292
npm install
93+
cd ../mongofw
94+
npm install
95+
cd ../mongowr
96+
npm install
9397

9498
cd ../log-io/ui
9599
npm install

platform-mac/build.sh

+4
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@ cd ../carbone-reports
8282
npm install
8383
cd ../backup-mongo
8484
npm install
85+
cd ../mongofw
86+
npm install
87+
cd ../mongowr
88+
npm install
8589
cd ../log-io/ui
8690
npm install
8791
npm run build

platform-windows/build.bat

+6
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,12 @@ call %NPM% install
135135
cd %SRCPATH%\backup-mongo
136136
call %NPM% install
137137

138+
cd %SRCPATH\mongofw
139+
call %NPM% install
140+
141+
cd %SRCPATH\mongowr
142+
call %NPM% install
143+
138144
cd %SRCPATH%\log-io\ui
139145
call %NPM% install
140146
call %NPM% run build

platform-windows/buildupd.bat

+6
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,12 @@ call %NPM% update
135135
cd %SRCPATH%\backup-mongo
136136
call %NPM% i --package-lock-only
137137
call %NPM% update
138+
cd %SRCPATH%\mongofw
139+
call %NPM% i --package-lock-only
140+
call %NPM% update
141+
cd %SRCPATH%\mongowr
142+
call %NPM% i --package-lock-only
143+
call %NPM% update
138144
cd %SRCPATH%\log-io\ui
139145
call %NPM% i --package-lock-only
140146
call %NPM% update

src/mongofw/app-defs.js

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
'use strict'
2+
3+
/*
4+
* {json:scada} - Copyright (c) 2020-2023 - Ricardo L. Olsen
5+
* This file is part of the JSON-SCADA distribution (https://github.com/riclolsen/json-scada).
6+
*
7+
* This program is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, version 3.
10+
*
11+
* This program is distributed in the hope that it will be useful, but
12+
* WITHOUT ANY WARRANTY; without even the implied warranty of
13+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14+
* General Public License for more details.
15+
*
16+
* You should have received a copy of the GNU General Public License
17+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
18+
*/
19+
20+
module.exports = {
21+
NAME: 'MONGOFW',
22+
ENV_PREFIX: 'JS_MONGOFW',
23+
MSG: '{json:scada} - Mongofw - forward data from protocol drivers to another JSON-SCADA installation.',
24+
VERSION: '0.1.0',
25+
}

src/mongofw/customized_module.js

+163
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
'use strict'
2+
3+
/*
4+
* Customizable processor of mongodb changes via change streams.
5+
*
6+
* THIS FILE IS INTENDED TO BE CUSTOMIZED BY USERS TO DO SPECIAL PROCESSING
7+
*
8+
* {json:scada} - Copyright (c) 2020-2024 - Ricardo L. Olsen
9+
* This file is part of the JSON-SCADA distribution (https://github.com/riclolsen/json-scada).
10+
*
11+
* This program is free software: you can redistribute it and/or modify
12+
* it under the terms of the GNU General Public License as published by
13+
* the Free Software Foundation, version 3.
14+
*
15+
* This program is distributed in the hope that it will be useful, but
16+
* WITHOUT ANY WARRANTY; without even the implied warranty of
17+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18+
* General Public License for more details.
19+
*
20+
* You should have received a copy of the GNU General Public License
21+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
22+
*/
23+
24+
const Log = require('./simple-logger')
25+
const { Double } = require('mongodb')
26+
const { setInterval } = require('timers')
27+
const dgram = require('dgram');
28+
29+
// UDP broadcast options
30+
const udpPort = 12345;
31+
const udpHostDst = "192.168.0.255";
32+
// Create a UDP socket
33+
const udpSocket = dgram.createSocket('udp4');
34+
35+
udpSocket.bind(udpPort, () => {
36+
udpSocket.setBroadcast(true)
37+
// udpSocket.setMulticastInterface('::%eth1');
38+
});
39+
40+
41+
let maxSz = 0;
42+
let cnt = 0;
43+
44+
const UserActionsCollectionName = 'userActions'
45+
const RealtimeDataCollectionName = 'realtimeData'
46+
const CommandsQueueCollectionName = 'commandsQueue'
47+
const SoeDataCollectionName = 'soeData'
48+
const ProcessInstancesCollectionName = 'processInstances'
49+
const ProtocolDriverInstancesCollectionName = 'protocolDriverInstances'
50+
const ProtocolConnectionsCollectionName = 'protocolConnections'
51+
52+
let CyclicIntervalHandle = null
53+
54+
// this will be called by the main module when mongo is connected (or reconnected)
55+
module.exports.CustomProcessor = function (
56+
clientMongo,
57+
jsConfig,
58+
Redundancy,
59+
MongoStatus
60+
) {
61+
if (clientMongo === null) return
62+
const db = clientMongo.db(jsConfig.mongoDatabaseName)
63+
64+
// -------------------------------------------------------------------------------------------
65+
// EXAMPLE OF CYCLIC PROCESSING AT INTERVALS
66+
// BEGIN EXAMPLE
67+
68+
let CyclicProcess = async function () {
69+
// do cyclic processing at each CyclicInterval ms
70+
71+
if (!Redundancy.ProcessStateIsActive() || !MongoStatus.HintMongoIsConnected)
72+
return // do nothing if process is inactive
73+
74+
try {
75+
let res = await db
76+
.collection(RealtimeDataCollectionName)
77+
.findOne({ _id: -2 }) // id of point tag with number of digital updates
78+
79+
Log.log(
80+
'Custom Process - Checking number of digital updates: ' +
81+
res.valueString
82+
)
83+
} catch (err) {
84+
Log.log(err)
85+
}
86+
87+
return
88+
}
89+
const CyclicInterval = 5000 // interval time in ms
90+
clearInterval(CyclicIntervalHandle) // clear older instances if any
91+
CyclicIntervalHandle = setInterval(CyclicProcess, CyclicInterval) // start a cyclic processing
92+
93+
// EXAMPLE OF CYCLIC PROCESSING AT INTERVALS
94+
// END EXAMPLE
95+
// -------------------------------------------------------------------------------------------
96+
97+
const changeStreamUserActions = db
98+
.collection(RealtimeDataCollectionName)
99+
.watch(
100+
{ $match: { operationType: 'update' } },
101+
{
102+
fullDocument: 'updateLookup'
103+
}
104+
)
105+
106+
try {
107+
changeStreamUserActions.on('error', change => {
108+
if (clientMongo) clientMongo.close()
109+
clientMongo = null
110+
Log.log('Custom Process - Error on changeStreamUserActions!')
111+
})
112+
changeStreamUserActions.on('close', change => {
113+
Log.log('Custom Process - Closed changeStreamUserActions!')
114+
})
115+
changeStreamUserActions.on('end', change => {
116+
if (clientMongo) clientMongo.close()
117+
clientMongo = null
118+
Log.log('Custom Process - Ended changeStreamUserActions!')
119+
})
120+
121+
// start listen to changes
122+
changeStreamUserActions.on('change', change => {
123+
// Log.log(change.fullDocument)
124+
if (!Redundancy.ProcessStateIsActive() || !MongoStatus.HintMongoIsConnected)
125+
return // do nothing if process is inactive
126+
127+
// will send only update data from drivers
128+
if (!change.updateDescription.updatedFields?.sourceDataUpdate)
129+
return;
130+
if (change.updateDescription.updatedFields?.sourceDataUpdate?.valueBsonAtSource)
131+
delete change.updateDescription.updatedFields.sourceDataUpdate.valueBsonAtSource;
132+
if (!Redundancy.ProcessStateIsActive()) return // do nothing if process is inactive
133+
const fwObj = {
134+
cnt: cnt++,
135+
operationType: change.operationType,
136+
documentKey: change.documentKey,
137+
updateDescription: change.updateDescription
138+
}
139+
const opData = JSON.stringify(fwObj);
140+
const message = Buffer.from(opData);
141+
if (message.length > maxSz) maxSz = message.length;
142+
143+
if (message.length > 60000) {
144+
console.log('Message too large: ', opData);
145+
}
146+
else
147+
udpSocket.send(message, 0, message.length, udpPort, udpHostDst, (err, bytes) => {
148+
if (err) {
149+
console.log('UDP error:', err);
150+
} else {
151+
// console.log('Data sent via UDP', opData);
152+
//console.log('Size: ', message.length);
153+
//console.log('Max: ', maxSz);
154+
}
155+
156+
});
157+
158+
})
159+
} catch (e) {
160+
Log.log('Custom Process - Error: ' + e)
161+
}
162+
163+
}

src/mongofw/index.js

+105
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
'use strict'
2+
3+
/*
4+
* Customizable processor of mongodb changes via change streams.
5+
* DO NOT EDIT THIS FILE! CUSTOMIZE THE customized_module.js file
6+
* {json:scada} - Copyright (c) 2020-2023 - Ricardo L. Olsen
7+
* This file is part of the JSON-SCADA distribution (https://github.com/riclolsen/json-scada).
8+
*
9+
* This program is free software: you can redistribute it and/or modify
10+
* it under the terms of the GNU General Public License as published by
11+
* the Free Software Foundation, version 3.
12+
*
13+
* This program is distributed in the hope that it will be useful, but
14+
* WITHOUT ANY WARRANTY; without even the implied warranty of
15+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16+
* General Public License for more details.
17+
*
18+
* You should have received a copy of the GNU General Public License
19+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
20+
*/
21+
22+
const Log = require('./simple-logger')
23+
const LoadConfig = require('./load-config')
24+
const Redundancy = require('./redundancy')
25+
const { MongoClient } = require('mongodb')
26+
const CustomProcessor = require('./customized_module').CustomProcessor
27+
28+
const args = process.argv.slice(2)
29+
let inst = null
30+
if (args.length > 0) inst = parseInt(args[0])
31+
32+
let logLevel = null
33+
if (args.length > 1) logLevel = parseInt(args[1])
34+
let confFile = null
35+
if (args.length > 2) confFile = args[2]
36+
const jsConfig = LoadConfig(confFile, logLevel, inst)
37+
const MongoStatus = {HintMongoIsConnected : false}
38+
Log.log('Connecting to MongoDB server...')
39+
;(async () => {
40+
let clientMongo = null
41+
while (true) {
42+
if (clientMongo === null)
43+
await MongoClient.connect(
44+
jsConfig.mongoConnectionString,
45+
jsConfig.MongoConnectionOptions
46+
)
47+
.then(async (client) => {
48+
clientMongo = client
49+
MongoStatus.HintMongoIsConnected = true
50+
const db = clientMongo.db(jsConfig.mongoDatabaseName)
51+
Log.log('Connected correctly to MongoDB server')
52+
Redundancy.Start(5000, clientMongo, db, jsConfig, MongoStatus)
53+
CustomProcessor(clientMongo, jsConfig, Redundancy, MongoStatus)
54+
})
55+
.catch(function (err) {
56+
if (clientMongo) clientMongo.close()
57+
clientMongo = null
58+
Log.log(err)
59+
})
60+
61+
// wait 5 seconds
62+
await new Promise((resolve) => setTimeout(resolve, 5000))
63+
64+
// detect connection problems, if error will null the client to later reconnect
65+
if (clientMongo === undefined) {
66+
Log.log('Disconnected Mongodb!')
67+
clientMongo = null
68+
}
69+
if (clientMongo)
70+
if (!(await checkConnectedMongo(clientMongo))) {
71+
// not anymore connected, will retry
72+
Log.log('Disconnected Mongodb!')
73+
if (clientMongo) clientMongo.close()
74+
clientMongo = null
75+
}
76+
}
77+
})()
78+
79+
// test mongoDB connectivity
80+
async function checkConnectedMongo(client) {
81+
if (!client) {
82+
return false
83+
}
84+
const CheckMongoConnectionTimeout = 1000
85+
let tr = setTimeout(() => {
86+
Log.log('Mongo ping timeout error!')
87+
MongoStatus.HintMongoIsConnected = false
88+
}, CheckMongoConnectionTimeout)
89+
90+
let res = null
91+
try {
92+
res = await client.db('admin').command({ ping: 1 })
93+
clearTimeout(tr)
94+
} catch (e) {
95+
Log.log('Error on mongodb connection!')
96+
return false
97+
}
98+
if ('ok' in res && res.ok) {
99+
MongoStatus.HintMongoIsConnected = true
100+
return true
101+
} else {
102+
MongoStatus.HintMongoIsConnected = false
103+
return false
104+
}
105+
}

0 commit comments

Comments
 (0)