Skip to content

Commit

Permalink
Fix/Feat: Register reading/parsing and timings...
Browse files Browse the repository at this point in the history
Due to security reasons:
- Temporarily removed some custom sensor *templating (Battery State, Battery SOH..)
- Reworked some simple custom sensors like additions and subtracting to work directly with registers (PV Power, Power losses, Total Losses and Today Losses..)
Extended signed integer reading:
- Added Solarman Smart Meter profile
Updated some inverter profiles according to the changes, etc.
Asynchronous file open using aiofiles
Asynchronous directory listing using asyncio
Sorting inverter profiles
Discovery attempts from global value
  • Loading branch information
davidrapan committed Jul 14, 2024
1 parent 1a4a063 commit 4cd1343
Show file tree
Hide file tree
Showing 17 changed files with 2,511 additions and 1,760 deletions.
91 changes: 37 additions & 54 deletions custom_components/solarman/api.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import socket
import time
import yaml
import struct
import socket
import logging
import asyncio
import aiofiles
Expand All @@ -18,16 +18,11 @@

_LOGGER = logging.getLogger(__name__)

def read_file(filepath):
with open(filepath) as file:
return file.read()

class InverterApi(PySolarmanV5Async):
def __init__(self, address, serial, port, mb_slave_id):
super().__init__(address, serial, port = port, mb_slave_id = mb_slave_id, logger = _LOGGER, auto_reconnect = True, socket_timeout = COORDINATOR_TIMEOUT)
self._last_frame: bytes = b""
self.status = -1
super().__init__(address, serial, port = port, mb_slave_id = mb_slave_id, logger = _LOGGER, auto_reconnect = True, socket_timeout = TIMINGS_SOCKET_TIMEOUT)
self.status_lastUpdate = "N/A"
self.status = -1

def is_connecting(self):
return self.status == 0
Expand All @@ -53,7 +48,7 @@ async def reconnect(self) -> None:
self.writer.write(self._last_frame)
await self.writer.drain()
except Exception as e:
self.log.exception(format_exception(e))
self.log.exception(f"Cannot open connection to {self.address}. [{format_exception(e)}]")

async def _send_receive_v5_frame(self, data_logging_stick_frame):
"""
Expand All @@ -70,14 +65,14 @@ async def _send_receive_v5_frame(self, data_logging_stick_frame):
v5_response = await asyncio.wait_for(self.data_queue.get(), self.socket_timeout)
if v5_response == b"":
raise NoSocketAvailableError("Connection closed on read. Retry if auto-reconnect is enabled")
except AttributeError as exc:
raise NoSocketAvailableError("Connection already closed") from exc
except AttributeError as e:
raise NoSocketAvailableError("Connection already closed") from e
except NoSocketAvailableError:
raise
except TimeoutError:
raise
except Exception as exc:
self.log.exception("[%s] Send/Receive error: %s", self.serial, exc)
except Exception as e:
self.log.exception("[%s] Send/Receive error: %s", self.serial, e)
raise
finally:
self.data_wanted_ev.clear()
Expand All @@ -88,11 +83,10 @@ async def _send_receive_v5_frame(self, data_logging_stick_frame):
async def async_connect(self) -> None:
if self.reader_task:
_LOGGER.debug(f"Reader Task done: {self.reader_task.done()}, cancelled: {self.reader_task.cancelled()}.")
#if not self.reader_task or self.reader_task.done() or self.reader_task.cancelled():
if not self.reader_task:
if not self.reader_task: #if not self.reader_task or self.reader_task.done() or self.reader_task.cancelled():
_LOGGER.info(f"Connecting to {self.address}:{self.port}")
await self.connect()
elif not self.is_connected():
elif not self.status > 0:
await self.reconnect()

async def async_disconnect(self, loud = True) -> None:
Expand All @@ -112,14 +106,11 @@ async def async_disconnect(self, loud = True) -> None:
self.writer.close()
await self.writer.wait_closed()

async def async_reconnect(self) -> None:
await self.async_disconnect(False)
loop = asyncio.get_running_loop()
loop.create_task(self.reconnect())

async def _read_registers(self, code, params, start, end) -> None:
async def async_read(self, params, code, start, end) -> None:
length = end - start + 1

await self.async_connect()

match code:
case 3:
response = await self.read_holding_registers(register_addr = start, quantity = length)
Expand All @@ -128,24 +119,17 @@ async def _read_registers(self, code, params, start, end) -> None:

params.parse(response, start, length)

async def async_read(self, code, params, start, end) -> None:
await self.async_connect()
await self._read_registers(code, params, start, end)

class Inverter(InverterApi):
def __init__(self, address, mac, serial, port, mb_slave_id, lookup_path, lookup_file):
def __init__(self, address, serial, port, mb_slave_id, name, mac, lookup_path, lookup_file):
super().__init__(address, serial, port, mb_slave_id)
self.name = name
self.mac = mac
self.lookup_path = lookup_path
self.lookup_file = lookup_file if lookup_file and not lookup_file == "parameters.yaml" else "deye_hybrid.yaml"

async def async_load(self):
loop = asyncio.get_running_loop()
self.parameter_definition = await loop.run_in_executor(None, lambda: yaml.safe_load(read_file(self.path + self.lookup_file)))

async def get_sensors(self):
async with aiofiles.open(self.lookup_path + self.lookup_file) as f:
self.parameter_definition = await f.read()
self.parameter_definition = yaml.safe_load(await f.read())
if self.parameter_definition:
params = ParameterParser(self.parameter_definition)
return params.get_sensors()
Expand All @@ -162,9 +146,7 @@ def get_result(self, middleware = None):
self.status_lastUpdate = datetime.now().strftime("%m/%d/%Y, %H:%M:%S")
self.status = 1

result = middleware.get_result() if middleware else {}
result["Connection Status"] = self.get_connection_status()
return result
return middleware.get_result() if middleware else {}

async def async_get_failed(self, message):
_LOGGER.debug(f"Request failed. [Previous Status: {self.get_connection_status()}]")
Expand All @@ -181,30 +163,30 @@ async def async_get(self, runtime = 0):
requests_count = len(requests)
result = 0

_LOGGER.debug(f"Scheduling {requests_count} query requests. #{runtime}")
_LOGGER.debug(f"Scheduling {requests_count} query requests. #{runtime}")

try:
for request in requests:
code = request['mb_functioncode']
start = request['start']
end = request['end']
code = request["mb_functioncode"]
start = request["start"]
end = request["end"]

_LOGGER.debug(f"Querying ({start} - {end}) ...")

attempts_left = COORDINATOR_QUERY_RETRY_ATTEMPTS
attempts_left = ACTION_RETRY_ATTEMPTS
while attempts_left > 0:
attempts_left -= 1

try:
await self.async_read(code, params, start, end)
await self.async_read(params, code, start, end)
result = 1
except (V5FrameError, TimeoutError, Exception) as e:
result = 0

if not isinstance(e, TimeoutError) or not attempts_left > 0 or _LOGGER.isEnabledFor(logging.DEBUG):
if not isinstance(e, TimeoutError) or not attempts_left >= 1 or _LOGGER.isEnabledFor(logging.DEBUG):
_LOGGER.warning(f"Querying ({start} - {end}) failed. #{runtime} [{format_exception(e)}]")

await asyncio.sleep(COORDINATOR_ERROR_SLEEP)
await asyncio.sleep(TIMINGS_QUERY_EXCEPT_SLEEP)

_LOGGER.debug(f"Querying {'succeeded.' if result == 1 else f'attempts left: {attempts_left}{'' if attempts_left > 0 else ', aborting.'}'}")

Expand All @@ -222,27 +204,28 @@ async def async_get(self, runtime = 0):
except UpdateFailed:
raise
except Exception as e:
await self.async_get_failed(f"Querying {self.serial} at {self.address}:{self.port} failed during connection start. [{format_exception(e)}]")
await self.async_get_failed(f"Querying {self.serial} at {self.address}:{self.port} failed. [{format_exception(e)}]")

return self.get_result()

# Service calls
async def service_write_holding_register(self, register, value):
_LOGGER.debug(f'Service Call: write_holding_register : [{register}], value : [{value}]')
async def service_write_holding_register(self, register, value) -> bool:
_LOGGER.debug(f"service_write_holding_register: {register}, value: {value}")
try:
await self.async_connect()
await self.write_holding_register(register, value)
except Exception as e:
_LOGGER.warning(f"Service Call: write_holding_register : [{register}], value : [{value}] failed. [{format_exception(e)}]")
_LOGGER.warning(f"service_write_holding_register: {register}, value: {value} failed. [{format_exception(e)}]")
await self.async_disconnect()
return
return False
return True

async def service_write_multiple_holding_registers(self, register, values):
_LOGGER.debug(f'Service Call: write_multiple_holding_registers: [{register}], values : [{values}]')
async def service_write_multiple_holding_registers(self, registers, values) -> bool:
_LOGGER.debug(f"service_write_multiple_holding_registers: {registers}, values: {values}")
try:
await self.async_connect()
await self.write_multiple_holding_registers(register, values)
await self.write_multiple_holding_registers(registers, values)
except Exception as e:
_LOGGER.warning(f"Service Call: write_multiple_holding_registers: [{register}], values : [{values}] failed. [{format_exception(e)}]")
_LOGGER.warning(f"service_write_multiple_holding_registers: {registers}, values: {values} failed. [{format_exception(e)}]")
await self.async_disconnect()
return
return False
return True
6 changes: 6 additions & 0 deletions custom_components/solarman/common.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
import asyncio

async def async_execute(x):
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, x)

def group_when(iterable, predicate):
i, x, size = 0, 0, len(iterable)
while i < size - 1:
Expand Down
13 changes: 7 additions & 6 deletions custom_components/solarman/config_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from homeassistant.exceptions import HomeAssistantError

from .const import *
from .common import *
from .discovery import InverterDiscovery

_LOGGER = logging.getLogger(__name__)
Expand All @@ -31,8 +32,8 @@ async def step_user_data_process(discovery):
_LOGGER.debug(f"step_user_data_process: discovery: {discovery}")
return { CONF_NAME: DEFAULT_NAME, CONF_INVERTER_DISCOVERY: DEFAULT_DISCOVERY, CONF_INVERTER_HOST: await discovery.get_ip(), CONF_INVERTER_SERIAL: await discovery.get_serial(), CONF_INVERTER_PORT: DEFAULT_PORT_INVERTER, CONF_INVERTER_MB_SLAVE_ID: DEFAULT_INVERTER_MB_SLAVE_ID, CONF_LOOKUP_FILE: DEFAULT_LOOKUP_FILE, CONF_BATTERY_NOMINAL_VOLTAGE: DEFAULT_BATTERY_NOMINAL_VOLTAGE, CONF_BATTERY_LIFE_CYCLE_RATING: DEFAULT_BATTERY_LIFE_CYCLE_RATING, CONF_DISABLE_TEMPLATING: DEFAULT_DISABLE_TEMPLATING }

def step_user_data_schema(hass: HomeAssistant, data: dict[str, Any] = { CONF_NAME: DEFAULT_NAME, CONF_INVERTER_DISCOVERY: DEFAULT_DISCOVERY, CONF_INVERTER_PORT: DEFAULT_PORT_INVERTER, CONF_INVERTER_MB_SLAVE_ID: DEFAULT_INVERTER_MB_SLAVE_ID, CONF_LOOKUP_FILE: DEFAULT_LOOKUP_FILE, CONF_BATTERY_NOMINAL_VOLTAGE: DEFAULT_BATTERY_NOMINAL_VOLTAGE, CONF_BATTERY_LIFE_CYCLE_RATING: DEFAULT_BATTERY_LIFE_CYCLE_RATING, CONF_DISABLE_TEMPLATING: DEFAULT_DISABLE_TEMPLATING }) -> Schema:
lookup_files = [f for f in os.listdir(hass.config.path(LOOKUP_DIRECTORY_PATH)) if os.path.isfile(LOOKUP_DIRECTORY_PATH + f)]
async def step_user_data_schema(hass: HomeAssistant, data: dict[str, Any] = { CONF_NAME: DEFAULT_NAME, CONF_INVERTER_DISCOVERY: DEFAULT_DISCOVERY, CONF_INVERTER_PORT: DEFAULT_PORT_INVERTER, CONF_INVERTER_MB_SLAVE_ID: DEFAULT_INVERTER_MB_SLAVE_ID, CONF_LOOKUP_FILE: DEFAULT_LOOKUP_FILE, CONF_BATTERY_NOMINAL_VOLTAGE: DEFAULT_BATTERY_NOMINAL_VOLTAGE, CONF_BATTERY_LIFE_CYCLE_RATING: DEFAULT_BATTERY_LIFE_CYCLE_RATING, CONF_DISABLE_TEMPLATING: DEFAULT_DISABLE_TEMPLATING }) -> Schema:
lookup_files = sorted([f for f in await async_execute(lambda: os.listdir(hass.config.path(LOOKUP_DIRECTORY_PATH))) if os.path.isfile(LOOKUP_DIRECTORY_PATH + f)])
_LOGGER.debug(f"step_user_data_schema: data: {data}, {LOOKUP_DIRECTORY_PATH}: {lookup_files}")
STEP_USER_DATA_SCHEMA = vol.Schema(
{
Expand Down Expand Up @@ -90,7 +91,7 @@ async def async_step_user(self, user_input: dict[str, Any] | None = None) -> Con
_LOGGER.debug(f"ConfigFlowHandler.async_step_user: {user_input}")
if user_input is None:
discovery_options = await step_user_data_process(InverterDiscovery(self.hass))
return self.async_show_form(step_id = "user", data_schema = step_user_data_schema(self.hass, discovery_options))
return self.async_show_form(step_id = "user", data_schema = await step_user_data_schema(self.hass, discovery_options))

errors = {}

Expand All @@ -112,7 +113,7 @@ async def async_step_user(self, user_input: dict[str, Any] | None = None) -> Con

_LOGGER.debug(f"ConfigFlowHandler.async_step_user: validation failed: {user_input}")

return self.async_show_form(step_id = "user", data_schema = step_user_data_schema(self.hass, user_input), errors = errors)
return self.async_show_form(step_id = "user", data_schema = await step_user_data_schema(self.hass, user_input), errors = errors)

@staticmethod
@callback
Expand All @@ -133,7 +134,7 @@ async def async_step_init(self, user_input: dict[str, Any] | None = None) -> Con
"""Handle options flow."""
_LOGGER.debug(f"OptionsFlowHandler.async_step_init: {user_input}")
if user_input is None:
return self.async_show_form(step_id = "init", data_schema = step_user_data_schema(self.hass, self.entry.options))
return self.async_show_form(step_id = "init", data_schema = await step_user_data_schema(self.hass, self.entry.options))

errors = {}

Expand All @@ -149,7 +150,7 @@ async def async_step_init(self, user_input: dict[str, Any] | None = None) -> Con
else:
return self.async_create_entry(title = info["title"], data = user_input)

return self.async_show_form(step_id = "init", data_schema = step_user_data_schema(self.hass, user_input), errors = errors)
return self.async_show_form(step_id = "init", data_schema = await step_user_data_schema(self.hass, user_input), errors = errors)

class InvalidHost(HomeAssistantError):
"""Error to indicate there is invalid hostname or IP address."""
Expand Down
27 changes: 13 additions & 14 deletions custom_components/solarman/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
PLATFORMS: list[str] = ["sensor"]
SENSOR_PREFIX = "Solarman"

DISCOVERY_MESSAGE = "WIFIKIT-214028-READ"
DISCOVERY_PORT = 48899
DISCOVERY_MESSAGE = "WIFIKIT-214028-READ"
DISCOVERY_RECV_MESSAGE_SIZE = 1024

COMPONENTS_DIRECTORY = "custom_components"

LOOKUP_DIRECTORY = "inverter_definitions"
LOOKUP_DIRECTORY_PATH = f"custom_components/{DOMAIN}/{LOOKUP_DIRECTORY}/"
LOOKUP_DIRECTORY_PATH = f"{COMPONENTS_DIRECTORY}/{DOMAIN}/{LOOKUP_DIRECTORY}/"

CONF_INVERTER_DISCOVERY = "inverter_discovery"
CONF_INVERTER_HOST = "inverter_host"
Expand All @@ -25,21 +27,18 @@
DEFAULT_DISCOVERY = True
DEFAULT_PORT_INVERTER = 8899
DEFAULT_INVERTER_MB_SLAVE_ID = 1
DEFAULT_LOOKUP_FILE = "deye_sg04lp3.yaml"
DEFAULT_LOOKUP_FILE = "deye_hybrid.yaml"
DEFAULT_BATTERY_NOMINAL_VOLTAGE = 48
DEFAULT_BATTERY_LIFE_CYCLE_RATING = 6000
DEFAULT_DISABLE_TEMPLATING = False

COORDINATOR_INTERVAL = 5
COORDINATOR_INTERVAL_DEFAULT = 60
COORDINATOR_UPDATE_INTERVAL = td(seconds = COORDINATOR_INTERVAL)
COORDINATOR_TIMEOUT = 15
COORDINATOR_TIMEOUT2 = 30
COORDINATOR_ERROR_SLEEP = 4
ACTION_RETRY_ATTEMPTS = 5

COORDINATOR_SOCKET_TIMEOUT = 30 / 2
COORDINATOR_QUERY_INTERVAL_DEFAULT = 60
COORDINATOR_QUERY_RETRY_ATTEMPTS = 4
COORDINATOR_QUERY_ERROR_SLEEP = 4
TIMINGS_INTERVAL = 5
TIMINGS_COORDINATOR = td(seconds = TIMINGS_INTERVAL)
TIMINGS_COORDINATOR_TIMEOUT = TIMINGS_INTERVAL * 6
TIMINGS_SOCKET_TIMEOUT = TIMINGS_INTERVAL * 3 - 1
TIMINGS_QUERY_INTERVAL_DEFAULT = 60
TIMINGS_QUERY_EXCEPT_SLEEP = 4

FLOAT_ROUND_TO = 6
DIGITS_DEFAULT = 6
4 changes: 2 additions & 2 deletions custom_components/solarman/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

class InverterCoordinator(DataUpdateCoordinator[dict[str, Any]]):
def __init__(self, hass: HomeAssistant, inverter):
super().__init__(hass, _LOGGER, name = SENSOR_PREFIX, update_interval = COORDINATOR_UPDATE_INTERVAL, always_update = False)
super().__init__(hass, _LOGGER, name = SENSOR_PREFIX, update_interval = TIMINGS_COORDINATOR, always_update = False)
self.inverter = inverter
self.counter = -1

Expand All @@ -29,7 +29,7 @@ def _accounting(self):
return int(self.counter * self._update_interval_seconds)

async def _async_update_data(self) -> dict[str, Any]:
async with asyncio.timeout(COORDINATOR_TIMEOUT2):
async with asyncio.timeout(TIMINGS_COORDINATOR_TIMEOUT):
return await self.inverter.async_get(self._accounting())

#async def _reload(self):
Expand Down
9 changes: 6 additions & 3 deletions custom_components/solarman/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
_LOGGER = logging.getLogger(__name__)

class InverterDiscovery:
_port = DISCOVERY_PORT
_message = DISCOVERY_MESSAGE.encode()

def __init__(self, hass: HomeAssistant, address = None):
Expand All @@ -24,7 +25,7 @@ def __init__(self, hass: HomeAssistant, address = None):
self._mac = None
self._serial = None

async def _discover(self, address = "<broadcast>"):
async def _discover(self, address = "<broadcast>", source = "0.0.0.0"):
loop = asyncio.get_running_loop()

try:
Expand All @@ -33,8 +34,9 @@ async def _discover(self, address = "<broadcast>"):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.setblocking(False)
sock.settimeout(1.0)
#sock.bind((source, 0))

await loop.sock_sendto(sock, self._message, (address, DISCOVERY_PORT))
await loop.sock_sendto(sock, self._message, (address, self._port))

while True:
try:
Expand Down Expand Up @@ -62,6 +64,7 @@ async def _discover_all(self):
_LOGGER.debug(f"_discover_all: Broadcasting on {net.with_prefixlen}")

await self._discover(str(IPv4Network(net, False).broadcast_address))
#await self._discover("<broadcast>", ipv4["address"])

if self._ip is not None:
return None
Expand All @@ -70,7 +73,7 @@ async def discover(self):
if self._address:
await self._discover(self._address)

attempts_left = COORDINATOR_QUERY_RETRY_ATTEMPTS
attempts_left = ACTION_RETRY_ATTEMPTS
while self._ip is None and attempts_left > 0:
attempts_left -= 1

Expand Down
Loading

0 comments on commit 4cd1343

Please sign in to comment.