Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/devel'
Browse files Browse the repository at this point in the history
  • Loading branch information
klpanagi committed Nov 28, 2023
2 parents 1584e26 + 7803e15 commit 777a281
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 51 deletions.
2 changes: 1 addition & 1 deletion commlib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

__author__ = """Konstantinos Panayiotou"""
__email__ = "[email protected]"
__version__ = "0.11.1"
__version__ = "0.11.2"

from .node import Node
from .rest_proxy import RESTProxy
117 changes: 71 additions & 46 deletions commlib/bridges.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ def logger(cls) -> logging.Logger:

def __init__(
self,
btype: Union[TopicBridgeType, RPCBridgeType],
from_uri: str,
to_uri: str,
from_broker_params: BaseConnectionParameters,
Expand All @@ -65,7 +64,6 @@ def __init__(
btype:
debug (bool): debug
"""
self._btype = btype
self._from_broker_params = from_broker_params
self._to_broker_params = to_broker_params
self._from_uri = from_uri
Expand All @@ -77,7 +75,7 @@ def debug(self) -> bool:
return self._debug

@property
def log(self) -> Logger:
def log(self) -> logging.Logger:
return self.logger()

def run(self):
Expand All @@ -99,42 +97,53 @@ class RPCBridge(Bridge):
<from> <to>
"""

def __init__(
self, btype: RPCBridgeType, msg_type: RPCMessage = None, *args, **kwargs
):
def __init__(self,
msg_type: RPCMessage = None,
*args, **kwargs):
"""__init__.
Args:
btype (RPCBridgeType): RPC Bridge Type
"""
super().__init__(btype, *args, **kwargs)
super().__init__(*args, **kwargs)
self._msg_type = msg_type

if self._btype == RPCBridgeType.REDIS_TO_AMQP:
bA_type_str = str(type(self._from_broker_params)).split("'")[1]
bB_type_str = str(type(self._to_broker_params)).split("'")[1]
if 'redis' in bA_type_str and 'amqp' in bB_type_str:
self._btype = RPCBridgeType.REDIS_TO_AMQP
from_transport = TransportType.REDIS
to_transport = TransportType.AMQP
elif self._btype == RPCBridgeType.AMQP_TO_REDIS:
elif 'amqp' in bA_type_str and 'redis' in bB_type_str:
self._btype = RPCBridgeType.AMQP_TO_REDIS
from_transport = TransportType.AMQP
to_transport = TransportType.REDIS
elif self._btype == RPCBridgeType.AMQP_TO_AMQP:
elif 'amqp' in bA_type_str and 'amqp' in bB_type_str:
self._btype = RPCBridgeType.AMQP_TO_AMQP
from_transport = TransportType.AMQP
to_transport = TransportType.AMQP
elif self._btype == RPCBridgeType.REDIS_TO_REDIS:
elif 'redis' in bA_type_str and 'redis' in bB_type_str:
self._btype = RPCBridgeType.REDIS_TO_REDIS
from_transport = TransportType.REDIS
to_transport = TransportType.REDIS
elif self._btype == RPCBridgeType.MQTT_TO_REDIS:
elif 'mqtt' in bA_type_str and 'redis' in bB_type_str:
self._btype = RPCBridgeType.MQTT_TO_REDIS
from_transport = TransportType.MQTT
to_transport = TransportType.REDIS
elif self._btype == RPCBridgeType.MQTT_TO_AMQP:
elif 'mqtt' in bA_type_str and 'amqp' in bB_type_str:
self._btype = RPCBridgeType.MQTT_TO_AMQP
from_transport = TransportType.MQTT
to_transport = TransportType.AMQP
elif self._btype == RPCBridgeType.MQTT_TO_MQTT:
elif 'mqtt' in bA_type_str and 'mqtt' in bB_type_str:
self._btype = RPCBridgeType.MQTT_TO_MQTT
from_transport = TransportType.MQTT
to_transport = TransportType.MQTT
elif self._btype == RPCBridgeType.REDIS_TO_MQTT:
elif 'redis' in bA_type_str and 'mqtt' in bB_type_str:
self._btype = RPCBridgeType.REDIS_TO_MQTT
from_transport = TransportType.REDIS
to_transport = TransportType.MQTT
elif self._btype == RPCBridgeType.AMQP_TO_MQTT:
elif 'amqp' in bA_type_str and 'mqtt' in bB_type_str:
self._btype = RPCBridgeType.AMQP_TO_MQTT
from_transport = TransportType.AMQP
to_transport = TransportType.MQTT
self._server = endpoint_factory(EndpointType.RPCService, from_transport)(
Expand Down Expand Up @@ -179,45 +188,57 @@ class TopicBridge(Bridge):
<from> <to>
"""

def __init__(
self, btype: TopicBridgeType, msg_type: PubSubMessage = None, *args, **kwargs
):
def __init__(self,
msg_type: PubSubMessage = None,
*args, **kwargs):
"""__init__.
Args:
btype (TopicBridgeType): btype
msg_type (PubSubMessage): msg_type
"""
super().__init__(btype, *args, **kwargs)
super().__init__(*args, **kwargs)
self._msg_type = msg_type

if self._btype == RPCBridgeType.REDIS_TO_AMQP:
bA_type_str = str(type(self._from_broker_params)).split("'")[1]
bB_type_str = str(type(self._to_broker_params)).split("'")[1]
if 'redis' in bA_type_str and 'amqp' in bB_type_str:
self._btype = TopicBridgeType.REDIS_TO_AMQP
from_transport = TransportType.REDIS
to_transport = TransportType.AMQP
elif self._btype == RPCBridgeType.AMQP_TO_REDIS:
elif 'amqp' in bA_type_str and 'redis' in bB_type_str:
self._btype = TopicBridgeType.AMQP_TO_REDIS
from_transport = TransportType.AMQP
to_transport = TransportType.REDIS
elif self._btype == RPCBridgeType.AMQP_TO_AMQP:
elif 'amqp' in bA_type_str and 'amqp' in bB_type_str:
self._btype = TopicBridgeType.AMQP_TO_AMQP
from_transport = TransportType.AMQP
to_transport = TransportType.AMQP
elif self._btype == RPCBridgeType.REDIS_TO_REDIS:
elif 'redis' in bA_type_str and 'redis' in bB_type_str:
self._btype = TopicBridgeType.REDIS_TO_REDIS
from_transport = TransportType.REDIS
to_transport = TransportType.REDIS
elif self._btype == RPCBridgeType.MQTT_TO_REDIS:
elif 'mqtt' in bA_type_str and 'redis' in bB_type_str:
self._btype = TopicBridgeType.MQTT_TO_REDIS
from_transport = TransportType.MQTT
to_transport = TransportType.REDIS
elif self._btype == RPCBridgeType.MQTT_TO_AMQP:
elif 'mqtt' in bA_type_str and 'amqp' in bB_type_str:
self._btype = TopicBridgeType.MQTT_TO_AMQP
from_transport = TransportType.MQTT
to_transport = TransportType.AMQP
elif self._btype == RPCBridgeType.MQTT_TO_MQTT:
elif 'mqtt' in bA_type_str and 'mqtt' in bB_type_str:
self._btype = TopicBridgeType.MQTT_TO_MQTT
from_transport = TransportType.MQTT
to_transport = TransportType.MQTT
elif self._btype == RPCBridgeType.REDIS_TO_MQTT:
elif 'redis' in bA_type_str and 'mqtt' in bB_type_str:
self._btype = TopicBridgeType.REDIS_TO_MQTT
from_transport = TransportType.REDIS
to_transport = TransportType.MQTT
elif self._btype == RPCBridgeType.AMQP_TO_MQTT:
elif 'amqp' in bA_type_str and 'mqtt' in bB_type_str:
self._btype = TopicBridgeType.AMQP_TO_MQTT
from_transport = TransportType.AMQP
to_transport = TransportType.MQTT

self._sub = endpoint_factory(EndpointType.Subscriber, from_transport)(
topic=self._from_uri,
msg_type=self._msg_type,
Expand Down Expand Up @@ -265,7 +286,6 @@ class PTopicBridge(Bridge):

def __init__(
self,
btype: TopicBridgeType,
msg_type: PubSubMessage = None,
uri_transform: List = [],
*args,
Expand All @@ -282,41 +302,46 @@ def __init__(
msg_type (PubSubMessage): msg_type
debug (bool): debug
"""
super().__init__(btype, *args, **kwargs)
if "*" not in from_uri:
raise ValueError("from_uri must be defined using topic patterns")
self._from_broker_params = from_broker_params
self._to_broker_params = to_broker_params
self._from_uri = from_uri
self._to_uri = to_uri
super().__init__(*args, **kwargs)
self._msg_type = msg_type
self._uri_transform = uri_transform

if self._btype == RPCBridgeType.REDIS_TO_AMQP:
bA_type_str = str(type(self._from_broker_params)).split("'")[1]
bB_type_str = str(type(self._to_broker_params)).split("'")[1]
if 'redis' in bA_type_str and 'amqp' in bB_type_str:
self._btype = TopicBridgeType.REDIS_TO_AMQP
from_transport = TransportType.REDIS
to_transport = TransportType.AMQP
elif self._btype == RPCBridgeType.AMQP_TO_REDIS:
elif 'amqp' in bA_type_str and 'redis' in bB_type_str:
self._btype = TopicBridgeType.AMQP_TO_REDIS
from_transport = TransportType.AMQP
to_transport = TransportType.REDIS
elif self._btype == RPCBridgeType.AMQP_TO_AMQP:
elif 'amqp' in bA_type_str and 'amqp' in bB_type_str:
self._btype = TopicBridgeType.AMQP_TO_AMQP
from_transport = TransportType.AMQP
to_transport = TransportType.AMQP
elif self._btype == RPCBridgeType.REDIS_TO_REDIS:
elif 'redis' in bA_type_str and 'redis' in bB_type_str:
self._btype = TopicBridgeType.REDIS_TO_REDIS
from_transport = TransportType.REDIS
to_transport = TransportType.REDIS
elif self._btype == RPCBridgeType.MQTT_TO_REDIS:
elif 'mqtt' in bA_type_str and 'redis' in bB_type_str:
self._btype = TopicBridgeType.MQTT_TO_REDIS
from_transport = TransportType.MQTT
to_transport = TransportType.REDIS
elif self._btype == RPCBridgeType.MQTT_TO_AMQP:
elif 'mqtt' in bA_type_str and 'amqp' in bB_type_str:
self._btype = TopicBridgeType.MQTT_TO_AMQP
from_transport = TransportType.MQTT
to_transport = TransportType.AMQP
elif self._btype == RPCBridgeType.MQTT_TO_MQTT:
elif 'mqtt' in bA_type_str and 'mqtt' in bB_type_str:
self._btype = TopicBridgeType.MQTT_TO_MQTT
from_transport = TransportType.MQTT
to_transport = TransportType.MQTT
elif self._btype == RPCBridgeType.REDIS_TO_MQTT:
elif 'redis' in bA_type_str and 'mqtt' in bB_type_str:
self._btype = TopicBridgeType.REDIS_TO_MQTT
from_transport = TransportType.REDIS
to_transport = TransportType.MQTT
elif self._btype == RPCBridgeType.AMQP_TO_MQTT:
elif 'amqp' in bA_type_str and 'mqtt' in bB_type_str:
self._btype = TopicBridgeType.AMQP_TO_MQTT
from_transport = TransportType.AMQP
to_transport = TransportType.MQTT
self._sub = endpoint_factory(EndpointType.PSubscriber, from_transport)(
Expand Down
7 changes: 4 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
[tool.poetry]
name = "commlib-py"
version = "0.11.1"
description = "Internal DSL for communication and messaging in CPS"
version = "0.11.2"
description = "Internal DSL for communication and messaging in CyberPhysical Systems"
authors = ["Konstantinos Panayiotou <[email protected]>"]
readme = "README.md"
packages = [{include = "commlib"}]

[tool.poetry.dependencies]
python = "^3.7"
requests = "^2.1.0"
pika = "^1.3.1"
paho-mqtt = "^1.6.1"
hiredis = "^2.1.1"
ujson = "^5.7.0"
pydantic = "^1.10.4"
pydantic = "^2.0.0"

[tool.poetry.group.dev.dependencies]
bump2version = "0.5.11"
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ install_requires =
redis
hiredis
ujson
pydantic
pydantic>=2.0
requests

[flake8]
Expand Down

0 comments on commit 777a281

Please sign in to comment.