From cf8e10b290e6fa759a3a9fb2cb62edd1258215bf Mon Sep 17 00:00:00 2001 From: yshashix Date: Fri, 22 Sep 2023 10:43:36 +0200 Subject: [PATCH] oisp cid:componenet id check removed --- api/sparkplug_data.ingestion.js | 163 ++---------- config.js | 2 +- test/unit/api_spb_data_ingestionTest.js | 336 ------------------------ 3 files changed, 21 insertions(+), 480 deletions(-) diff --git a/api/sparkplug_data.ingestion.js b/api/sparkplug_data.ingestion.js index 366280a7..59fbc077 100644 --- a/api/sparkplug_data.ingestion.js +++ b/api/sparkplug_data.ingestion.js @@ -278,41 +278,7 @@ module.exports = function(logger) { } }; - /**Retrive data from SQL to cache if not present in cache to enhance performance - * SQL check also is useful for verify if the alias/cid sent is previously registered alias or not - */ - - me.getSpBDidAndDataType = async function (item) { - var cid = item.alias; - //check whether cid = uuid - if (!uuidValidate(cid)) { - throw new Error("cid not UUID. Rejected!"); - } - var value = await me.cache.getValues(cid); - var didAndDataType; - if (value === null || (Array.isArray(value) && value.length === 1 && value[0] === null)) { - me.logger.debug("Could not find " + cid + "in cache. Now trying sql query."); - var sqlquery='SELECT devices.id,"dataType" FROM dashboard.device_components,dashboard.devices,dashboard.component_types WHERE "componentId"::text=\'' + cid + '\' and "deviceUID"::text=devices.uid::text and device_components."componentTypeId"::text=component_types.id::text'; - me.logger.debug("Applying SQL query: " + sqlquery); - didAndDataType = await me.cache.sequelize.query(sqlquery, { type: QueryTypes.SELECT }); - me.logger.debug("Result of sql query: " + JSON.stringify(didAndDataType)); - } else { - me.logger.debug("Found in cache: " + JSON.stringify(value) + ", Component is valid"); - didAndDataType = [value]; - } - if (didAndDataType === undefined || didAndDataType === null) { - throw new Error("DB lookup failed!"); - } - var redisResult = await me.cache.setValue(cid, "id", didAndDataType[0].id) && me.cache.setValue(cid, "dataType", didAndDataType[0].dataType); - didAndDataType[0].dataElement = item; - if (redisResult) { - return didAndDataType[0]; - } else { - me.logger.warn("Could not store db value in redis. This will significantly reduce performance"); - return didAndDataType[0]; - } - }; - + me.createKafakaPubData = function(topic,bodyMessage){ /*** For forwarding sparkplugB data directly without kafka metrics format * @param spBkafkaProduce is set in the config file @@ -329,29 +295,13 @@ module.exports = function(logger) { */ bodyMessage.metrics.forEach(item => { var kafkaMessage = item; - if (subTopic[2] === "NBIRTH") { + if (subTopic[2] === "NBIRTH" || subTopic[2] === "DDATA" || subTopic[2] === "DBIRTH") { var key = topic; var message = {key, value: JSON.stringify(kafkaMessage)}; me.logger.debug("Selecting kafka message topic SparkplugB with spB format payload for data type: "+subTopic[2]); me.kafkaAggregator.addMessage(message,config.sparkplug.spBkafKaTopic); return true; - } - /**Validating component id in the database to check for DDATA */ - else if (subTopic[2] === "DDATA" || subTopic[2] === "DBIRTH") { - me.getSpBDidAndDataType(item).then(values => { - if (values) { - me.logger.debug("SpB payload is valid with component registered." + kafkaMessage.name); - var key = topic; - var addMessageTopic = config.sparkplug.spBkafKaTopic; - let message = {key, value: JSON.stringify(kafkaMessage)}; - me.kafkaAggregator.addMessage(message, addMessageTopic); - return true; - } - }).catch(function(err) { - me.logger.warn("Could not send data to Kafka due to SpB not availiable in DB/cache " + err); - return false; - }); - } + } }); } if ( config.sparkplug.ngsildKafkaProduce) { @@ -367,71 +317,25 @@ module.exports = function(logger) { /**Validating component id in the database to check for DDATA */ else if (subTopic[2] === "DDATA") { let metricName = kafkaMessage.name.split("/"); - let metricType = metricName[0]; - me.getSpBDidAndDataType(item).then(values => { - if (values) { - me.logger.debug("SpB payload is valid with component registered for :" + kafkaMessage.name); - var key = topic; - var ngsiMappedKafkaMessage; - if (metricType === "Relationship") { - ngsiMappedKafkaMessage = ngsildMapper.mapSpbRelationshipToKafka(devID, kafkaMessage); - me.logger.debug(" Mapped SpB Relationship data to NGSI-LD relationship type: "+ JSON.stringify(ngsiMappedKafkaMessage)); - let message = {key, value: JSON.stringify(ngsiMappedKafkaMessage)}; - me.kafkaAggregator.addMessage(message, config.sparkplug.ngsildKafkaTopic); - } else if (metricType === "Property") { - ngsiMappedKafkaMessage = ngsildMapper.mapSpbPropertyToKafka(devID, kafkaMessage); - me.logger.debug(" Mapped SpB Properties data to NGSI-LD properties type: "+ JSON.stringify(ngsiMappedKafkaMessage)); - let message = {key, value: JSON.stringify(ngsiMappedKafkaMessage)}; - me.kafkaAggregator.addMessage(message, config.sparkplug.ngsildKafkaTopic); - } else { - me.logger.debug(" Unable to create kafka message topic for SpBNGSI-LD topic for Metric Name: "+ kafkaMessage.name + " ,doesn't match NGSI-LD Name type: "); - } - return true; - } - }).catch(function(err) { - me.logger.warn("Could not send data to Kafka due to SpB not availiable in DB/cache " + err); - return false; - }); + let metricType = metricName[0]; + var key = topic; + var ngsiMappedKafkaMessage; + if (metricType === "Relationship") { + ngsiMappedKafkaMessage = ngsildMapper.mapSpbRelationshipToKafka(devID, kafkaMessage); + me.logger.debug(" Mapped SpB Relationship data to NGSI-LD relationship type: "+ JSON.stringify(ngsiMappedKafkaMessage)); + let message = {key, value: JSON.stringify(ngsiMappedKafkaMessage)}; + me.kafkaAggregator.addMessage(message, config.sparkplug.ngsildKafkaTopic); + } else if (metricType === "Property") { + ngsiMappedKafkaMessage = ngsildMapper.mapSpbPropertyToKafka(devID, kafkaMessage); + me.logger.debug(" Mapped SpB Properties data to NGSI-LD properties type: "+ JSON.stringify(ngsiMappedKafkaMessage)); + let message = {key, value: JSON.stringify(ngsiMappedKafkaMessage)}; + me.kafkaAggregator.addMessage(message, config.sparkplug.ngsildKafkaTopic); + } else { + me.logger.debug(" Unable to create kafka message topic for SpBNGSI-LD topic for Metric Name: "+ kafkaMessage.name + " ,doesn't match NGSI-LD Name type: "); + } + return true; } }); - } else if (!config.sparkplug.spBKafkaProduce && !config.sparkplug.ngsildKafkaProduce) { - /*** For forwarding sparkplugB data according to kafka metrics format to be shown in dashboard - * @param kafkaProduce is set in the config file - * */ - me.logger.debug("Kafka metric topic is selected as to meet frontend dashboard data format"); - // For NBIRTH message type with kafka topic, ignoring the kafka send - let subTopic = topic.split("/"); - if (subTopic[2] === "NBIRTH") { - me.logger.info("Received spB NBIRTH message, ignoring currently kafka forward for metric topic"); - return true; - } - // Go through payload metrics and check whether cid/alias is correct - // Check CID in catch or SQL table for security check of component registered - var promarray = []; - bodyMessage.metrics.forEach(item => { - var value = me.getSpBDidAndDataType(item); - promarray.push(value); - }); - Promise.all(promarray) - .then(values => - values.map(item => { - var kafkaMessage = me.prepareKafkaPayload(item, accountId); - if (kafkaMessage === null) { - var msg = "Validation of " + JSON.stringify(item.dataElement) + " for type " + item.dataType + " failed!"; - me.logger.warn(msg); - me.sendErrorOverChannel(accountId, devID, msg); - return; - } - var key = accountId + "." + kafkaMessage.cid; - var message = {key, value: JSON.stringify(kafkaMessage)}; - me.logger.debug("Received spB message, kafka payload created to forward for metric topic :" + JSON.stringify(message)); - me.kafkaAggregator.addMessage(message, config.kafka.metricsTopic); - }) - ) - .catch(function(err) { - me.logger.warn("Could not send data to Kafka " + err); - return; - }); } }; @@ -500,31 +404,4 @@ module.exports = function(logger) { me.connectTopics(); }; - /** - * Prepare datapoint from frontend data structure to Kafka like the following example: - * {"dataType":"Number", "aid":"account_id", "cid":"component_id", "value":"1", - * "systemOn": 1574262569420, "on": 1574262569420, "loc": null} - */ - me.prepareKafkaPayload = function(didAndDataType, accountId){ - var dataElement = didAndDataType.dataElement; - var value = normalizeBoolean(dataElement.value.toString(), didAndDataType.dataType); - if (!validate(value, didAndDataType.dataType)) { - return null; - } - const msg = { - dataType: didAndDataType.dataType, - aid: accountId, - cid: dataElement.alias, - on: dataElement.timestamp, - systemOn : dataElement.timestamp, - value: value - }; - if (undefined !== dataElement.attributes) { - msg.attributes = dataElement.attributes; - } - if (undefined !== dataElement.loc) { - msg.loc = dataElement.loc; - } - return msg; - }; }; diff --git a/config.js b/config.js index ebe96047..a8a10703 100644 --- a/config.js +++ b/config.js @@ -113,7 +113,7 @@ var config = { * */ "sparkplug": { - "spBKafkaProduce": parsedConfig.spbEnable || false, + "spBKafkaProduce": parsedConfig.spbEnable || true, "spBkafKaTopic": parsedConfig.spbTopic || "sparkplugB", "ngsildKafkaProduce": parsedConfig.ngsildEnable || false, "ngsildKafkaTopic": parsedConfig.ngsildTopic || "ngsildSpB", diff --git a/test/unit/api_spb_data_ingestionTest.js b/test/unit/api_spb_data_ingestionTest.js index d697d8f6..d430a9f7 100644 --- a/test/unit/api_spb_data_ingestionTest.js +++ b/test/unit/api_spb_data_ingestionTest.js @@ -246,21 +246,6 @@ describe(fileToTest, function() { assert.equal(type, "seq", "Wrong key value"); return 0; }, - getValues(key) { - assert.deepEqual(key, cid, "Wrong CID cache value received."); - var didAndDataType = { - dataType: "String", - on: 1, - dataElement: - { - "componentId": cid, - "on": 12345, - "value": "value", - "systemOn": 2 - } - }; - return didAndDataType; - }, setValue(key, type) { assert.oneOf(key, ["accountId/eonID",cid], "Wrong key on cache value received."); assert.oneOf(type, [ "seq", "id", "dataType" ], "Wrong type to se value received."); @@ -317,7 +302,6 @@ describe(fileToTest, function() { timestamp: 12345, metrics: [{ name : "bdseq", - alias : cid, timestamp : 12345, dataType : "Uint64", value: 123 @@ -341,7 +325,6 @@ describe(fileToTest, function() { timestamp: 12345, metrics: [{ name : "temp", - alias : cid, timestamp : 12345, dataType : "Uint64", value: 123 @@ -457,78 +440,6 @@ describe(fileToTest, function() { .catch((e) => done(e)); }); - it('Verify SparkplugB metric existence and get its DeviceId and dataType from cache using alias Id', function (done) { - ToTest.__set__("Kafka", Kafka); - ToTest.__set__("CacheFactory", CacheFactory); - ToTest.__set__("config", config); - ToTest.__set__("KafkaAggregator", KafkaAggregator); - var spbdataIngestion = new ToTest(logger); - var DataMessage = { - timestamp: 12345, - metrics: [{ - name : "temp", - alias : cid, - timestamp : 12345, - dataType : "Uint64", - value: "value" - }], - seq: 2 - }; - var didAndDataType = { - dataType: "String", - on: 1, - dataElement: - { - name : "temp", - alias : cid, - timestamp : 12345, - dataType : "Uint64", - value: "value" - } - }; - spbdataIngestion.getSpBDidAndDataType(DataMessage.metrics[0]) - .then((result) => { - assert.deepEqual(result, didAndDataType, "Invalid alias ID specific to device, not in cache/DB "); - done(); - }) - .catch((e) => done(e)); - }); - - it('KafkaProduce for SparkplugB metric fails due to Invalid CID/Alias Id', function (done) { - ToTest.__set__("Kafka", Kafka); - ToTest.__set__("CacheFactory", CacheFactory); - ToTest.__set__("config", config); - ToTest.__set__("KafkaAggregator", KafkaAggregator); - var spbdataIngestion = new ToTest(logger); - var DataMessage = { - timestamp: 12345, - metrics: [{ - name : "temp", - alias : "c574252-31d5-4b76-bce6-53f2c56b", - timestamp : 12345, - dataType : "Uint64", - value: "value" - }], - seq: 2 - }; - var validateSpbDevSeq = function() { - return Promise.resolve (true); - }; - var errorMessage = "cid not UUID. Rejected!"; - spbdataIngestion.validateSpbDevSeq =validateSpbDevSeq; - spbdataIngestion.getSpBDidAndDataType(DataMessage.metrics[0]) - .then(() => { - done(); - }) - .catch((err) => { - assert.deepEqual(err, errorMessage, "Valid alias ID specific to device "); - done(); - }); - var kafkaPubReturn= spbdataIngestion.createKafakaPubData("spBv1.0/accountId/DBIRTH/eonID/deviceId", DataMessage); - assert.oneOf(kafkaPubReturn, [ false, undefined ], " Possible to produce kafka message for wrong alias/CID id: " + kafkaPubReturn); - done(); - }); - it('KafkaProduce for SparkplugB metric fails due to empty sparkplugB message', function (done) { config.sparkplug.spBKafkaProduce = true; ToTest.__set__("Kafka", Kafka); @@ -617,22 +528,6 @@ describe(fileToTest, function() { return Promise.resolve (true); }; - var getSpBDidAndDataType = async function(){ - var didAndDataType = { - dataType: "String", - on: 1, - dataElement: - { - name : "temp", - alias : cid, - timestamp : 12345, - dataType : "Uint64", - value: "value" - } - }; - return didAndDataType; - - }; config.sparkplug.spBKafkaProduce = true; ToTest.__set__("Kafka", Kafka); ToTest.__set__("config", config); @@ -641,7 +536,6 @@ describe(fileToTest, function() { var spbdataIngestion = new ToTest(logger); spbdataIngestion.validateSpbDevSeq =validateSpbDevSeq; - spbdataIngestion.getSpBDidAndDataType= getSpBDidAndDataType; var message = { timestamp: 12345, metrics: [{ @@ -658,127 +552,6 @@ describe(fileToTest, function() { spbdataIngestion.processDataIngestion("spBv1.0/accountId/DDATA/eonID/deviceId", message); }); - it('Create Kafka publish data on kafka metric topic', function (done) { - var Kafka = function() { - return { - producer: function(){ - return { - connect: function() { - }, - on: function() { - }, - send: function(payload) { - message = payload.messages[0]; - assert.equal(message.key, "accountId." + cid,"Received Kafka payload key not correct"); - var value = { - dataType: "String", - aid: "accountId", - cid: cid, - value:"value", - systemOn:1, - on:1, - loc:null - }; - assert.deepEqual(JSON.parse(message.value), value, "Received Kafke message not correct"); - spbdataIngestion.stopAggregator(); - done(); - return new Promise(() => {}); - }, - events: "event" - }; - } - }; - }; - var prepareKafkaPayload = function(){ - return {"dataType":"String", "aid":"accountId", "cid":cid, "value":"value", "systemOn": 1, "on": 1, "loc": null}; - }; - var getSpBDidAndDataType = function(){ - var didAndDataType = { - dataType: "String", - on: 1, - dataElement: - { - name : "temp", - alias : cid, - timestamp : 12345, - dataType : "Uint64", - value: "value" - } - }; - return didAndDataType; - }; - - config.sparkplug.spBKafkaProduce = false; - config.sparkplug.ngsildKafkaProduce = false; - ToTest.__set__("Kafka", Kafka); - ToTest.__set__("config", config); - ToTest.__set__("CacheFactory", CacheFactory); - ToTest.__set__("KafkaAggregator", origKafkaAggregator); - - var spbdataIngestion = new ToTest(logger); - - spbdataIngestion.prepareKafkaPayload = prepareKafkaPayload; - - spbdataIngestion.getSpBDidAndDataType= getSpBDidAndDataType; - var message = { - timestamp: 12345, - metrics: [{ - name : "temp", - alias : cid, - timestamp : 12345, - dataType : "Uint64", - value: "value" - }], - seq: 0 - }; - spbdataIngestion.createKafakaPubData("spBv1.0/accountId/DBIRTH/eonID/deviceId", message); - spbdataIngestion.createKafakaPubData("spBv1.0/accountId/DDATA/eonID/deviceId", message); - }); - - it('Shall Kafka publish data on kafka metric topic return undefined due to mismatch of datatype', function (done) { - var getSpBDidAndDataType = function(){ - var didAndDataType = { - dataType: "Boolean", - on: 1, - dataElement: - { - name : "temp", - alias : cid, - timestamp : 12345, - dataType : "Boolean", - value: "NoTrue" - } - }; - return didAndDataType; - }; - - config.sparkplug.spBKafkaProduce = false; - config.sparkplug.ngsildKafkaProduce = false; - ToTest.__set__("Kafka", Kafka); - ToTest.__set__("config", config); - ToTest.__set__("CacheFactory", CacheFactory); - ToTest.__set__("KafkaAggregator", KafkaAggregator); - - var spbdataIngestion = new ToTest(logger); - spbdataIngestion.getSpBDidAndDataType= getSpBDidAndDataType; - var message = { - timestamp: 12345, - metrics: [{ - name : "temp", - alias : cid, - timestamp : 12345, - dataType : "Boolean", - value: "NoTrue" - }], - seq: 0 - }; - var KafkaPubDataReturnNbirth = spbdataIngestion.createKafakaPubData("spBv1.0/accountId/NBIRTH/eonID", message); - assert.deepEqual(KafkaPubDataReturnNbirth, true, "Unable to send NBIRTH Message"); - var KafkaPubDataReturnDdata = spbdataIngestion.createKafakaPubData("spBv1.0/accountId/DDATA/eonID/deviceId", message); - assert.deepEqual(KafkaPubDataReturnDdata, undefined, "Unable to validate createkafkaPubData with wrong datatype"); - done(); - }); - it('Create Kafka publish Relationship data on NGSI-LD Spb topic', function (done) { var Kafka = function() { return { @@ -816,22 +589,6 @@ describe(fileToTest, function() { return Promise.resolve (true); }; - var getSpBDidAndDataType = function(){ - var didAndDataType = { - dataType: "String", - on: 1, - dataElement: - { - name : "Relationship/https://industry-fusion.com/types/v0.9/hasFilter", - alias : cid, - timestamp : 12345, - dataType : "string", - value: "value" - } - }; - return Promise.resolve (didAndDataType); - - }; config.sparkplug.spBKafkaProduce = false; config.sparkplug.ngsildKafkaProduce = true; ToTest.__set__("Kafka", Kafka); @@ -841,7 +598,6 @@ describe(fileToTest, function() { var spbdataIngestion = new ToTest(logger); spbdataIngestion.validateSpbDevSeq =validateSpbDevSeq; - spbdataIngestion.getSpBDidAndDataType= getSpBDidAndDataType; var message = { timestamp: 12345, metrics: [{ @@ -893,22 +649,6 @@ describe(fileToTest, function() { return Promise.resolve (true); }; - var getSpBDidAndDataType = function(){ - var didAndDataType = { - dataType: "String", - on: 1, - dataElement: - { - name : "Property/https://industry-fusion.com/types/v0.9/hasFilter", - alias : cid, - timestamp : 12345, - dataType : "string", - value: "value" - } - }; - return Promise.resolve (didAndDataType); - - }; config.sparkplug.spBKafkaProduce = true; config.sparkplug.ngsildKafkaProduce = true; ToTest.__set__("Kafka", Kafka); @@ -918,7 +658,6 @@ describe(fileToTest, function() { var spbdataIngestion = new ToTest(logger); spbdataIngestion.validateSpbDevSeq =validateSpbDevSeq; - spbdataIngestion.getSpBDidAndDataType= getSpBDidAndDataType; var message = { timestamp: 12345, metrics: [{ @@ -1179,79 +918,4 @@ describe(fileToTest, function() { done(); }); - it('Shall prepare Kafka payload for kafka topic', function (done) { - ToTest.__set__("Kafka", Kafka); - ToTest.__set__("CacheFactory", CacheFactory); - ToTest.__set__("config", config); - ToTest.__set__("KafkaAggregator", KafkaAggregator); - var spbdataIngestion = new ToTest(logger); - var didAndDataType = { - dataType: "String", - on: 1, - dataElement: - { - "alias": cid, - "on": 1, - "value": "value", - "timestamp": 1 - } - }; - - var msg = spbdataIngestion.prepareKafkaPayload(didAndDataType, "accountId"); - var expectedMsg = { - dataType: "String", - aid: "accountId", - value: "value", - cid: cid, - on: 1, - systemOn: 1 - }; - assert.deepEqual(msg, expectedMsg, "Wrong kafka payload"); - done(); - }); - - it('Shall prepare Kafka payload for kafka topic with loc and attributes', function (done) { - ToTest.__set__("Kafka", Kafka); - ToTest.__set__("CacheFactory", CacheFactory); - ToTest.__set__("config", config); - ToTest.__set__("KafkaAggregator", KafkaAggregator); - var spbdataIngestion = new ToTest(logger); - var didAndDataType = { - dataType: "String", - on: 1, - dataElement: - { - "alias": cid, - "on": 1, - "value": "value", - "timestamp": 1, - "attributes": { - hardware_model : "linux" - }, - loc : { - lat : 88, - long : 64 - } - } - }; - - var msg = spbdataIngestion.prepareKafkaPayload(didAndDataType, "accountId"); - var expectedMsg = { - dataType: "String", - aid: "accountId", - value: "value", - cid: cid, - on: 1, - systemOn: 1, - attributes: { - hardware_model : "linux" - }, - loc : { - lat : 88, - long : 64 - } - }; - assert.deepEqual(msg, expectedMsg, "Wrong kafka payload"); - done(); - }); });