From c781a225f60b1b5d39cfdad713d3d5bb4799c844 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Colomb?= Date: Thu, 17 Oct 2024 21:36:11 +0200 Subject: [PATCH] Add a dummy object to designate "uninitialized" networks (fixes #511) (#525) Add an _UnitializedNetwork class and a singleton _UNINITIALIZED_NETWORK instance. It can replace the dummy "None" value for attribute initializations, which can then be properly typed as Network to avoid static type checking errors. This has the benefit of not needing `self.network is not None` checks at run-time wherever a method or attribute access is used, but still satisfies static type checking. When hitting such a code path at run-time, of course it will lead to an exception because the attributes required in the Network methods are not set. But that is a case of wrong API usage (accessing a network without associating it first), which a static checker cannot detect reliably. The dummy class provides a descriptive exception message when any attribute is accessed on it. --- canopen/emcy.py | 5 ++++- canopen/lss.py | 7 +++++-- canopen/network.py | 17 ++++++++++++++++- canopen/nmt.py | 4 +++- canopen/node/base.py | 8 +++++++- canopen/node/local.py | 19 +++++++++++-------- canopen/node/remote.py | 21 ++++++++++++--------- canopen/pdo/base.py | 4 ++-- canopen/sdo/base.py | 3 ++- 9 files changed, 62 insertions(+), 26 deletions(-) diff --git a/canopen/emcy.py b/canopen/emcy.py index 1bbdeb75..520cfdc0 100644 --- a/canopen/emcy.py +++ b/canopen/emcy.py @@ -4,6 +4,9 @@ import time from typing import Callable, List, Optional +import canopen.network + + # Error code, error register, vendor specific data EMCY_STRUCT = struct.Struct(" None: + self.network: canopen.network.Network = canopen.network._UNINITIALIZED_NETWORK self._node_id = 0 self._data = None self.responses = queue.Queue() diff --git a/canopen/network.py b/canopen/network.py index cd754c8c..9d6e66c6 100644 --- a/canopen/network.py +++ b/canopen/network.py @@ -3,7 +3,7 @@ from collections.abc import MutableMapping import logging import threading -from typing import Callable, Dict, Iterator, List, Optional, Union +from typing import Callable, Dict, Final, Iterator, List, Optional, Union import can from can import Listener @@ -282,6 +282,21 @@ def __len__(self) -> int: return len(self.nodes) +class _UninitializedNetwork(Network): + """Empty network implementation as a placeholder before actual initialization.""" + + def __init__(self, bus: Optional[can.BusABC] = None): + """Do not initialize attributes, by skipping the parent constructor.""" + + def __getattribute__(self, name): + raise RuntimeError("No actual Network object was assigned, " + "try associating to a real network first.") + + +#: Singleton instance +_UNINITIALIZED_NETWORK: Final[Network] = _UninitializedNetwork() + + class PeriodicMessageTask: """ Task object to transmit a message periodically using python-can's diff --git a/canopen/nmt.py b/canopen/nmt.py index 125042d3..6f29e917 100644 --- a/canopen/nmt.py +++ b/canopen/nmt.py @@ -4,6 +4,8 @@ import time from typing import Callable, Optional, TYPE_CHECKING +import canopen.network + if TYPE_CHECKING: from canopen.network import PeriodicMessageTask @@ -49,7 +51,7 @@ class NmtBase: def __init__(self, node_id: int): self.id = node_id - self.network = None + self.network: canopen.network.Network = canopen.network._UNINITIALIZED_NETWORK self._state = 0 def on_command(self, can_id, data, timestamp): diff --git a/canopen/node/base.py b/canopen/node/base.py index bf72d959..45ad35b4 100644 --- a/canopen/node/base.py +++ b/canopen/node/base.py @@ -1,4 +1,6 @@ from typing import TextIO, Union + +import canopen.network from canopen.objectdictionary import ObjectDictionary, import_od @@ -17,10 +19,14 @@ def __init__( node_id: int, object_dictionary: Union[ObjectDictionary, str, TextIO], ): - self.network = None + self.network: canopen.network.Network = canopen.network._UNINITIALIZED_NETWORK if not isinstance(object_dictionary, ObjectDictionary): object_dictionary = import_od(object_dictionary, node_id) self.object_dictionary = object_dictionary self.id = node_id or self.object_dictionary.node_id + + def has_network(self) -> bool: + """Check whether the node has been associated to a network.""" + return not isinstance(self.network, canopen.network._UninitializedNetwork) diff --git a/canopen/node/local.py b/canopen/node/local.py index eb74b98d..eb614601 100644 --- a/canopen/node/local.py +++ b/canopen/node/local.py @@ -1,6 +1,9 @@ +from __future__ import annotations + import logging from typing import Dict, Union +import canopen.network from canopen.node.base import BaseNode from canopen.sdo import SdoServer, SdoAbortedError from canopen.pdo import PDO, TPDO, RPDO @@ -34,7 +37,7 @@ def __init__( self.add_write_callback(self.nmt.on_write) self.emcy = EmcyProducer(0x80 + self.id) - def associate_network(self, network): + def associate_network(self, network: canopen.network.Network): self.network = network self.sdo.network = network self.tpdo.network = network @@ -44,15 +47,15 @@ def associate_network(self, network): network.subscribe(self.sdo.rx_cobid, self.sdo.on_request) network.subscribe(0, self.nmt.on_command) - def remove_network(self): + def remove_network(self) -> None: self.network.unsubscribe(self.sdo.rx_cobid, self.sdo.on_request) self.network.unsubscribe(0, self.nmt.on_command) - self.network = None - self.sdo.network = None - self.tpdo.network = None - self.rpdo.network = None - self.nmt.network = None - self.emcy.network = None + self.network = canopen.network._UNINITIALIZED_NETWORK + self.sdo.network = canopen.network._UNINITIALIZED_NETWORK + self.tpdo.network = canopen.network._UNINITIALIZED_NETWORK + self.rpdo.network = canopen.network._UNINITIALIZED_NETWORK + self.nmt.network = canopen.network._UNINITIALIZED_NETWORK + self.emcy.network = canopen.network._UNINITIALIZED_NETWORK def add_read_callback(self, callback): self._read_callbacks.append(callback) diff --git a/canopen/node/remote.py b/canopen/node/remote.py index 4f3281db..b354b8f9 100644 --- a/canopen/node/remote.py +++ b/canopen/node/remote.py @@ -1,6 +1,9 @@ +from __future__ import annotations + import logging from typing import Union, TextIO +import canopen.network from canopen.sdo import SdoClient, SdoCommunicationError, SdoAbortedError from canopen.nmt import NmtMaster from canopen.emcy import EmcyConsumer @@ -46,7 +49,7 @@ def __init__( if load_od: self.load_configuration() - def associate_network(self, network): + def associate_network(self, network: canopen.network.Network): self.network = network self.sdo.network = network self.pdo.network = network @@ -59,18 +62,18 @@ def associate_network(self, network): network.subscribe(0x80 + self.id, self.emcy.on_emcy) network.subscribe(0, self.nmt.on_command) - def remove_network(self): + def remove_network(self) -> None: for sdo in self.sdo_channels: self.network.unsubscribe(sdo.tx_cobid, sdo.on_response) self.network.unsubscribe(0x700 + self.id, self.nmt.on_heartbeat) self.network.unsubscribe(0x80 + self.id, self.emcy.on_emcy) self.network.unsubscribe(0, self.nmt.on_command) - self.network = None - self.sdo.network = None - self.pdo.network = None - self.tpdo.network = None - self.rpdo.network = None - self.nmt.network = None + self.network = canopen.network._UNINITIALIZED_NETWORK + self.sdo.network = canopen.network._UNINITIALIZED_NETWORK + self.pdo.network = canopen.network._UNINITIALIZED_NETWORK + self.tpdo.network = canopen.network._UNINITIALIZED_NETWORK + self.rpdo.network = canopen.network._UNINITIALIZED_NETWORK + self.nmt.network = canopen.network._UNINITIALIZED_NETWORK def add_sdo(self, rx_cobid, tx_cobid): """Add an additional SDO channel. @@ -87,7 +90,7 @@ def add_sdo(self, rx_cobid, tx_cobid): """ client = SdoClient(rx_cobid, tx_cobid, self.object_dictionary) self.sdo_channels.append(client) - if self.network is not None: + if self.has_network(): self.network.subscribe(client.tx_cobid, client.on_response) return client diff --git a/canopen/pdo/base.py b/canopen/pdo/base.py index f2a7d205..dc81aa60 100644 --- a/canopen/pdo/base.py +++ b/canopen/pdo/base.py @@ -6,12 +6,12 @@ import logging import binascii +import canopen.network from canopen.sdo import SdoAbortedError from canopen import objectdictionary from canopen import variable if TYPE_CHECKING: - from canopen.network import Network from canopen import LocalNode, RemoteNode from canopen.pdo import RPDO, TPDO from canopen.sdo import SdoRecord @@ -30,7 +30,7 @@ class PdoBase(Mapping): """ def __init__(self, node: Union[LocalNode, RemoteNode]): - self.network: Optional[Network] = None + self.network: canopen.network.Network = canopen.network._UNINITIALIZED_NETWORK self.map: Optional[PdoMaps] = None self.node: Union[LocalNode, RemoteNode] = node diff --git a/canopen/sdo/base.py b/canopen/sdo/base.py index 81d5e710..39b10634 100644 --- a/canopen/sdo/base.py +++ b/canopen/sdo/base.py @@ -4,6 +4,7 @@ from typing import Iterator, Optional, Union from collections.abc import Mapping +import canopen.network from canopen import objectdictionary from canopen import variable from canopen.utils import pretty_index @@ -43,7 +44,7 @@ def __init__( """ self.rx_cobid = rx_cobid self.tx_cobid = tx_cobid - self.network = None + self.network: canopen.network.Network = canopen.network._UNINITIALIZED_NETWORK self.od = od def __getitem__(