From ae2d075cebd73a6790bbc9a872b9732226c36f6a Mon Sep 17 00:00:00 2001 From: ashish-jabble Date: Tue, 26 Jul 2022 19:17:04 +0530 Subject: [PATCH 01/26] schema changes in asset_tracker table with migration up/down scripts only for sqlite engine Signed-off-by: ashish-jabble --- VERSION | 2 +- .../plugins/storage/sqlite/downgrade/52.sql | 36 +++++++++++++++++++ scripts/plugins/storage/sqlite/init.sql | 16 +++++---- scripts/plugins/storage/sqlite/upgrade/53.sql | 3 ++ 4 files changed, 49 insertions(+), 8 deletions(-) create mode 100644 scripts/plugins/storage/sqlite/downgrade/52.sql create mode 100644 scripts/plugins/storage/sqlite/upgrade/53.sql diff --git a/VERSION b/VERSION index c5b641f4c..780f61431 100755 --- a/VERSION +++ b/VERSION @@ -1,2 +1,2 @@ fledge_version=1.9.2 -fledge_schema=52 +fledge_schema=53 diff --git a/scripts/plugins/storage/sqlite/downgrade/52.sql b/scripts/plugins/storage/sqlite/downgrade/52.sql new file mode 100644 index 000000000..caa33f35e --- /dev/null +++ b/scripts/plugins/storage/sqlite/downgrade/52.sql @@ -0,0 +1,36 @@ +-- From: http://www.sqlite.org/faq.html: +-- SQLite has limited ALTER TABLE support that you can use to change type of column. +-- If you want to change the type of any column you will have to recreate the table. +-- You can save existing data to a temporary table and then drop the old table +-- Now, create the new table, then copy the data back in from the temporary table + + +-- Remove deprecated_ts column in fledge.asset_tracker + +-- Drop existing index +DROP INDEX IF EXISTS asset_tracker_ix1; +DROP INDEX IF EXISTS asset_tracker_ix2; + +-- Rename existing table into a temp one +ALTER TABLE fledge.asset_tracker RENAME TO asset_tracker_old; + +-- Create new table +CREATE TABLE IF NOT EXISTS fledge.asset_tracker ( + id integer PRIMARY KEY AUTOINCREMENT, + asset character(50) NOT NULL, -- asset name + event character varying(50) NOT NULL, -- event name + service character varying(255) NOT NULL, -- service name + fledge character varying(50) NOT NULL, -- FL service name + plugin character varying(50) NOT NULL, -- Plugin name + ts DATETIME DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime')) +); + +-- Copy data +INSERT INTO fledge.asset_tracker ( id, asset, event, service, fledge, plugin, ts ) SELECT id, asset, event, service, fledge, plugin, ts FROM fledge.asset_tracker_old; + +-- Create Index +CREATE INDEX asset_tracker_ix1 ON asset_tracker (asset); +CREATE INDEX asset_tracker_ix2 ON asset_tracker (service); + +-- Remote old table +DROP TABLE IF EXISTS fledge.asset_tracker_old; diff --git a/scripts/plugins/storage/sqlite/init.sql b/scripts/plugins/storage/sqlite/init.sql index 2fb5e447a..eaf68e380 100644 --- a/scripts/plugins/storage/sqlite/init.sql +++ b/scripts/plugins/storage/sqlite/init.sql @@ -559,13 +559,15 @@ CREATE UNIQUE INDEX config_children_idx1 -- Create the asset_tracker table CREATE TABLE fledge.asset_tracker ( - id integer PRIMARY KEY AUTOINCREMENT, - asset character(50) NOT NULL, - event character varying(50) NOT NULL, - service character varying(255) NOT NULL, - fledge character varying(50) NOT NULL, - plugin character varying(50) NOT NULL, - ts DATETIME DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime')) ); + id integer PRIMARY KEY AUTOINCREMENT, + asset character(50) NOT NULL, -- asset name + event character varying(50) NOT NULL, -- event name + service character varying(255) NOT NULL, -- service name + fledge character varying(50) NOT NULL, -- FL service name + plugin character varying(50) NOT NULL, -- Plugin name + deprecated_ts DATETIME , -- When an asset record is removed then time will be set else empty and that mean entry has not been deprecated + ts DATETIME DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime')) +); CREATE INDEX asset_tracker_ix1 ON asset_tracker (asset); CREATE INDEX asset_tracker_ix2 ON asset_tracker (service); diff --git a/scripts/plugins/storage/sqlite/upgrade/53.sql b/scripts/plugins/storage/sqlite/upgrade/53.sql new file mode 100644 index 000000000..939be8d20 --- /dev/null +++ b/scripts/plugins/storage/sqlite/upgrade/53.sql @@ -0,0 +1,3 @@ +-- Add new column name 'deprecated_ts' for asset_tracker + +ALTER TABLE fledge.asset_tracker ADD COLUMN deprecated_ts DATETIME; From 5616e1ecd2cd85a758851ea559687a5609c49899 Mon Sep 17 00:00:00 2001 From: ashish-jabble Date: Wed, 27 Jul 2022 12:06:54 +0530 Subject: [PATCH 02/26] New PUT endpoint added for deprecate asset track entry and added this new column in GET track entry Signed-off-by: ashish-jabble --- .../fledge/services/core/api/asset_tracker.py | 88 ++++++++++++++++--- python/fledge/services/core/routes.py | 4 +- 2 files changed, 79 insertions(+), 13 deletions(-) diff --git a/python/fledge/services/core/api/asset_tracker.py b/python/fledge/services/core/api/asset_tracker.py index 9c5df57a0..344c2a390 100644 --- a/python/fledge/services/core/api/asset_tracker.py +++ b/python/fledge/services/core/api/asset_tracker.py @@ -3,10 +3,13 @@ # FLEDGE_BEGIN # See: http://fledge-iot.readthedocs.io/ # FLEDGE_END +import json from aiohttp import web import urllib.parse +from fledge.common import utils as common_utils +from fledge.common.storage_client.exceptions import StorageServerError from fledge.common.storage_client.payload_builder import PayloadBuilder from fledge.services.core import connect @@ -16,13 +19,14 @@ __version__ = "${VERSION}" _help = """ - ------------------------------------------------------------------------------- - | GET | /fledge/track | - ------------------------------------------------------------------------------- + ----------------------------------------------------------------------------------------- + | GET | /fledge/track | + | PUT | /fledge/track/service/{service}/asset/{asset}/event/{event} | + ----------------------------------------------------------------------------------------- """ -async def get_asset_tracker_events(request): +async def get_asset_tracker_events(request: web.Request) -> web.Response: """ Args: request: @@ -31,12 +35,13 @@ async def get_asset_tracker_events(request): asset track records :Example: - curl -X GET http://localhost:8081/fledge/track - curl -X GET http://localhost:8081/fledge/track?asset=XXX - curl -X GET http://localhost:8081/fledge/track?event=XXX - curl -X GET http://localhost:8081/fledge/track?service=XXX + curl -sX GET http://localhost:8081/fledge/track + curl -sX GET http://localhost:8081/fledge/track?asset=XXX + curl -sX GET http://localhost:8081/fledge/track?event=XXX + curl -sX GET http://localhost:8081/fledge/track?service=XXX + curl -sX GET http://localhost:8081/fledge/track?event=XXX&asset=XXX&service=XXX """ - payload = PayloadBuilder().SELECT("asset", "event", "service", "fledge", "plugin", "ts") \ + payload = PayloadBuilder().SELECT("asset", "event", "service", "fledge", "plugin", "ts", "deprecated_ts") \ .ALIAS("return", ("ts", 'timestamp')).FORMAT("return", ("ts", "YYYY-MM-DD HH24:MI:SS.MS")) \ .WHERE(['1', '=', 1]) if 'asset' in request.query and request.query['asset'] != '': @@ -55,8 +60,67 @@ async def get_asset_tracker_events(request): result = await storage_client.query_tbl_with_payload('asset_tracker', payload.payload()) response = result['rows'] except KeyError: - raise web.HTTPBadRequest(reason=result['message']) + msg = result['message'] + raise web.HTTPBadRequest(reason=msg, body=json.dumps({"message": msg})) except Exception as ex: - raise web.HTTPInternalServerError(reason=ex) + msg = str(ex) + raise web.HTTPInternalServerError(reason=msg, body=json.dumps({"message": msg})) + else: + return web.json_response({'track': response}) - return web.json_response({'track': response}) + +async def deprecate_asset_track_entry(request: web.Request) -> web.Response: + """ + Args: + request: + + Returns: + message + + :Example: + curl -sX PUT http://localhost:8081/fledge/track/service/XXX/asset/XXX/event/XXXX + """ + svc_name = request.match_info.get('service', None) + asset_name = request.match_info.get('asset', None) + event_name = request.match_info.get('event', None) + try: + storage_client = connect.get_storage_async() + select_payload = PayloadBuilder().SELECT("service").WHERE( + ['service', '=', svc_name]).AND_WHERE(['asset', '=', asset_name]).AND_WHERE( + ['event', '=', event_name]).payload() + get_result = await storage_client.query_tbl_with_payload('asset_tracker', select_payload) + if 'rows' in get_result: + response = get_result['rows'] + if response: + # Update deprecated ts column entry + current_time = common_utils.local_timestamp() + update_payload = PayloadBuilder().SET(deprecated_ts=current_time).WHERE( + ['service', '=', svc_name]).AND_WHERE(['asset', '=', asset_name]).AND_WHERE( + ['event', '=', event_name]).payload() + update_result = await storage_client.update_tbl("asset_tracker", update_payload) + if 'response' in update_result: + response = update_result['response'] + if response != 'updated': + raise KeyError('Update failure in asset tracker for service: {} asset: {} event: {}'.format( + svc_name, asset_name, event_name)) + else: + raise StorageServerError + else: + raise ValueError('No record found in asset tracker for given service: {} asset: {} event: {}'.format( + svc_name, asset_name, event_name)) + else: + raise StorageServerError + except StorageServerError as err: + msg = str(err) + raise web.HTTPInternalServerError(reason=msg, body=json.dumps({"message": "Storage error: {}".format(msg)})) + except KeyError as err: + msg = str(err) + raise web.HTTPBadRequest(reason=msg, body=json.dumps({"message": msg})) + except ValueError as err: + msg = str(err) + raise web.HTTPNotFound(reason=msg, body=json.dumps({"message": msg})) + except Exception as ex: + msg = str(ex) + raise web.HTTPInternalServerError(reason=msg, body=json.dumps({"message": msg})) + else: + return web.json_response({'success': "Asset record entry has been deprecated."}) diff --git a/python/fledge/services/core/routes.py b/python/fledge/services/core/routes.py index 52a5919fb..a0b1466aa 100644 --- a/python/fledge/services/core/routes.py +++ b/python/fledge/services/core/routes.py @@ -135,7 +135,9 @@ def setup(app): browser.setup(app) # asset tracker - app.router.add_route('GET', '/fledge/track', asset_tracker.get_asset_tracker_events) + app.router.add_route('GET', '/fledge/track', asset_tracker.get_asset_tracker_events) + app.router.add_route('PUT', '/fledge/track/service/{service}/asset/{asset}/event/{event}', + asset_tracker.deprecate_asset_track_entry) # Statistics - As per doc app.router.add_route('GET', '/fledge/statistics', api_statistics.get_statistics) From 47410939db548d850262eaf438f09ce94ebccf05 Mon Sep 17 00:00:00 2001 From: ashish-jabble Date: Wed, 27 Jul 2022 13:08:01 +0530 Subject: [PATCH 03/26] Get track plugin asset query updated with deprecated_ts in south API Signed-off-by: ashish-jabble --- python/fledge/services/core/api/south.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/fledge/services/core/api/south.py b/python/fledge/services/core/api/south.py index d65856621..9d537122d 100644 --- a/python/fledge/services/core/api/south.py +++ b/python/fledge/services/core/api/south.py @@ -116,7 +116,7 @@ async def _get_tracked_plugin_assets_and_readings(storage_client, cf_mgr, svc_na plugin_value = await cf_mgr.get_category_item(svc_name, 'plugin') plugin = plugin_value['value'] if plugin_value is not None else '' payload = PayloadBuilder().SELECT(["asset", "plugin"]).WHERE(['service', '=', svc_name]).AND_WHERE( - ['event', '=', 'Ingest']).AND_WHERE(['plugin', '=', plugin]).payload() + ['event', '=', 'Ingest']).AND_WHERE(['plugin', '=', plugin]).AND_WHERE(['deprecated_ts', '=', ""]).payload() try: result = await storage_client.query_tbl_with_payload('asset_tracker', payload) # TODO: FOGL-2549 From 91bc8e4e372ebfad9808064efa14bb72a91fe26e Mon Sep 17 00:00:00 2001 From: ashish-jabble Date: Wed, 27 Jul 2022 15:11:42 +0530 Subject: [PATCH 04/26] ALIAS added for deprecated in GET track call Signed-off-by: ashish-jabble --- python/fledge/services/core/api/asset_tracker.py | 1 + python/fledge/services/core/api/south.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/python/fledge/services/core/api/asset_tracker.py b/python/fledge/services/core/api/asset_tracker.py index 344c2a390..d21fb754c 100644 --- a/python/fledge/services/core/api/asset_tracker.py +++ b/python/fledge/services/core/api/asset_tracker.py @@ -43,6 +43,7 @@ async def get_asset_tracker_events(request: web.Request) -> web.Response: """ payload = PayloadBuilder().SELECT("asset", "event", "service", "fledge", "plugin", "ts", "deprecated_ts") \ .ALIAS("return", ("ts", 'timestamp')).FORMAT("return", ("ts", "YYYY-MM-DD HH24:MI:SS.MS")) \ + .ALIAS("return", ("deprecated_ts", 'deprecatedTimestamp')) \ .WHERE(['1', '=', 1]) if 'asset' in request.query and request.query['asset'] != '': asset = urllib.parse.unquote(request.query['asset']) diff --git a/python/fledge/services/core/api/south.py b/python/fledge/services/core/api/south.py index 9d537122d..d65856621 100644 --- a/python/fledge/services/core/api/south.py +++ b/python/fledge/services/core/api/south.py @@ -116,7 +116,7 @@ async def _get_tracked_plugin_assets_and_readings(storage_client, cf_mgr, svc_na plugin_value = await cf_mgr.get_category_item(svc_name, 'plugin') plugin = plugin_value['value'] if plugin_value is not None else '' payload = PayloadBuilder().SELECT(["asset", "plugin"]).WHERE(['service', '=', svc_name]).AND_WHERE( - ['event', '=', 'Ingest']).AND_WHERE(['plugin', '=', plugin]).AND_WHERE(['deprecated_ts', '=', ""]).payload() + ['event', '=', 'Ingest']).AND_WHERE(['plugin', '=', plugin]).payload() try: result = await storage_client.query_tbl_with_payload('asset_tracker', payload) # TODO: FOGL-2549 From 0c1fb70f020ea6a7101046f5a15a420b5f97472a Mon Sep 17 00:00:00 2001 From: ashish-jabble Date: Wed, 27 Jul 2022 15:32:26 +0530 Subject: [PATCH 05/26] deprecated timestamp updated on deleting a service Signed-off-by: ashish-jabble --- python/fledge/services/core/api/service.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/python/fledge/services/core/api/service.py b/python/fledge/services/core/api/service.py index ddb65e973..47f760b6e 100644 --- a/python/fledge/services/core/api/service.py +++ b/python/fledge/services/core/api/service.py @@ -174,6 +174,11 @@ async def delete_service(request): # Delete schedule await server.Server.scheduler.delete_schedule(sch_id) + + # update deprecated_ts entry in asset tracker + current_time = utils.local_timestamp() + update_payload = PayloadBuilder().SET(deprecated_ts=current_time).WHERE(['service', '=', svc]).payload() + await storage.update_tbl("asset_tracker", update_payload) except Exception as ex: raise web.HTTPInternalServerError(reason=str(ex)) else: From d8f8a6f3e5e01b84372796e786089c6a2c5d5dc4 Mon Sep 17 00:00:00 2001 From: ashish-jabble Date: Wed, 27 Jul 2022 15:48:43 +0530 Subject: [PATCH 06/26] deprecated timestamp updated on deleting a task Signed-off-by: ashish-jabble --- python/fledge/services/core/api/task.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/python/fledge/services/core/api/task.py b/python/fledge/services/core/api/task.py index f04293912..e3d4ae6e9 100644 --- a/python/fledge/services/core/api/task.py +++ b/python/fledge/services/core/api/task.py @@ -325,6 +325,11 @@ async def delete_task(request): await delete_streams(storage, north_instance) await delete_plugin_data(storage, north_instance) + # update deprecated_ts entry in asset tracker + current_time = utils.local_timestamp() + update_payload = PayloadBuilder().SET(deprecated_ts=current_time).WHERE( + ['service', '=', north_instance]).payload() + await storage.update_tbl("asset_tracker", update_payload) except Exception as ex: raise web.HTTPInternalServerError(reason=ex) else: From 533d86f11163cab28d234806fc898d7059e640be Mon Sep 17 00:00:00 2001 From: ashish-jabble Date: Wed, 27 Jul 2022 16:09:00 +0530 Subject: [PATCH 07/26] deprecated timestamp updated on deleting a filter Signed-off-by: ashish-jabble --- python/fledge/services/core/api/filters.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/python/fledge/services/core/api/filters.py b/python/fledge/services/core/api/filters.py index 618d3506e..8b5128d9c 100644 --- a/python/fledge/services/core/api/filters.py +++ b/python/fledge/services/core/api/filters.py @@ -13,7 +13,7 @@ from fledge.common.configuration_manager import ConfigurationManager from fledge.services.core import connect from fledge.services.core.api import utils as apiutils -from fledge.common import logger +from fledge.common import logger, utils from fledge.common.storage_client.payload_builder import PayloadBuilder from fledge.common.storage_client.exceptions import StorageServerError from fledge.common.storage_client.storage_client import StorageClientAsync @@ -406,6 +406,12 @@ async def delete_filter(request: web.Request) -> web.Response: # Delete configuration for filter await _delete_configuration_category(storage, filter_name) + + # update deprecated_ts entry in asset tracker + current_time = utils.local_timestamp() + update_payload = PayloadBuilder().SET(deprecated_ts=current_time).WHERE( + ['plugin', '=', filter_name]).payload() + await storage.update_tbl("asset_tracker", update_payload) except StorageServerError as ex: _LOGGER.exception("Delete filter: %s, caught exception: %s", filter_name, str(ex.error)) raise web.HTTPInternalServerError(reason=str(ex.error)) From d95012fbc7c33c56294f411da03a68ed4f858726 Mon Sep 17 00:00:00 2001 From: ashish-jabble Date: Wed, 27 Jul 2022 17:51:20 +0530 Subject: [PATCH 08/26] asset tracker and filter unit tests fixes Signed-off-by: ashish-jabble --- .../core/api/test_asset_tracker_api.py | 22 +++++++++++++------ .../fledge/services/core/api/test_filters.py | 18 ++++++++++----- 2 files changed, 27 insertions(+), 13 deletions(-) diff --git a/tests/unit/python/fledge/services/core/api/test_asset_tracker_api.py b/tests/unit/python/fledge/services/core/api/test_asset_tracker_api.py index 2f8ced471..1b60fdb02 100644 --- a/tests/unit/python/fledge/services/core/api/test_asset_tracker_api.py +++ b/tests/unit/python/fledge/services/core/api/test_asset_tracker_api.py @@ -41,15 +41,23 @@ async def async_mock(): return {"rows": rows, 'count': 1} storage_client_mock = MagicMock(StorageClientAsync) - rows = [{'asset': 'AirIntake', 'event': 'Ingest', 'fledge': 'Booth1', 'service': 'PT100_In1', 'plugin': 'PT100', "timestamp": "2018-08-13 15:39:48.796263"}, - {'asset': 'AirIntake', 'event': 'Egress', 'fledge': 'Booth1', 'service': 'Display', 'plugin': 'ShopFloorDisplay', "timestamp": "2018-08-13 16:00:00.134563"}] - payload = {'where': {'condition': '=', 'value': 1, 'column': '1'}, 'return': ['asset', 'event', 'service', 'fledge', 'plugin', {'alias': 'timestamp', 'column': 'ts', 'format': 'YYYY-MM-DD HH24:MI:SS.MS'}]} + rows = [{'asset': 'AirIntake', 'event': 'Ingest', 'fledge': 'Booth1', 'service': 'PT100_In1', + 'plugin': 'PT100', "timestamp": "2018-08-13 15:39:48.796263", "deprecatedTimestamp": "" + }, + {'asset': 'AirIntake', 'event': 'Egress', 'fledge': 'Booth1', 'service': 'Display', + 'plugin': 'ShopFloorDisplay', "timestamp": "2018-08-13 16:00:00.134563", "deprecatedTimestamp": "" + } + ] + payload = {'where': {'condition': '=', 'value': 1, 'column': '1'}, + 'return': ['asset', 'event', 'service', 'fledge', 'plugin', + {'alias': 'timestamp', 'column': 'ts', 'format': 'YYYY-MM-DD HH24:MI:SS.MS'}, + {'alias': 'deprecatedTimestamp', 'column': 'deprecated_ts'} + ] + } # Changed in version 3.8: patch() now returns an AsyncMock if the target is an async function. - if sys.version_info.major == 3 and sys.version_info.minor >= 8: - _rv = await async_mock() - else: - _rv = asyncio.ensure_future(async_mock()) + _rv = await async_mock() if sys.version_info.major == 3 and sys.version_info.minor >= 8 \ + else asyncio.ensure_future(async_mock()) with patch.object(connect, 'get_storage_async', return_value=storage_client_mock): with patch.object(storage_client_mock, 'query_tbl_with_payload', return_value=_rv) as patch_query_payload: diff --git a/tests/unit/python/fledge/services/core/api/test_filters.py b/tests/unit/python/fledge/services/core/api/test_filters.py index a322b6d37..124aa4247 100644 --- a/tests/unit/python/fledge/services/core/api/test_filters.py +++ b/tests/unit/python/fledge/services/core/api/test_filters.py @@ -432,25 +432,31 @@ def q_result(*args): filter_name = "AssetFilter" delete_result = {'response': 'deleted', 'rows_affected': 1} + update_result = {'rows_affected': 1, "response": "updated"} storage_client_mock = MagicMock(StorageClientAsync) # Changed in version 3.8: patch() now returns an AsyncMock if the target is an async function. if sys.version_info.major == 3 and sys.version_info.minor >= 8: _rv1 = await self.async_mock(None) _rv2 = await self.async_mock(delete_result) + _rv3 = await self.async_mock(update_result) else: _rv1 = asyncio.ensure_future(self.async_mock(None)) _rv2 = asyncio.ensure_future(self.async_mock(delete_result)) - + _rv3 = asyncio.ensure_future(self.async_mock(update_result)) + with patch.object(connect, 'get_storage_async', return_value=storage_client_mock): with patch.object(storage_client_mock, 'query_tbl_with_payload', side_effect=q_result): with patch.object(storage_client_mock, 'delete_from_tbl', return_value=_rv2) as delete_tbl_patch: with patch.object(filters, '_delete_configuration_category', return_value=_rv1) as delete_cfg_patch: - resp = await client.delete('/fledge/filter/{}'.format(filter_name)) - assert 200 == resp.status - r = await resp.text() - json_response = json.loads(r) - assert {'result': 'Filter AssetFilter deleted successfully'} == json_response + with patch.object(storage_client_mock, 'update_tbl', return_value=_rv3) as update_tbl_patch: + resp = await client.delete('/fledge/filter/{}'.format(filter_name)) + assert 200 == resp.status + r = await resp.text() + json_response = json.loads(r) + assert {'result': 'Filter AssetFilter deleted successfully'} == json_response + args, kwargs = update_tbl_patch.call_args + assert 'asset_tracker' == args[0] args, kwargs = delete_cfg_patch.call_args assert filter_name == args[1] delete_tbl_patch.assert_called_once_with('filters', '{"where": {"column": "name", "condition": "=", "value": "AssetFilter"}}') From 70082fc5a90f1a3637e7ad13272e180f89fc1d8a Mon Sep 17 00:00:00 2001 From: ashish-jabble Date: Wed, 27 Jul 2022 17:52:00 +0530 Subject: [PATCH 09/26] service unit tests fixes and other code refactoring Signed-off-by: ashish-jabble --- python/fledge/services/core/api/service.py | 22 +++++++++----- .../fledge/services/core/api/test_service.py | 29 ++++++++++++++----- 2 files changed, 35 insertions(+), 16 deletions(-) diff --git a/python/fledge/services/core/api/service.py b/python/fledge/services/core/api/service.py index 47f760b6e..9f5f251c2 100644 --- a/python/fledge/services/core/api/service.py +++ b/python/fledge/services/core/api/service.py @@ -169,32 +169,38 @@ async def delete_service(request): except service_registry_exceptions.DoesNotExist: pass + # Delete streams and plugin data await delete_streams(storage, svc) await delete_plugin_data(storage, svc) # Delete schedule await server.Server.scheduler.delete_schedule(sch_id) - # update deprecated_ts entry in asset tracker - current_time = utils.local_timestamp() - update_payload = PayloadBuilder().SET(deprecated_ts=current_time).WHERE(['service', '=', svc]).payload() - await storage.update_tbl("asset_tracker", update_payload) + # Update deprecated timestamp in asset_tracker + await update_deprecated_ts_in_asset_tracker(storage, svc) except Exception as ex: raise web.HTTPInternalServerError(reason=str(ex)) else: return web.json_response({'result': 'Service {} deleted successfully.'.format(svc)}) -async def delete_streams(storage, north_instance): - payload = PayloadBuilder().WHERE(["description", "=", north_instance]).payload() +async def delete_streams(storage, svc): + payload = PayloadBuilder().WHERE(["description", "=", svc]).payload() await storage.delete_from_tbl("streams", payload) -async def delete_plugin_data(storage, north_instance): - payload = PayloadBuilder().WHERE(["key", "like", north_instance + "%"]).payload() +async def delete_plugin_data(storage, svc): + payload = PayloadBuilder().WHERE(["key", "like", svc + "%"]).payload() await storage.delete_from_tbl("plugin_data", payload) +async def update_deprecated_ts_in_asset_tracker(storage, svc): + current_time = utils.local_timestamp() + update_payload = PayloadBuilder().SET(deprecated_ts=current_time).WHERE( + ['service', '=', svc]).payload() + await storage.update_tbl("asset_tracker", update_payload) + + async def add_service(request): """ Create a new service to run a specific plugin diff --git a/tests/unit/python/fledge/services/core/api/test_service.py b/tests/unit/python/fledge/services/core/api/test_service.py index 63ad5cdb7..794cbd03f 100644 --- a/tests/unit/python/fledge/services/core/api/test_service.py +++ b/tests/unit/python/fledge/services/core/api/test_service.py @@ -747,24 +747,33 @@ async def mock_result(): }, ] } - + + delete_result = {'response': 'deleted', 'rows_affected': 1} + update_result = {'rows_affected': 1, "response": "updated"} + # Changed in version 3.8: patch() now returns an AsyncMock if the target is an async function. if sys.version_info.major == 3 and sys.version_info.minor >= 8: - _rv = await mock_result() + _rv1 = await mock_result() + _rv3 = await self.async_mock(delete_result) + _rv4 = await self.async_mock(update_result) else: - _rv = asyncio.ensure_future(mock_result()) + _rv1 = asyncio.ensure_future(mock_result()) + _rv3 = asyncio.ensure_future(self.async_mock(delete_result)) + _rv4 = asyncio.ensure_future(self.async_mock(update_result)) _rv2 = asyncio.ensure_future(asyncio.sleep(.1)) - mocker.patch.object(connect, 'get_storage_async') - get_schedule = mocker.patch.object(service, "get_schedule", return_value=_rv) + get_schedule = mocker.patch.object(service, "get_schedule", return_value=_rv1) scheduler = mocker.patch.object(server.Server, "scheduler", MagicMock()) delete_schedule = mocker.patch.object(scheduler, "delete_schedule", return_value=_rv2) disable_schedule = mocker.patch.object(scheduler, "disable_schedule", return_value=_rv2) - delete_configuration = mocker.patch.object(ConfigurationManager, "delete_category_and_children_recursively", return_value=_rv2) + delete_configuration = mocker.patch.object(ConfigurationManager, "delete_category_and_children_recursively", + return_value=_rv2) get_registry = mocker.patch.object(ServiceRegistry, 'get', return_value=mock_registry) remove_registry = mocker.patch.object(ServiceRegistry, 'remove_from_registry') - delete_streams = mocker.patch.object(service, "delete_streams", return_value=_rv) - delete_plugin_data = mocker.patch.object(service, "delete_plugin_data", return_value=_rv) + delete_streams = mocker.patch.object(service, "delete_streams", return_value=_rv3) + delete_plugin_data = mocker.patch.object(service, "delete_plugin_data", return_value=_rv3) + update_deprecated_ts_in_asset_tracker = mocker.patch.object(service, "update_deprecated_ts_in_asset_tracker", + return_value=_rv4) mock_registry[0]._status = ServiceRecord.Status.Shutdown @@ -805,6 +814,10 @@ async def mock_result(): args, kwargs = delete_plugin_data.call_args_list[0] assert sch_name in args + assert 1 == update_deprecated_ts_in_asset_tracker.call_count + args, kwargs = update_deprecated_ts_in_asset_tracker.call_args_list[0] + assert sch_name in args + async def test_delete_service_exception(self, mocker, client): resp = await client.delete("/fledge/service") assert 405 == resp.status From 3f4d25f18f690dbbb41535c20c15108ae73d0f84 Mon Sep 17 00:00:00 2001 From: ashish-jabble Date: Wed, 27 Jul 2022 17:52:26 +0530 Subject: [PATCH 10/26] task unit tests fixes and other code refactoring Signed-off-by: ashish-jabble --- python/fledge/services/core/api/task.py | 19 +++++++---- .../fledge/services/core/api/test_task.py | 32 +++++++++++++++---- 2 files changed, 37 insertions(+), 14 deletions(-) diff --git a/python/fledge/services/core/api/task.py b/python/fledge/services/core/api/task.py index e3d4ae6e9..8bf9d129a 100644 --- a/python/fledge/services/core/api/task.py +++ b/python/fledge/services/core/api/task.py @@ -319,17 +319,13 @@ async def delete_task(request): config_mgr = ConfigurationManager(storage) await config_mgr.delete_category_and_children_recursively(north_instance) - # delete statistics key + # delete statistics key, streams, plugin data await delete_statistics_key(storage, north_instance) - await delete_streams(storage, north_instance) await delete_plugin_data(storage, north_instance) + # update deprecated timestamp in asset_tracker + await update_deprecated_ts_in_asset_tracker(storage, north_instance) - # update deprecated_ts entry in asset tracker - current_time = utils.local_timestamp() - update_payload = PayloadBuilder().SET(deprecated_ts=current_time).WHERE( - ['service', '=', north_instance]).payload() - await storage.update_tbl("asset_tracker", update_payload) except Exception as ex: raise web.HTTPInternalServerError(reason=ex) else: @@ -363,10 +359,19 @@ async def delete_task_entry_with_schedule_id(storage, sch_id): payload = PayloadBuilder().WHERE(["schedule_id", "=", str(sch_id)]).payload() await storage.delete_from_tbl("tasks", payload) + async def delete_streams(storage, north_instance): payload = PayloadBuilder().WHERE(["description", "=", north_instance]).payload() await storage.delete_from_tbl("streams", payload) + async def delete_plugin_data(storage, north_instance): payload = PayloadBuilder().WHERE(["key", "like", north_instance + "%"]).payload() await storage.delete_from_tbl("plugin_data", payload) + + +async def update_deprecated_ts_in_asset_tracker(storage, north_instance): + current_time = utils.local_timestamp() + update_payload = PayloadBuilder().SET(deprecated_ts=current_time).WHERE( + ['service', '=', north_instance]).payload() + await storage.update_tbl("asset_tracker", update_payload) diff --git a/tests/unit/python/fledge/services/core/api/test_task.py b/tests/unit/python/fledge/services/core/api/test_task.py index 35fc1e39d..f30a4a904 100644 --- a/tests/unit/python/fledge/services/core/api/test_task.py +++ b/tests/unit/python/fledge/services/core/api/test_task.py @@ -502,14 +502,19 @@ async def mock_result(): ] } + delete_result = {'response': 'deleted', 'rows_affected': 1} + update_result = {'rows_affected': 1, "response": "updated"} + # Changed in version 3.8: patch() now returns an AsyncMock if the target is an async function. if sys.version_info.major == 3 and sys.version_info.minor >= 8: _rv1 = await mock_result() - _rv2 = asyncio.ensure_future(asyncio.sleep(.1)) + _rv3 = await self.async_mock(delete_result) + _rv4 = await self.async_mock(update_result) else: _rv1 = asyncio.ensure_future(mock_result()) - _rv2 = asyncio.ensure_future(asyncio.sleep(.1)) - + _rv3 = asyncio.ensure_future(self.async_mock(delete_result)) + _rv4 = asyncio.ensure_future(self.async_mock(update_result)) + _rv2 = asyncio.ensure_future(asyncio.sleep(.1)) storage_client_mock = MagicMock(StorageClientAsync) mocker.patch.object(connect, 'get_storage_async', storage_client_mock) get_schedule = mocker.patch.object(task, "get_schedule", return_value=_rv1) @@ -520,10 +525,11 @@ async def mock_result(): return_value=_rv2) delete_configuration = mocker.patch.object(ConfigurationManager, "delete_category_and_children_recursively", return_value=_rv2) - delete_statistics_key = mocker.patch.object(task, "delete_statistics_key", return_value=_rv2) - - delete_streams = mocker.patch.object(task, "delete_streams", return_value=_rv2) - delete_plugin_data = mocker.patch.object(task, "delete_plugin_data", return_value=_rv2) + delete_statistics_key = mocker.patch.object(task, "delete_statistics_key", return_value=_rv3) + delete_streams = mocker.patch.object(task, "delete_streams", return_value=_rv3) + delete_plugin_data = mocker.patch.object(task, "delete_plugin_data", return_value=_rv3) + update_deprecated_ts_in_asset_tracker = mocker.patch.object(task, "update_deprecated_ts_in_asset_tracker", + return_value=_rv4) resp = await client.delete("/fledge/scheduled/task/{}".format(sch_name)) assert 200 == resp.status @@ -554,6 +560,18 @@ async def mock_result(): args, kwargs = delete_statistics_key.call_args_list[0] assert sch_name in args + assert 1 == delete_streams.call_count + args, kwargs = delete_streams.call_args_list[0] + assert sch_name in args + + assert 1 == delete_plugin_data.call_count + args, kwargs = delete_plugin_data.call_args_list[0] + assert sch_name in args + + assert 1 == update_deprecated_ts_in_asset_tracker.call_count + args, kwargs = update_deprecated_ts_in_asset_tracker.call_args_list[0] + assert sch_name in args + async def test_delete_task_exception(self, mocker, client): resp = await client.delete("/fledge/scheduled/task") assert 405 == resp.status From 6062b949d53d5be988575729ccda7935514cb184 Mon Sep 17 00:00:00 2001 From: ashish-jabble Date: Thu, 28 Jul 2022 10:42:19 +0530 Subject: [PATCH 11/26] south API changes for to show asset track entry only if deprecated_ts is null Signed-off-by: ashish-jabble --- python/fledge/services/core/api/south.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/fledge/services/core/api/south.py b/python/fledge/services/core/api/south.py index d65856621..36a01d3f4 100644 --- a/python/fledge/services/core/api/south.py +++ b/python/fledge/services/core/api/south.py @@ -116,7 +116,7 @@ async def _get_tracked_plugin_assets_and_readings(storage_client, cf_mgr, svc_na plugin_value = await cf_mgr.get_category_item(svc_name, 'plugin') plugin = plugin_value['value'] if plugin_value is not None else '' payload = PayloadBuilder().SELECT(["asset", "plugin"]).WHERE(['service', '=', svc_name]).AND_WHERE( - ['event', '=', 'Ingest']).AND_WHERE(['plugin', '=', plugin]).payload() + ['event', '=', 'Ingest']).AND_WHERE(['plugin', '=', plugin]).AND_WHERE(['deprecated_ts', 'isnull']).payload() try: result = await storage_client.query_tbl_with_payload('asset_tracker', payload) # TODO: FOGL-2549 From 92005be75e5339568e58011e6145925a6a626a99 Mon Sep 17 00:00:00 2001 From: ashish-jabble Date: Thu, 28 Jul 2022 11:07:43 +0530 Subject: [PATCH 12/26] case handled for already deprecated asset track entry Signed-off-by: ashish-jabble --- .../fledge/services/core/api/asset_tracker.py | 29 ++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/python/fledge/services/core/api/asset_tracker.py b/python/fledge/services/core/api/asset_tracker.py index d21fb754c..c7a29cd4b 100644 --- a/python/fledge/services/core/api/asset_tracker.py +++ b/python/fledge/services/core/api/asset_tracker.py @@ -86,26 +86,29 @@ async def deprecate_asset_track_entry(request: web.Request) -> web.Response: event_name = request.match_info.get('event', None) try: storage_client = connect.get_storage_async() - select_payload = PayloadBuilder().SELECT("service").WHERE( + select_payload = PayloadBuilder().SELECT("deprecated_ts").WHERE( ['service', '=', svc_name]).AND_WHERE(['asset', '=', asset_name]).AND_WHERE( ['event', '=', event_name]).payload() get_result = await storage_client.query_tbl_with_payload('asset_tracker', select_payload) if 'rows' in get_result: response = get_result['rows'] if response: - # Update deprecated ts column entry - current_time = common_utils.local_timestamp() - update_payload = PayloadBuilder().SET(deprecated_ts=current_time).WHERE( - ['service', '=', svc_name]).AND_WHERE(['asset', '=', asset_name]).AND_WHERE( - ['event', '=', event_name]).payload() - update_result = await storage_client.update_tbl("asset_tracker", update_payload) - if 'response' in update_result: - response = update_result['response'] - if response != 'updated': - raise KeyError('Update failure in asset tracker for service: {} asset: {} event: {}'.format( - svc_name, asset_name, event_name)) + if response[0]['deprecated_ts'] == "": + # Update deprecated_ts column entry + current_time = common_utils.local_timestamp() + update_payload = PayloadBuilder().SET(deprecated_ts=current_time).WHERE( + ['service', '=', svc_name]).AND_WHERE(['asset', '=', asset_name]).AND_WHERE( + ['event', '=', event_name]).AND_WHERE(['deprecated_ts', 'isnull']).payload() + update_result = await storage_client.update_tbl("asset_tracker", update_payload) + if 'response' in update_result: + response = update_result['response'] + if response != 'updated': + raise KeyError('Update failure in asset tracker for service: {} asset: {} event: {}'.format( + svc_name, asset_name, event_name)) + else: + raise StorageServerError else: - raise StorageServerError + raise KeyError('Asset record already deprecated.') else: raise ValueError('No record found in asset tracker for given service: {} asset: {} event: {}'.format( svc_name, asset_name, event_name)) From 02eb7007a56e772d459d2087369268bbdf448606 Mon Sep 17 00:00:00 2001 From: ashish-jabble Date: Thu, 28 Jul 2022 12:43:37 +0530 Subject: [PATCH 13/26] more comments are update inline with FOGL-6749 Signed-off-by: ashish-jabble --- python/fledge/services/core/api/asset_tracker.py | 1 + python/fledge/services/core/api/service.py | 6 ++++++ python/fledge/services/core/api/task.py | 6 ++++++ 3 files changed, 13 insertions(+) diff --git a/python/fledge/services/core/api/asset_tracker.py b/python/fledge/services/core/api/asset_tracker.py index c7a29cd4b..105c924e4 100644 --- a/python/fledge/services/core/api/asset_tracker.py +++ b/python/fledge/services/core/api/asset_tracker.py @@ -86,6 +86,7 @@ async def deprecate_asset_track_entry(request: web.Request) -> web.Response: event_name = request.match_info.get('event', None) try: storage_client = connect.get_storage_async() + # TODO: FOGL-6749 Once rows affected with 0 case handled at Storage side then we can remove SELECT call select_payload = PayloadBuilder().SELECT("deprecated_ts").WHERE( ['service', '=', svc_name]).AND_WHERE(['asset', '=', asset_name]).AND_WHERE( ['event', '=', event_name]).payload() diff --git a/python/fledge/services/core/api/service.py b/python/fledge/services/core/api/service.py index 9f5f251c2..6db67a544 100644 --- a/python/fledge/services/core/api/service.py +++ b/python/fledge/services/core/api/service.py @@ -195,6 +195,12 @@ async def delete_plugin_data(storage, svc): async def update_deprecated_ts_in_asset_tracker(storage, svc): + """ + TODO: FOGL-6749 Once rows affected with 0 case handled at Storage side + then we will update query with AND_WHERE(['deprecated_ts', 'isnull']) + At the moment deprecated_ts is updated even in notnull case. + And we do not want to add expensive calls or workaround to restrict the UPDATE. + """ current_time = utils.local_timestamp() update_payload = PayloadBuilder().SET(deprecated_ts=current_time).WHERE( ['service', '=', svc]).payload() diff --git a/python/fledge/services/core/api/task.py b/python/fledge/services/core/api/task.py index 8bf9d129a..151ea548e 100644 --- a/python/fledge/services/core/api/task.py +++ b/python/fledge/services/core/api/task.py @@ -371,6 +371,12 @@ async def delete_plugin_data(storage, north_instance): async def update_deprecated_ts_in_asset_tracker(storage, north_instance): + """ + TODO: FOGL-6749 Once rows affected with 0 case handled at Storage side + then we will update query with AND_WHERE(['deprecated_ts', 'isnull']) + At the moment deprecated_ts is updated even in notnull case. + And we do not want to add expensive calls or workaround to restrict the UPDATE. + """ current_time = utils.local_timestamp() update_payload = PayloadBuilder().SET(deprecated_ts=current_time).WHERE( ['service', '=', north_instance]).payload() From 03a4d5cf4b84df8e23fba32c68c95ce76e2c0a1b Mon Sep 17 00:00:00 2001 From: ashish-jabble Date: Thu, 28 Jul 2022 12:48:00 +0530 Subject: [PATCH 14/26] sqlitelb deprecated_ts schema changes along with up/down Sql Signed-off-by: ashish-jabble --- .../plugins/storage/sqlitelb/downgrade/52.sql | 36 +++++++++++++++++++ scripts/plugins/storage/sqlitelb/init.sql | 16 +++++---- .../plugins/storage/sqlitelb/upgrade/53.sql | 3 ++ 3 files changed, 48 insertions(+), 7 deletions(-) create mode 100644 scripts/plugins/storage/sqlitelb/downgrade/52.sql create mode 100644 scripts/plugins/storage/sqlitelb/upgrade/53.sql diff --git a/scripts/plugins/storage/sqlitelb/downgrade/52.sql b/scripts/plugins/storage/sqlitelb/downgrade/52.sql new file mode 100644 index 000000000..caa33f35e --- /dev/null +++ b/scripts/plugins/storage/sqlitelb/downgrade/52.sql @@ -0,0 +1,36 @@ +-- From: http://www.sqlite.org/faq.html: +-- SQLite has limited ALTER TABLE support that you can use to change type of column. +-- If you want to change the type of any column you will have to recreate the table. +-- You can save existing data to a temporary table and then drop the old table +-- Now, create the new table, then copy the data back in from the temporary table + + +-- Remove deprecated_ts column in fledge.asset_tracker + +-- Drop existing index +DROP INDEX IF EXISTS asset_tracker_ix1; +DROP INDEX IF EXISTS asset_tracker_ix2; + +-- Rename existing table into a temp one +ALTER TABLE fledge.asset_tracker RENAME TO asset_tracker_old; + +-- Create new table +CREATE TABLE IF NOT EXISTS fledge.asset_tracker ( + id integer PRIMARY KEY AUTOINCREMENT, + asset character(50) NOT NULL, -- asset name + event character varying(50) NOT NULL, -- event name + service character varying(255) NOT NULL, -- service name + fledge character varying(50) NOT NULL, -- FL service name + plugin character varying(50) NOT NULL, -- Plugin name + ts DATETIME DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime')) +); + +-- Copy data +INSERT INTO fledge.asset_tracker ( id, asset, event, service, fledge, plugin, ts ) SELECT id, asset, event, service, fledge, plugin, ts FROM fledge.asset_tracker_old; + +-- Create Index +CREATE INDEX asset_tracker_ix1 ON asset_tracker (asset); +CREATE INDEX asset_tracker_ix2 ON asset_tracker (service); + +-- Remote old table +DROP TABLE IF EXISTS fledge.asset_tracker_old; diff --git a/scripts/plugins/storage/sqlitelb/init.sql b/scripts/plugins/storage/sqlitelb/init.sql index 1b89f49da..e44c77c3a 100644 --- a/scripts/plugins/storage/sqlitelb/init.sql +++ b/scripts/plugins/storage/sqlitelb/init.sql @@ -559,13 +559,15 @@ CREATE UNIQUE INDEX config_children_idx1 -- Create the asset_tracker table CREATE TABLE fledge.asset_tracker ( - id integer PRIMARY KEY AUTOINCREMENT, - asset character(50) NOT NULL, - event character varying(50) NOT NULL, - service character varying(255) NOT NULL, - fledge character varying(50) NOT NULL, - plugin character varying(50) NOT NULL, - ts DATETIME DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime')) ); + id integer PRIMARY KEY AUTOINCREMENT, + asset character(50) NOT NULL, -- asset name + event character varying(50) NOT NULL, -- event name + service character varying(255) NOT NULL, -- service name + fledge character varying(50) NOT NULL, -- FL service name + plugin character varying(50) NOT NULL, -- Plugin name + deprecated_ts DATETIME , -- When an asset record is removed then time will be set else empty and that mean entry has not been deprecated + ts DATETIME DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', 'localtime')) +); CREATE INDEX asset_tracker_ix1 ON asset_tracker (asset); CREATE INDEX asset_tracker_ix2 ON asset_tracker (service); diff --git a/scripts/plugins/storage/sqlitelb/upgrade/53.sql b/scripts/plugins/storage/sqlitelb/upgrade/53.sql new file mode 100644 index 000000000..939be8d20 --- /dev/null +++ b/scripts/plugins/storage/sqlitelb/upgrade/53.sql @@ -0,0 +1,3 @@ +-- Add new column name 'deprecated_ts' for asset_tracker + +ALTER TABLE fledge.asset_tracker ADD COLUMN deprecated_ts DATETIME; From cb197bd5ff78f6c39538ea63dd7cb7ac969edb1d Mon Sep 17 00:00:00 2001 From: ashish-jabble Date: Thu, 28 Jul 2022 13:02:54 +0530 Subject: [PATCH 15/26] deprecated_ts schema changes along with up/down Sql added in PostgreSQL Signed-off-by: ashish-jabble --- .../plugins/storage/postgres/downgrade/52.sql | 2 ++ scripts/plugins/storage/postgres/init.sql | 16 +++++++++------- scripts/plugins/storage/postgres/upgrade/53.sql | 3 +++ 3 files changed, 14 insertions(+), 7 deletions(-) create mode 100644 scripts/plugins/storage/postgres/downgrade/52.sql create mode 100644 scripts/plugins/storage/postgres/upgrade/53.sql diff --git a/scripts/plugins/storage/postgres/downgrade/52.sql b/scripts/plugins/storage/postgres/downgrade/52.sql new file mode 100644 index 000000000..1ee0dc809 --- /dev/null +++ b/scripts/plugins/storage/postgres/downgrade/52.sql @@ -0,0 +1,2 @@ +--Remove deprecated_ts column from asset_tracker table +ALTER TABLE fledge.asset_tracker DROP COLUMN IF EXISTS deprecated_ts; \ No newline at end of file diff --git a/scripts/plugins/storage/postgres/init.sql b/scripts/plugins/storage/postgres/init.sql index 00c31702a..9b2588355 100644 --- a/scripts/plugins/storage/postgres/init.sql +++ b/scripts/plugins/storage/postgres/init.sql @@ -766,13 +766,15 @@ CREATE UNIQUE INDEX config_children_ix1 ON fledge.category_children(parent, chil -- Create the asset_tracker table CREATE TABLE fledge.asset_tracker ( - id integer NOT NULL DEFAULT nextval('fledge.asset_tracker_id_seq'::regclass), - asset character(255) NOT NULL, - event character varying(50) NOT NULL, - service character varying(255) NOT NULL, - fledge character varying(50) NOT NULL, - plugin character varying(50) NOT NULL, - ts timestamp(6) with time zone NOT NULL DEFAULT now() ); + id integer NOT NULL DEFAULT nextval('fledge.asset_tracker_id_seq'::regclass), + asset character(255) NOT NULL, -- asset name + event character varying(50) NOT NULL, -- event name + service character varying(255) NOT NULL, -- service name + fledge character varying(50) NOT NULL, -- FL service name + plugin character varying(50) NOT NULL, -- Plugin name + deprecated_ts timestamp(6) with time zone , -- When an asset record is removed then time will be set else empty and that mean entry has not been deprecated + ts timestamp(6) with time zone NOT NULL DEFAULT now() +); CREATE INDEX asset_tracker_ix1 ON fledge.asset_tracker USING btree (asset); CREATE INDEX asset_tracker_ix2 ON fledge.asset_tracker USING btree (service); diff --git a/scripts/plugins/storage/postgres/upgrade/53.sql b/scripts/plugins/storage/postgres/upgrade/53.sql new file mode 100644 index 000000000..e436c99a0 --- /dev/null +++ b/scripts/plugins/storage/postgres/upgrade/53.sql @@ -0,0 +1,3 @@ +-- Add new column name 'deprecated_ts' for asset_tracker + +ALTER TABLE fledge.asset_tracker ADD COLUMN deprecated_ts timestamp(6) with time zone; From 6b2878023d3a44d605b0ab7cf271579678980815 Mon Sep 17 00:00:00 2001 From: ashish-jabble Date: Thu, 28 Jul 2022 15:48:32 +0530 Subject: [PATCH 16/26] workaround fixes to update deprecated_ts in asset tracker only if record exists; this is a limitation on storage service side, once FOGL-6749 is done we will remove this Signed-off-by: ashish-jabble --- python/fledge/services/core/api/filters.py | 24 +++++++++++++++++----- python/fledge/services/core/api/service.py | 24 ++++++++++++++-------- python/fledge/services/core/api/task.py | 24 ++++++++++++++-------- 3 files changed, 51 insertions(+), 21 deletions(-) diff --git a/python/fledge/services/core/api/filters.py b/python/fledge/services/core/api/filters.py index 8b5128d9c..1506a838b 100644 --- a/python/fledge/services/core/api/filters.py +++ b/python/fledge/services/core/api/filters.py @@ -407,11 +407,25 @@ async def delete_filter(request: web.Request) -> web.Response: # Delete configuration for filter await _delete_configuration_category(storage, filter_name) - # update deprecated_ts entry in asset tracker - current_time = utils.local_timestamp() - update_payload = PayloadBuilder().SET(deprecated_ts=current_time).WHERE( - ['plugin', '=', filter_name]).payload() - await storage.update_tbl("asset_tracker", update_payload) + # Update deprecated timestamp in asset_tracker + """ + TODO: FOGL-6749 + Once rows affected with 0 case handled at Storage side + then we will need to update the query with AND_WHERE(['deprecated_ts', 'isnull']) + At the moment deprecated_ts is updated even in notnull case. + Also added SELECT query before UPDATE to avoid BadCase when there is no asset track entry exists for the filter. + This should also be removed when given JIRA is fixed. + """ + select_payload = PayloadBuilder().SELECT("deprecated_ts").WHERE(['plugin', '=', filter_name]).payload() + get_result = await storage.query_tbl_with_payload('asset_tracker', select_payload) + if 'rows' in get_result: + response = get_result['rows'] + if response: + # AND_WHERE(['deprecated_ts', 'isnull']) once FOGL-6749 is done + current_time = utils.local_timestamp() + update_payload = PayloadBuilder().SET(deprecated_ts=current_time).WHERE( + ['plugin', '=', filter_name]).payload() + await storage.update_tbl("asset_tracker", update_payload) except StorageServerError as ex: _LOGGER.exception("Delete filter: %s, caught exception: %s", filter_name, str(ex.error)) raise web.HTTPInternalServerError(reason=str(ex.error)) diff --git a/python/fledge/services/core/api/service.py b/python/fledge/services/core/api/service.py index 6db67a544..9a58eac12 100644 --- a/python/fledge/services/core/api/service.py +++ b/python/fledge/services/core/api/service.py @@ -196,15 +196,23 @@ async def delete_plugin_data(storage, svc): async def update_deprecated_ts_in_asset_tracker(storage, svc): """ - TODO: FOGL-6749 Once rows affected with 0 case handled at Storage side - then we will update query with AND_WHERE(['deprecated_ts', 'isnull']) - At the moment deprecated_ts is updated even in notnull case. - And we do not want to add expensive calls or workaround to restrict the UPDATE. + TODO: FOGL-6749 + Once rows affected with 0 case handled at Storage side + then we will need to update the query with AND_WHERE(['deprecated_ts', 'isnull']) + At the moment deprecated_ts is updated even in notnull case. + Also added SELECT query before UPDATE to avoid BadCase when there is no asset track entry exists for the instance. + This should also be removed when given JIRA is fixed. """ - current_time = utils.local_timestamp() - update_payload = PayloadBuilder().SET(deprecated_ts=current_time).WHERE( - ['service', '=', svc]).payload() - await storage.update_tbl("asset_tracker", update_payload) + select_payload = PayloadBuilder().SELECT("deprecated_ts").WHERE(['service', '=', svc]).payload() + get_result = await storage.query_tbl_with_payload('asset_tracker', select_payload) + if 'rows' in get_result: + response = get_result['rows'] + if response: + # AND_WHERE(['deprecated_ts', 'isnull']) once FOGL-6749 is done + current_time = utils.local_timestamp() + update_payload = PayloadBuilder().SET(deprecated_ts=current_time).WHERE( + ['service', '=', svc]).payload() + await storage.update_tbl("asset_tracker", update_payload) async def add_service(request): diff --git a/python/fledge/services/core/api/task.py b/python/fledge/services/core/api/task.py index 151ea548e..819518d73 100644 --- a/python/fledge/services/core/api/task.py +++ b/python/fledge/services/core/api/task.py @@ -372,12 +372,20 @@ async def delete_plugin_data(storage, north_instance): async def update_deprecated_ts_in_asset_tracker(storage, north_instance): """ - TODO: FOGL-6749 Once rows affected with 0 case handled at Storage side - then we will update query with AND_WHERE(['deprecated_ts', 'isnull']) - At the moment deprecated_ts is updated even in notnull case. - And we do not want to add expensive calls or workaround to restrict the UPDATE. + TODO: FOGL-6749 + Once rows affected with 0 case handled at Storage side + then we will need to update the query with AND_WHERE(['deprecated_ts', 'isnull']) + At the moment deprecated_ts is updated even in notnull case. + Also added SELECT query before UPDATE to avoid BadCase when there is no asset track entry exists for the instance. + This should also be removed when given JIRA is fixed. """ - current_time = utils.local_timestamp() - update_payload = PayloadBuilder().SET(deprecated_ts=current_time).WHERE( - ['service', '=', north_instance]).payload() - await storage.update_tbl("asset_tracker", update_payload) + select_payload = PayloadBuilder().SELECT("deprecated_ts").WHERE(['service', '=', north_instance]).payload() + get_result = await storage.query_tbl_with_payload('asset_tracker', select_payload) + if 'rows' in get_result: + response = get_result['rows'] + if response: + # AND_WHERE(['deprecated_ts', 'isnull']) once FOGL-6749 is done + current_time = utils.local_timestamp() + update_payload = PayloadBuilder().SET(deprecated_ts=current_time).WHERE( + ['service', '=', north_instance]).payload() + await storage.update_tbl("asset_tracker", update_payload) From a97b4954c42709e56987d73c5963a84459a1628e Mon Sep 17 00:00:00 2001 From: ashish-jabble Date: Thu, 28 Jul 2022 16:08:14 +0530 Subject: [PATCH 17/26] filters unit tests updated Signed-off-by: ashish-jabble --- tests/unit/python/fledge/services/core/api/test_filters.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/unit/python/fledge/services/core/api/test_filters.py b/tests/unit/python/fledge/services/core/api/test_filters.py index 124aa4247..5eff7d918 100644 --- a/tests/unit/python/fledge/services/core/api/test_filters.py +++ b/tests/unit/python/fledge/services/core/api/test_filters.py @@ -430,6 +430,11 @@ def q_result(*args): assert {"where": {"column": "name", "condition": "=", "value": filter_name}} == json.loads(payload) return {'count': 0, 'rows': []} + if table == 'asset_tracker': + assert {"return": ["deprecated_ts"], + "where": {"column": "plugin", "condition": "=", "value": filter_name}} == json.loads(payload) + return {'count': 1, 'rows': [{'deprecated_ts': ''}]} + filter_name = "AssetFilter" delete_result = {'response': 'deleted', 'rows_affected': 1} update_result = {'rows_affected': 1, "response": "updated"} From 429b2d193817771eff3795dcc3144a102947734b Mon Sep 17 00:00:00 2001 From: Massimiliano Pinto Date: Mon, 1 Aug 2022 15:03:25 +0200 Subject: [PATCH 18/26] FOGL-6721: initial implementation of un-deprecate assets FOGL-6721: initial implementation of un-deprecate assets deprecated assets are un-deprecated only at service start. Current change is for "sqlite" storage plugin --- C/common/asset_tracking.cpp | 17 +++++++- C/common/include/asset_tracking.h | 27 ++++++++++--- C/common/include/expression.h | 1 + C/common/include/insert.h | 16 ++++++++ C/common/include/resultset.h | 3 +- C/common/management_client.cpp | 24 ++++++++++- .../storage/sqlite/common/connection.cpp | 10 +++++ C/services/south/ingest.cpp | 40 +++++++++++++++++-- 8 files changed, 123 insertions(+), 15 deletions(-) diff --git a/C/common/asset_tracking.cpp b/C/common/asset_tracking.cpp index 5fda16818..45b62fbe8 100644 --- a/C/common/asset_tracking.cpp +++ b/C/common/asset_tracking.cpp @@ -41,16 +41,27 @@ AssetTracker::AssetTracker(ManagementClient *mgtClient, string service) /** * Fetch all asset tracking tuples from DB and populate local cache * + * Return the vector of deprecated asset names + * * @param plugin Plugin name * @param event Event name + * @return Vector of deprecated asset names */ -void AssetTracker::populateAssetTrackingCache(string /*plugin*/, string /*event*/) +vector AssetTracker::populateAssetTrackingCache(string /*plugin*/, string /*event*/) { + vector deprecated; try { std::vector& vec = m_mgtClient->getAssetTrackingTuples(m_service); for (AssetTrackingTuple* & rec : vec) { assetTrackerTuplesCache.insert(rec); + + // Add a deprecated asset into the output + if (rec->isDeprecated()) + { + deprecated.push_back(rec->getAssetName()); + } + //Logger::getLogger()->info("Added asset tracker tuple to cache: '%s'", rec->assetToString().c_str()); } delete (&vec); @@ -58,8 +69,10 @@ void AssetTracker::populateAssetTrackingCache(string /*plugin*/, string /*event* catch (...) { Logger::getLogger()->error("Failed to populate asset tracking tuples' cache"); - return; + return deprecated; } + + return deprecated; } diff --git a/C/common/include/asset_tracking.h b/C/common/include/asset_tracking.h index be1601660..9c13b2c87 100644 --- a/C/common/include/asset_tracking.h +++ b/C/common/include/asset_tracking.h @@ -28,6 +28,7 @@ class AssetTrackingTuple { std::string m_pluginName; std::string m_assetName; std::string m_eventName; + bool m_deprecated; std::string assetToString() { @@ -38,14 +39,27 @@ class AssetTrackingTuple { inline bool operator==(const AssetTrackingTuple& x) const { - return ( x.m_serviceName==m_serviceName && x.m_pluginName==m_pluginName && x.m_assetName==m_assetName && x.m_eventName==m_eventName); + return ( x.m_serviceName==m_serviceName && + x.m_pluginName==m_pluginName && + x.m_assetName==m_assetName && + x.m_eventName==m_eventName && + x.m_deprecated==m_deprecated); } - AssetTrackingTuple(const std::string& service, const std::string& plugin, - const std::string& asset, const std::string& event) : - m_serviceName(service), m_pluginName(plugin), - m_assetName(asset), m_eventName(event) + AssetTrackingTuple(const std::string& service, + const std::string& plugin, + const std::string& asset, + const std::string& event, + const bool& deprecated = false) : + m_serviceName(service), + m_pluginName(plugin), + m_assetName(asset), + m_eventName(event), + m_deprecated(deprecated) {} + + std::string& getAssetName() { return m_assetName; }; + bool isDeprecated() { return m_deprecated; }; }; struct AssetTrackingTuplePtrEqual { @@ -88,7 +102,8 @@ class AssetTracker { AssetTracker(ManagementClient *mgtClient, std::string service); ~AssetTracker() {} static AssetTracker *getAssetTracker(); - void populateAssetTrackingCache(std::string plugin, std::string event); + std::vector + populateAssetTrackingCache(std::string plugin, std::string event); bool checkAssetTrackingCache(AssetTrackingTuple& tuple); void addAssetTrackingTuple(AssetTrackingTuple& tuple); void addAssetTrackingTuple(std::string plugin, std::string asset, std::string event); diff --git a/C/common/include/expression.h b/C/common/include/expression.h index 9f2b73c16..6e1fbd462 100644 --- a/C/common/include/expression.h +++ b/C/common/include/expression.h @@ -43,6 +43,7 @@ class Expression { case JSON_COLUMN: case BOOL_COLUMN: case STRING_COLUMN: + case NULL_COLUMN: break; case INT_COLUMN: json << m_value.ival; diff --git a/C/common/include/insert.h b/C/common/include/insert.h index 959512d68..3b0dd8268 100644 --- a/C/common/include/insert.h +++ b/C/common/include/insert.h @@ -61,6 +61,15 @@ class InsertValue { strncpy(m_value.str, s.c_str(), s.length() + 1); m_type = JSON_COLUMN; }; + + // Insert a NULL value for the given column + InsertValue(const std::string& column) : + m_column(column) + { + m_type = NULL_COLUMN; + m_value.str = NULL; + } + InsertValue(const InsertValue& rhs) : m_column(rhs.m_column) { m_type = rhs.m_type; @@ -78,6 +87,9 @@ class InsertValue { case JSON_COLUMN: // Internally stored a a string m_value.str = strdup(rhs.m_value.str); break; + case NULL_COLUMN: + m_value.str = NULL; + break; case BOOL_COLUMN: // TODO break; @@ -112,6 +124,10 @@ class InsertValue { case STRING_COLUMN: json << "\"" << m_value.str << "\""; break; + case NULL_COLUMN: + // JSON output for NULL value + json << "null"; + break; } return json.str(); } diff --git a/C/common/include/resultset.h b/C/common/include/resultset.h index 3b693b2e2..b10906d9a 100644 --- a/C/common/include/resultset.h +++ b/C/common/include/resultset.h @@ -21,7 +21,8 @@ typedef enum column_type { NUMBER_COLUMN, STRING_COLUMN, BOOL_COLUMN, - JSON_COLUMN + JSON_COLUMN, + NULL_COLUMN } ColumnType; diff --git a/C/common/management_client.cpp b/C/common/management_client.cpp index 46ced267b..d5186b323 100644 --- a/C/common/management_client.cpp +++ b/C/common/management_client.cpp @@ -778,7 +778,25 @@ std::vector& ManagementClient::getAssetTrackingTuples(const { throw runtime_error("Expected asset tracker tuple to be an object"); } - AssetTrackingTuple *tuple = new AssetTrackingTuple(rec["service"].GetString(), rec["plugin"].GetString(), rec["asset"].GetString(), rec["event"].GetString()); + + // Note: deprecatedTimestamp NULL value is returned as "" + // otherwise it's a string DATE + bool deprecated = rec.HasMember("deprecatedTimestamp") && + strlen(rec["deprecatedTimestamp"].GetString()); + + AssetTrackingTuple *tuple = new AssetTrackingTuple(rec["service"].GetString(), + rec["plugin"].GetString(), + rec["asset"].GetString(), + rec["event"].GetString(), + deprecated); + + m_logger->debug("Adding AssetTracker tuple for service %s: %s:%s:%s, " \ + "deprecated state is %d", + rec["service"].GetString(), + rec["plugin"].GetString(), + rec["asset"].GetString(), + rec["event"].GetString(), + deprecated); vec->push_back(tuple); } } @@ -809,7 +827,9 @@ std::vector& ManagementClient::getAssetTrackingTuples(const * @return whether operation was successful */ bool ManagementClient::addAssetTrackingTuple(const std::string& service, - const std::string& plugin, const std::string& asset, const std::string& event) + const std::string& plugin, + const std::string& asset, + const std::string& event) { ostringstream convert; diff --git a/C/plugins/storage/sqlite/common/connection.cpp b/C/plugins/storage/sqlite/common/connection.cpp index 3c2d4bc5c..8753912c3 100644 --- a/C/plugins/storage/sqlite/common/connection.cpp +++ b/C/plugins/storage/sqlite/common/connection.cpp @@ -1379,6 +1379,11 @@ Document document; SQLBuffer sql; vector asset_codes; + Logger::getLogger()->fatal("---- update %s, %s: '%s'", + schema.c_str(), + table.c_str(), + payload.c_str()); + int row = 0; ostringstream convert; @@ -1473,6 +1478,11 @@ vector asset_codes; sql.append(escape(buffer.GetString())); sql.append('\''); } + // Hanlde JSON value null: "item" : null + else if (itr->value.IsNull()) + { + sql.append("NULL"); + } col++; } } diff --git a/C/services/south/ingest.cpp b/C/services/south/ingest.cpp index d72c2f454..a5abbac74 100755 --- a/C/services/south/ingest.cpp +++ b/C/services/south/ingest.cpp @@ -270,8 +270,34 @@ Ingest::Ingest(StorageClient& storage, m_highLatency = false; // populate asset tracking cache - //m_assetTracker = new AssetTracker(m_mgtClient); - AssetTracker::getAssetTracker()->populateAssetTrackingCache(m_pluginName, "Ingest"); + // and get list of assets to un-deprecate + AssetTracker* assetTracker = AssetTracker::getAssetTracker(); + if (assetTracker) + { + vector deprecated = + assetTracker->populateAssetTrackingCache(m_pluginName, "Ingest"); + for (auto d : deprecated) + { + m_logger->debug("Need to un-deprecate asset '%s'", d.c_str()); + + const Condition conditionParams(Equals); + Where * wAsset = new Where("asset", conditionParams, d); + Where *wService = new Where("service", conditionParams, m_serviceName, wAsset); + Where *wEvent = new Where("event", conditionParams, "Ingest", wService); + + InsertValues unDeprecated; + + // Set NULL value + unDeprecated.push_back(InsertValue("deprecated_ts")); + + // Update storage with NULL value + int rv = m_storage.updateTable(string("asset_tracker"), unDeprecated, *wEvent); + if (rv < 0) + { + m_logger->error("Failure while un-deprecating asset '%s', d.c_str()); + } + } + } // Create the stats entry for the service createServiceStatsDbEntry(); @@ -509,7 +535,10 @@ void Ingest::processQueue() string assetName = reading->getAssetName(); if (lastAsset.compare(assetName)) { - AssetTrackingTuple tuple(m_serviceName, m_pluginName, assetName, "Ingest"); + AssetTrackingTuple tuple(m_serviceName, + m_pluginName, + assetName, + "Ingest"); if (!tracker->checkAssetTrackingCache(tuple)) { tracker->addAssetTrackingTuple(tuple); @@ -668,7 +697,10 @@ void Ingest::processQueue() string assetName = reading->getAssetName(); if (lastAsset.compare(assetName)) { - AssetTrackingTuple tuple(m_serviceName, m_pluginName, assetName, "Ingest"); + AssetTrackingTuple tuple(m_serviceName, + m_pluginName, + assetName, + "Ingest"); if (!tracker->checkAssetTrackingCache(tuple)) { tracker->addAssetTrackingTuple(tuple); From 62293ad300c09d45ac507bc0db0b04db5e94908e Mon Sep 17 00:00:00 2001 From: Massimiliano Pinto Date: Mon, 1 Aug 2022 15:08:37 +0200 Subject: [PATCH 19/26] Compilation fix Compilation fix --- C/services/south/ingest.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/C/services/south/ingest.cpp b/C/services/south/ingest.cpp index a5abbac74..a35eda00c 100755 --- a/C/services/south/ingest.cpp +++ b/C/services/south/ingest.cpp @@ -294,7 +294,7 @@ Ingest::Ingest(StorageClient& storage, int rv = m_storage.updateTable(string("asset_tracker"), unDeprecated, *wEvent); if (rv < 0) { - m_logger->error("Failure while un-deprecating asset '%s', d.c_str()); + m_logger->error("Failure while un-deprecating asset '%s'", d.c_str()); } } } From f1e77f10cf11f372355ee0a207fb47145d4293df Mon Sep 17 00:00:00 2001 From: Massimiliano Pinto Date: Mon, 1 Aug 2022 15:16:10 +0200 Subject: [PATCH 20/26] Removed debug code Removed debug code --- C/plugins/storage/sqlite/common/connection.cpp | 5 ----- 1 file changed, 5 deletions(-) diff --git a/C/plugins/storage/sqlite/common/connection.cpp b/C/plugins/storage/sqlite/common/connection.cpp index 8753912c3..ec80223a5 100644 --- a/C/plugins/storage/sqlite/common/connection.cpp +++ b/C/plugins/storage/sqlite/common/connection.cpp @@ -1379,11 +1379,6 @@ Document document; SQLBuffer sql; vector asset_codes; - Logger::getLogger()->fatal("---- update %s, %s: '%s'", - schema.c_str(), - table.c_str(), - payload.c_str()); - int row = 0; ostringstream convert; From c5658c8feb3bfc889a4372e941c102960b426116 Mon Sep 17 00:00:00 2001 From: Massimiliano Pinto Date: Mon, 1 Aug 2022 15:30:32 +0200 Subject: [PATCH 21/26] Addition of NULL support in update for Postgres and sqlitelb Addition of NULL support in update for Postgres and sqlitelb --- C/plugins/storage/postgres/connection.cpp | 5 +++++ C/plugins/storage/sqlite/common/connection.cpp | 2 +- C/plugins/storage/sqlitelb/common/connection.cpp | 5 +++++ 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/C/plugins/storage/postgres/connection.cpp b/C/plugins/storage/postgres/connection.cpp index 8ee5424b5..d09733469 100644 --- a/C/plugins/storage/postgres/connection.cpp +++ b/C/plugins/storage/postgres/connection.cpp @@ -1048,6 +1048,11 @@ SQLBuffer sql; sql.append(escape(buffer.GetString())); sql.append('\''); } + // Handle JSON value null: "item" : null + else if (itr->value.IsNull()) + { + sql.append("NULL"); + } col++; } } diff --git a/C/plugins/storage/sqlite/common/connection.cpp b/C/plugins/storage/sqlite/common/connection.cpp index ec80223a5..c4c81c76a 100644 --- a/C/plugins/storage/sqlite/common/connection.cpp +++ b/C/plugins/storage/sqlite/common/connection.cpp @@ -1473,7 +1473,7 @@ vector asset_codes; sql.append(escape(buffer.GetString())); sql.append('\''); } - // Hanlde JSON value null: "item" : null + // Handle JSON value null: "item" : null else if (itr->value.IsNull()) { sql.append("NULL"); diff --git a/C/plugins/storage/sqlitelb/common/connection.cpp b/C/plugins/storage/sqlitelb/common/connection.cpp index dc4f5cfca..c3cc3ecba 100644 --- a/C/plugins/storage/sqlitelb/common/connection.cpp +++ b/C/plugins/storage/sqlitelb/common/connection.cpp @@ -1398,6 +1398,11 @@ SQLBuffer sql; sql.append(escape(buffer.GetString())); sql.append('\''); } + // Handle JSON value null: "item" : null + else if (itr->value.IsNull()) + { + sql.append("NULL"); + } col++; } } From 6baf9874a6e923a6ee1bb9bb32d92a0585e4edf5 Mon Sep 17 00:00:00 2001 From: pintomax Date: Mon, 1 Aug 2022 15:43:33 +0200 Subject: [PATCH 22/26] FOGL-6742: isnull and not null added to sqlitelb and postgres (#756) FOGL-6742: isnull and not null added to sqlitelb and postgres --- C/plugins/storage/postgres/connection.cpp | 172 ++++++++-------- .../storage/sqlitelb/common/connection.cpp | 188 ++++++++++-------- 2 files changed, 194 insertions(+), 166 deletions(-) diff --git a/C/plugins/storage/postgres/connection.cpp b/C/plugins/storage/postgres/connection.cpp index d09733469..0c8e335ad 100644 --- a/C/plugins/storage/postgres/connection.cpp +++ b/C/plugins/storage/postgres/connection.cpp @@ -2757,7 +2757,9 @@ bool Connection::jsonModifiers(const Value& payload, SQLBuffer& sql) * Convert a JSON where clause into a PostresSQL where clause * */ -bool Connection::jsonWhereClause(const Value& whereClause, SQLBuffer& sql, const string& prefix) +bool Connection::jsonWhereClause(const Value& whereClause, + SQLBuffer& sql, + const string& prefix) { if (!whereClause.IsObject()) { @@ -2774,11 +2776,6 @@ bool Connection::jsonWhereClause(const Value& whereClause, SQLBuffer& sql, const raiseError("where clause", "The \"where\" object is missing a \"condition\" property"); return false; } - if (!whereClause.HasMember("value")) - { - raiseError("where clause", "The \"where\" object is missing a \"value\" property"); - return false; - } // Handle WHERE 1 = 1, 0.55 = 0.55 etc string whereColumnName = whereClause["column"].GetString(); @@ -2801,100 +2798,117 @@ bool Connection::jsonWhereClause(const Value& whereClause, SQLBuffer& sql, const sql.append(' '); string cond = whereClause["condition"].GetString(); - if (!cond.compare("older")) + + if (cond.compare("isnull") == 0) { - if (!whereClause["value"].IsInt()) - { - raiseError("where clause", "The \"value\" of an \"older\" condition must be an integer"); - return false; - } - sql.append("< now() - INTERVAL '"); - sql.append(whereClause["value"].GetInt()); - sql.append(" seconds'"); + sql.append("isnull "); } - else if (!cond.compare("newer")) + else if (cond.compare("notnull") == 0) { - if (!whereClause["value"].IsInt()) + sql.append("notnull "); + } + else + { + if (!whereClause.HasMember("value")) { - raiseError("where clause", "The \"value\" of an \"newer\" condition must be an integer"); + raiseError("where clause", "The \"where\" object is missing a \"value\" property"); return false; } - sql.append("> now() - INTERVAL '"); - sql.append(whereClause["value"].GetInt()); - sql.append(" seconds'"); - } - else if (!cond.compare("in") || !cond.compare("not in")) - { - // Check we have a non empty array - if (whereClause["value"].IsArray() && - whereClause["value"].Size()) + if (!cond.compare("older")) { - sql.append(cond); - sql.append(" ( "); - int field = 0; - for (Value::ConstValueIterator itr = whereClause["value"].Begin(); - itr != whereClause["value"].End(); - ++itr) + if (!whereClause["value"].IsInt()) { - if (field) - { - sql.append(", "); - } - field++; - if (itr->IsNumber()) + raiseError("where clause", "The \"value\" of an \"older\" condition must be an integer"); + return false; + } + sql.append("< now() - INTERVAL '"); + sql.append(whereClause["value"].GetInt()); + sql.append(" seconds'"); + } + else if (!cond.compare("newer")) + { + if (!whereClause["value"].IsInt()) + { + raiseError("where clause", "The \"value\" of an \"newer\" condition must be an integer"); + return false; + } + sql.append("> now() - INTERVAL '"); + sql.append(whereClause["value"].GetInt()); + sql.append(" seconds'"); + } + else if (!cond.compare("in") || !cond.compare("not in")) + { + // Check we have a non empty array + if (whereClause["value"].IsArray() && + whereClause["value"].Size()) + { + sql.append(cond); + sql.append(" ( "); + int field = 0; + for (Value::ConstValueIterator itr = whereClause["value"].Begin(); + itr != whereClause["value"].End(); + ++itr) { - if (itr->IsInt()) + if (field) { - sql.append(itr->GetInt()); + sql.append(", "); + } + field++; + if (itr->IsNumber()) + { + if (itr->IsInt()) + { + sql.append(itr->GetInt()); + } + else if (itr->IsInt64()) + { + sql.append((long)itr->GetInt64()); + } + else + { + sql.append(itr->GetDouble()); + } } - else if (itr->IsInt64()) + else if (itr->IsString()) { - sql.append((long)itr->GetInt64()); + sql.append('\''); + sql.append(escape(itr->GetString())); + sql.append('\''); } else { - sql.append(itr->GetDouble()); + string message("The \"value\" of a \"" + \ + cond + \ + "\" condition array element must be " \ + "a string, integer or double."); + raiseError("where clause", message.c_str()); + return false; } } - else if (itr->IsString()) - { - sql.append('\''); - sql.append(escape(itr->GetString())); - sql.append('\''); - } - else - { - string message("The \"value\" of a \"" + \ - cond + \ - "\" condition array element must be " \ - "a string, integer or double."); - raiseError("where clause", message.c_str()); - return false; - } + sql.append(" )"); + } + else + { + string message("The \"value\" of a \"" + \ + cond + "\" condition must be an array " \ + "and must not be empty."); + raiseError("where clause", message.c_str()); + return false; } - sql.append(" )"); } else { - string message("The \"value\" of a \"" + \ - cond + "\" condition must be an array " \ - "and must not be empty."); - raiseError("where clause", message.c_str()); - return false; - } - } - else - { - sql.append(cond); - sql.append(' '); - if (whereClause["value"].IsInt()) - { - sql.append(whereClause["value"].GetInt()); - } else if (whereClause["value"].IsString()) - { - sql.append('\''); - sql.append(escape(whereClause["value"].GetString())); - sql.append('\''); + sql.append(cond); + sql.append(' '); + if (whereClause["value"].IsInt()) + { + sql.append(whereClause["value"].GetInt()); + } else if (whereClause["value"].IsString()) + { + sql.append('\''); + sql.append(escape(whereClause["value"].GetString())); + sql.append('\''); + } } } diff --git a/C/plugins/storage/sqlitelb/common/connection.cpp b/C/plugins/storage/sqlitelb/common/connection.cpp index c3cc3ecba..1546dbdf9 100644 --- a/C/plugins/storage/sqlitelb/common/connection.cpp +++ b/C/plugins/storage/sqlitelb/common/connection.cpp @@ -2518,7 +2518,8 @@ bool Connection::jsonModifiers(const Value& payload, * */ bool Connection::jsonWhereClause(const Value& whereClause, - SQLBuffer& sql, bool convertLocaltime) + SQLBuffer& sql, + bool convertLocaltime) { if (!whereClause.IsObject()) { @@ -2535,118 +2536,131 @@ bool Connection::jsonWhereClause(const Value& whereClause, raiseError("where clause", "The \"where\" object is missing a \"condition\" property"); return false; } - if (!whereClause.HasMember("value")) - { - raiseError("where clause", - "The \"where\" object is missing a \"value\" property"); - return false; - } sql.append(whereClause["column"].GetString()); sql.append(' '); string cond = whereClause["condition"].GetString(); - if (!cond.compare("older")) + + if (cond.compare("isnull") == 0) { - if (!whereClause["value"].IsInt()) - { - raiseError("where clause", - "The \"value\" of an \"older\" condition must be an integer"); - return false; - } - sql.append("< datetime('now', '-"); - sql.append(whereClause["value"].GetInt()); - if (convertLocaltime) - sql.append(" seconds', 'localtime')"); // Get value in localtime - else - sql.append(" seconds')"); // Get value in UTC by asking for no timezone + sql.append("isnull "); } - else if (!cond.compare("newer")) + else if (cond.compare("notnull") == 0) { - if (!whereClause["value"].IsInt()) + sql.append("notnull "); + } + else + { + if (!whereClause.HasMember("value")) { raiseError("where clause", - "The \"value\" of an \"newer\" condition must be an integer"); + "The \"where\" object is missing a \"value\" property"); return false; } - sql.append("> datetime('now', '-"); - sql.append(whereClause["value"].GetInt()); - if (convertLocaltime) - sql.append(" seconds', 'localtime')"); // Get value in localtime - else - sql.append(" seconds')"); // Get value in UTC by asking for no timezone - } - else if (!cond.compare("in") || !cond.compare("not in")) - { - // Check we have a non empty array - if (whereClause["value"].IsArray() && - whereClause["value"].Size()) + + if (!cond.compare("older")) { - sql.append(cond); - sql.append(" ( "); - int field = 0; - for (Value::ConstValueIterator itr = whereClause["value"].Begin(); - itr != whereClause["value"].End(); - ++itr) + if (!whereClause["value"].IsInt()) { - if (field) - { - sql.append(", "); - } - field++; - if (itr->IsNumber()) + raiseError("where clause", + "The \"value\" of an \"older\" condition must be an integer"); + return false; + } + sql.append("< datetime('now', '-"); + sql.append(whereClause["value"].GetInt()); + if (convertLocaltime) + sql.append(" seconds', 'localtime')"); // Get value in localtime + else + sql.append(" seconds')"); // Get value in UTC by asking for no timezone + } + else if (!cond.compare("newer")) + { + if (!whereClause["value"].IsInt()) + { + raiseError("where clause", + "The \"value\" of an \"newer\" condition must be an integer"); + return false; + } + sql.append("> datetime('now', '-"); + sql.append(whereClause["value"].GetInt()); + if (convertLocaltime) + sql.append(" seconds', 'localtime')"); // Get value in localtime + else + sql.append(" seconds')"); // Get value in UTC by asking for no timezone + } + else if (!cond.compare("in") || !cond.compare("not in")) + { + // Check we have a non empty array + if (whereClause["value"].IsArray() && + whereClause["value"].Size()) + { + sql.append(cond); + sql.append(" ( "); + int field = 0; + for (Value::ConstValueIterator itr = whereClause["value"].Begin(); + itr != whereClause["value"].End(); + ++itr) { - if (itr->IsInt()) + if (field) { - sql.append(itr->GetInt()); + sql.append(", "); } - else if (itr->IsInt64()) + field++; + if (itr->IsNumber()) + { + if (itr->IsInt()) + { + sql.append(itr->GetInt()); + } + else if (itr->IsInt64()) + { + sql.append((long)itr->GetInt64()); + } + else + { + sql.append(itr->GetDouble()); + } + } + else if (itr->IsString()) { - sql.append((long)itr->GetInt64()); + sql.append('\''); + sql.append(escape(itr->GetString())); + sql.append('\''); } else { - sql.append(itr->GetDouble()); + string message("The \"value\" of a \"" + \ + cond + \ + "\" condition array element must be " \ + "a string, integer or double."); + raiseError("where clause", message.c_str()); + return false; } } - else if (itr->IsString()) - { - sql.append('\''); - sql.append(escape(itr->GetString())); - sql.append('\''); - } - else - { - string message("The \"value\" of a \"" + \ - cond + \ - "\" condition array element must be " \ - "a string, integer or double."); - raiseError("where clause", message.c_str()); - return false; - } + sql.append(" )"); + } + else + { + string message("The \"value\" of a \"" + \ + cond + "\" condition must be an array " \ + "and must not be empty."); + raiseError("where clause", message.c_str()); + return false; } - sql.append(" )"); } else { - string message("The \"value\" of a \"" + \ - cond + "\" condition must be an array " \ - "and must not be empty."); - raiseError("where clause", message.c_str()); - return false; - } - } - else - { - sql.append(cond); - sql.append(' '); - if (whereClause["value"].IsInt()) - { - sql.append(whereClause["value"].GetInt()); - } else if (whereClause["value"].IsString()) - { - sql.append('\''); - sql.append(escape(whereClause["value"].GetString())); - sql.append('\''); + sql.append(cond); + sql.append(' '); + if (whereClause["value"].IsInt()) + { + sql.append(whereClause["value"].GetInt()); + } else if (whereClause["value"].IsString()) + { + sql.append('\''); + sql.append(escape(whereClause["value"].GetString())); + sql.append('\''); + } } } From cd0599d859564c6e56b477d4072ffc8e732f2c9f Mon Sep 17 00:00:00 2001 From: Massimiliano Pinto Date: Mon, 1 Aug 2022 20:59:20 +0200 Subject: [PATCH 23/26] Asset tracking records are un-deprecated at run-time Asset tracking records are un-deprecated at run-time --- C/common/asset_tracking.cpp | 31 +++--- C/common/include/asset_tracking.h | 19 ++-- C/common/include/management_client.h | 3 + C/common/management_client.cpp | 99 +++++++++++++++++++ C/services/south/include/ingest.h | 5 +- C/services/south/ingest.cpp | 136 +++++++++++++++++++++------ 6 files changed, 242 insertions(+), 51 deletions(-) diff --git a/C/common/asset_tracking.cpp b/C/common/asset_tracking.cpp index 45b62fbe8..4329c540d 100644 --- a/C/common/asset_tracking.cpp +++ b/C/common/asset_tracking.cpp @@ -45,37 +45,29 @@ AssetTracker::AssetTracker(ManagementClient *mgtClient, string service) * * @param plugin Plugin name * @param event Event name - * @return Vector of deprecated asset names */ -vector AssetTracker::populateAssetTrackingCache(string /*plugin*/, string /*event*/) +void AssetTracker::populateAssetTrackingCache(string /*plugin*/, string /*event*/) { - vector deprecated; try { std::vector& vec = m_mgtClient->getAssetTrackingTuples(m_service); for (AssetTrackingTuple* & rec : vec) { assetTrackerTuplesCache.insert(rec); - // Add a deprecated asset into the output - if (rec->isDeprecated()) - { - deprecated.push_back(rec->getAssetName()); - } - - //Logger::getLogger()->info("Added asset tracker tuple to cache: '%s'", rec->assetToString().c_str()); + Logger::getLogger()->debug("Added asset tracker tuple to cache: '%s'", + rec->assetToString().c_str()); } delete (&vec); } catch (...) { Logger::getLogger()->error("Failed to populate asset tracking tuples' cache"); - return deprecated; + return; } - return deprecated; + return; } - /** * Check local cache for a given asset tracking tuple * @@ -94,6 +86,19 @@ bool AssetTracker::checkAssetTrackingCache(AssetTrackingTuple& tuple) return true; } +AssetTrackingTuple* AssetTracker::findAssetTrackingCache(AssetTrackingTuple& tuple) +{ + AssetTrackingTuple *ptr = &tuple; + std::unordered_set::const_iterator it = assetTrackerTuplesCache.find(ptr); + if (it == assetTrackerTuplesCache.end()) + { + return NULL; + } + else + { + return *it; + } +} /** * Add asset tracking tuple via microservice management API and in cache diff --git a/C/common/include/asset_tracking.h b/C/common/include/asset_tracking.h index 9c13b2c87..7ea053fdf 100644 --- a/C/common/include/asset_tracking.h +++ b/C/common/include/asset_tracking.h @@ -28,12 +28,15 @@ class AssetTrackingTuple { std::string m_pluginName; std::string m_assetName; std::string m_eventName; - bool m_deprecated; std::string assetToString() { std::ostringstream o; - o << "service:" << m_serviceName << ", plugin:" << m_pluginName << ", asset:" << m_assetName << ", event:" << m_eventName; + o << "service:" << m_serviceName << + ", plugin:" << m_pluginName << + ", asset:" << m_assetName << + ", event:" << m_eventName << + ". deprecated:" << m_deprecated; return o.str(); } @@ -42,8 +45,7 @@ class AssetTrackingTuple { return ( x.m_serviceName==m_serviceName && x.m_pluginName==m_pluginName && x.m_assetName==m_assetName && - x.m_eventName==m_eventName && - x.m_deprecated==m_deprecated); + x.m_eventName==m_eventName); } AssetTrackingTuple(const std::string& service, @@ -60,6 +62,10 @@ class AssetTrackingTuple { std::string& getAssetName() { return m_assetName; }; bool isDeprecated() { return m_deprecated; }; + void unDeprecate() { m_deprecated = false; }; + +private: + bool m_deprecated; }; struct AssetTrackingTuplePtrEqual { @@ -102,9 +108,10 @@ class AssetTracker { AssetTracker(ManagementClient *mgtClient, std::string service); ~AssetTracker() {} static AssetTracker *getAssetTracker(); - std::vector - populateAssetTrackingCache(std::string plugin, std::string event); + void populateAssetTrackingCache(std::string plugin, std::string event); bool checkAssetTrackingCache(AssetTrackingTuple& tuple); + AssetTrackingTuple* + findAssetTrackingCache(AssetTrackingTuple& tuple); void addAssetTrackingTuple(AssetTrackingTuple& tuple); void addAssetTrackingTuple(std::string plugin, std::string asset, std::string event); std::string diff --git a/C/common/include/management_client.h b/C/common/include/management_client.h index 8a73cc3a2..c400a6007 100644 --- a/C/common/include/management_client.h +++ b/C/common/include/management_client.h @@ -92,6 +92,9 @@ class ManagementClient { std::vector > >& endpoints); bool deleteProxy(const std::string& serviceName); const std::string getUrlbase() { return m_urlbase.str(); } + AssetTrackingTuple* getAssetTrackingTuple(const std::string& serviceName, + const std::string& assetName, + const std::string& event); private: std::ostringstream m_urlbase; diff --git a/C/common/management_client.cpp b/C/common/management_client.cpp index d5186b323..818735a61 100644 --- a/C/common/management_client.cpp +++ b/C/common/management_client.cpp @@ -1347,3 +1347,102 @@ bool ManagementClient::deleteProxy(const std::string& serviceName) } return false; } +/** + * Get the asset tracking tuple + * for a service and asset name + * + * @param serviceName The serviceName to restrict data fetch + * @param assetName The asset name that belongs to the service + * @param event The associated event type + * @return A vector of pointers to AssetTrackingTuple objects allocated on heap + */ +AssetTrackingTuple* ManagementClient::getAssetTrackingTuple(const std::string& serviceName, + const std::string& assetName, + const std::string& event) +{ + AssetTrackingTuple* tuple = NULL; + try { + string url = "/fledge/track"; + if (serviceName == "" && assetName == "" && event == "") + { + m_logger->error("Failed to fetch asset tracking tuple: " \ + "service name, asset name and event type are required."); + throw new exception(); + } + + url += "?service=" + urlEncode(serviceName); + url += "&asset=" + urlEncode(assetName) + "&event=" + event; + + auto res = this->getHttpClient()->request("GET", url.c_str()); + Document doc; + string response = res->content.string(); + doc.Parse(response.c_str()); + if (doc.HasParseError()) + { + bool httpError = (isdigit(response[0]) && + isdigit(response[1]) && + isdigit(response[2]) && + response[3]==':'); + m_logger->error("%s fetch asset tracking tuple: %s\n", + httpError?"HTTP error during":"Failed to parse result of", + response.c_str()); + throw new exception(); + } + else if (doc.HasMember("message")) + { + m_logger->error("Failed to fetch asset tracking tuple: %s.", + doc["message"].GetString()); + throw new exception(); + } + else + { + const rapidjson::Value& trackArray = doc["track"]; + if (trackArray.IsArray()) + { + // Process every row and create the AssetTrackingTuple object + for (auto& rec : trackArray.GetArray()) + { + if (!rec.IsObject()) + { + throw runtime_error("Expected asset tracker tuple to be an object"); + } + + // Note: deprecatedTimestamp NULL value is returned as "" + // otherwise it's a string DATE + bool deprecated = rec.HasMember("deprecatedTimestamp") && + strlen(rec["deprecatedTimestamp"].GetString()); + + // Create a new AssetTrackingTuple object, to be freed by the caller + tuple = new AssetTrackingTuple(rec["service"].GetString(), + rec["plugin"].GetString(), + rec["asset"].GetString(), + rec["event"].GetString(), + deprecated); + + m_logger->debug("Adding AssetTracker tuple for service %s: %s:%s:%s, " \ + "deprecated state is %d", + rec["service"].GetString(), + rec["plugin"].GetString(), + rec["asset"].GetString(), + rec["event"].GetString(), + deprecated); + } + } + else + { + throw runtime_error("Expected array of rows in asset track tuples array"); + } + + return tuple; + } + } catch (const SimpleWeb::system_error &e) { + m_logger->error("Fetch/parse of asset tracking tuples for service %s failed: %s.", + serviceName.c_str(), + e.what()); + } catch (...) { + m_logger->error("Unexpected exception when retrieving asset tuples for service %s", + serviceName.c_str()); + } + + return tuple; +} diff --git a/C/services/south/include/ingest.h b/C/services/south/include/ingest.h index a552a31c5..69f08d0a4 100644 --- a/C/services/south/include/ingest.h +++ b/C/services/south/include/ingest.h @@ -67,8 +67,11 @@ class Ingest : public ServiceHandler { void setThreshold(const unsigned int threshold) { m_queueSizeThreshold = threshold; }; void configChange(const std::string&, const std::string&); void configChildCreate(const std::string& , const std::string&, const std::string&){}; - void configChildDelete(const std::string& , const std::string&){}; + void configChildDelete(const std::string& , const std::string&){}; void shutdown() {}; // Satisfy ServiceHandler + void unDeprecateAssetTrackingRecord(AssetTrackingTuple* currentTuple, + const std::string& assetName, + const std::string& event); private: void signalStatsUpdate() { diff --git a/C/services/south/ingest.cpp b/C/services/south/ingest.cpp index a35eda00c..ae3ec1819 100755 --- a/C/services/south/ingest.cpp +++ b/C/services/south/ingest.cpp @@ -268,36 +268,9 @@ Ingest::Ingest(StorageClient& storage, m_data = NULL; m_discardedReadings = 0; m_highLatency = false; - - // populate asset tracking cache - // and get list of assets to un-deprecate - AssetTracker* assetTracker = AssetTracker::getAssetTracker(); - if (assetTracker) - { - vector deprecated = - assetTracker->populateAssetTrackingCache(m_pluginName, "Ingest"); - for (auto d : deprecated) - { - m_logger->debug("Need to un-deprecate asset '%s'", d.c_str()); - - const Condition conditionParams(Equals); - Where * wAsset = new Where("asset", conditionParams, d); - Where *wService = new Where("service", conditionParams, m_serviceName, wAsset); - Where *wEvent = new Where("event", conditionParams, "Ingest", wService); - - InsertValues unDeprecated; - - // Set NULL value - unDeprecated.push_back(InsertValue("deprecated_ts")); - // Update storage with NULL value - int rv = m_storage.updateTable(string("asset_tracker"), unDeprecated, *wEvent); - if (rv < 0) - { - m_logger->error("Failure while un-deprecating asset '%s'", d.c_str()); - } - } - } + // populate asset tracking cache + AssetTracker::getAssetTracker()->populateAssetTrackingCache(m_pluginName, "Ingest"); // Create the stats entry for the service createServiceStatsDbEntry(); @@ -526,7 +499,10 @@ void Ingest::processQueue() m_failCnt = 0; std::map statsEntriesCurrQueue; AssetTracker *tracker = AssetTracker::getAssetTracker(); + + // TODO: save it as member variable string lastAsset = ""; + int *lastStat = NULL; for (vector::iterator it = q->begin(); it != q->end(); ++it) @@ -539,10 +515,21 @@ void Ingest::processQueue() m_pluginName, assetName, "Ingest"); - if (!tracker->checkAssetTrackingCache(tuple)) + + // Check Asset record exists + AssetTrackingTuple* res = tracker->findAssetTrackingCache(tuple); + if (res == NULL) { + // Record non in cache, add it tracker->addAssetTrackingTuple(tuple); } + else + { + // Un-deprecate asset tracking record + unDeprecateAssetTrackingRecord(res, + assetName, + "Ingest"); + } lastAsset = assetName; lastStat = &(statsEntriesCurrQueue[assetName]); (*lastStat)++; @@ -689,7 +676,10 @@ void Ingest::processQueue() // check if this requires addition of a new asset tracker tuple // Remove the Readings in the vector AssetTracker *tracker = AssetTracker::getAssetTracker(); + + // TODO: save it as member variable string lastAsset = ""; + int *lastStat = NULL; for (vector::iterator it = m_data->begin(); it != m_data->end(); ++it) { @@ -701,10 +691,21 @@ void Ingest::processQueue() m_pluginName, assetName, "Ingest"); - if (!tracker->checkAssetTrackingCache(tuple)) + + // Check Asset record exists + AssetTrackingTuple* res = tracker->findAssetTrackingCache(tuple); + if (res == NULL) { + // Record not in cache, add it tracker->addAssetTrackingTuple(tuple); } + else + { + // Un-deprecate asset tracking record + unDeprecateAssetTrackingRecord(res, + assetName, + "Ingest"); + } lastAsset = assetName; lastStat = &statsEntriesCurrQueue[assetName]; (*lastStat)++; @@ -928,3 +929,76 @@ size_t Ingest::queueLength() return len; } + +/** + * Load an up-to-date AssetTracking record for the given parameters + * and un-deprecate AssetTracking record it has been found as deprecated + * Existing cache element is updated + * + * @param currentTuple Current AssetTracking record for given assetName + * @param assetName AssetName to fetch from AssetTracking + * @param event The event type to fetch + */ +void Ingest::unDeprecateAssetTrackingRecord(AssetTrackingTuple* currentTuple, + const string& assetName, + const string& event) +{ + // Get up-to-date Asset Tracking record + AssetTrackingTuple* updatedTuple = + m_mgtClient->getAssetTrackingTuple( + m_serviceName, + assetName, + event); + + if (updatedTuple) + { + if (updatedTuple->isDeprecated()) + { + // Update un-deprecated state in cached object + currentTuple->unDeprecate(); + + m_logger->debug("Asset '%s' is being un-deprecated", + assetName.c_str()); + + // Prepare UPDATE query + const Condition conditionParams(Equals); + Where * wAsset = new Where("asset", + conditionParams, + assetName); + Where *wService = new Where("service", + conditionParams, + m_serviceName, + wAsset); + Where *wEvent = new Where("event", + conditionParams, + event, + wService); + + InsertValues unDeprecated; + + // Set NULL value + unDeprecated.push_back(InsertValue("deprecated_ts")); + + // Update storage with NULL value + int rv = m_storage.updateTable("asset_tracker", + unDeprecated, + *wEvent); + + // Check update operation + if (rv < 0) + { + m_logger->error("Failure while un-deprecating asset '%s'", + assetName.c_str()); + } + } + } + else + { + m_logger->error("Failure to get AssetTracking record " + "for service '%s', asset '%s'", + m_serviceName.c_str(), + assetName.c_str()); + } + + delete updatedTuple; +} From 51d1d24454cf2e19daf42d46ff4f11f552d37400 Mon Sep 17 00:00:00 2001 From: Massimiliano Pinto Date: Tue, 2 Aug 2022 09:24:27 +0200 Subject: [PATCH 24/26] Added m_lastAsset member variable Added m_lastAsset member variable --- C/services/south/include/ingest.h | 1 + C/services/south/ingest.cpp | 34 +++++++++++++++++++++---------- 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/C/services/south/include/ingest.h b/C/services/south/include/ingest.h index 69f08d0a4..b017ba88b 100644 --- a/C/services/south/include/ingest.h +++ b/C/services/south/include/ingest.h @@ -120,6 +120,7 @@ class Ingest : public ServiceHandler { int m_failCnt; bool m_storageFailed; int m_storesFailed; + std::string m_lastAsset; }; #endif diff --git a/C/services/south/ingest.cpp b/C/services/south/ingest.cpp index ae3ec1819..4c5df3390 100755 --- a/C/services/south/ingest.cpp +++ b/C/services/south/ingest.cpp @@ -500,9 +500,7 @@ void Ingest::processQueue() std::map statsEntriesCurrQueue; AssetTracker *tracker = AssetTracker::getAssetTracker(); - // TODO: save it as member variable string lastAsset = ""; - int *lastStat = NULL; for (vector::iterator it = q->begin(); it != q->end(); ++it) @@ -525,11 +523,19 @@ void Ingest::processQueue() } else { - // Un-deprecate asset tracking record - unDeprecateAssetTrackingRecord(res, + if (!m_lastAsset.empty() && + m_lastAsset.compare(assetName)) + { + // Asset name changed: + // Un-deprecate asset tracking record + unDeprecateAssetTrackingRecord(res, assetName, "Ingest"); + } } + // Store last asset + m_lastAsset = assetName; + lastAsset = assetName; lastStat = &(statsEntriesCurrQueue[assetName]); (*lastStat)++; @@ -677,9 +683,7 @@ void Ingest::processQueue() // Remove the Readings in the vector AssetTracker *tracker = AssetTracker::getAssetTracker(); - // TODO: save it as member variable - string lastAsset = ""; - + string lastAsset; int *lastStat = NULL; for (vector::iterator it = m_data->begin(); it != m_data->end(); ++it) { @@ -701,11 +705,19 @@ void Ingest::processQueue() } else { - // Un-deprecate asset tracking record - unDeprecateAssetTrackingRecord(res, - assetName, - "Ingest"); + if (!m_lastAsset.empty() && + m_lastAsset.compare(assetName)) + { + // Asset name changed: + // Un-deprecate asset tracking record + unDeprecateAssetTrackingRecord(res, + assetName, + "Ingest"); + } } + // Store last asset + m_lastAsset = assetName; + lastAsset = assetName; lastStat = &statsEntriesCurrQueue[assetName]; (*lastStat)++; From 785726d9dd2b1d633b2850c3933c628e09cf5ed9 Mon Sep 17 00:00:00 2001 From: Massimiliano Pinto Date: Tue, 2 Aug 2022 11:45:04 +0200 Subject: [PATCH 25/26] String representation fixed String representation fixed --- C/common/include/asset_tracking.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/C/common/include/asset_tracking.h b/C/common/include/asset_tracking.h index 7ea053fdf..b6e2ace2d 100644 --- a/C/common/include/asset_tracking.h +++ b/C/common/include/asset_tracking.h @@ -36,7 +36,7 @@ class AssetTrackingTuple { ", plugin:" << m_pluginName << ", asset:" << m_assetName << ", event:" << m_eventName << - ". deprecated:" << m_deprecated; + ", deprecated:" << m_deprecated; return o.str(); } From ec33e3b5c0a956c19dcd417bf796ff80521a0bb1 Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Wed, 3 Aug 2022 11:23:20 +0100 Subject: [PATCH 26/26] Remove incorrect optimisation (#764) Signed-off-by: Mark Riddoch --- C/services/south/include/ingest.h | 1 - C/services/south/ingest.cpp | 28 ++++++---------------------- 2 files changed, 6 insertions(+), 23 deletions(-) diff --git a/C/services/south/include/ingest.h b/C/services/south/include/ingest.h index b017ba88b..69f08d0a4 100644 --- a/C/services/south/include/ingest.h +++ b/C/services/south/include/ingest.h @@ -120,7 +120,6 @@ class Ingest : public ServiceHandler { int m_failCnt; bool m_storageFailed; int m_storesFailed; - std::string m_lastAsset; }; #endif diff --git a/C/services/south/ingest.cpp b/C/services/south/ingest.cpp index 4c5df3390..0ea209c23 100755 --- a/C/services/south/ingest.cpp +++ b/C/services/south/ingest.cpp @@ -523,19 +523,11 @@ void Ingest::processQueue() } else { - if (!m_lastAsset.empty() && - m_lastAsset.compare(assetName)) - { - // Asset name changed: - // Un-deprecate asset tracking record - unDeprecateAssetTrackingRecord(res, + // Possibly Un-deprecate asset tracking record + unDeprecateAssetTrackingRecord(res, assetName, "Ingest"); - } } - // Store last asset - m_lastAsset = assetName; - lastAsset = assetName; lastStat = &(statsEntriesCurrQueue[assetName]); (*lastStat)++; @@ -705,19 +697,11 @@ void Ingest::processQueue() } else { - if (!m_lastAsset.empty() && - m_lastAsset.compare(assetName)) - { - // Asset name changed: - // Un-deprecate asset tracking record - unDeprecateAssetTrackingRecord(res, - assetName, - "Ingest"); - } + // Un-deprecate asset tracking record + unDeprecateAssetTrackingRecord(res, + assetName, + "Ingest"); } - // Store last asset - m_lastAsset = assetName; - lastAsset = assetName; lastStat = &statsEntriesCurrQueue[assetName]; (*lastStat)++;