Skip to content

Commit

Permalink
fix: api.py revert back cause of stability issues
Browse files Browse the repository at this point in the history
fix: Integration reload
feat: Asynchronous Discovery
feat: Dynamic load of inverter definition list
  • Loading branch information
davidrapan committed Jul 3, 2024
1 parent 846fa15 commit 2a6c52b
Show file tree
Hide file tree
Showing 11 changed files with 1,806 additions and 358 deletions.
217 changes: 164 additions & 53 deletions custom_components/solarman/api.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,43 @@
from __future__ import annotations

import time
import socket
import time
import yaml
import struct
import logging
import asyncio
import aiofiles
import threading
import concurrent.futures

from datetime import datetime
from pysolarmanv5 import PySolarmanV5Async, NoSocketAvailableError
from pysolarmanv5 import PySolarmanV5Async, V5FrameError, NoSocketAvailableError
from homeassistant.helpers.update_coordinator import UpdateFailed

from .const import *
from .common import *
from .parser import ParameterParser

_LOGGER = logging.getLogger(__name__)

class PySolarmanV5AsyncApi(PySolarmanV5Async):
def read_file(filepath):
with open(filepath) as file:
return file.read()

class InverterApi(PySolarmanV5Async):
def __init__(self, address, serial, port, mb_slave_id):
super().__init__(address, serial, port = port, mb_slave_id = mb_slave_id, logger = _LOGGER, auto_reconnect = True, socket_timeout = COORDINATOR_SOCKET_TIMEOUT)
super().__init__(address, serial, port = port, mb_slave_id = mb_slave_id, logger = _LOGGER, auto_reconnect = True, socket_timeout = COORDINATOR_TIMEOUT)
self._last_frame: bytes = b""
self.status = -1
self.status_lastUpdate = "N/A"

def is_connecting(self):
return self.status == 0

def is_connected(self):
return self.status > -1

async def reconnect(self) -> None:
"""
Overridden and prevented the exception to be risen (only logged).
Overridden to prevent the exception to be risen (only logged).
Because the method is called as a Task.
"""
Expand All @@ -43,10 +58,10 @@ async def reconnect(self) -> None:
async def _send_receive_v5_frame(self, data_logging_stick_frame):
"""
Overridden cause of the noisy TimeoutError exception.
Which is in fact kinda expected to happen now and then.
Which is in fact kinda expected cause of communication with Solarman servers to happen now and then.
"""
self.log.debug(f"[{self.serial}] SENT: {data_logging_stick_frame.hex(" ")}")
self.log.debug("[%s] SENT: %s", self.serial, data_logging_stick_frame.hex(" "))
self.data_wanted_ev.set()
self._last_frame = data_logging_stick_frame
try:
Expand All @@ -55,55 +70,26 @@ async def _send_receive_v5_frame(self, data_logging_stick_frame):
v5_response = await asyncio.wait_for(self.data_queue.get(), self.socket_timeout)
if v5_response == b"":
raise NoSocketAvailableError("Connection closed on read. Retry if auto-reconnect is enabled")
except AttributeError as e:
raise NoSocketAvailableError("Connection already closed") from e
except AttributeError as exc:
raise NoSocketAvailableError("Connection already closed") from exc
except NoSocketAvailableError:
raise
except TimeoutError:
raise
except Exception as e:
self.log.exception(f"[{self.serial}] Send/Receive error. [{format_exception(e)}]")
except Exception as exc:
self.log.exception("[%s] Send/Receive error: %s", self.serial, exc)
raise
finally:
self.data_wanted_ev.clear()

self.log.debug(f"[{self.serial}] RECD: {v5_response.hex(" ")}")
self.log.debug("[%s] RECD: %s", self.serial, v5_response.hex(" "))
return v5_response

class SolarmanApi(PySolarmanV5AsyncApi):
def __init__(self, address, serial, port, mb_slave_id):
super().__init__(address, serial, port, mb_slave_id)
self.status = -1
self.status_lastUpdate = "N/A"

async def _async_read_registers(self, code, params, start, end) -> None:
length = end - start + 1

match code:
case 3:
response = await self.read_holding_registers(register_addr = start, quantity = length)
case 4:
response = await self.read_input_registers(register_addr = start, quantity = length)

params.parse(response, start, length)

def is_connected(self):
return self.status > -1

def set_connection_status(self, status):
self.status = status
if self.is_connected():
self.status_lastUpdate = datetime.now().strftime("%m/%d/%Y, %H:%M:%S")

def get_connection_status(self):
if self.is_connected():
return "Connected"
return "Disconnected"

async def async_connect(self) -> None:
if self.reader_task:
_LOGGER.debug(f"[{self.serial}] Reader Task done: {self.reader_task.done()}, cancelled: {self.reader_task.cancelled()}.")
if not self.reader_task: #if not self.reader_task or self.reader_task.done() or self.reader_task.cancelled():
_LOGGER.debug(f"Reader Task done: {self.reader_task.done()}, cancelled: {self.reader_task.cancelled()}.")
#if not self.reader_task or self.reader_task.done() or self.reader_task.cancelled():
if not self.reader_task:
_LOGGER.info(f"Connecting to {self.address}:{self.port}")
await self.connect()
elif not self.is_connected():
Expand All @@ -113,14 +99,8 @@ async def async_disconnect(self, loud = True) -> None:
if loud:
_LOGGER.info(f"Disconnecting from {self.address}:{self.port}")

self.status = 0 if self.status == 1 else -1

if self.reader_task:
self.reader_task.cancel()
try:
await self.reader_task
except asyncio.CancelledError:
_LOGGER.debug(f"Reader Task is cancelled.")

if self.writer:
try:
Expand All @@ -132,6 +112,137 @@ async def async_disconnect(self, loud = True) -> None:
self.writer.close()
await self.writer.wait_closed()

async def async_reconnect(self) -> None:
await self.async_disconnect(False)
loop = asyncio.get_running_loop()
loop.create_task(self.reconnect())

async def _read_registers(self, code, params, start, end) -> None:
length = end - start + 1

match code:
case 3:
response = await self.read_holding_registers(register_addr = start, quantity = length)
case 4:
response = await self.read_input_registers(register_addr = start, quantity = length)

params.parse(response, start, length)

async def async_read(self, code, params, start, end) -> None:
await self.async_connect()
await self._async_read_registers(code, params, start, end)
await self._read_registers(code, params, start, end)

class Inverter(InverterApi):
def __init__(self, address, mac, serial, port, mb_slave_id, lookup_path, lookup_file):
super().__init__(address, serial, port, mb_slave_id)
self.mac = mac
self.lookup_path = lookup_path
self.lookup_file = lookup_file if lookup_file and not lookup_file == "parameters.yaml" else "deye_hybrid.yaml"

async def async_load(self):
loop = asyncio.get_running_loop()
self.parameter_definition = await loop.run_in_executor(None, lambda: yaml.safe_load(read_file(self.path + self.lookup_file)))

async def get_sensors(self):
async with aiofiles.open(self.lookup_path + self.lookup_file) as f:
self.parameter_definition = await f.read()
if self.parameter_definition:
params = ParameterParser(self.parameter_definition)
return params.get_sensors()
return []

def get_connection_status(self):
if self.is_connected():
return "Connected"
return "Disconnected"

def get_result(self, middleware = None):
if middleware:
_LOGGER.debug(f"Querying succeeded, exposing updated values. [Previous Status: {self.get_connection_status()}]")
self.status_lastUpdate = datetime.now().strftime("%m/%d/%Y, %H:%M:%S")
self.status = 1

result = middleware.get_result() if middleware else {}
result["Connection Status"] = self.get_connection_status()
return result

async def async_get_failed(self, message):
_LOGGER.debug(f"Request failed. [Previous Status: {self.get_connection_status()}]")
self.status = 0 if self.status == 1 else -1

await self.async_disconnect()

if self.status == -1:
raise UpdateFailed(message)

async def async_get(self, runtime = 0):
params = ParameterParser(self.parameter_definition)
requests = params.get_requests(runtime)
requests_count = len(requests)
result = 0

_LOGGER.debug(f"Scheduling {requests_count} query requests. #{runtime}")

try:
for request in requests:
code = request['mb_functioncode']
start = request['start']
end = request['end']

_LOGGER.debug(f"Querying ({start} - {end}) ...")

attempts_left = COORDINATOR_QUERY_RETRY_ATTEMPTS
while attempts_left > 0:
attempts_left -= 1

try:
await self.async_read(code, params, start, end)
result = 1
except (V5FrameError, TimeoutError, Exception) as e:
result = 0

if not isinstance(e, TimeoutError) or not attempts_left > 0 or _LOGGER.isEnabledFor(logging.DEBUG):
_LOGGER.warning(f"Querying ({start} - {end}) failed. #{runtime} [{format_exception(e)}]")

await asyncio.sleep(COORDINATOR_ERROR_SLEEP)

_LOGGER.debug(f"Querying {'succeeded.' if result == 1 else f'attempts left: {attempts_left}{'' if attempts_left > 0 else ', aborting.'}'}")

if result == 1:
break

if result == 0:
break

if result == 1:
return self.get_result(params)
else:
await self.async_get_failed(f"Querying {self.serial} at {self.address}:{self.port} failed.")

except UpdateFailed:
raise
except Exception as e:
await self.async_get_failed(f"Querying {self.serial} at {self.address}:{self.port} failed during connection start. [{format_exception(e)}]")

return self.get_result()

# Service calls
async def service_write_holding_register(self, register, value):
_LOGGER.debug(f'Service Call: write_holding_register : [{register}], value : [{value}]')
try:
await self.async_connect()
await self.write_holding_register(register, value)
except Exception as e:
_LOGGER.warning(f"Service Call: write_holding_register : [{register}], value : [{value}] failed. [{format_exception(e)}]")
await self.async_disconnect()
return

async def service_write_multiple_holding_registers(self, register, values):
_LOGGER.debug(f'Service Call: write_multiple_holding_registers: [{register}], values : [{values}]')
try:
await self.async_connect()
await self.write_multiple_holding_registers(register, values)
except Exception as e:
_LOGGER.warning(f"Service Call: write_multiple_holding_registers: [{register}], values : [{values}] failed. [{format_exception(e)}]")
await self.async_disconnect()
return
22 changes: 12 additions & 10 deletions custom_components/solarman/config_flow.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import os
import logging
import voluptuous as vol

Expand All @@ -22,15 +23,17 @@

async def async_update_listener(hass: HomeAssistant, entry: ConfigEntry) -> None:
_LOGGER.debug(f"async_update_listener: entry: {entry.as_dict()}")
hass.data[DOMAIN][entry.entry_id].config(entry)
entry.title = entry.options[CONF_NAME]
#hass.data[DOMAIN][entry.entry_id].config(entry)
#entry.title = entry.options[CONF_NAME]
await hass.config_entries.async_reload(entry.entry_id)

async def step_user_data_process(discovery):
_LOGGER.debug(f"step_user_data_process: discovery: {discovery}")
return { CONF_NAME: DEFAULT_NAME, CONF_INVERTER_DISCOVERY: DEFAULT_DISCOVERY, CONF_INVERTER_HOST: await discovery.get_ip(), CONF_INVERTER_SERIAL: await discovery.get_serial(), CONF_INVERTER_PORT: DEFAULT_PORT_INVERTER, CONF_INVERTER_MB_SLAVE_ID: DEFAULT_INVERTER_MB_SLAVE_ID, CONF_LOOKUP_FILE: DEFAULT_LOOKUP_FILE, CONF_BATTERY_NOMINAL_VOLTAGE: DEFAULT_BATTERY_NOMINAL_VOLTAGE, CONF_BATTERY_LIFE_CYCLE_RATING: DEFAULT_BATTERY_LIFE_CYCLE_RATING, CONF_DISABLE_TEMPLATING: DEFAULT_DISABLE_TEMPLATING }

def step_user_data_schema(data: dict[str, Any] = { CONF_NAME: DEFAULT_NAME, CONF_INVERTER_DISCOVERY: DEFAULT_DISCOVERY, CONF_INVERTER_PORT: DEFAULT_PORT_INVERTER, CONF_INVERTER_MB_SLAVE_ID: DEFAULT_INVERTER_MB_SLAVE_ID, CONF_LOOKUP_FILE: DEFAULT_LOOKUP_FILE, CONF_BATTERY_NOMINAL_VOLTAGE: DEFAULT_BATTERY_NOMINAL_VOLTAGE, CONF_BATTERY_LIFE_CYCLE_RATING: DEFAULT_BATTERY_LIFE_CYCLE_RATING, CONF_DISABLE_TEMPLATING: DEFAULT_DISABLE_TEMPLATING }) -> Schema:
_LOGGER.debug(f"step_user_data_schema: data: {data}")
def step_user_data_schema(hass: HomeAssistant, data: dict[str, Any] = { CONF_NAME: DEFAULT_NAME, CONF_INVERTER_DISCOVERY: DEFAULT_DISCOVERY, CONF_INVERTER_PORT: DEFAULT_PORT_INVERTER, CONF_INVERTER_MB_SLAVE_ID: DEFAULT_INVERTER_MB_SLAVE_ID, CONF_LOOKUP_FILE: DEFAULT_LOOKUP_FILE, CONF_BATTERY_NOMINAL_VOLTAGE: DEFAULT_BATTERY_NOMINAL_VOLTAGE, CONF_BATTERY_LIFE_CYCLE_RATING: DEFAULT_BATTERY_LIFE_CYCLE_RATING, CONF_DISABLE_TEMPLATING: DEFAULT_DISABLE_TEMPLATING }) -> Schema:
lookup_files = [f for f in os.listdir(hass.config.path(LOOKUP_DIRECTORY_PATH)) if os.path.isfile(LOOKUP_DIRECTORY_PATH + f)]
_LOGGER.debug(f"step_user_data_schema: data: {data}, {LOOKUP_DIRECTORY_PATH}: {lookup_files}")
STEP_USER_DATA_SCHEMA = vol.Schema(
{
vol.Required(CONF_NAME, default = data.get(CONF_NAME)): str,
Expand All @@ -39,15 +42,14 @@ def step_user_data_schema(data: dict[str, Any] = { CONF_NAME: DEFAULT_NAME, CONF
vol.Required(CONF_INVERTER_SERIAL, default = data.get(CONF_INVERTER_SERIAL)): int,
vol.Optional(CONF_INVERTER_PORT, default = data.get(CONF_INVERTER_PORT)): int,
vol.Optional(CONF_INVERTER_MB_SLAVE_ID, default = data.get(CONF_INVERTER_MB_SLAVE_ID)): int,
vol.Optional(CONF_LOOKUP_FILE, default = data.get(CONF_LOOKUP_FILE)): vol.In(LOOKUP_FILES),
vol.Optional(CONF_LOOKUP_FILE, default = data.get(CONF_LOOKUP_FILE)): vol.In(lookup_files),
vol.Optional(CONF_BATTERY_NOMINAL_VOLTAGE, default = data.get(CONF_BATTERY_NOMINAL_VOLTAGE)): int,
vol.Optional(CONF_BATTERY_LIFE_CYCLE_RATING, default = data.get(CONF_BATTERY_LIFE_CYCLE_RATING)): int,
vol.Optional(CONF_DISABLE_TEMPLATING, default = data.get(CONF_DISABLE_TEMPLATING)): bool,
},
extra = vol.PREVENT_EXTRA
)
_LOGGER.debug(f"step_user_data_schema: STEP_USER_DATA_SCHEMA: {STEP_USER_DATA_SCHEMA}")

return STEP_USER_DATA_SCHEMA

async def validate_input(hass: HomeAssistant, data: dict[str, Any]) -> dict[str, Any]:
Expand Down Expand Up @@ -88,7 +90,7 @@ async def async_step_user(self, user_input: dict[str, Any] | None = None) -> Con
_LOGGER.debug(f"ConfigFlowHandler.async_step_user: {user_input}")
if user_input is None:
discovery_options = await step_user_data_process(InverterDiscovery(self.hass))
return self.async_show_form(step_id = "user", data_schema = step_user_data_schema(discovery_options))
return self.async_show_form(step_id = "user", data_schema = step_user_data_schema(self.hass, discovery_options))

errors = {}

Expand All @@ -110,7 +112,7 @@ async def async_step_user(self, user_input: dict[str, Any] | None = None) -> Con

_LOGGER.debug(f"ConfigFlowHandler.async_step_user: validation failed: {user_input}")

return self.async_show_form(step_id = "user", data_schema = step_user_data_schema(user_input), errors = errors)
return self.async_show_form(step_id = "user", data_schema = step_user_data_schema(self.hass, user_input), errors = errors)

@staticmethod
@callback
Expand All @@ -131,7 +133,7 @@ async def async_step_init(self, user_input: dict[str, Any] | None = None) -> Con
"""Handle options flow."""
_LOGGER.debug(f"OptionsFlowHandler.async_step_init: {user_input}")
if user_input is None:
return self.async_show_form(step_id = "init", data_schema = step_user_data_schema(self.entry.options))
return self.async_show_form(step_id = "init", data_schema = step_user_data_schema(self.hass, self.entry.options))

errors = {}

Expand All @@ -147,7 +149,7 @@ async def async_step_init(self, user_input: dict[str, Any] | None = None) -> Con
else:
return self.async_create_entry(title = info["title"], data = user_input)

return self.async_show_form(step_id = "init", data_schema = step_user_data_schema(user_input), errors = errors)
return self.async_show_form(step_id = "init", data_schema = step_user_data_schema(self.hass, user_input), errors = errors)

class InvalidHost(HomeAssistantError):
"""Error to indicate there is invalid hostname or IP address."""
Expand Down
Loading

0 comments on commit 2a6c52b

Please sign in to comment.