Use compression in influxdb requests and cache files
aapris committed Feb 13, 2023
1 parent 4f910df commit 2831bc2
Showing 1 changed file with 66 additions and 18 deletions.
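
For context on the cache-file half of this change: the helpers added below read and write gzip-compressed JSON, falling back to an uncompressed file on read. A minimal standalone sketch of that pattern, mirroring the new read_cached_data/cache_data_to_file helpers; the function names, file name and sample data here are illustrative, not taken from the commit:

import gzip
import json
from pathlib import Path

def write_json_gz(fpath: Path, data: dict) -> None:
    # gzip.open in "wt" mode returns a text wrapper, so json.dump can write to it directly
    with gzip.open(str(fpath) + ".gz", "wt") as f:
        json.dump(data, f)

def read_json(fpath: Path):
    # Prefer a plain JSON file if present, then fall back to the .gz variant
    try:
        with fpath.open("rt") as f:
            return json.load(f)
    except FileNotFoundError:
        pass
    try:
        with gzip.open(str(fpath) + ".gz", "rt") as f:
            return json.load(f)
    except FileNotFoundError:
        return None

sample = {"DataPointID": 123, "Value": 1.5}
write_json_gz(Path("example_cache.json"), sample)
assert read_json(Path("example_cache.json")) == sample
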
84 changes: 66 additions & 18 deletions NuukaAPI/nuuka_client.py
@@ -1,5 +1,6 @@
import argparse
import datetime
import gzip
import json
import logging
import math
@@ -106,21 +107,47 @@ def parse_too_many_rows(message: str) -> (int, int):
return None, None


def read_cached_data(fpath: Path) -> Union[dict, None]:
"""Read cached data from file. Try first fname as-is and if that fails, try fname with .gz appended."""
try:
with fpath.open("rt") as f:
return json.load(f)
except FileNotFoundError:
pass
try:
with gzip.open(str(fpath) + ".gz", "rt") as f:
return json.load(f)
except FileNotFoundError:
return None


def cache_data_to_file(fpath: Path, data: dict, compress: bool = True):
"""Save cached data to file. If compress is True, save to fname.gz, otherwise to fname."""
if compress:
with gzip.open(str(fpath) + ".gz", "wt") as f:
json.dump(data, f)
else:
with fpath.open("wt") as f:
json.dump(data, f)


class NuukaClient(ABC):
"""
Base class for Nuuka API clients. Subclasses must implement get_data() method.
"""

def __init__(self):
self.args = get_args()
-self.measurement_info_fname = f"measurement_info_{self.args.get_measurement_info}.json"
+self.measurement_info_fname = None
if self.args.get_buildings:
with open("buildings.json", "w") as f:
json.dump(self.get_buildings(), f, indent=2)
elif self.args.get_measurement_info:
self.measurement_info_fname = f"measurement_info_{self.args.get_measurement_info}.json"
with open(self.measurement_info_fname, "w") as f:
json.dump(self.get_measurement_info(self.args.get_measurement_info), f, indent=2)
elif self.args.get_measurement_data:
self.measurement_info_fname = f"measurement_info_{self.args.get_measurement_data}.json"
self.start_time, self.end_time, self.timedelta = parse_times(
self.args.start_time, self.args.end_time, self.args.timedelta, self.args.round_times
)
@@ -156,7 +183,13 @@ def get_data(self):
"""
building_id = self.args.get_measurement_data
if self.args.measurement_ids == ["all"]:
-measurement_info = self.get_measurement_info(building_id)
+mi_path = Path(self.measurement_info_fname)
+if mi_path.exists():
+logging.info(f"Using cached measurement info from {mi_path}")
+measurement_info = json.load(mi_path.open("rt"))
+else:
+logging.info("Getting measurement info from Nuuka REST API")
+measurement_info = self.get_measurement_info(building_id)
measurement_ids = [data_point["DataPointID"] for data_point in measurement_info]
else:
measurement_ids = self.args.measurement_ids
@@ -190,7 +223,10 @@ def get_measurement_info(self, building_id: str):
"""
Get measurement info (all available measuring points) from Nuuka API.
"""
-return self.api_get("GetMeasurementInfo/", {"BuildingID": building_id}, {})
+measurement_info = self.api_get("GetMeasurementInfo/", {"BuildingID": building_id}, {})
+# sort measurement info by DataPointID
+measurement_info = sorted(measurement_info, key=lambda x: x["DataPointID"])
+return measurement_info

def get_measurement_data(
self, building_id: str, data_point_ids: list, start_time: datetime.datetime, end_time: datetime.datetime
@@ -215,23 +251,32 @@ def get_measurement_data(
def get_data_chunk_from_url(data_point_ids: str, start: datetime.datetime, end: datetime.datetime):
params = get_request_params(building_id, start, end)
params["DataPointIDs"] = data_point_ids
cache_dir = Path("cache")
cache_dir.mkdir(exist_ok=True)
cache_dir = Path("cache") / Path(building_id)
cache_dir.mkdir(exist_ok=True, parents=True)
# remove [-: ] characters from dates using regex
start_end = re.sub(r"[-: ]", "", "{}_{}".format(params["StartTime"], params["EndTime"]))
fname = "data-{}_{}-{}.json".format(start_end, ids[0], ids[-1])
fpath = cache_dir / fname
-if Path(fpath).exists():
-logging.debug(f"Using cached data from file {fname}")
-with open(fpath, "r") as f:
-data = json.loads(f.read())
-cached = True
-else:
+logging.debug(f"Using {fpath}")
+# Try to read data from cache first
+data = read_cached_data(fpath)
+if data is None:
data = self.api_get("GetMeasurementDataByIDs/", params, {})
-logging.debug(f"Saving data to file {fname}")
-with open(fpath, "w") as f:
-f.write(json.dumps(data))
cached = False
+cache_data_to_file(fpath, data)
+else:
+cached = True
# if Path(fpath).exists():
# logging.debug(f"Using cached data from file {fname}")
# with open(fpath, "r") as f:
# data = json.loads(f.read())
# cached = True
# else:
# data = self.api_get("GetMeasurementDataByIDs/", params, {})
# logging.debug(f"Saving data to file {fname}")
# with open(fpath, "w") as f:
# f.write(json.dumps(data))
# cached = False
return cached, data

# Split list into chunks of max_points items
@@ -240,7 +285,7 @@ def get_data_chunk_from_url(data_point_ids: str, start: datetime.datetime, end:
point_cnt += len(ids)
data_point_ids = ";".join([str(x) for x in ids])
for start, end in times:
-# Use recursive splitting if too many rows are returned
+# TODO: Use recursive splitting if too many rows are returned
cached, data = get_data_chunk_from_url(data_point_ids, start, end)
if len(data) == 1 and data[0].get("message", "").startswith("Too many rows"):
row_cnt, max_rows = parse_too_many_rows(data[0]["message"])
@@ -257,11 +302,11 @@ def get_data_chunk_from_url(data_point_ids: str, start: datetime.datetime, end:
if len(data) == 1 and data[0].get("message", "").startswith("Too many rows"):
row_cnt, max_rows = parse_too_many_rows(data[0]["message"])
logging.error("Too many rows {}/{}. Splitting failed".format(row_cnt, max_rows))
-exit(1)
+data = []
yield cached, data
else:
yield cached, data
-if point_cnt >= self.args.limit:
+if self.args.limit and point_cnt >= self.args.limit:
logging.info("Reached limit of {} points".format(self.args.limit))
break

@@ -274,7 +319,10 @@ class Nuuka2InfluxDB(NuukaClient):
def __init__(self):
self.influx_args = self.parse_influxdb_args()
self.influxdb_client = InfluxDBClient(
-url=self.influx_args.influx_host, org=self.influx_args.influx_org, token=self.influx_args.influx_token
+url=self.influx_args.influx_host,
+org=self.influx_args.influx_org,
+token=self.influx_args.influx_token,
+enable_gzip=True,  # TODO: this could be optional
)
super().__init__()
if self.args.get_measurement_info:

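The other half of the change turns on gzip for the InfluxDB client itself via enable_gzip=True, which asks the influxdb-client package to compress HTTP request and response bodies. A minimal sketch of how that option is used; the URL, token, org, bucket and sample point below are placeholders, not values from this repository:

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

client = InfluxDBClient(
    url="http://localhost:8086",
    token="my-token",
    org="my-org",
    enable_gzip=True,  # gzip-compress HTTP bodies for writes and query responses
)
write_api = client.write_api(write_options=SYNCHRONOUS)
point = Point("nuuka_measurement").tag("building_id", "1234").field("value", 1.5)
write_api.write(bucket="my-bucket", record=point)
client.close()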