From 2831bc298ffcc928763abb015bd7c3bdec1347a5 Mon Sep 17 00:00:00 2001
From: "Aapo (f021b) Rista"
Date: Mon, 13 Feb 2023 08:50:09 +0200
Subject: [PATCH] Use compression in influxdb requests and cache files

---
 NuukaAPI/nuuka_client.py | 84 +++++++++++++++++++++++++++++++---------
 1 file changed, 66 insertions(+), 18 deletions(-)

diff --git a/NuukaAPI/nuuka_client.py b/NuukaAPI/nuuka_client.py
index bb0c0e2..c1f3d12 100644
--- a/NuukaAPI/nuuka_client.py
+++ b/NuukaAPI/nuuka_client.py
@@ -1,5 +1,6 @@
 import argparse
 import datetime
+import gzip
 import json
 import logging
 import math
@@ -106,6 +107,30 @@ def parse_too_many_rows(message: str) -> (int, int):
     return None, None
 
 
+def read_cached_data(fpath: Path) -> Union[dict, None]:
+    """Read cached data from a file. Try fpath as-is first; if that fails, try fpath with .gz appended."""
+    try:
+        with fpath.open("rt") as f:
+            return json.load(f)
+    except FileNotFoundError:
+        pass
+    try:
+        with gzip.open(str(fpath) + ".gz", "rt") as f:
+            return json.load(f)
+    except FileNotFoundError:
+        return None
+
+
+def cache_data_to_file(fpath: Path, data: dict, compress: bool = True):
+    """Save data to a cache file. If compress is True, save to fpath with .gz appended, otherwise to fpath."""
+    if compress:
+        with gzip.open(str(fpath) + ".gz", "wt") as f:
+            json.dump(data, f)
+    else:
+        with fpath.open("wt") as f:
+            json.dump(data, f)
+
+
 class NuukaClient(ABC):
     """
     Base class for Nuuka API clients. Subclasses must implement get_data() method.
@@ -113,14 +138,16 @@ class NuukaClient(ABC):
 
     def __init__(self):
         self.args = get_args()
-        self.measurement_info_fname = f"measurement_info_{self.args.get_measurement_info}.json"
+        self.measurement_info_fname = None
         if self.args.get_buildings:
             with open("buildings.json", "w") as f:
                 json.dump(self.get_buildings(), f, indent=2)
         elif self.args.get_measurement_info:
+            self.measurement_info_fname = f"measurement_info_{self.args.get_measurement_info}.json"
             with open(self.measurement_info_fname, "w") as f:
                 json.dump(self.get_measurement_info(self.args.get_measurement_info), f, indent=2)
         elif self.args.get_measurement_data:
+            self.measurement_info_fname = f"measurement_info_{self.args.get_measurement_data}.json"
             self.start_time, self.end_time, self.timedelta = parse_times(
                 self.args.start_time, self.args.end_time, self.args.timedelta, self.args.round_times
             )
@@ -156,7 +183,13 @@ def get_data(self):
         """
         building_id = self.args.get_measurement_data
         if self.args.measurement_ids == ["all"]:
-            measurement_info = self.get_measurement_info(building_id)
+            mi_path = Path(self.measurement_info_fname)
+            if mi_path.exists():
+                logging.info(f"Using cached measurement info from {mi_path}")
+                measurement_info = json.load(mi_path.open("rt"))
+            else:
+                logging.info("Getting measurement info from Nuuka REST API")
+                measurement_info = self.get_measurement_info(building_id)
             measurement_ids = [data_point["DataPointID"] for data_point in measurement_info]
         else:
             measurement_ids = self.args.measurement_ids
@@ -190,7 +223,10 @@
         """
         Get measurement info (all available measuring points) from Nuuka API.
""" - return self.api_get("GetMeasurementInfo/", {"BuildingID": building_id}, {}) + measurement_info = self.api_get("GetMeasurementInfo/", {"BuildingID": building_id}, {}) + # sort measurement info by DataPointID + measurement_info = sorted(measurement_info, key=lambda x: x["DataPointID"]) + return measurement_info def get_measurement_data( self, building_id: str, data_point_ids: list, start_time: datetime.datetime, end_time: datetime.datetime @@ -215,23 +251,32 @@ def get_measurement_data( def get_data_chunk_from_url(data_point_ids: str, start: datetime.datetime, end: datetime.datetime): params = get_request_params(building_id, start, end) params["DataPointIDs"] = data_point_ids - cache_dir = Path("cache") - cache_dir.mkdir(exist_ok=True) + cache_dir = Path("cache") / Path(building_id) + cache_dir.mkdir(exist_ok=True, parents=True) # remove [-: ] characters from dates using regex start_end = re.sub(r"[-: ]", "", "{}_{}".format(params["StartTime"], params["EndTime"])) fname = "data-{}_{}-{}.json".format(start_end, ids[0], ids[-1]) fpath = cache_dir / fname - if Path(fpath).exists(): - logging.debug(f"Using cached data from file {fname}") - with open(fpath, "r") as f: - data = json.loads(f.read()) - cached = True - else: + logging.debug(f"Using {fpath}") + # Try to read data from cache first + data = read_cached_data(fpath) + if data is None: data = self.api_get("GetMeasurementDataByIDs/", params, {}) - logging.debug(f"Saving data to file {fname}") - with open(fpath, "w") as f: - f.write(json.dumps(data)) cached = False + cache_data_to_file(fpath, data) + else: + cached = True + # if Path(fpath).exists(): + # logging.debug(f"Using cached data from file {fname}") + # with open(fpath, "r") as f: + # data = json.loads(f.read()) + # cached = True + # else: + # data = self.api_get("GetMeasurementDataByIDs/", params, {}) + # logging.debug(f"Saving data to file {fname}") + # with open(fpath, "w") as f: + # f.write(json.dumps(data)) + # cached = False return cached, data # Split list into chunks of max_points items @@ -240,7 +285,7 @@ def get_data_chunk_from_url(data_point_ids: str, start: datetime.datetime, end: point_cnt += len(ids) data_point_ids = ";".join([str(x) for x in ids]) for start, end in times: - # Use recursive splitting if too many rows are returned + # TODO: Use recursive splitting if too many rows are returned cached, data = get_data_chunk_from_url(data_point_ids, start, end) if len(data) == 1 and data[0].get("message", "").startswith("Too many rows"): row_cnt, max_rows = parse_too_many_rows(data[0]["message"]) @@ -257,11 +302,11 @@ def get_data_chunk_from_url(data_point_ids: str, start: datetime.datetime, end: if len(data) == 1 and data[0].get("message", "").startswith("Too many rows"): row_cnt, max_rows = parse_too_many_rows(data[0]["message"]) logging.error("Too many rows {}/{}. 
Splitting failed".format(row_cnt, max_rows)) - exit(1) + data = [] yield cached, data else: yield cached, data - if point_cnt >= self.args.limit: + if self.args.limit and point_cnt >= self.args.limit: logging.info("Reached limit of {} points".format(self.args.limit)) break @@ -274,7 +319,10 @@ class Nuuka2InfluxDB(NuukaClient): def __init__(self): self.influx_args = self.parse_influxdb_args() self.influxdb_client = InfluxDBClient( - url=self.influx_args.influx_host, org=self.influx_args.influx_org, token=self.influx_args.influx_token + url=self.influx_args.influx_host, + org=self.influx_args.influx_org, + token=self.influx_args.influx_token, + enable_gzip=True, # TODO: this could be optional ) super().__init__() if self.args.get_measurement_info:
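
Note on the cache format: the new helpers write plain gzip-wrapped JSON, so a
cache file can be inspected or reproduced with nothing but the standard
library. Below is a minimal sketch of the round trip they implement; the cache
path and payload are illustrative, not taken from the patch:

    import gzip
    import json
    from pathlib import Path

    fpath = Path("cache/demo/data-test.json")  # hypothetical cache path
    fpath.parent.mkdir(parents=True, exist_ok=True)
    payload = {"DataPointID": 1, "Value": 21.5}  # made-up sample record

    # What cache_data_to_file(fpath, payload) does with compress=True:
    # the data lands in fpath with ".gz" appended, not in fpath itself.
    with gzip.open(str(fpath) + ".gz", "wt") as f:
        json.dump(payload, f)

    # What the fallback branch of read_cached_data(fpath) does when the
    # plain file is absent: open the ".gz" variant in text mode.
    with gzip.open(str(fpath) + ".gz", "rt") as f:
        assert json.load(f) == payload

Opening with gzip.open in text mode ("wt"/"rt") is what lets json.dump and
json.load be reused unchanged, and pre-existing uncompressed cache files stay
readable through the plain-file branch of read_cached_data.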