From f964200b2a29cd21b9ad467d6be2ceb06a99385e Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Mon, 23 Dec 2024 09:48:13 +0000 Subject: [PATCH] Auto-format code with Black and isort --- .test/solis_cloud_test.py | 55 +++- apps/pv_opt/.test.py | 25 +- apps/pv_opt/pv_opt.py | 648 ++++++++++++++++++++++++++++---------- apps/pv_opt/pvpy.py | 281 +++++++++++++---- apps/pv_opt/solax.py | 47 ++- apps/pv_opt/solis.py | 214 ++++++++++--- apps/pv_opt/sunsynk.py | 66 +++- 7 files changed, 1011 insertions(+), 325 deletions(-) diff --git a/.test/solis_cloud_test.py b/.test/solis_cloud_test.py index bd8be68..439aab2 100644 --- a/.test/solis_cloud_test.py +++ b/.test/solis_cloud_test.py @@ -1,13 +1,14 @@ # %% +import base64 import hashlib import hmac -import base64 import json import re -import requests -from http import HTTPStatus from datetime import datetime, timezone +from http import HTTPStatus + import pandas as pd +import requests # def getInverterList(config): # body = getBody(stationId=config['plantId']) @@ -62,7 +63,9 @@ def get_body(self, **params): return body def digest(self, body: str) -> str: - return base64.b64encode(hashlib.md5(body.encode("utf-8")).digest()).decode("utf-8") + return base64.b64encode(hashlib.md5(body.encode("utf-8")).digest()).decode( + "utf-8" + ) def header(self, body: str, canonicalized_resource: str) -> dict[str, str]: content_md5 = self.digest(body) @@ -71,8 +74,22 @@ def header(self, body: str, canonicalized_resource: str) -> dict[str, str]: now = datetime.now(timezone.utc) date = now.strftime("%a, %d %b %Y %H:%M:%S GMT") - encrypt_str = "POST" + "\n" + content_md5 + "\n" + content_type + "\n" + date + "\n" + canonicalized_resource - hmac_obj = hmac.new(self.key_secret.encode("utf-8"), msg=encrypt_str.encode("utf-8"), digestmod=hashlib.sha1) + encrypt_str = ( + "POST" + + "\n" + + content_md5 + + "\n" + + content_type + + "\n" + + date + + "\n" + + canonicalized_resource + ) + hmac_obj = hmac.new( + self.key_secret.encode("utf-8"), + msg=encrypt_str.encode("utf-8"), + digestmod=hashlib.sha1, + ) sign = base64.b64encode(hmac_obj.digest()) authorization = "API " + self.key_id + ":" + sign.decode("utf-8") @@ -88,7 +105,9 @@ def header(self, body: str, canonicalized_resource: str) -> dict[str, str]: def inverter_id(self): body = self.get_body(stationId=self.plant_id) header = self.header(body, self.URLS["inverterList"]) - response = requests.post(self.URLS["root"] + self.URLS["inverterList"], data=body, headers=header) + response = requests.post( + self.URLS["root"] + self.URLS["inverterList"], data=body, headers=header + ) if response.status_code == HTTPStatus.OK: return response.json()["data"]["page"]["records"][0].get("id", "") @@ -96,7 +115,9 @@ def inverter_id(self): def inverter_sn(self): body = self.get_body(stationId=self.plant_id) header = self.header(body, self.URLS["inverterList"]) - response = requests.post(self.URLS["root"] + self.URLS["inverterList"], data=body, headers=header) + response = requests.post( + self.URLS["root"] + self.URLS["inverterList"], data=body, headers=header + ) if response.status_code == HTTPStatus.OK: return response.json()["data"]["page"]["records"][0].get("sn", "") @@ -104,7 +125,9 @@ def inverter_sn(self): def inverter_details(self): body = self.get_body(id=self.inverter_id, sn=self.inverter_sn) header = self.header(body, self.URLS["inverterDetail"]) - response = requests.post(self.URLS["root"] + self.URLS["inverterDetail"], data=body, headers=header) + response = requests.post( + self.URLS["root"] + self.URLS["inverterDetail"], data=body, headers=header + ) if response.status_code == HTTPStatus.OK: return response.json()["data"] @@ -125,7 +148,9 @@ def set_code(self, cid, value): body = self.get_body(inverterSn=self.inverter_sn, cid=cid, value=value) headers = self.header(body, self.URLS["control"]) headers["token"] = self.token - response = requests.post(self.URLS["root"] + self.URLS["control"], data=body, headers=headers) + response = requests.post( + self.URLS["root"] + self.URLS["control"], data=body, headers=headers + ) if response.status_code == HTTPStatus.OK: return response.json() @@ -137,14 +162,18 @@ def read_code(self, cid): body = self.get_body(inverterSn=self.inverter_sn, cid=cid) headers = self.header(body, self.URLS["atRead"]) headers["token"] = self.token - response = requests.post(self.URLS["root"] + self.URLS["atRead"], data=body, headers=headers) + response = requests.post( + self.URLS["root"] + self.URLS["atRead"], data=body, headers=headers + ) if response.status_code == HTTPStatus.OK: return response.json()["data"]["msg"] def login(self): body = self.get_body(username=self.username, password=self.md5passowrd) header = self.header(body, self.URLS["login"]) - response = requests.post(self.URLS["root"] + self.URLS["login"], data=body, headers=header) + response = requests.post( + self.URLS["root"] + self.URLS["login"], data=body, headers=header + ) status = response.status_code if status == HTTPStatus.OK: result = response.json() @@ -227,7 +256,7 @@ def set_timer(self, direction, start, end, power): # %% sc.set_mode_switch(35) # %% -r=sc.set_code(696, 3600) +r = sc.set_code(696, 3600) # %% # %% diff --git a/apps/pv_opt/.test.py b/apps/pv_opt/.test.py index 09a7f05..5e8c521 100644 --- a/apps/pv_opt/.test.py +++ b/apps/pv_opt/.test.py @@ -1,16 +1,15 @@ # %% -import pandas as pd +from datetime import datetime, time + +import matplotlib.pyplot as plt import numpy as np +import pandas as pd +import pvpy as pvpy import requests -from datetime import datetime -from datetime import time -import matplotlib.pyplot as plt import yaml from influxdb_client import InfluxDBClient, Point from influxdb_client.client.write_api import SYNCHRONOUS -import pvpy as pvpy - entities = [ "pv_total_power", "battery_input_energy", @@ -39,9 +38,19 @@ result = query_api.query(org=org, query=query) # Process the results - data = [{"Time": record.get_time(), "Value": record.get_value()} for record in result[-1].records] + data = [ + {"Time": record.get_time(), "Value": record.get_value()} + for record in result[-1].records + ] series += [pd.DataFrame(data)] - series[-1] = series[-1].set_index("Time").resample("1min").mean().fillna(0)["Value"].rename(entity) + series[-1] = ( + series[-1] + .set_index("Time") + .resample("1min") + .mean() + .fillna(0)["Value"] + .rename(entity) + ) df = pd.concat(series, axis=1) df = df.resample("5min").mean() # Close the client diff --git a/apps/pv_opt/pv_opt.py b/apps/pv_opt/pv_opt.py index 932f468..4c6463f 100644 --- a/apps/pv_opt/pv_opt.py +++ b/apps/pv_opt/pv_opt.py @@ -1,16 +1,15 @@ # %% -import appdaemon.plugins.hass.hassapi as hass -import appdaemon.adbase as ad -import appdaemon.plugins.mqtt.mqttapi as mqtt +import re +import time from json import dumps +import appdaemon.adbase as ad +import appdaemon.plugins.hass.hassapi as hass +import appdaemon.plugins.mqtt.mqttapi as mqtt +import numpy as np import pandas as pd -import time - import pvpy as pv -import numpy as np from numpy import nan -import re VERSION = "3.18.5" @@ -336,7 +335,9 @@ }, "solar_forecast": { "default": "Solcast", - "attributes": {"options": ["Solcast", "Solcast_p10", "Solcast_p90", "Weighted"]}, + "attributes": { + "options": ["Solcast", "Solcast_p10", "Solcast_p90", "Weighted"] + }, "domain": "select", }, "id_solcast_today": {"default": "sensor.solcast_pv_forecast_forecast_today"}, @@ -497,7 +498,9 @@ def initialize(self): retry_count = 0 while (not self.inverter.is_online()) and (retry_count < ONLINE_RETRIES): - self.log("Inverter controller appears not to be running. Waiting 5 secomds to re-try") + self.log( + "Inverter controller appears not to be running. Waiting 5 secomds to re-try" + ) time.sleep(5) retry_count += 1 @@ -532,7 +535,10 @@ def initialize(self): # self._estimate_capacity() self._load_pv_system_model() self._load_contract() - self.ev = self.get_config("ev_charger") in DEFAULT_CONFIG["ev_charger"]["attributes"]["options"][1:] + self.ev = ( + self.get_config("ev_charger") + in DEFAULT_CONFIG["ev_charger"]["attributes"]["options"][1:] + ) self._check_for_zappi() if self.get_config("alt_tariffs") is not None: @@ -561,7 +567,9 @@ def initialize(self): if self.debug: self.log(f"PV Opt Initialisation complete. Listen_state Handles:") for id in self.handles: - self.log(f" {id} {self.handles[id]} {self.info_listen_state(self.handles[id])}") + self.log( + f" {id} {self.handles[id]} {self.info_listen_state(self.handles[id])}" + ) @ad.app_lock def _run_test(self): @@ -624,7 +632,9 @@ def _get_io(self): self.rlog(f" {k:20s} {self.io_dispatch_attrib[k]}") for k in [x for x in self.io_dispatch_attrib.keys() if "dispatches" in x]: - self.log(f" {k:20s} {'Start':20s} {'End':20s} {'Charge':12s} {'Source':12s}") + self.log( + f" {k:20s} {'Start':20s} {'End':20s} {'Charge':12s} {'Source':12s}" + ) self.log(f" {'-'*20} {'-'*20} {'-'*20} {'-'*12} {'-'*12} ") for z in self.io_dispatch_attrib[k]: self.log( @@ -635,7 +645,13 @@ def _get_io(self): def _check_for_zappi(self): self.ulog("Checking for Zappi Sensors") sensor_entities = self.get_state("sensor") - self.zappi_entities = [k for k in sensor_entities if "zappi" in k for x in ["charge_added_session"] if x in k] + self.zappi_entities = [ + k + for k in sensor_entities + if "zappi" in k + for x in ["charge_added_session"] + if x in k + ] if len(self.zappi_entities) > 0: for entity_id in self.zappi_entities: zappi_sn = entity_id.split("_")[2] @@ -648,7 +664,9 @@ def _check_for_zappi(self): def _get_zappi(self, start, end, log=False): df = pd.DataFrame() for entity_id in self.zappi_entities: - df = self._get_hass_power_from_daily_kwh(entity_id, start=start, end=end, log=log) + df = self._get_hass_power_from_daily_kwh( + entity_id, start=start, end=end, log=log + ) if log: self.rlog(f">>> Zappi entity {entity_id}") self.log(f">>>\n{df.to_string()}") @@ -705,8 +723,12 @@ def _load_inverter(self): if self.inverter_type in INVERTER_TYPES: inverter_brand = self.inverter_type.split("_")[0].lower() InverterController = importName(f"{inverter_brand}", "InverterController") - self.log(f"Inverter type: {self.inverter_type}: inverter module: {inverter_brand}.py") - self.inverter = InverterController(inverter_type=self.inverter_type, host=self) + self.log( + f"Inverter type: {self.inverter_type}: inverter module: {inverter_brand}.py" + ) + self.inverter = InverterController( + inverter_type=self.inverter_type, host=self + ) self.log(f" Device name: {self.device_name}") self.rlog(f" Serial number: {self.inverter_sn}") @@ -728,10 +750,14 @@ def _load_pv_system_model(self): self.battery_model = pv.BatteryModel( capacity=self.get_config("battery_capacity_wh"), max_dod=self.get_config("maximum_dod_percent", 15) / 100, - current_limit_amps=self.get_config("battery_current_limit_amps", default=100), + current_limit_amps=self.get_config( + "battery_current_limit_amps", default=100 + ), voltage=self.get_config("battery_voltage", default=50), ) - self.pv_system = pv.PVsystemModel("PV_Opt", self.inverter_model, self.battery_model, host=self) + self.pv_system = pv.PVsystemModel( + "PV_Opt", self.inverter_model, self.battery_model, host=self + ) # def _setup_agile_schedule(self): # start = (pd.Timestamp.now(tz="UTC") + pd.Timedelta(1, "minutes")).to_pydatetime() @@ -742,7 +768,9 @@ def _load_pv_system_model(self): # ) def _setup_compare_schedule(self): - start = (pd.Timestamp.now(tz="UTC").ceil("60min") - pd.Timedelta("2min")).to_pydatetime() + start = ( + pd.Timestamp.now(tz="UTC").ceil("60min") - pd.Timedelta("2min") + ).to_pydatetime() self.timer_handle = self.run_every( self._compare_tariff_cb, start=start, @@ -774,7 +802,9 @@ def _cost_actual(self, **kwargs): grid = grid.set_axis(cols, axis=1).fillna(0) grid["grid_export"] *= -1 - cost_today = self.contract.net_cost(grid_flow=grid, log=self.debug, day_ahead=False) + cost_today = self.contract.net_cost( + grid_flow=grid, log=self.debug, day_ahead=False + ) return cost_today @@ -787,13 +817,23 @@ def get_config(self, item, default=None): return self._value_from_state(self.config_state[item]) if item in self.config: - if isinstance(self.config[item], str) and self.entity_exists(self.config[item]): + if isinstance(self.config[item], str) and self.entity_exists( + self.config[item] + ): x = self.get_ha_value(entity_id=self.config[item]) return x elif isinstance(self.config[item], list): if min([isinstance(x, str)] for x in self.config[item])[0]: - if min([self.entity_exists(entity_id=entity_id) for entity_id in self.config[item]]): - l = [self.get_ha_value(entity_id=entity_id) for entity_id in self.config[item]] + if min( + [ + self.entity_exists(entity_id=entity_id) + for entity_id in self.config[item] + ] + ): + l = [ + self.get_ha_value(entity_id=entity_id) + for entity_id in self.config[item] + ] try: return sum(l) except: @@ -806,7 +846,11 @@ def get_config(self, item, default=None): return default def _setup_schedule(self): - start_opt = pd.Timestamp.now().ceil(f"{self.get_config('optimise_frequency_minutes')}min").to_pydatetime() + start_opt = ( + pd.Timestamp.now() + .ceil(f"{self.get_config('optimise_frequency_minutes')}min") + .to_pydatetime() + ) self.timer_handle = self.run_every( self.optimise_time, start=start_opt, @@ -838,21 +882,30 @@ def _load_contract(self): octopus_entities = [ name - for name in self.get_state_retry(BOTTLECAP_DAVE["domain"]).keys() - if ("octopus_energy_electricity" in name and BOTTLECAP_DAVE["rates"] in name) + for name in self.get_state_retry( + BOTTLECAP_DAVE["domain"] + ).keys() + if ( + "octopus_energy_electricity" in name + and BOTTLECAP_DAVE["rates"] in name + ) ] entities = {} - entities["import"] = [x for x in octopus_entities if not "export" in x] + entities["import"] = [ + x for x in octopus_entities if not "export" in x + ] entities["export"] = [x for x in octopus_entities if "export" in x] for imp_exp in IMPEXP: for entity in entities[imp_exp]: - tariff_code = self.get_state_retry(entity, attribute="all")["attributes"].get( - BOTTLECAP_DAVE["tariff_code"], None - ) + tariff_code = self.get_state_retry(entity, attribute="all")[ + "attributes" + ].get(BOTTLECAP_DAVE["tariff_code"], None) - self.rlog(f" Found {imp_exp} entity {entity}: Tariff code: {tariff_code}") + self.rlog( + f" Found {imp_exp} entity {entity}: Tariff code: {tariff_code}" + ) tariffs = {x: None for x in IMPEXP} for imp_exp in IMPEXP: @@ -860,9 +913,9 @@ def _load_contract(self): self.log(f">>>{imp_exp}: {entities[imp_exp]}") if len(entities[imp_exp]) > 0: for entity in entities[imp_exp]: - tariff_code = self.get_state_retry(entity, attribute="all")["attributes"].get( - BOTTLECAP_DAVE["tariff_code"], None - ) + tariff_code = self.get_state_retry( + entity, attribute="all" + )["attributes"].get(BOTTLECAP_DAVE["tariff_code"], None) if self.debug: self.log(f">>>_load_contract {tariff_code}") @@ -876,7 +929,9 @@ def _load_contract(self): if "AGILE" in tariff_code: self.agile = True # Tariff is Octopus Agile if "INTELLI" in tariff_code: - self.intelligent = True # Tariff is Octopus Intelligent + self.intelligent = ( + True # Tariff is Octopus Intelligent + ) self.contract = pv.Contract( "current", @@ -898,8 +953,12 @@ def _load_contract(self): self.contract = None if self.contract is None: - if ("octopus_account" in self.config) and ("octopus_api_key" in self.config): - if (self.config["octopus_account"] is not None) and (self.config["octopus_api_key"] is not None): + if ("octopus_account" in self.config) and ( + "octopus_api_key" in self.config + ): + if (self.config["octopus_account"] is not None) and ( + self.config["octopus_api_key"] is not None + ): for x in ["octopus_account", "octopus_api_key"]: if self.config[x] not in self.redact_regex: self.redact_regex.append(x) @@ -938,7 +997,9 @@ def _load_contract(self): "octopus_import_tariff_code" in self.config and self.config["octopus_import_tariff_code"] is not None ): - self.rlog(f"Trying to load tariff codes: Import: {self.config['octopus_import_tariff_code']}") + self.rlog( + f"Trying to load tariff codes: Import: {self.config['octopus_import_tariff_code']}" + ) # try: # First load the import as we always need that tariffs["import"] = pv.Tariff( @@ -951,7 +1012,9 @@ def _load_contract(self): if tariffs["import"] is not None: if "octopus_export_tariff_code" in self.config: - self.rlog(f"Trying to load tariff codes: Export: {self.config['octopus_export_tariff_code']}") + self.rlog( + f"Trying to load tariff codes: Export: {self.config['octopus_export_tariff_code']}" + ) tariffs["export"] = pv.Tariff( self.config[f"octopus_export_tariff_code"], export=False, @@ -966,13 +1029,17 @@ def _load_contract(self): exp=tariffs["export"], host=self, ) - self.rlog("Contract tariffs loaded OK from Tariff Codes / Manual Spec") + self.rlog( + "Contract tariffs loaded OK from Tariff Codes / Manual Spec" + ) # except Exception as e: # self.rlog(f"Unable to load Tariff Codes {e}", level="ERROR") if self.contract is None: i += 1 - self.rlog(f"Failed to load contact - Attempt {i} of {n}. Waiting 2 minutes to re-try") + self.rlog( + f"Failed to load contact - Attempt {i} of {n}. Waiting 2 minutes to re-try" + ) time.sleep(12) if self.contract is None: @@ -987,7 +1054,9 @@ def _load_contract(self): else: self.contract_last_loaded = pd.Timestamp.now(tz="UTC") if self.contract.tariffs["export"] is None: - self.contract.tariffs["export"] = pv.Tariff("None", export=True, unit=0, octopus=False, host=self) + self.contract.tariffs["export"] = pv.Tariff( + "None", export=True, unit=0, octopus=False, host=self + ) self.rlog("") self._load_saving_events() @@ -1057,24 +1126,34 @@ def _check_tariffs(self): def _load_saving_events(self): if ( - len([name for name in self.get_state_retry("event").keys() if ("octoplus_saving_session_events" in name)]) + len( + [ + name + for name in self.get_state_retry("event").keys() + if ("octoplus_saving_session_events" in name) + ] + ) > 0 ): saving_events_entity = [ - name for name in self.get_state_retry("event").keys() if ("octoplus_saving_session_events" in name) + name + for name in self.get_state_retry("event").keys() + if ("octoplus_saving_session_events" in name) ][0] self.log("") self.rlog(f"Found Octopus Savings Events entity: {saving_events_entity}") - octopus_account = self.get_state_retry(entity_id=saving_events_entity, attribute="account_id") + octopus_account = self.get_state_retry( + entity_id=saving_events_entity, attribute="account_id" + ) self.config["octopus_account"] = octopus_account if octopus_account not in self.redact_regex: self.redact_regex.append(octopus_account) self.redact_regex.append(octopus_account.lower().replace("-", "_")) - available_events = self.get_state_retry(saving_events_entity, attribute="all")["attributes"][ - "available_events" - ] + available_events = self.get_state_retry( + saving_events_entity, attribute="all" + )["attributes"]["available_events"] if len(available_events) > 0: self.log("Joining the following new Octoplus Events:") @@ -1090,12 +1169,14 @@ def _load_saving_events(self): event_code=event["code"], ) - joined_events = self.get_state_retry(saving_events_entity, attribute="all")["attributes"]["joined_events"] + joined_events = self.get_state_retry(saving_events_entity, attribute="all")[ + "attributes" + ]["joined_events"] for event in joined_events: - if event["id"] not in self.saving_events and pd.Timestamp(event["end"], tz="UTC") > pd.Timestamp.now( - tz="UTC" - ): + if event["id"] not in self.saving_events and pd.Timestamp( + event["end"], tz="UTC" + ) > pd.Timestamp.now(tz="UTC"): self.saving_events[event["id"]] = event self.log("") @@ -1117,7 +1198,9 @@ def get_ha_value(self, entity_id): # if the state is None return None if state is not None: - if (state in ["unknown", "unavailable"]) and (entity_id[:6] != "button"): + if (state in ["unknown", "unavailable"]) and ( + entity_id[:6] != "button" + ): e = f"HA returned {state} for state of {entity_id}" self.status(f"ERROR: {e}") self.log(e, level="ERROR") @@ -1151,7 +1234,9 @@ def get_default_config(self, item): def same_type(self, a, b): if type(a) != type(b): - (isinstance(a, int) | isinstance(a, float)) & (isinstance(b, int) | isinstance(b, float)) + (isinstance(a, int) | isinstance(a, float)) & ( + isinstance(b, int) | isinstance(b, float) + ) else: return True @@ -1183,7 +1268,12 @@ def _load_args(self, items=None): self.args[item] = [self.args[item]] values = [ - (v.replace("{device_name}", self.device_name) if isinstance(v, str) else v) for v in self.args[item] + ( + v.replace("{device_name}", self.device_name) + if isinstance(v, str) + else v + ) + for v in self.args[item] ] if values[0] is None: @@ -1204,8 +1294,12 @@ def _load_args(self, items=None): str1 = "" str2 = " " - self.rlog(f" {str1:34s} {str2} {x['name']:27s} Import: {x['octopus_import_tariff_code']:>36s}") - self.rlog(f" {'':34s} {'':27s} Export: {x['octopus_export_tariff_code']:>36s}") + self.rlog( + f" {str1:34s} {str2} {x['name']:27s} Import: {x['octopus_import_tariff_code']:>36s}" + ) + self.rlog( + f" {'':34s} {'':27s} Export: {x['octopus_export_tariff_code']:>36s}" + ) self.yaml_config[item] = self.config[item] elif item == "consumption_shape": @@ -1221,7 +1315,9 @@ def _load_args(self, items=None): str2 = " " str3 = " " str4 = " " - self.rlog(f" {str1:34s} {str2} {str3} {x['hour']:5.2f} {str4} {x['consumption']:5.0f} W") + self.rlog( + f" {str1:34s} {str2} {str3} {x['hour']:5.2f} {str4} {x['consumption']:5.0f} W" + ) self.yaml_config[item] = self.config[item] elif re.match("^manual_..port_tariff_unit$", item): @@ -1244,7 +1340,9 @@ def _load_args(self, items=None): elif "id_" in item: if self.debug: - self.log(f">>> Test: {self.entity_exists('update.home_assistant_core_update')}") + self.log( + f">>> Test: {self.entity_exists('update.home_assistant_core_update')}" + ) for v in values: self.log(f">>> {item} {v} {self.entity_exists(v)}") if min([self.entity_exists(v) for v in values]): @@ -1276,12 +1374,18 @@ def _load_args(self, items=None): for value in self.args[item]: self.rlog(f"\t{value}") - arg_types = {t: [isinstance(v, t) for v in values] for t in [str, float, int, bool]} + arg_types = { + t: [isinstance(v, t) for v in values] + for t in [str, float, int, bool] + } if ( len(values) == 1 and isinstance(values[0], str) - and (pd.to_datetime(values[0], errors="coerce", format="%H:%M") != pd.NaT) + and ( + pd.to_datetime(values[0], errors="coerce", format="%H:%M") + != pd.NaT + ) ): self.config[item] = values[0] self.rlog( @@ -1293,7 +1397,10 @@ def _load_args(self, items=None): if self.debug: self.rlog("\tFound a valid list of strings") - if isinstance(self.get_default_config(item), str) and len(values) == 1: + if ( + isinstance(self.get_default_config(item), str) + and len(values) == 1 + ): self.config[item] = values[0] self.rlog( f" {item:34s} = {str(self.config[item]):57s} {str(self.get_config(item)):>6s}: value in YAML" @@ -1303,13 +1410,16 @@ def _load_args(self, items=None): else: ha_values = [self.get_ha_value(entity_id=v) for v in values] val_types = { - t: np.array([isinstance(v, t) for v in ha_values]) for t in [str, float, int, bool] + t: np.array([isinstance(v, t) for v in ha_values]) + for t in [str, float, int, bool] } # if they are all float or int valid_strings = [ j - for j in [h for h in zip(ha_values[:-1], values[:-1]) if h[0]] + for j in [ + h for h in zip(ha_values[:-1], values[:-1]) if h[0] + ] if j[0] in DEFAULT_CONFIG[item]["options"] ] @@ -1329,7 +1439,9 @@ def _load_args(self, items=None): ) elif len(values) > 1: - if self.same_type(values[-1], self.get_default_config(item)): + if self.same_type( + values[-1], self.get_default_config(item) + ): self.config[item] = values[-1] self.rlog( f" {item:34s} = {str(self.config[item]):57s} {str(self.get_config(item)):>6s}: YAML default. Unable to read from HA entities listed in YAML." @@ -1349,7 +1461,10 @@ def _load_args(self, items=None): f" {item:34s} = {str(self.config[item]):57s} {str(self.get_config(item)):>6s}: system default. Unable to read from HA entities listed in YAML. No default in YAML.", level="WARNING", ) - elif item in self.inverter.config or item in self.inverter.brand_config: + elif ( + item in self.inverter.config + or item in self.inverter.brand_config + ): self.config[item] = self.get_default_config(item) self.rlog( @@ -1362,7 +1477,9 @@ def _load_args(self, items=None): f" {item:34s} = {str(self.config[item]):57s} {str(self.get_config(item)):>6s}: YAML default value. No default defined." ) - elif len(values) == 1 and (arg_types[bool][0] or arg_types[int][0] or arg_types[float][0]): + elif len(values) == 1 and ( + arg_types[bool][0] or arg_types[int][0] or arg_types[float][0] + ): if self.debug: self.rlog("\tFound a single default value") @@ -1375,12 +1492,21 @@ def _load_args(self, items=None): elif ( len(values) > 1 and (min(arg_types[str][:-1])) - and (arg_types[bool][-1] or arg_types[int][-1] or arg_types[float][-1]) + and ( + arg_types[bool][-1] + or arg_types[int][-1] + or arg_types[float][-1] + ) ): if self.debug: - self.rlog("\tFound a valid list of strings followed by a single default value") + self.rlog( + "\tFound a valid list of strings followed by a single default value" + ) ha_values = [self.get_ha_value(entity_id=v) for v in values[:-1]] - val_types = {t: np.array([isinstance(v, t) for v in ha_values]) for t in [str, float, int, bool]} + val_types = { + t: np.array([isinstance(v, t) for v in ha_values]) + for t in [str, float, int, bool] + } # if they are all float or int if np.min(val_types[int] | val_types[float]): self.config[item] = sum(ha_values) @@ -1472,7 +1598,9 @@ def _expose_configs(self, over_write=True): state_topic = f"homeassistant/{domain}/{id}/state" if not self.entity_exists(entity_id=entity_id): - self.log(f" - Creating HA Entity {entity_id} for {item} using MQTT Discovery") + self.log( + f" - Creating HA Entity {entity_id} for {item} using MQTT Discovery" + ) conf = ( { "state_topic": state_topic, @@ -1500,13 +1628,18 @@ def _expose_configs(self, over_write=True): elif ( isinstance(self.get_ha_value(entity_id=entity_id), str) - and (self.get_ha_value(entity_id=entity_id) not in attributes.get("options", {})) + and ( + self.get_ha_value(entity_id=entity_id) + not in attributes.get("options", {}) + ) and (domain not in ["text", "button"]) ) or (self.get_ha_value(entity_id=entity_id) is None): state = self._state_from_value(self.get_default_config(item)) - self.log(f" - Found unexpected str for {entity_id} reverting to default of {state}") + self.log( + f" - Found unexpected str for {entity_id} reverting to default of {state}" + ) self.set_state(state=state, entity_id=entity_id) @@ -1519,13 +1652,21 @@ def _expose_configs(self, over_write=True): self.log("Over-writing HA from YAML:") self.log("--------------------------") self.log("") - self.log(f" {'Config Item':40s} {'HA Entity':42s} Old State New State") - self.log(f" {'-----------':40s} {'---------':42s} ---------- ----------") + self.log( + f" {'Config Item':40s} {'HA Entity':42s} Old State New State" + ) + self.log( + f" {'-----------':40s} {'---------':42s} ---------- ----------" + ) over_write_log = True - str_log = f" {item:40s} {entity_id:42s} {state:10s} {new_state:10s}" + str_log = ( + f" {item:40s} {entity_id:42s} {state:10s} {new_state:10s}" + ) over_write_count = 0 - while (state != new_state) and (over_write_count < OVERWRITE_ATTEMPTS): + while (state != new_state) and ( + over_write_count < OVERWRITE_ATTEMPTS + ): self.set_state(state=new_state, entity_id=entity_id) time.sleep(0.1) state = self.get_state_retry(entity_id) @@ -1556,7 +1697,9 @@ def _expose_configs(self, over_write=True): for entity_id in self.change_items: if not "sensor" in entity_id: item = self.change_items[entity_id] - self.log(f" {item:40s} {entity_id:42s} {self.config_state[item]}") + self.log( + f" {item:40s} {entity_id:42s} {self.config_state[item]}" + ) self.handles[entity_id] = self.listen_state( callback=self.optimise_state_change, entity_id=entity_id ) @@ -1568,7 +1711,9 @@ def _expose_configs(self, over_write=True): def status(self, status): entity_id = f"sensor.{self.prefix.lower()}_status" - attributes = {"last_updated": pd.Timestamp.now().strftime(DATE_TIME_FORMAT_LONG)} + attributes = { + "last_updated": pd.Timestamp.now().strftime(DATE_TIME_FORMAT_LONG) + } # self.log(f">>> {status}") # self.log(f">>> {entity_id}") self.set_state(state=status, entity_id=entity_id, attributes=attributes) @@ -1576,7 +1721,9 @@ def status(self, status): @ad.app_lock def optimise_state_change(self, entity_id, attribute, old, new, kwargs): item = self.change_items[entity_id] - self.log(f"State change detected for {entity_id} [config item: {item}] from {old} to {new}:") + self.log( + f"State change detected for {entity_id} [config item: {item}] from {old} to {new}:" + ) self.config_state[item] = new @@ -1645,7 +1792,9 @@ def optimise(self): if self.io: self._get_io() - if self.get_config("forced_discharge") and (self.get_config("supports_forced_discharge", True)): + if self.get_config("forced_discharge") and ( + self.get_config("supports_forced_discharge", True) + ): discharge_enable = "enabled" else: discharge_enable = "disabled" @@ -1656,19 +1805,23 @@ def optimise(self): self.ulog("Checking tariffs:") - self.log(f" Contract last loaded at {self.contract_last_loaded.strftime(DATE_TIME_FORMAT_SHORT)}") + self.log( + f" Contract last loaded at {self.contract_last_loaded.strftime(DATE_TIME_FORMAT_SHORT)}" + ) if self.agile: - if (self.contract.tariffs["import"].end().day == pd.Timestamp.now().day) and ( - pd.Timestamp.now(tz=self.tz).hour >= 16 - ): + if ( + self.contract.tariffs["import"].end().day == pd.Timestamp.now().day + ) and (pd.Timestamp.now(tz=self.tz).hour >= 16): self.log( f"Contract end day: {self.contract.tariffs['import'].end().day} Today:{pd.Timestamp.now().day}" ) self._load_contract() # If intelligent tariff, load at 4.40pm (rather than 4pm to cut down number of reloads) elif self.intelligent: - if (pd.Timestamp.now(tz=self.tz).hour == 16) and (pd.Timestamp.now(tz=self.tz).minute >= 40): + if (pd.Timestamp.now(tz=self.tz).hour == 16) and ( + pd.Timestamp.now(tz=self.tz).minute >= 40 + ): self.log(" About to reload Octopus Intelligent Tariff") self._load_contract() @@ -1725,14 +1878,19 @@ def optimise(self): if self.debug: self.log(f">>> soc_now: {self.soc_now}") self.log(f">>> x: {x}") - self.log(f">>> Original: {x.loc[x.loc[: self.static.index[0]].index[-1] :]}") + self.log( + f">>> Original: {x.loc[x.loc[: self.static.index[0]].index[-1] :]}" + ) try: self.soc_now = float(self.soc_now) except: self.log("") - self.log("Unable to get current SOC from HASS. Using last value from History.", level="WARNING") + self.log( + "Unable to get current SOC from HASS. Using last value from History.", + level="WARNING", + ) self.soc_now = x.iloc[-1] # x = x.astype(float) @@ -1741,7 +1899,9 @@ def optimise(self): x = x.loc[x.loc[: self.static.index[0]].index[-1] :] if self.debug: - self.log(f">>> Fixed : {x.loc[x.loc[: self.static.index[0]].index[-1] :]}") + self.log( + f">>> Fixed : {x.loc[x.loc[: self.static.index[0]].index[-1] :]}" + ) x = pd.concat( [ @@ -1781,9 +1941,7 @@ def optimise(self): self.log("") if self.get_config("use_solar", True): - str_log = ( - f'Optimising for Solcast {self.get_config("solcast_confidence_level")}% confidence level forecast' - ) + str_log = f'Optimising for Solcast {self.get_config("solcast_confidence_level")}% confidence level forecast' else: str_log = "Optimising without Solar" @@ -1836,11 +1994,18 @@ def optimise(self): self.log(f" {'Base cost:':40s} {self.optimised_cost['Base'].sum():6.1f}p") cost_today = self._cost_actual().sum() self.summary_costs = { - "Base": {"cost": ((self.optimised_cost["Base"].sum() + cost_today) / 100).round(2), "Selected": ""} + "Base": { + "cost": ((self.optimised_cost["Base"].sum() + cost_today) / 100).round( + 2 + ), + "Selected": "", + } } for case in cases: str_log = f" {f'Optimised cost ({case}):':40s} {self.optimised_cost[case].sum():6.1f}p" - self.summary_costs[case] = {"cost": ((self.optimised_cost[case].sum() + cost_today) / 100).round(2)} + self.summary_costs[case] = { + "cost": ((self.optimised_cost[case].sum() + cost_today) / 100).round(2) + } if case == self.selected_case: self.summary_costs[case]["Selected"] = " <=== Current Setup" else: @@ -1892,13 +2057,20 @@ def optimise(self): status = self.inverter.status self._log_inverterstatus(status) - time_to_slot_start = (self.charge_start_datetime - pd.Timestamp.now(self.tz)).total_seconds() / 60 - time_to_slot_end = (self.charge_end_datetime - pd.Timestamp.now(self.tz)).total_seconds() / 60 + time_to_slot_start = ( + self.charge_start_datetime - pd.Timestamp.now(self.tz) + ).total_seconds() / 60 + time_to_slot_end = ( + self.charge_end_datetime - pd.Timestamp.now(self.tz) + ).total_seconds() / 60 # if len(self.windows) > 0: if ( (time_to_slot_start > 0) - and (time_to_slot_start < self.get_config("optimise_frequency_minutes")) + and ( + time_to_slot_start + < self.get_config("optimise_frequency_minutes") + ) and (len(self.windows) > 0) ) or (self.get_config("id_battery_soc") < self.get_config("sleep_soc")): # Next slot starts before the next optimiser run. This implies we are not currently in @@ -1909,7 +2081,9 @@ def optimise(self): f"Current SOC of {self.get_config('id_battery_soc'):0.1f}% is less than battery_sleep SOC of {self.get_config('sleep_soc'):0.1f}%" ) elif len(self.windows) > 0: - self.log(f"Next charge/discharge window starts in {time_to_slot_start:0.1f} minutes.") + self.log( + f"Next charge/discharge window starts in {time_to_slot_start:0.1f} minutes." + ) else: self.log("No charge/discharge windows planned.") @@ -1937,7 +2111,10 @@ def optimise(self): elif ( (time_to_slot_start <= 0) - and (time_to_slot_start < self.get_config("optimise_frequency_minutes")) + and ( + time_to_slot_start + < self.get_config("optimise_frequency_minutes") + ) and (len(self.windows) > 0) ): # We are currently in a charge/discharge slot @@ -1945,8 +2122,13 @@ def optimise(self): # If the current slot is a Hold SOC slot and we aren't holding then we need to # enable Hold SOC if self.hold and self.hold[0]["active"]: - if not status["hold_soc"]["active"] or status["hold_soc"]["soc"] != self.hold[0]["soc"]: - self.log(f" Enabling SOC hold at SOC of {self.hold[0]['soc']:0.0f}%") + if ( + not status["hold_soc"]["active"] + or status["hold_soc"]["soc"] != self.hold[0]["soc"] + ): + self.log( + f" Enabling SOC hold at SOC of {self.hold[0]['soc']:0.0f}%" + ) self.inverter.hold_soc( enable=True, soc=self.hold[0]["soc"], @@ -1954,10 +2136,14 @@ def optimise(self): end=self.charge_end_datetime, ) else: - self.log(f" Inverter already holding SOC of {self.hold[0]['soc']:0.0f}%") + self.log( + f" Inverter already holding SOC of {self.hold[0]['soc']:0.0f}%" + ) else: - self.log(f"Current charge/discharge window ends in {time_to_slot_end:0.1f} minutes.") + self.log( + f"Current charge/discharge window ends in {time_to_slot_end:0.1f} minutes." + ) if self.charge_power > 0: if not status["charge"]["active"]: @@ -2044,8 +2230,10 @@ def optimise(self): if len(self.windows) > 0: if ( direction == "charge" - and self.charge_start_datetime > status["discharge"]["start"] - and status["discharge"]["start"] != status["discharge"]["end"] + and self.charge_start_datetime + > status["discharge"]["start"] + and status["discharge"]["start"] + != status["discharge"]["end"] ): str_log += " but inverter has a discharge slot before then. Disabling discharge." self.log(str_log) @@ -2143,7 +2331,9 @@ def _create_windows(self): self.windows = pd.concat([windows, self.windows]).sort_values("start") tolerance = self.get_config("forced_power_group_tolerance") if tolerance > 0: - self.windows["forced"] = ((self.windows["forced"] / tolerance).round(0) * tolerance).astype(int) + self.windows["forced"] = ( + (self.windows["forced"] / tolerance).round(0) * tolerance + ).astype(int) self.windows["soc"] = self.windows["soc"].round(0).astype(int) self.windows["soc_end"] = self.windows["soc_end"].round(0).astype(int) @@ -2152,7 +2342,10 @@ def _create_windows(self): if self.config["supports_hold_soc"]: self.log("Checking for Hold SOC slots") self.windows.loc[ - ((self.windows["soc_end"] - self.windows["soc"]).abs() < HOLD_TOLERANCE) + ( + (self.windows["soc_end"] - self.windows["soc"]).abs() + < HOLD_TOLERANCE + ) & (self.windows["soc"] > self.get_config("maximum_dod_percent")), "hold_soc", ] = "<=" @@ -2170,7 +2363,9 @@ def _create_windows(self): self.charge_current = self.charge_power / voltage else: self.charge_current = None - self.charge_start_datetime = self.windows["start"].iloc[0].tz_convert(self.tz) + self.charge_start_datetime = ( + self.windows["start"].iloc[0].tz_convert(self.tz) + ) self.charge_end_datetime = self.windows["end"].iloc[0].tz_convert(self.tz) self.charge_target_soc = self.windows["soc_end"].iloc[0] self.hold = [ @@ -2202,7 +2397,9 @@ def _log_inverterstatus(self, status): self.log(f" {s:18s}:") for x in status[s]: if isinstance(status[s][x], pd.Timestamp): - self.log(f" {x:16s}: {status[s][x].strftime(DATE_TIME_FORMAT_SHORT)}") + self.log( + f" {x:16s}: {status[s][x].strftime(DATE_TIME_FORMAT_SHORT)}" + ) else: self.log(f" {x:16s}: {status[s][x]}") self.log("") @@ -2226,7 +2423,10 @@ def write_cost( cost_today = self._cost_actual() midnight = pd.Timestamp.now(tz="UTC").normalize() + pd.Timedelta(24, "hours") df = df.fillna(0).round(2) - df["period_start"] = df.index.tz_convert(self.tz).strftime("%Y-%m-%dT%H:%M:%S%z").str[:-2] + ":00" + df["period_start"] = ( + df.index.tz_convert(self.tz).strftime("%Y-%m-%dT%H:%M:%S%z").str[:-2] + + ":00" + ) cols = [ "soc", "forced", @@ -2244,7 +2444,10 @@ def write_cost( cost["cumulative_cost"] = cost["cost"].cumsum() for d in [df, cost]: - d["period_start"] = d.index.tz_convert(self.tz).strftime("%Y-%m-%dT%H:%M:%S%z").str[:-2] + ":00" + d["period_start"] = ( + d.index.tz_convert(self.tz).strftime("%Y-%m-%dT%H:%M:%S%z").str[:-2] + + ":00" + ) state = round((cost["cost"].sum()) / 100, 2) @@ -2255,12 +2458,17 @@ def write_cost( "state_class": "measurement", "unit_of_measurement": "GBP", "cost_today": round( - (cost["cost"].loc[: midnight - pd.Timedelta(30, "minutes")].sum()) / 100, + (cost["cost"].loc[: midnight - pd.Timedelta(30, "minutes")].sum()) + / 100, 2, ), "cost_tomorrow": round((cost["cost"].loc[midnight:].sum()) / 100, 2), } - | {col: df[["period_start", col]].to_dict("records") for col in cols if col in df.columns} + | { + col: df[["period_start", col]].to_dict("records") + for col in cols + if col in df.columns + } | {"cost": cost[["period_start", "cumulative_cost"]].to_dict("records")} | attributes ) @@ -2371,7 +2579,9 @@ def _write_output(self): def load_solcast(self): if not self.get_config("use_solar", True): df = pd.DataFrame( - index=pd.date_range(pd.Timestamp.now(tz="UTC").normalize(), periods=96, freq="30min"), + index=pd.date_range( + pd.Timestamp.now(tz="UTC").normalize(), periods=96, freq="30min" + ), data={"Solcast": 0, "Solcast_p10": 0, "Solcast_p90": 0, "weighted": 0}, ) return df @@ -2379,12 +2589,12 @@ def load_solcast(self): if self.debug: self.log("Getting Solcast data") try: - solar = self.get_state_retry(self.config["id_solcast_today"], attribute="all")["attributes"][ - "detailedForecast" - ] - solar += self.get_state_retry(self.config["id_solcast_tomorrow"], attribute="all")["attributes"][ - "detailedForecast" - ] + solar = self.get_state_retry( + self.config["id_solcast_today"], attribute="all" + )["attributes"]["detailedForecast"] + solar += self.get_state_retry( + self.config["id_solcast_tomorrow"], attribute="all" + )["attributes"]["detailedForecast"] except Exception as e: self.log(f"Failed to get solcast attributes: {e}") @@ -2423,7 +2633,9 @@ def load_solcast(self): self.log("") return - def _get_hass_power_from_daily_kwh(self, entity_id, start=None, end=None, days=None, log=False): + def _get_hass_power_from_daily_kwh( + self, entity_id, start=None, end=None, days=None, log=False + ): if days is None: days = (pd.Timestamp.now(tz="UTC") - start).days + 1 @@ -2438,8 +2650,14 @@ def _get_hass_power_from_daily_kwh(self, entity_id, start=None, end=None, days=N x = df.diff().clip(0).fillna(0).cumsum() + df.iloc[0] x.index = x.index.round("1s") x = x[~x.index.duplicated()] - y = -pd.concat([x.resample("1s").interpolate().resample("30min").asfreq(), x.iloc[-1:]]).diff(-1) - dt = y.index.diff().total_seconds() / pd.Timedelta("60min").total_seconds() / 1000 + y = -pd.concat( + [x.resample("1s").interpolate().resample("30min").asfreq(), x.iloc[-1:]] + ).diff(-1) + dt = ( + y.index.diff().total_seconds() + / pd.Timedelta("60min").total_seconds() + / 1000 + ) df = y[1:-1] / dt[2:] if start is not None: @@ -2459,7 +2677,9 @@ def load_consumption(self, start, end): if self.get_config("use_consumption_history"): time_now = pd.Timestamp.now(tz="UTC") if (start < time_now) and (end < time_now): - self.log(" - Start and end are both in past so actuals will be used with no weighting") + self.log( + " - Start and end are both in past so actuals will be used with no weighting" + ) days = (time_now - start).days + 1 else: days = int(self.get_config("consumption_history_days")) @@ -2474,7 +2694,11 @@ def load_consumption(self, start, end): if not isinstance(entity_ids, list): entity_ids = [entity_ids] - entity_ids = [entity_id for entity_id in entity_ids if self.entity_exists(entity_id)] + entity_ids = [ + entity_id + for entity_id in entity_ids + if self.entity_exists(entity_id) + ] if ( (len(entity_ids) == 0) @@ -2525,8 +2749,12 @@ def load_consumption(self, start, end): self.log(f" - {days} days was expected. {str_days}") - if (len(self.zappi_entities) > 0) and (self.get_config("ev_charger", "None") == "Zappi"): - ev_power = self._get_zappi(start=df.index[0], end=df.index[-1], log=True) + if (len(self.zappi_entities) > 0) and ( + self.get_config("ev_charger", "None") == "Zappi" + ): + ev_power = self._get_zappi( + start=df.index[0], end=df.index[-1], log=True + ) if len(ev_power) > 0: self.log("") self.log(f" Deducting EV consumption of {ev_power.sum()/2000}") @@ -2549,21 +2777,31 @@ def load_consumption(self, start, end): dfx = None if self.get_config("ev_part_of_house_load", False): - self.log("EV charger is seen as house load, so subtracting EV charging from Total consumption") + self.log( + "EV charger is seen as house load, so subtracting EV charging from Total consumption" + ) df_EV_Total = pd.concat( [ev_power, df], axis=1 ) # concatenate total consumption and ev consumption into a single dataframe (as they are different lengths) df_EV_Total.columns = ["EV", "Total"] # Set column names - df_EV_Total = df_EV_Total.fillna(0) # fill any missing values with 0 + df_EV_Total = df_EV_Total.fillna( + 0 + ) # fill any missing values with 0 # self.log("Attempt to concatenate is") # self.log(df_EV_Total) # self.log("Attempt to concatenate is") # self.log(df_EV_Total.to_string()) - df_EV = df_EV_Total["EV"].squeeze() # Extract EV consumption to Series - df_Total = df_EV_Total["Total"].squeeze() # Extract total consumption to Series - df = df_Total - df_EV # Substract EV consumption from Total Consumption + df_EV = df_EV_Total[ + "EV" + ].squeeze() # Extract EV consumption to Series + df_Total = df_EV_Total[ + "Total" + ].squeeze() # Extract total consumption to Series + df = ( + df_Total - df_EV + ) # Substract EV consumption from Total Consumption if self.debug: self.log("Result of subtraction is") self.log(df.to_string()) @@ -2577,7 +2815,9 @@ def load_consumption(self, start, end): dfx = pd.Series(index=df.index, data=df.to_list()) # Group by time and take the mean - df = df.groupby(df.index.time).aggregate(self.get_config("consumption_grouping")) + df = df.groupby(df.index.time).aggregate( + self.get_config("consumption_grouping") + ) df.name = "consumption" if self.debug: @@ -2588,26 +2828,36 @@ def load_consumption(self, start, end): temp = pd.DataFrame(index=index) temp["time"] = temp.index.time - consumption_mean = temp.merge(df, "left", left_on="time", right_index=True)["consumption"] + consumption_mean = temp.merge( + df, "left", left_on="time", right_index=True + )["consumption"] if days >= 7: - consumption_dow = self.get_config("day_of_week_weighting") * dfx.iloc[: len(temp)] + consumption_dow = ( + self.get_config("day_of_week_weighting") * dfx.iloc[: len(temp)] + ) if len(consumption_dow) != len(consumption_mean): self.log(">>> Inconsistent lengths in consumption arrays") self.log(f">>> dow : {len(consumption_dow)}") self.log(f">>> mean: {len(consumption_mean)}") idx = consumption_dow.index.intersection(consumption_mean.index) - self.log(f"Clipping the consumption to the overlap ({len(idx)/24:0.1f} days)", level="WARNING") + self.log( + f"Clipping the consumption to the overlap ({len(idx)/24:0.1f} days)", + level="WARNING", + ) consumption_mean = consumption_mean.loc[idx] consumption_dow = consumption_dow.loc[idx] consumption["consumption"] += pd.Series( consumption_dow.to_numpy() - + consumption_mean.to_numpy() * (1 - self.get_config("day_of_week_weighting")), + + consumption_mean.to_numpy() + * (1 - self.get_config("day_of_week_weighting")), index=consumption_mean.index, ) else: - self.log(f" - Ignoring 'Day of Week Weighting' because only {days} days of history is available") + self.log( + f" - Ignoring 'Day of Week Weighting' because only {days} days of history is available" + ) consumption["consumption"] = consumption_mean if len(entity_ids) > 0: @@ -2618,7 +2868,9 @@ def load_consumption(self, start, end): else: daily_kwh = self.get_config("daily_consumption_kwh") - self.log(f" - Creating consumption based on daily estimate of {daily_kwh} kWh") + self.log( + f" - Creating consumption based on daily estimate of {daily_kwh} kWh" + ) if self.get_config("shape_consumption_profile"): self.log(" and typical usage profile.") @@ -2633,15 +2885,21 @@ def load_consumption(self, start, end): daily.index = pd.to_datetime(daily.index, unit="h").time consumption["time"] = consumption.index.time consumption = pd.DataFrame( - consumption.merge(daily, left_on="time", right_index=True)["consumption_y"] + consumption.merge(daily, left_on="time", right_index=True)[ + "consumption_y" + ] ).set_axis(["consumption"], axis=1) else: self.log(" and flat usage profile.") - consumption["consumption"] = self.get_config("daily_consumption_kwh") * 1000 / 24 + consumption["consumption"] = ( + self.get_config("daily_consumption_kwh") * 1000 / 24 + ) self.log(" - Consumption estimated OK") - self.log(f" - Total consumption: {(consumption['consumption'].sum() / 2000):0.1f} kWh") + self.log( + f" - Total consumption: {(consumption['consumption'].sum() / 2000):0.1f} kWh" + ) if self.debug: self.log("Printing final result of routine load_consumption.....") self.log(consumption.to_string()) @@ -2655,14 +2913,20 @@ def _auto_cal(self): solar = self._get_solar(start, end) consumption = self.load_consumption(start, end) grid = self.load_grid(start, end) - soc = self.hass2df(self.config["id_battery_soc"], days=2, freq="30min").loc[start:end] + soc = self.hass2df(self.config["id_battery_soc"], days=2, freq="30min").loc[ + start:end + ] def load_grid(self, start, end): self.log( f"Getting yesterday's grid flows ({start.strftime(DATE_TIME_FORMAT_SHORT)} - {end.strftime(DATE_TIME_FORMAT_SHORT)}):" ) # entity_id = self.config["id_daily_solar"] - mults = {"id_grid_import_power": 1, "id_grid_import_power": -1, "id_grid_power": 1} + mults = { + "id_grid_import_power": 1, + "id_grid_import_power": -1, + "id_grid_power": 1, + } days = (pd.Timestamp.now(tz="UTC") - start).days + 1 mults = {mults[id] for id in mults if id in self.config} for id in mults: @@ -2670,7 +2934,14 @@ def load_grid(self, start, end): if self.entity_exists(entity_id): x = self.hass2df(entity_id, days=days) if x is not None: - x = (self.riemann_avg(x).loc[start : end - pd.Timedelta("30min")] / 10).round(0) * 10 * mults[id] + x = ( + ( + self.riemann_avg(x).loc[start : end - pd.Timedelta("30min")] + / 10 + ).round(0) + * 10 + * mults[id] + ) if df is None: df = x else: @@ -2692,9 +2963,13 @@ def _compare_tariffs(self): return consumption = self.load_consumption(start, end) - static = pd.concat([solar, consumption], axis=1).set_axis(["solar", "consumption"], axis=1) + static = pd.concat([solar, consumption], axis=1).set_axis( + ["solar", "consumption"], axis=1 + ) - initial_soc_df = self.hass2df(self.config["id_battery_soc"], days=2, freq="30min") + initial_soc_df = self.hass2df( + self.config["id_battery_soc"], days=2, freq="30min" + ) initial_soc = initial_soc_df.loc[start] base = self.pv_system.flows(initial_soc, static, solar="solar") @@ -2717,7 +2992,9 @@ def _compare_tariffs(self): name = tariff_set["name"] for imp_exp in IMPEXP: code[imp_exp] = tariff_set[f"octopus_{imp_exp}_tariff_code"] - tariffs[imp_exp] = pv.Tariff(code[imp_exp], export=(imp_exp == "export"), host=self) + tariffs[imp_exp] = pv.Tariff( + code[imp_exp], export=(imp_exp == "export"), host=self + ) contracts.append( pv.Contract( @@ -2729,7 +3006,10 @@ def _compare_tariffs(self): ) actual = self._cost_actual(start=start, end=end - pd.Timedelta(30, "minutes")) - static["period_start"] = static.index.tz_convert(self.tz).strftime("%Y-%m-%dT%H:%M:%S%z").str[:-2] + ":00" + static["period_start"] = ( + static.index.tz_convert(self.tz).strftime("%Y-%m-%dT%H:%M:%S%z").str[:-2] + + ":00" + ) entity_id = f"sensor.{self.prefix}_opt_cost_actual" self.set_state( state=round(actual.sum() / 100, 2), @@ -2740,12 +3020,19 @@ def _compare_tariffs(self): "unit_of_measurement": "GBP", "friendly_name": f"PV Opt Comparison Actual", } - | {col: static[["period_start", col]].to_dict("records") for col in ["solar", "consumption"]}, + | { + col: static[["period_start", col]].to_dict("records") + for col in ["solar", "consumption"] + }, ) self.ulog("Net Cost comparison:", underline=None) - self.log(f" {'Tariff':20s} {'Base Cost (GBP)':>20s} {'Optimised Cost (GBP)':>20s} ") - self.log(f" {'------':20s} {'---------------':>20s} {'--------------------':>20s} ") + self.log( + f" {'Tariff':20s} {'Base Cost (GBP)':>20s} {'Optimised Cost (GBP)':>20s} " + ) + self.log( + f" {'------':20s} {'---------------':>20s} {'--------------------':>20s} " + ) self.log(f" {'Actual':20s} {'':20s} {(actual.sum()/100):>20.3f}") cols = [ @@ -2771,7 +3058,10 @@ def _compare_tariffs(self): log=False, ) - opt["period_start"] = opt.index.tz_convert(self.tz).strftime("%Y-%m-%dT%H:%M:%S%z").str[:-2] + ":00" + opt["period_start"] = ( + opt.index.tz_convert(self.tz).strftime("%Y-%m-%dT%H:%M:%S%z").str[:-2] + + ":00" + ) attributes = { "state_class": "measurement", @@ -2779,10 +3069,16 @@ def _compare_tariffs(self): "unit_of_measurement": "GBP", "friendly_name": f"PV Opt Comparison {contract.name}", "net_base": round(net_base.sum() / 100, 2), - } | {col: opt[["period_start", col]].to_dict("records") for col in cols if col in opt.columns} + } | { + col: opt[["period_start", col]].to_dict("records") + for col in cols + if col in opt.columns + } net_opt = contract.net_cost(opt, day_ahead=False) - self.log(f" {contract.name:20s} {(net_base.sum()/100):>20.3f} {(net_opt.sum()/100):>20.3f}") + self.log( + f" {contract.name:20s} {(net_base.sum()/100):>20.3f} {(net_opt.sum()/100):>20.3f}" + ) entity_id = f"sensor.{self.prefix}_opt_cost_{contract.name}" self.set_state( state=round(net_opt.sum() / 100, 2), @@ -2806,7 +3102,10 @@ def _get_solar(self, start, end): if self.entity_exists(entity_id): x = self.hass2df(entity_id, days=days) if x is not None: - x = (self.riemann_avg(x).loc[start : end - pd.Timedelta("30min")] / 10).round(0) * 10 + x = ( + self.riemann_avg(x).loc[start : end - pd.Timedelta("30min")] + / 10 + ).round(0) * 10 if df is None: df = x else: @@ -2828,16 +3127,18 @@ def _check_tariffs_vs_bottlecap(self): else: df = pd.DataFrame( - self.get_state_retry(self.bottlecap_entities[direction], attribute=("rates")) + self.get_state_retry( + self.bottlecap_entities[direction], attribute=("rates") + ) ).set_index("start")["value_inc_vat"] df.index = pd.to_datetime(df.index, utc=True) df *= 100 df = pd.concat( [ df, - self.contract.tariffs[direction].to_df(start=df.index[0], end=df.index[-1], day_ahead=False)[ - "unit" - ], + self.contract.tariffs[direction].to_df( + start=df.index[0], end=df.index[-1], day_ahead=False + )["unit"], ], axis=1, ).set_axis(["bottlecap", "pv_opt"], axis=1) @@ -2845,7 +3146,13 @@ def _check_tariffs_vs_bottlecap(self): # Drop any Savings Sessions for id in self.saving_events: - df = df.drop(df[self.saving_events[id]["start"] : self.saving_events[id]["end"]].index[:-1]) + df = df.drop( + df[ + self.saving_events[id]["start"] : self.saving_events[id][ + "end" + ] + ].index[:-1] + ) pvopt_price = df["pv_opt"].mean() bottlecap_price = df["bottlecap"].mean() @@ -2880,7 +3187,10 @@ def _list_entities(self, domains=["select", "number", "sensor"]): states = self.get_state_retry(domain) states = {k: states[k] for k in states if self.device_name in k} for entity_id in states: - x = entity_id + f" ({states[entity_id]['attributes'].get('device_class',None)}):" + x = ( + entity_id + + f" ({states[entity_id]['attributes'].get('device_class',None)}):" + ) x = f" {x:60s}" if domain != "select": @@ -2942,7 +3252,9 @@ def write_and_poll_value(self, entity_id, value, tolerance=0.0, verbose=False): if diff > tolerance: changed = True try: - self.call_service("number/set_value", entity_id=entity_id, value=str(value)) + self.call_service( + "number/set_value", entity_id=entity_id, value=str(value) + ) written = False retries = 0 @@ -2966,7 +3278,9 @@ def set_select(self, item, state): if state is not None: entity_id = self.config[f"id_{item}"] if self.get_state_retry(entity_id=entity_id) != state: - self.call_service("select/select_option", entity_id=entity_id, option=state) + self.call_service( + "select/select_option", entity_id=entity_id, option=state + ) self.rlog(f"Setting {entity_id} to {state}") def get_state_retry(self, *args, **kwargs): @@ -3005,7 +3319,11 @@ def riemann_avg(self, x, freq="30min"): dt = x.index.diff().total_seconds().fillna(0) integral = (dt * x.shift(1)).fillna(0).cumsum().resample(freq).last() - avg = (integral.diff().shift(-1)[:-1] / pd.Timedelta(freq).total_seconds()).fillna(0).round(1) + avg = ( + (integral.diff().shift(-1)[:-1] / pd.Timedelta(freq).total_seconds()) + .fillna(0) + .round(1) + ) # self.log(avg) return avg diff --git a/apps/pv_opt/pvpy.py b/apps/pv_opt/pvpy.py index 193b057..033098d 100644 --- a/apps/pv_opt/pvpy.py +++ b/apps/pv_opt/pvpy.py @@ -1,12 +1,12 @@ # %% -import pandas as pd -import requests from copy import copy -from numpy import isnan - # from scipy.stats import linregress from datetime import datetime +import pandas as pd +import requests +from numpy import isnan + OCTOPUS_PRODUCT_URL = r"https://api.octopus.energy/v1/products/" AGILE_PREDICT_URL = r"https://agilepredict.com/api/" @@ -133,11 +133,15 @@ def get_octopus(self, **kwargs): url = f"{OCTOPUS_PRODUCT_URL}{product}/electricity-tariffs/{code}/day-unit-rates/" self.day = [ - x for x in requests.get(url, params=params).json()["results"] if x["payment_method"] == "DIRECT_DEBIT" + x + for x in requests.get(url, params=params).json()["results"] + if x["payment_method"] == "DIRECT_DEBIT" ] url = f"{OCTOPUS_PRODUCT_URL}{product}/electricity-tariffs/{code}/night-unit-rates/" self.night = [ - x for x in requests.get(url, params=params).json()["results"] if x["payment_method"] == "DIRECT_DEBIT" + x + for x in requests.get(url, params=params).json()["results"] + if x["payment_method"] == "DIRECT_DEBIT" ] self.unit = self.day @@ -171,7 +175,9 @@ def end(self): def to_df(self, start=None, end=None, **kwargs): if self.host.debug: self.log(f">>> {self.name}") - self.log(f">>> Start: {start.strftime(TIME_FORMAT)} End: {end.strftime(TIME_FORMAT)}") + self.log( + f">>> Start: {start.strftime(TIME_FORMAT)} End: {end.strftime(TIME_FORMAT)}" + ) time_now = pd.Timestamp.now(tz="UTC") @@ -188,11 +194,16 @@ def to_df(self, start=None, end=None, **kwargs): if end is None: end = pd.Timestamp.now(tz=start.tzinfo).ceil("30min") - use_day_ahead = kwargs.get("day_ahead", ((start > time_now) or (end > time_now))) + use_day_ahead = kwargs.get( + "day_ahead", ((start > time_now) or (end > time_now)) + ) if self.eco7: df = pd.concat( - [pd.DataFrame(x).set_index("valid_from")["value_inc_vat"] for x in [self.day, self.night]], + [ + pd.DataFrame(x).set_index("valid_from")["value_inc_vat"] + for x in [self.day, self.night] + ], axis=1, ).set_axis(["unit", "Night"], axis=1) df.index = pd.to_datetime(df.index) @@ -215,11 +226,16 @@ def to_df(self, start=None, end=None, **kwargs): pd.concat( [ pd.DataFrame( - index=[midnight + pd.Timedelta(f"{x['period_start']}:00") for x in self.unit], + index=[ + midnight + pd.Timedelta(f"{x['period_start']}:00") + for x in self.unit + ], data=[{"unit": x["price"]} for x in self.unit], ).sort_index() for midnight in pd.date_range( - start.floor("1D") - pd.Timedelta("1D"), end.ceil("1D"), freq="1D" + start.floor("1D") - pd.Timedelta("1D"), + end.ceil("1D"), + freq="1D", ) ] ) @@ -281,10 +297,20 @@ def to_df(self, start=None, end=None, **kwargs): self.agile_predict = self._get_agile_predict() if self.agile_predict is not None: - df = pd.concat([df, self.agile_predict.loc[df.index[-1] + pd.Timedelta("30min") : end]]) + df = pd.concat( + [ + df, + self.agile_predict.loc[ + df.index[-1] + pd.Timedelta("30min") : end + ], + ] + ) # If the index frequency >30 minutes so we need to just extend it: - if (len(df) > 1 and ((df.index[-1] - df.index[-2]).total_seconds() / 60) > 30) or len(df) == 1: + if ( + len(df) > 1 + and ((df.index[-1] - df.index[-2]).total_seconds() / 60) > 30 + ) or len(df) == 1: newindex = pd.date_range(df.index[0], end, freq="30min") df = df.reindex(index=newindex).ffill().loc[start:] else: @@ -296,7 +322,11 @@ def to_df(self, start=None, end=None, **kwargs): df.index[-1] + pd.Timedelta(24, "hours"), freq="30min", ) - dfx = pd.concat([df, pd.DataFrame(index=extended_index)]).shift(48).loc[extended_index[0] :] + dfx = ( + pd.concat([df, pd.DataFrame(index=extended_index)]) + .shift(48) + .loc[extended_index[0] :] + ) df = pd.concat([df, dfx]) df = df[df.columns[0]] df = df.loc[start:end] @@ -304,7 +334,11 @@ def to_df(self, start=None, end=None, **kwargs): if not self.export: if not self.manual: - x = pd.DataFrame(self.fixed).set_index("valid_from")["value_inc_vat"].sort_index() + x = ( + pd.DataFrame(self.fixed) + .set_index("valid_from")["value_inc_vat"] + .sort_index() + ) x.index = pd.to_datetime(x.index) newindex = pd.date_range(x.index[0], df.index[-1], freq="30min") x = x.reindex(newindex).sort_index() @@ -440,7 +474,13 @@ class BatteryModel: voltage: An int describing the voltage of the battery system. """ - def __init__(self, capacity: int, max_dod: float = 0.15, current_limit_amps: int = 100, voltage: int = 50) -> None: + def __init__( + self, + capacity: int, + max_dod: float = 0.15, + current_limit_amps: int = 100, + voltage: int = 50, + ) -> None: self.capacity = capacity self.max_dod = max_dod self.current_limit_amps = current_limit_amps @@ -499,7 +539,9 @@ def __init__( self.tz = "GB" if imp is None and octopus_account is None: - raise ValueError("Either a named import tariff or Octopus Account details much be provided") + raise ValueError( + "Either a named import tariff or Octopus Account details much be provided" + ) self.tariffs = {} @@ -533,7 +575,9 @@ def __init__( self.rlog(f"Retrieved most recent tariff code {tariff_code}") if mpan["is_export"]: - self.tariffs["export"] = Tariff(tariff_code, export=True, host=self.host) + self.tariffs["export"] = Tariff( + tariff_code, export=True, host=self.host + ) else: self.tariffs["import"] = Tariff(tariff_code, host=self.host) @@ -575,18 +619,28 @@ def net_cost(self, grid_flow, **kwargs): imp_df = self.tariffs["import"].to_df(start, end, **kwargs) nc = imp_df["fixed"] if kwargs.get("log"): - self.rlog(f">>> Import{self.tariffs['import'].to_df(start,end).to_string()}") + self.rlog( + f">>> Import{self.tariffs['import'].to_df(start,end).to_string()}" + ) nc += imp_df["unit"] * grid_imp / 2000 if kwargs.get("log"): - self.rlog(f">>> Export{self.tariffs['export'].to_df(start,end).to_string()}") + self.rlog( + f">>> Export{self.tariffs['export'].to_df(start,end).to_string()}" + ) if self.tariffs["export"] is not None: - nc += self.tariffs["export"].to_df(start, end, **kwargs)["unit"] * grid_exp / 2000 + nc += ( + self.tariffs["export"].to_df(start, end, **kwargs)["unit"] + * grid_exp + / 2000 + ) return nc class PVsystemModel: - def __init__(self, name: str, inverter: InverterModel, battery: BatteryModel, host=None) -> None: + def __init__( + self, name: str, inverter: InverterModel, battery: BatteryModel, host=None + ) -> None: self.name = name self.inverter = inverter self.battery = battery @@ -663,8 +717,12 @@ def flows(self, initial_soc, static_flows, slots=[], soc_now=None, **kwargs): df["chg_end"] = chg[1:] df["chg_end"] = df["chg_end"].bfill() df["battery"] = (pd.Series(chg).diff(-1) / freq)[:-1].to_list() - df.loc[df["battery"] > 0, "battery"] = df["battery"] * self.inverter.inverter_efficiency - df.loc[df["battery"] < 0, "battery"] = df["battery"] / self.inverter.charger_efficiency + df.loc[df["battery"] > 0, "battery"] = ( + df["battery"] * self.inverter.inverter_efficiency + ) + df.loc[df["battery"] < 0, "battery"] = ( + df["battery"] / self.inverter.charger_efficiency + ) df["grid"] = -(solar - consumption + df["battery"]).round(0) df["forced"] = forced_charge df["soc"] = (df["chg"] / self.battery.capacity) * 100 @@ -689,9 +747,9 @@ def optimised_force(self, initial_soc, static_flows, contract: Contract, **kwarg prices = pd.concat( [ prices, - contract.tariffs[direction].to_df(start=static_flows.index[0], end=static_flows.index[-1])[ - "unit" - ], + contract.tariffs[direction].to_df( + start=static_flows.index[0], end=static_flows.index[-1] + )["unit"], ], axis=1, ) @@ -701,7 +759,10 @@ def optimised_force(self, initial_soc, static_flows, contract: Contract, **kwarg f"Optimiser prices loaded for period {prices.index[0].strftime(TIME_FORMAT)} - {prices.index[-1].strftime(TIME_FORMAT)}" ) - prices = prices.set_axis([t for t in contract.tariffs.keys() if contract.tariffs[t] is not None], axis=1) + prices = prices.set_axis( + [t for t in contract.tariffs.keys() if contract.tariffs[t] is not None], + axis=1, + ) if not use_export: if log: self.log(f"Ignoring export pricing because Use Export is turned off") @@ -763,7 +824,9 @@ def optimised_force(self, initial_soc, static_flows, contract: Contract, **kwarg if max_slot_energy > 0: round_trip_energy_required = ( - max_slot_energy / self.inverter.charger_efficiency / self.inverter.inverter_efficiency + max_slot_energy + / self.inverter.charger_efficiency + / self.inverter.inverter_efficiency ) # potential windows end at the max_slot @@ -771,7 +834,9 @@ def optimised_force(self, initial_soc, static_flows, contract: Contract, **kwarg x = x[available.loc[:max_slot]] # count back to find the slots where soc_end < 100 - x["countback"] = (x["soc_end"] >= 97).sum() - (x["soc_end"] >= 97).cumsum() + x["countback"] = (x["soc_end"] >= 97).sum() - ( + x["soc_end"] >= 97 + ).cumsum() x = x[x["countback"] == 0] @@ -803,7 +868,11 @@ def optimised_force(self, initial_soc, static_flows, contract: Contract, **kwarg if pd.Timestamp.now() > slot.tz_localize(None): factors.append( ( - (slot.tz_localize(None) + pd.Timedelta(30, "minutes")) - pd.Timestamp.now() + ( + slot.tz_localize(None) + + pd.Timedelta(30, "minutes") + ) + - pd.Timestamp.now() ).total_seconds() / 1800 ) @@ -814,7 +883,9 @@ def optimised_force(self, initial_soc, static_flows, contract: Contract, **kwarg if round(cost_at_min_price, 1) < round(max_import_cost, 1): for slot, factor in zip(window, factors): - slot_power_required = max(round_trip_energy_required * 2000 * factor, 0) + slot_power_required = max( + round_trip_energy_required * 2000 * factor, 0 + ) slot_charger_power_available = max( self.inverter.charger_power - x["forced"].loc[slot] @@ -822,12 +893,23 @@ def optimised_force(self, initial_soc, static_flows, contract: Contract, **kwarg 0, ) slot_available_capacity = max( - ((100 - x["soc_end"].loc[slot]) / 100 * self.battery.capacity) * 2 * factor, 0 + ( + (100 - x["soc_end"].loc[slot]) + / 100 + * self.battery.capacity + ) + * 2 + * factor, + 0, ) min_power = min( - slot_power_required, slot_charger_power_available, slot_available_capacity + slot_power_required, + slot_charger_power_available, + slot_available_capacity, + ) + remaining_slot_capacity = ( + slot_charger_power_available - min_power ) - remaining_slot_capacity = slot_charger_power_available - min_power if remaining_slot_capacity < 10: available[slot] = False @@ -854,7 +936,12 @@ def optimised_force(self, initial_soc, static_flows, contract: Contract, **kwarg [ prices, consumption, - self.flows(initial_soc, static_flows, slots=slots, **kwargs), + self.flows( + initial_soc, + static_flows, + slots=slots, + **kwargs, + ), ], axis=1, ) @@ -867,11 +954,28 @@ def optimised_force(self, initial_soc, static_flows, contract: Contract, **kwarg self.log(str_log) if self.host.debug: xx = pd.concat( - [old_cost, old_soc, contract.net_cost(df), df["soc_end"], df["import"]], + [ + old_cost, + old_soc, + contract.net_cost(df), + df["soc_end"], + df["import"], + ], + axis=1, + ).set_axis( + [ + "Old", + "Old_SOC", + "New", + "New_SOC", + "import", + ], axis=1, - ).set_axis(["Old", "Old_SOC", "New", "New_SOC", "import"], axis=1) + ) xx["Diff"] = xx["New"] - xx["Old"] - self.log(f"\n{xx.loc[window[0] : max_slot].to_string()}") + self.log( + f"\n{xx.loc[window[0] : max_slot].to_string()}" + ) # yy = False else: available[max_slot] = False @@ -933,16 +1037,22 @@ def optimised_force(self, initial_soc, static_flows, contract: Contract, **kwarg slots_pre = copy(slots) if log: - self.log(f"Max export price when there is no forced charge: {max_export_price:0.2f}p/kWh.") + self.log( + f"Max export price when there is no forced charge: {max_export_price:0.2f}p/kWh." + ) i = 0 available = ( - (df["import"] < max_export_price) & (df["forced"] < self.inverter.charger_power) & (df["forced"] >= 0) + (df["import"] < max_export_price) + & (df["forced"] < self.inverter.charger_power) + & (df["forced"] >= 0) ) # self.log((df["import"]2d} Min import price {min_price:5.2f}p/kWh at {start_window.strftime(TIME_FORMAT)} {x.loc[start_window]['forced']:4.0f}W " if (pd.Timestamp.now() > start_window.tz_localize(None)) and ( - pd.Timestamp.now() < start_window.tz_localize(None) + pd.Timedelta(30, "minutes") + pd.Timestamp.now() + < start_window.tz_localize(None) + pd.Timedelta(30, "minutes") ): str_log += "* " factor = ( - (start_window.tz_localize(None) + pd.Timedelta(30, "minutes")) - pd.Timestamp.now() + ( + start_window.tz_localize(None) + + pd.Timedelta(30, "minutes") + ) + - pd.Timestamp.now() ).total_seconds() / 1800 else: str_log += " " @@ -980,7 +1095,13 @@ def optimised_force(self, initial_soc, static_flows, contract: Contract, **kwarg min(self.battery.max_charge_power, self.inverter.charger_power) - x["forced"].loc[start_window] - x[cols["solar"]].loc[start_window], - ((100 - x["soc_end"].loc[start_window]) / 100 * self.battery.capacity) * 2 * factor, + ( + (100 - x["soc_end"].loc[start_window]) + / 100 + * self.battery.capacity + ) + * 2 + * factor, ) slot = ( @@ -993,17 +1114,19 @@ def optimised_force(self, initial_soc, static_flows, contract: Contract, **kwarg df = pd.concat( [ prices, - self.flows(initial_soc, static_flows, slots=slots, **kwargs), + self.flows( + initial_soc, static_flows, slots=slots, **kwargs + ), ], axis=1, ) net_cost = contract.net_cost(df).sum() str_log += f"Net: {net_cost:5.1f} " - if net_cost < net_cost_opt - self.host.get_config("slot_threshold_p"): - str_log += ( - f"New SOC: {df.loc[start_window]['soc']:5.1f}%->{df.loc[start_window]['soc_end']:5.1f}% " - ) + if net_cost < net_cost_opt - self.host.get_config( + "slot_threshold_p" + ): + str_log += f"New SOC: {df.loc[start_window]['soc']:5.1f}%->{df.loc[start_window]['soc_end']:5.1f}% " str_log += f"Max export: {-df['grid'].min():0.0f}W " net_cost_opt = net_cost slots_added += 1 @@ -1015,7 +1138,9 @@ def optimised_force(self, initial_soc, static_flows, contract: Contract, **kwarg df = pd.concat( [ prices, - self.flows(initial_soc, static_flows, slots=slots, **kwargs), + self.flows( + initial_soc, static_flows, slots=slots, **kwargs + ), ], axis=1, ) @@ -1059,7 +1184,9 @@ def optimised_force(self, initial_soc, static_flows, contract: Contract, **kwarg available = (df["export"] > min_import_price) & (df["forced"] == 0) a0 = available.sum() if log: - self.log(f"{available.sum()} slots have an export price greater than the min import price") + self.log( + f"{available.sum()} slots have an export price greater than the min import price" + ) done = available.sum() == 0 while not done: @@ -1074,11 +1201,17 @@ def optimised_force(self, initial_soc, static_flows, contract: Contract, **kwarg str_log = f"{available.sum():>2d} Max export price {max_price:5.2f}p/kWh at {start_window.strftime(TIME_FORMAT)} " if (pd.Timestamp.now() > start_window.tz_localize(None)) and ( - pd.Timestamp.now() < start_window.tz_localize(None) + pd.Timedelta(30, "minutes") + pd.Timestamp.now() + < start_window.tz_localize(None) + + pd.Timedelta(30, "minutes") ): str_log += "* " factor = ( - (start_window.tz_localize(None) + pd.Timedelta(30, "minutes")) - pd.Timestamp.now() + ( + start_window.tz_localize(None) + + pd.Timedelta(30, "minutes") + ) + - pd.Timestamp.now() ).total_seconds() / 1800 else: str_log += " " @@ -1089,9 +1222,19 @@ def optimised_force(self, initial_soc, static_flows, contract: Contract, **kwarg slot = ( start_window, -min( - min(self.battery.max_discharge_power, self.inverter.inverter_power), + min( + self.battery.max_discharge_power, + self.inverter.inverter_power, + ), -x[kwargs.get("solar", "solar")].loc[start_window], - ((x["soc_end"].loc[start_window] - self.battery.max_dod) / 100 * self.battery.capacity) + ( + ( + x["soc_end"].loc[start_window] + - self.battery.max_dod + ) + / 100 + * self.battery.capacity + ) * 2 * factor, ), @@ -1102,14 +1245,18 @@ def optimised_force(self, initial_soc, static_flows, contract: Contract, **kwarg df = pd.concat( [ prices, - self.flows(initial_soc, static_flows, slots=slots, **kwargs), + self.flows( + initial_soc, static_flows, slots=slots, **kwargs + ), ], axis=1, ) net_cost = contract.net_cost(df).sum() str_log += f"Net: {net_cost:5.1f} " - if net_cost < net_cost_opt - self.host.get_config("slot_threshold_p"): + if net_cost < net_cost_opt - self.host.get_config( + "slot_threshold_p" + ): str_log += f"New SOC: {df.loc[start_window]['soc']:5.1f}%->{df.loc[start_window]['soc_end']:5.1f}% " str_log += f"Max export: {-df['grid'].min():0.0f}W " net_cost_opt = net_cost @@ -1122,7 +1269,9 @@ def optimised_force(self, initial_soc, static_flows, contract: Contract, **kwarg df = pd.concat( [ prices, - self.flows(initial_soc, static_flows, slots=slots, **kwargs), + self.flows( + initial_soc, static_flows, slots=slots, **kwargs + ), ], axis=1, ) @@ -1157,7 +1306,11 @@ def optimised_force(self, initial_soc, static_flows, contract: Contract, **kwarg ) df.index = pd.to_datetime(df.index) - if (not self.host.get_config("allow_cyclic")) and (len(slots) > 0) and discharge: + if ( + (not self.host.get_config("allow_cyclic")) + and (len(slots) > 0) + and discharge + ): if log: self.log("") self.log("Removing cyclic charge/discharge") @@ -1188,13 +1341,17 @@ def optimised_force(self, initial_soc, static_flows, contract: Contract, **kwarg df = pd.concat( [ prices, - self.flows(initial_soc, static_flows, slots=revised_slots, **kwargs), + self.flows( + initial_soc, static_flows, slots=revised_slots, **kwargs + ), ], axis=1, ) net_cost_opt_new = contract.net_cost(df).sum() if log: - self.log(f" Net cost revised from {net_cost_opt:0.1f}p to {net_cost_opt_new:0.1f}p") + self.log( + f" Net cost revised from {net_cost_opt:0.1f}p to {net_cost_opt_new:0.1f}p" + ) slots = revised_slots df.index = pd.to_datetime(df.index) return df diff --git a/apps/pv_opt/solax.py b/apps/pv_opt/solax.py index 3eb5cbb..ba9f895 100644 --- a/apps/pv_opt/solax.py +++ b/apps/pv_opt/solax.py @@ -1,6 +1,7 @@ -import pandas as pd import time +import pandas as pd + TIMEFORMAT = "%H:%M" LIMITS = ["start", "end"] DIRECTIONS = ["charge"] @@ -70,10 +71,15 @@ def __init__(self, inverter_type, host) -> None: ): for item in defs: if isinstance(defs[item], str): - conf[item] = defs[item].replace("{device_name}", self.host.device_name) + conf[item] = defs[item].replace( + "{device_name}", self.host.device_name + ) # conf[item] = defs[item].replace("{inverter_sn}", self.host.inverter_sn) elif isinstance(defs[item], list): - conf[item] = [z.replace("{device_name}", self.host.device_name) for z in defs[item]] + conf[item] = [ + z.replace("{device_name}", self.host.device_name) + for z in defs[item] + ] # conf[item] = [z.replace("{inverter_sn}", self.host.inverter_sn) for z in defs[item]] else: conf[item] = defs[item] @@ -100,7 +106,9 @@ def control_charge(self, enable, **kwargs): if enable: self.host.set_select("use_mode", "Force Time Use") time_now = pd.Timestamp.now(tz=self.tz) - start = kwargs.get("start", time_now).floor("15min").strftime(TIMEFORMAT) + start = ( + kwargs.get("start", time_now).floor("15min").strftime(TIMEFORMAT) + ) end = kwargs.get("end", time_now).ceil("30min").strftime(TIMEFORMAT) self.host.set_select("charge_start_time_1", start) self.host.set_select("charge_end_time_1", end) @@ -113,11 +121,17 @@ def control_charge(self, enable, **kwargs): voltage = self.host.get_config("battery_voltage") if voltage == 0: voltage = BATTERY_VOLTAGE_DEFAULT - self.log(f"Read a battery voltage of zero. Assuming default of {BATTERY_VOLTAGE_DEFAULT}") + self.log( + f"Read a battery voltage of zero. Assuming default of {BATTERY_VOLTAGE_DEFAULT}" + ) current = abs(round(power / voltage, 1)) - current = min(current, self.host.get_config("battery_current_limit_amps")) + current = min( + current, self.host.get_config("battery_current_limit_amps") + ) - self.log(f"Power {power:0.0f} = {current:0.1f}A at {self.host.get_config('battery_voltage')}V") + self.log( + f"Power {power:0.0f} = {current:0.1f}A at {self.host.get_config('battery_voltage')}V" + ) changed, written = self.host.write_and_poll_value( entity_id=entity_id, value=current, tolerance=1, verbose=True ) @@ -126,7 +140,9 @@ def control_charge(self, enable, **kwargs): if written: self.log(f"Current {current}A written to inverter") else: - self.log(f"Failed to write current of {current}A to inverter") + self.log( + f"Failed to write current of {current}A to inverter" + ) else: self.log("Inverter already at correct current") @@ -142,7 +158,9 @@ def control_charge(self, enable, **kwargs): if written: self.log(f"Target SOC {target_soc}% written to inverter") else: - self.log(f"Failed to write Target SOC of {target_soc}% to inverter") + self.log( + f"Failed to write Target SOC of {target_soc}% to inverter" + ) else: self.log("Inverter already at correct Target SOC") else: @@ -229,10 +247,17 @@ def _solax_charge_periods(self, **kwargs): if self.type == "SOLAX_X1": return { limit: pd.Timestamp( - self.host.get_state_retry(entity_id=self.host.config[f"id_charge_{limit}_time_1"]), tz=self.tz + self.host.get_state_retry( + entity_id=self.host.config[f"id_charge_{limit}_time_1"] + ), + tz=self.tz, ) for limit in LIMITS - } | {"current": self.host.get_state_retry(entity_id=self.host.config[f"id_max_charge_current"])} + } | { + "current": self.host.get_state_retry( + entity_id=self.host.config[f"id_max_charge_current"] + ) + } else: self._unknown_inverter() diff --git a/apps/pv_opt/solis.py b/apps/pv_opt/solis.py index 38874e0..3a988ce 100644 --- a/apps/pv_opt/solis.py +++ b/apps/pv_opt/solis.py @@ -1,13 +1,14 @@ -import pandas as pd -import time +import base64 import hashlib import hmac -import base64 import json import re -import requests -from http import HTTPStatus +import time from datetime import datetime, timezone +from http import HTTPStatus + +import pandas as pd +import requests URLS = { "root": "https://www.soliscloud.com:13333", @@ -277,7 +278,9 @@ def get_body(self, **params): return body def digest(self, body: str) -> str: - return base64.b64encode(hashlib.md5(body.encode("utf-8")).digest()).decode("utf-8") + return base64.b64encode(hashlib.md5(body.encode("utf-8")).digest()).decode( + "utf-8" + ) def header(self, body: str, canonicalized_resource: str) -> dict[str, str]: content_md5 = self.digest(body) @@ -286,8 +289,22 @@ def header(self, body: str, canonicalized_resource: str) -> dict[str, str]: now = datetime.now(timezone.utc) date = now.strftime("%a, %d %b %Y %H:%M:%S GMT") - encrypt_str = "POST" + "\n" + content_md5 + "\n" + content_type + "\n" + date + "\n" + canonicalized_resource - hmac_obj = hmac.new(self.key_secret.encode("utf-8"), msg=encrypt_str.encode("utf-8"), digestmod=hashlib.sha1) + encrypt_str = ( + "POST" + + "\n" + + content_md5 + + "\n" + + content_type + + "\n" + + date + + "\n" + + canonicalized_resource + ) + hmac_obj = hmac.new( + self.key_secret.encode("utf-8"), + msg=encrypt_str.encode("utf-8"), + digestmod=hashlib.sha1, + ) sign = base64.b64encode(hmac_obj.digest()) authorization = "API " + str(self.key_id) + ":" + sign.decode("utf-8") @@ -303,7 +320,9 @@ def header(self, body: str, canonicalized_resource: str) -> dict[str, str]: def inverter_id(self): body = self.get_body(stationId=self.plant_id) header = self.header(body, self.URLS["inverterList"]) - response = requests.post(self.URLS["root"] + self.URLS["inverterList"], data=body, headers=header) + response = requests.post( + self.URLS["root"] + self.URLS["inverterList"], data=body, headers=header + ) if response.status_code == HTTPStatus.OK: return response.json()["data"]["page"]["records"][0].get("id", "") @@ -311,7 +330,9 @@ def inverter_id(self): def inverter_sn(self): body = self.get_body(stationId=self.plant_id) header = self.header(body, self.URLS["inverterList"]) - response = requests.post(self.URLS["root"] + self.URLS["inverterList"], data=body, headers=header) + response = requests.post( + self.URLS["root"] + self.URLS["inverterList"], data=body, headers=header + ) if response.status_code == HTTPStatus.OK: return response.json()["data"]["page"]["records"][0].get("sn", "") @@ -319,7 +340,9 @@ def inverter_sn(self): def inverter_details(self): body = self.get_body(id=self.inverter_id, sn=self.inverter_sn) header = self.header(body, self.URLS["inverterDetail"]) - response = requests.post(self.URLS["root"] + self.URLS["inverterDetail"], data=body, headers=header) + response = requests.post( + self.URLS["root"] + self.URLS["inverterDetail"], data=body, headers=header + ) if response.status_code == HTTPStatus.OK: return response.json()["data"] @@ -343,7 +366,9 @@ def read_code(self, cid): body = self.get_body(inverterSn=self.inverter_sn, cid=cid) headers = self.header(body, self.URLS["atRead"]) headers["token"] = self.token - response = requests.post(self.URLS["root"] + self.URLS["atRead"], data=body, headers=headers) + response = requests.post( + self.URLS["root"] + self.URLS["atRead"], data=body, headers=headers + ) if response.status_code == HTTPStatus.OK: data = response.json()["data"]["msg"] else: @@ -363,14 +388,18 @@ def set_code(self, cid, value): body = self.get_body(inverterSn=self.inverter_sn, cid=cid, value=value) headers = self.header(body, self.URLS["control"]) headers["token"] = self.token - response = requests.post(self.URLS["root"] + self.URLS["control"], data=body, headers=headers) + response = requests.post( + self.URLS["root"] + self.URLS["control"], data=body, headers=headers + ) if response.status_code == HTTPStatus.OK: return response.json() def login(self): body = self.get_body(username=self.username, password=self.md5password) header = self.header(body, self.URLS["login"]) - response = requests.post(self.URLS["root"] + self.URLS["login"], data=body, headers=header) + response = requests.post( + self.URLS["root"] + self.URLS["login"], data=body, headers=header + ) status = response.status_code if status == HTTPStatus.OK: result = response.json() @@ -449,9 +478,14 @@ def __init__(self, inverter_type, host) -> None: ): for item in defs: if isinstance(defs[item], str): - conf[item] = defs[item].replace("{device_name}", self.host.device_name) + conf[item] = defs[item].replace( + "{device_name}", self.host.device_name + ) elif isinstance(defs[item], list): - conf[item] = [z.replace("{device_name}", self.host.device_name) for z in defs[item]] + conf[item] = [ + z.replace("{device_name}", self.host.device_name) + for z in defs[item] + ] else: conf[item] = defs[item] if self.type == "SOLIS_CLOUD": @@ -471,13 +505,23 @@ def is_online(self): entity_id = INVERTER_DEFS[self.type].get("online", (None, None)) if entity_id is not None: entity_id = entity_id.replace("{device_name}", self.host.device_name) - return self.host.get_state_retry(entity_id) not in ["unknown", "unavailable"] + return self.host.get_state_retry(entity_id) not in [ + "unknown", + "unavailable", + ] else: return True def enable_timed_mode(self): - if self.type in ["SOLIS_SOLAX_MODBUS", "SOLIS_CORE_MODBUS", "SOLIS_SOLARMAN", "SOLIS_CLOUD"]: - self._solis_set_mode_switch(SelfUse=True, Timed=True, GridCharge=True, Backup=False) + if self.type in [ + "SOLIS_SOLAX_MODBUS", + "SOLIS_CORE_MODBUS", + "SOLIS_SOLARMAN", + "SOLIS_CLOUD", + ]: + self._solis_set_mode_switch( + SelfUse=True, Timed=True, GridCharge=True, Backup=False + ) else: self._unknown_inverter() @@ -492,7 +536,12 @@ def control_discharge(self, enable, **kwargs): self._control_charge_discharge("discharge", enable, **kwargs) def hold_soc(self, enable, soc=None, **kwargs): - if self.type in ["SOLIS_SOLAX_MODBUS", "SOLIS_CORE_MODBUS", "SOLIS_SOLARMAN", "SOLIS_CLOUD"]: + if self.type in [ + "SOLIS_SOLAX_MODBUS", + "SOLIS_CORE_MODBUS", + "SOLIS_SOLARMAN", + "SOLIS_CLOUD", + ]: start = kwargs.get("start", pd.Timestamp.now(tz=self.tz).floor("1min")) end = kwargs.get("end", pd.Timestamp.now(tz=self.tz).ceil("30min")) self._solis_control_charge_discharge( @@ -512,10 +561,16 @@ def _unknown_inverter(self): raise Exception(e) def hold_soc_old(self, enable, soc=None): - if self.type == "SOLIS_SOLAX_MODBUS" or self.type == "SOLIS_CORE_MODBUS" or self.type == "SOLIS_SOLARMAN": + if ( + self.type == "SOLIS_SOLAX_MODBUS" + or self.type == "SOLIS_CORE_MODBUS" + or self.type == "SOLIS_SOLARMAN" + ): if enable: - self._solis_set_mode_switch(SelfUse=True, Timed=False, GridCharge=True, Backup=True) + self._solis_set_mode_switch( + SelfUse=True, Timed=False, GridCharge=True, Backup=True + ) else: self.enable_timed_mode() @@ -529,7 +584,9 @@ def hold_soc_old(self, enable, soc=None): self.log(f"Setting Backup SOC to {soc}%") if self.type == "SOLIS_SOLAX_MODBUS": - changed, written = self.host.write_and_poll_value(entity_id=entity_id, value=soc) + changed, written = self.host.write_and_poll_value( + entity_id=entity_id, value=soc + ) elif self.type == "SOLIS_CORE_MODBUS" or self.type == "SOLIS_SOLARMAN": changed, written = self.solis_write_holding_register( address=INVERTER_DEFS(self.type)["registers"]["backup_mode_soc"], @@ -544,7 +601,12 @@ def hold_soc_old(self, enable, soc=None): @property def status(self): status = None - if self.type in ["SOLIS_SOLAX_MODBUS", "SOLIS_CORE_MODBUS", "SOLIS_SOLARMAN", "SOLIS_CLOUD"]: + if self.type in [ + "SOLIS_SOLAX_MODBUS", + "SOLIS_CORE_MODBUS", + "SOLIS_SOLARMAN", + "SOLIS_CLOUD", + ]: status = self._solis_state() return status @@ -552,7 +614,12 @@ def _monitor_target_soc(self, target_soc, mode="charge"): pass def _control_charge_discharge(self, direction, enable, **kwargs): - if self.type in ["SOLIS_SOLAX_MODBUS", "SOLIS_CORE_MODBUS", "SOLIS_SOLARMAN", "SOLIS_CLOUD"]: + if self.type in [ + "SOLIS_SOLAX_MODBUS", + "SOLIS_CORE_MODBUS", + "SOLIS_SOLARMAN", + "SOLIS_CLOUD", + ]: self._solis_control_charge_discharge(direction, enable, **kwargs) def _solis_control_charge_discharge(self, direction, enable, **kwargs): @@ -598,7 +665,9 @@ def _solis_control_charge_discharge(self, direction, enable, **kwargs): for limit in times: if times[limit] is not None: for unit in ["hours", "minutes"]: - entity_id = self.host.config[f"id_timed_{direction}_{limit}_{unit}"] + entity_id = self.host.config[ + f"id_timed_{direction}_{limit}_{unit}" + ] if unit == "hours": value = times[limit].hour else: @@ -608,8 +677,13 @@ def _solis_control_charge_discharge(self, direction, enable, **kwargs): changed, written = self.host.write_and_poll_value( entity_id=entity_id, value=value, verbose=True ) - elif self.type == "SOLIS_CORE_MODBUS" or self.type == "SOLIS_SOLARMAN": - changed, written = self._solis_write_time_register(direction, limit, unit, value) + elif ( + self.type == "SOLIS_CORE_MODBUS" + or self.type == "SOLIS_SOLARMAN" + ): + changed, written = self._solis_write_time_register( + direction, limit, unit, value + ) else: e = "Unknown inverter type" @@ -618,7 +692,9 @@ def _solis_control_charge_discharge(self, direction, enable, **kwargs): if changed: if written: - self.log(f"Wrote {direction} {limit} {unit} of {value} to inverter") + self.log( + f"Wrote {direction} {limit} {unit} of {value} to inverter" + ) value_changed = True else: self.log( @@ -633,9 +709,13 @@ def _solis_control_charge_discharge(self, direction, enable, **kwargs): self.host.call_service("button/press", entity_id=entity_id) time.sleep(0.5) try: - time_pressed = pd.Timestamp(self.host.get_state_retry(entity_id)) + time_pressed = pd.Timestamp( + self.host.get_state_retry(entity_id) + ) - dt = (pd.Timestamp.now(self.host.tz) - time_pressed).total_seconds() + dt = ( + pd.Timestamp.now(self.host.tz) - time_pressed + ).total_seconds() if dt < 10: self.log(f"Successfully pressed button {entity_id}") @@ -644,7 +724,9 @@ def _solis_control_charge_discharge(self, direction, enable, **kwargs): f"Failed to press button {entity_id}. Last pressed at {time_pressed.strftime(TIMEFORMAT)} ({dt:0.2f} seconds ago)" ) except: - self.log(f"Failed to press button {entity_id}: it appears to never have been pressed.") + self.log( + f"Failed to press button {entity_id}: it appears to never have been pressed." + ) else: self.log("Inverter already at correct time settings") @@ -653,12 +735,20 @@ def _solis_control_charge_discharge(self, direction, enable, **kwargs): entity_id = self.host.config[f"id_timed_{direction}_current"] current = abs(round(power / self.host.get_config("battery_voltage"), 1)) - current = min(current, self.host.get_config("battery_current_limit_amps")) - self.log(f"Power {power:0.0f} = {current:0.1f}A at {self.host.get_config('battery_voltage')}V") + current = min( + current, self.host.get_config("battery_current_limit_amps") + ) + self.log( + f"Power {power:0.0f} = {current:0.1f}A at {self.host.get_config('battery_voltage')}V" + ) if self.type == "SOLIS_SOLAX_MODBUS": - changed, written = self.host.write_and_poll_value(entity_id=entity_id, value=current, tolerance=1) + changed, written = self.host.write_and_poll_value( + entity_id=entity_id, value=current, tolerance=1 + ) elif self.type == "SOLIS_CORE_MODBUS" or self.type == "SOLIS_SOLARMAN": - changed, written = self._solis_write_current_register(direction, current, tolerance=1) + changed, written = self._solis_write_current_register( + direction, current, tolerance=1 + ) else: e = "Unknown inverter type" self.log(e, level="ERROR") @@ -675,8 +765,12 @@ def _solis_control_charge_discharge(self, direction, enable, **kwargs): elif self.type == "SOLIS_CLOUD": current = abs(round(power / self.host.get_config("battery_voltage"), 0)) current = min(current, self.host.get_config("battery_current_limit_amps")) - self.log(f"Power {power:0.0f} = {current:0.0f}A at {self.host.get_config('battery_voltage')}V") - response = self.cloud.set_timer(direction, times["start"], times["end"], current) + self.log( + f"Power {power:0.0f} = {current:0.0f}A at {self.host.get_config('battery_voltage')}V" + ) + response = self.cloud.set_timer( + direction, times["start"], times["end"], current + ) if response["code"] == -1: self.log("Inverter already at correct time and current settings") elif response["code"] == 0: @@ -717,7 +811,10 @@ def _solis_set_mode_switch(self, **kwargs): if self.type == "SOLIS_SOLAX_MODBUS": entity_modes = self.host.get_state_retry(entity_id, attribute="options") - modes = {INVERTER_DEFS[self.type]["codes"].get(mode): mode for mode in entity_modes} + modes = { + INVERTER_DEFS[self.type]["codes"].get(mode): mode + for mode in entity_modes + } # mode = INVERTER_DEFS[self.type]["modes"].get(code) mode = modes.get(code) if self.host.debug: @@ -730,13 +827,17 @@ def _solis_set_mode_switch(self, **kwargs): elif self.type == "SOLIS_CORE_MODBUS" or self.type == "SOLIS_SOLARMAN": address = INVERTER_DEFS[self.type]["registers"]["storage_control_switch"] - self._solis_write_holding_register(address=address, value=code, entity_id=entity_id) + self._solis_write_holding_register( + address=address, value=code, entity_id=entity_id + ) elif self.type == "SOLIS_CLOUD": self.cloud.set_mode_switch(code) def _solis_solax_solarman_mode_switch(self): - inverter_mode = self.host.get_state_retry(entity_id=self.host.config["id_inverter_mode"]) + inverter_mode = self.host.get_state_retry( + entity_id=self.host.config["id_inverter_mode"] + ) if self.type == "SOLIS_SOLAX_MODBUS": code = INVERTER_DEFS[self.type]["codes"][inverter_mode] else: @@ -752,7 +853,9 @@ def _solis_solax_solarman_mode_switch(self): def _solis_core_mode_switch(self): bits = INVERTER_DEFS["SOLIS_CORE_MODBUS"]["bits"] - code = int(self.host.get_state_retry(entity_id=self.host.config["id_inverter_mode"])) + code = int( + self.host.get_state_retry(entity_id=self.host.config["id_inverter_mode"]) + ) switches = {bit: (code & 2**i == 2**i) for i, bit in enumerate(bits)} return {"code": code, "switches": switches} @@ -776,14 +879,21 @@ def _solis_state(self): for limit in limits: states = {} for unit in ["hours", "minutes"]: - entity_id = self.host.config[f"id_timed_{direction}_{limit}_{unit}"] - states[unit] = int(float(self.host.get_state_retry(entity_id=entity_id))) + entity_id = self.host.config[ + f"id_timed_{direction}_{limit}_{unit}" + ] + states[unit] = int( + float(self.host.get_state_retry(entity_id=entity_id)) + ) status[direction][limit] = pd.Timestamp( - f"{states['hours']:02d}:{states['minutes']:02d}", tz=self.host.tz + f"{states['hours']:02d}:{states['minutes']:02d}", + tz=self.host.tz, ) status[direction]["current"] = float( - self.host.get_state_retry(self.host.config[f"id_timed_{direction}_current"]) + self.host.get_state_retry( + self.host.config[f"id_timed_{direction}_current"] + ) ) elif self.type == "SOLIS_CLOUD": @@ -849,7 +959,9 @@ def _solis_write_holding_register( if changed: data = {"register": address, "value": value} # self.host.call_service("solarman/write_holding_register", **data) - self.log(">>> Writing {value} to inverter register {address} using Solarman") + self.log( + ">>> Writing {value} to inverter register {address} using Solarman" + ) written = True return changed, written @@ -866,7 +978,11 @@ def _solis_write_current_register(self, direction, current, tolerance): ) def _solis_write_time_register(self, direction, limit, unit, value): - address = INVERTER_DEFS[self.type]["registers"][f"timed_{direction}_{limit}_{unit}"] + address = INVERTER_DEFS[self.type]["registers"][ + f"timed_{direction}_{limit}_{unit}" + ] entity_id = self.host.config[f"id_timed_{direction}_{limit}_{unit}"] - return self._solis_write_holding_register(address=address, value=value, entity_id=entity_id) + return self._solis_write_holding_register( + address=address, value=value, entity_id=entity_id + ) diff --git a/apps/pv_opt/sunsynk.py b/apps/pv_opt/sunsynk.py index 1955c23..f66f8fe 100644 --- a/apps/pv_opt/sunsynk.py +++ b/apps/pv_opt/sunsynk.py @@ -1,6 +1,7 @@ -import pandas as pd -import time import json +import time + +import pandas as pd TIMEFORMAT = "%H:%M" INVERTER_DEFS = { @@ -55,13 +56,17 @@ "id_priority_load": "sensor.{device_name}_{inverter_sn}_priority_load", "id_timed_charge_start": "sensor.{device_name}_{inverter_sn}_prog1_time", "id_timed_charge_end": "sensor.{device_name}_{inverter_sn}_prog2_time", - "id_timed_charge_unused": ["sensor.{device_name}_{inverter_sn}_" + f"prog{i}_time" for i in range(2, 7)], + "id_timed_charge_unused": [ + "sensor.{device_name}_{inverter_sn}_" + f"prog{i}_time" + for i in range(2, 7) + ], "id_timed_charge_enable": "sensor.{device_name}_{inverter_sn}_prog1_charge", "id_timed_charge_capacity": "sensor.{device_name}_{inverter_sn}_prog1_capacity", "id_timed_discharge_start": "sensor.{device_name}_{inverter_sn}_prog3_time", "id_timed_discharge_end": "sensor.{device_name}_{inverter_sn}_prog4_time", "id_timed_discharge_unused": [ - "sensor.{device_name}_{inverter_sn}_" + f"prog{i}_time" for i in [1, 2, 5, 6] + "sensor.{device_name}_{inverter_sn}_" + f"prog{i}_time" + for i in [1, 2, 5, 6] ], "id_timed_dicharge_enable": "sensor.{device_name}_{inverter_sn}_prog3_charge", "id_timed_discharge_capacity": "sensor.{device_name}_{inverter_sn}_prog3_capacity", @@ -106,11 +111,21 @@ def __init__(self, inverter_type, host) -> None: ): for item in defs: if isinstance(defs[item], str): - conf[item] = defs[item].replace("{device_name}", self.host.device_name) - conf[item] = defs[item].replace("{inverter_sn}", self.host.inverter_sn) + conf[item] = defs[item].replace( + "{device_name}", self.host.device_name + ) + conf[item] = defs[item].replace( + "{inverter_sn}", self.host.inverter_sn + ) elif isinstance(defs[item], list): - conf[item] = [z.replace("{device_name}", self.host.device_name) for z in defs[item]] - conf[item] = [z.replace("{inverter_sn}", self.host.inverter_sn) for z in defs[item]] + conf[item] = [ + z.replace("{device_name}", self.host.device_name) + for z in defs[item] + ] + conf[item] = [ + z.replace("{inverter_sn}", self.host.inverter_sn) + for z in defs[item] + ] else: conf[item] = defs[item] @@ -154,13 +169,18 @@ def control_charge(self, enable, **kwargs): self.enable_timed_mode() params = { self.config["json_work_mode"]: 2, - self.config["json_timed_charge_target_soc"]: kwargs.get("target_soc", 100), - self.config["json_timed_charge_start"]: kwargs.get("start", time_now.strftime(TIMEFORMAT)), + self.config["json_timed_charge_target_soc"]: kwargs.get( + "target_soc", 100 + ), + self.config["json_timed_charge_start"]: kwargs.get( + "start", time_now.strftime(TIMEFORMAT) + ), self.config["json_timed_charge_end"]: kwargs.get( "end", time_now.ceil("30min").strftime(TIMEFORMAT) ), self.config["json_charge_current"]: min( - kwargs.get("power", 0) / self.host.get_config("battery_voltage"), + kwargs.get("power", 0) + / self.host.get_config("battery_voltage"), self.host.get_config("battery_current_limit_amps"), ), self.config["json_timed_charge_enable"]: True, @@ -175,7 +195,9 @@ def control_charge(self, enable, **kwargs): self.config["json_target_soc"]: 100, self.config["json_timed_charge_start"]: "00:00", self.config["json_timed_charge_end"]: "00:00", - self.config["json_charge_current"]: self.host.get_config("battery_current_limit_amps"), + self.config["json_charge_current"]: self.host.get_config( + "battery_current_limit_amps" + ), self.config["json_timed_charge_enable"]: False, self.config["json_gen_charge_enable"]: True, } | {x: "00:00" for x in self.config["json_timed_charge_unused"]} @@ -193,7 +215,9 @@ def control_discharge(self, enable, **kwargs): self.config["json_timed_discharge_target_soc"]: kwargs.get( "target_soc", self.host.get_config("maximum_dod_percent") ), - self.config["json_timed_discharge_start"]: kwargs.get("start", time_now.strftime(TIMEFORMAT)), + self.config["json_timed_discharge_start"]: kwargs.get( + "start", time_now.strftime(TIMEFORMAT) + ), self.config["json_timed_discharge_end"]: kwargs.get( "end", time_now.ceil("30min").strftime(TIMEFORMAT) ), @@ -230,10 +254,18 @@ def status(self): time_now = pd.Timestamp.now(tz=self.tz) if self.type == "SUNSYNK_SOLARSYNK2": - charge_start = pd.Timestamp(self.host.get_config("id_timed_charge_start"), tz=self.tz) - charge_end = pd.Timestamp(self.host.get_config("id_timed_charge_end"), tz=self.tz) - discharge_start = pd.Timestamp(self.host.get_config("id_timed_charge_start"), tz=self.tz) - discharge_end = pd.Timestamp(self.host.get_config("id_timed_charge_end"), tz=self.tz) + charge_start = pd.Timestamp( + self.host.get_config("id_timed_charge_start"), tz=self.tz + ) + charge_end = pd.Timestamp( + self.host.get_config("id_timed_charge_end"), tz=self.tz + ) + discharge_start = pd.Timestamp( + self.host.get_config("id_timed_charge_start"), tz=self.tz + ) + discharge_end = pd.Timestamp( + self.host.get_config("id_timed_charge_end"), tz=self.tz + ) status = { "timer mode": self.host.get_config("id_use_timer"),