Skip to content

Commit

Permalink
Standardise input sensor/meter handelling to permitt unlimited meters
Browse files Browse the repository at this point in the history
  • Loading branch information
Hamish Findlay committed Sep 9, 2023
1 parent 9bd56fc commit 875a8d0
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 134 deletions.
246 changes: 113 additions & 133 deletions custom_components/battery_sim/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
FORCE_DISCHARGE,
GRID_EXPORT_SIM,
GRID_IMPORT_SIM,
GRID_SECOND_EXPORT_SIM,
GRID_SECOND_IMPORT_SIM,
MESSAGE_TYPE_BATTERY_RESET_IMP,
MESSAGE_TYPE_BATTERY_RESET_EXP,
MESSAGE_TYPE_BATTERY_UPDATE,
Expand All @@ -68,6 +70,14 @@
PAUSE_BATTERY,
TARIFF_SENSOR_ENTITIES,
TARIFF_TYPE,
SENSOR_ID,
SENSOR_TYPE,
TARIFF,
CONF_SECOND_ENERGY_IMPORT_TARIFF,
CONF_SECOND_ENERGY_EXPORT_TARIFF,
IMPORT,
EXPORT,
SIMULATED_SENSOR
)

BATTERY_CONFIG_SCHEMA = vol.Schema(
Expand Down Expand Up @@ -164,12 +174,6 @@ def __init__(self, config, hass):

self._sensor_collection: list = []

self._export_sensor_id: str = config[CONF_EXPORT_SENSOR]
self._export_tariff_sensor_id: str = None

self._import_sensor_id: str = config[CONF_IMPORT_SENSOR]
self._import_tariff_sensor_id: str = None

self._charging: bool = False

self._last_import_reading_time = time.time()
Expand All @@ -182,32 +186,56 @@ def __init__(self, config, hass):
self._last_export_reading: float = 0.0
self._last_import_cumulative_reading: float = 1.0

self._second_import_sensor_id: str = (
config.get(CONF_SECOND_IMPORT_SENSOR)
if len(config.get(CONF_SECOND_IMPORT_SENSOR, "")) > 6
else None
)

self._second_export_sensor_id: str = (
config.get(CONF_SECOND_EXPORT_SENSOR)
if len(config.get(CONF_SECOND_EXPORT_SENSOR, "")) > 6
else None
)

"""Default sensor entities for backwards compatibility"""
self._tariff_type: str = TARIFF_SENSOR_ENTITIES

if TARIFF_TYPE in config:
self._tariff_type = config[TARIFF_TYPE]

self._inputs = [
{
SENSOR_ID: config[CONF_IMPORT_SENSOR],
SENSOR_TYPE: IMPORT,
SIMULATED_SENSOR: GRID_IMPORT_SIM,
TARIFF_TYPE: self._tariff_type,
TARIFF: None
},
{
SENSOR_ID: config[CONF_EXPORT_SENSOR],
SENSOR_TYPE: EXPORT,
SIMULATED_SENSOR: GRID_EXPORT_SIM,
TARIFF_TYPE: self._tariff_type,
TARIFF: None
},
]
if len(config.get(CONF_SECOND_IMPORT_SENSOR, "")) > 6:
self._inputs[2] = {
SENSOR_ID: config[CONF_SECOND_IMPORT_SENSOR],
SENSOR_TYPE: IMPORT,
SIMULATED_SENSOR: GRID_SECOND_IMPORT_SIM,
TARIFF_TYPE: self._tariff_type,
TARIFF: None
}
if len(config.get(CONF_SECOND_EXPORT_SENSOR, "")) > 6:
self._inputs[3] = {
SENSOR_ID: config[CONF_SECOND_EXPORT_SENSOR],
SENSOR_TYPE: EXPORT,
SIMULATED_SENSOR: GRID_SECOND_EXPORT_SIM,
TARIFF_TYPE: self._tariff_type,
TARIFF: None
}

"""Default sensor entities for backwards compatibility"""
if CONF_ENERGY_IMPORT_TARIFF in config:
self._import_tariff_sensor_id = config[CONF_ENERGY_IMPORT_TARIFF]
self._inputs[0][TARIFF] = config[CONF_ENERGY_IMPORT_TARIFF]
elif CONF_ENERGY_TARIFF in config:
"""For backwards compatibility"""
self._import_tariff_sensor_id = config[CONF_ENERGY_TARIFF]
self._inputs[0][TARIFF] = config[CONF_ENERGY_TARIFF]

if CONF_ENERGY_EXPORT_TARIFF in config:
self._export_tariff_sensor_id = config[CONF_ENERGY_EXPORT_TARIFF]
self._inputs[1][TARIFF] = config[CONF_ENERGY_EXPORT_TARIFF]
if CONF_SECOND_ENERGY_IMPORT_TARIFF in config:
self._inputs[2][TARIFF] = config[CONF_SECOND_ENERGY_IMPORT_TARIFF]
if CONF_SECOND_ENERGY_EXPORT_TARIFF in config:
self._inputs[3][TARIFF] = config[CONF_SECOND_ENERGY_EXPORT_TARIFF]

self._switches: dict = {
OVERIDE_CHARGING: False,
Expand All @@ -224,6 +252,8 @@ def __init__(self, config, hass):
DISCHARGING_RATE: 0.0,
GRID_EXPORT_SIM: 0.0,
GRID_IMPORT_SIM: 0.0,
GRID_SECOND_EXPORT_SIM: 0.0,
GRID_SECOND_IMPORT_SIM: 0.0,
ATTR_MONEY_SAVED: 0.0,
BATTERY_MODE: MODE_IDLE,
ATTR_MONEY_SAVED_IMPORT: 0.0,
Expand Down Expand Up @@ -254,16 +284,11 @@ def __init__(self, config, hass):
def async_reset_battery(self):
"""Reset the battery to start over."""
_LOGGER.debug("Reset battery")
self.reset_sim_sensor(
GRID_IMPORT_SIM,
self._import_sensor_id,
self._second_import_sensor_id
)
self.reset_sim_sensor(
GRID_EXPORT_SIM,
self._export_sensor_id,
self._second_export_sensor_id
)
for input in self._inputs:
self.reset_sim_sensor(
input[SIMULATED_SENSOR],
input[SENSOR_ID]
)

self._charge_state = 0.0

Expand All @@ -288,27 +313,19 @@ def async_reset_battery(self):
def reset_sim_sensor(
self,
target_sensor_key,
primary_sensor_id,
secondary_sensor_id
real_sensor_id,
):
"""Reset the Primary and Secondary Sensor."""
"""Reset the Simulated Sensor."""
_LOGGER.debug(f"Reset {target_sensor_key} sim sensor")

sensor_ids = [primary_sensor_id]

if secondary_sensor_id is not None:
sensor_ids.append(secondary_sensor_id)
self._sensors[target_sensor_key] = 0.0

total_sim = 0.0
if self._hass.states.get(real_sensor_id).state not in [
STATE_UNAVAILABLE,
STATE_UNKNOWN,
]:
self._sensors[target_sensor_key] = float(self._hass.states.get(real_sensor_id).state)

for sid in sensor_ids:
if self._hass.states.get(sid).state not in [
STATE_UNAVAILABLE,
STATE_UNKNOWN,
]:
total_sim += float(self._hass.states.get(sid).state)

self._sensors[target_sensor_key] = total_sim
dispatcher_send(
self._hass,
f"{self._name}-{MESSAGE_TYPE_BATTERY_UPDATE}"
Expand All @@ -318,52 +335,19 @@ def reset_sim_sensor(
def async_source_tracking(self, event):
"""Wait for source to be ready, then start."""

def start_sensor_tracking(
sensor_id: str,
collection: str,
reading_function,
is_import: bool
):
reading_function=self.async_reading_handler
collection="_sensor_collection",
for input_details in self._inputs:
"""Start tracking state changes for a sensor."""
getattr(self, collection).append(
async_track_state_change_event(
self._hass,
[sensor_id],
lambda event: reading_function(event, is_import),
[input_details[SENSOR_ID]],
lambda event: reading_function(event, input_details),
)
)

_LOGGER.debug("(%s) monitoring %s", self._name, sensor_id)

start_sensor_tracking(
sensor_id=self._import_sensor_id,
collection="_sensor_collection",
reading_function=self.async_reading_handler,
is_import=True,
)

start_sensor_tracking(
sensor_id=self._export_sensor_id,
collection="_sensor_collection",
reading_function=self.async_reading_handler,
is_import=False,
)

if self._second_import_sensor_id is not None:
start_sensor_tracking(
sensor_id=self._second_import_sensor_id,
collection="_sensor_collection",
reading_function=self.async_reading_handler,
is_import=True,
)

if self._second_import_sensor_id is not None:
start_sensor_tracking(
sensor_id=self._second_export_sensor_id,
collection="_sensor_collection",
reading_function=self.async_reading_handler,
is_import=False,
)
_LOGGER.debug("(%s) monitoring %s", self._name, input_details[SENSOR_ID])

_LOGGER.debug(
"(%s) monitoring %s", self._name, "Done adding the Sensor tracking"
Expand All @@ -374,14 +358,10 @@ def start_sensor_tracking(
def async_reading_handler(
self,
event,
is_import
input_details
):
"""Handle the sensor state changes for import or export."""
sensor_charge_rate = DISCHARGING_RATE if is_import else CHARGING_RATE
sensor_type = "import" if is_import else "export"

cumulative_reading_attr = f"_last_{sensor_type}_cumulative_reading"
last_reading_time_attr = f"_last_{sensor_type}_reading_time"
sensor_charge_rate = DISCHARGING_RATE if input_details[SENSOR_TYPE] == IMPORT else CHARGING_RATE

last_reading_time = time.time()

Expand Down Expand Up @@ -421,64 +401,64 @@ def async_reading_handler(
reading_variance = new_state_value - old_state_value

_LOGGER.debug(
f"({self._name}) {sensor_id} {is_import}: {old_state_value} {unit_of_energy} => {new_state_value} {unit_of_energy} = Δ {reading_variance} {unit_of_energy}"
f"({self._name}) {sensor_id} {input_details[SENSOR_TYPE]}: {old_state_value} {unit_of_energy} => {new_state_value} {unit_of_energy} = Δ {reading_variance} {unit_of_energy}"
)

if reading_variance < 0:
_LOGGER.warning(
"(%s) %s sensor value decreased - meter may have been reset",
self._name,
sensor_type,
input_details[SENSOR_TYPE],
)
self._sensors[sensor_charge_rate] = 0
return

if reading_variance >= 0:
setattr(self, cumulative_reading_attr, new_state_value)

if is_import:
self.update_battery(
reading_variance,
self._last_export_reading
)
if input_details[SENSOR_TYPE] is IMPORT:
self._last_import_reading_sensor_data = input_details
self.update_battery(
reading_variance,
self._last_export_reading
)
self._last_export_reading = 0.0
self._last_import_reading_time = last_reading_time

if input_details[SENSOR_TYPE] is EXPORT:
self._last_export_reading_sensor_data = input_details
if self._last_import_reading_time > self._last_export_reading_time:
if self._last_export_reading > 0:
_LOGGER.warning(
"(%s) Accumulated export reading not cleared error",
self._last_export_reading,
)

self._last_export_reading = reading_variance
else:
reading_variance += self._last_export_reading
self._last_export_reading = 0.0
self.update_battery(0.0, reading_variance)

if not is_import:
if self._last_import_reading_time > self._last_export_reading_time:
if self._last_export_reading > 0:
_LOGGER.warning(
"(%s) Accumulated export reading not cleared error",
self._last_export_reading,
)

self._last_export_reading = reading_variance

else:
reading_variance += self._last_export_reading
self._last_export_reading = 0.0
self.update_battery(0.0, reading_variance)
self._last_export_reading_time = last_reading_time

# Finish the Sensor Reading
setattr(self, last_reading_time_attr, last_reading_time)
return

def get_tariff_information(self, entity_id):
def get_tariff_information(self, input_details):
"""Get Tarrif information to be used for calculating."""
if self._tariff_type == NO_TARIFF_INFO:
if input_details[TARIFF_TYPE] == NO_TARIFF_INFO:
return None
elif self._tariff_type == FIXED_NUMERICAL_TARIFFS:
return entity_id
elif input_details[TARIFF_TYPE] == FIXED_NUMERICAL_TARIFFS:
return input_details[TARIFF]

# Default behavior - assume sensor entities
if (
entity_id is None
or len(entity_id) < 6
or self._hass.states.get(entity_id) is None
or self._hass.states.get(entity_id).state
input_details[TARIFF] is None
or len(input_details[TARIFF]) < 6
or self._hass.states.get(input_details[TARIFF]) is None
or self._hass.states.get(input_details[TARIFF]).state
in [STATE_UNAVAILABLE, STATE_UNKNOWN]
):
return None

return float(self._hass.states.get(entity_id).state)
return float(self._hass.states.get(input_details[TARIFF]).state)

def update_battery(self, import_amount, export_amount):
"""Update battery statistics based on the reading for Im- or Export."""
Expand Down Expand Up @@ -588,10 +568,10 @@ def update_battery(self, import_amount, export_amount):
self._sensors[BATTERY_MODE] = MODE_IDLE

current_import_tariff = self.get_tariff_information(
self._import_tariff_sensor_id
self._last_import_reading_sensor_data
)
current_export_tariff = self.get_tariff_information(
self._export_tariff_sensor_id
self._last_export_reading_sensor_data
)

if current_import_tariff is not None:
Expand All @@ -615,8 +595,8 @@ def update_battery(self, import_amount, export_amount):
)

self._sensors[ATTR_ENERGY_SAVED] += import_amount - net_import
self._sensors[GRID_IMPORT_SIM] += net_import
self._sensors[GRID_EXPORT_SIM] += net_export
self._sensors[self._last_import_reading_sensor_data[SIMULATED_SENSOR]] += net_import
self._sensors[self._last_import_reading_sensor_data[SIMULATED_SENSOR]] += net_export
self._sensors[ATTR_ENERGY_BATTERY_IN] += amount_to_charge
self._sensors[ATTR_ENERGY_BATTERY_OUT] += amount_to_discharge

Expand Down
Loading

0 comments on commit 875a8d0

Please sign in to comment.