Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/hlku/OpenADM into hlku-ma…
Browse files Browse the repository at this point in the history
…ster
  • Loading branch information
waynelkh committed May 27, 2016
2 parents 1d8b1e4 + fea10a8 commit ce62d80
Show file tree
Hide file tree
Showing 11 changed files with 128 additions and 86 deletions.
4 changes: 2 additions & 2 deletions core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,11 @@ class Hello:
class Echo:
def __init__(self,core,parm):
core.registerEventHandler("saySomething", self.repeat)
core.registerRestApi("echo", self.restHandler)
core.registerURLApi("echo", self.handler)
def repeat(self,event):
self.event = event
print self.event
def restHandler(self):
def handler(self):
return self.event
```

Expand Down
4 changes: 2 additions & 2 deletions core/etc/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
"port":"8080"
},
"NWInfo": {
"controller_ip": "localhost",
"controller_port": "6633",
"controller_ip": "127.0.0.1",
"controller_port": "6653",
"controller_name": "foobar",
"adapter_port": "8080"
}
Expand Down
8 changes: 4 additions & 4 deletions core/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@

setup(
name = 'omniui',
version = '1.0.0',
version = '1.0.1',
description = 'A Diagnosis, Analytic and Management Framework for SDN',
author = 'D-Link NCTU Joint Research Center',
url = 'https://github.com/dlinknctu/omniui',
install_requires = ['Flask==0.10.1', 'Flask_Cors', 'gevent', 'pymongo'],
url = 'https://github.com/dlinknctu/OpenADM',
install_requires = ['Flask==0.10.1', 'Flask_Cors', 'gevent', 'pymongo', 'flask_socketio'],
packages = ['src', 'src.floodlight_modules','src.trema_modules','src.pox_modules', 'test'],
data_files = [('etc', ['etc/config.json'])],
entry_points = {
'console_scripts': ['omniui=src.core:main'],
},
test_suite = 'test'
)
)
128 changes: 88 additions & 40 deletions core/src/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@
import signal
import gevent
import urllib2
from gevent.wsgi import WSGIServer
from gevent.queue import Queue
from importlib import import_module
import logging
import threading
from threading import Thread
from flask import Flask, Response, request, abort, render_template
from flask import Flask, Response, request, abort, copy_current_request_context
from flask_cors import *
from flask_socketio import SocketIO, emit, send, disconnect
from pkg_resources import Requirement, resource_filename
Expand All @@ -22,6 +21,7 @@
app.config['SECRET_KEY'] = 'omniui'
socketio = SocketIO(app)
subscriptions = []
client_alive = {}
logger = logging.getLogger(__name__)

# define module state enum
Expand All @@ -47,9 +47,6 @@ def __init__(self, data):
self.data: 'data'
}

self.eventName = eventName
self.handler = handler

class EventHandler:
def __init__(self,eventName,handler):
self.eventName = eventName
Expand All @@ -62,8 +59,8 @@ def __init__(self):
self.threads = []
self.events = []
self.ipcHandlers = {}
global restHandlers # Necessary for bottle to access restHandlers
restHandlers = {}
global urlHandlers # Necessary for bottle to access urlHandlers
urlHandlers = {}
global sseHandlers
sseHandlers = {}

Expand Down Expand Up @@ -146,12 +143,44 @@ def notify(e, d, q=None):

@socketio.on('connect', namespace='/websocket')
def do_connect():
print('Client connected')

@socketio.on('leave', namespace='/websocket')
def gen():
q = Queue()
subscriptions.append(q)
for e in sseHandlers.keys():
rs = sseHandlers[e]('debut')
if rs is not None:
gevent.spawn(notify, e, rs, q)
try:
while True:
result = q.get()
ev = ServerSentEvent(result)
yield ev
except GeneratorExit:
subscriptions.remove(q)
except StopIteration:
print('get subscription error.')
subscriptions.remove(q)

@copy_current_request_context
def response(sid):
responses = gen()
client_alive[sid] = True
responses = gen()
while client_alive[sid]:
r = responses.next()
emit(r.event, {'data': r.data })
responses.close()

sid = request.sid
print('Client ' + request.remote_addr + '(sid:' + str(sid) + ') connected')
gevent.spawn(response, sid)

@socketio.on('disconnect', namespace='/websocket')
def do_disconnect():
print('Client disconnected')
disconnect()
sid = request.sid
if client_alive.get(sid, False):
client_alive[sid] = False
print('Client ' + request.remote_addr + '(sid:' + str(sid) + ') disconnected')

@socketio.on('debug', namespace='/websocket')
def debug():
Expand All @@ -177,24 +206,6 @@ def publish(event):

return 'OK'

# For clients to subscribe server-sent events
@socketio.on('subscribe', namespace='/websocket')
def subscribe():
def gen():
q = Queue()
subscriptions.append(q)
for e in sseHandlers.keys():
rs = sseHandlers[e]('debut')
if rs is not None:
gevent.spawn(notify, e, rs, q)
for result in q :
ev = ServerSentEvent(result)
yield ev
if q.empty(): break
subscriptions.remove(q)
for response in gen():
emit(response.event, {'data': response.data })

# handler for feature request
@socketio.on('feature', namespace='/websocket')
def featureRequest():
Expand All @@ -203,7 +214,10 @@ def featureRequest():

@socketio.on('setting_controller', namespace='/websocket')
def settingControllerRequest(message):
settings = message['data']
settings = message.get('data', None)
if settings is None:
emit('setting_controller', {'data', 'setting controller error.'} )
return

controller_url = settings['controllerURL']
core_url = settings['coreURL']
Expand All @@ -217,21 +231,55 @@ def settingControllerRequest(message):

emit('setting_controller', {'data' : result})

# handler other rest handlers
# handler other websocket handlers
@socketio.on('other', namespace='/websocket')
def topLevelRoute(message):
url = message['url']
if url in restHandlers:
emit('other', {'data' : restHandlers[url](request)} )
else:
url = message.get('url', None)
req = message.get('request', None)
result = handleRoute(url, rest=False, req=req)
if result is None:
emit('other', {'data' : "Not found: '/%s'" % url })
else:
emit('other', {'data' : result} )

# general top level rest handler
@app.route('/<url>', methods=['GET', 'POST', 'OPTIONS', 'PUT'])
@cross_origin()
def topLevelRoute(url):
result = handleRoute(url)
if result is None:
abort(404, "Not found: '/%s'" % url)
else:
return result

# general second level rest handler
@app.route('/<prefix>/<suffix>', methods=['GET', 'POST', 'OPTIONS', 'PUT'])
@cross_origin()
def secondLevelRoute(prefix, suffix):
url = prefix + '/' + suffix
result = handleRoute(url)
if result is None:
abort(404, "Not found: '/%s'" % url)
else:
return result

def handleRoute(url, rest=True, req=None):
if url is None:
return None
if url in urlHandlers:
if rest:
return urlHandlers[url](request)
else:
return urlHandlers[url](req)
else:
return None

app.debug = True
app.debug = False
socketio.run(app, host=handleIP, port=int(handlePort))

#Register WEBSOCKET API
def registerRestApi(self, requestName, handler):
restHandlers[requestName] = handler
#Register WEBSOCKET and RESTful API
def registerURLApi(self, requestName, handler):
urlHandlers[requestName] = handler

# Register SSE
def registerSSEHandler(self, sseName, handler):
Expand Down
2 changes: 1 addition & 1 deletion core/src/floodlight_modules/flow_mod.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def __init__(self,core,parm):
# register rest api
core.registerRestApi("flowmod", self.flowHandler)

def flowHandler(self,data):
def flowHandler(self, body):
# return JSONP format
body = json.dumps(data.get_json(force=True))
conn = httplib.HTTPConnection(self.IP, self.Port)
Expand Down
23 changes: 13 additions & 10 deletions core/src/floodlight_modules/nwinfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,14 @@ def __registration(self, core):
core.registerSSEHandler('port', self.portHandler)
core.registerSSEHandler('flow', self.flowHandler)

# RESTful API for WebUI
core.registerRestApi('port', self.getPortCounter)
core.registerRestApi('flow', self.getAllFlows)
core.registerRestApi('flow/top', self.getTopFlows)
core.registerRestApi('reset_datastore', self.resetDatastore)
# websocket API for WebUI
core.registerURLApi('port', self.getPortCounter)
core.registerURLApi('flow', self.getAllFlows)
core.registerURLApi('flow/top', self.getTopFlows)
core.registerURLApi('reset_datastore', self.resetDatastore)

# IPC API for other modules
core.registerIPC('getAllFlows', self.getAllFlows)

logger.info('Handlers and RESTful APIs registered')

Expand Down Expand Up @@ -360,7 +363,7 @@ def flowHandler(self, raw):
return None


"""RESTful API handler
"""websocket API handler
"""

def getPortCounter(self, req):
Expand All @@ -372,8 +375,8 @@ def getPortCounter(self, req):
/port?port=<sw_port>
/port
"""
dpid = req.args.get('dpid')
port = req.args.get('port')
dpid = req.get('dpid', None)
port = req.get('port', None)
if dpid != None and port != None:
key = (dpid, port)
try:
Expand Down Expand Up @@ -405,7 +408,7 @@ def getAllFlows(self, req):
/flow?dpid=<sw_dpid>
/flow
"""
dpid = req.args.get('dpid')
dpid = req.get('dpid', None)
if dpid is not None:
try:
result = json.dumps({'dpid': dpid,
Expand All @@ -427,7 +430,7 @@ def getTopFlows(self, req):
/flow/top?dpid=<sw_dpid>
/flow/top
"""
dpid = req.args.get('dpid')
dpid = req.get('dpid', None)
if dpid is not None:
try:
result = json.dumps({'dpid': dpid,
Expand Down
10 changes: 4 additions & 6 deletions core/src/floodlight_modules/uipusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ class UIPusher:
def __init__(self,core,parm):
# register event handler
core.registerEventHandler("controlleradapter", self.controllerHandler)
# register rest api
core.registerRestApi("info/topology", self.topologyHandler)
core.registerRestApi("stat", self.statisticHandler)
# register websocket api
core.registerURLApi("info/topology", self.topologyHandler)
core.registerURLApi("stat", self.statisticHandler)
# save core for ipc use
self.core = core

Expand Down Expand Up @@ -113,12 +113,10 @@ def writeToDB(self):
key['duration'] = self.tmpcache[hashkey][3]
self.db[self.intervalList[0]].save(key)

def statisticHandler(self,request):
def statisticHandler(self, date):

if self.enable == False:
return "Time\t1\n"
#parse json data
data = request.get_json(force=True)
#declare variable
multiGroup = {}
output = "Time"
Expand Down
7 changes: 3 additions & 4 deletions core/src/pox_modules/flow_mod.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@ def __init__(self,core,parm):
'Content-type': 'application/json',
'Accept': 'application/json',
}
# register rest api
# register websocket api
self.Url="http://"+self.IP+":"+self.Port+"/wm/omniui/add/json"
core.registerRestApi("flowmod", self.flowHandler)
core.registerURLApi("flowmod", self.flowHandler)

def flowHandler(self,data):
def flowHandler(self,body):
# return JSONP format
body = json.dumps(data.get_json(force=True))
conn = httplib.HTTPConnection(self.IP,self.Port)
conn.request('POST',self.path,body,self.headers)
response = conn.getresponse()
Expand Down
10 changes: 4 additions & 6 deletions core/src/pox_modules/uipusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ class UIPusher:
def __init__(self,core,parm):
# register event handler
core.registerEventHandler("controlleradapter", self.controllerHandler)
# register rest api
core.registerRestApi("info/topology", self.topologyHandler)
core.registerRestApi("stat", self.statisticHandler)
# register websocket api
core.registerURLApi("info/topology", self.topologyHandler)
core.registerURLApi("stat", self.statisticHandler)
# save core for ipc use
self.core = core

Expand Down Expand Up @@ -113,12 +113,10 @@ def writeToDB(self):
key['duration'] = self.tmpcache[hashkey][3]
self.db[self.intervalList[0]].save(key)

def statisticHandler(self,request):
def statisticHandler(self,data):

if self.enable == False:
return "Time\t1\n"
#parse json data
data = request.get_json(force=True)
#declare variable
multiGroup = {}
output = "Time"
Expand Down
Loading

0 comments on commit ce62d80

Please sign in to comment.