diff --git a/commlib/__init__.py b/commlib/__init__.py index 3dbc268..6cbf0d5 100644 --- a/commlib/__init__.py +++ b/commlib/__init__.py @@ -2,7 +2,7 @@ __author__ = """Konstantinos Panayiotou""" __email__ = "klpanagi@gmail.com" -__version__ = "0.11.1" +__version__ = "0.11.2" from .node import Node from .rest_proxy import RESTProxy diff --git a/commlib/bridges.py b/commlib/bridges.py index 690b73b..031731e 100644 --- a/commlib/bridges.py +++ b/commlib/bridges.py @@ -52,7 +52,6 @@ def logger(cls) -> logging.Logger: def __init__( self, - btype: Union[TopicBridgeType, RPCBridgeType], from_uri: str, to_uri: str, from_broker_params: BaseConnectionParameters, @@ -65,7 +64,6 @@ def __init__( btype: debug (bool): debug """ - self._btype = btype self._from_broker_params = from_broker_params self._to_broker_params = to_broker_params self._from_uri = from_uri @@ -77,7 +75,7 @@ def debug(self) -> bool: return self._debug @property - def log(self) -> Logger: + def log(self) -> logging.Logger: return self.logger() def run(self): @@ -99,42 +97,53 @@ class RPCBridge(Bridge): """ - def __init__( - self, btype: RPCBridgeType, msg_type: RPCMessage = None, *args, **kwargs - ): + def __init__(self, + msg_type: RPCMessage = None, + *args, **kwargs): """__init__. Args: btype (RPCBridgeType): RPC Bridge Type """ - super().__init__(btype, *args, **kwargs) + super().__init__(*args, **kwargs) self._msg_type = msg_type - if self._btype == RPCBridgeType.REDIS_TO_AMQP: + bA_type_str = str(type(self._from_broker_params)).split("'")[1] + bB_type_str = str(type(self._to_broker_params)).split("'")[1] + if 'redis' in bA_type_str and 'amqp' in bB_type_str: + self._btype = RPCBridgeType.REDIS_TO_AMQP from_transport = TransportType.REDIS to_transport = TransportType.AMQP - elif self._btype == RPCBridgeType.AMQP_TO_REDIS: + elif 'amqp' in bA_type_str and 'redis' in bB_type_str: + self._btype = RPCBridgeType.AMQP_TO_REDIS from_transport = TransportType.AMQP to_transport = TransportType.REDIS - elif self._btype == RPCBridgeType.AMQP_TO_AMQP: + elif 'amqp' in bA_type_str and 'amqp' in bB_type_str: + self._btype = RPCBridgeType.AMQP_TO_AMQP from_transport = TransportType.AMQP to_transport = TransportType.AMQP - elif self._btype == RPCBridgeType.REDIS_TO_REDIS: + elif 'redis' in bA_type_str and 'redis' in bB_type_str: + self._btype = RPCBridgeType.REDIS_TO_REDIS from_transport = TransportType.REDIS to_transport = TransportType.REDIS - elif self._btype == RPCBridgeType.MQTT_TO_REDIS: + elif 'mqtt' in bA_type_str and 'redis' in bB_type_str: + self._btype = RPCBridgeType.MQTT_TO_REDIS from_transport = TransportType.MQTT to_transport = TransportType.REDIS - elif self._btype == RPCBridgeType.MQTT_TO_AMQP: + elif 'mqtt' in bA_type_str and 'amqp' in bB_type_str: + self._btype = RPCBridgeType.MQTT_TO_AMQP from_transport = TransportType.MQTT to_transport = TransportType.AMQP - elif self._btype == RPCBridgeType.MQTT_TO_MQTT: + elif 'mqtt' in bA_type_str and 'mqtt' in bB_type_str: + self._btype = RPCBridgeType.MQTT_TO_MQTT from_transport = TransportType.MQTT to_transport = TransportType.MQTT - elif self._btype == RPCBridgeType.REDIS_TO_MQTT: + elif 'redis' in bA_type_str and 'mqtt' in bB_type_str: + self._btype = RPCBridgeType.REDIS_TO_MQTT from_transport = TransportType.REDIS to_transport = TransportType.MQTT - elif self._btype == RPCBridgeType.AMQP_TO_MQTT: + elif 'amqp' in bA_type_str and 'mqtt' in bB_type_str: + self._btype = RPCBridgeType.AMQP_TO_MQTT from_transport = TransportType.AMQP to_transport = TransportType.MQTT self._server = endpoint_factory(EndpointType.RPCService, from_transport)( @@ -179,45 +188,57 @@ class TopicBridge(Bridge): """ - def __init__( - self, btype: TopicBridgeType, msg_type: PubSubMessage = None, *args, **kwargs - ): + def __init__(self, + msg_type: PubSubMessage = None, + *args, **kwargs): """__init__. Args: btype (TopicBridgeType): btype msg_type (PubSubMessage): msg_type """ - super().__init__(btype, *args, **kwargs) + super().__init__(*args, **kwargs) self._msg_type = msg_type - if self._btype == RPCBridgeType.REDIS_TO_AMQP: + bA_type_str = str(type(self._from_broker_params)).split("'")[1] + bB_type_str = str(type(self._to_broker_params)).split("'")[1] + if 'redis' in bA_type_str and 'amqp' in bB_type_str: + self._btype = TopicBridgeType.REDIS_TO_AMQP from_transport = TransportType.REDIS to_transport = TransportType.AMQP - elif self._btype == RPCBridgeType.AMQP_TO_REDIS: + elif 'amqp' in bA_type_str and 'redis' in bB_type_str: + self._btype = TopicBridgeType.AMQP_TO_REDIS from_transport = TransportType.AMQP to_transport = TransportType.REDIS - elif self._btype == RPCBridgeType.AMQP_TO_AMQP: + elif 'amqp' in bA_type_str and 'amqp' in bB_type_str: + self._btype = TopicBridgeType.AMQP_TO_AMQP from_transport = TransportType.AMQP to_transport = TransportType.AMQP - elif self._btype == RPCBridgeType.REDIS_TO_REDIS: + elif 'redis' in bA_type_str and 'redis' in bB_type_str: + self._btype = TopicBridgeType.REDIS_TO_REDIS from_transport = TransportType.REDIS to_transport = TransportType.REDIS - elif self._btype == RPCBridgeType.MQTT_TO_REDIS: + elif 'mqtt' in bA_type_str and 'redis' in bB_type_str: + self._btype = TopicBridgeType.MQTT_TO_REDIS from_transport = TransportType.MQTT to_transport = TransportType.REDIS - elif self._btype == RPCBridgeType.MQTT_TO_AMQP: + elif 'mqtt' in bA_type_str and 'amqp' in bB_type_str: + self._btype = TopicBridgeType.MQTT_TO_AMQP from_transport = TransportType.MQTT to_transport = TransportType.AMQP - elif self._btype == RPCBridgeType.MQTT_TO_MQTT: + elif 'mqtt' in bA_type_str and 'mqtt' in bB_type_str: + self._btype = TopicBridgeType.MQTT_TO_MQTT from_transport = TransportType.MQTT to_transport = TransportType.MQTT - elif self._btype == RPCBridgeType.REDIS_TO_MQTT: + elif 'redis' in bA_type_str and 'mqtt' in bB_type_str: + self._btype = TopicBridgeType.REDIS_TO_MQTT from_transport = TransportType.REDIS to_transport = TransportType.MQTT - elif self._btype == RPCBridgeType.AMQP_TO_MQTT: + elif 'amqp' in bA_type_str and 'mqtt' in bB_type_str: + self._btype = TopicBridgeType.AMQP_TO_MQTT from_transport = TransportType.AMQP to_transport = TransportType.MQTT + self._sub = endpoint_factory(EndpointType.Subscriber, from_transport)( topic=self._from_uri, msg_type=self._msg_type, @@ -265,7 +286,6 @@ class PTopicBridge(Bridge): def __init__( self, - btype: TopicBridgeType, msg_type: PubSubMessage = None, uri_transform: List = [], *args, @@ -282,41 +302,46 @@ def __init__( msg_type (PubSubMessage): msg_type debug (bool): debug """ - super().__init__(btype, *args, **kwargs) - if "*" not in from_uri: - raise ValueError("from_uri must be defined using topic patterns") - self._from_broker_params = from_broker_params - self._to_broker_params = to_broker_params - self._from_uri = from_uri - self._to_uri = to_uri + super().__init__(*args, **kwargs) self._msg_type = msg_type self._uri_transform = uri_transform - if self._btype == RPCBridgeType.REDIS_TO_AMQP: + bA_type_str = str(type(self._from_broker_params)).split("'")[1] + bB_type_str = str(type(self._to_broker_params)).split("'")[1] + if 'redis' in bA_type_str and 'amqp' in bB_type_str: + self._btype = TopicBridgeType.REDIS_TO_AMQP from_transport = TransportType.REDIS to_transport = TransportType.AMQP - elif self._btype == RPCBridgeType.AMQP_TO_REDIS: + elif 'amqp' in bA_type_str and 'redis' in bB_type_str: + self._btype = TopicBridgeType.AMQP_TO_REDIS from_transport = TransportType.AMQP to_transport = TransportType.REDIS - elif self._btype == RPCBridgeType.AMQP_TO_AMQP: + elif 'amqp' in bA_type_str and 'amqp' in bB_type_str: + self._btype = TopicBridgeType.AMQP_TO_AMQP from_transport = TransportType.AMQP to_transport = TransportType.AMQP - elif self._btype == RPCBridgeType.REDIS_TO_REDIS: + elif 'redis' in bA_type_str and 'redis' in bB_type_str: + self._btype = TopicBridgeType.REDIS_TO_REDIS from_transport = TransportType.REDIS to_transport = TransportType.REDIS - elif self._btype == RPCBridgeType.MQTT_TO_REDIS: + elif 'mqtt' in bA_type_str and 'redis' in bB_type_str: + self._btype = TopicBridgeType.MQTT_TO_REDIS from_transport = TransportType.MQTT to_transport = TransportType.REDIS - elif self._btype == RPCBridgeType.MQTT_TO_AMQP: + elif 'mqtt' in bA_type_str and 'amqp' in bB_type_str: + self._btype = TopicBridgeType.MQTT_TO_AMQP from_transport = TransportType.MQTT to_transport = TransportType.AMQP - elif self._btype == RPCBridgeType.MQTT_TO_MQTT: + elif 'mqtt' in bA_type_str and 'mqtt' in bB_type_str: + self._btype = TopicBridgeType.MQTT_TO_MQTT from_transport = TransportType.MQTT to_transport = TransportType.MQTT - elif self._btype == RPCBridgeType.REDIS_TO_MQTT: + elif 'redis' in bA_type_str and 'mqtt' in bB_type_str: + self._btype = TopicBridgeType.REDIS_TO_MQTT from_transport = TransportType.REDIS to_transport = TransportType.MQTT - elif self._btype == RPCBridgeType.AMQP_TO_MQTT: + elif 'amqp' in bA_type_str and 'mqtt' in bB_type_str: + self._btype = TopicBridgeType.AMQP_TO_MQTT from_transport = TransportType.AMQP to_transport = TransportType.MQTT self._sub = endpoint_factory(EndpointType.PSubscriber, from_transport)( diff --git a/pyproject.toml b/pyproject.toml index 205eada..8d78be8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,18 +1,19 @@ [tool.poetry] name = "commlib-py" -version = "0.11.1" -description = "Internal DSL for communication and messaging in CPS" +version = "0.11.2" +description = "Internal DSL for communication and messaging in CyberPhysical Systems" authors = ["Konstantinos Panayiotou "] readme = "README.md" packages = [{include = "commlib"}] [tool.poetry.dependencies] +python = "^3.7" requests = "^2.1.0" pika = "^1.3.1" paho-mqtt = "^1.6.1" hiredis = "^2.1.1" ujson = "^5.7.0" -pydantic = "^1.10.4" +pydantic = "^2.0.0" [tool.poetry.group.dev.dependencies] bump2version = "0.5.11" diff --git a/setup.cfg b/setup.cfg index 6a545b5..36286e6 100644 --- a/setup.cfg +++ b/setup.cfg @@ -46,7 +46,7 @@ install_requires = redis hiredis ujson - pydantic + pydantic>=2.0 requests [flake8]