Skip to content

Commit

Permalink
Merge pull request #763 from fledge-iot/FOGL-6365
Browse files Browse the repository at this point in the history
FOGL-6365: Fix OMF performance issue with AFLocation OMF Hint
  • Loading branch information
RVerhoeff authored Aug 2, 2022
2 parents 17aabe1 + 148a425 commit 29a1278
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 47 deletions.
10 changes: 5 additions & 5 deletions C/plugins/north/OMF/include/omf.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#ifndef _OMF_H
#define _OMF_H
/*
* Fledge OSI Soft OMF interface to PI Server.
* Fledge OSIsoft OMF interface to PI Server.
*
* Copyright (c) 2018 Dianomic Systems
*
Expand Down Expand Up @@ -269,7 +269,7 @@ class OMF

string errorMessageHandler(const string &msg);

// Extract assetName from erro message
// Extract assetName from error message
std::string getAssetNameFromError(const char* message);

// Get asset type-id from cached data
Expand All @@ -291,7 +291,7 @@ class OMF
// Add the 1st level of AF hierarchy if the end point is PI Web API
void setAFHierarchy();

bool handleAFHirerarchy();
bool handleAFHierarchy();
bool handleAFHierarchySystemWide();
bool handleOmfHintHierarchies();

Expand All @@ -318,7 +318,7 @@ class OMF
bool HandleAFMapMetedata(Document& JSon);

private:
// Use for the evaluatin of the OMFDataTypes.typesShort
// Use for the evaluation of the OMFDataTypes.typesShort
union t_typeCount {
struct
{
Expand Down Expand Up @@ -352,7 +352,7 @@ class OMF
std::string m_AFHierarchyLevel;
std::string m_prefixAFAsset;

vector<std::string> m_afhHierarchyAlredyCreated={
vector<std::string> m_afhHierarchyAlreadyCreated={

// Asset Framework path
// {""}
Expand Down
83 changes: 42 additions & 41 deletions C/plugins/north/OMF/omf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ OMFData::OMFData(const Reading& reading, string measurementId, const OMF_ENDPOIN
if (typeid(**it) == typeid(OMFTagHint))
{
measurementId = (*it)->getHint();
Logger::getLogger()->info("Using OMF TagName hint: %s", measurementId.c_str());
Logger::getLogger()->info("Using OMF Tag hint: %s", measurementId.c_str());
}
}
}
Expand Down Expand Up @@ -607,7 +607,7 @@ bool OMF::sendDataTypes(const Reading& row, OMFHints *hints)
* @param msgType message type : Type, Data
* @param jsonData OMF message to send
* @param action action to be executed, either "create"or "delete"
* @return true if succeeded
*/
bool OMF::AFHierarchySendMessage(const string& msgType, string& jsonData, const std::string& action)
{
Expand Down Expand Up @@ -830,8 +830,8 @@ bool OMF::handleAFHierarchySystemWide() {
* Creates all the AF hierarchies levels as requested by the input parameter
* Creates the AF hierarchy if it was not already created
*
* @param AFHierarchy Hierarchies levels to be created as relative or absolute path
* @param out true if succeeded
* @param AFHierarchy Hierarchies levels to be created as relative or absolute path
* @return out true if succeeded
*/
bool OMF::sendAFHierarchy(string AFHierarchy)
{
Expand All @@ -841,7 +841,7 @@ bool OMF::sendAFHierarchy(string AFHierarchy)
string parentPath;


if(find(m_afhHierarchyAlredyCreated.begin(), m_afhHierarchyAlredyCreated.end(), AFHierarchy) == m_afhHierarchyAlredyCreated.end()){
if(find(m_afhHierarchyAlreadyCreated.begin(), m_afhHierarchyAlreadyCreated.end(), AFHierarchy) == m_afhHierarchyAlreadyCreated.end()){

Logger::getLogger()->debug("%s - path created :%s:", __FUNCTION__, AFHierarchy.c_str() );

Expand All @@ -858,7 +858,7 @@ bool OMF::sendAFHierarchy(string AFHierarchy)
parentPath = m_DefaultAFLocation;
}

m_afhHierarchyAlredyCreated.push_back(AFHierarchy);
m_afhHierarchyAlreadyCreated.push_back(AFHierarchy);

success = sendAFHierarchyLevels(parentPath, path, dummy);
} else {
Expand All @@ -871,8 +871,10 @@ bool OMF::sendAFHierarchy(string AFHierarchy)
/**
* Creates all the AF hierarchies level as requested by the input parameter
*
* @param path Full path of hierarchies to create
* @param out last level of the created hierarchy
* @param parentPath Parent path
* @param path Full path of hierarchies to create
* @param lastLevel last level of the created hierarchy
* @return true if succeeded
*/
bool OMF::sendAFHierarchyLevels(string parentPath, string path, std::string &lastLevel) {

Expand Down Expand Up @@ -946,9 +948,9 @@ bool OMF::sendAFHierarchyLevels(string parentPath, string path, std::string &las
/**
* Handle the creation of AF hierarchies
*
* @param out true if succeded
*/
bool OMF::handleAFHirerarchy()
* @return true if succeeded
*/
bool OMF::handleAFHierarchy()
{
bool success = true;

Expand Down Expand Up @@ -1000,8 +1002,8 @@ void OMF::setAFHierarchy()
*
* @param readings A vector of readings data pointers
* @param skipSendDataTypes Send datatypes only once (default is true)
* @return != on success, 0 otherwise
* xxx
* @param compression If true, compress JSON payload before sending to PI
* @return Number of readings sent on success, 0 otherwise
*/
uint32_t OMF::sendToServer(const vector<Reading *>& readings,
bool compression, bool skipSentDataTypes)
Expand All @@ -1022,11 +1024,6 @@ uint32_t OMF::sendToServer(const vector<Reading *>& readings,
threadId << std::this_thread::get_id();

struct timeval start, t1, t2, t3, t4, t5;

#endif


#if INSTRUMENT
gettimeofday(&start, NULL);
#endif

Expand Down Expand Up @@ -1157,7 +1154,7 @@ uint32_t OMF::sendToServer(const vector<Reading *>& readings,
/*
* Check the OMFHints, if there are any, to see if we have a
* type name that should be used for this asset.
* We will still create the tyope, but the name will be fixed
* We will still create the type, but the name will be fixed
* as the value of this hint.
*/
bool usingTypeNameHint = false;
Expand Down Expand Up @@ -1215,7 +1212,7 @@ uint32_t OMF::sendToServer(const vector<Reading *>& readings,
// it sends the hierarchy once
if (sendDataTypes and ! AFHierarchySent)
{
handleAFHirerarchy();
handleAFHierarchy();

AFHierarchySent = true;
}
Expand Down Expand Up @@ -1351,12 +1348,13 @@ uint32_t OMF::sendToServer(const vector<Reading *>& readings,
timersub(&t5, &t4, &tm);
timeT5 = tm.tv_sec + ((double)tm.tv_usec / 1000000);

Logger::getLogger()->debug("Timing seconds - thread :%s: - superSet :%6.3f: - Loop :%6.3f: - compress :%6.3f: - send data :%6.3f: - msg size |%d| - msg size compressed |%d| ",
Logger::getLogger()->debug("Timing seconds - thread :%s: - superSet :%6.3f: - Loop :%6.3f: - compress :%6.3f: - send data :%6.3f: - readings |%d| - msg size |%d| - msg size compressed |%d| ",
threadId.str().c_str(),
timeT1,
timeT2,
timeT3,
timeT4,
readings.size(),
json_not_compressed.length(),
json.length()
);
Expand All @@ -1367,7 +1365,7 @@ uint32_t OMF::sendToServer(const vector<Reading *>& readings,
// Return number of sent readings to the caller
return readings.size();
}
// Exception raised fof HTTP 400 Bad Request
// Exception raised for HTTP 400 Bad Request
catch (const BadRequest& e)
{
if (OMF::isDataTypeError(e.what()))
Expand Down Expand Up @@ -1482,11 +1480,12 @@ string OMF::errorMessageHandler(const string &msg)


/**
* Send all the readings to the PI Server
* Send all the readings to the PI Server.
* Note: this overload is never called.
*
* @param readings A vector of readings data
* @param skipSendDataTypes Send datatypes only once (default is true)
* @return != on success, 0 otherwise
* @return Number of readings sent on success, 0 otherwise
*/
uint32_t OMF::sendToServer(const vector<Reading>& readings,
bool skipSentDataTypes)
Expand Down Expand Up @@ -1582,7 +1581,8 @@ uint32_t OMF::sendToServer(const vector<Reading>& readings,
}

/**
* Send a single reading to the PI Server
* Send a single reading to the PI Server.
* Note: this overload is never called.
*
* @param reading A reading to send
* @return 1 = on success, 0 otherwise
Expand All @@ -1594,10 +1594,11 @@ uint32_t OMF::sendToServer(const Reading& reading,
}

/**
* Send a single reading pointer to the PI Server
* Send a single reading pointer to the PI Server.
* Note: this overload is never called.
*
* @param reading A reading pointer to send
* @return != on success, 0 otherwise
* @return 1 = on success, 0 otherwise
*/
uint32_t OMF::sendToServer(const Reading* reading,
bool skipSentDataTypes)
Expand Down Expand Up @@ -1672,7 +1673,7 @@ uint32_t OMF::sendToServer(const Reading* reading,
* Creates a vector of HTTP header to be sent to Server
*
* @param type The message type ('Type', 'Container', 'Data')
# @param action Action to execute, either "create"or "delete"
* @param action Action to execute, either "create" or "delete"
* @return A vector of HTTP Header string pairs
*/
const vector<pair<string, string>> OMF::createMessageHeader(const std::string& type, const std::string& action) const
Expand Down Expand Up @@ -1767,7 +1768,7 @@ const std::string OMF::createTypeData(const Reading& reading, OMFHints *hints)
}
else
{
omfType = omfTypes[((*it)->getData()).getType()];
omfType = omfTypes[((*it)->getData()).getType()];
}
string format = OMF::getFormatType(omfType);
if (hints && (omfType == OMF_TYPE_FLOAT || omfType == OMF_TYPE_INTEGER))
Expand Down Expand Up @@ -1964,7 +1965,7 @@ std::string OMF::generateMeasurementId(const string& assetName)

}

Logger::getLogger()->debug("%s - mamingScheme default :%ld: namingScheme applied :%ld: assetName :%s: typeId :%ld: measurementId :%s:",
Logger::getLogger()->debug("%s - namingScheme default :%ld: namingScheme applied :%ld: assetName :%s: typeId :%ld: measurementId :%s:",
__FUNCTION__,
m_NamingScheme,
namingScheme,
Expand All @@ -1980,7 +1981,7 @@ std::string OMF::generateMeasurementId(const string& assetName)
* Generate a suffix for the given asset in relation to the selected naming schema and the value of the type id
*
* @param assetName Asset for quick the suffix should be generated
* @param typeId TYpe id of the asset
* @param typeId Type id of the asset
* @return Suffix to be used for the given asset
*/
std::string OMF::generateSuffixType(string &assetName, long typeId)
Expand All @@ -2003,7 +2004,7 @@ std::string OMF::generateSuffixType(string &assetName, long typeId)

}

Logger::getLogger()->debug("%s - mamingScheme default :%ld: namingScheme applied :%ld: typeId :%ld: suffix :%s:",
Logger::getLogger()->debug("%s - namingScheme default :%ld: namingScheme applied :%ld: typeId :%ld: suffix :%s:",
__FUNCTION__,
m_NamingScheme,
namingScheme,
Expand Down Expand Up @@ -2203,12 +2204,12 @@ std::string OMF::createLinkData(const Reading& reading, std::string& AFHierarch
}

/**
* Calculate the prefix to be used for AF objects and the last level of the hiererachies
* Calculate the prefix to be used for AF objects and the last level of the hierarchies
* from a given AF path
*
* @param path Path to evaluate
* @param out/prefix Calculated prefix
* @param out/AFHierarchyLevel last level of the hiererachies evaluated form the path
* @param out/AFHierarchyLevel last level of the hierarchies evaluated form the path
*/
void OMF::generateAFHierarchyPrefixLevel(string& path, string& prefix, string& AFHierarchyLevel)
{
Expand Down Expand Up @@ -2310,7 +2311,7 @@ bool OMF::createAFHierarchyOmfHint(const string& assetName, const string &OmfHi
, prefixStored.c_str()
);

if(find(m_afhHierarchyAlredyCreated.begin(), m_afhHierarchyAlredyCreated.end(), pathNew) == m_afhHierarchyAlredyCreated.end()){
if (find(m_afhHierarchyAlreadyCreated.begin(), m_afhHierarchyAlreadyCreated.end(), pathNew) == m_afhHierarchyAlreadyCreated.end()){

Logger::getLogger()->debug("%s - New path requested :%s:", __FUNCTION__, pathNew.c_str());

Expand All @@ -2325,7 +2326,7 @@ bool OMF::createAFHierarchyOmfHint(const string& assetName, const string &OmfHi
m_AssetNamePrefix[assetName].push_back(item);

} else {
if (OmfHintHierarchy.compare(pathStored) != 0) {
if (pathNew.compare(pathStored) != 0) {

Logger::getLogger()->debug("%s - path changed for the assetName :%s: path :%s: previous path :%s:"
, __FUNCTION__
Expand Down Expand Up @@ -2373,8 +2374,7 @@ bool OMF::createAFHierarchyOmfHint(const string& assetName, const string &OmfHi
* @param variable Variable found in the form ${room:unknown}
* @param value Value of the variable, left part , room in this case ${room:unknown}
* @param defaultValue Default value of the variable, right part , unknown in this case ${room:unknown}
*
* @return True a variable is found in the source string
* @return True a variable is found in the source string
*/
bool OMF::extractVariable(string &strToHandle, string &variable, string &value, string &defaultValue)
{
Expand Down Expand Up @@ -3361,11 +3361,10 @@ void OMF::setMapObjectTypes(const vector<Reading*>& readings,

// Fetch and parse any OMFHint for this reading
Datapoint *hintsdp = reading->getDatapoint("OMFHint");
OMFHints *hints = NULL;

if (hintsdp && (omfType == OMF_TYPE_FLOAT || omfType == OMF_TYPE_INTEGER))
{
hints = new OMFHints(hintsdp->getData().toString());
OMFHints *hints = new OMFHints(hintsdp->getData().toString());
const vector<OMFHint *> omfHints = hints->getHints();

for (auto it = omfHints.cbegin(); it != omfHints.cend(); it++)
Expand All @@ -3376,6 +3375,8 @@ void OMF::setMapObjectTypes(const vector<Reading*>& readings,
break;
}
}

delete hints;
}
}

Expand Down Expand Up @@ -3449,7 +3450,7 @@ void OMF::setMapObjectTypes(const vector<Reading*>& readings,
{
Logger::getLogger()->debug("%s - The asset '" + assetName + " has a datapoint having an unsupported type, it will be ignored", __FUNCTION__);

// Avoids to handled PI-Server unsupported datapoint type
// Avoids forcing PI Server to handle unsupported datapoint types
// std::vector<double> vData = {0};
// DatapointValue vArray(vData);
// values.push_back(new Datapoint((*dp).first, vArray));
Expand Down
23 changes: 22 additions & 1 deletion C/plugins/north/OMF/plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@


#define VERBOSE_LOG 0
#define INSTRUMENT 0

using namespace std;
using namespace rapidjson;
Expand Down Expand Up @@ -747,6 +748,10 @@ void plugin_start(const PLUGIN_HANDLE handle,
uint32_t plugin_send(const PLUGIN_HANDLE handle,
const vector<Reading *>& readings)
{
#if INSTRUMENT
struct timeval start, end;
gettimeofday(&start, NULL);
#endif
CONNECTOR_INFO* connInfo = (CONNECTOR_INFO *)handle;

/**
Expand Down Expand Up @@ -850,6 +855,14 @@ uint32_t plugin_send(const PLUGIN_HANDLE handle,
delete connInfo->sender;
delete connInfo->omf;

#if INSTRUMENT
gettimeofday(&end, NULL);
struct timeval tm;
timersub(&end, &start, &tm);
double elapsedTime = tm.tv_sec + ((double)tm.tv_usec / 1000000);
Logger::getLogger()->debug("plugin_send elapsed time: %6.3f seconds", elapsedTime);
#endif

// Return sent data ret code
return ret;
}
Expand Down Expand Up @@ -897,6 +910,14 @@ string plugin_shutdown(PLUGIN_HANDLE handle)
// Delete plugin handle
delete connInfo;

#if INSTRUMENT
// For debugging: write plugin's JSON data to a file
string jsonFilePath = getDataDir() + string("/logs/OMFSaveData.json");
ofstream f(jsonFilePath.c_str(), ios_base::trunc);
f << saveData.str();
f.close();
#endif

// Return current plugin data to save
return saveData.str();
}
Expand All @@ -905,7 +926,7 @@ string plugin_shutdown(PLUGIN_HANDLE handle)
};

/**
* Return a JSON string with the dataTypes to save in plugion_data
* Return a JSON string with the dataTypes to save in plugin_data
*
* Note: the entry with FAKE_ASSET_KEY is never saved.
*
Expand Down

0 comments on commit 29a1278

Please sign in to comment.