diff --git a/adapter/ryu/README.md b/adapter/ryu/README.md index ca12acb..e0f1828 100644 --- a/adapter/ryu/README.md +++ b/adapter/ryu/README.md @@ -5,7 +5,9 @@ Ryu adapter module for OmniUI ###Installation### 1. Download the Ryu Controller $ `git clone git://github.com/osrg/ryu.git` -$ `cd ryu; sudo python ./setup.py install` +$ `cd OpenADM/adapter/ryu; cp event.py ~/ryu/ryu/topology` +$ `cp switches.py ~/ryu/ryu/topology` +$ `cd ~/ryu; sudo python ./setup.py install` ###Execution### **Ryu 1.0** diff --git a/adapter/ryu/event.py b/adapter/ryu/event.py new file mode 100644 index 0000000..148d31e --- /dev/null +++ b/adapter/ryu/event.py @@ -0,0 +1,178 @@ +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from ryu.controller import handler +from ryu.controller import event + +LOG = logging.getLogger(__name__) + + +class EventSwitchBase(event.EventBase): + def __init__(self, switch): + super(EventSwitchBase, self).__init__() + self.switch = switch + + def __str__(self): + return '%s' % \ + (self.__class__.__name__, + self.switch.dp.id, len(self.switch.ports)) + + +class EventSwitchEnter(EventSwitchBase): + def __init__(self, switch): + super(EventSwitchEnter, self).__init__(switch) + + +class EventSwitchLeave(EventSwitchBase): + def __init__(self, switch): + super(EventSwitchLeave, self).__init__(switch) + + +class EventSwitchReconnected(EventSwitchBase): + def __init__(self, switch): + super(EventSwitchReconnected, self).__init__(switch) + + +class EventPortBase(event.EventBase): + def __init__(self, port): + super(EventPortBase, self).__init__() + self.port = port + + def __str__(self): + return '%s<%s>' % (self.__class__.__name__, self.port) + + +class EventPortAdd(EventPortBase): + def __init__(self, port): + super(EventPortAdd, self).__init__(port) + + +class EventPortDelete(EventPortBase): + def __init__(self, port): + super(EventPortDelete, self).__init__(port) + + +class EventPortModify(EventPortBase): + def __init__(self, port): + super(EventPortModify, self).__init__(port) + + +class EventSwitchRequest(event.EventRequestBase): + # If dpid is None, reply all list + def __init__(self, dpid=None): + super(EventSwitchRequest, self).__init__() + self.dst = 'switches' + self.dpid = dpid + + def __str__(self): + return 'EventSwitchRequest' % \ + (self.src, self.dpid) + + +class EventSwitchReply(event.EventReplyBase): + def __init__(self, dst, switches): + super(EventSwitchReply, self).__init__(dst) + self.switches = switches + + def __str__(self): + return 'EventSwitchReply' % \ + (self.dst, self.switches) + + +class EventLinkBase(event.EventBase): + def __init__(self, link): + super(EventLinkBase, self).__init__() + self.link = link + + def __str__(self): + return '%s<%s>' % (self.__class__.__name__, self.link) + + +class EventLinkAdd(EventLinkBase): + def __init__(self, link): + super(EventLinkAdd, self).__init__(link) + + +class EventLinkDelete(EventLinkBase): + def __init__(self, link): + super(EventLinkDelete, self).__init__(link) + + +class EventLinkRequest(event.EventRequestBase): + # If dpid is None, reply all list + def __init__(self, dpid=None): + super(EventLinkRequest, self).__init__() + self.dst = 'switches' + self.dpid = dpid + + def __str__(self): + return 'EventLinkRequest' % \ + (self.src, self.dpid) + + +class EventLinkReply(event.EventReplyBase): + def __init__(self, dst, dpid, links): + super(EventLinkReply, self).__init__(dst) + self.dpid = dpid + self.links = links + + def __str__(self): + return 'EventLinkReply' % \ + (self.dst, self.dpid, len(self.links)) + + +class EventHostRequest(event.EventRequestBase): + # if dpid is None, replay all hosts + def __init__(self, dpid=None): + super(EventHostRequest, self).__init__() + self.dst = 'switches' + self.dpid = dpid + + def __str__(self): + return 'EventHostRequest' % \ + (self.src, self.dpid) + + +class EventHostReply(event.EventReplyBase): + def __init__(self, dst, dpid, hosts): + super(EventHostReply, self).__init__(dst) + self.dpid = dpid + self.hosts = hosts + + def __str__(self): + return 'EventHostReply' % \ + (self.dst, self.dpid, len(self.hosts)) + + +class EventHostBase(event.EventBase): + def __init__(self, host): + super(EventHostBase, self).__init__() + self.host = host + + def __str__(self): + return '%s<%s>' % (self.__class__.__name__, self.host) + + +class EventHostAdd(EventHostBase): + def __init__(self, host): + super(EventHostAdd, self).__init__(host) + + +class EventHostDelete(EventHostBase): + def __init__(self, host): + super(EventHostDelete, self).__init__(host) + +handler.register_service('ryu.topology.switches') diff --git a/adapter/ryu/omniui/omniui.py b/adapter/ryu/omniui/omniui.py index e071ed2..7087149 100644 --- a/adapter/ryu/omniui/omniui.py +++ b/adapter/ryu/omniui/omniui.py @@ -7,7 +7,7 @@ from ryu.base import app_manager from ryu.controller import ofp_event from ryu.controller import dpset -from ryu.controller.handler import MAIN_DISPATCHER +from ryu.controller.handler import MAIN_DISPATCHER, CONFIG_DISPATCHER, DEAD_DISPATCHER from ryu.controller.handler import set_ev_cls from ryu.lib import dpid as dpid_lib from ryu.ofproto import ofproto_v1_0 @@ -18,8 +18,24 @@ from ryu.lib import ofctl_v1_3 from ryu.lib.dpid import dpid_to_str from ryu.topology.api import get_switch, get_link +import requests +import subprocess +from operator import attrgetter +from ryu.ofproto.ether import ETH_TYPE_LLDP, ETH_TYPE_IPV6 +from ryu.lib import hub +from ryu.lib.packet import * +from ryu.topology import event, switches + +global controllerName +controllerName = 'DEFAULT' +global coreIP +coreIP = '127.0.0.1' +global corePort +corePort = '5567' class OmniUI(app_manager.RyuApp): + OFP_VERSIONS = [ofproto_v1_0.OFP_VERSION, ofproto_v1_2.OFP_VERSION, ofproto_v1_3.OFP_VERSION] + _EVENTS = [event.EventPortAdd] _CONTEXTS = { 'wsgi': WSGIApplication, 'dpset': dpset.DPSet @@ -33,18 +49,19 @@ def __init__(self, *args, **kwargs): self.data['dpset'] = kwargs['dpset'] self.data['waiters'] = self.waiters self.data['omniui'] = self + self.port_to_feature = {} + self.datapaths = {} + self.monitor_thread = hub.spawn(self.monitor) + self.dpset = self.data['dpset'] mapper = wsgi.mapper wsgi.registory['RestController'] = self.data - mapper.connect('omniui', '/wm/omniui/switch/json', - controller=RestController, action='switches', - conditions=dict(method=['GET'])) - mapper.connect('omniui', '/wm/omniui/link/json', - controller=RestController, action='links', - conditions=dict(method=['GET'])) mapper.connect('omniui', '/wm/omniui/add/json', controller=RestController, action='mod_flow_entry', conditions=dict(method=['POST'])) + mapper.connect('omniui', '/wm/omniui/controller/name', + controller=RestController, action='get_controller_name', + conditions=dict(method=['POST'])) @set_ev_cls([ofp_event.EventOFPFlowStatsReply, ofp_event.EventOFPPortStatsReply], MAIN_DISPATCHER) def stats_reply_handler(self, ev): @@ -71,19 +88,430 @@ def stats_reply_handler(self, ev): del self.waiters[dp.id][msg.xid] lock.set() -class RestController(ControllerBase): - def __init__(self, req, link, data, **config): - super(RestController, self).__init__(req, link, data, **config) - self.omniui = data['omniui'] - self.dpset = data['dpset'] - self.waiters = data['waiters'] + # + # try post json to core + # + def post_json_to_core(self, url, data): + try: + resp = requests.post(url, data = data, headers = {'Content-Type': 'application/json'}) + print resp + except Exception, e: + print(str(e)) + + # + # handle add switch event + # + @set_ev_cls(event.EventSwitchEnter) + def add_device_handler(self, ev): + switch = ev.switch + print '*****add device*****' + + # format dpid + temp = "%016x" %switch.dp.id + temp = map(str, temp) + for i in range(2, 23, 3): + temp.insert(i, ':') + + addDevice = { + 'dpid': ''.join(temp), + 'controller': controllerName + } + + self.port_to_feature[addDevice['dpid']] = {} + + print json.dumps(addDevice) + tmpIP = "http://" + coreIP + ":" + corePort + "/publish/adddevice" + self.post_json_to_core(tmpIP, json.dumps(addDevice)) + + # send add port event + for port in switch.ports: + n_ev = event.EventPortAdd(port) + self.send_event_to_observers(n_ev) + + # + # handle delete switch event + # + @set_ev_cls(event.EventSwitchLeave) + def del_device_handler(self, ev): + switch = ev.switch + print '*****del device*****' + + # format dpid + temp = "%016x" %switch.dp.id + temp = map(str, temp) + for i in range(2, 23, 3): + temp.insert(i, ':') + + delDevice = { + 'dpid': ''.join(temp), + 'controller': controllerName + } + + if delDevice['dpid'] in self.port_to_feature: + del self.port_to_feature[delDevice['dpid']] + + print json.dumps(delDevice) + tmpIP = "http://" + coreIP + ":" + corePort + "/publish/deldevice" + self.post_json_to_core(tmpIP, json.dumps(delDevice)) + + # + # handle modify port event + # + @set_ev_cls(event.EventPortModify) + def mod_port_handler(self, ev): + port = ev.port + print '*****mod port*****' + + if port.is_down(): + print '***down***' + + # format dpid + temp = "%016x" %port.dpid + temp = map(str, temp) + for i in range(2, 23, 3): + temp.insert(i, ':') + + modPort = { + 'dpid': ''.join(temp), + 'port': str(port.port_no), + 'controller': controllerName + } + + if modPort['dpid'] in self.port_to_feature: + if modPort['port'] in self.port_to_feature[modPort['dpid']]: + del self.port_to_feature[modPort['dpid']][modPort['port']] + + print json.dumps(modPort) + tmpIP = "http://" + coreIP + ":" + corePort + "/publish/delport" + self.post_json_to_core(tmpIP, json.dumps(modPort)) + else: + print '***live***' + + # format dpid + temp = "%016x" %port.dpid + temp = map(str, temp) + for i in range(2, 23, 3): + temp.insert(i, ':') + + modPort = { + 'dpid': ''.join(temp), + 'port': str(port.port_no), + 'controller': controllerName + } + + if modPort['dpid'] in self.port_to_feature: + self.port_to_feature[modPort['dpid']][modPort['port']] = str(port.currentFeatures) + + print json.dumps(modPort) + tmpIP = "http://" + coreIP + ":" + corePort + "/publish/addport" + self.post_json_to_core(tmpIP, json.dumps(modPort)) + + # + # handle add port event + # + @set_ev_cls(event.EventPortAdd) + def add_port_handler(self, ev): + port = ev.port + print '*****add port*****' + + # format dpid + temp = "%016x" %port.dpid + temp = map(str, temp) + for i in range(2, 23, 3): + temp.insert(i, ':') + + addPort = { + 'dpid': ''.join(temp), + 'port': str(port.port_no), + 'controller': controllerName + } + + if addPort['dpid'] in self.port_to_feature: + self.port_to_feature[addPort['dpid']][addPort['port']] = str(port.currentFeatures) + + print json.dumps(addPort) + tmpIP = "http://" + coreIP + ":" + corePort + "/publish/addport" + self.post_json_to_core(tmpIP, json.dumps(addPort)) + + # + # handle delete port event + # + @set_ev_cls(event.EventPortDelete) + def del_port_handler(self, ev): + port = ev.port + print '*****del port*****' + + # format dpid + temp = "%016x" %port.dpid + temp = map(str, temp) + for i in range(2, 23, 3): + temp.insert(i, ':') + + delPort = { + 'dpid': ''.join(temp), + 'port': str(port.port_no), + 'controller': controllerName + } + + if delPort['dpid'] in self.port_to_feature: + if delPort['port'] in self.port_to_feature[delPort['dpid']]: + del self.port_to_feature[delPort['dpid']][delPort['port']] + + print json.dumps(delPort) + tmpIP = "http://" + coreIP + ":" + corePort + "/publish/delport" + self.post_json_to_core(tmpIP, json.dumps(delPort)) + + # + # handle add link event + # + @set_ev_cls(event.EventLinkAdd) + def add_link_handler(self, ev): + link = ev.link + print '*****add link*****' + + # format src dpid + temp = "%016x" %link.src.dpid + temp = map(str, temp) + for i in range(2, 23, 3): + temp.insert(i, ':') + nodesrc = { + 'dpid': ''.join(temp), + 'port': str(link.src.port_no) + } + + # format dst dpid + temp = "%016x" %link.dst.dpid + temp = map(str, temp) + for i in range(2, 23, 3): + temp.insert(i, ':') + nodedst = { + 'dpid': ''.join(temp), + 'port': str(link.dst.port_no) + } + + addLink = { + 'link': [nodesrc, nodedst], + 'controller': controllerName + } + + print json.dumps(addLink) + tmpIP = "http://" + coreIP + ":" + corePort + "/publish/addlink" + self.post_json_to_core(tmpIP, json.dumps(addLink)) + + # + # handle delete link event + # + @set_ev_cls(event.EventLinkDelete) + def del_link_handler(self, ev): + link = ev.link + print '*****del link*****' + + # format src dpid + temp = "%016x" %link.src.dpid + temp = map(str, temp) + for i in range(2, 23, 3): + temp.insert(i, ':') + nodesrc = { + 'dpid': ''.join(temp), + 'port': str(link.src.port_no) + } + + # format dst dpid + temp = "%016x" %link.dst.dpid + temp = map(str, temp) + for i in range(2, 23, 3): + temp.insert(i, ':') + nodedst = { + 'dpid': ''.join(temp), + 'port': str(link.dst.port_no) + } + + delLink = { + 'link': [nodesrc, nodedst], + 'controller': controllerName + } + + print json.dumps(delLink) + tmpIP = "http://" + coreIP + ":" + corePort + "/publish/dellink" + self.post_json_to_core(tmpIP, json.dumps(delLink)) + + # + # handle add host event + # + @set_ev_cls(event.EventHostAdd) + def add_host_handler(self, ev): + host = ev.host + print '*****add host*****' + + # format dpid + temp = "%016x" %host.port.dpid + temp = map(str, temp) + for i in range(2, 23, 3): + temp.insert(i, ':') + nodeloc = { + 'dpid': ''.join(temp), + 'port': str(host.port.port_no) + } + + addHost = { + 'mac': host.mac, + 'type': 'wired', + 'location': nodeloc, + 'vlan': str(host.vlan[len(host.vlan)-1]) if host.vlan else "0", + 'ip': str(host.ipv4[len(host.ipv4)-1]) if host.ipv4 else "0.0.0.0", + 'controller': controllerName + } + + print json.dumps(addHost) + tmpIP = "http://" + coreIP + ":" + corePort + "/publish/addhost" + self.post_json_to_core(tmpIP, json.dumps(addHost)) + + # + # handle delete host event + # + @set_ev_cls(event.EventHostDelete) + def del_host_handler(self, ev): + host = ev.host + print '*****del host*****' + + delHost = { + 'mac': host.mac, + 'controller': controllerName + } + + print json.dumps(delHost) + tmpIP = "http://" + coreIP + ":" + corePort + "/publish/delhost" + self.post_json_to_core(tmpIP, json.dumps(delHost)) + + # + # handle packet in event + # + @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER) + def packet_in_handler(self, ev): + msg = ev.msg + datapath = msg.datapath + ofproto = datapath.ofproto + parser = datapath.ofproto_parser + + dpid = datapath.id + + if ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION: + in_port = msg.in_port + else: + in_port = msg.match['in_port'] - # return dpid of all nodes - def getNodes(self): - return self.dpset.dps.keys() + pkt = packet.Packet(msg.data) + eth = pkt.get_protocols(ethernet.ethernet)[0] - # return flow table of specific dpid - def getFlows(self, dpid): + # ignore lldp packet & ipv6 + if (eth.ethertype == ETH_TYPE_LLDP) | (eth.ethertype == ETH_TYPE_IPV6): + return + + src = eth.src + dst = eth.dst + + print '*****packet in*****' + + packetIn = {} + + # format dpid + temp = "%016x" %dpid + temp = map(str, temp) + for i in range(2, 23, 3): + temp.insert(i, ':') + packetIn["dpid"] = ''.join(temp) + + packetIn["in_port"] = str(in_port) + + pkt_ethernet = pkt.get_protocol(ethernet.ethernet) + if not pkt_ethernet: + return + else: + packetIn["mac_src"] = pkt_ethernet.src + packetIn["mac_dst"] = pkt_ethernet.dst + packetIn["ether_type"] = 'x0'.join(hex(pkt_ethernet.ethertype).split('x')) if len(hex(pkt_ethernet.ethertype)) < 6 else hex(pkt_ethernet.ethertype) + + for p in pkt.protocols: + if hasattr(p, 'src'): + packetIn["ip_src"] = p.src + packetIn["ip_dst"] = p.dst + + if hasattr(p, 'src_ip'): + packetIn["ip_src"] = p.src_ip + packetIn["ip_dst"] = p.dst_ip + + if hasattr(p, 'proto'): + packetIn["protocol"] = 'x0'.join(hex(p.proto).split('x')) if ((len(hex(p.proto)) < 4) or (len(hex(p.proto)) == 5)) else hex(p.proto) + + if hasattr(p, 'src_port'): + packetIn["port_src"] = str(p.src_port) + packetIn["port_dst"] = str(p.dst_port) + + packetIn["protocol"] = '0' if 'protocol' not in packetIn else packetIn["protocol"] + packetIn["ip_src"] = '0.0.0.0' if 'ip_src' not in packetIn else packetIn["ip_src"] + packetIn["ip_dst"] = '0.0.0.0' if 'ip_dst' not in packetIn else packetIn["ip_dst"] + packetIn["port_src"] = '0' if 'port_src' not in packetIn else packetIn["port_src"] + packetIn["port_dst"] = '0' if 'port_dst' not in packetIn else packetIn["port_dst"] + packetIn["controller"] = controllerName + + print json.dumps(packetIn) + tmpIP = "http://" + coreIP + ":" + corePort + "/publish/packet" + self.post_json_to_core(tmpIP, json.dumps(packetIn)) + + # + # for datapath + # + @set_ev_cls(ofp_event.EventOFPStateChange, [MAIN_DISPATCHER, DEAD_DISPATCHER]) + def state_change_handler(self, ev): + datapath = ev.datapath + if ev.state == MAIN_DISPATCHER: + if not datapath.id in self.datapaths: + self.logger.debug('register datapath: %016x', datapath.id) + self.datapaths[datapath.id] = datapath + elif ev.state == DEAD_DISPATCHER: + if datapath.id in self.datapaths: + self.logger.debug('unregister datapath: %016x', datapath.id) + del self.datapaths[datapath.id] + + # + # for polling + # + def monitor(self): + while True: + for dp in self.datapaths.values(): + tempflow = self.flow_stats_reply_handler_API(dp.id) # from OpenADM + tempport = self.port_stats_reply_handler_API(dp.id) # from OpenADM + self.controller_stats_reply_handler() + + hub.sleep(5) + + # + # polling controller + # + def controller_stats_reply_handler(self): + print '----------------------controller----------------------' + + os = subprocess.check_output("cat /etc/issue".split()) + mem = subprocess.check_output("free -h", shell=True) + cpu = subprocess.check_output("cat /proc/loadavg".split()) + controllerstatsReply = { + 'controller': controllerName, + 'type': 'ryu', + 'os': ' '.join(os.split()), + 'mem_total': mem.split()[7], + 'mem_used': mem.split()[8], + 'mem_free': mem.split()[9], + 'cpu': cpu.split()[0] + } + + print json.dumps(controllerstatsReply) + tmpIP = "http://" + coreIP + ":" + corePort + "/publish/controller" + self.post_json_to_core(tmpIP, json.dumps(controllerstatsReply)) + + # + # polling flows + # + def flow_stats_reply_handler_API(self, dpid): flow = {} dp = self.dpset.get(int(dpid)) if dp is None: @@ -97,10 +525,71 @@ def getFlows(self, dpid): else: LOG.debug('Unsupported OF protocol') return None + + print '----------------------flowAPI----------------------' + + flowstatsReplyAPI = {} + flowstatsReplyAPI["controller"] = controllerName + + for key in flows: + temp = "%016x" %int(key) + temp = map(str, temp) + for i in range(2, 23, 3): + temp.insert(i, ':') + flowstatsReplyAPI["dpid"] = ''.join(temp) + + flowstatsReplyAPI["flows"] = [] + i = 0 + for inflow in flows[key]: + if 'priority' in inflow: + flowstatsReplyAPI["flows"].append({}) + flowstatsReplyAPI["flows"][i]["ingressPort"] = str(inflow['match']['in_port']) if 'in_port' in inflow['match'] else "0" + flowstatsReplyAPI["flows"][i]["dstMac"] = inflow['match']['dl_dst'] if 'dl_dst' in inflow['match'] else "00:00:00:00:00:00" + flowstatsReplyAPI["flows"][i]["srcMac"] = inflow['match']['dl_src'] if 'dl_src' in inflow['match'] else "00:00:00:00:00:00" + flowstatsReplyAPI["flows"][i]["dstIP"] = inflow['match']['nw_dst'] if 'nw_dst' in inflow['match'] else "0.0.0.0" + flowstatsReplyAPI["flows"][i]["dstIPMask"] = "0" # not support in ryu + flowstatsReplyAPI["flows"][i]["srcIP"] = inflow['match']['nw_src'] if 'nw_src' in inflow['match'] else "0.0.0.0" + flowstatsReplyAPI["flows"][i]["srcIPMask"] = "0" # not support in ryu + flowstatsReplyAPI["flows"][i]["netProtocol"] = hex(inflow['match']['nw_proto']) if 'nw_proto' in inflow['match'] else "0x00" + flowstatsReplyAPI["flows"][i]["dstPort"] = str(inflow['match']['tp_dst']) if 'tp_dst' in inflow['match'] else "0" + flowstatsReplyAPI["flows"][i]["srcPort"] = str(inflow['match']['tp_src']) if 'tp_src' in inflow['match'] else "0" + flowstatsReplyAPI["flows"][i]["vlan"] = str(inflow['match']['dl_vlan']) if 'dl_vlan' in inflow['match'] else "0" + flowstatsReplyAPI["flows"][i]["vlanP"] = str(inflow['match']['dl_vlan_pcp']) if 'dl_vlan_pcp' in inflow['match'] else "0" + flowstatsReplyAPI["flows"][i]["wildcards"] = str(inflow['match']['wildcards']) if 'wildcards' in inflow['match'] else "0" + flowstatsReplyAPI["flows"][i]["tosBits"] = str(inflow['match']['nw_tos']) if 'nw_tos' in inflow['match'] else "0" + flowstatsReplyAPI["flows"][i]["counterByte"] = str(inflow['byte_count']) + flowstatsReplyAPI["flows"][i]["counterPacket"] = str(inflow['packet_count']) + flowstatsReplyAPI["flows"][i]["idleTimeout"] = str(inflow['idle_timeout']) + flowstatsReplyAPI["flows"][i]["hardTimeout"] = str(inflow['hard_timeout']) + flowstatsReplyAPI["flows"][i]["priority"] = str(inflow['priority']) + flowstatsReplyAPI["flows"][i]["duration"] = str(inflow['duration_sec']) + flowstatsReplyAPI["flows"][i]["dlType"] = hex(inflow['match']['dl_type']) if 'dl_type' in inflow['match'] else "0x0000" + + flowstatsReplyAPI["flows"][i]["actions"] = [] + for action in inflow['actions']: + if len(action.split(':')) == 1: + act = { + "value": "0", + "type": action + } + else: + act = { + "value": action.split(':')[1], + "type": action.split(':')[0] + } + flowstatsReplyAPI["flows"][i]["actions"].append(act) + + i += 1 + + print json.dumps(flowstatsReplyAPI) + tmpIP = "http://" + coreIP + ":" + corePort + "/publish/flow" + self.post_json_to_core(tmpIP, json.dumps(flowstatsReplyAPI)) return flows - # return port information of specific dpid - def getPorts(self, dpid): + # + # polling ports + # + def port_stats_reply_handler_API(self, dpid): dp = self.dpset.get(int(dpid)) if dp is None: return None @@ -113,105 +602,47 @@ def getPorts(self, dpid): else: LOG.debug('Unsupported OF protocol') return None + + print '----------------------portAPI----------------------' + + for key in ports: + for port in ports[key]: + portstatsReplyAPI = { + 'controller': controllerName, + 'port': str(port['port_no']), + 'rxbyte': str(port['rx_bytes']), + 'rxpacket': str(port['rx_packets']), + 'txbyte': str(port['tx_bytes']), + 'txpacket': str(port['tx_packets']) + } + + temp = "%016x" %int(key) + temp = map(str, temp) + for i in range(2, 23, 3): + temp.insert(i, ':') + portstatsReplyAPI["dpid"] = ''.join(temp) + portstatsReplyAPI["capacity"] = self.port_to_feature[portstatsReplyAPI['dpid']][portstatsReplyAPI['port']] if portstatsReplyAPI['port'] in self.port_to_feature[portstatsReplyAPI['dpid']] else '0' + + print json.dumps(portstatsReplyAPI) + tmpIP = "http://" + coreIP + ":" + corePort + "/publish/port" + self.post_json_to_core(tmpIP, json.dumps(portstatsReplyAPI)) return ports - # return links in network topology - # notice: --observe-link is needed when running ryu-manager - def getLinks(self): - dpid = None - links = get_link(self.omniui, dpid) - return links - - # repack switch information - def switches(self, req, **kwargs): - result = [] - nodes = self.getNodes() - for node in nodes: - omniNode = { - 'dpid': self.colonDPID(dpid_to_str(node)), - 'flows':[], - 'ports':[] - } - # repack flow information - flows = self.getFlows(node) - for key in flows: - for flow in flows[key]: - omniFlow = { - 'ingressPort': flow['match']['in_port'] if 'in_port' in flow['match'] else 0, - 'srcMac': flow['match']['dl_src'] if 'dl_src' in flow['match'] else 0, - 'dstMac': flow['match']['dl_dst'] if 'dl_dst' in flow['match'] else 0, - 'dstIP': flow['match']['nw_dst'] if 'nw_dst' in flow['match'] else 0, - 'dstIPMask': '-', # not support in ryu - 'netProtocol': flow['match']['nw_proto'] if 'nw_proto' in flow['match'] else 0, - 'srcIP': flow['match']['nw_src'] if 'nw_src' in flow['match'] else 0, - 'srcIPMask': '-', # not support in ryu - 'dstPort': flow['match']['tp_dst'] if 'tp_dst' in flow['match'] else 0, - 'srcPort': flow['match']['tp_src'] if 'tp_src' in flow['match'] else 0, - 'vlan': flow['match']['dl_vlan'] if 'dl_vlan' in flow['match'] else 0, - 'vlanP': flow['match']['dl_vlan_pcp'] if 'dl_vlan_pcp' in flow['match'] else 0, - 'wildcards': flow['match']['wildcards'] if 'wildcards' in flow['match'] else '-', - 'tosBits': flow['match']['nw_tos'] if 'nw_tos' in flow['match'] else 0, - 'counterByte': flow['byte_count'], - 'counterPacket': flow['packet_count'], - 'idleTimeout': flow['idle_timeout'], - 'hardTimeout': flow['hard_timeout'], - 'priority': flow['priority'], - 'duration': flow['duration_sec'], - 'dlType': flow['match']['dl_type'] if 'dl_type' in flow['match'] else 0, - 'actions': [] - } - # repack action field - for action in flow['actions']: - if (len(action.split(':')) == 1): - omniAction = { - 'type': action, - } - omniFlow['actions'].append(omniAction) - else: - omniAction = { - 'type': action.split(':')[0], - 'value': action.split(':')[1] - } - omniFlow['actions'].append(omniAction) - omniNode['flows'].append(omniFlow) - # repack port information - ports = self.getPorts(node) - for key in ports: - for port in ports[key]: - omniPort = { - 'PortNumber': port['port_no'], - 'recvPackets': port['rx_packets'], - 'transmitPackets': port['tx_packets'], - 'recvBytes': port['rx_bytes'], - 'transmitBytes': port['tx_bytes'] - } - omniNode['ports'].append(omniPort) - result.append(omniNode) - body = json.dumps(result) - return Response(content_type='application/json', body=body) - - # repack link information - def links(self, req, **kwargs): - result = [] - links = self.getLinks() - for link in links: - omniLink = { - 'src-switch': self.colonDPID(link.to_dict()['src']['dpid']), - 'dst-switch': self.colonDPID(link.to_dict()['dst']['dpid']), - 'src-port': (int)(link.to_dict()['src']['port_no']), - 'dst-port': (int)(link.to_dict()['dst']['port_no']) - } - # remove bi-direction link - reverse = False - for link in result: - if (link['src-switch'] == omniLink['dst-switch'] and - link['dst-switch'] == omniLink['src-switch'] and - link['src-port'] == omniLink['dst-port'] and - link['dst-port'] == omniLink['src-port']): - reverse = True - result.append(omniLink) if reverse is False else None - body = json.dumps(result) - return Response(content_type='application/json', body=body) + +class RestController(ControllerBase): + def __init__(self, req, link, data, **config): + super(RestController, self).__init__(req, link, data, **config) + self.dpset = data['dpset'] + + def get_controller_name(self, req, **kwargs): + try: + global controllerName + controllerName = req.body + print '*****NAME: ' + controllerName + '*****' + except SyntaxError: + return Response(status=400) + + return Response(status=200) def mod_flow_entry(self, req, **kwargs): try: @@ -275,15 +706,15 @@ def ryuFlow_v1_0(self, dp, flows): 'match': { 'wildcards': wildcards, 'in_port': int(flows.get('ingressPort', 0)), - 'dl_src': flows.get('srcMac'), - 'dl_dst': flows.get('dstMac'), + 'dl_src': flows.get('srcMac', '00:00:00:00:00:00'), + 'dl_dst': flows.get('dstMac', '00:00:00:00:00:00'), 'dl_vlan': int(flows.get('vlan', 0)), 'dl_vlan_pcp': int(flows.get('vlanP', 0)), - 'dl_type': int(flows.get('dlType', 0)), + 'dl_type': int(flows.get('dlType', '0x0000'), 16), 'nw_tos': int(flows.get('tosBits', 0)), - 'nw_proto': int(flows.get('netProtocol', 0)), - 'nw_src': flows.get('srcIP').split('/')[0], - 'nw_dst': flows.get('dstIP').split('/')[0], + 'nw_proto': int(flows.get('netProtocol', '0x00'), 16), + 'nw_src': flows.get('srcIP', '0.0.0.0').split('/')[0], + 'nw_dst': flows.get('dstIP', '0.0.0.0').split('/')[0], 'tp_src': int(flows.get('srcPort', 0)), 'tp_dst': int(flows.get('dstPort', 0)) } @@ -294,6 +725,9 @@ def ryuFlow_v1_0(self, dp, flows): for act in actions: action = self.to_action_v1_0(dp, act) ryuFlow['actions'].append(action) + for matchfield in ryuFlow['match'].copy(): + if (ryuFlow['match'][matchfield] == 0) or (ryuFlow['match'][matchfield] == '0.0.0.0') or (ryuFlow['match'][matchfield] == '00:00:00:00:00:00'): + del ryuFlow['match'][matchfield] return ryuFlow @@ -303,18 +737,18 @@ def to_action_v1_0(self, dp, actions): if actions_type == 'OUTPUT': ryuAction = { 'type': actions_type, - 'port': actions.split('=')[1], + 'port': int(actions.split('=')[1]), 'max_len': 0xffe5 } elif actions_type == 'SET_VLAN_VID': ryuAction = { 'type': actions_type, - 'vlan_vid': actions.split('=')[1] + 'vlan_vid': int(actions.split('=')[1]) } elif actions_type == 'SET_VLAN_PCP': ryuAction = { 'type': actions_type, - 'vlan_pcp': actions.split('=')[1] + 'vlan_pcp': int(actions.split('=')[1]) } elif actions_type == 'STRIP_VLAN': ryuAction = { @@ -343,25 +777,25 @@ def to_action_v1_0(self, dp, actions): elif actions_type == 'SET_NW_TOS': ryuAction = { 'type': actions_type, - 'nw_tos': actions.split('=')[1] + 'nw_tos': int(actions.split('=')[1]) } elif actions_type == 'SET_TP_SRC': ryuAction = { 'type': actions_type, - 'tp_src': actions.split('=')[1] + 'tp_src': int(actions.split('=')[1]) } elif actions_type == 'SET_TP_DST': ryuAction = { 'type': actions_type, - 'tp_dst': actions.split('=')[1] + 'tp_dst': int(actions.split('=')[1]) } elif actions_type == 'ENQUEUE': actions_port = actions.split('=')[1].split(':')[0] actions_qid = actions.split('=')[1].split(':')[1] ryuAction = { 'type': actions_type, - 'port': actions_port, - 'queue_id': actions_qid + 'port': int(actions_port), + 'queue_id': int(actions_qid) } else: LOG.debug('Unknown action type') @@ -392,7 +826,7 @@ def ryuFlow_v1_3(self, dp, omniFlow): ryuFlow['match'].update(match) # handle mutiple actions - acts = omniFlow.get('actions').split(',') + acts = omniFlow.get('actions', '').split(',') for a in acts: action = self.to_action_v1_3(dp, a) if action is not None: @@ -411,15 +845,15 @@ def to_match_v1_3(self, dp, omni_key, omniFlow): 'srcMac': ['dl_src', str], 'eth_dst': ['eth_dst', str], 'eth_src': ['eth_src', str], - 'dlType': ['dl_type', int], - 'eth_type': ['eth_type', int], + 'dlType': ['dl_type', int, 16], + 'eth_type': ['eth_type', int, 16], 'vlan': ['dl_vlan', str], 'vlan_vid': ['vlan_vid', str], 'vlanP': ['vlan_pcp', int], 'ip_dscp': ['ip_dscp', int], 'ip_ecn': ['ip_ecn', int], - 'netProtocol': ['nw_proto', int], - 'ip_proto': ['ip_proto', int], + 'netProtocol': ['nw_proto', int, 16], + 'ip_proto': ['ip_proto', int, 16], 'srcIP': ['nw_src', str], 'dstIP': ['nw_dst', str], 'ipv4_src': ['ipv4_src', str], @@ -455,8 +889,15 @@ def to_match_v1_3(self, dp, omni_key, omniFlow): 'ipv6_exthdr': ['ipv6_exthdr', int] } + if (omniFlow.get(omni_key) == '0') or (omniFlow.get(omni_key) == '0.0.0.0') or (omniFlow.get(omni_key) == '00:00:00:00:00:00') or (omniFlow.get(omni_key) == '0x00') or (omniFlow.get(omni_key) == '0x0000'): + return None for key, value in convert.items(): if omni_key == key: + if len(value) == 3: + ryuMatch = { + value[0]: value[1](omniFlow.get(omni_key), value[2]) + } + return ryuMatch ryuMatch = { value[0]: value[1](omniFlow.get(omni_key)) } @@ -470,7 +911,7 @@ def to_action_v1_3(self, dp, dic): if action_type == 'OUTPUT': ryuAction = { 'type': action_type, - 'port': dic.split('=')[1] + 'port': int(dic.split('=')[1]) } elif action_type == 'COPY_TTL_OUT': ryuAction = { @@ -550,7 +991,3 @@ def to_action_v1_3(self, dp, dic): # restore Ryu-format dpid def nospaceDPID(self, dpid): return "".join(dpid) - - # repack dpid - def colonDPID(self, dpid): - return ':'.join(a+b for a,b in zip(dpid[::2], dpid[1::2])) diff --git a/adapter/ryu/switches.py b/adapter/ryu/switches.py new file mode 100644 index 0000000..17a483c --- /dev/null +++ b/adapter/ryu/switches.py @@ -0,0 +1,1088 @@ +# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import six +import struct +import time +import json +from ryu import cfg + +from ryu.topology import event +from ryu.base import app_manager +from ryu.controller import ofp_event +from ryu.controller.handler import set_ev_cls +from ryu.controller.handler import MAIN_DISPATCHER, DEAD_DISPATCHER +from ryu.exception import RyuException +from ryu.lib import addrconv, hub +from ryu.lib.mac import DONTCARE_STR +from ryu.lib.dpid import dpid_to_str, str_to_dpid +from ryu.lib.port_no import port_no_to_str +from ryu.lib.packet import packet, ethernet +from ryu.lib.packet import lldp, ether_types +from ryu.lib.packet import arp, ipv4, ipv6, vlan +from ryu.ofproto.ether import ETH_TYPE_LLDP +from ryu.ofproto import nx_match +from ryu.ofproto import ofproto_v1_0 +from ryu.ofproto import ofproto_v1_2 +from ryu.ofproto import ofproto_v1_3 +from ryu.ofproto import ofproto_v1_4 + + +LOG = logging.getLogger(__name__) + + +CONF = cfg.CONF + +CONF.register_cli_opts([ + cfg.BoolOpt('observe-links', default=False, + help='observe link discovery events.'), + cfg.BoolOpt('install-lldp-flow', default=True, + help='link discovery: explicitly install flow entry ' + 'to send lldp packet to controller'), + cfg.BoolOpt('explicit-drop', default=True, + help='link discovery: explicitly drop lldp packet in') +]) + + +class Port(object): + # This is data class passed by EventPortXXX + def __init__(self, dpid, ofproto, ofpport): + super(Port, self).__init__() + + self.dpid = dpid + self._ofproto = ofproto + self._config = ofpport.config + self._state = ofpport.state + + self.port_no = ofpport.port_no + self.hw_addr = ofpport.hw_addr + self.name = ofpport.name + self.currentFeatures = ofpport.curr + + def is_reserved(self): + return self.port_no > self._ofproto.OFPP_MAX + + def is_down(self): + return (self._state & self._ofproto.OFPPS_LINK_DOWN) > 0 \ + or (self._config & self._ofproto.OFPPC_PORT_DOWN) > 0 + + def is_live(self): + # NOTE: OF1.2 has OFPPS_LIVE state + # return (self._state & self._ofproto.OFPPS_LIVE) > 0 + return not self.is_down() + + def to_dict(self): + return {'dpid': dpid_to_str(self.dpid), + 'port_no': port_no_to_str(self.port_no), + 'hw_addr': self.hw_addr, + 'name': self.name.rstrip('\0')} + + # for Switch.del_port() + def __eq__(self, other): + return self.dpid == other.dpid and self.port_no == other.port_no + + def __ne__(self, other): + return not self.__eq__(other) + + def __hash__(self): + return hash((self.dpid, self.port_no)) + + def __str__(self): + LIVE_MSG = {False: 'DOWN', True: 'LIVE'} + return 'Port' % \ + (self.dpid, self.port_no, LIVE_MSG[self.is_live()]) + + +class Switch(object): + # This is data class passed by EventSwitchXXX + def __init__(self, dp): + super(Switch, self).__init__() + + self.dp = dp + self.ports = [] + + def add_port(self, ofpport): + port = Port(self.dp.id, self.dp.ofproto, ofpport) + if not port.is_reserved(): + self.ports.append(port) + + def del_port(self, ofpport): + self.ports.remove(Port(ofpport)) + + def to_dict(self): + d = {'dpid': dpid_to_str(self.dp.id), + 'ports': [port.to_dict() for port in self.ports]} + return d + + def __str__(self): + msg = 'Switch Host class + def __init__(self): + super(HostState, self).__init__() + + def add(self, host): + mac = host.mac + self.setdefault(mac, host) + + def update_vlan(self, host, vlan=None): + mac = host.mac + host = None + if mac in self: + host = self[mac] + + if not host: + return + + if vlan != None: + if vlan in host.vlan: + if host.vlan.index(vlan) == (len(host.vlan)-1): + return 'old' + host.vlan.remove(vlan) + host.vlan.append(vlan) + return 'new' + + def update_ip(self, host, ip_v4=None, ip_v6=None): + mac = host.mac + host = None + if mac in self: + host = self[mac] + + if not host: + return + + if ip_v4 != None: + if ip_v4 in host.ipv4: + if host.ipv4.index(ip_v4) == (len(host.ipv4)-1): + return 'old' + host.ipv4.remove(ip_v4) + host.ipv4.append(ip_v4) + return 'new' + + if ip_v6 != None: + if ip_v6 in host.ipv6: + if host.ipv6.index(ip_v6) == (len(host.ipv6)-1): + return 'old' + host.ipv6.remove(ip_v6) + host.ipv6.append(ip_v6) + return 'new' + + def get_by_dpid(self, dpid): + result = [] + + for mac in self: + host = self[mac] + if host.port.dpid == dpid: + result.append(host) + + return result + + +class PortState(dict): + # dict: int port_no -> OFPPort port + # OFPPort is defined in ryu.ofproto.ofproto_v1_X_parser + def __init__(self): + super(PortState, self).__init__() + + def add(self, port_no, port): + self[port_no] = port + + def remove(self, port_no): + del self[port_no] + + def modify(self, port_no, port): + self[port_no] = port + + +class PortData(object): + def __init__(self, is_down, lldp_data): + super(PortData, self).__init__() + self.is_down = is_down + self.lldp_data = lldp_data + self.timestamp = None + self.sent = 0 + + def lldp_sent(self): + self.timestamp = time.time() + self.sent += 1 + + def lldp_received(self): + self.sent = 0 + + def lldp_dropped(self): + return self.sent + + def clear_timestamp(self): + self.timestamp = None + + def set_down(self, is_down): + self.is_down = is_down + + def __str__(self): + return 'PortData' \ + % (not self.is_down, self.timestamp, self.sent) + + +class PortDataState(dict): + # dict: Port class -> PortData class + # slimed down version of OrderedDict as python 2.6 doesn't support it. + _PREV = 0 + _NEXT = 1 + _KEY = 2 + + def __init__(self): + super(PortDataState, self).__init__() + self._root = root = [] # sentinel node + root[:] = [root, root, None] # [_PREV, _NEXT, _KEY] + # doubly linked list + self._map = {} + + def _remove_key(self, key): + link_prev, link_next, key = self._map.pop(key) + link_prev[self._NEXT] = link_next + link_next[self._PREV] = link_prev + + def _append_key(self, key): + root = self._root + last = root[self._PREV] + last[self._NEXT] = root[self._PREV] = self._map[key] = [last, root, + key] + + def _prepend_key(self, key): + root = self._root + first = root[self._NEXT] + first[self._PREV] = root[self._NEXT] = self._map[key] = [root, first, + key] + + def _move_last_key(self, key): + self._remove_key(key) + self._append_key(key) + + def _move_front_key(self, key): + self._remove_key(key) + self._prepend_key(key) + + def add_port(self, port, lldp_data): + if port not in self: + self._prepend_key(port) + self[port] = PortData(port.is_down(), lldp_data) + else: + self[port].is_down = port.is_down() + + def lldp_sent(self, port): + port_data = self[port] + port_data.lldp_sent() + self._move_last_key(port) + return port_data + + def lldp_received(self, port): + self[port].lldp_received() + + def move_front(self, port): + port_data = self.get(port, None) + if port_data is not None: + port_data.clear_timestamp() + self._move_front_key(port) + + def set_down(self, port): + is_down = port.is_down() + port_data = self[port] + port_data.set_down(is_down) + port_data.clear_timestamp() + if not is_down: + self._move_front_key(port) + return is_down + + def get_port(self, port): + return self[port] + + def del_port(self, port): + del self[port] + self._remove_key(port) + + def __iter__(self): + root = self._root + curr = root[self._NEXT] + while curr is not root: + yield curr[self._KEY] + curr = curr[self._NEXT] + + def clear(self): + for node in self._map.values(): + del node[:] + root = self._root + root[:] = [root, root, None] + self._map.clear() + dict.clear(self) + + def items(self): + 'od.items() -> list of (key, value) pairs in od' + return [(key, self[key]) for key in self] + + def iteritems(self): + 'od.iteritems -> an iterator over the (key, value) pairs in od' + for k in self: + yield (k, self[k]) + + +class LinkState(dict): + # dict: Link class -> timestamp + def __init__(self): + super(LinkState, self).__init__() + self._map = {} + + def get_peer(self, src): + return self._map.get(src, None) + + def update_link(self, src, dst): + link = Link(src, dst) + + self[link] = time.time() + self._map[src] = dst + + # return if the reverse link is also up or not + rev_link = Link(dst, src) + return rev_link in self + + def link_down(self, link): + del self[link] + del self._map[link.src] + + def rev_link_set_timestamp(self, rev_link, timestamp): + # rev_link may or may not in LinkSet + if rev_link in self: + self[rev_link] = timestamp + + def port_deleted(self, src): + dst = self.get_peer(src) + if dst is None: + raise KeyError() + + link = Link(src, dst) + rev_link = Link(dst, src) + del self[link] + del self._map[src] + # reverse link might not exist + self.pop(rev_link, None) + rev_link_dst = self._map.pop(dst, None) + + return dst, rev_link_dst + + +class LLDPPacket(object): + # make a LLDP packet for link discovery. + + CHASSIS_ID_PREFIX = 'dpid:' + CHASSIS_ID_PREFIX_LEN = len(CHASSIS_ID_PREFIX) + CHASSIS_ID_FMT = CHASSIS_ID_PREFIX + '%s' + + PORT_ID_STR = '!I' # uint32_t + PORT_ID_SIZE = 4 + + class LLDPUnknownFormat(RyuException): + message = '%(msg)s' + + @staticmethod + def lldp_packet(dpid, port_no, dl_addr, ttl): + pkt = packet.Packet() + + dst = lldp.LLDP_MAC_NEAREST_BRIDGE + src = dl_addr + ethertype = ETH_TYPE_LLDP + eth_pkt = ethernet.ethernet(dst, src, ethertype) + pkt.add_protocol(eth_pkt) + + tlv_chassis_id = lldp.ChassisID( + subtype=lldp.ChassisID.SUB_LOCALLY_ASSIGNED, + chassis_id=(LLDPPacket.CHASSIS_ID_FMT % + dpid_to_str(dpid)).encode('ascii')) + + tlv_port_id = lldp.PortID(subtype=lldp.PortID.SUB_PORT_COMPONENT, + port_id=struct.pack( + LLDPPacket.PORT_ID_STR, + port_no)) + + tlv_ttl = lldp.TTL(ttl=ttl) + tlv_end = lldp.End() + + tlvs = (tlv_chassis_id, tlv_port_id, tlv_ttl, tlv_end) + lldp_pkt = lldp.lldp(tlvs) + pkt.add_protocol(lldp_pkt) + + pkt.serialize() + return pkt.data + + @staticmethod + def lldp_parse(data): + pkt = packet.Packet(data) + i = iter(pkt) + eth_pkt = six.next(i) + assert type(eth_pkt) == ethernet.ethernet + + lldp_pkt = six.next(i) + if type(lldp_pkt) != lldp.lldp: + raise LLDPPacket.LLDPUnknownFormat() + + tlv_chassis_id = lldp_pkt.tlvs[0] + if tlv_chassis_id.subtype != lldp.ChassisID.SUB_LOCALLY_ASSIGNED: + raise LLDPPacket.LLDPUnknownFormat( + msg='unknown chassis id subtype %d' % tlv_chassis_id.subtype) + chassis_id = tlv_chassis_id.chassis_id + if not chassis_id.startswith(LLDPPacket.CHASSIS_ID_PREFIX): + raise LLDPPacket.LLDPUnknownFormat( + msg='unknown chassis id format %s' % chassis_id) + src_dpid = str_to_dpid(chassis_id[LLDPPacket.CHASSIS_ID_PREFIX_LEN:]) + + tlv_port_id = lldp_pkt.tlvs[1] + if tlv_port_id.subtype != lldp.PortID.SUB_PORT_COMPONENT: + raise LLDPPacket.LLDPUnknownFormat( + msg='unknown port id subtype %d' % tlv_port_id.subtype) + port_id = tlv_port_id.port_id + if len(port_id) != LLDPPacket.PORT_ID_SIZE: + raise LLDPPacket.LLDPUnknownFormat( + msg='unknown port id %d' % port_id) + (src_port_no, ) = struct.unpack(LLDPPacket.PORT_ID_STR, port_id) + + return src_dpid, src_port_no + + +class Switches(app_manager.RyuApp): + OFP_VERSIONS = [ofproto_v1_0.OFP_VERSION, ofproto_v1_2.OFP_VERSION, + ofproto_v1_3.OFP_VERSION, ofproto_v1_4.OFP_VERSION] + _EVENTS = [event.EventSwitchEnter, event.EventSwitchLeave, + event.EventSwitchReconnected, + event.EventPortAdd, event.EventPortDelete, + event.EventPortModify, + event.EventLinkAdd, event.EventLinkDelete, + event.EventHostAdd, event.EventHostDelete] + + DEFAULT_TTL = 120 # unused. ignored. + LLDP_PACKET_LEN = len(LLDPPacket.lldp_packet(0, 0, DONTCARE_STR, 0)) + + LLDP_SEND_GUARD = .05 + LLDP_SEND_PERIOD_PER_PORT = .9 + TIMEOUT_CHECK_PERIOD = 5. + LINK_TIMEOUT = TIMEOUT_CHECK_PERIOD * 2 + LINK_LLDP_DROP = 5 + + def __init__(self, *args, **kwargs): + super(Switches, self).__init__(*args, **kwargs) + + self.name = 'switches' + self.dps = {} # datapath_id => Datapath class + self.port_state = {} # datapath_id => ports + self.ports = PortDataState() # Port class -> PortData class + self.links = LinkState() # Link class -> timestamp + self.hosts = HostState() # mac address -> Host class list + self.is_active = True + + self.link_discovery = self.CONF.observe_links + if self.link_discovery: + self.install_flow = self.CONF.install_lldp_flow + self.explicit_drop = self.CONF.explicit_drop + self.lldp_event = hub.Event() + self.link_event = hub.Event() + self.threads.append(hub.spawn(self.lldp_loop)) + self.threads.append(hub.spawn(self.link_loop)) + + def close(self): + self.is_active = False + if self.link_discovery: + self.lldp_event.set() + self.link_event.set() + hub.joinall(self.threads) + + def _register(self, dp): + assert dp.id is not None + + self.dps[dp.id] = dp + if dp.id not in self.port_state: + self.port_state[dp.id] = PortState() + for port in dp.ports.values(): + self.port_state[dp.id].add(port.port_no, port) + + def _unregister(self, dp): + if dp.id in self.dps: + if (self.dps[dp.id] == dp): + del self.dps[dp.id] + del self.port_state[dp.id] + + def _get_switch(self, dpid): + if dpid in self.dps: + switch = Switch(self.dps[dpid]) + for ofpport in self.port_state[dpid].values(): + switch.add_port(ofpport) + return switch + + def _get_port(self, dpid, port_no): + switch = self._get_switch(dpid) + if switch: + for p in switch.ports: + if p.port_no == port_no: + return p + + def _port_added(self, port): + lldp_data = LLDPPacket.lldp_packet( + port.dpid, port.port_no, port.hw_addr, self.DEFAULT_TTL) + self.ports.add_port(port, lldp_data) + # LOG.debug('_port_added dpid=%s, port_no=%s, live=%s', + # port.dpid, port.port_no, port.is_live()) + + def _link_down(self, port): + try: + dst, rev_link_dst = self.links.port_deleted(port) + except KeyError: + # LOG.debug('key error. src=%s, dst=%s', + # port, self.links.get_peer(port)) + return + link = Link(port, dst) + self.send_event_to_observers(event.EventLinkDelete(link)) + if rev_link_dst: + rev_link = Link(dst, rev_link_dst) + self.send_event_to_observers(event.EventLinkDelete(rev_link)) + self.ports.move_front(dst) + + def _is_edge_port(self, port): + for link in self.links: + if port == link.src or port == link.dst: + return False + + return True + + @set_ev_cls(ofp_event.EventOFPStateChange, + [MAIN_DISPATCHER, DEAD_DISPATCHER]) + def state_change_handler(self, ev): + dp = ev.datapath + assert dp is not None + LOG.debug(dp) + + if ev.state == MAIN_DISPATCHER: + dp_multiple_conns = False + if dp.id in self.dps: + LOG.warning('Multiple connections from %s', dpid_to_str(dp.id)) + dp_multiple_conns = True + (self.dps[dp.id]).close() + + self._register(dp) + switch = self._get_switch(dp.id) + LOG.debug('register %s', switch) + + if not dp_multiple_conns: + self.send_event_to_observers(event.EventSwitchEnter(switch)) + else: + self.send_event_to_observers(event.EventSwitchReconnected(switch)) + + if not self.link_discovery: + return + + if self.install_flow: + ofproto = dp.ofproto + ofproto_parser = dp.ofproto_parser + + # TODO:XXX need other versions + if ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION: + rule = nx_match.ClsRule() + rule.set_dl_dst(addrconv.mac.text_to_bin( + lldp.LLDP_MAC_NEAREST_BRIDGE)) + rule.set_dl_type(ETH_TYPE_LLDP) + actions = [ofproto_parser.OFPActionOutput( + ofproto.OFPP_CONTROLLER, self.LLDP_PACKET_LEN)] + dp.send_flow_mod( + rule=rule, cookie=0, command=ofproto.OFPFC_ADD, + idle_timeout=0, hard_timeout=0, actions=actions, + priority=0xFFFF) + elif ofproto.OFP_VERSION >= ofproto_v1_2.OFP_VERSION: + match = ofproto_parser.OFPMatch( + eth_type=ETH_TYPE_LLDP, + eth_dst=lldp.LLDP_MAC_NEAREST_BRIDGE) + # OFPCML_NO_BUFFER is set so that the LLDP is not + # buffered on switch + parser = ofproto_parser + actions = [parser.OFPActionOutput(ofproto.OFPP_CONTROLLER, + ofproto.OFPCML_NO_BUFFER + )] + inst = [parser.OFPInstructionActions( + ofproto.OFPIT_APPLY_ACTIONS, actions)] + mod = parser.OFPFlowMod(datapath=dp, match=match, + idle_timeout=0, hard_timeout=0, + instructions=inst, + priority=0xFFFF) + dp.send_msg(mod) + else: + LOG.error('cannot install flow. unsupported version. %x', + dp.ofproto.OFP_VERSION) + + # Do not add ports while dp has multiple connections to controller. + if not dp_multiple_conns: + for port in switch.ports: + if not port.is_reserved(): + self._port_added(port) + + self.lldp_event.set() + + elif ev.state == DEAD_DISPATCHER: + # dp.id is None when datapath dies before handshake + if dp.id is None: + return + + # if switch delete + # then delete host on it + hostlist = self.hosts.get_by_dpid(dp.id) + for host in hostlist: + ev = event.EventHostDelete(host) + self.send_event_to_observers(ev) + del self.hosts[host.mac] + + switch = self._get_switch(dp.id) + if switch: + if switch.dp is dp: + self._unregister(dp) + LOG.debug('unregister %s', switch) + + self.send_event_to_observers(event.EventSwitchLeave(switch)) + + if not self.link_discovery: + return + + for port in switch.ports: + if not port.is_reserved(): + self.ports.del_port(port) + self._link_down(port) + self.lldp_event.set() + + @set_ev_cls(ofp_event.EventOFPPortStatus, MAIN_DISPATCHER) + def port_status_handler(self, ev): + msg = ev.msg + reason = msg.reason + dp = msg.datapath + ofpport = msg.desc + + if reason == dp.ofproto.OFPPR_ADD: + # LOG.debug('A port was added.' + + # '(datapath id = %s, port number = %s)', + # dp.id, ofpport.port_no) + self.port_state[dp.id].add(ofpport.port_no, ofpport) + self.send_event_to_observers( + event.EventPortAdd(Port(dp.id, dp.ofproto, ofpport))) + + if not self.link_discovery: + return + + port = self._get_port(dp.id, ofpport.port_no) + if port and not port.is_reserved(): + self._port_added(port) + self.lldp_event.set() + + elif reason == dp.ofproto.OFPPR_DELETE: + # LOG.debug('A port was deleted.' + + # '(datapath id = %s, port number = %s)', + # dp.id, ofpport.port_no) + self.port_state[dp.id].remove(ofpport.port_no) + self.send_event_to_observers( + event.EventPortDelete(Port(dp.id, dp.ofproto, ofpport))) + + if not self.link_discovery: + return + + port = self._get_port(dp.id, ofpport.port_no) + if port and not port.is_reserved(): + # if port delete + # then delete host on it + for host in self.hosts.values(): + if port.__eq__(host.port): + ev = event.EventHostDelete(host) + self.send_event_to_observers(ev) + del self.hosts[host.mac] + break + self.ports.del_port(port) + self._link_down(port) + self.lldp_event.set() + + else: + assert reason == dp.ofproto.OFPPR_MODIFY + # LOG.debug('A port was modified.' + + # '(datapath id = %s, port number = %s)', + # dp.id, ofpport.port_no) + self.port_state[dp.id].modify(ofpport.port_no, ofpport) + self.send_event_to_observers( + event.EventPortModify(Port(dp.id, dp.ofproto, ofpport))) + + if not self.link_discovery: + return + + port = self._get_port(dp.id, ofpport.port_no) + if port and not port.is_reserved(): + if self.ports.set_down(port): + # if port down + # then delete host on it + for host in self.hosts.values(): + if port.__eq__(host.port): + ev = event.EventHostDelete(host) + self.send_event_to_observers(ev) + del self.hosts[host.mac] + break + self._link_down(port) + self.lldp_event.set() + + @staticmethod + def _drop_packet(msg): + buffer_id = msg.buffer_id + if buffer_id == msg.datapath.ofproto.OFP_NO_BUFFER: + return + + dp = msg.datapath + # TODO:XXX + if dp.ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION: + dp.send_packet_out(buffer_id, msg.in_port, []) + elif dp.ofproto.OFP_VERSION >= ofproto_v1_2.OFP_VERSION: + dp.send_packet_out(buffer_id, msg.match['in_port'], []) + else: + LOG.error('cannot drop_packet. unsupported version. %x', + dp.ofproto.OFP_VERSION) + + @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER) + def lldp_packet_in_handler(self, ev): + if not self.link_discovery: + return + + msg = ev.msg + try: + src_dpid, src_port_no = LLDPPacket.lldp_parse(msg.data) + except LLDPPacket.LLDPUnknownFormat as e: + # This handler can receive all the packets which can be + # not-LLDP packet. Ignore it silently + return + + dst_dpid = msg.datapath.id + if msg.datapath.ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION: + dst_port_no = msg.in_port + elif msg.datapath.ofproto.OFP_VERSION >= ofproto_v1_2.OFP_VERSION: + dst_port_no = msg.match['in_port'] + else: + LOG.error('cannot accept LLDP. unsupported version. %x', + msg.datapath.ofproto.OFP_VERSION) + + src = self._get_port(src_dpid, src_port_no) + if not src or src.dpid == dst_dpid: + return + try: + self.ports.lldp_received(src) + except KeyError: + # There are races between EventOFPPacketIn and + # EventDPPortAdd. So packet-in event can happend before + # port add event. In that case key error can happend. + # LOG.debug('lldp_received: KeyError %s', e) + pass + + dst = self._get_port(dst_dpid, dst_port_no) + if not dst: + return + + old_peer = self.links.get_peer(src) + # LOG.debug("Packet-In") + # LOG.debug(" src=%s", src) + # LOG.debug(" dst=%s", dst) + # LOG.debug(" old_peer=%s", old_peer) + if old_peer and old_peer != dst: + old_link = Link(src, old_peer) + del self.links[old_link] + self.send_event_to_observers(event.EventLinkDelete(old_link)) + + link = Link(src, dst) + if link not in self.links: + self.send_event_to_observers(event.EventLinkAdd(link)) + + # remove hosts from edge port + for host in self.hosts.values(): + if not self._is_edge_port(host.port): + ev = event.EventHostDelete(host) + self.send_event_to_observers(ev) + del self.hosts[host.mac] + + if not self.links.update_link(src, dst): + # reverse link is not detected yet. + # So schedule the check early because it's very likely it's up + self.ports.move_front(dst) + self.lldp_event.set() + if self.explicit_drop: + self._drop_packet(msg) + + @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER) + def host_discovery_packet_in_handler(self, ev): + msg = ev.msg + pkt = packet.Packet(msg.data) + eth = pkt.get_protocols(ethernet.ethernet)[0] + + # ignore lldp packet + if eth.ethertype == ETH_TYPE_LLDP: + return + + datapath = msg.datapath + dpid = datapath.id + port_no = -1 + + if msg.datapath.ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION: + port_no = msg.in_port + else: + port_no = msg.match['in_port'] + + port = self._get_port(dpid, port_no) + + # can't find this port(ex: logic port) + if not port: + return + # ignore switch-to-switch port + if not self._is_edge_port(port): + return + + host_mac = eth.src + host = Host(host_mac, port) + + if host_mac not in self.hosts: + self.hosts.add(host) + ev = event.EventHostAdd(host) + self.send_event_to_observers(ev) + + # vlan packet, update vlan tag + vlan_pkt = pkt.get_protocol(vlan.vlan) + if vlan_pkt: + ans = self.hosts.update_vlan(host, vlan=vlan_pkt.vid) + if ans == 'new': + ev = event.EventHostAdd(host) + self.send_event_to_observers(ev) + + # arp packet, update ip address + if eth.ethertype == ether_types.ETH_TYPE_ARP: + arp_pkt = pkt.get_protocols(arp.arp)[0] + ans = self.hosts.update_ip(host, ip_v4=arp_pkt.src_ip) + if ans == 'new': + ev = event.EventHostAdd(host) + self.send_event_to_observers(ev) + + # ipv4 packet, update ipv4 address + elif eth.ethertype == ether_types.ETH_TYPE_IP: + ipv4_pkt = pkt.get_protocols(ipv4.ipv4)[0] + ans = self.hosts.update_ip(host, ip_v4=ipv4_pkt.src) + if ans == 'new': + ev = event.EventHostAdd(host) + self.send_event_to_observers(ev) + + # ipv6 packet, update ipv6 address + elif eth.ethertype == ether_types.ETH_TYPE_IPV6: + # TODO: need to handle NDP + ipv6_pkt = pkt.get_protocols(ipv6.ipv6)[0] + self.hosts.update_ip(host, ip_v6=ipv6_pkt.src) + + def send_lldp_packet(self, port): + try: + port_data = self.ports.lldp_sent(port) + except KeyError as e: + # ports can be modified during our sleep in self.lldp_loop() + # LOG.debug('send_lldp: KeyError %s', e) + return + if port_data.is_down: + return + + dp = self.dps.get(port.dpid, None) + if dp is None: + # datapath was already deleted + return + + # LOG.debug('lldp sent dpid=%s, port_no=%d', dp.id, port.port_no) + # TODO:XXX + if dp.ofproto.OFP_VERSION == ofproto_v1_0.OFP_VERSION: + actions = [dp.ofproto_parser.OFPActionOutput(port.port_no)] + dp.send_packet_out(actions=actions, data=port_data.lldp_data) + elif dp.ofproto.OFP_VERSION >= ofproto_v1_2.OFP_VERSION: + actions = [dp.ofproto_parser.OFPActionOutput(port.port_no)] + out = dp.ofproto_parser.OFPPacketOut( + datapath=dp, in_port=dp.ofproto.OFPP_CONTROLLER, + buffer_id=dp.ofproto.OFP_NO_BUFFER, actions=actions, + data=port_data.lldp_data) + dp.send_msg(out) + else: + LOG.error('cannot send lldp packet. unsupported version. %x', + dp.ofproto.OFP_VERSION) + + def lldp_loop(self): + while self.is_active: + self.lldp_event.clear() + + now = time.time() + timeout = None + ports_now = [] + ports = [] + for (key, data) in self.ports.items(): + if data.timestamp is None: + ports_now.append(key) + continue + + expire = data.timestamp + self.LLDP_SEND_PERIOD_PER_PORT + if expire <= now: + ports.append(key) + continue + + timeout = expire - now + break + + for port in ports_now: + self.send_lldp_packet(port) + for port in ports: + self.send_lldp_packet(port) + hub.sleep(self.LLDP_SEND_GUARD) # don't burst + + if timeout is not None and ports: + timeout = 0 # We have already slept + # LOG.debug('lldp sleep %s', timeout) + self.lldp_event.wait(timeout=timeout) + + def link_loop(self): + while self.is_active: + self.link_event.clear() + + now = time.time() + deleted = [] + for (link, timestamp) in self.links.items(): + # LOG.debug('%s timestamp %d (now %d)', link, timestamp, now) + if timestamp + self.LINK_TIMEOUT < now: + src = link.src + if src in self.ports: + port_data = self.ports.get_port(src) + # LOG.debug('port_data %s', port_data) + if port_data.lldp_dropped() > self.LINK_LLDP_DROP: + deleted.append(link) + + for link in deleted: + self.links.link_down(link) + # LOG.debug('delete %s', link) + self.send_event_to_observers(event.EventLinkDelete(link)) + + dst = link.dst + rev_link = Link(dst, link.src) + if rev_link not in deleted: + # It is very likely that the reverse link is also + # disconnected. Check it early. + expire = now - self.LINK_TIMEOUT + self.links.rev_link_set_timestamp(rev_link, expire) + if dst in self.ports: + self.ports.move_front(dst) + self.lldp_event.set() + + self.link_event.wait(timeout=self.TIMEOUT_CHECK_PERIOD) + + @set_ev_cls(event.EventSwitchRequest) + def switch_request_handler(self, req): + # LOG.debug(req) + dpid = req.dpid + + switches = [] + if dpid is None: + # reply all list + for dp in self.dps.values(): + switches.append(self._get_switch(dp.id)) + elif dpid in self.dps: + switches.append(self._get_switch(dpid)) + + rep = event.EventSwitchReply(req.src, switches) + self.reply_to_request(req, rep) + + @set_ev_cls(event.EventLinkRequest) + def link_request_handler(self, req): + # LOG.debug(req) + dpid = req.dpid + + if dpid is None: + links = self.links + else: + links = [link for link in self.links if link.src.dpid == dpid] + rep = event.EventLinkReply(req.src, dpid, links) + self.reply_to_request(req, rep) + + @set_ev_cls(event.EventHostRequest) + def host_request_handler(self, req): + dpid = req.dpid + hosts = [] + if dpid is None: + for mac in self.hosts: + hosts.append(self.hosts[mac]) + else: + hosts = self.hosts.get_by_dpid(dpid) + + rep = event.EventHostReply(req.src, dpid, hosts) + self.reply_to_request(req, rep) diff --git a/core/etc/config.json b/core/etc/config.json index 8dea080..79ebf0e 100644 --- a/core/etc/config.json +++ b/core/etc/config.json @@ -14,5 +14,12 @@ "controller_port": "6653", "controller_name": "foobar", "adapter_port": "8080" + }, + "BusyLink_Detect": { + "ip": "localhost", + "port": "5567", + "controller_name": "foobar", + "interval": "5", + "version": "1.0" } } diff --git a/core/src/floodlight_modules/busylink_detect.py b/core/src/floodlight_modules/busylink_detect.py index 533688f..028e05e 100644 --- a/core/src/floodlight_modules/busylink_detect.py +++ b/core/src/floodlight_modules/busylink_detect.py @@ -4,181 +4,185 @@ logger = logging.getLogger(__name__) class BusyLink_Detect: - def __init__(self,core,parm): - """ BusyLinkDetect init""" - self.controllerIP = "localhost" - self.controllerPort = "8080" - self.timerInterval = 5 - - self.coreIP = "localhost" - self.corePort = "5567" - - self.baseState = 1 - self.finalState = 3 - self.threshold = 0.8 - self.statistics = {} - self.capacity = {} - self.links = {} - self.switches= {} - self.BLD_result = [] - - #load config - if(parm): - if(parm.has_key("ip")): - self.controllerIP = parm["ip"] - if(parm.has_key("port")): - self.controllerPort = parm["port"] - if(parm.has_key("interval")): - self.timerInterval = int(parm["interval"]) - logger.debug('IP =%s port = %s interval = %s' % (self.controllerIP,self.controllerPort,self.timerInterval)) - core.registerEvent("periodicQuery",self.periodicQuery,self.timerInterval) - core.registerEventHandler("periodicQuery", self.busyLinkDetect) - core.registerSSEHandler("busylink", self.busylinkHandler) - - def busylinkHandler(self): - data = {} - for i in range(len(self.BLD_result)): - data[i] = self.BLD_result[i] - return json.dumps(data) - - def overthreshold(self,link,id): - link['state'] += 1 - if link['state'] >= self.finalState: - link['state'] = self.finalState - self.BLD_result.append(id) - - def underthreshold(self,link): - link['state'] -= 1 - if link['state'] < self.baseState: - link['state'] = self.baseState - - def periodicQuery(self): - self.periodicQueryLink() - self.periodicQueryPort() - - def parsePortFeatures(self,features): - if features == 0: - return 0 - turn_binary = bin(features)[2:] - binary_len = len(turn_binary) - if binary_len < 12: - turn_binary = '0'*(12-binary_len) + turn_binary - - if turn_binary[5] == '1': - return 10*(1024**3)/8.0 #10Gb - if turn_binary[6] == '1' or turn_binary[7] == '1': - return 1024**3/8.0 #1Gb - if turn_binary[8] == '1' or turn_binary[9] == '1': - return 100*(1024**2)/8.0 #100Mb - if turn_binary[10] == '1' or turn_binary[11] == '1': - return 10*(1024**2)/8.0 #10Mb - return 0 - - def queryLinkCapacity(self): - try: - conn = httplib.HTTPConnection(self.controllerIP, int(self.controllerPort)) - conn.request("GET", "/wm/core/switch/all/features/json") - response = conn.getresponse().read() - except Exception, e: - logger.error("connection error for inquiring features: "+str(e)) - return - finally: - conn.close() - try: - data = json.loads(response) - self.capacity = {} - for switch_id in data: - switch = data[switch_id] - ports = switch['ports'] - for port in ports: - result = self.parsePortFeatures(port['currentFeatures']) - self.capacity["%s_%d" % (switch_id,port['portNumber'])] = result - except Exception, e: - logger.error("json parse error for features: "+str(e)) - - def periodicQueryLink(self): - try: - conn = httplib.HTTPConnection(self.controllerIP, int(self.controllerPort)) - conn.request("GET", "/wm/omniui/link/json") - response = conn.getresponse().read() - except Exception, e: - logger.error("connection error for inquiring links: "+str(e)) - return - finally: - conn.close() - try: - data = json.loads(response) - self.links = {} - for link in data: - tmp = {} - tmp['source'] = link['src-switch'] - tmp['target'] = link['dst-switch'] - tmp['sourcePort'] = link['src-port'] - tmp['targetPort'] = link['dst-port'] - id = "dpid %s, port %s -- dpid %s, port %s" % (tmp['source'],tmp['sourcePort'],tmp['target'],tmp['targetPort']) - self.links[id] = tmp - except Exception, e: - logger.error("json parse error for links: "+str(e)) - self.queryLinkCapacity() - - def periodicQueryPort(self): - try: - conn = httplib.HTTPConnection(self.controllerIP, int(self.controllerPort)) - conn.request("GET", "/wm/omniui/switch/json") - response = conn.getresponse().read() - except Exception, e: - logger.error("connection error for inquiring switches: "+str(e)) - return - finally: - conn.close() - try: - data = json.loads(response) - self.switches= {} - for switch in data: - tmp = {} - for port in switch['ports']: - tmp[int(port['port'])] = int(port['rxbyte']) - self.switches[switch['dpid']] = tmp - except Exception, e: - logger.error("json parse error for switch: "+str(e)) - - def busyLinkDetect(self,event): - self.BLD_result = [] - #calculate link's countBytes and capacity - for link_id in self.links: - link = self.links[link_id] - src = link['source'] - srcp = link['sourcePort'] - dest = link['target'] - destp = link['targetPort'] - total_bytes = self.switches[src][srcp] + self.switches[dest][destp] - link['countBytes'] = total_bytes - link['capacity'] = min(self.capacity["%s_%d" % (src,srcp)],self.capacity["%s_%d" % (dest,destp)]) - - #initialize self.statistics value - if len(self.statistics) == 0: - self.statistics = dict(self.links) - for link_id in self.statistics: - link = self.statistics[link_id] - link['state'] = self.baseState - - #check threshold - for link_id in self.links: - if link_id in self.statistics: - if (self.links[link_id]['countBytes'] - self.statistics[link_id]['countBytes']) / self.statistics[link_id]['capacity'] >= self.threshold: - self.overthreshold(self.statistics[link_id],link_id) - else: - self.underthreshold(self.statistics[link_id]) - self.statistics[link_id]['countBytes'] = self.links[link_id]['countBytes'] - else: - self.statistics[link_id] = dict(self.links[link_id]) - self.statistics[link_id]['state'] = self.baseState - #remove unexisted link info - for link_id in self.statistics: - if link_id not in self.links: - del self.statistics[link_id] - - #return result - if len(self.BLD_result)>0: - conn = httplib.HTTPConnection(self.coreIP,self.corePort) - conn.request('POST','/publish/busylink') + def __init__(self,core,parm): + """ BusyLinkDetect init""" + self.coreIP = "localhost" + self.corePort = "5567" + self.controllerName = "DEFAULT" + self.openflowVersion = "1.0" + + self.baseState = 1 + self.finalState = 3 + self.threshold = 0.000001 + self.statistics = {} + self.capacity = {} + self.links = {} + self.switches = {} + self.BLD_result = [] + + # Load config + if parm: + if parm.has_key("ip"): + self.coreIP = parm["ip"] + if parm.has_key("port"): + self.corePort = parm["port"] + if parm.has_key("controller_name"): + self.controllerName = parm["controller_name"] + if parm.has_key("version"): + self.openflowVersion = parm["version"] + + # Get link and port information from nwinfo + core.registerEventHandler("linkbag", self.getLink) + core.registerEventHandler("portbag", self.getPort) + + def getLink(self, linksbag): + #print '----------links----------' + #print json.dumps(linksbag.values()) + + try: + data = linksbag.values() + self.links = {} + for link in data: + tmp = {} + tmp['source'] = link[0]['dpid'] + tmp['target'] = link[1]['dpid'] + tmp['sourcePort'] = int(link[0]['port']) + tmp['targetPort'] = int(link[1]['port']) + id = "dpid %s, port %d -- dpid %s, port %d" % (tmp['source'], tmp['sourcePort'], tmp['target'], tmp['targetPort']) + self.links[id] = tmp + except Exception, e: + logger.error("json parse error for links: "+str(e)) + + def getPort(self, portsbag): + #print '----------ports----------' + #print json.dumps(portsbag.values()) + + try: + data = portsbag.values() + self.capacity = {} + self.switches = {} + for port in data: + if int(port['capacity']) == 0: + continue + if self.openflowVersion == "1.3": + self.capacity["%s_%d" % (port['dpid'],int(port['port']))] = self.parsePortFeatures_v1_3(int(port['capacity'])) + else: + self.capacity["%s_%d" % (port['dpid'],int(port['port']))] = self.parsePortFeatures_v1_0(int(port['capacity'])) + if port['dpid'] in self.switches: + self.switches[port['dpid']][int(port['port'])] = int(port['rxbyte']) + else: + self.switches[port['dpid']] = {} + self.switches[port['dpid']][int(port['port'])] = int(port['rxbyte']) + except Exception, e: + logger.error("json parse error for switch and features: "+str(e)) + + self.busyLinkDetect() + + def overthreshold(self,id): + self.statistics[id]['state'] += 1 + if self.statistics[id]['state'] >= self.finalState: + self.statistics[id]['state'] = self.finalState + self.BLD_result.append(id) + + def underthreshold(self,id): + self.statistics[id]['state'] -= 1 + if self.statistics[id]['state'] < self.baseState: + self.statistics[id]['state'] = self.baseState + + def parsePortFeatures_v1_0(self,features): + if features == 0: + return 0 + turn_binary = bin(features)[2:] + binary_len = len(turn_binary) + if binary_len < 12: + turn_binary = '0'*(12-binary_len) + turn_binary + + if turn_binary[5] == '1': + return 10*(1024**3)/8.0 #10Gb + if turn_binary[6] == '1' or turn_binary[7] == '1': + return 1024**3/8.0 #1Gb + if turn_binary[8] == '1' or turn_binary[9] == '1': + return 100*(1024**2)/8.0 #100Mb + if turn_binary[10] == '1' or turn_binary[11] == '1': + return 10*(1024**2)/8.0 #10Mb + return 0 + + def parsePortFeatures_v1_3(self,features): + if features == 0: + return 0 + turn_binary = bin(features)[2:] + binary_len = len(turn_binary) + if binary_len < 12: + turn_binary = '0'*(12-binary_len) + turn_binary + + if turn_binary[2] == '1': + return 1024**4/8.0 #1Tb + if turn_binary[3] == '1': + return 100*(1024**3)/8.0 #100Gb + if turn_binary[4] == '1': + return 40*(1024**3)/8.0 #40Gb + if turn_binary[5] == '1': + return 10*(1024**3)/8.0 #10Gb + if turn_binary[6] == '1' or turn_binary[7] == '1': + return 1024**3/8.0 #1Gb + if turn_binary[8] == '1' or turn_binary[9] == '1': + return 100*(1024**2)/8.0 #100Mb + if turn_binary[10] == '1' or turn_binary[11] == '1': + return 10*(1024**2)/8.0 #10Mb + return 0 + + def busyLinkDetect(self): + self.BLD_result = [] + # Calculate link's countBytes and capacity + for link_id in self.links: + src = self.links[link_id]['source'] + srcp = self.links[link_id]['sourcePort'] + dest = self.links[link_id]['target'] + destp = self.links[link_id]['targetPort'] + + # Check if needed information all arrived + if (src not in self.switches) or (dest not in self.switches): + print 'Not Ready' + return + elif (srcp not in self.switches[src]) or (destp not in self.switches[dest]): + print 'Not Ready' + return + + total_bytes = self.switches[src][srcp] + self.switches[dest][destp] + self.links[link_id]['countBytes'] = total_bytes + self.links[link_id]['capacity'] = min(self.capacity["%s_%d" % (src,srcp)],self.capacity["%s_%d" % (dest,destp)]) + + # Initialize self.statistics value + if len(self.statistics) == 0: + self.statistics = dict(self.links) + for link_id in self.statistics: + self.statistics[link_id]['state'] = self.baseState + + # Check threshold + for link_id in self.links: + if link_id in self.statistics: + if (self.links[link_id]['countBytes'] - self.statistics[link_id]['countBytes']) / self.statistics[link_id]['capacity'] >= self.threshold: + self.overthreshold(link_id) + else: + self.underthreshold(link_id) + self.statistics[link_id]['countBytes'] = self.links[link_id]['countBytes'] + else: + self.statistics[link_id] = dict(self.links[link_id]) + self.statistics[link_id]['state'] = self.baseState + # Remove unexisted link info + for link_id in self.statistics.copy(): + if link_id not in self.links: + del self.statistics[link_id] + + # Return result + if len(self.BLD_result) > 0: + print '*****Busy Link ID*****' + for i in range(len(self.BLD_result)): + data = {'link': [], 'controller': self.controllerName} + data['link'].append({'dpid': self.links[self.BLD_result[i]]['source'], 'port': self.links[self.BLD_result[i]]['sourcePort']}) + data['link'].append({'dpid': self.links[self.BLD_result[i]]['target'], 'port': self.links[self.BLD_result[i]]['targetPort']}) + print json.dumps(data) + else: + print 'No BusyLink' + #conn = httplib.HTTPConnection(self.coreIP,self.corePort) + #conn.request('POST','/publish/busylink') diff --git a/core/src/floodlight_modules/nwinfo.py b/core/src/floodlight_modules/nwinfo.py index 595cb6b..25524e4 100644 --- a/core/src/floodlight_modules/nwinfo.py +++ b/core/src/floodlight_modules/nwinfo.py @@ -48,6 +48,10 @@ def __registration(self, core): core.registerSSEHandler('delhost', self.delhostHandler) core.registerSSEHandler('port', self.portHandler) core.registerSSEHandler('flow', self.flowHandler) + + # Send link and port info for busylink detection + core.registerEvent('linkbag', self.sendLink, 5) + core.registerEvent('portbag', self.sendPort, 5) # websocket API for WebUI core.registerURLApi('port', self.getPortCounter) @@ -60,6 +64,12 @@ def __registration(self, core): logger.info('Handlers and RESTful APIs registered') + def sendLink(self): + return self.links + + def sendPort(self): + return self.portstats + def __trigger(self, controller_ip, controller_port, adapter_port, controller_name): """Trigger controller adapter to send events to us