Skip to content

Commit

Permalink
Fix type annotation and logging
Browse files Browse the repository at this point in the history
  • Loading branch information
twiggler committed Nov 20, 2024
1 parent 336f2df commit f4dbcf9
Showing 1 changed file with 33 additions and 25 deletions.
58 changes: 33 additions & 25 deletions dissect/target/plugins/os/unix/linux/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,21 @@
from dataclasses import dataclass, field
from datetime import datetime, timezone
from ipaddress import ip_address, ip_interface
from typing import Iterator, Literal, NamedTuple
from typing import TYPE_CHECKING, Any, Iterator, Literal, NamedTuple

from dissect.target import Target
from dissect.target.helpers import configutil
from dissect.target.helpers.record import UnixInterfaceRecord
from dissect.target.helpers.utils import to_list
from dissect.target.plugins.general.network import NetworkPlugin
from dissect.target.target import TargetPath

if TYPE_CHECKING:
from ipaddress import IPv4Address, IPv6Address, IPv4Interface, IPv6Interface

Check warning on line 15 in dissect/target/plugins/os/unix/linux/network.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/plugins/os/unix/linux/network.py#L15

Added line #L15 was not covered by tests

from dissect.target import Target
from dissect.target.target import TargetPath

Check warning on line 18 in dissect/target/plugins/os/unix/linux/network.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/plugins/os/unix/linux/network.py#L17-L18

Added lines #L17 - L18 were not covered by tests

NetAddress = IPv4Address | IPv6Address
NetInterface = IPv4Interface | IPv6Interface

Check warning on line 21 in dissect/target/plugins/os/unix/linux/network.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/plugins/os/unix/linux/network.py#L20-L21

Added lines #L20 - L21 were not covered by tests


class LinuxNetworkPlugin(NetworkPlugin):
Expand Down Expand Up @@ -67,9 +74,9 @@ class ParserContext:
name: str | None = None
mac_address: str | None = None
type: str = ""
dns: set[ip_address] = field(default_factory=set)
ip_interfaces: set[ip_interface] = field(default_factory=set)
gateways: set[ip_address] = field(default_factory=set)
dns: set[NetAddress] = field(default_factory=set)
ip_interfaces: set[NetInterface] = field(default_factory=set)
gateways: set[NetAddress] = field(default_factory=set)
dhcp_ipv4: bool = False
dhcp_ipv6: bool = False
vlan: set[int] = field(default_factory=set)
Expand Down Expand Up @@ -153,7 +160,7 @@ def _parse_ip_section_key(
return

Check warning on line 160 in dissect/target/plugins/os/unix/linux/network.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/plugins/os/unix/linux/network.py#L160

Added line #L160 was not covered by tests

if key == "dns":
context.dns.update({ip_address(addr) for addr in trimmed.split(";") if addr})
context.dns.update(ip_address(addr) for addr in trimmed.split(";") if addr)
elif key.startswith("address"):
# Undocumented: single gateway on address line. Observed when running:
# nmcli connection add type ethernet ... ip4 192.168.2.138/24 gw4 192.168.2.1
Expand All @@ -172,7 +179,7 @@ def _parse_ip_section_key(
if gateway := self._parse_route(value):
context.gateways.add(gateway)

def _parse_vlan(self, sub_type: dict["str", any], vlan_id_by_interface: VlanIdByInterface) -> None:
def _parse_vlan(self, sub_type: dict[str, Any], vlan_id_by_interface: VlanIdByInterface) -> None:
parent_interface = sub_type.get("parent")
vlan_id = sub_type.get("id")
if not parent_interface or not vlan_id:
Expand Down Expand Up @@ -228,7 +235,8 @@ def _parse_virtual_networks(self) -> VlanIdByInterface:
vlan_ids = virtual_networks.setdefault(name, set())
vlan_ids.add(int(vlan_id))
except Exception as e:
self._target.log.warning("Error parsing virtual network config file %s", config_file, exc_info=e)
self._target.log.warning("Error parsing virtual network config file %s", config_file)
self._target.log.debug("", exc_info=e)

Check warning on line 239 in dissect/target/plugins/os/unix/linux/network.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/plugins/os/unix/linux/network.py#L237-L239

Added lines #L237 - L239 were not covered by tests

return virtual_networks

Expand All @@ -242,9 +250,9 @@ def _parse_networks(self, virtual_networks: VlanIdByInterface) -> Iterator[UnixI
network_section: dict[str, str] = config.get("Network", {})
link_section: dict[str, str] = config.get("Link", {})

ip_interfaces: set[ip_interface] = set()
gateways: set[ip_address] = set()
dns: set[ip_address] = set()
ip_interfaces: set[NetInterface] = set()
gateways: set[NetAddress] = set()
dns: set[NetAddress] = set()
mac_addresses: set[str] = set()

if link_mac := link_section.get("MACAddress"):
Expand All @@ -253,13 +261,13 @@ def _parse_networks(self, virtual_networks: VlanIdByInterface) -> Iterator[UnixI
mac_addresses.update(match_section.get("PermanentMACAddress", "").split())

dns_value = to_list(network_section.get("DNS", []))
dns.update({self._parse_dns_ip(dns_ip) for dns_ip in dns_value})
dns.update(self._parse_dns_ip(dns_ip) for dns_ip in dns_value)

address_value = to_list(network_section.get("Address", []))
ip_interfaces.update({ip_interface(addr) for addr in address_value})
ip_interfaces.update(ip_interface(addr) for addr in address_value)

gateway_value = to_list(network_section.get("Gateway", []))
gateways.update({ip_address(gateway) for gateway in gateway_value})
gateways.update(ip_address(gateway) for gateway in gateway_value)

vlan_ids: set[int] = set()
vlan_names = to_list(network_section.get("VLAN", []))
Expand All @@ -268,19 +276,19 @@ def _parse_networks(self, virtual_networks: VlanIdByInterface) -> Iterator[UnixI
vlan_ids.update(ids)

# There are possibly multiple route sections, but they are collapsed into one by the parser.
route_section: dict[str, any] = config.get("Route", {})
route_section: dict[str, Any] = config.get("Route", {})
gateway_values = to_list(route_section.get("Gateway", []))
gateways.update(filter(None, map(self._parse_gateway, gateway_values)))

dhcp_ipv4, dhcp_ipv6 = self._parse_dhcp(network_section.get("DHCP"))

yield UnixInterfaceRecord(
source=str(config_file),
type=match_section.get("Type", None),
type=match_section.get("Type"),
enabled=None, # Unknown, dependent on run-time state
dhcp_ipv4=dhcp_ipv4,
dhcp_ipv6=dhcp_ipv6,
name=match_section.get("Name", None),
name=match_section.get("Name"),
dns=list(dns),
mac=list(mac_addresses),
ip=[interface.ip for interface in ip_interfaces],
Expand All @@ -290,20 +298,20 @@ def _parse_networks(self, virtual_networks: VlanIdByInterface) -> Iterator[UnixI
configurator="systemd-networkd",
)
except Exception as e:
self._target.log.warning("Error parsing network config file %s", config_file, exc_info=e)
self._target.log.warning("Error parsing network config file %s", config_file)
self._target.log.debug("", exc_info=e)

Check warning on line 302 in dissect/target/plugins/os/unix/linux/network.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/plugins/os/unix/linux/network.py#L300-L302

Added lines #L300 - L302 were not covered by tests

def _parse_dns_ip(self, address: str) -> ip_address:
def _parse_dns_ip(self, address: str) -> NetAddress:
"""Parse DNS address from systemd network configuration file.
The optional brackets and port number make this hard to parse.
See https://www.freedesktop.org/software/systemd/man/latest/systemd.network.html and search for DNS.
"""

match = self.dns_ip_patttern.search(address)
if not match:
raise ValueError(f"Invalid DNS address format: {address}")
if match := self.dns_ip_patttern.search(address):
return ip_address(match.group("withoutBrackets") or match.group("withBrackets"))

return ip_address(match.group("withoutBrackets") or match.group("withBrackets"))
raise ValueError(f"Invalid DNS address format: {address}")

Check warning on line 314 in dissect/target/plugins/os/unix/linux/network.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/plugins/os/unix/linux/network.py#L314

Added line #L314 was not covered by tests

def _parse_dhcp(self, value: str | None) -> DhcpConfig:
"""Parse DHCP value from systemd network configuration file to a named tuple (ipv4, ipv6)."""
Expand All @@ -319,7 +327,7 @@ def _parse_dhcp(self, value: str | None) -> DhcpConfig:

raise ValueError(f"Invalid DHCP value: {value}")

Check warning on line 328 in dissect/target/plugins/os/unix/linux/network.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/plugins/os/unix/linux/network.py#L328

Added line #L328 was not covered by tests

def _parse_gateway(self, value: str | None) -> ip_address | None:
def _parse_gateway(self, value: str | None) -> NetAddress | None:
if (not value) or (value in {"_dhcp4", "_ipv6ra"}):
return None

Expand Down

0 comments on commit f4dbcf9

Please sign in to comment.