From afde9ef05993b0d0fe67b2145e39611a77d56d4c Mon Sep 17 00:00:00 2001 From: Yufeng Duan <55268016+didovesei@users.noreply.github.com> Date: Wed, 28 Apr 2021 13:07:44 -0700 Subject: [PATCH] Traffic analyzer standalone entry (#848) --- daq/acl_state_collector.py | 10 +- daq/entry.py | 9 +- daq/proto/acl_counting_pb2.py | 75 +++++++------- daq/traffic_analyzer.py | 182 ++++++++++++++++++++++++++++++++++ daq/utils.py | 13 +++ daq/varz_state_collector.py | 2 +- firebase/public/protos.hash | 2 +- firebase/public/protos.html | 27 +++-- proto/acl_counting.proto | 15 ++- testing/test_mud.sh | 41 ++++---- testing/test_preamble.sh | 7 +- 11 files changed, 304 insertions(+), 79 deletions(-) create mode 100644 daq/traffic_analyzer.py diff --git a/daq/acl_state_collector.py b/daq/acl_state_collector.py index 31cd368bef..7cb66c056f 100644 --- a/daq/acl_state_collector.py +++ b/daq/acl_state_collector.py @@ -41,6 +41,11 @@ def _get_port_rule_counts(self, switch, port, acl_config, rule_samples): acl_config._id, rule_config.get('description')) continue + rule_description = rule_config.get('description') + if not rule_description: + LOGGER.error('Rule with cookie %s does not have a description', cookie_num) + continue + has_sample = False for sample in rule_samples: if str(sample.labels.get('cookie')) != str(cookie_num): @@ -49,14 +54,15 @@ def _get_port_rule_counts(self, switch, port, acl_config, rule_samples): continue if int(sample.labels.get('in_port')) != port: continue - rule_map = rules_map.setdefault(rule_config['description'], {}) + + rule_map = rules_map.setdefault(rule_description, {}) rule_map['packet_count'] = int(sample.value) has_sample = True break if not has_sample: error = (f'No ACL metric sample available for switch, port, ACL, rule: ' - f'{switch}, {port}, {acl_config._id}, {rule_config["description"]} ' + f'{switch}, {port}, {acl_config._id}, {rule_description} ' f'(cookie={cookie_num})') errors.append(error) LOGGER.error(error) diff --git a/daq/entry.py b/daq/entry.py index 7f1fd1d1d7..e98e3bbe04 100644 --- a/daq/entry.py +++ b/daq/entry.py @@ -100,13 +100,6 @@ def _stripped_alt_logger(self, level, msg, *args, **kwargs): ALT_LOG._log(level, stripped, *args, **kwargs) -def _write_pid_file(): - pid = os.getpid() - LOGGER.info('pid is %d', pid) - with open(_PID_FILE, 'w') as pid_file: - pid_file.write(str(pid)) - - def _execute(): daq = DAQ(sys.argv) configurator.print_config(daq.config) @@ -120,7 +113,7 @@ def _execute(): if not daq.validate_config(): return 1 - _write_pid_file() + utils.write_pid_file(_PID_FILE, LOGGER) signal.signal(signal.SIGINT, signal.default_int_handler) signal.signal(signal.SIGTERM, signal.default_int_handler) diff --git a/daq/proto/acl_counting_pb2.py b/daq/proto/acl_counting_pb2.py index a8715758a8..5ddf1061ed 100644 --- a/daq/proto/acl_counting_pb2.py +++ b/daq/proto/acl_counting_pb2.py @@ -19,29 +19,29 @@ syntax='proto3', serialized_options=None, create_key=_descriptor._internal_create_key, - serialized_pb=b'\n\x1c\x64\x61q/proto/acl_counting.proto\"r\n\tAclCounts\x12(\n\x07\x64\x65vices\x18\x01 \x03(\x0b\x32\x17.AclCounts.DevicesEntry\x1a;\n\x0c\x44\x65vicesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x1a\n\x05value\x18\x02 \x01(\x0b\x32\x0b.RuleCounts:\x02\x38\x01\"}\n\nRuleCounts\x12%\n\x05rules\x18\x01 \x03(\x0b\x32\x16.RuleCounts.RulesEntry\x12\x0e\n\x06\x65rrors\x18\x02 \x03(\t\x1a\x38\n\nRulesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x19\n\x05value\x18\x02 \x01(\x0b\x32\n.RuleCount:\x02\x38\x01\"!\n\tRuleCount\x12\x14\n\x0cpacket_count\x18\x01 \x01(\x05\x62\x06proto3' + serialized_pb=b'\n\x1c\x64\x61q/proto/acl_counting.proto\"\xa6\x01\n\x10\x44\x65viceRuleCounts\x12?\n\x10\x64\x65vice_mac_rules\x18\x01 \x03(\x0b\x32%.DeviceRuleCounts.DeviceMacRulesEntry\x12\r\n\x05\x65rror\x18\x02 \x01(\t\x1a\x42\n\x13\x44\x65viceMacRulesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x1a\n\x05value\x18\x02 \x01(\x0b\x32\x0b.RuleCounts:\x02\x38\x01\"}\n\nRuleCounts\x12%\n\x05rules\x18\x01 \x03(\x0b\x32\x16.RuleCounts.RulesEntry\x12\x0e\n\x06\x65rrors\x18\x02 \x03(\t\x1a\x38\n\nRulesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x19\n\x05value\x18\x02 \x01(\x0b\x32\n.RuleCount:\x02\x38\x01\"!\n\tRuleCount\x12\x14\n\x0cpacket_count\x18\x01 \x01(\x05\x62\x06proto3' ) -_ACLCOUNTS_DEVICESENTRY = _descriptor.Descriptor( - name='DevicesEntry', - full_name='AclCounts.DevicesEntry', +_DEVICERULECOUNTS_DEVICEMACRULESENTRY = _descriptor.Descriptor( + name='DeviceMacRulesEntry', + full_name='DeviceRuleCounts.DeviceMacRulesEntry', filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( - name='key', full_name='AclCounts.DevicesEntry.key', index=0, + name='key', full_name='DeviceRuleCounts.DeviceMacRulesEntry.key', index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), _descriptor.FieldDescriptor( - name='value', full_name='AclCounts.DevicesEntry.value', index=1, + name='value', full_name='DeviceRuleCounts.DeviceMacRulesEntry.value', index=1, number=2, type=11, cpp_type=10, label=1, has_default_value=False, default_value=None, message_type=None, enum_type=None, containing_type=None, @@ -59,29 +59,36 @@ extension_ranges=[], oneofs=[ ], - serialized_start=87, - serialized_end=146, + serialized_start=133, + serialized_end=199, ) -_ACLCOUNTS = _descriptor.Descriptor( - name='AclCounts', - full_name='AclCounts', +_DEVICERULECOUNTS = _descriptor.Descriptor( + name='DeviceRuleCounts', + full_name='DeviceRuleCounts', filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( - name='devices', full_name='AclCounts.devices', index=0, + name='device_mac_rules', full_name='DeviceRuleCounts.device_mac_rules', index=0, number=1, type=11, cpp_type=10, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='error', full_name='DeviceRuleCounts.error', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), ], extensions=[ ], - nested_types=[_ACLCOUNTS_DEVICESENTRY, ], + nested_types=[_DEVICERULECOUNTS_DEVICEMACRULESENTRY, ], enum_types=[ ], serialized_options=None, @@ -90,8 +97,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=32, - serialized_end=146, + serialized_start=33, + serialized_end=199, ) @@ -129,8 +136,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=217, - serialized_end=273, + serialized_start=270, + serialized_end=326, ) _RULECOUNTS = _descriptor.Descriptor( @@ -167,8 +174,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=148, - serialized_end=273, + serialized_start=201, + serialized_end=326, ) @@ -199,35 +206,35 @@ extension_ranges=[], oneofs=[ ], - serialized_start=275, - serialized_end=308, + serialized_start=328, + serialized_end=361, ) -_ACLCOUNTS_DEVICESENTRY.fields_by_name['value'].message_type = _RULECOUNTS -_ACLCOUNTS_DEVICESENTRY.containing_type = _ACLCOUNTS -_ACLCOUNTS.fields_by_name['devices'].message_type = _ACLCOUNTS_DEVICESENTRY +_DEVICERULECOUNTS_DEVICEMACRULESENTRY.fields_by_name['value'].message_type = _RULECOUNTS +_DEVICERULECOUNTS_DEVICEMACRULESENTRY.containing_type = _DEVICERULECOUNTS +_DEVICERULECOUNTS.fields_by_name['device_mac_rules'].message_type = _DEVICERULECOUNTS_DEVICEMACRULESENTRY _RULECOUNTS_RULESENTRY.fields_by_name['value'].message_type = _RULECOUNT _RULECOUNTS_RULESENTRY.containing_type = _RULECOUNTS _RULECOUNTS.fields_by_name['rules'].message_type = _RULECOUNTS_RULESENTRY -DESCRIPTOR.message_types_by_name['AclCounts'] = _ACLCOUNTS +DESCRIPTOR.message_types_by_name['DeviceRuleCounts'] = _DEVICERULECOUNTS DESCRIPTOR.message_types_by_name['RuleCounts'] = _RULECOUNTS DESCRIPTOR.message_types_by_name['RuleCount'] = _RULECOUNT _sym_db.RegisterFileDescriptor(DESCRIPTOR) -AclCounts = _reflection.GeneratedProtocolMessageType('AclCounts', (_message.Message,), { +DeviceRuleCounts = _reflection.GeneratedProtocolMessageType('DeviceRuleCounts', (_message.Message,), { - 'DevicesEntry' : _reflection.GeneratedProtocolMessageType('DevicesEntry', (_message.Message,), { - 'DESCRIPTOR' : _ACLCOUNTS_DEVICESENTRY, + 'DeviceMacRulesEntry' : _reflection.GeneratedProtocolMessageType('DeviceMacRulesEntry', (_message.Message,), { + 'DESCRIPTOR' : _DEVICERULECOUNTS_DEVICEMACRULESENTRY, '__module__' : 'daq.proto.acl_counting_pb2' - # @@protoc_insertion_point(class_scope:AclCounts.DevicesEntry) + # @@protoc_insertion_point(class_scope:DeviceRuleCounts.DeviceMacRulesEntry) }) , - 'DESCRIPTOR' : _ACLCOUNTS, + 'DESCRIPTOR' : _DEVICERULECOUNTS, '__module__' : 'daq.proto.acl_counting_pb2' - # @@protoc_insertion_point(class_scope:AclCounts) + # @@protoc_insertion_point(class_scope:DeviceRuleCounts) }) -_sym_db.RegisterMessage(AclCounts) -_sym_db.RegisterMessage(AclCounts.DevicesEntry) +_sym_db.RegisterMessage(DeviceRuleCounts) +_sym_db.RegisterMessage(DeviceRuleCounts.DeviceMacRulesEntry) RuleCounts = _reflection.GeneratedProtocolMessageType('RuleCounts', (_message.Message,), { @@ -252,6 +259,6 @@ _sym_db.RegisterMessage(RuleCount) -_ACLCOUNTS_DEVICESENTRY._options = None +_DEVICERULECOUNTS_DEVICEMACRULESENTRY._options = None _RULECOUNTS_RULESENTRY._options = None # @@protoc_insertion_point(module_scope) diff --git a/daq/traffic_analyzer.py b/daq/traffic_analyzer.py new file mode 100644 index 0000000000..22fc6dfeb9 --- /dev/null +++ b/daq/traffic_analyzer.py @@ -0,0 +1,182 @@ +"""Traffic analysis module""" + +from __future__ import absolute_import + +import argparse +import json +import logging +import os +import signal +import sys +import threading +import time + +from acl_state_collector import AclStateCollector +import logger +from utils import dict_proto, proto_dict, write_pid_file +from varz_state_collector import VarzStateCollector + +from proto.acl_counting_pb2 import DeviceRuleCounts + +from faucet import config_parser +from forch.proto.devices_state_pb2 import DevicePlacement + +LOGGER = logger.get_logger('ta') +PID_FILE = 'inst/ta.pid' + + +class TrafficAnalyzer: + """Analyzing traffic statistics""" + + _RULE_COUNT_METRIC = 'flow_packet_count_port_acl' + _DEVICE_LEARNING_METRIC = 'learned_l2_port' + _SEC_SWITCH = 'sec' + _PERIODIC_TASKS_INTERVAL_SEC = 30 + + def __init__(self, device_specs_file, faucet_config_file): + self._device_specs_file = device_specs_file + self._faucet_config_file = faucet_config_file + self._acl_state_collector = AclStateCollector() + self._varz_state_collector = VarzStateCollector() + self._duts = set() + self._device_placements = {} + self._lock = threading.Lock() + self._initialized = False + self._run = False + + def initialize(self): + """Initialize internal data""" + self._reload_device_specs() + self._initialized = True + + def start(self): + """Start periodic tasks""" + assert self._initialized + self._run = True + threading.Thread(target=self._periodic_tasks, daemon=True).start() + + def stop(self): + """Stop periodic tasks""" + self._run = False + + def get_device_rule_counts(self): + """Return the rule counts for all the learned devices""" + with self._lock: + return dict_proto(self._get_device_rule_counts(), DeviceRuleCounts) + + def _get_device_rule_counts(self): + port_acl_metrics, error = self._get_rule_count_metric() + + if error: + LOGGER.error(error) + return {'error': error} + + device_rule_counts = {} + + for mac, device_placement in self._device_placements.items(): + rule_counts_map = device_rule_counts.setdefault('device_mac_rules', {}) + port_rule_counts = self._acl_state_collector.get_port_rule_counts( + device_placement.switch, device_placement.port, port_acl_metrics.samples) + rule_counts_map[mac] = proto_dict(port_rule_counts) + + return device_rule_counts + + def _get_rule_count_metric(self): + try: + metrics = self._varz_state_collector.retry_get_gauge_metrics([self._RULE_COUNT_METRIC]) + except Exception as e: + return None, str(e) + + rule_count_metric = metrics.get(self._RULE_COUNT_METRIC) + + if not rule_count_metric: + error = f'No {self._RULE_COUNT_METRIC} metric available' + return None, error + + return rule_count_metric, None + + def _reload_faucet_config(self): + _, _, dps_config, _ = config_parser.dp_parser(self._faucet_config_file, 'fconfig') + switches_config = {str(dp): dp for dp in dps_config} + self._acl_state_collector.update_switch_configs(switches_config) + + def _reload_device_specs(self): + with open(self._device_specs_file) as file: + device_specs = json.load(file) + self._duts = set(device_specs.get('macAddrs', {}).keys()) + LOGGER.info('Loaded %s devices', len(self._duts)) + + def _update_device_placements(self): + try: + metrics = self._varz_state_collector.retry_get_faucet_metrics( + [self._DEVICE_LEARNING_METRIC]) + except Exception as e: + LOGGER.error('Could not get %s metric: %s', self._DEVICE_LEARNING_METRIC, e) + + device_learning_metric = metrics.get(self._DEVICE_LEARNING_METRIC) + if not device_learning_metric: + LOGGER.info('No devices are learned') + + self._device_placements = {} + for sample in device_learning_metric.samples: + if sample.labels.get('dp_name') != self._SEC_SWITCH: + continue + + mac = sample.labels.get('eth_src') + if mac not in self._duts: + continue + + port = int(sample.value) + self._device_placements[mac] = DevicePlacement(switch=self._SEC_SWITCH, port=port) + + def _periodic_tasks(self): + if not self._run: + return + + with self._lock: + self._reload_faucet_config() + self._update_device_placements() + threading.Timer(self._PERIODIC_TASKS_INTERVAL_SEC, self._periodic_tasks).start() + + +def parse_args(raw_args): + """Parse sys args""" + parser = argparse.ArgumentParser(description='Varz collector') + parser.add_argument('-f', '--faucet-config', type=str, default='inst/faucet.yaml', + help='Faucet config file') + parser.add_argument('device_specs', type=str, help='Device specs file') + parser.add_argument('output_file', type=str, help='Output file for device rule counts') + return parser.parse_args(raw_args) + + +def main(): + """Entry point for standalone Traffic Analyzer""" + args = parse_args(sys.argv[1:]) + logging.basicConfig(level='INFO') + write_pid_file(PID_FILE, LOGGER) + signal.signal(signal.SIGINT, signal.default_int_handler) + + logging.info( + 'Initializing Traffic Analyzer with: %s, %s', args.device_specs, args.faucet_config) + + traffic_analyzer = TrafficAnalyzer(args.device_specs, args.faucet_config) + traffic_analyzer.initialize() + traffic_analyzer.start() + + logging.info('Periodically saving device rule counts to file %s.', args.output_file) + + try: + while True: + device_rule_counts = proto_dict(traffic_analyzer.get_device_rule_counts()) + with open(args.output_file, 'w') as file: + json.dump(device_rule_counts, file) + time.sleep(30) + except KeyboardInterrupt: + logging.info('Keyboard interrupt. Exiting.') + + traffic_analyzer.stop() + os.remove(PID_FILE) + + +if __name__ == '__main__': + main() diff --git a/daq/utils.py b/daq/utils.py index 6bb093d484..8507b0536f 100644 --- a/daq/utils.py +++ b/daq/utils.py @@ -1,5 +1,9 @@ """Utility functions for DAQ""" +from __future__ import absolute_import + +import os + from google.protobuf import json_format import yaml @@ -34,3 +38,12 @@ def proto_json(message): def dict_proto(message, proto_func, ignore_unknown_fields=False): """Convert a standard dict object to a proto object""" return json_format.ParseDict(message, proto_func(), ignore_unknown_fields) + + +def write_pid_file(pid_file, logger=None): + """Write the PID of current process to file""" + pid = os.getpid() + if logger: + logger.info('Writing pid %d to file %s', pid, pid_file) + with open(pid_file, 'w') as file: + file.write(str(pid)) diff --git a/daq/varz_state_collector.py b/daq/varz_state_collector.py index 1281bcda3b..4ebabb85f0 100644 --- a/daq/varz_state_collector.py +++ b/daq/varz_state_collector.py @@ -23,7 +23,7 @@ class VarzStateCollector: METRIC_RETRY_INTERVAL_SEC = 3 - def __init__(self, varz_address, faucet_varz_port, gauge_varz_port): + def __init__(self, varz_address=None, faucet_varz_port=None, gauge_varz_port=None): endpoint_address = varz_address or DEFAULT_VARZ_ADDRESS self._faucet_varz_endpoint = ( f'http://{endpoint_address}:{faucet_varz_port or DEFAULT_FAUCET_VARZ_PORT}') diff --git a/firebase/public/protos.hash b/firebase/public/protos.hash index 73b414fd28..3e636df4f6 100644 --- a/firebase/public/protos.hash +++ b/firebase/public/protos.hash @@ -1,3 +1,3 @@ -47f095e49f2bceb48fdc25fc55a80e8f57c1583d proto/acl_counting.proto +56713c6cdd7b93379f964971d5ba18c18ee89cc5 proto/acl_counting.proto a12e531019407b1d02b603dc5b5f38d9e331ed16 proto/session_server.proto d4696b72047d5af67131529acdc53522b8937df8 proto/system_config.proto diff --git a/firebase/public/protos.html b/firebase/public/protos.html index 13324b6b5b..35594509ab 100644 --- a/firebase/public/protos.html +++ b/firebase/public/protos.html @@ -179,11 +179,11 @@
ACL counts for devices
+Rule counts for devices
devices | -AclCounts.DevicesEntry | +device_mac_rules | +DeviceRuleCounts.DeviceMacRulesEntry | repeated | -List of ACL counts indexed by device MAC addresses |
+ List of Rule counts indexed by device MAC addresses |
+
error | +string | ++ | error that is found when getting the rule counts for the devices |
errors | string | repeated | -
|
+ errors that are found when analyzing the rule counts of a device |
diff --git a/proto/acl_counting.proto b/proto/acl_counting.proto
index 30b9fb46d1..3abf3feffb 100644
--- a/proto/acl_counting.proto
+++ b/proto/acl_counting.proto
@@ -4,19 +4,24 @@
syntax = "proto3";
/*
- * ACL counts for devices
+ * Rule counts for devices
*/
-message AclCounts {
- // List of ACL counts indexed by device MAC addresses
- map