diff --git a/.gitignore b/.gitignore index 9feef4b..0bab246 100644 --- a/.gitignore +++ b/.gitignore @@ -5,7 +5,7 @@ build/* dist/* *.egg* .coverage -config.cfg +oracle.cfg aws-auth *.log *.certs* diff --git a/README.MD b/README.MD index a5f1bb1..80d3dee 100644 --- a/README.MD +++ b/README.MD @@ -2,4 +2,16 @@ [![tests badge](https://github.com/NERC-CEH/iot-swarm/actions/workflows/test.yml/badge.svg)](https://github.com/NERC-CEH/iot-swarm/actions) [![docs badge](https://github.com/NERC-CEH/iot-swarm/actions/workflows/doc-deployment.yml/badge.svg)](https://nerc-ceh.github.io/iot-swarm/) -This is a Python package intended to simulate a swarm of IoT device communications via MQTT, enabling stress testing of cloud infrastructure with loads close to production level. [Read the docs](https://nerc-ceh.github.io/iot-swarm/) \ No newline at end of file +This is a Python package intended to simulate a swarm of IoT device communications via MQTT, enabling stress testing of cloud infrastructure with loads close to production level. 
[Read the docs](https://nerc-ceh.github.io/iot-swarm/) + +# Live Cosmos Data + +To use the live cosmos data tool you must create a config file in the format: + +```yaml +dsn='' +user='' +pass='' +``` +The module script can then be triggered by running the module with an argument: +`python -m iotswarm.livecosmos <config_file>` \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..3e40cbb --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,24 @@ +services: + # local stack container for local testing of AWS services + # initialises with localstack-setup.sh + localstack: + container_name: "swarm_localstack" + image: localstack/localstack:3.4 + ports: + - "127.0.0.1:4566:4566" # LocalStack Gateway + - "127.0.0.1:4510-4559:4510-4559" # external services port range + environment: + - SERVICES=s3,sqs + - DEBUG=${DEBUG:-0} + - PATH=$PATH:/var/lib/localstack/bin + volumes: + - "/var/run/docker.sock:/var/run/docker.sock" + - "./bin/localstack-setup.sh:/etc/localstack/init/ready.d/init-aws.sh" + - "./bin:/var/lib/localstack/bin" + # profiles: + # - localstack + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:4566"] + interval: 10s + timeout: 5s + retries: 2 \ No newline at end of file diff --git a/src/iotswarm/db.py b/src/iotswarm/db.py index 62a75cb..1862700 100644 --- a/src/iotswarm/db.py +++ b/src/iotswarm/db.py @@ -13,6 +13,7 @@ from math import nan import sqlite3 from typing import List +from datetime import datetime logger = logging.getLogger(__name__) @@ -200,6 +201,33 @@ async def query_latest_from_site(self, site_id: str, table: CosmosTable) -> dict return dict(zip(columns, data)) + async def query_datetime_gt_from_site( + self, site_id: str, date: datetime, table: CosmosTable + ): + """Returns a list of rows from a table for a specific site where the datetime is greater than + the value given + + Args: + site_id: ID of the site to retrieve records from. 
+ date: The date that results are filtered by + table: A valid table from the database + + Returns: + List[dict] | None: A list of dicts containing the database columns as keys, and the values as values. + Returns `None` if no data retrieved. + """ + + query = self._fill_query(CosmosQuery.ORACLE_DATE_GREATER_THAN, table) + + with self.connection.cursor() as cursor: + await cursor.execute(query, site_id=site_id, date_time=date) + + columns = [i[0] for i in cursor.description] + data = await cursor.fetchall() + + if data: + return [dict(zip(columns, data_row)) for data_row in data] + async def query_site_ids( self, table: CosmosTable, max_sites: int | None = None ) -> list: diff --git a/src/iotswarm/livecosmos/__init__.py b/src/iotswarm/livecosmos/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/iotswarm/livecosmos/__main__.py b/src/iotswarm/livecosmos/__main__.py new file mode 100644 index 0000000..faedf07 --- /dev/null +++ b/src/iotswarm/livecosmos/__main__.py @@ -0,0 +1,116 @@ +"""This is the main module invocation for sending live COSMOS data to AWS""" + +from config import Config +from pathlib import Path +import sys +from iotswarm.db import Oracle +from iotswarm.queries import CosmosTable +from iotswarm.devices import CR1000XDevice +from iotswarm.messaging.core import MockMessageConnection +import asyncio +from typing import List +from datetime import datetime, timedelta +import logging +import logging.config + +logging.config.fileConfig( + fname=Path(__file__).parents[1] / "__assets__" / "loggers.ini" +) + +logger = logging.getLogger(__name__) + +MOCK_CONNECTION = MockMessageConnection() + + +async def get_latest_payloads_for_table( + oracle: Oracle, table: CosmosTable, datetime_gt: datetime +) -> List[dict]: + """Gets all payloads after the datetime for a given Oracle table + Iterates through all sites found in the table and filters by datetimes + after the specified timestamp. 
+ + Args: + oracle: The oracle database connection + table: The database table to search + datetime_gt: The datetime that values must be greater than. + + Returns: + A list of dictionaries where each dictionary is a payload. + """ + + sites = await oracle.query_site_ids(table) + + logger.debug(f"Found {len(sites)} sites IDs for table: {table}") + + payloads = await asyncio.gather( + *[ + get_latest_payloads_for_site(oracle, table, datetime_gt, site) + for site in sites + ] + ) + + # Flatten lists and return + return [item for row in payloads for item in row] + + +async def get_latest_payloads_for_site( + oracle: Oracle, table: CosmosTable, datetime_gt: datetime, site: str +) -> List[dict]: + """Gets all payloads after the datetime for a given site from an Oracle table. + + Args: + oracle: The oracle database connection + table: The database table to search + datetime_gt: The datetime that values must be greater than + site: The name of the site + + Returns: + A list of dictionaries where each dictionary is a payload. + """ + latest = await oracle.query_datetime_gt_from_site(site, datetime_gt, table) + + if not latest: + logger.debug(f"Got 0 rows for site {site} in table: {table}") + return [] + + device = CR1000XDevice( + device_id=site, + data_source=oracle, + connection=MOCK_CONNECTION, + table=table, + ) + + logger.debug(f"Got {len(latest)} rows for site {site} in table: {table}") + + payloads = [device._format_payload(x) for x in latest] + + return payloads + + +async def main(config_file: Path) -> List[dict]: + """The main invocation method. + Initialises the Oracle connection and defines which data to query. + + Args: + config_file: Path to the *.cfg file that contains oracle credentials. 
+ """ + oracle_creds = Config(str(config_file)) + + oracle = await Oracle.create( + oracle_creds["dsn"], oracle_creds["user"], oracle_creds["pass"] + ) + tables = [CosmosTable.LEVEL_1_SOILMET_30MIN, CosmosTable.LEVEL_1_NMDB_1HOUR] + + date_gt = datetime.now() - timedelta(hours=3) + result = await asyncio.gather( + *[get_latest_payloads_for_table(oracle, table, date_gt) for table in tables] + ) + + table_data = dict(zip(tables, result)) + print(table_data) + + +if __name__ == "__main__": + if len(sys.argv) == 1: + sys.argv.append(str(Path(__file__).parents[3] / "oracle.cfg")) + asyncio.run(main(*sys.argv[1:])) diff --git a/src/iotswarm/livecosmos/utils.py b/src/iotswarm/livecosmos/utils.py new file mode 100644 index 0000000..6ff2662 --- /dev/null +++ b/src/iotswarm/livecosmos/utils.py @@ -0,0 +1,23 @@ +"""Utility methods for the module""" + +import boto3 + + +def get_alphabetically_last_s3_object(s3_client, bucket_name, prefix=""): + """Returns the alphabetically last object in an s3 bucket""" + paginator = s3_client.get_paginator("list_objects_v2") + pages = paginator.paginate(Bucket=bucket_name, Prefix=prefix) + + last_key = None + + for page in pages: + if "Contents" in page: + # The last key in the current page (sorted lexicographically within the page) + page_last_key = page["Contents"][-1]["Key"] + + # Update the global last key if this page's last key is greater + if last_key is None or page_last_key > last_key: + last_key = page_last_key + print(page_last_key) + + return last_key \ No newline at end of file diff --git a/src/iotswarm/queries.py b/src/iotswarm/queries.py index 7b57459..68e2aa7 100644 --- a/src/iotswarm/queries.py +++ b/src/iotswarm/queries.py @@ -63,3 +63,17 @@ class CosmosQuery(StrEnum): SELECT UNQIUE(site_id) FROM """ + + ORACLE_DATE_GREATER_THAN = """SELECT * FROM COSMOS.{table} +WHERE site_id = :site_id +AND date_time > :date_time""" + + """Query for retrieving data from a given table in oracle format + that is greater than a given 
datetime. + + .. code-block:: sql + + SELECT * FROM <table>
+ WHERE site_id = <site_id> + AND date_time > <date_time> + """ diff --git a/tests/livecosmos/test_livecosmos_aws.py b/tests/livecosmos/test_livecosmos_aws.py new file mode 100644 index 0000000..9ca3c61 --- /dev/null +++ b/tests/livecosmos/test_livecosmos_aws.py @@ -0,0 +1 @@ +import unittest \ No newline at end of file