Skip to content

Commit

Permalink
add config support for TCP Bridges
Browse files Browse the repository at this point in the history
Signed-off-by: Andreas Martens <[email protected]>
  • Loading branch information
andreas-ibm committed Aug 9, 2018
1 parent 989221f commit 5b8ad8d
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 20 deletions.
40 changes: 25 additions & 15 deletions interoperability/mqtt/brokers/bridges/TCPBridges.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,40 +46,45 @@ def clear(self):
self.__init__()

def disconnected(self, reasoncode, properties):
logging.info("disconnected %s %s", str(reasoncode), str(properties))
logger.info("disconnected %s %s", str(reasoncode), str(properties))
self.disconnects.append({"reasonCode" : reasoncode, "properties" : properties})

def connectionLost(self, cause):
logging.info("connectionLost %s" % str(cause))
logger.info("connectionLost %s" % str(cause))

def publishArrived(self, topicName, payload, qos, retained, msgid, properties=None):
logging.info("publishArrived %s %s %d %s %d %s", topicName, payload, qos, retained, msgid, str(properties))
logger.info("publishArrived %s %s %d %s %d %s", topicName, payload, qos, retained, msgid, str(properties))
self.messages.append((topicName, payload, qos, retained, msgid, properties))
self.messagedicts.append({"topicname" : topicName, "payload" : payload,
"qos" : qos, "retained" : retained, "msgid" : msgid, "properties" : properties})

# add to local broker
self.broker.broker.publish(aClientid, topic, message, qos, properties, receivedTime, retained)
self.broker.broker.publish("bridge", topicName, payload, qos, retained, properties, time.monotonic())
return True

def published(self, msgid):
logging.info("published %d", msgid)
logger.info("published %d", msgid)
self.publisheds.append(msgid)

def subscribed(self, msgid, data):
logging.info("subscribed %d", msgid)
logger.info("subscribed %d", msgid)
self.subscribeds.append((msgid, data))

def unsubscribed(self, msgid):
logging.info("unsubscribed %d", msgid)
logger.info("unsubscribed %d", msgid)
self.unsubscribeds.append(msgid)


class Bridges:

def __init__(self, host, port):
def __init__(self, name="local", host="localhost", port=1883, topic="+", direction="both", localprefix="", remoteprefix=""):
self.name = name
self.host = host
self.port = port
self.port = int(port)
self.topic = topic
self.direction = direction
self.localprefix = localprefix
self.remoteprefix = remoteprefix
self.client = mqtt.clients.V5.Client("local")
self.callback = Callbacks(broker5)
self.client.registerCallback(self.callback)
Expand All @@ -88,20 +93,25 @@ def __init__(self, host, port):
def local_connect(self):
# connect locally with V5, so we get noLocal and retainAsPublished
connect = MQTTV5.Connects()
connect.ClientIdentifier = "local"
connect.ClientIdentifier = self.name
logger.debug(connect.ClientIdentifier)
broker5.connect(self, connect)
subscribe = MQTTV5.Subscribes()
options = MQTTV5.SubscribeOptions()
options.noLocal = options.retainAsPublished = True
subscribe.data = [('+', options)]
subscribe.data = [(self.topic, options)]
broker5.subscribe(self, subscribe)

def connect(self):
logger.info("Bridge: connecting to %s:%d"%(self.host, self.port))
self.client.connect(host=self.host, port=self.port, cleanstart=True)
# subscribe if necessary
options = MQTTV5.SubscribeOptions()
options.noLocal = options.retainAsPublished = True
self.client.subscribe(["+"], [options])
if self.direction == "both" or self.direction == "in":
self.client.subscribe([self.topic], [options])
else:
logger.info("Bridge: not subscribing to remote")

def getPacket(self):
# get packet from remote
Expand All @@ -123,14 +133,14 @@ def setBroker5(aBroker5):
global broker5
broker5 = aBroker5

def create(port, host="", TLS=False,
def create(name="local", host="localhost", port=1883, topic="+", direction="both", localprefix="", remoteprefix="", TLS=False,
cert_reqs=ssl.CERT_REQUIRED,
ca_certs=None, certfile=None, keyfile=None):

if host == "":
host = "localhost"
logger.info("Starting TCP bridge for address '%s' port %d %s", host, port, "with TLS support" if TLS else "")
bridge = Bridges(host, port)
logger.info("Starting TCP bridge for address '%s' port %d %s", host, int(port), "with TLS support" if TLS else "")
bridge = Bridges(name, host, port, topic, direction, localprefix, remoteprefix)
thread = threading.Thread(target=bridge.run)
thread.start()
return bridge
Expand Down
64 changes: 59 additions & 5 deletions interoperability/mqtt/brokers/start.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def setup_persistence(persistence_filename):
def process_config(config):
options = {}
servers_to_create = []
bridges_to_create = []
lineno = 0
while lineno < len(config):
curline = config[lineno].strip()
Expand Down Expand Up @@ -81,7 +82,8 @@ def process_config(config):
if len(words) >= 4:
if words[3] in ["mqttsn", "http"]:
protocol = words[3]
while lineno < len(config) and not config[lineno].strip().startswith("listener"):
while lineno < len(config) and not (config[lineno].strip().startswith("listener") or
config[lineno].strip().startswith("connection") ):
curline = config[lineno].strip()
lineno += 1
if curline.startswith('#') or len(curline) == 0:
Expand All @@ -104,8 +106,54 @@ def process_config(config):
elif protocol == "http":
servers_to_create.append((HTTPListeners, {"host":bind_address, "port":port, "TLS":TLS, "cert_reqs":cert_reqs,
"ca_certs":ca_certs, "certfile":certfile, "keyfile":keyfile}))
elif words[0] == "connection":
# Bridge connection, pull out address, protocol and topic lines.
bridgename="local"
address = "localhost"
host = "localhost"
port = "1883"
protocol = "mqtt"
topic = "+"
direction = "both"
localprefix = ""
remoteprefix = ""
if len(words) > 1:
bridgename = words[1]
while lineno < len(config) and not (config[lineno].strip().startswith("listener") or
config[lineno].strip().startswith("connection")) :
curline = config[lineno].strip()
lineno+=1
if curline.startswith('#') or len(curline) == 0:
continue
words = curline.split()
if words[0] == "protocol":
protocol = words[1]
elif words[0] == "address":
address = words[1]
parts = address.split(":")
host = parts[0]
if len(parts)>1:
port = int(parts[1])
elif words[0] == "topic":
if len(words) > 1:
topic = words[1]
if len(words) > 2:
direction = words[2]
if len(words) > 3:
localprefix = words[3]
if len(words) > 4:
remoteprefix = words[4]
if protocol == "mqtt":
bridges_to_create.append((TCPBridges, {"name":bridgename,
"host":host,
"port":port,
"topic":topic,
"direction":direction,
"localprefix":localprefix,
"remoteprefix":remoteprefix}))

servers_to_create[-1][1]["serve_forever"] = True
return servers_to_create, options
return servers_to_create, options, bridges_to_create

def run(config=None):
global logger, broker3, broker5, brokerSN, server
Expand All @@ -122,7 +170,7 @@ def run(config=None):

options = {}
if config != None:
servers_to_create, options = process_config(config)
servers_to_create, options, bridges_to_create = process_config(config)

broker3 = MQTTV3Brokers(options=options, lock=lock, sharedData=sharedData)

Expand All @@ -139,17 +187,23 @@ def run(config=None):
brokerSN.setBroker5(broker5)

servers = []
bridges = []
UDPListeners.setBroker(brokerSN)
TCPListeners.setBrokers(broker3, broker5)
HTTPListeners.setBrokers(broker3, broker5, brokerSN)
HTTPListeners.setSharedData(lock, sharedData)

try:
if config == None:
#TCPBridges.setBroker5(broker5)
#TCPBridges.create(1886)
# TCPBridges.setBroker5(broker5)
# TCPBridges.create("bridge",1883,"172.16.0.4")
servers.append(TCPListeners.create(1883, serve_forever=True))
else:
logger.debug("Starting bridges")
for bridge in bridges_to_create:
bridge[0].setBroker5(broker5)
bridges.append(bridge[0].create(**bridge[1]))
logger.debug("Starting servers")
for server in servers_to_create:
servers.append(server[0].create(**server[1]))
except KeyboardInterrupt:
Expand Down
6 changes: 6 additions & 0 deletions interoperability/udp.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,9 @@ loglevel debug
listener 1883

listener 1883 INADDR_ANY mqttsn

#connection loopback
# protocol mqtt
# address localhost
# topic + both "" ""

0 comments on commit 5b8ad8d

Please sign in to comment.