Skip to content

Commit

Permalink
FW-526: live cosmos data payload MVP (#10)
Browse files Browse the repository at this point in the history
* Implemented module for retreiving last 2 hours of database values for different tables

* Updated instructions and put debug line behind If statement

* Adding in localstack

* Added quotes on config README
  • Loading branch information
lewis-chambers authored Jan 16, 2025
1 parent 7a63300 commit 89cd8e1
Show file tree
Hide file tree
Showing 9 changed files with 220 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ build/*
dist/*
*.egg*
.coverage
config.cfg
oracle.cfg
aws-auth
*.log
*.certs*
Expand Down
14 changes: 13 additions & 1 deletion README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,16 @@
[![tests badge](https://github.com/NERC-CEH/iot-swarm/actions/workflows/test.yml/badge.svg)](https://github.com/NERC-CEH/iot-swarm/actions)
[![docs badge](https://github.com/NERC-CEH/iot-swarm/actions/workflows/doc-deployment.yml/badge.svg)](https://nerc-ceh.github.io/iot-swarm/)

This is a Python package intended to simulate a swarm of IoT device communications via MQTT, enabling stress testing of cloud infrastructure with loads close to production level. [Read the docs](https://nerc-ceh.github.io/iot-swarm/)
This is a Python package intended to simulate a swarm of IoT device communications via MQTT, enabling stress testing of cloud infrastructure with loads close to production level. [Read the docs](https://nerc-ceh.github.io/iot-swarm/)

# Live Cosmos Data

To use the live cosmos data tool you must create a config file in the format:

```yaml
dsn='<dsn>'
user='<username>'
pass='<password>'
```
The module script can then be triggered by running the module with an argument:
`python -m iotswarm.livecosmos <config_src>`
24 changes: 24 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
services:
# local stack container for local testing of AWS services
# intialises with localscript-setup.sh
localstack:
container_name: "swarm_localstack"
image: localstack/localstack:3.4
ports:
- "127.0.0.1:4566:4566" # LocalStack Gateway
- "127.0.0.1:4510-4559:4510-4559" # external services port range
environment:
- SERVICES=s3,sqs
- DEBUG=${DEBUG:-0}
- PATH=$PATH:/var/lib/localstack/bin
volumes:
- "/var/run/docker.sock:/var/run/docker.sock"
- "./bin/localstack-setup.sh:/etc/localstack/init/ready.d/init-aws.sh"
- "./bin:/var/lib/localstack/bin"
# profiles:
# - localstack
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:4566"]
interval: 10s
timeout: 5s
retries: 2
28 changes: 28 additions & 0 deletions src/iotswarm/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from math import nan
import sqlite3
from typing import List
from datetime import datetime

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -200,6 +201,33 @@ async def query_latest_from_site(self, site_id: str, table: CosmosTable) -> dict

return dict(zip(columns, data))

async def query_datetime_gt_from_site(
self, site_id: str, date: datetime, table: CosmosTable
):
"""Returns a list of rows from a table for a specific site where the datetime is greater than
the value given
Args:
site_id: ID of the site to retrieve records from.
date: The date that results are filtered by
table: A valid table from the database
Returns:
List[dict] | None: A list of dicts containing the database columns as keys, and the values as values.
Returns `None` if no data retrieved.
"""

query = self._fill_query(CosmosQuery.ORACLE_DATE_GREATER_THAN, table)

with self.connection.cursor() as cursor:
await cursor.execute(query, site_id=site_id, date_time=date)

columns = [i[0] for i in cursor.description]
data = await cursor.fetchall()

if data:
return [dict(zip(columns, data_row)) for data_row in data]

async def query_site_ids(
self, table: CosmosTable, max_sites: int | None = None
) -> list:
Expand Down
Empty file.
116 changes: 116 additions & 0 deletions src/iotswarm/livecosmos/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
"""This is the main module invocation for sending live COSMOS data to AWS"""

from config import Config
from pathlib import Path
import sys
from iotswarm.db import Oracle
from iotswarm.queries import CosmosTable
from iotswarm.devices import CR1000XDevice
from iotswarm.messaging.core import MockMessageConnection
import asyncio
from typing import List
from datetime import datetime, timedelta
import logging
import logging.config

logging.config.fileConfig(
fname=Path(__file__).parents[1] / "__assets__" / "loggers.ini"
)

logger = logging.getLogger(__name__)

MOCK_CONNECTION = MockMessageConnection()


async def get_latest_payloads_for_table(
oracle: Oracle, table: CosmosTable, datetime_gt: datetime
) -> List[dict]:
"""Gets all payloads after the datetime for a given Oracle table
Iterates through all sites found in the table and filters by datetimes
after the specified timestamp.
Args:
oracle: The oracle database connection
table: The database table to search
datetime_gt: The datetime that values must be greater than.
Returns:
A list dictionaries where each dictionary is a payload.
"""

sites = await oracle.query_site_ids(table)

logger.debug(f"Found {len(sites)} sites IDs for table: {table}")

payloads = await asyncio.gather(
*[
get_latest_payloads_for_site(oracle, table, datetime_gt, site)
for site in sites
]
)

# Flatten lists and return
return [item for row in payloads for item in row]


async def get_latest_payloads_for_site(
oracle: Oracle, table: CosmosTable, datetime_gt: datetime, site: str
) -> List[dict]:
"""Gets all payloads after the datetime for a given site from an Oracle table.
Args:
oracle: The oracle database connection
table: The database table to search
datetime_gt: The datetime that values must be greater than
site: The name of the site
Returns:
A list dictionaries where each dictionary is a payload.
"""
latest = await oracle.query_datetime_gt_from_site(site, datetime_gt, table)

if not latest:
logger.debug(f"Got 0 rows for site {site} in table: {table}")
return []

device = CR1000XDevice(
device_id=site,
data_source=oracle,
connection=MOCK_CONNECTION,
table=table,
)

logger.debug(f"Got {len(latest)} rows for site {site} in table: {table}")

payloads = [device._format_payload(x) for x in latest]

return payloads


async def main(config_file: Path) -> List[dict]:
"""The main invocation method.
Initialises the Oracle connection and defines which data the query.
Args:
config_file: Path to the *.cfg file that contains oracle credentials.
"""
oracle_creds = Config(str(config_file))

oracle = await Oracle.create(
oracle_creds["dsn"], oracle_creds["user"], oracle_creds["pass"]
)
tables = [CosmosTable.LEVEL_1_SOILMET_30MIN, CosmosTable.LEVEL_1_NMDB_1HOUR]

date_gt = datetime.now() - timedelta(hours=3)
result = await asyncio.gather(
*[get_latest_payloads_for_table(oracle, table, date_gt) for table in tables]
)

table_data = dict(zip(tables, result))
print(table_data)


if __name__ == "__main__":
if len(sys.argv) == 1:
sys.argv.append(str(Path(__file__).parents[3] / "oracle.cfg"))
asyncio.run(main(*sys.argv[1:]))
23 changes: 23 additions & 0 deletions src/iotswarm/livecosmos/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""Utility methods for the module"""

import boto3


def get_alphabetically_last_s3_object(s3_client, bucket_name, prefix=""):
"""Returns the alohabetically last object in an s3 bucket"""
paginator = s3_client.get_paginator("list_objects_v2")
pages = paginator.paginate(Bucket=bucket_name, Prefix=prefix)

last_key = None

for page in pages:
if "Contents" in page:
# The last key in the current page (sorted lexicographically within the page)
page_last_key = page["Contents"][-1]["Key"]

# Update the global last key if this page's last key is greater
if last_key is None or page_last_key > last_key:
last_key = page_last_key
print(page_last_key)

return last_key
14 changes: 14 additions & 0 deletions src/iotswarm/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,17 @@ class CosmosQuery(StrEnum):
SELECT UNQIUE(site_id) FROM <table>
"""

ORACLE_DATE_GREATER_THAN = """SELECT * FROM COSMOS.{table}
WHERE site_id = :site_id
AND date_time > :date_time"""

"""Query for retreiving data from a given table in oracle format
that is greater than a given datetime.
.. code-block:: sql
SELECT * FROM <table>
WHERE site_id = <site_id>
AND date_time > <date_time>
"""
1 change: 1 addition & 0 deletions tests/livecosmos/test_livecosmos_aws.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
import unittest

0 comments on commit 89cd8e1

Please sign in to comment.