diff --git a/C/common/asset_tracking.cpp b/C/common/asset_tracking.cpp index 5fda168187..4329c540d0 100644 --- a/C/common/asset_tracking.cpp +++ b/C/common/asset_tracking.cpp @@ -41,6 +41,8 @@ AssetTracker::AssetTracker(ManagementClient *mgtClient, string service) /** * Fetch all asset tracking tuples from DB and populate local cache * + * Return the vector of deprecated asset names + * * @param plugin Plugin name * @param event Event name */ @@ -51,7 +53,9 @@ void AssetTracker::populateAssetTrackingCache(string /*plugin*/, string /*event* for (AssetTrackingTuple* & rec : vec) { assetTrackerTuplesCache.insert(rec); - //Logger::getLogger()->info("Added asset tracker tuple to cache: '%s'", rec->assetToString().c_str()); + + Logger::getLogger()->debug("Added asset tracker tuple to cache: '%s'", + rec->assetToString().c_str()); } delete (&vec); } @@ -60,8 +64,9 @@ void AssetTracker::populateAssetTrackingCache(string /*plugin*/, string /*event* Logger::getLogger()->error("Failed to populate asset tracking tuples' cache"); return; } -} + return; +} /** * Check local cache for a given asset tracking tuple @@ -81,6 +86,19 @@ bool AssetTracker::checkAssetTrackingCache(AssetTrackingTuple& tuple) return true; } +AssetTrackingTuple* AssetTracker::findAssetTrackingCache(AssetTrackingTuple& tuple) +{ + AssetTrackingTuple *ptr = &tuple; + std::unordered_set::const_iterator it = assetTrackerTuplesCache.find(ptr); + if (it == assetTrackerTuplesCache.end()) + { + return NULL; + } + else + { + return *it; + } +} /** * Add asset tracking tuple via microservice management API and in cache diff --git a/C/common/include/asset_tracking.h b/C/common/include/asset_tracking.h index be1601660d..b6e2ace2d5 100644 --- a/C/common/include/asset_tracking.h +++ b/C/common/include/asset_tracking.h @@ -32,20 +32,40 @@ class AssetTrackingTuple { std::string assetToString() { std::ostringstream o; - o << "service:" << m_serviceName << ", plugin:" << m_pluginName << ", asset:" << m_assetName << ", event:" << m_eventName; + o << "service:" << m_serviceName << + ", plugin:" << m_pluginName << + ", asset:" << m_assetName << + ", event:" << m_eventName << + ", deprecated:" << m_deprecated; return o.str(); } inline bool operator==(const AssetTrackingTuple& x) const { - return ( x.m_serviceName==m_serviceName && x.m_pluginName==m_pluginName && x.m_assetName==m_assetName && x.m_eventName==m_eventName); + return ( x.m_serviceName==m_serviceName && + x.m_pluginName==m_pluginName && + x.m_assetName==m_assetName && + x.m_eventName==m_eventName); } - AssetTrackingTuple(const std::string& service, const std::string& plugin, - const std::string& asset, const std::string& event) : - m_serviceName(service), m_pluginName(plugin), - m_assetName(asset), m_eventName(event) + AssetTrackingTuple(const std::string& service, + const std::string& plugin, + const std::string& asset, + const std::string& event, + const bool& deprecated = false) : + m_serviceName(service), + m_pluginName(plugin), + m_assetName(asset), + m_eventName(event), + m_deprecated(deprecated) {} + + std::string& getAssetName() { return m_assetName; }; + bool isDeprecated() { return m_deprecated; }; + void unDeprecate() { m_deprecated = false; }; + +private: + bool m_deprecated; }; struct AssetTrackingTuplePtrEqual { @@ -90,6 +110,8 @@ class AssetTracker { static AssetTracker *getAssetTracker(); void populateAssetTrackingCache(std::string plugin, std::string event); bool checkAssetTrackingCache(AssetTrackingTuple& tuple); + AssetTrackingTuple* + findAssetTrackingCache(AssetTrackingTuple& tuple); void addAssetTrackingTuple(AssetTrackingTuple& tuple); void addAssetTrackingTuple(std::string plugin, std::string asset, std::string event); std::string diff --git a/C/common/include/expression.h b/C/common/include/expression.h index 9f2b73c16c..6e1fbd462d 100644 --- a/C/common/include/expression.h +++ b/C/common/include/expression.h @@ -43,6 +43,7 @@ class Expression { case JSON_COLUMN: case BOOL_COLUMN: case STRING_COLUMN: + case NULL_COLUMN: break; case INT_COLUMN: json << m_value.ival; diff --git a/C/common/include/insert.h b/C/common/include/insert.h index 959512d686..3b0dd82689 100644 --- a/C/common/include/insert.h +++ b/C/common/include/insert.h @@ -61,6 +61,15 @@ class InsertValue { strncpy(m_value.str, s.c_str(), s.length() + 1); m_type = JSON_COLUMN; }; + + // Insert a NULL value for the given column + InsertValue(const std::string& column) : + m_column(column) + { + m_type = NULL_COLUMN; + m_value.str = NULL; + } + InsertValue(const InsertValue& rhs) : m_column(rhs.m_column) { m_type = rhs.m_type; @@ -78,6 +87,9 @@ class InsertValue { case JSON_COLUMN: // Internally stored a a string m_value.str = strdup(rhs.m_value.str); break; + case NULL_COLUMN: + m_value.str = NULL; + break; case BOOL_COLUMN: // TODO break; @@ -112,6 +124,10 @@ class InsertValue { case STRING_COLUMN: json << "\"" << m_value.str << "\""; break; + case NULL_COLUMN: + // JSON output for NULL value + json << "null"; + break; } return json.str(); } diff --git a/C/common/include/management_client.h b/C/common/include/management_client.h index 8a73cc3a23..c400a60075 100644 --- a/C/common/include/management_client.h +++ b/C/common/include/management_client.h @@ -92,6 +92,9 @@ class ManagementClient { std::vector > >& endpoints); bool deleteProxy(const std::string& serviceName); const std::string getUrlbase() { return m_urlbase.str(); } + AssetTrackingTuple* getAssetTrackingTuple(const std::string& serviceName, + const std::string& assetName, + const std::string& event); private: std::ostringstream m_urlbase; diff --git a/C/common/include/resultset.h b/C/common/include/resultset.h index 3b693b2e20..b10906d9a6 100644 --- a/C/common/include/resultset.h +++ b/C/common/include/resultset.h @@ -21,7 +21,8 @@ typedef enum column_type { NUMBER_COLUMN, STRING_COLUMN, BOOL_COLUMN, - JSON_COLUMN + JSON_COLUMN, + NULL_COLUMN } ColumnType; diff --git a/C/common/management_client.cpp b/C/common/management_client.cpp index 46ced267b2..818735a618 100644 --- a/C/common/management_client.cpp +++ b/C/common/management_client.cpp @@ -778,7 +778,25 @@ std::vector& ManagementClient::getAssetTrackingTuples(const { throw runtime_error("Expected asset tracker tuple to be an object"); } - AssetTrackingTuple *tuple = new AssetTrackingTuple(rec["service"].GetString(), rec["plugin"].GetString(), rec["asset"].GetString(), rec["event"].GetString()); + + // Note: deprecatedTimestamp NULL value is returned as "" + // otherwise it's a string DATE + bool deprecated = rec.HasMember("deprecatedTimestamp") && + strlen(rec["deprecatedTimestamp"].GetString()); + + AssetTrackingTuple *tuple = new AssetTrackingTuple(rec["service"].GetString(), + rec["plugin"].GetString(), + rec["asset"].GetString(), + rec["event"].GetString(), + deprecated); + + m_logger->debug("Adding AssetTracker tuple for service %s: %s:%s:%s, " \ + "deprecated state is %d", + rec["service"].GetString(), + rec["plugin"].GetString(), + rec["asset"].GetString(), + rec["event"].GetString(), + deprecated); vec->push_back(tuple); } } @@ -809,7 +827,9 @@ std::vector& ManagementClient::getAssetTrackingTuples(const * @return whether operation was successful */ bool ManagementClient::addAssetTrackingTuple(const std::string& service, - const std::string& plugin, const std::string& asset, const std::string& event) + const std::string& plugin, + const std::string& asset, + const std::string& event) { ostringstream convert; @@ -1327,3 +1347,102 @@ bool ManagementClient::deleteProxy(const std::string& serviceName) } return false; } +/** + * Get the asset tracking tuple + * for a service and asset name + * + * @param serviceName The serviceName to restrict data fetch + * @param assetName The asset name that belongs to the service + * @param event The associated event type + * @return A vector of pointers to AssetTrackingTuple objects allocated on heap + */ +AssetTrackingTuple* ManagementClient::getAssetTrackingTuple(const std::string& serviceName, + const std::string& assetName, + const std::string& event) +{ + AssetTrackingTuple* tuple = NULL; + try { + string url = "/fledge/track"; + if (serviceName == "" && assetName == "" && event == "") + { + m_logger->error("Failed to fetch asset tracking tuple: " \ + "service name, asset name and event type are required."); + throw new exception(); + } + + url += "?service=" + urlEncode(serviceName); + url += "&asset=" + urlEncode(assetName) + "&event=" + event; + + auto res = this->getHttpClient()->request("GET", url.c_str()); + Document doc; + string response = res->content.string(); + doc.Parse(response.c_str()); + if (doc.HasParseError()) + { + bool httpError = (isdigit(response[0]) && + isdigit(response[1]) && + isdigit(response[2]) && + response[3]==':'); + m_logger->error("%s fetch asset tracking tuple: %s\n", + httpError?"HTTP error during":"Failed to parse result of", + response.c_str()); + throw new exception(); + } + else if (doc.HasMember("message")) + { + m_logger->error("Failed to fetch asset tracking tuple: %s.", + doc["message"].GetString()); + throw new exception(); + } + else + { + const rapidjson::Value& trackArray = doc["track"]; + if (trackArray.IsArray()) + { + // Process every row and create the AssetTrackingTuple object + for (auto& rec : trackArray.GetArray()) + { + if (!rec.IsObject()) + { + throw runtime_error("Expected asset tracker tuple to be an object"); + } + + // Note: deprecatedTimestamp NULL value is returned as "" + // otherwise it's a string DATE + bool deprecated = rec.HasMember("deprecatedTimestamp") && + strlen(rec["deprecatedTimestamp"].GetString()); + + // Create a new AssetTrackingTuple object, to be freed by the caller + tuple = new AssetTrackingTuple(rec["service"].GetString(), + rec["plugin"].GetString(), + rec["asset"].GetString(), + rec["event"].GetString(), + deprecated); + + m_logger->debug("Adding AssetTracker tuple for service %s: %s:%s:%s, " \ + "deprecated state is %d", + rec["service"].GetString(), + rec["plugin"].GetString(), + rec["asset"].GetString(), + rec["event"].GetString(), + deprecated); + } + } + else + { + throw runtime_error("Expected array of rows in asset track tuples array"); + } + + return tuple; + } + } catch (const SimpleWeb::system_error &e) { + m_logger->error("Fetch/parse of asset tracking tuples for service %s failed: %s.", + serviceName.c_str(), + e.what()); + } catch (...) { + m_logger->error("Unexpected exception when retrieving asset tuples for service %s", + serviceName.c_str()); + } + + return tuple; +} diff --git a/C/plugins/storage/postgres/connection.cpp b/C/plugins/storage/postgres/connection.cpp index 8ee5424b56..0c8e335ad2 100644 --- a/C/plugins/storage/postgres/connection.cpp +++ b/C/plugins/storage/postgres/connection.cpp @@ -1048,6 +1048,11 @@ SQLBuffer sql; sql.append(escape(buffer.GetString())); sql.append('\''); } + // Handle JSON value null: "item" : null + else if (itr->value.IsNull()) + { + sql.append("NULL"); + } col++; } } @@ -2752,7 +2757,9 @@ bool Connection::jsonModifiers(const Value& payload, SQLBuffer& sql) * Convert a JSON where clause into a PostresSQL where clause * */ -bool Connection::jsonWhereClause(const Value& whereClause, SQLBuffer& sql, const string& prefix) +bool Connection::jsonWhereClause(const Value& whereClause, + SQLBuffer& sql, + const string& prefix) { if (!whereClause.IsObject()) { @@ -2769,11 +2776,6 @@ bool Connection::jsonWhereClause(const Value& whereClause, SQLBuffer& sql, const raiseError("where clause", "The \"where\" object is missing a \"condition\" property"); return false; } - if (!whereClause.HasMember("value")) - { - raiseError("where clause", "The \"where\" object is missing a \"value\" property"); - return false; - } // Handle WHERE 1 = 1, 0.55 = 0.55 etc string whereColumnName = whereClause["column"].GetString(); @@ -2796,100 +2798,117 @@ bool Connection::jsonWhereClause(const Value& whereClause, SQLBuffer& sql, const sql.append(' '); string cond = whereClause["condition"].GetString(); - if (!cond.compare("older")) + + if (cond.compare("isnull") == 0) { - if (!whereClause["value"].IsInt()) - { - raiseError("where clause", "The \"value\" of an \"older\" condition must be an integer"); - return false; - } - sql.append("< now() - INTERVAL '"); - sql.append(whereClause["value"].GetInt()); - sql.append(" seconds'"); + sql.append("isnull "); } - else if (!cond.compare("newer")) + else if (cond.compare("notnull") == 0) { - if (!whereClause["value"].IsInt()) + sql.append("notnull "); + } + else + { + if (!whereClause.HasMember("value")) { - raiseError("where clause", "The \"value\" of an \"newer\" condition must be an integer"); + raiseError("where clause", "The \"where\" object is missing a \"value\" property"); return false; } - sql.append("> now() - INTERVAL '"); - sql.append(whereClause["value"].GetInt()); - sql.append(" seconds'"); - } - else if (!cond.compare("in") || !cond.compare("not in")) - { - // Check we have a non empty array - if (whereClause["value"].IsArray() && - whereClause["value"].Size()) + if (!cond.compare("older")) { - sql.append(cond); - sql.append(" ( "); - int field = 0; - for (Value::ConstValueIterator itr = whereClause["value"].Begin(); - itr != whereClause["value"].End(); - ++itr) + if (!whereClause["value"].IsInt()) { - if (field) - { - sql.append(", "); - } - field++; - if (itr->IsNumber()) + raiseError("where clause", "The \"value\" of an \"older\" condition must be an integer"); + return false; + } + sql.append("< now() - INTERVAL '"); + sql.append(whereClause["value"].GetInt()); + sql.append(" seconds'"); + } + else if (!cond.compare("newer")) + { + if (!whereClause["value"].IsInt()) + { + raiseError("where clause", "The \"value\" of an \"newer\" condition must be an integer"); + return false; + } + sql.append("> now() - INTERVAL '"); + sql.append(whereClause["value"].GetInt()); + sql.append(" seconds'"); + } + else if (!cond.compare("in") || !cond.compare("not in")) + { + // Check we have a non empty array + if (whereClause["value"].IsArray() && + whereClause["value"].Size()) + { + sql.append(cond); + sql.append(" ( "); + int field = 0; + for (Value::ConstValueIterator itr = whereClause["value"].Begin(); + itr != whereClause["value"].End(); + ++itr) { - if (itr->IsInt()) + if (field) { - sql.append(itr->GetInt()); + sql.append(", "); + } + field++; + if (itr->IsNumber()) + { + if (itr->IsInt()) + { + sql.append(itr->GetInt()); + } + else if (itr->IsInt64()) + { + sql.append((long)itr->GetInt64()); + } + else + { + sql.append(itr->GetDouble()); + } } - else if (itr->IsInt64()) + else if (itr->IsString()) { - sql.append((long)itr->GetInt64()); + sql.append('\''); + sql.append(escape(itr->GetString())); + sql.append('\''); } else { - sql.append(itr->GetDouble()); + string message("The \"value\" of a \"" + \ + cond + \ + "\" condition array element must be " \ + "a string, integer or double."); + raiseError("where clause", message.c_str()); + return false; } } - else if (itr->IsString()) - { - sql.append('\''); - sql.append(escape(itr->GetString())); - sql.append('\''); - } - else - { - string message("The \"value\" of a \"" + \ - cond + \ - "\" condition array element must be " \ - "a string, integer or double."); - raiseError("where clause", message.c_str()); - return false; - } + sql.append(" )"); + } + else + { + string message("The \"value\" of a \"" + \ + cond + "\" condition must be an array " \ + "and must not be empty."); + raiseError("where clause", message.c_str()); + return false; } - sql.append(" )"); } else { - string message("The \"value\" of a \"" + \ - cond + "\" condition must be an array " \ - "and must not be empty."); - raiseError("where clause", message.c_str()); - return false; - } - } - else - { - sql.append(cond); - sql.append(' '); - if (whereClause["value"].IsInt()) - { - sql.append(whereClause["value"].GetInt()); - } else if (whereClause["value"].IsString()) - { - sql.append('\''); - sql.append(escape(whereClause["value"].GetString())); - sql.append('\''); + sql.append(cond); + sql.append(' '); + if (whereClause["value"].IsInt()) + { + sql.append(whereClause["value"].GetInt()); + } else if (whereClause["value"].IsString()) + { + sql.append('\''); + sql.append(escape(whereClause["value"].GetString())); + sql.append('\''); + } } } diff --git a/C/plugins/storage/sqlite/common/connection.cpp b/C/plugins/storage/sqlite/common/connection.cpp index 3c2d4bc5ca..c4c81c76a2 100644 --- a/C/plugins/storage/sqlite/common/connection.cpp +++ b/C/plugins/storage/sqlite/common/connection.cpp @@ -1473,6 +1473,11 @@ vector asset_codes; sql.append(escape(buffer.GetString())); sql.append('\''); } + // Handle JSON value null: "item" : null + else if (itr->value.IsNull()) + { + sql.append("NULL"); + } col++; } } diff --git a/C/plugins/storage/sqlitelb/common/connection.cpp b/C/plugins/storage/sqlitelb/common/connection.cpp index dc4f5cfca1..1546dbdf94 100644 --- a/C/plugins/storage/sqlitelb/common/connection.cpp +++ b/C/plugins/storage/sqlitelb/common/connection.cpp @@ -1398,6 +1398,11 @@ SQLBuffer sql; sql.append(escape(buffer.GetString())); sql.append('\''); } + // Handle JSON value null: "item" : null + else if (itr->value.IsNull()) + { + sql.append("NULL"); + } col++; } } @@ -2513,7 +2518,8 @@ bool Connection::jsonModifiers(const Value& payload, * */ bool Connection::jsonWhereClause(const Value& whereClause, - SQLBuffer& sql, bool convertLocaltime) + SQLBuffer& sql, + bool convertLocaltime) { if (!whereClause.IsObject()) { @@ -2530,118 +2536,131 @@ bool Connection::jsonWhereClause(const Value& whereClause, raiseError("where clause", "The \"where\" object is missing a \"condition\" property"); return false; } - if (!whereClause.HasMember("value")) - { - raiseError("where clause", - "The \"where\" object is missing a \"value\" property"); - return false; - } sql.append(whereClause["column"].GetString()); sql.append(' '); string cond = whereClause["condition"].GetString(); - if (!cond.compare("older")) + + if (cond.compare("isnull") == 0) { - if (!whereClause["value"].IsInt()) - { - raiseError("where clause", - "The \"value\" of an \"older\" condition must be an integer"); - return false; - } - sql.append("< datetime('now', '-"); - sql.append(whereClause["value"].GetInt()); - if (convertLocaltime) - sql.append(" seconds', 'localtime')"); // Get value in localtime - else - sql.append(" seconds')"); // Get value in UTC by asking for no timezone + sql.append("isnull "); } - else if (!cond.compare("newer")) + else if (cond.compare("notnull") == 0) { - if (!whereClause["value"].IsInt()) + sql.append("notnull "); + } + else + { + if (!whereClause.HasMember("value")) { raiseError("where clause", - "The \"value\" of an \"newer\" condition must be an integer"); + "The \"where\" object is missing a \"value\" property"); return false; } - sql.append("> datetime('now', '-"); - sql.append(whereClause["value"].GetInt()); - if (convertLocaltime) - sql.append(" seconds', 'localtime')"); // Get value in localtime - else - sql.append(" seconds')"); // Get value in UTC by asking for no timezone - } - else if (!cond.compare("in") || !cond.compare("not in")) - { - // Check we have a non empty array - if (whereClause["value"].IsArray() && - whereClause["value"].Size()) + + if (!cond.compare("older")) { - sql.append(cond); - sql.append(" ( "); - int field = 0; - for (Value::ConstValueIterator itr = whereClause["value"].Begin(); - itr != whereClause["value"].End(); - ++itr) + if (!whereClause["value"].IsInt()) { - if (field) - { - sql.append(", "); - } - field++; - if (itr->IsNumber()) + raiseError("where clause", + "The \"value\" of an \"older\" condition must be an integer"); + return false; + } + sql.append("< datetime('now', '-"); + sql.append(whereClause["value"].GetInt()); + if (convertLocaltime) + sql.append(" seconds', 'localtime')"); // Get value in localtime + else + sql.append(" seconds')"); // Get value in UTC by asking for no timezone + } + else if (!cond.compare("newer")) + { + if (!whereClause["value"].IsInt()) + { + raiseError("where clause", + "The \"value\" of an \"newer\" condition must be an integer"); + return false; + } + sql.append("> datetime('now', '-"); + sql.append(whereClause["value"].GetInt()); + if (convertLocaltime) + sql.append(" seconds', 'localtime')"); // Get value in localtime + else + sql.append(" seconds')"); // Get value in UTC by asking for no timezone + } + else if (!cond.compare("in") || !cond.compare("not in")) + { + // Check we have a non empty array + if (whereClause["value"].IsArray() && + whereClause["value"].Size()) + { + sql.append(cond); + sql.append(" ( "); + int field = 0; + for (Value::ConstValueIterator itr = whereClause["value"].Begin(); + itr != whereClause["value"].End(); + ++itr) { - if (itr->IsInt()) + if (field) { - sql.append(itr->GetInt()); + sql.append(", "); } - else if (itr->IsInt64()) + field++; + if (itr->IsNumber()) { - sql.append((long)itr->GetInt64()); + if (itr->IsInt()) + { + sql.append(itr->GetInt()); + } + else if (itr->IsInt64()) + { + sql.append((long)itr->GetInt64()); + } + else + { + sql.append(itr->GetDouble()); + } + } + else if (itr->IsString()) + { + sql.append('\''); + sql.append(escape(itr->GetString())); + sql.append('\''); } else { - sql.append(itr->GetDouble()); + string message("The \"value\" of a \"" + \ + cond + \ + "\" condition array element must be " \ + "a string, integer or double."); + raiseError("where clause", message.c_str()); + return false; } } - else if (itr->IsString()) - { - sql.append('\''); - sql.append(escape(itr->GetString())); - sql.append('\''); - } - else - { - string message("The \"value\" of a \"" + \ - cond + \ - "\" condition array element must be " \ - "a string, integer or double."); - raiseError("where clause", message.c_str()); - return false; - } + sql.append(" )"); + } + else + { + string message("The \"value\" of a \"" + \ + cond + "\" condition must be an array " \ + "and must not be empty."); + raiseError("where clause", message.c_str()); + return false; } - sql.append(" )"); } else { - string message("The \"value\" of a \"" + \ - cond + "\" condition must be an array " \ - "and must not be empty."); - raiseError("where clause", message.c_str()); - return false; - } - } - else - { - sql.append(cond); - sql.append(' '); - if (whereClause["value"].IsInt()) - { - sql.append(whereClause["value"].GetInt()); - } else if (whereClause["value"].IsString()) - { - sql.append('\''); - sql.append(escape(whereClause["value"].GetString())); - sql.append('\''); + sql.append(cond); + sql.append(' '); + if (whereClause["value"].IsInt()) + { + sql.append(whereClause["value"].GetInt()); + } else if (whereClause["value"].IsString()) + { + sql.append('\''); + sql.append(escape(whereClause["value"].GetString())); + sql.append('\''); + } } } diff --git a/C/services/south/include/ingest.h b/C/services/south/include/ingest.h index a552a31c54..69f08d0a47 100644 --- a/C/services/south/include/ingest.h +++ b/C/services/south/include/ingest.h @@ -67,8 +67,11 @@ class Ingest : public ServiceHandler { void setThreshold(const unsigned int threshold) { m_queueSizeThreshold = threshold; }; void configChange(const std::string&, const std::string&); void configChildCreate(const std::string& , const std::string&, const std::string&){}; - void configChildDelete(const std::string& , const std::string&){}; + void configChildDelete(const std::string& , const std::string&){}; void shutdown() {}; // Satisfy ServiceHandler + void unDeprecateAssetTrackingRecord(AssetTrackingTuple* currentTuple, + const std::string& assetName, + const std::string& event); private: void signalStatsUpdate() { diff --git a/C/services/south/ingest.cpp b/C/services/south/ingest.cpp index d72c2f4548..0ea209c238 100755 --- a/C/services/south/ingest.cpp +++ b/C/services/south/ingest.cpp @@ -268,9 +268,8 @@ Ingest::Ingest(StorageClient& storage, m_data = NULL; m_discardedReadings = 0; m_highLatency = false; - + // populate asset tracking cache - //m_assetTracker = new AssetTracker(m_mgtClient); AssetTracker::getAssetTracker()->populateAssetTrackingCache(m_pluginName, "Ingest"); // Create the stats entry for the service @@ -500,6 +499,7 @@ void Ingest::processQueue() m_failCnt = 0; std::map statsEntriesCurrQueue; AssetTracker *tracker = AssetTracker::getAssetTracker(); + string lastAsset = ""; int *lastStat = NULL; for (vector::iterator it = q->begin(); @@ -509,11 +509,25 @@ void Ingest::processQueue() string assetName = reading->getAssetName(); if (lastAsset.compare(assetName)) { - AssetTrackingTuple tuple(m_serviceName, m_pluginName, assetName, "Ingest"); - if (!tracker->checkAssetTrackingCache(tuple)) + AssetTrackingTuple tuple(m_serviceName, + m_pluginName, + assetName, + "Ingest"); + + // Check Asset record exists + AssetTrackingTuple* res = tracker->findAssetTrackingCache(tuple); + if (res == NULL) { + // Record non in cache, add it tracker->addAssetTrackingTuple(tuple); } + else + { + // Possibly Un-deprecate asset tracking record + unDeprecateAssetTrackingRecord(res, + assetName, + "Ingest"); + } lastAsset = assetName; lastStat = &(statsEntriesCurrQueue[assetName]); (*lastStat)++; @@ -660,7 +674,8 @@ void Ingest::processQueue() // check if this requires addition of a new asset tracker tuple // Remove the Readings in the vector AssetTracker *tracker = AssetTracker::getAssetTracker(); - string lastAsset = ""; + + string lastAsset; int *lastStat = NULL; for (vector::iterator it = m_data->begin(); it != m_data->end(); ++it) { @@ -668,11 +683,25 @@ void Ingest::processQueue() string assetName = reading->getAssetName(); if (lastAsset.compare(assetName)) { - AssetTrackingTuple tuple(m_serviceName, m_pluginName, assetName, "Ingest"); - if (!tracker->checkAssetTrackingCache(tuple)) + AssetTrackingTuple tuple(m_serviceName, + m_pluginName, + assetName, + "Ingest"); + + // Check Asset record exists + AssetTrackingTuple* res = tracker->findAssetTrackingCache(tuple); + if (res == NULL) { + // Record not in cache, add it tracker->addAssetTrackingTuple(tuple); } + else + { + // Un-deprecate asset tracking record + unDeprecateAssetTrackingRecord(res, + assetName, + "Ingest"); + } lastAsset = assetName; lastStat = &statsEntriesCurrQueue[assetName]; (*lastStat)++; @@ -896,3 +925,76 @@ size_t Ingest::queueLength() return len; } + +/** + * Load an up-to-date AssetTracking record for the given parameters + * and un-deprecate AssetTracking record it has been found as deprecated + * Existing cache element is updated + * + * @param currentTuple Current AssetTracking record for given assetName + * @param assetName AssetName to fetch from AssetTracking + * @param event The event type to fetch + */ +void Ingest::unDeprecateAssetTrackingRecord(AssetTrackingTuple* currentTuple, + const string& assetName, + const string& event) +{ + // Get up-to-date Asset Tracking record + AssetTrackingTuple* updatedTuple = + m_mgtClient->getAssetTrackingTuple( + m_serviceName, + assetName, + event); + + if (updatedTuple) + { + if (updatedTuple->isDeprecated()) + { + // Update un-deprecated state in cached object + currentTuple->unDeprecate(); + + m_logger->debug("Asset '%s' is being un-deprecated", + assetName.c_str()); + + // Prepare UPDATE query + const Condition conditionParams(Equals); + Where * wAsset = new Where("asset", + conditionParams, + assetName); + Where *wService = new Where("service", + conditionParams, + m_serviceName, + wAsset); + Where *wEvent = new Where("event", + conditionParams, + event, + wService); + + InsertValues unDeprecated; + + // Set NULL value + unDeprecated.push_back(InsertValue("deprecated_ts")); + + // Update storage with NULL value + int rv = m_storage.updateTable("asset_tracker", + unDeprecated, + *wEvent); + + // Check update operation + if (rv < 0) + { + m_logger->error("Failure while un-deprecating asset '%s'", + assetName.c_str()); + } + } + } + else + { + m_logger->error("Failure to get AssetTracking record " + "for service '%s', asset '%s'", + m_serviceName.c_str(), + assetName.c_str()); + } + + delete updatedTuple; +} diff --git a/VERSION b/VERSION index c5b641f4c8..780f614317 100755 --- a/VERSION +++ b/VERSION @@ -1,2 +1,2 @@ fledge_version=1.9.2 -fledge_schema=52 +fledge_schema=53 diff --git a/python/fledge/services/core/api/asset_tracker.py b/python/fledge/services/core/api/asset_tracker.py index 9c5df57a08..105c924e44 100644 --- a/python/fledge/services/core/api/asset_tracker.py +++ b/python/fledge/services/core/api/asset_tracker.py @@ -3,10 +3,13 @@ # FLEDGE_BEGIN # See: http://fledge-iot.readthedocs.io/ # FLEDGE_END +import json from aiohttp import web import urllib.parse +from fledge.common import utils as common_utils +from fledge.common.storage_client.exceptions import StorageServerError from fledge.common.storage_client.payload_builder import PayloadBuilder from fledge.services.core import connect @@ -16,13 +19,14 @@ __version__ = "${VERSION}" _help = """ - ------------------------------------------------------------------------------- - | GET | /fledge/track | - ------------------------------------------------------------------------------- + ----------------------------------------------------------------------------------------- + | GET | /fledge/track | + | PUT | /fledge/track/service/{service}/asset/{asset}/event/{event} | + ----------------------------------------------------------------------------------------- """ -async def get_asset_tracker_events(request): +async def get_asset_tracker_events(request: web.Request) -> web.Response: """ Args: request: @@ -31,13 +35,15 @@ async def get_asset_tracker_events(request): asset track records :Example: - curl -X GET http://localhost:8081/fledge/track - curl -X GET http://localhost:8081/fledge/track?asset=XXX - curl -X GET http://localhost:8081/fledge/track?event=XXX - curl -X GET http://localhost:8081/fledge/track?service=XXX + curl -sX GET http://localhost:8081/fledge/track + curl -sX GET http://localhost:8081/fledge/track?asset=XXX + curl -sX GET http://localhost:8081/fledge/track?event=XXX + curl -sX GET http://localhost:8081/fledge/track?service=XXX + curl -sX GET http://localhost:8081/fledge/track?event=XXX&asset=XXX&service=XXX """ - payload = PayloadBuilder().SELECT("asset", "event", "service", "fledge", "plugin", "ts") \ + payload = PayloadBuilder().SELECT("asset", "event", "service", "fledge", "plugin", "ts", "deprecated_ts") \ .ALIAS("return", ("ts", 'timestamp')).FORMAT("return", ("ts", "YYYY-MM-DD HH24:MI:SS.MS")) \ + .ALIAS("return", ("deprecated_ts", 'deprecatedTimestamp')) \ .WHERE(['1', '=', 1]) if 'asset' in request.query and request.query['asset'] != '': asset = urllib.parse.unquote(request.query['asset']) @@ -55,8 +61,71 @@ async def get_asset_tracker_events(request): result = await storage_client.query_tbl_with_payload('asset_tracker', payload.payload()) response = result['rows'] except KeyError: - raise web.HTTPBadRequest(reason=result['message']) + msg = result['message'] + raise web.HTTPBadRequest(reason=msg, body=json.dumps({"message": msg})) except Exception as ex: - raise web.HTTPInternalServerError(reason=ex) + msg = str(ex) + raise web.HTTPInternalServerError(reason=msg, body=json.dumps({"message": msg})) + else: + return web.json_response({'track': response}) - return web.json_response({'track': response}) + +async def deprecate_asset_track_entry(request: web.Request) -> web.Response: + """ + Args: + request: + + Returns: + message + + :Example: + curl -sX PUT http://localhost:8081/fledge/track/service/XXX/asset/XXX/event/XXXX + """ + svc_name = request.match_info.get('service', None) + asset_name = request.match_info.get('asset', None) + event_name = request.match_info.get('event', None) + try: + storage_client = connect.get_storage_async() + # TODO: FOGL-6749 Once rows affected with 0 case handled at Storage side then we can remove SELECT call + select_payload = PayloadBuilder().SELECT("deprecated_ts").WHERE( + ['service', '=', svc_name]).AND_WHERE(['asset', '=', asset_name]).AND_WHERE( + ['event', '=', event_name]).payload() + get_result = await storage_client.query_tbl_with_payload('asset_tracker', select_payload) + if 'rows' in get_result: + response = get_result['rows'] + if response: + if response[0]['deprecated_ts'] == "": + # Update deprecated_ts column entry + current_time = common_utils.local_timestamp() + update_payload = PayloadBuilder().SET(deprecated_ts=current_time).WHERE( + ['service', '=', svc_name]).AND_WHERE(['asset', '=', asset_name]).AND_WHERE( + ['event', '=', event_name]).AND_WHERE(['deprecated_ts', 'isnull']).payload() + update_result = await storage_client.update_tbl("asset_tracker", update_payload) + if 'response' in update_result: + response = update_result['response'] + if response != 'updated': + raise KeyError('Update failure in asset tracker for service: {} asset: {} event: {}'.format( + svc_name, asset_name, event_name)) + else: + raise StorageServerError + else: + raise KeyError('Asset record already deprecated.') + else: + raise ValueError('No record found in asset tracker for given service: {} asset: {} event: {}'.format( + svc_name, asset_name, event_name)) + else: + raise StorageServerError + except StorageServerError as err: + msg = str(err) + raise web.HTTPInternalServerError(reason=msg, body=json.dumps({"message": "Storage error: {}".format(msg)})) + except KeyError as err: + msg = str(err) + raise web.HTTPBadRequest(reason=msg, body=json.dumps({"message": msg})) + except ValueError as err: + msg = str(err) + raise web.HTTPNotFound(reason=msg, body=json.dumps({"message": msg})) + except Exception as ex: + msg = str(ex) + raise web.HTTPInternalServerError(reason=msg, body=json.dumps({"message": msg})) + else: + return web.json_response({'success': "Asset record entry has been deprecated."}) diff --git a/python/fledge/services/core/api/filters.py b/python/fledge/services/core/api/filters.py index 618d3506e1..1506a838b9 100644 --- a/python/fledge/services/core/api/filters.py +++ b/python/fledge/services/core/api/filters.py @@ -13,7 +13,7 @@ from fledge.common.configuration_manager import ConfigurationManager from fledge.services.core import connect from fledge.services.core.api import utils as apiutils -from fledge.common import logger +from fledge.common import logger, utils from fledge.common.storage_client.payload_builder import PayloadBuilder from fledge.common.storage_client.exceptions import StorageServerError from fledge.common.storage_client.storage_client import StorageClientAsync @@ -406,6 +406,26 @@ async def delete_filter(request: web.Request) -> web.Response: # Delete configuration for filter await _delete_configuration_category(storage, filter_name) + + # Update deprecated timestamp in asset_tracker + """ + TODO: FOGL-6749 + Once rows affected with 0 case handled at Storage side + then we will need to update the query with AND_WHERE(['deprecated_ts', 'isnull']) + At the moment deprecated_ts is updated even in notnull case. + Also added SELECT query before UPDATE to avoid BadCase when there is no asset track entry exists for the filter. + This should also be removed when given JIRA is fixed. + """ + select_payload = PayloadBuilder().SELECT("deprecated_ts").WHERE(['plugin', '=', filter_name]).payload() + get_result = await storage.query_tbl_with_payload('asset_tracker', select_payload) + if 'rows' in get_result: + response = get_result['rows'] + if response: + # AND_WHERE(['deprecated_ts', 'isnull']) once FOGL-6749 is done + current_time = utils.local_timestamp() + update_payload = PayloadBuilder().SET(deprecated_ts=current_time).WHERE( + ['plugin', '=', filter_name]).payload() + await storage.update_tbl("asset_tracker", update_payload) except StorageServerError as ex: _LOGGER.exception("Delete filter: %s, caught exception: %s", filter_name, str(ex.error)) raise web.HTTPInternalServerError(reason=str(ex.error)) diff --git a/python/fledge/services/core/api/service.py b/python/fledge/services/core/api/service.py index ddb65e9730..9a58eac121 100644 --- a/python/fledge/services/core/api/service.py +++ b/python/fledge/services/core/api/service.py @@ -169,27 +169,52 @@ async def delete_service(request): except service_registry_exceptions.DoesNotExist: pass + # Delete streams and plugin data await delete_streams(storage, svc) await delete_plugin_data(storage, svc) # Delete schedule await server.Server.scheduler.delete_schedule(sch_id) + + # Update deprecated timestamp in asset_tracker + await update_deprecated_ts_in_asset_tracker(storage, svc) except Exception as ex: raise web.HTTPInternalServerError(reason=str(ex)) else: return web.json_response({'result': 'Service {} deleted successfully.'.format(svc)}) -async def delete_streams(storage, north_instance): - payload = PayloadBuilder().WHERE(["description", "=", north_instance]).payload() +async def delete_streams(storage, svc): + payload = PayloadBuilder().WHERE(["description", "=", svc]).payload() await storage.delete_from_tbl("streams", payload) -async def delete_plugin_data(storage, north_instance): - payload = PayloadBuilder().WHERE(["key", "like", north_instance + "%"]).payload() +async def delete_plugin_data(storage, svc): + payload = PayloadBuilder().WHERE(["key", "like", svc + "%"]).payload() await storage.delete_from_tbl("plugin_data", payload) +async def update_deprecated_ts_in_asset_tracker(storage, svc): + """ + TODO: FOGL-6749 + Once rows affected with 0 case handled at Storage side + then we will need to update the query with AND_WHERE(['deprecated_ts', 'isnull']) + At the moment deprecated_ts is updated even in notnull case. + Also added SELECT query before UPDATE to avoid BadCase when there is no asset track entry exists for the instance. + This should also be removed when given JIRA is fixed. + """ + select_payload = PayloadBuilder().SELECT("deprecated_ts").WHERE(['service', '=', svc]).payload() + get_result = await storage.query_tbl_with_payload('asset_tracker', select_payload) + if 'rows' in get_result: + response = get_result['rows'] + if response: + # AND_WHERE(['deprecated_ts', 'isnull']) once FOGL-6749 is done + current_time = utils.local_timestamp() + update_payload = PayloadBuilder().SET(deprecated_ts=current_time).WHERE( + ['service', '=', svc]).payload() + await storage.update_tbl("asset_tracker", update_payload) + + async def add_service(request): """ Create a new service to run a specific plugin diff --git a/python/fledge/services/core/api/south.py b/python/fledge/services/core/api/south.py index d658566215..36a01d3f43 100644 --- a/python/fledge/services/core/api/south.py +++ b/python/fledge/services/core/api/south.py @@ -116,7 +116,7 @@ async def _get_tracked_plugin_assets_and_readings(storage_client, cf_mgr, svc_na plugin_value = await cf_mgr.get_category_item(svc_name, 'plugin') plugin = plugin_value['value'] if plugin_value is not None else '' payload = PayloadBuilder().SELECT(["asset", "plugin"]).WHERE(['service', '=', svc_name]).AND_WHERE( - ['event', '=', 'Ingest']).AND_WHERE(['plugin', '=', plugin]).payload() + ['event', '=', 'Ingest']).AND_WHERE(['plugin', '=', plugin]).AND_WHERE(['deprecated_ts', 'isnull']).payload() try: result = await storage_client.query_tbl_with_payload('asset_tracker', payload) # TODO: FOGL-2549 diff --git a/python/fledge/services/core/api/task.py b/python/fledge/services/core/api/task.py index f042939123..819518d736 100644 --- a/python/fledge/services/core/api/task.py +++ b/python/fledge/services/core/api/task.py @@ -319,11 +319,12 @@ async def delete_task(request): config_mgr = ConfigurationManager(storage) await config_mgr.delete_category_and_children_recursively(north_instance) - # delete statistics key + # delete statistics key, streams, plugin data await delete_statistics_key(storage, north_instance) - await delete_streams(storage, north_instance) await delete_plugin_data(storage, north_instance) + # update deprecated timestamp in asset_tracker + await update_deprecated_ts_in_asset_tracker(storage, north_instance) except Exception as ex: raise web.HTTPInternalServerError(reason=ex) @@ -358,10 +359,33 @@ async def delete_task_entry_with_schedule_id(storage, sch_id): payload = PayloadBuilder().WHERE(["schedule_id", "=", str(sch_id)]).payload() await storage.delete_from_tbl("tasks", payload) + async def delete_streams(storage, north_instance): payload = PayloadBuilder().WHERE(["description", "=", north_instance]).payload() await storage.delete_from_tbl("streams", payload) + async def delete_plugin_data(storage, north_instance): payload = PayloadBuilder().WHERE(["key", "like", north_instance + "%"]).payload() await storage.delete_from_tbl("plugin_data", payload) + + +async def update_deprecated_ts_in_asset_tracker(storage, north_instance): + """ + TODO: FOGL-6749 + Once rows affected with 0 case handled at Storage side + then we will need to update the query with AND_WHERE(['deprecated_ts', 'isnull']) + At the moment deprecated_ts is updated even in notnull case. + Also added SELECT query before UPDATE to avoid BadCase when there is no asset track entry exists for the instance. + This should also be removed when given JIRA is fixed. + """ + select_payload = PayloadBuilder().SELECT("deprecated_ts").WHERE(['service', '=', north_instance]).payload() + get_result = await storage.query_tbl_with_payload('asset_tracker', select_payload) + if 'rows' in get_result: + response = get_result['rows'] + if response: + # AND_WHERE(['deprecated_ts', 'isnull']) once FOGL-6749 is done + current_time = utils.local_timestamp() + update_payload = PayloadBuilder().SET(deprecated_ts=current_time).WHERE( + ['service', '=', north_instance]).payload() + await storage.update_tbl("asset_tracker", update_payload) diff --git a/python/fledge/services/core/routes.py b/python/fledge/services/core/routes.py index 52a5919fb0..a0b1466aaa 100644 --- a/python/fledge/services/core/routes.py +++ b/python/fledge/services/core/routes.py @@ -135,7 +135,9 @@ def setup(app): browser.setup(app) # asset tracker - app.router.add_route('GET', '/fledge/track', asset_tracker.get_asset_tracker_events) + app.router.add_route('GET', '/fledge/track', asset_tracker.get_asset_tracker_events) + app.router.add_route('PUT', '/fledge/track/service/{service}/asset/{asset}/event/{event}', + asset_tracker.deprecate_asset_track_entry) # Statistics - As per doc app.router.add_route('GET', '/fledge/statistics', api_statistics.get_statistics) diff --git a/scripts/plugins/storage/postgres/downgrade/52.sql b/scripts/plugins/storage/postgres/downgrade/52.sql new file mode 100644 index 0000000000..1ee0dc809d --- /dev/null +++ b/scripts/plugins/storage/postgres/downgrade/52.sql @@ -0,0 +1,2 @@ +--Remove deprecated_ts column from asset_tracker table +ALTER TABLE fledge.asset_tracker DROP COLUMN IF EXISTS deprecated_ts; \ No newline at end of file diff --git a/scripts/plugins/storage/postgres/init.sql b/scripts/plugins/storage/postgres/init.sql index 00c31702a1..9b25883554 100644 --- a/scripts/plugins/storage/postgres/init.sql +++ b/scripts/plugins/storage/postgres/init.sql @@ -766,13 +766,15 @@ CREATE UNIQUE INDEX config_children_ix1 ON fledge.category_children(parent, chil -- Create the asset_tracker table CREATE TABLE fledge.asset_tracker ( - id integer NOT NULL DEFAULT nextval('fledge.asset_tracker_id_seq'::regclass), - asset character(255) NOT NULL, - event character varying(50) NOT NULL, - service character varying(255) NOT NULL, - fledge character varying(50) NOT NULL, - plugin character varying(50) NOT NULL, - ts timestamp(6) with time zone NOT NULL DEFAULT now() ); + id integer NOT NULL DEFAULT nextval('fledge.asset_tracker_id_seq'::regclass), + asset character(255) NOT NULL, -- asset name + event character varying(50) NOT NULL, -- event name + service character varying(255) NOT NULL, -- service name + fledge character varying(50) NOT NULL, -- FL service name + plugin character varying(50) NOT NULL, -- Plugin name + deprecated_ts timestamp(6) with time zone , -- When an asset record is removed then time will be set else empty and that mean entry has not been deprecated + ts timestamp(6) with time zone NOT NULL DEFAULT now() +); CREATE INDEX asset_tracker_ix1 ON fledge.asset_tracker USING btree (asset); CREATE INDEX asset_tracker_ix2 ON fledge.asset_tracker USING btree (service); diff --git a/scripts/plugins/storage/postgres/upgrade/53.sql b/scripts/plugins/storage/postgres/upgrade/53.sql new file mode 100644 index 0000000000..e436c99a09 --- /dev/null +++ b/scripts/plugins/storage/postgres/upgrade/53.sql @@ -0,0 +1,3 @@ +-- Add new column name 'deprecated_ts' for asset_tracker + +ALTER TABLE fledge.asset_tracker ADD COLUMN deprecated_ts timestamp(6) with time zone; diff --git a/scripts/plugins/storage/sqlite/downgrade/52.sql b/scripts/plugins/storage/sqlite/downgrade/52.sql new file mode 100644 index 0000000000..caa33f35e7 --- /dev/null +++ b/scripts/plugins/storage/sqlite/downgrade/52.sql @@ -0,0 +1,36 @@ +-- From: http://www.sqlite.org/faq.html: +-- SQLite has limited ALTER TABLE support that you can use to change type of column. +-- If you want to change the type of any column you will have to recreate the table. +-- You can save existing data to a temporary table and then drop the old table +-- Now, create the new table, then copy the data back in from the temporary table + + +-- Remove deprecated_ts column in fledge.asset_tracker + +-- Drop existing index +DROP INDEX IF EXISTS asset_tracker_ix1; +DROP INDEX IF EXISTS asset_tracker_ix2; + +-- Rename existing table into a temp one +ALTER TABLE fledge.asset_tracker RENAME TO asset_tracker_old; + +-- Create new table +CREATE TABLE IF NOT EXISTS fledge.asset_tracker ( + id integer PRIMARY KEY AUTOINCREMENT, + asset character(50) NOT NULL, -- asset name + event character varying(50) NOT NULL, -- event name + service character varying(255) NOT NULL, -- service name + fledge character varying(50) NOT NULL, -- FL service name + plugin character varying(50) NOT NULL, -- Plugin name + ts DATETIME DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime')) +); + +-- Copy data +INSERT INTO fledge.asset_tracker ( id, asset, event, service, fledge, plugin, ts ) SELECT id, asset, event, service, fledge, plugin, ts FROM fledge.asset_tracker_old; + +-- Create Index +CREATE INDEX asset_tracker_ix1 ON asset_tracker (asset); +CREATE INDEX asset_tracker_ix2 ON asset_tracker (service); + +-- Remote old table +DROP TABLE IF EXISTS fledge.asset_tracker_old; diff --git a/scripts/plugins/storage/sqlite/init.sql b/scripts/plugins/storage/sqlite/init.sql index 2fb5e447af..eaf68e3809 100644 --- a/scripts/plugins/storage/sqlite/init.sql +++ b/scripts/plugins/storage/sqlite/init.sql @@ -559,13 +559,15 @@ CREATE UNIQUE INDEX config_children_idx1 -- Create the asset_tracker table CREATE TABLE fledge.asset_tracker ( - id integer PRIMARY KEY AUTOINCREMENT, - asset character(50) NOT NULL, - event character varying(50) NOT NULL, - service character varying(255) NOT NULL, - fledge character varying(50) NOT NULL, - plugin character varying(50) NOT NULL, - ts DATETIME DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime')) ); + id integer PRIMARY KEY AUTOINCREMENT, + asset character(50) NOT NULL, -- asset name + event character varying(50) NOT NULL, -- event name + service character varying(255) NOT NULL, -- service name + fledge character varying(50) NOT NULL, -- FL service name + plugin character varying(50) NOT NULL, -- Plugin name + deprecated_ts DATETIME , -- When an asset record is removed then time will be set else empty and that mean entry has not been deprecated + ts DATETIME DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime')) +); CREATE INDEX asset_tracker_ix1 ON asset_tracker (asset); CREATE INDEX asset_tracker_ix2 ON asset_tracker (service); diff --git a/scripts/plugins/storage/sqlite/upgrade/53.sql b/scripts/plugins/storage/sqlite/upgrade/53.sql new file mode 100644 index 0000000000..939be8d200 --- /dev/null +++ b/scripts/plugins/storage/sqlite/upgrade/53.sql @@ -0,0 +1,3 @@ +-- Add new column name 'deprecated_ts' for asset_tracker + +ALTER TABLE fledge.asset_tracker ADD COLUMN deprecated_ts DATETIME; diff --git a/scripts/plugins/storage/sqlitelb/downgrade/52.sql b/scripts/plugins/storage/sqlitelb/downgrade/52.sql new file mode 100644 index 0000000000..caa33f35e7 --- /dev/null +++ b/scripts/plugins/storage/sqlitelb/downgrade/52.sql @@ -0,0 +1,36 @@ +-- From: http://www.sqlite.org/faq.html: +-- SQLite has limited ALTER TABLE support that you can use to change type of column. +-- If you want to change the type of any column you will have to recreate the table. +-- You can save existing data to a temporary table and then drop the old table +-- Now, create the new table, then copy the data back in from the temporary table + + +-- Remove deprecated_ts column in fledge.asset_tracker + +-- Drop existing index +DROP INDEX IF EXISTS asset_tracker_ix1; +DROP INDEX IF EXISTS asset_tracker_ix2; + +-- Rename existing table into a temp one +ALTER TABLE fledge.asset_tracker RENAME TO asset_tracker_old; + +-- Create new table +CREATE TABLE IF NOT EXISTS fledge.asset_tracker ( + id integer PRIMARY KEY AUTOINCREMENT, + asset character(50) NOT NULL, -- asset name + event character varying(50) NOT NULL, -- event name + service character varying(255) NOT NULL, -- service name + fledge character varying(50) NOT NULL, -- FL service name + plugin character varying(50) NOT NULL, -- Plugin name + ts DATETIME DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime')) +); + +-- Copy data +INSERT INTO fledge.asset_tracker ( id, asset, event, service, fledge, plugin, ts ) SELECT id, asset, event, service, fledge, plugin, ts FROM fledge.asset_tracker_old; + +-- Create Index +CREATE INDEX asset_tracker_ix1 ON asset_tracker (asset); +CREATE INDEX asset_tracker_ix2 ON asset_tracker (service); + +-- Remote old table +DROP TABLE IF EXISTS fledge.asset_tracker_old; diff --git a/scripts/plugins/storage/sqlitelb/init.sql b/scripts/plugins/storage/sqlitelb/init.sql index 1b89f49da0..e44c77c3ae 100644 --- a/scripts/plugins/storage/sqlitelb/init.sql +++ b/scripts/plugins/storage/sqlitelb/init.sql @@ -559,13 +559,15 @@ CREATE UNIQUE INDEX config_children_idx1 -- Create the asset_tracker table CREATE TABLE fledge.asset_tracker ( - id integer PRIMARY KEY AUTOINCREMENT, - asset character(50) NOT NULL, - event character varying(50) NOT NULL, - service character varying(255) NOT NULL, - fledge character varying(50) NOT NULL, - plugin character varying(50) NOT NULL, - ts DATETIME DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime')) ); + id integer PRIMARY KEY AUTOINCREMENT, + asset character(50) NOT NULL, -- asset name + event character varying(50) NOT NULL, -- event name + service character varying(255) NOT NULL, -- service name + fledge character varying(50) NOT NULL, -- FL service name + plugin character varying(50) NOT NULL, -- Plugin name + deprecated_ts DATETIME , -- When an asset record is removed then time will be set else empty and that mean entry has not been deprecated + ts DATETIME DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime')) +); CREATE INDEX asset_tracker_ix1 ON asset_tracker (asset); CREATE INDEX asset_tracker_ix2 ON asset_tracker (service); diff --git a/scripts/plugins/storage/sqlitelb/upgrade/53.sql b/scripts/plugins/storage/sqlitelb/upgrade/53.sql new file mode 100644 index 0000000000..939be8d200 --- /dev/null +++ b/scripts/plugins/storage/sqlitelb/upgrade/53.sql @@ -0,0 +1,3 @@ +-- Add new column name 'deprecated_ts' for asset_tracker + +ALTER TABLE fledge.asset_tracker ADD COLUMN deprecated_ts DATETIME; diff --git a/tests/unit/python/fledge/services/core/api/test_asset_tracker_api.py b/tests/unit/python/fledge/services/core/api/test_asset_tracker_api.py index 2f8ced4716..1b60fdb022 100644 --- a/tests/unit/python/fledge/services/core/api/test_asset_tracker_api.py +++ b/tests/unit/python/fledge/services/core/api/test_asset_tracker_api.py @@ -41,15 +41,23 @@ async def async_mock(): return {"rows": rows, 'count': 1} storage_client_mock = MagicMock(StorageClientAsync) - rows = [{'asset': 'AirIntake', 'event': 'Ingest', 'fledge': 'Booth1', 'service': 'PT100_In1', 'plugin': 'PT100', "timestamp": "2018-08-13 15:39:48.796263"}, - {'asset': 'AirIntake', 'event': 'Egress', 'fledge': 'Booth1', 'service': 'Display', 'plugin': 'ShopFloorDisplay', "timestamp": "2018-08-13 16:00:00.134563"}] - payload = {'where': {'condition': '=', 'value': 1, 'column': '1'}, 'return': ['asset', 'event', 'service', 'fledge', 'plugin', {'alias': 'timestamp', 'column': 'ts', 'format': 'YYYY-MM-DD HH24:MI:SS.MS'}]} + rows = [{'asset': 'AirIntake', 'event': 'Ingest', 'fledge': 'Booth1', 'service': 'PT100_In1', + 'plugin': 'PT100', "timestamp": "2018-08-13 15:39:48.796263", "deprecatedTimestamp": "" + }, + {'asset': 'AirIntake', 'event': 'Egress', 'fledge': 'Booth1', 'service': 'Display', + 'plugin': 'ShopFloorDisplay', "timestamp": "2018-08-13 16:00:00.134563", "deprecatedTimestamp": "" + } + ] + payload = {'where': {'condition': '=', 'value': 1, 'column': '1'}, + 'return': ['asset', 'event', 'service', 'fledge', 'plugin', + {'alias': 'timestamp', 'column': 'ts', 'format': 'YYYY-MM-DD HH24:MI:SS.MS'}, + {'alias': 'deprecatedTimestamp', 'column': 'deprecated_ts'} + ] + } # Changed in version 3.8: patch() now returns an AsyncMock if the target is an async function. - if sys.version_info.major == 3 and sys.version_info.minor >= 8: - _rv = await async_mock() - else: - _rv = asyncio.ensure_future(async_mock()) + _rv = await async_mock() if sys.version_info.major == 3 and sys.version_info.minor >= 8 \ + else asyncio.ensure_future(async_mock()) with patch.object(connect, 'get_storage_async', return_value=storage_client_mock): with patch.object(storage_client_mock, 'query_tbl_with_payload', return_value=_rv) as patch_query_payload: diff --git a/tests/unit/python/fledge/services/core/api/test_filters.py b/tests/unit/python/fledge/services/core/api/test_filters.py index a322b6d370..5eff7d9180 100644 --- a/tests/unit/python/fledge/services/core/api/test_filters.py +++ b/tests/unit/python/fledge/services/core/api/test_filters.py @@ -430,27 +430,38 @@ def q_result(*args): assert {"where": {"column": "name", "condition": "=", "value": filter_name}} == json.loads(payload) return {'count': 0, 'rows': []} + if table == 'asset_tracker': + assert {"return": ["deprecated_ts"], + "where": {"column": "plugin", "condition": "=", "value": filter_name}} == json.loads(payload) + return {'count': 1, 'rows': [{'deprecated_ts': ''}]} + filter_name = "AssetFilter" delete_result = {'response': 'deleted', 'rows_affected': 1} + update_result = {'rows_affected': 1, "response": "updated"} storage_client_mock = MagicMock(StorageClientAsync) # Changed in version 3.8: patch() now returns an AsyncMock if the target is an async function. if sys.version_info.major == 3 and sys.version_info.minor >= 8: _rv1 = await self.async_mock(None) _rv2 = await self.async_mock(delete_result) + _rv3 = await self.async_mock(update_result) else: _rv1 = asyncio.ensure_future(self.async_mock(None)) _rv2 = asyncio.ensure_future(self.async_mock(delete_result)) - + _rv3 = asyncio.ensure_future(self.async_mock(update_result)) + with patch.object(connect, 'get_storage_async', return_value=storage_client_mock): with patch.object(storage_client_mock, 'query_tbl_with_payload', side_effect=q_result): with patch.object(storage_client_mock, 'delete_from_tbl', return_value=_rv2) as delete_tbl_patch: with patch.object(filters, '_delete_configuration_category', return_value=_rv1) as delete_cfg_patch: - resp = await client.delete('/fledge/filter/{}'.format(filter_name)) - assert 200 == resp.status - r = await resp.text() - json_response = json.loads(r) - assert {'result': 'Filter AssetFilter deleted successfully'} == json_response + with patch.object(storage_client_mock, 'update_tbl', return_value=_rv3) as update_tbl_patch: + resp = await client.delete('/fledge/filter/{}'.format(filter_name)) + assert 200 == resp.status + r = await resp.text() + json_response = json.loads(r) + assert {'result': 'Filter AssetFilter deleted successfully'} == json_response + args, kwargs = update_tbl_patch.call_args + assert 'asset_tracker' == args[0] args, kwargs = delete_cfg_patch.call_args assert filter_name == args[1] delete_tbl_patch.assert_called_once_with('filters', '{"where": {"column": "name", "condition": "=", "value": "AssetFilter"}}') diff --git a/tests/unit/python/fledge/services/core/api/test_service.py b/tests/unit/python/fledge/services/core/api/test_service.py index 63ad5cdb76..794cbd03f5 100644 --- a/tests/unit/python/fledge/services/core/api/test_service.py +++ b/tests/unit/python/fledge/services/core/api/test_service.py @@ -747,24 +747,33 @@ async def mock_result(): }, ] } - + + delete_result = {'response': 'deleted', 'rows_affected': 1} + update_result = {'rows_affected': 1, "response": "updated"} + # Changed in version 3.8: patch() now returns an AsyncMock if the target is an async function. if sys.version_info.major == 3 and sys.version_info.minor >= 8: - _rv = await mock_result() + _rv1 = await mock_result() + _rv3 = await self.async_mock(delete_result) + _rv4 = await self.async_mock(update_result) else: - _rv = asyncio.ensure_future(mock_result()) + _rv1 = asyncio.ensure_future(mock_result()) + _rv3 = asyncio.ensure_future(self.async_mock(delete_result)) + _rv4 = asyncio.ensure_future(self.async_mock(update_result)) _rv2 = asyncio.ensure_future(asyncio.sleep(.1)) - mocker.patch.object(connect, 'get_storage_async') - get_schedule = mocker.patch.object(service, "get_schedule", return_value=_rv) + get_schedule = mocker.patch.object(service, "get_schedule", return_value=_rv1) scheduler = mocker.patch.object(server.Server, "scheduler", MagicMock()) delete_schedule = mocker.patch.object(scheduler, "delete_schedule", return_value=_rv2) disable_schedule = mocker.patch.object(scheduler, "disable_schedule", return_value=_rv2) - delete_configuration = mocker.patch.object(ConfigurationManager, "delete_category_and_children_recursively", return_value=_rv2) + delete_configuration = mocker.patch.object(ConfigurationManager, "delete_category_and_children_recursively", + return_value=_rv2) get_registry = mocker.patch.object(ServiceRegistry, 'get', return_value=mock_registry) remove_registry = mocker.patch.object(ServiceRegistry, 'remove_from_registry') - delete_streams = mocker.patch.object(service, "delete_streams", return_value=_rv) - delete_plugin_data = mocker.patch.object(service, "delete_plugin_data", return_value=_rv) + delete_streams = mocker.patch.object(service, "delete_streams", return_value=_rv3) + delete_plugin_data = mocker.patch.object(service, "delete_plugin_data", return_value=_rv3) + update_deprecated_ts_in_asset_tracker = mocker.patch.object(service, "update_deprecated_ts_in_asset_tracker", + return_value=_rv4) mock_registry[0]._status = ServiceRecord.Status.Shutdown @@ -805,6 +814,10 @@ async def mock_result(): args, kwargs = delete_plugin_data.call_args_list[0] assert sch_name in args + assert 1 == update_deprecated_ts_in_asset_tracker.call_count + args, kwargs = update_deprecated_ts_in_asset_tracker.call_args_list[0] + assert sch_name in args + async def test_delete_service_exception(self, mocker, client): resp = await client.delete("/fledge/service") assert 405 == resp.status diff --git a/tests/unit/python/fledge/services/core/api/test_task.py b/tests/unit/python/fledge/services/core/api/test_task.py index 35fc1e39dd..f30a4a904a 100644 --- a/tests/unit/python/fledge/services/core/api/test_task.py +++ b/tests/unit/python/fledge/services/core/api/test_task.py @@ -502,14 +502,19 @@ async def mock_result(): ] } + delete_result = {'response': 'deleted', 'rows_affected': 1} + update_result = {'rows_affected': 1, "response": "updated"} + # Changed in version 3.8: patch() now returns an AsyncMock if the target is an async function. if sys.version_info.major == 3 and sys.version_info.minor >= 8: _rv1 = await mock_result() - _rv2 = asyncio.ensure_future(asyncio.sleep(.1)) + _rv3 = await self.async_mock(delete_result) + _rv4 = await self.async_mock(update_result) else: _rv1 = asyncio.ensure_future(mock_result()) - _rv2 = asyncio.ensure_future(asyncio.sleep(.1)) - + _rv3 = asyncio.ensure_future(self.async_mock(delete_result)) + _rv4 = asyncio.ensure_future(self.async_mock(update_result)) + _rv2 = asyncio.ensure_future(asyncio.sleep(.1)) storage_client_mock = MagicMock(StorageClientAsync) mocker.patch.object(connect, 'get_storage_async', storage_client_mock) get_schedule = mocker.patch.object(task, "get_schedule", return_value=_rv1) @@ -520,10 +525,11 @@ async def mock_result(): return_value=_rv2) delete_configuration = mocker.patch.object(ConfigurationManager, "delete_category_and_children_recursively", return_value=_rv2) - delete_statistics_key = mocker.patch.object(task, "delete_statistics_key", return_value=_rv2) - - delete_streams = mocker.patch.object(task, "delete_streams", return_value=_rv2) - delete_plugin_data = mocker.patch.object(task, "delete_plugin_data", return_value=_rv2) + delete_statistics_key = mocker.patch.object(task, "delete_statistics_key", return_value=_rv3) + delete_streams = mocker.patch.object(task, "delete_streams", return_value=_rv3) + delete_plugin_data = mocker.patch.object(task, "delete_plugin_data", return_value=_rv3) + update_deprecated_ts_in_asset_tracker = mocker.patch.object(task, "update_deprecated_ts_in_asset_tracker", + return_value=_rv4) resp = await client.delete("/fledge/scheduled/task/{}".format(sch_name)) assert 200 == resp.status @@ -554,6 +560,18 @@ async def mock_result(): args, kwargs = delete_statistics_key.call_args_list[0] assert sch_name in args + assert 1 == delete_streams.call_count + args, kwargs = delete_streams.call_args_list[0] + assert sch_name in args + + assert 1 == delete_plugin_data.call_count + args, kwargs = delete_plugin_data.call_args_list[0] + assert sch_name in args + + assert 1 == update_deprecated_ts_in_asset_tracker.call_count + args, kwargs = update_deprecated_ts_in_asset_tracker.call_args_list[0] + assert sch_name in args + async def test_delete_task_exception(self, mocker, client): resp = await client.delete("/fledge/scheduled/task") assert 405 == resp.status