diff --git a/core/README.md b/core/README.md index dc88f4d..537f0d3 100644 --- a/core/README.md +++ b/core/README.md @@ -77,11 +77,11 @@ class Hello: class Echo: def __init__(self,core,parm): core.registerEventHandler("saySomething", self.repeat) - core.registerRestApi("echo", self.restHandler) + core.registerURLApi("echo", self.handler) def repeat(self,event): self.event = event print self.event - def restHandler(self): + def handler(self): return self.event ``` diff --git a/core/etc/config.json b/core/etc/config.json index 1ecc27c..8dea080 100644 --- a/core/etc/config.json +++ b/core/etc/config.json @@ -10,8 +10,8 @@ "port":"8080" }, "NWInfo": { - "controller_ip": "localhost", - "controller_port": "6633", + "controller_ip": "127.0.0.1", + "controller_port": "6653", "controller_name": "foobar", "adapter_port": "8080" } diff --git a/core/setup.py b/core/setup.py index 1d64ccc..72db02d 100644 --- a/core/setup.py +++ b/core/setup.py @@ -2,15 +2,15 @@ setup( name = 'omniui', - version = '1.0.0', + version = '1.0.1', description = 'A Diagnosis, Analytic and Management Framework for SDN', author = 'D-Link NCTU Joint Research Center', - url = 'https://github.com/dlinknctu/omniui', - install_requires = ['Flask==0.10.1', 'Flask_Cors', 'gevent', 'pymongo'], + url = 'https://github.com/dlinknctu/OpenADM', + install_requires = ['Flask==0.10.1', 'Flask_Cors', 'gevent', 'pymongo', 'flask_socketio'], packages = ['src', 'src.floodlight_modules','src.trema_modules','src.pox_modules', 'test'], data_files = [('etc', ['etc/config.json'])], entry_points = { 'console_scripts': ['omniui=src.core:main'], }, test_suite = 'test' -) \ No newline at end of file +) diff --git a/core/src/core.py b/core/src/core.py index ce2a54e..b30879b 100644 --- a/core/src/core.py +++ b/core/src/core.py @@ -5,13 +5,12 @@ import signal import gevent import urllib2 -from gevent.wsgi import WSGIServer from gevent.queue import Queue from importlib import import_module import logging import threading from threading import Thread -from flask import Flask, Response, request, abort, render_template +from flask import Flask, Response, request, abort, copy_current_request_context from flask_cors import * from flask_socketio import SocketIO, emit, send, disconnect from pkg_resources import Requirement, resource_filename @@ -22,6 +21,7 @@ app.config['SECRET_KEY'] = 'omniui' socketio = SocketIO(app) subscriptions = [] +client_alive = {} logger = logging.getLogger(__name__) # define module state enum @@ -47,9 +47,6 @@ def __init__(self, data): self.data: 'data' } - self.eventName = eventName - self.handler = handler - class EventHandler: def __init__(self,eventName,handler): self.eventName = eventName @@ -62,8 +59,8 @@ def __init__(self): self.threads = [] self.events = [] self.ipcHandlers = {} - global restHandlers # Necessary for bottle to access restHandlers - restHandlers = {} + global urlHandlers # Necessary for bottle to access urlHandlers + urlHandlers = {} global sseHandlers sseHandlers = {} @@ -146,12 +143,44 @@ def notify(e, d, q=None): @socketio.on('connect', namespace='/websocket') def do_connect(): - print('Client connected') - - @socketio.on('leave', namespace='/websocket') + def gen(): + q = Queue() + subscriptions.append(q) + for e in sseHandlers.keys(): + rs = sseHandlers[e]('debut') + if rs is not None: + gevent.spawn(notify, e, rs, q) + try: + while True: + result = q.get() + ev = ServerSentEvent(result) + yield ev + except GeneratorExit: + subscriptions.remove(q) + except StopIteration: + print('get subscription error.') + subscriptions.remove(q) + + @copy_current_request_context + def response(sid): + responses = gen() + client_alive[sid] = True + responses = gen() + while client_alive[sid]: + r = responses.next() + emit(r.event, {'data': r.data }) + responses.close() + + sid = request.sid + print('Client ' + request.remote_addr + '(sid:' + str(sid) + ') connected') + gevent.spawn(response, sid) + + @socketio.on('disconnect', namespace='/websocket') def do_disconnect(): - print('Client disconnected') - disconnect() + sid = request.sid + if client_alive.get(sid, False): + client_alive[sid] = False + print('Client ' + request.remote_addr + '(sid:' + str(sid) + ') disconnected') @socketio.on('debug', namespace='/websocket') def debug(): @@ -177,24 +206,6 @@ def publish(event): return 'OK' - # For clients to subscribe server-sent events - @socketio.on('subscribe', namespace='/websocket') - def subscribe(): - def gen(): - q = Queue() - subscriptions.append(q) - for e in sseHandlers.keys(): - rs = sseHandlers[e]('debut') - if rs is not None: - gevent.spawn(notify, e, rs, q) - for result in q : - ev = ServerSentEvent(result) - yield ev - if q.empty(): break - subscriptions.remove(q) - for response in gen(): - emit(response.event, {'data': response.data }) - # handler for feature request @socketio.on('feature', namespace='/websocket') def featureRequest(): @@ -203,7 +214,10 @@ def featureRequest(): @socketio.on('setting_controller', namespace='/websocket') def settingControllerRequest(message): - settings = message['data'] + settings = message.get('data', None) + if settings is None: + emit('setting_controller', {'data', 'setting controller error.'} ) + return controller_url = settings['controllerURL'] core_url = settings['coreURL'] @@ -217,21 +231,55 @@ def settingControllerRequest(message): emit('setting_controller', {'data' : result}) - # handler other rest handlers + # handler other websocket handlers @socketio.on('other', namespace='/websocket') def topLevelRoute(message): - url = message['url'] - if url in restHandlers: - emit('other', {'data' : restHandlers[url](request)} ) - else: + url = message.get('url', None) + req = message.get('request', None) + result = handleRoute(url, rest=False, req=req) + if result is None: emit('other', {'data' : "Not found: '/%s'" % url }) + else: + emit('other', {'data' : result} ) + + # general top level rest handler + @app.route('/', methods=['GET', 'POST', 'OPTIONS', 'PUT']) + @cross_origin() + def topLevelRoute(url): + result = handleRoute(url) + if result is None: + abort(404, "Not found: '/%s'" % url) + else: + return result + + # general second level rest handler + @app.route('//', methods=['GET', 'POST', 'OPTIONS', 'PUT']) + @cross_origin() + def secondLevelRoute(prefix, suffix): + url = prefix + '/' + suffix + result = handleRoute(url) + if result is None: + abort(404, "Not found: '/%s'" % url) + else: + return result + + def handleRoute(url, rest=True, req=None): + if url is None: + return None + if url in urlHandlers: + if rest: + return urlHandlers[url](request) + else: + return urlHandlers[url](req) + else: + return None - app.debug = True + app.debug = False socketio.run(app, host=handleIP, port=int(handlePort)) - #Register WEBSOCKET API - def registerRestApi(self, requestName, handler): - restHandlers[requestName] = handler + #Register WEBSOCKET and RESTful API + def registerURLApi(self, requestName, handler): + urlHandlers[requestName] = handler # Register SSE def registerSSEHandler(self, sseName, handler): diff --git a/core/src/floodlight_modules/flow_mod.py b/core/src/floodlight_modules/flow_mod.py index 1db186a..f720196 100644 --- a/core/src/floodlight_modules/flow_mod.py +++ b/core/src/floodlight_modules/flow_mod.py @@ -14,7 +14,7 @@ def __init__(self,core,parm): # register rest api core.registerRestApi("flowmod", self.flowHandler) - def flowHandler(self,data): + def flowHandler(self, body): # return JSONP format body = json.dumps(data.get_json(force=True)) conn = httplib.HTTPConnection(self.IP, self.Port) diff --git a/core/src/floodlight_modules/nwinfo.py b/core/src/floodlight_modules/nwinfo.py index db9feb0..595cb6b 100644 --- a/core/src/floodlight_modules/nwinfo.py +++ b/core/src/floodlight_modules/nwinfo.py @@ -49,11 +49,14 @@ def __registration(self, core): core.registerSSEHandler('port', self.portHandler) core.registerSSEHandler('flow', self.flowHandler) - # RESTful API for WebUI - core.registerRestApi('port', self.getPortCounter) - core.registerRestApi('flow', self.getAllFlows) - core.registerRestApi('flow/top', self.getTopFlows) - core.registerRestApi('reset_datastore', self.resetDatastore) + # websocket API for WebUI + core.registerURLApi('port', self.getPortCounter) + core.registerURLApi('flow', self.getAllFlows) + core.registerURLApi('flow/top', self.getTopFlows) + core.registerURLApi('reset_datastore', self.resetDatastore) + + # IPC API for other modules + core.registerIPC('getAllFlows', self.getAllFlows) logger.info('Handlers and RESTful APIs registered') @@ -360,7 +363,7 @@ def flowHandler(self, raw): return None - """RESTful API handler + """websocket API handler """ def getPortCounter(self, req): @@ -372,8 +375,8 @@ def getPortCounter(self, req): /port?port= /port """ - dpid = req.args.get('dpid') - port = req.args.get('port') + dpid = req.get('dpid', None) + port = req.get('port', None) if dpid != None and port != None: key = (dpid, port) try: @@ -405,7 +408,7 @@ def getAllFlows(self, req): /flow?dpid= /flow """ - dpid = req.args.get('dpid') + dpid = req.get('dpid', None) if dpid is not None: try: result = json.dumps({'dpid': dpid, @@ -427,7 +430,7 @@ def getTopFlows(self, req): /flow/top?dpid= /flow/top """ - dpid = req.args.get('dpid') + dpid = req.get('dpid', None) if dpid is not None: try: result = json.dumps({'dpid': dpid, diff --git a/core/src/floodlight_modules/uipusher.py b/core/src/floodlight_modules/uipusher.py index d0d9775..41b87c6 100644 --- a/core/src/floodlight_modules/uipusher.py +++ b/core/src/floodlight_modules/uipusher.py @@ -11,9 +11,9 @@ class UIPusher: def __init__(self,core,parm): # register event handler core.registerEventHandler("controlleradapter", self.controllerHandler) - # register rest api - core.registerRestApi("info/topology", self.topologyHandler) - core.registerRestApi("stat", self.statisticHandler) + # register websocket api + core.registerURLApi("info/topology", self.topologyHandler) + core.registerURLApi("stat", self.statisticHandler) # save core for ipc use self.core = core @@ -113,12 +113,10 @@ def writeToDB(self): key['duration'] = self.tmpcache[hashkey][3] self.db[self.intervalList[0]].save(key) - def statisticHandler(self,request): + def statisticHandler(self, date): if self.enable == False: return "Time\t1\n" - #parse json data - data = request.get_json(force=True) #declare variable multiGroup = {} output = "Time" diff --git a/core/src/pox_modules/flow_mod.py b/core/src/pox_modules/flow_mod.py index a8b41e2..34bd169 100644 --- a/core/src/pox_modules/flow_mod.py +++ b/core/src/pox_modules/flow_mod.py @@ -12,13 +12,12 @@ def __init__(self,core,parm): 'Content-type': 'application/json', 'Accept': 'application/json', } - # register rest api + # register websocket api self.Url="http://"+self.IP+":"+self.Port+"/wm/omniui/add/json" - core.registerRestApi("flowmod", self.flowHandler) + core.registerURLApi("flowmod", self.flowHandler) - def flowHandler(self,data): + def flowHandler(self,body): # return JSONP format - body = json.dumps(data.get_json(force=True)) conn = httplib.HTTPConnection(self.IP,self.Port) conn.request('POST',self.path,body,self.headers) response = conn.getresponse() diff --git a/core/src/pox_modules/uipusher.py b/core/src/pox_modules/uipusher.py index d0d9775..49c5d98 100644 --- a/core/src/pox_modules/uipusher.py +++ b/core/src/pox_modules/uipusher.py @@ -11,9 +11,9 @@ class UIPusher: def __init__(self,core,parm): # register event handler core.registerEventHandler("controlleradapter", self.controllerHandler) - # register rest api - core.registerRestApi("info/topology", self.topologyHandler) - core.registerRestApi("stat", self.statisticHandler) + # register websocket api + core.registerURLApi("info/topology", self.topologyHandler) + core.registerURLApi("stat", self.statisticHandler) # save core for ipc use self.core = core @@ -113,12 +113,10 @@ def writeToDB(self): key['duration'] = self.tmpcache[hashkey][3] self.db[self.intervalList[0]].save(key) - def statisticHandler(self,request): + def statisticHandler(self,data): if self.enable == False: return "Time\t1\n" - #parse json data - data = request.get_json(force=True) #declare variable multiGroup = {} output = "Time" diff --git a/core/src/trema_modules/flow_mod.py b/core/src/trema_modules/flow_mod.py index 809f27b..5d864a1 100644 --- a/core/src/trema_modules/flow_mod.py +++ b/core/src/trema_modules/flow_mod.py @@ -15,15 +15,14 @@ def __init__(self,core,parm): self.IP = parm['ip'] if parm.has_key("port"): self.Port = str(parm['port']) - # register rest api + # register websocket api self.addUrl="/uds/add" self.delUrl="/uds/del" - core.registerRestApi("uds/add", self.udsAddHandler) - core.registerRestApi("uds/del", self.udsDelHandler) + core.registerURLApi("uds/add", self.udsAddHandler) + core.registerURLApi("uds/del", self.udsDelHandler) - def udsAddHandler(self,request): + def udsAddHandler(self,entity): url = self.addUrl - entity = request.get_json(force=True) if not entity: print "No get data from WEBUI" return @@ -45,9 +44,8 @@ def udsAddHandler(self,request): msg = json.dumps(ret) return msg - def udsDelHandler(self,request): + def udsDelHandler(self,entity): url = self.delUrl - entity = request.get_json(force=True) if not entity: print "No get data from WEBUI" return diff --git a/core/src/trema_modules/uipusher.py b/core/src/trema_modules/uipusher.py index c46eda7..d5db979 100644 --- a/core/src/trema_modules/uipusher.py +++ b/core/src/trema_modules/uipusher.py @@ -13,7 +13,7 @@ def __init__(self,core,parm): core.registerEventHandler("controlleradapter", self.controllerHandler) # register sse handler core.registerSSEHandler('updateuds', self.udsHandler) - core.registerRestApi("stat", self.statisticHandler) + core.registerURLApi("stat", self.statisticHandler) # save core for ipc use self.core = core @@ -112,12 +112,10 @@ def writeToDB(self): key['duration'] = self.tmpcache[hashkey][3] self.db[self.intervalList[0]].save(key) - def statisticHandler(self,request): + def statisticHandler(self, data): if self.enable == False: return "Time\t1\n" - #parse json data - data = request.get_json(force=True) #declare variable multiGroup = {} output = "Time"