diff --git a/src/navv/bll.py b/src/navv/bll.py index df0a5cc..aa3ff75 100644 --- a/src/navv/bll.py +++ b/src/navv/bll.py @@ -159,3 +159,46 @@ def get_snmp_df(zeek_data: list): "community", ], ) + +@timeit +def get_mac_df(zeek_df: pd.DataFrame): + smac_df = zeek_df[ + [ + "src_mac", + "src_ip", + ] + ].reset_index(drop=True) + + dmac_df = zeek_df[ + [ + "dst_mac", + "dst_ip", + ] + ].reset_index(drop=True) + + smac_df = smac_df.rename(columns={'src_mac': 'mac', 'src_ip': 'ip'}) + dmac_df = dmac_df.rename(columns={'dst_mac': 'mac', 'dst_ip': 'ip'}) + mac_df = smac_df._append(dmac_df, ignore_index=True) + mac_df = mac_df.groupby('mac')['ip'].apply(list).reset_index(name='associated_ip') + + for index, row in enumerate(mac_df.to_dict(orient="records"), start=0): + # Source IPs - Need to get unique values + ips = set(row["associated_ip"]) + list_ips = (list(ips)) + if len(list_ips) > 1: + ip_list = ', '.join([str(item) for item in list_ips]) + + else: + ip_list = list_ips[0] + + mac_df.at[index, 'associated_ip'] = ip_list + + # Source Manufacturer column + mac_vendors = {} + with open(MAC_VENDORS_JSON_FILE) as f: + mac_vendors = json.load(f) + mac_df["vendor"] = mac_df["mac"].apply( + lambda mac: get_mac_vendor(mac_vendors, mac) + ) + + return mac_df \ No newline at end of file diff --git a/src/navv/commands.py b/src/navv/commands.py index fc912bf..a939ebc 100644 --- a/src/navv/commands.py +++ b/src/navv/commands.py @@ -8,7 +8,7 @@ # cisagov Libraries from navv.gui.app import app -from navv.bll import get_inventory_report_df, get_snmp_df, get_zeek_df +from navv.bll import get_inventory_report_df, get_snmp_df, get_zeek_df, get_mac_df from navv.message_handler import success_msg, warning_msg from navv.spreadsheet_tools import ( auto_adjust_width, @@ -24,6 +24,7 @@ write_snmp_sheet, write_stats_sheet, write_unknown_internals_sheet, + write_mac_sheet, ) from navv.zeek import ( get_conn_data, @@ -90,6 +91,7 @@ def generate(customer_name, output_dir, pcap, zeek_logs): # Get inventory report dataframe inventory_df = get_inventory_report_df(zeek_df) + mac_df = get_mac_df(zeek_df) # Turn zeekcut data into rows for spreadsheet rows = create_analysis_array(zeek_data, timer=timer_data) @@ -118,6 +120,8 @@ def generate(customer_name, output_dir, pcap, zeek_logs): write_snmp_sheet(snmp_df, wb) + write_mac_sheet(mac_df, wb) + auto_adjust_width(wb["Analysis"]) times = ( diff --git a/src/navv/spreadsheet_tools.py b/src/navv/spreadsheet_tools.py index e17ecbb..8b4b87c 100644 --- a/src/navv/spreadsheet_tools.py +++ b/src/navv/spreadsheet_tools.py @@ -14,6 +14,7 @@ import openpyxl import openpyxl.styles +from openpyxl.styles import Alignment from openpyxl.worksheet.table import Table import netaddr from tqdm import tqdm @@ -478,6 +479,31 @@ def write_stats_sheet(wb, stats): stats_sheet[f"{string.ascii_uppercase[col_index]}2"].value = stats[stat] auto_adjust_width(stats_sheet) +def write_mac_sheet(mac_df, wb): + """Fill spreadsheet with MAC address -> IP address translation with manufacturer information""" + sheet = make_sheet(wb, "MAC", idx=4) + sheet.append( + ["MAC", "Manufacturer", "IPs"] + ) + for index, row in enumerate(mac_df.to_dict(orient="records"), start=2): + # Source MAC column + sheet[f"A{index}"].value = row["mac"] + + # Source Manufacturer column + sheet[f"B{index}"].value = row["vendor"] + + # Source IPs + sheet[f"C{index}"].value = row["associated_ip"] + if len(row["associated_ip"]) > 16: + sel_cell = sheet[f"C{index}"] + sel_cell.alignment = Alignment(wrap_text=True) + est_row_hght = int(len(row["associated_ip"])/50) + if est_row_hght < 1: + est_row_hght = 1 + sheet.row_dimensions[index].height = est_row_hght * 15 + + auto_adjust_width(sheet) + sheet.column_dimensions["C"].width = 39 * 1.2 def make_sheet(wb, sheet_name, idx=None): """Create the sheet if it doesn't already exist otherwise remove it and recreate it"""