Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed analyzer.py to display collected flows whose version is not 9 #46

Open
wants to merge 1 commit into
base: release
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 46 additions & 22 deletions netflow/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ def __init__(self, flow1, flow2):

# Assume the size that sent the most data is the source
# TODO: this might not always be right, maybe use earlier timestamp?
size1 = fallback(flow1, ['IN_BYTES', 'IN_OCTETS'])
size2 = fallback(flow2, ['IN_BYTES', 'IN_OCTETS'])
size1 = fallback(flow1, ['IN_BYTES', 'IN_OCTETS', "octetDeltaCount"])
size2 = fallback(flow2, ['IN_BYTES', 'IN_OCTETS', "octetDeltaCount"])
if size1 >= size2:
src = flow1
dest = flow2
Expand All @@ -120,12 +120,24 @@ def __init__(self, flow1, flow2):
ips = self.get_ips(src)
self.src = ips.src
self.dest = ips.dest
self.src_port = fallback(src, ['L4_SRC_PORT', 'SRC_PORT'])
self.dest_port = fallback(dest, ['L4_DST_PORT', 'DST_PORT'])
self.size = fallback(src, ['IN_BYTES', 'IN_OCTETS'])
self.proto = fallback(src, ['PROTOCOL', 'PROTO', 'protocolIdentifier'])
# ICMP and ICMPv6 does not include port fields
if self.proto == 1 or self.proto == 58:
self.src_port = 0
# ICMP field is treated as destination port
try:
self.dest_port = fallback(dest, ['ICMP_TYPE', 'icmpTypeCodeIPv4', 'icmpTypeCodeIPv6'])
except:
self.dest_port = fallback(dest, ['L4_DST_PORT', 'DST_PORT', 'destinationTransportPort'])
else:
self.src_port = fallback(src, ['L4_SRC_PORT', 'SRC_PORT', 'sourceTransportPort'])
self.dest_port = fallback(dest, ['L4_DST_PORT', 'DST_PORT', 'destinationTransportPort'])
self.size = fallback(src, ['IN_BYTES', 'IN_OCTETS', 'octetDeltaCount'])

# Duration is given in milliseconds
self.duration = src['LAST_SWITCHED'] - src['FIRST_SWITCHED']
lastSwitched = fallback(src, ['LAST_SWITCHED', 'flowEndSysUpTime'])
firstSwitched = fallback(src, ['FIRST_SWITCHED', 'flowStartSysUpTime'])
self.duration = lastSwitched - firstSwitched
if self.duration < 0:
# 32 bit int has its limits. Handling overflow here
# TODO: Should be handled in the collection phase
Expand All @@ -139,16 +151,17 @@ def __repr__(self):
def get_ips(flow):
# IPv4
if flow.get('IP_PROTOCOL_VERSION') == 4 or \
'sourceIPv4Address' in flow or 'destinationIPv4Address' in flow or\
'IPV4_SRC_ADDR' in flow or 'IPV4_DST_ADDR' in flow:
return Pair(
ipaddress.ip_address(flow['IPV4_SRC_ADDR']),
ipaddress.ip_address(flow['IPV4_DST_ADDR'])
ipaddress.ip_address(fallback(flow, ['IPV4_SRC_ADDR', 'sourceIPv4Address'])),
ipaddress.ip_address(fallback(flow, ['IPV4_DST_ADDR', 'destinationIPv4Address']))
)

# IPv6
return Pair(
ipaddress.ip_address(flow['IPV6_SRC_ADDR']),
ipaddress.ip_address(flow['IPV6_DST_ADDR'])
ipaddress.ip_address(fallback(flow, ['IPV6_SRC_ADDR', 'sourceIPv6Address'])),
ipaddress.ip_address(fallback(flow, ['IPV6_DST_ADDR', 'destinationIPv6Address']))
)

@property
Expand Down Expand Up @@ -179,7 +192,13 @@ def service(self):

@property
def total_packets(self):
return self.src_flow["IN_PKTS"] + self.dest_flow["IN_PKTS"]
src_flow_packets = fallback(
self.src_flow, ["IN_PKTS", "IN_PACKETS", "packetDeltaCount"]
)
dest_flow_packets = fallback(
self.dest_flow, ["IN_PKTS", "IN_PACKETS", "packetDeltaCount"]
)
return src_flow_packets + dest_flow_packets


if __name__ == "netflow.analyzer":
Expand Down Expand Up @@ -236,10 +255,6 @@ def total_packets(self):
logger.error("No header dict in entry {}".format(ts))
raise ValueError

if entry[ts]["header"]["version"] == 10:
logger.warning("Skipped IPFIX entry, because analysis of IPFIX is not yet implemented")
continue

data[ts] = entry[ts]

# Go through data and dissect every flow saved inside the dump
Expand All @@ -258,20 +273,29 @@ def total_packets(self):
client = data[key]["client"]
flows = data[key]["flows"]

for flow in sorted(flows, key=lambda x: x["FIRST_SWITCHED"]):
first_switched = flow["FIRST_SWITCHED"]
for flow in sorted(flows,
key=lambda x:fallback(x,
["FIRST_SWITCHED", "flowStartSysUpTime", "systemInitTimeMilliseconds"],
),
):
if "systemInitTimeMilliseconds" in flow:
# systemInitTimeMilliseconds exists in only option data record
continue
first_switched = fallback(flow, ["FIRST_SWITCHED", "flowStartSysUpTime"])

if first_switched - 1 in pending:
# TODO: handle fitting, yet mismatching (here: 1 second) pairs
pass

# Find the peer for this connection
if "IPV4_SRC_ADDR" in flow or flow.get("IP_PROTOCOL_VERSION") == 4:
local_peer = flow["IPV4_SRC_ADDR"]
remote_peer = flow["IPV4_DST_ADDR"]
if ("IPV4_SRC_ADDR" in flow or "sourceIPv4Address" in flow
or fallback(flow, ["IP_PROTOCOL_VERSION", "ipVersion"]) == 4
):
local_peer = fallback(flow, ["IPV4_SRC_ADDR", "sourceIPv4Address"])
remote_peer = fallback(flow, ["IPV4_DST_ADDR", "destinationIPv4Address"])
else:
local_peer = flow["IPV6_SRC_ADDR"]
remote_peer = flow["IPV6_DST_ADDR"]
local_peer = fallback(flow, ["IPV6_SRC_ADDR", "sourceIPv6Address"])
remote_peer = fallback(flow, ["IPV6_DST_ADDR", "destinationIPv6Address"])

# Match on host filter passed in as argument
if args.match_host and not any([local_peer == args.match_host, remote_peer == args.match_host]):
Expand Down