Skip to content
This repository has been archived by the owner on Jan 8, 2024. It is now read-only.

Commit

Permalink
oisp cid: component id check removed
Browse files Browse the repository at this point in the history
  • Loading branch information
yshashix committed Sep 22, 2023
1 parent 98402c9 commit cf8e10b
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 480 deletions.
163 changes: 20 additions & 143 deletions api/sparkplug_data.ingestion.js
Original file line number Diff line number Diff line change
Expand Up @@ -278,41 +278,7 @@ module.exports = function(logger) {
}

};
/** Retrieves the device id and dataType for a SparkplugB metric alias (cid).
 * Looks the cid up in the cache first; on a miss it falls back to an SQL query,
 * which also verifies that the alias/cid was previously registered, and then
 * caches the result to enhance performance.
 * @param item SparkplugB metric whose `alias` field carries the component id (cid)
 * @returns object with `id`, `dataType`, and the original metric attached as `dataElement`
 * @throws Error when cid is not a UUID or the DB/cache lookup yields nothing
 */
me.getSpBDidAndDataType = async function (item) {
    var cid = item.alias;
    // Reject anything that is not a UUID before it reaches the database.
    if (!uuidValidate(cid)) {
        throw new Error("cid not UUID. Rejected!");
    }
    var value = await me.cache.getValues(cid);
    var didAndDataType;
    if (value === null || (Array.isArray(value) && value.length === 1 && value[0] === null)) {
        me.logger.debug("Could not find " + cid + " in cache. Now trying sql query.");
        // Parameterized query (defense in depth on top of the UUID check above).
        var sqlquery = 'SELECT devices.id,"dataType" FROM dashboard.device_components,dashboard.devices,dashboard.component_types WHERE "componentId"::text=:cid and "deviceUID"::text=devices.uid::text and device_components."componentTypeId"::text=component_types.id::text';
        me.logger.debug("Applying SQL query: " + sqlquery);
        didAndDataType = await me.cache.sequelize.query(sqlquery, { replacements: { cid: cid }, type: QueryTypes.SELECT });
        me.logger.debug("Result of sql query: " + JSON.stringify(didAndDataType));
    } else {
        me.logger.debug("Found in cache: " + JSON.stringify(value) + ", Component is valid");
        didAndDataType = [value];
    }
    // An empty result set means the alias was never registered; the original code
    // would have thrown an opaque TypeError on didAndDataType[0].id below.
    if (didAndDataType === undefined || didAndDataType === null || didAndDataType.length === 0 || !didAndDataType[0]) {
        throw new Error("DB lookup failed!");
    }
    // Await BOTH cache writes. The original `await a && b` only awaited the first
    // operand and then treated the second (a pending Promise, always truthy) as
    // the result, so the dataType write was never awaited or checked.
    var idStored = await me.cache.setValue(cid, "id", didAndDataType[0].id);
    var typeStored = await me.cache.setValue(cid, "dataType", didAndDataType[0].dataType);
    didAndDataType[0].dataElement = item;
    if (!(idStored && typeStored)) {
        me.logger.warn("Could not store db value in redis. This will significantly reduce performance");
    }
    return didAndDataType[0];
};


me.createKafakaPubData = function(topic,bodyMessage){
/*** For forwarding sparkplugB data directly without kafka metrics format
* @param spBkafkaProduce is set in the config file
Expand All @@ -329,29 +295,13 @@ module.exports = function(logger) {
*/
bodyMessage.metrics.forEach(item => {
var kafkaMessage = item;
if (subTopic[2] === "NBIRTH") {
if (subTopic[2] === "NBIRTH" || subTopic[2] === "DDATA" || subTopic[2] === "DBIRTH") {
var key = topic;
var message = {key, value: JSON.stringify(kafkaMessage)};
me.logger.debug("Selecting kafka message topic SparkplugB with spB format payload for data type: "+subTopic[2]);
me.kafkaAggregator.addMessage(message,config.sparkplug.spBkafKaTopic);
return true;
}
/**Validating component id in the database to check for DDATA */
else if (subTopic[2] === "DDATA" || subTopic[2] === "DBIRTH") {
me.getSpBDidAndDataType(item).then(values => {
if (values) {
me.logger.debug("SpB payload is valid with component registered." + kafkaMessage.name);
var key = topic;
var addMessageTopic = config.sparkplug.spBkafKaTopic;
let message = {key, value: JSON.stringify(kafkaMessage)};
me.kafkaAggregator.addMessage(message, addMessageTopic);
return true;
}
}).catch(function(err) {
me.logger.warn("Could not send data to Kafka due to SpB not availiable in DB/cache " + err);
return false;
});
}
}
});
}
if ( config.sparkplug.ngsildKafkaProduce) {
Expand All @@ -367,71 +317,25 @@ module.exports = function(logger) {
/**Validating component id in the database to check for DDATA */
else if (subTopic[2] === "DDATA") {
let metricName = kafkaMessage.name.split("/");
let metricType = metricName[0];
me.getSpBDidAndDataType(item).then(values => {
if (values) {
me.logger.debug("SpB payload is valid with component registered for :" + kafkaMessage.name);
var key = topic;
var ngsiMappedKafkaMessage;
if (metricType === "Relationship") {
ngsiMappedKafkaMessage = ngsildMapper.mapSpbRelationshipToKafka(devID, kafkaMessage);
me.logger.debug(" Mapped SpB Relationship data to NGSI-LD relationship type: "+ JSON.stringify(ngsiMappedKafkaMessage));
let message = {key, value: JSON.stringify(ngsiMappedKafkaMessage)};
me.kafkaAggregator.addMessage(message, config.sparkplug.ngsildKafkaTopic);
} else if (metricType === "Property") {
ngsiMappedKafkaMessage = ngsildMapper.mapSpbPropertyToKafka(devID, kafkaMessage);
me.logger.debug(" Mapped SpB Properties data to NGSI-LD properties type: "+ JSON.stringify(ngsiMappedKafkaMessage));
let message = {key, value: JSON.stringify(ngsiMappedKafkaMessage)};
me.kafkaAggregator.addMessage(message, config.sparkplug.ngsildKafkaTopic);
} else {
me.logger.debug(" Unable to create kafka message topic for SpBNGSI-LD topic for Metric Name: "+ kafkaMessage.name + " ,doesn't match NGSI-LD Name type: ");
}
return true;
}
}).catch(function(err) {
me.logger.warn("Could not send data to Kafka due to SpB not availiable in DB/cache " + err);
return false;
});
let metricType = metricName[0];
var key = topic;
var ngsiMappedKafkaMessage;
if (metricType === "Relationship") {
ngsiMappedKafkaMessage = ngsildMapper.mapSpbRelationshipToKafka(devID, kafkaMessage);
me.logger.debug(" Mapped SpB Relationship data to NGSI-LD relationship type: "+ JSON.stringify(ngsiMappedKafkaMessage));
let message = {key, value: JSON.stringify(ngsiMappedKafkaMessage)};
me.kafkaAggregator.addMessage(message, config.sparkplug.ngsildKafkaTopic);
} else if (metricType === "Property") {
ngsiMappedKafkaMessage = ngsildMapper.mapSpbPropertyToKafka(devID, kafkaMessage);
me.logger.debug(" Mapped SpB Properties data to NGSI-LD properties type: "+ JSON.stringify(ngsiMappedKafkaMessage));
let message = {key, value: JSON.stringify(ngsiMappedKafkaMessage)};
me.kafkaAggregator.addMessage(message, config.sparkplug.ngsildKafkaTopic);
} else {
me.logger.debug(" Unable to create kafka message topic for SpBNGSI-LD topic for Metric Name: "+ kafkaMessage.name + " ,doesn't match NGSI-LD Name type: ");
}
return true;
}
});
} else if (!config.sparkplug.spBKafkaProduce && !config.sparkplug.ngsildKafkaProduce) {
/*** For forwarding sparkplugB data according to kafka metrics format to be shown in dashboard
* @param kafkaProduce is set in the config file
* */
me.logger.debug("Kafka metric topic is selected as to meet frontend dashboard data format");
// For NBIRTH message type with kafka topic, ignoring the kafka send
let subTopic = topic.split("/");
if (subTopic[2] === "NBIRTH") {
me.logger.info("Received spB NBIRTH message, ignoring currently kafka forward for metric topic");
return true;
}
// Go through payload metrics and check whether cid/alias is correct
// Check CID in catch or SQL table for security check of component registered
var promarray = [];
bodyMessage.metrics.forEach(item => {
var value = me.getSpBDidAndDataType(item);
promarray.push(value);
});
Promise.all(promarray)
.then(values =>
values.map(item => {
var kafkaMessage = me.prepareKafkaPayload(item, accountId);
if (kafkaMessage === null) {
var msg = "Validation of " + JSON.stringify(item.dataElement) + " for type " + item.dataType + " failed!";
me.logger.warn(msg);
me.sendErrorOverChannel(accountId, devID, msg);
return;
}
var key = accountId + "." + kafkaMessage.cid;
var message = {key, value: JSON.stringify(kafkaMessage)};
me.logger.debug("Received spB message, kafka payload created to forward for metric topic :" + JSON.stringify(message));
me.kafkaAggregator.addMessage(message, config.kafka.metricsTopic);
})
)
.catch(function(err) {
me.logger.warn("Could not send data to Kafka " + err);
return;
});
}
};

Expand Down Expand Up @@ -500,31 +404,4 @@ module.exports = function(logger) {
me.connectTopics();
};

/**
 * Prepare datapoint from frontend data structure to Kafka like the following example:
 * {"dataType":"Number", "aid":"account_id", "cid":"component_id", "value":"1",
 * "systemOn": 1574262569420, "on": 1574262569420, "loc": null}
 * @param didAndDataType lookup result carrying `dataType` and the raw metric in `dataElement`
 * @param accountId account the datapoint belongs to
 * @returns Kafka payload object, or null when the value is absent or fails validation
 */
me.prepareKafkaPayload = function(didAndDataType, accountId){
    var dataElement = didAndDataType.dataElement;
    // Guard: a missing value would make value.toString() throw a TypeError here.
    // Treat it as a validation failure instead — callers already handle a null
    // return by logging a warning and skipping the datapoint.
    if (dataElement.value === undefined || dataElement.value === null) {
        return null;
    }
    var value = normalizeBoolean(dataElement.value.toString(), didAndDataType.dataType);
    if (!validate(value, didAndDataType.dataType)) {
        return null;
    }
    const msg = {
        dataType: didAndDataType.dataType,
        aid: accountId,
        cid: dataElement.alias,
        on: dataElement.timestamp,
        systemOn : dataElement.timestamp,
        value: value
    };
    // Optional fields are only attached when present on the incoming element.
    if (undefined !== dataElement.attributes) {
        msg.attributes = dataElement.attributes;
    }
    if (undefined !== dataElement.loc) {
        msg.loc = dataElement.loc;
    }
    return msg;
};
};
2 changes: 1 addition & 1 deletion config.js
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ var config = {
*
*/
"sparkplug": {
"spBKafkaProduce": parsedConfig.spbEnable || false,
"spBKafkaProduce": parsedConfig.spbEnable || true,
"spBkafKaTopic": parsedConfig.spbTopic || "sparkplugB",
"ngsildKafkaProduce": parsedConfig.ngsildEnable || false,
"ngsildKafkaTopic": parsedConfig.ngsildTopic || "ngsildSpB",
Expand Down
Loading

0 comments on commit cf8e10b

Please sign in to comment.