packet_handlers.py
from scapy.layers.dhcp import DHCP
from scapy.layers.dns import DNSQR, DNSRR
from scapy.layers.inet import ICMP, TCP, UDP
from scapy.layers.l2 import ARP
from scapy.layers.llmnr import LLMNRQuery, LLMNRResponse
from scapy.layers.netbios import NBNSQueryRequest, NBNSQueryResponse
from scapy.layers.snmp import SNMP
from scapy.packet import Raw
from utils.packet_utils import print_packet_details
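def handle_arp(packet):
    """
    Logs the opcode and the sender/target addresses of an ARP packet.

    Packets without an ARP layer are ignored. Opcode 1 is reported as
    "Request" and opcode 2 as "Reply"; any other opcode is reported with its
    numeric value instead of being silently labelled "Reply".

    :param packet: The packet to be processed, expected to be an ARP packet.
    """
    if not packet.haslayer(ARP):
        return

    arp = packet[ARP]
    # Only 1 and 2 are request/reply. Showing the raw number for anything
    # else keeps unusual opcodes visible in the log rather than hiding them
    # behind a "Reply" label.
    operation = {1: "Request", 2: "Reply"}.get(arp.op, "Other ({})".format(arp.op))

    # ARP carries no authentication: every address below is whatever the
    # sender put in the frame, so treat the values as claims, not facts.
    details = {
        "Operation": operation,
        "Source IP": arp.psrc,
        "Destination IP": arp.pdst,
        "Source MAC": arp.hwsrc,
        "Destination MAC": arp.hwdst
    }
    print_packet_details("ARP", details)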
def handle_icmp(packet):
    """
    Logs the type and code fields of an ICMP packet.

    Packets that carry no ICMP layer are ignored.

    :param packet: The packet to be processed, expected to be an ICMP packet.
    """
    if packet.haslayer(ICMP):
        icmp_layer = packet[ICMP]
        # Type and code are passed on as the numeric values read from the layer.
        print_packet_details(
            "ICMP",
            {"Type": icmp_layer.type, "Code": icmp_layer.code},
        )
def handle_tcp(packet):
    """
    Logs the source port, destination port and flags of a TCP segment.

    Packets that carry no TCP layer are ignored.

    :param packet: The packet to be processed, expected to be a TCP packet.
    """
    if packet.haslayer(TCP):
        segment = packet[TCP]
        summary = {}
        summary["Source Port"] = segment.sport
        summary["Destination Port"] = segment.dport
        # The flags value is handed over untouched; any formatting is left to
        # print_packet_details.
        summary["Flags"] = segment.flags
        print_packet_details("TCP", summary)
def handle_udp(packet):
    """
    Logs the source and destination ports of a UDP datagram.

    Packets that carry no UDP layer are ignored.

    :param packet: The packet to be processed, expected to be a UDP packet.
    """
    if packet.haslayer(UDP):
        datagram = packet[UDP]
        ports = dict([
            ("Source Port", datagram.sport),
            ("Destination Port", datagram.dport),
        ])
        print_packet_details("UDP", ports)
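def handle_dns(packet):
    """
    Logs the name carried by a DNS packet, labelled as request or response.

    A packet containing a resource record is logged as "DNS Response" with
    the record's name; otherwise a packet containing a question record is
    logged as "DNS Request" with the queried name. Packets with neither are
    ignored.

    :param packet: The packet to be processed, expected to be a DNS packet.
    """
    # Test for a resource record first: DNS responses normally echo the
    # question section, so checking DNSQR first would label them as requests
    # and the response branch would rarely be reached.
    # NOTE(review): a response with no answer records (e.g. NXDOMAIN) still
    # falls through to "DNS Request"; separating those needs the DNS header
    # flag, which is not imported in this module.
    if packet.haslayer(DNSRR):
        # Names come from the network and may not be valid UTF-8. Replace
        # undecodable bytes so one malformed packet cannot raise
        # UnicodeDecodeError out of the handler.
        details = {"Response Name": packet[DNSRR].rrname.decode('utf-8', errors='replace')}
        print_packet_details("DNS Response", details)
    elif packet.haslayer(DNSQR):
        details = {"Query Name": packet[DNSQR].qname.decode('utf-8', errors='replace')}
        print_packet_details("DNS Request", details)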
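def handle_dhcp(packet):
    """
    Logs the options carried by a DHCP packet as a name -> value mapping.

    Only tuple entries of the options list are reported; other entries (such
    as bare marker strings) are skipped. An option with one value maps to
    that value, an option with several values maps to a tuple of all of
    them. If an option name occurs more than once, the last occurrence wins.

    :param packet: The packet to be processed, expected to be a DHCP packet.
    """
    if not packet.haslayer(DHCP):
        return

    details = {}
    for option in packet[DHCP].options:
        # Skip non-tuples, and tuples too short to hold a value, so a
        # malformed option cannot raise IndexError.
        if not isinstance(option, tuple) or len(option) < 2:
            continue
        name, values = option[0], option[1:]
        # Keep every value of a multi-valued option instead of only the
        # first one.
        details[name] = values[0] if len(values) == 1 else values
    print_packet_details("DHCP", details)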
def handle_http(packet):
    """
    Logs the raw payload of a packet under the "HTTP" label.

    The function does not check that the payload is HTTP: any packet with a
    Raw layer is decoded (undecodable bytes are replaced) and logged, so the
    caller decides which packets count as HTTP. Cleartext payloads can
    contain credentials or session cookies, so treat the output as
    sensitive.

    :param packet: The packet to be processed, expected to contain HTTP data.
    """
    if not packet.haslayer(Raw):
        return

    # The decoded text is forwarded unchanged. Whether print_packet_details
    # neutralises control characters is not visible from this module.
    payload_text = packet[Raw].load.decode(errors='replace')
    print_packet_details("HTTP", {"Payload": payload_text})
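def handle_snmp(packet):
    """
    Logs the version and community string of an SNMP packet.

    Packets without an SNMP layer are ignored. The community string acts as
    a shared secret in SNMPv1/v2c, so the logged output should be handled as
    sensitive data.

    :param packet: The packet to be processed, expected to be an SNMP packet.
    """
    if not packet.haslayer(SNMP):
        return

    snmp = packet[SNMP]
    community = snmp.community
    # NOTE(review): depending on the scapy version, the field may be an ASN.1
    # wrapper that exposes the bytes as `.val` rather than plain bytes.
    # Unwrap it when that attribute is present, so both forms are handled.
    community = getattr(community, "val", community)
    if isinstance(community, (bytes, bytearray)):
        # The value is attacker-controllable and may not be valid UTF-8.
        # Replace bad bytes so decoding cannot raise UnicodeDecodeError.
        community = bytes(community).decode('utf-8', errors='replace')

    details = {
        "Version": snmp.version,
        "Community": community
    }
    print_packet_details("SNMP", details)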
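def handle_llmnr(packet):
    """
    Logs the name carried by an LLMNR packet, labelled as query or response.

    A packet with an LLMNRQuery layer is logged as "LLMNR Query"; otherwise
    a packet with an LLMNRResponse layer is logged as "LLMNR Response".
    Packets with neither layer are ignored.

    :param packet: The packet to be processed, expected to be an LLMNR packet.
    """
    # LLMNR is unauthenticated and names are supplied by whoever is on the
    # local link. Decode with errors='replace' so a name that is not valid
    # UTF-8 cannot raise UnicodeDecodeError out of the handler.
    if packet.haslayer(LLMNRQuery):
        details = {"Query Name": packet[LLMNRQuery].qname.decode('utf-8', errors='replace')}
        print_packet_details("LLMNR Query", details)
    elif packet.haslayer(LLMNRResponse):
        details = {"Response Name": packet[LLMNRResponse].rrname.decode('utf-8', errors='replace')}
        print_packet_details("LLMNR Response", details)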
def handle_netbios(packet):
    """
    Logs the name carried by a NetBIOS name-service query request or response.

    A packet with an NBNSQueryRequest layer is logged with its QUESTION_NAME;
    otherwise a packet with an NBNSQueryResponse layer is logged with its
    RR_NAME. Packets with neither layer are ignored.

    :param packet: The packet to be processed, expected to be a NetBIOS packet.
    """
    if packet.haslayer(NBNSQueryRequest):
        label = "NetBIOS Query Request"
        name = packet[NBNSQueryRequest].QUESTION_NAME
    elif packet.haslayer(NBNSQueryResponse):
        label = "NetBIOS Query Response"
        name = packet[NBNSQueryResponse].RR_NAME
    else:
        return

    # The name is passed on exactly as read from the layer, without decoding.
    print_packet_details(label, {"NetBIOS Name": name})