Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add config support for TCP Bridges #55

Open
wants to merge 5 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 48 additions & 19 deletions interoperability/mqtt/brokers/bridges/TCPBridges.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,75 +46,104 @@ def clear(self):
self.__init__()

def disconnected(self, reasoncode, properties):
logging.info("disconnected %s %s", str(reasoncode), str(properties))
logger.info("Bridge: disconnected %s %s", str(reasoncode), str(properties))
self.disconnects.append({"reasonCode" : reasoncode, "properties" : properties})

def connectionLost(self, cause):
logging.info("connectionLost %s" % str(cause))
logger.info("Bridge: connectionLost %s" % str(cause))

def publishArrived(self, topicName, payload, qos, retained, msgid, properties=None):
logging.info("publishArrived %s %s %d %s %d %s", topicName, payload, qos, retained, msgid, str(properties))
logger.info("Bridge: publishArrived %s %s %d %s %d %s", topicName, payload, qos, retained, msgid, str(properties))
self.messages.append((topicName, payload, qos, retained, msgid, properties))
self.messagedicts.append({"topicname" : topicName, "payload" : payload,
"qos" : qos, "retained" : retained, "msgid" : msgid, "properties" : properties})

# add to local broker
self.broker.broker.publish(aClientid, topic, message, qos, properties, receivedTime, retained)
self.broker.broker.publish("bridge", topicName, payload, qos, retained, properties, time.monotonic())
return True

def published(self, msgid):
logging.info("published %d", msgid)
logger.info("Bridge: published %d", msgid)
self.publisheds.append(msgid)

def subscribed(self, msgid, data):
logging.info("subscribed %d", msgid)
logger.info("Bridge: subscribed %d", msgid)
self.subscribeds.append((msgid, data))

def unsubscribed(self, msgid):
logging.info("unsubscribed %d", msgid)
logger.info("Bridge: unsubscribed %d", msgid)
self.unsubscribeds.append(msgid)


class Bridges:

def __init__(self, host, port):
def __init__(self, name="local", host="localhost", port=1883, topic="+", direction="both", localprefix="", remoteprefix=""):
self.name = name
self.host = host
self.port = port
self.client = mqtt.clients.V5.Client("local")
self.port = int(port)
self.topic = topic
self.direction = direction
self.localprefix = localprefix.strip('\"')
self.remoteprefix = remoteprefix.strip('\"')
self.client = mqtt.clients.V5.Client(name)
self.callback = Callbacks(broker5)
self.client.registerCallback(self.callback)
self.local_connect()

def local_connect(self):
# connect locally with V5, so we get noLocal and retainAsPublished
connect = MQTTV5.Connects()
connect.ClientIdentifier = "local"
connect.ClientIdentifier = self.name
logger.debug("Bridge: local_connect:"+connect.ClientIdentifier)
broker5.connect(self, connect)
subscribe = MQTTV5.Subscribes()
options = MQTTV5.SubscribeOptions()
options.noLocal = options.retainAsPublished = True
subscribe.data = [('+', options)]
subscribe.data = [(self.topic, options)]
broker5.subscribe(self, subscribe)

def connect(self):
self.client.connect(host=self.host, port=self.port, cleanstart=True)
logger.info("Bridge: connect: connecting to %s:%d"%(self.host, self.port))
connected = False
retry=2
while not connected:
try:
self.client.connect(host=self.host, port=self.port, cleanstart=True)
connected = True
except OSError as e:
#try again with a small amount of backoff, could ake this configurable
logger.debug("Bridge: failed to connect to remote end due to an OS Error (%s), retrying...",str(e))
time.sleep(retry)
retry *= 2
except MQTTV5.MQTTException as e:
#I think we'll retry this one too, the other end probably wasn't ready
logger.debug("Bridge: failed to connect to remote end due to an MQTT Error (%s), retrying...",str(e))
time.sleep(retry)
retry *= 2


# subscribe if necessary
options = MQTTV5.SubscribeOptions()
options.noLocal = options.retainAsPublished = True
self.client.subscribe(["+"], [options])
if self.direction == "both" or self.direction == "in":
self.client.subscribe([self.topic], [options])
else:
logger.info("Bridge: not subscribing to remote")

def getPacket(self):
# get packet from remote
pass

def handlePacket(self, packet):
# response from local broker
logger.info("from local broker %s", str(packet))
logger.info("Bridge: from local broker %s", str(packet))
if packet.fh.PacketType == MQTTV5.PacketTypes.PUBLISH:
self.client.publish(packet.topicName, packet.data, packet.fh.QoS) #retained=False, properties=None)
logger.info("Bridge: sending on %s", self.remoteprefix+packet.topicName)
self.client.publish(self.remoteprefix+packet.topicName, packet.data, packet.fh.QoS) #retained=False, properties=None)

def run(self):
while True:
logger.info("Bridge: initiating connect logic")
self.connect()
time.sleep(300)
self.shutdown()
Expand All @@ -123,14 +152,14 @@ def setBroker5(aBroker5):
global broker5
broker5 = aBroker5

def create(port, host="", TLS=False,
def create(name="local", host="localhost", port=1883, topic="+", direction="both", localprefix="", remoteprefix="", TLS=False,
cert_reqs=ssl.CERT_REQUIRED,
ca_certs=None, certfile=None, keyfile=None):

if host == "":
host = "localhost"
logger.info("Starting TCP bridge for address '%s' port %d %s", host, port, "with TLS support" if TLS else "")
bridge = Bridges(host, port)
logger.info("Bridge: Starting TCP bridge '%s' for address '%s' port %d %s", name, host, int(port), "with TLS support" if TLS else "")
bridge = Bridges(name, host, port, topic, direction, localprefix, remoteprefix)
thread = threading.Thread(target=bridge.run)
thread.start()
return bridge
Expand Down
64 changes: 59 additions & 5 deletions interoperability/mqtt/brokers/start.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def setup_persistence(persistence_filename):
def process_config(config):
options = {}
servers_to_create = []
bridges_to_create = []
lineno = 0
while lineno < len(config):
curline = config[lineno].strip()
Expand Down Expand Up @@ -81,7 +82,8 @@ def process_config(config):
if len(words) >= 4:
if words[3] in ["mqttsn", "http"]:
protocol = words[3]
while lineno < len(config) and not config[lineno].strip().startswith("listener"):
while lineno < len(config) and not (config[lineno].strip().startswith("listener") or
config[lineno].strip().startswith("connection") ):
curline = config[lineno].strip()
lineno += 1
if curline.startswith('#') or len(curline) == 0:
Expand All @@ -104,8 +106,54 @@ def process_config(config):
elif protocol == "http":
servers_to_create.append((HTTPListeners, {"host":bind_address, "port":port, "TLS":TLS, "cert_reqs":cert_reqs,
"ca_certs":ca_certs, "certfile":certfile, "keyfile":keyfile}))
elif words[0] == "connection":
# Bridge connection, pull out address, protocol and topic lines.
bridgename="local"
address = "localhost"
host = "localhost"
port = "1883"
protocol = "mqtt"
topic = "+"
direction = "both"
localprefix = ""
remoteprefix = ""
if len(words) > 1:
bridgename = words[1]
while lineno < len(config) and not (config[lineno].strip().startswith("listener") or
config[lineno].strip().startswith("connection")) :
curline = config[lineno].strip()
lineno+=1
if curline.startswith('#') or len(curline) == 0:
continue
words = curline.split()
if words[0] == "protocol":
protocol = words[1]
elif words[0] == "address":
address = words[1]
parts = address.split(":")
host = parts[0]
if len(parts)>1:
port = int(parts[1])
elif words[0] == "topic":
if len(words) > 1:
topic = words[1]
if len(words) > 2:
direction = words[2]
if len(words) > 3:
localprefix = words[3]
if len(words) > 4:
remoteprefix = words[4]
if protocol == "mqtt":
bridges_to_create.append((TCPBridges, {"name":bridgename,
"host":host,
"port":port,
"topic":topic,
"direction":direction,
"localprefix":localprefix,
"remoteprefix":remoteprefix}))

servers_to_create[-1][1]["serve_forever"] = True
return servers_to_create, options
return servers_to_create, options, bridges_to_create

def run(config=None):
global logger, broker3, broker5, brokerSN, server
Expand All @@ -122,7 +170,7 @@ def run(config=None):

options = {}
if config != None:
servers_to_create, options = process_config(config)
servers_to_create, options, bridges_to_create = process_config(config)

broker3 = MQTTV3Brokers(options=options, lock=lock, sharedData=sharedData)

Expand All @@ -139,17 +187,23 @@ def run(config=None):
brokerSN.setBroker5(broker5)

servers = []
bridges = []
UDPListeners.setBroker(brokerSN)
TCPListeners.setBrokers(broker3, broker5)
HTTPListeners.setBrokers(broker3, broker5, brokerSN)
HTTPListeners.setSharedData(lock, sharedData)

try:
if config == None:
#TCPBridges.setBroker5(broker5)
#TCPBridges.create(1886)
# TCPBridges.setBroker5(broker5)
# TCPBridges.create("bridge",1883,"172.16.0.4")
servers.append(TCPListeners.create(1883, serve_forever=True))
else:
logger.debug("Starting bridges")
for bridge in bridges_to_create:
bridge[0].setBroker5(broker5)
bridges.append(bridge[0].create(**bridge[1]))
logger.debug("Starting servers")
for server in servers_to_create:
servers.append(server[0].create(**server[1]))
except KeyboardInterrupt:
Expand Down
21 changes: 13 additions & 8 deletions interoperability/mqtt/clients/V5/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
logger.addHandler(ch)

def sendtosocket(mysocket, data):
logger.debug("out: %s", str(data))
logger.debug("MQTTv5 Client: sendtosocket: %s", str(data))
sent = 0
length = len(data)
try:
Expand All @@ -48,23 +48,23 @@ def sendtosocket(mysocket, data):
class Callback:

def connectionLost(self, cause):
logger.debug("default connectionLost %s", str(cause))
logger.debug("MQTTv5 Client: default connectionLost %s", str(cause))

def publishArrived(self, topicName, payload, qos, retained, msgid):
logger.debug("default publishArrived %s %s %d %d %d", topicName, payload, qos, retained, msgid)
logger.debug("MQTTv5 Client: default publishArrived %s %s %d %d %d", topicName, payload, qos, retained, msgid)
return True

def published(self, msgid):
logger.debug("default published %d", msgid)
logger.debug("MQTTv5 Client: default published %d", msgid)

def subscribed(self, msgid):
logger.debug("default subscribed %d", msgid)
logger.debug("MQTTv5 Client: default subscribed %d", msgid)

def unsubscribed(self, msgid):
logger.debug("default unsubscribed %d", msgid)
logger.debug("MQTTv5 Client: default unsubscribed %d", msgid)

def disconnected(self, reasoncode, properties):
logger.debug("default disconnected")
logger.debug("MQTTv5 Client: default disconnected")



Expand All @@ -74,6 +74,7 @@ def getReceiver(self):
return self.__receiver

def __init__(self, clientid):
logger.debug("MQTTv5 Client: init(%s)", clientid)
self.clientid = clientid
self.msgid = 1
self.callback = None
Expand Down Expand Up @@ -105,6 +106,7 @@ def registerCallback(self, callback):
def connect(self, host="localhost", port=1883, cleanstart=True, keepalive=0, newsocket=True, protocolName=None,
willFlag=False, willTopic=None, willMessage=None, willQoS=2, willRetain=False, username=None, password=None,
properties=None, willProperties=None):
logger.debug("MQTTv5 Client: connect(host=%s, newsocket=%s)", host, newsocket)
if newsocket:
try:
self.sock.close()
Expand Down Expand Up @@ -181,6 +183,7 @@ def unsubscribe(self, topics):


def publish(self, topic, payload, qos=0, retained=False, properties=None):
logger.debug("MQTTv5 Client: publish on topic '%s'", topic)
publish = MQTTV5.Publishes()
publish.fh.QoS = qos
publish.fh.RETAIN = retained
Expand All @@ -200,11 +203,12 @@ def publish(self, topic, payload, qos=0, retained=False, properties=None):


def disconnect(self, properties=None):
logger.debug("MQTTv5 Client: disconnecting")
if self.__receiver:
self.__receiver.stopping = True
count = 0
while (len(self.__receiver.inMsgs) > 0 or len(self.__receiver.outMsgs) > 0) and self.__receiver.paused == False:
logger.debug("disconnecting %s %s", self.__receiver.inMsgs, self.__receiver.outMsgs)
logger.debug("MQTTv5 Client: disconnecting %s %s", self.__receiver.inMsgs, self.__receiver.outMsgs)
time.sleep(.2)
count += 1
if count == 20:
Expand All @@ -228,6 +232,7 @@ def disconnect(self, properties=None):
self.__receiver.stopping = False

def terminate(self):
logger.debug("MQTTv5 Client: terminating")
if self.__receiver:
self.__receiver.stopping = True
self.sock.shutdown(socket.SHUT_RDWR)
Expand Down
6 changes: 6 additions & 0 deletions interoperability/udp.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,9 @@ loglevel debug
listener 1883

listener 1883 INADDR_ANY mqttsn

#connection loopback
# protocol mqtt
# address localhost
# topic + both "" ""