Skip to content

Commit

Permalink
Merge branch 'main' into patch-3
Browse files Browse the repository at this point in the history
  • Loading branch information
davidrapan authored Jul 19, 2024
2 parents bae1763 + 1f6f612 commit f220060
Show file tree
Hide file tree
Showing 40 changed files with 8,378 additions and 6,316 deletions.
83 changes: 75 additions & 8 deletions custom_components/solarman/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,89 @@

import logging

from homeassistant.const import CONF_NAME
from homeassistant.core import HomeAssistant
from homeassistant.config_entries import ConfigEntry

from .const import *
from .common import *
from .api import Inverter
from .discovery import InverterDiscovery
from .coordinator import InverterCoordinator
from .config_flow import async_update_listener
from .services import *

_LOGGER = logging.getLogger(__name__)

async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
_LOGGER.debug(f"async_setup_entry({entry.as_dict()})")
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
entry.async_on_unload(entry.add_update_listener(async_update_listener))
async def async_setup_entry(hass: HomeAssistant, config: ConfigEntry) -> bool:
    """Set up a Solarman inverter from a config entry.

    Reads connection options, optionally refines host/serial via discovery,
    validates the required parameters, creates the Inverter and its update
    coordinator, performs the first refresh, registers services and forwards
    the entry to the sensor platforms.
    """
    _LOGGER.debug(f"async_setup_entry({config.as_dict()})")

    opts = config.options

    name = opts.get(CONF_NAME)
    host = opts.get(CONF_INVERTER_HOST)
    serial = opts.get(CONF_INVERTER_SERIAL)
    port = opts.get(CONF_INVERTER_PORT)
    mb_slave_id = opts.get(CONF_INVERTER_MB_SLAVE_ID)
    path = hass.config.path(LOOKUP_DIRECTORY_PATH)
    file = opts.get(CONF_LOOKUP_FILE)

    discovery = InverterDiscovery(hass, host)

    if discovery:
        # Prefer a freshly discovered address over the configured one.
        scanned_host = await discovery.get_ip()
        if scanned_host:
            host = scanned_host

        # A configured serial of 0 means "unknown" — try to discover it.
        if serial == 0:
            scanned_serial = await discovery.get_serial()
            if scanned_serial:
                serial = scanned_serial

        # NOTE(review): mac stays unbound when discovery is falsy and would
        # raise NameError at the Inverter(...) call below — confirm intended.
        mac = await discovery.get_mac()

    if host is None:
        raise vol.Invalid("Configuration parameter [inverter_host] does not have a value")
    if serial is None:
        raise vol.Invalid("Configuration parameter [inverter_serial] does not have a value")
    if port is None:
        raise vol.Invalid("Configuration parameter [inverter_port] does not have a value")
    # Fall back to the default Modbus slave id for any falsy value (None or 0).
    mb_slave_id = mb_slave_id or DEFAULT_INVERTER_MB_SLAVE_ID

    inverter = Inverter(host, serial, port, mb_slave_id, name, mac, path, file)
    await inverter.load()

    coordinator = InverterCoordinator(hass, inverter)
    hass.data.setdefault(DOMAIN, {})[config.entry_id] = coordinator

    # First refresh so entities have data on subscribe; on failure this raises
    # ConfigEntryNotReady and Home Assistant retries setup later.
    _LOGGER.debug(f"async_setup: coordinator.async_config_entry_first_refresh")
    await coordinator.async_config_entry_first_refresh()

    # Register the integration's services with Home Assistant.
    _LOGGER.debug(f"async_setup: register_services")
    register_services(hass, inverter)

    # Forward the entry to the platforms and track option updates.
    await hass.config_entries.async_forward_entry_setups(config, PLATFORMS)
    config.async_on_unload(config.add_update_listener(async_update_listener))
    return True

async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
_LOGGER.debug(f"async_unload_entry({entry.as_dict()})")
if unload_ok := await hass.config_entries.async_unload_platforms(entry, PLATFORMS):
_ = hass.data[DOMAIN].pop(entry.entry_id)
async def async_unload_entry(hass: HomeAssistant, config: ConfigEntry) -> bool:
    """Unload a config entry: tear down platforms, drop the stored
    coordinator and remove the integration's services."""
    _LOGGER.debug(f"async_unload_entry({config.as_dict()})")
    unload_ok = await hass.config_entries.async_unload_platforms(config, PLATFORMS)
    if unload_ok:
        _ = hass.data[DOMAIN].pop(config.entry_id)
        remove_services(hass)
    return unload_ok
148 changes: 80 additions & 68 deletions custom_components/solarman/api.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import socket
import time
import yaml
import errno
import struct
import socket
import logging
import asyncio
import aiofiles
import threading
import concurrent.futures

Expand All @@ -18,16 +17,11 @@

_LOGGER = logging.getLogger(__name__)

def read_file(filepath):
with open(filepath) as file:
return file.read()

class InverterApi(PySolarmanV5Async):
def __init__(self, address, serial, port, mb_slave_id):
super().__init__(address, serial, port = port, mb_slave_id = mb_slave_id, logger = _LOGGER, auto_reconnect = True, socket_timeout = COORDINATOR_TIMEOUT)
self._last_frame: bytes = b""
self.status = -1
# Initialize the Solarman transport wrapper: delegate connection parameters to
# PySolarmanV5Async and seed the coarse status tracking used by this class.
def __init__(self, address, serial, port, mb_slave_id, auto_reconnect):
super().__init__(address, serial, port = port, mb_slave_id = mb_slave_id, logger = _LOGGER, auto_reconnect = auto_reconnect, socket_timeout = TIMINGS_SOCKET_TIMEOUT)
# Human-readable timestamp of the last successful update; "N/A" until then.
self.status_lastUpdate = "N/A"
# Coarse connection status. From usage elsewhere in this class: -1 initial,
# 0 means "connecting" (see is_connecting), 1 set after a successful result.
# NOTE(review): other values not visible here — confirm full range.
self.status = -1

def is_connecting(self):
    """Return True while the connection attempt is still in progress
    (status code 0)."""
    pending = self.status == 0
    return pending
Expand All @@ -53,7 +47,7 @@ async def reconnect(self) -> None:
self.writer.write(self._last_frame)
await self.writer.drain()
except Exception as e:
self.log.exception(format_exception(e))
self.log.exception(f"Cannot open connection to {self.address}. [{format_exception(e)}]")

async def _send_receive_v5_frame(self, data_logging_stick_frame):
"""
Expand All @@ -70,14 +64,18 @@ async def _send_receive_v5_frame(self, data_logging_stick_frame):
v5_response = await asyncio.wait_for(self.data_queue.get(), self.socket_timeout)
if v5_response == b"":
raise NoSocketAvailableError("Connection closed on read. Retry if auto-reconnect is enabled")
except AttributeError as exc:
raise NoSocketAvailableError("Connection already closed") from exc
except AttributeError as e:
raise NoSocketAvailableError("Connection already closed") from e
except NoSocketAvailableError:
raise
except TimeoutError:
raise
except Exception as exc:
self.log.exception("[%s] Send/Receive error: %s", self.serial, exc)
except OSError as e:
if e.errno == errno.EHOSTUNREACH:
raise TimeoutError from e
raise
except Exception as e:
self.log.exception("[%s] Send/Receive error: %s", self.serial, e)
raise
finally:
self.data_wanted_ev.clear()
Expand All @@ -88,11 +86,10 @@ async def _send_receive_v5_frame(self, data_logging_stick_frame):
async def async_connect(self) -> None:
if self.reader_task:
_LOGGER.debug(f"Reader Task done: {self.reader_task.done()}, cancelled: {self.reader_task.cancelled()}.")
#if not self.reader_task or self.reader_task.done() or self.reader_task.cancelled():
if not self.reader_task:
if not self.reader_task: #if not self.reader_task or self.reader_task.done() or self.reader_task.cancelled():
_LOGGER.info(f"Connecting to {self.address}:{self.port}")
await self.connect()
elif not self.is_connected():
elif not self.status > 0:
await self.reconnect()

async def async_disconnect(self, loud = True) -> None:
Expand All @@ -110,16 +107,16 @@ async def async_disconnect(self, loud = True) -> None:
_LOGGER.debug(f"{e} can be during closing ignored.")
finally:
self.writer.close()
await self.writer.wait_closed()
try:
await self.writer.wait_closed()
except OSError as e: # Happens when host is unreachable.
_LOGGER.debug(f"{e} can be during closing ignored.")

async def async_reconnect(self) -> None:
await self.async_disconnect(False)
loop = asyncio.get_running_loop()
loop.create_task(self.reconnect())

async def _read_registers(self, code, params, start, end) -> None:
async def async_read(self, params, code, start, end) -> None:
length = end - start + 1

await self.async_connect()

match code:
case 3:
response = await self.read_holding_registers(register_addr = start, quantity = length)
Expand All @@ -128,24 +125,21 @@ async def _read_registers(self, code, params, start, end) -> None:

params.parse(response, start, length)

async def async_read(self, code, params, start, end) -> None:
await self.async_connect()
await self._read_registers(code, params, start, end)

class Inverter(InverterApi):
def __init__(self, address, mac, serial, port, mb_slave_id, lookup_path, lookup_file):
super().__init__(address, serial, port, mb_slave_id)
# Construct an Inverter: configure the underlying InverterApi transport and
# record identity plus parameter-definition lookup settings.
def __init__(self, address, serial, port, mb_slave_id, name, mac, lookup_path, lookup_file):
super().__init__(address, serial, port, mb_slave_id, AUTO_RECONNECT)
# Display name and MAC address as provided by the caller (discovery/options).
self.name = name
self.mac = mac
self.lookup_path = lookup_path
# Fall back to "deye_hybrid.yaml" when no file is given or the legacy
# default "parameters.yaml" is configured.
self.lookup_file = lookup_file if lookup_file and not lookup_file == "parameters.yaml" else "deye_hybrid.yaml"
# Mirror the auto-reconnect setting so retry paths can consult it.
self.auto_reconnect = AUTO_RECONNECT

#execute_async(self.load())

async def async_load(self):
loop = asyncio.get_running_loop()
self.parameter_definition = await loop.run_in_executor(None, lambda: yaml.safe_load(read_file(self.path + self.lookup_file)))
# Load the parameter-definition YAML for this inverter model into memory so
# get_sensors()/parsing can use it.
async def load(self):
# NOTE(review): plain string concatenation — assumes lookup_path already ends
# with a path separator; confirm against LOOKUP_DIRECTORY_PATH.
self.parameter_definition = await yaml_open(self.lookup_path + self.lookup_file)

async def get_sensors(self):
async with aiofiles.open(self.lookup_path + self.lookup_file) as f:
self.parameter_definition = await f.read()
def get_sensors(self):
if self.parameter_definition:
params = ParameterParser(self.parameter_definition)
return params.get_sensors()
Expand All @@ -162,9 +156,7 @@ def get_result(self, middleware = None):
self.status_lastUpdate = datetime.now().strftime("%m/%d/%Y, %H:%M:%S")
self.status = 1

result = middleware.get_result() if middleware else {}
result["Connection Status"] = self.get_connection_status()
return result
return middleware.get_result() if middleware else {}

async def async_get_failed(self, message):
_LOGGER.debug(f"Request failed. [Previous Status: {self.get_connection_status()}]")
Expand All @@ -185,26 +177,26 @@ async def async_get(self, runtime = 0):

try:
for request in requests:
code = request['mb_functioncode']
start = request['start']
end = request['end']
code = get_request_code(request)
start = get_request_start(request)
end = get_request_end(request)

_LOGGER.debug(f"Querying ({start} - {end}) ...")

attempts_left = COORDINATOR_QUERY_RETRY_ATTEMPTS
attempts_left = ACTION_RETRY_ATTEMPTS
while attempts_left > 0:
attempts_left -= 1

try:
await self.async_read(code, params, start, end)
await self.async_read(params, code, start, end)
result = 1
except (V5FrameError, TimeoutError, Exception) as e:
result = 0

if not isinstance(e, TimeoutError) or not attempts_left > 0 or _LOGGER.isEnabledFor(logging.DEBUG):
if ((not isinstance(e, TimeoutError) or not attempts_left >= 1) and not (not isinstance(e, TimeoutError) or (e.__cause__ and isinstance(e.__cause__, OSError) and e.__cause__.errno == errno.EHOSTUNREACH))) or _LOGGER.isEnabledFor(logging.DEBUG):
_LOGGER.warning(f"Querying ({start} - {end}) failed. #{runtime} [{format_exception(e)}]")

await asyncio.sleep(COORDINATOR_ERROR_SLEEP)
await asyncio.sleep(TIMINGS_QUERY_EXCEPT_SLEEP)

_LOGGER.debug(f"Querying {'succeeded.' if result == 1 else f'attempts left: {attempts_left}{'' if attempts_left > 0 else ', aborting.'}'}")

Expand All @@ -222,11 +214,10 @@ async def async_get(self, runtime = 0):
except UpdateFailed:
raise
except Exception as e:
await self.async_get_failed(f"Querying {self.serial} at {self.address}:{self.port} failed during connection start. [{format_exception(e)}]")
await self.async_get_failed(f"Querying {self.serial} at {self.address}:{self.port} failed. [{format_exception(e)}]")

return self.get_result()

# Service calls
async def service_read_holding_register(self, register, value):
_LOGGER.debug(f'Service Call: read_holding_registers : [{register}], value : [{value}]')
try:
Expand All @@ -247,23 +238,44 @@ async def service_read_multiple_holding_registers(self, register, values):
await self.async_disconnect()
return

async def service_write_holding_register(self, register, value):
_LOGGER.debug(f'Service Call: write_holding_register : [{register}], value : [{value}]')
try:
await self.async_connect()
await self.write_holding_register(register, value)
except Exception as e:
_LOGGER.warning(f"Service Call: write_holding_register : [{register}], value : [{value}] failed. [{format_exception(e)}]")
await self.async_disconnect()
return
async def service_write_holding_register(self, register, value) -> bool:
    """Write *value* to a single holding register, retrying up to
    ACTION_RETRY_ATTEMPTS times.

    Returns True on success; re-raises the last exception once all
    attempts are exhausted.
    """
    _LOGGER.debug(f"service_write_holding_register: {register}, value: {value}")

    # Count down remaining attempts: ACTION_RETRY_ATTEMPTS - 1 .. 0.
    for remaining in range(ACTION_RETRY_ATTEMPTS - 1, -1, -1):
        try:
            await self.async_connect()
            response = await self.write_holding_register(register, value)
            _LOGGER.debug(f"service_write_holding_register: {register}, response: {response}")
            return True
        except Exception as e:
            _LOGGER.warning(f"service_write_holding_register: {register}, value: {value} failed, attempts left: {remaining}. [{format_exception(e)}]")
            # Only tear the connection down when auto-reconnect won't restore it.
            if not self.auto_reconnect:
                await self.async_disconnect()
            if remaining == 0:
                raise
        # Back off before the next attempt.
        await asyncio.sleep(TIMINGS_WRITE_EXCEPT_SLEEP)

async def service_write_multiple_holding_registers(self, register, values) -> bool:
    """Write *values* to consecutive holding registers starting at *register*,
    retrying up to ACTION_RETRY_ATTEMPTS times.

    Returns True on success; re-raises the last exception once all attempts
    are exhausted.
    """
    _LOGGER.debug(f"service_write_multiple_holding_registers: {register}, values: {values}")

    attempts_left = ACTION_RETRY_ATTEMPTS
    while attempts_left > 0:
        attempts_left -= 1

        try:
            await self.async_connect()
            response = await self.write_multiple_holding_registers(register, values)
            # Fix: log label previously said "service_write_multiple_holding_register"
            # (missing trailing 's'), inconsistent with the method name and the
            # other log lines in this method.
            _LOGGER.debug(f"service_write_multiple_holding_registers: {register}, response: {response}")
            return True
        except Exception as e:
            _LOGGER.warning(f"service_write_multiple_holding_registers: {register}, values: {values} failed, attempts left: {attempts_left}. [{format_exception(e)}]")
            # Only tear the connection down when auto-reconnect won't restore it.
            if not self.auto_reconnect:
                await self.async_disconnect()
            if not attempts_left > 0:
                raise

        # Back off before the next attempt.
        await asyncio.sleep(TIMINGS_WRITE_EXCEPT_SLEEP)
Loading

0 comments on commit f220060

Please sign in to comment.