diff --git a/scapy/ansmachine.py b/scapy/ansmachine.py index 3c5cb865d33..b95a552ddfe 100644 --- a/scapy/ansmachine.py +++ b/scapy/ansmachine.py @@ -11,6 +11,8 @@ # Answering machines # ######################## +from __future__ import annotations + import abc import functools import threading @@ -26,10 +28,8 @@ from typing import ( Any, Callable, - Dict, Generic, Optional, - Tuple, Type, TypeVar, cast, @@ -39,12 +39,10 @@ class ReferenceAM(type): - def __new__(cls, - name, # type: str - bases, # type: Tuple[type, ...] - dct # type: Dict[str, Any] - ): - # type: (...) -> Type['AnsweringMachine[_T]'] + def __new__(cls, name: str, + bases: tuple[type, ...], + dct: dict[str, Any] + ) -> type[AnsweringMachine[_T]]: obj = cast('Type[AnsweringMachine[_T]]', super(ReferenceAM, cls).__new__(cls, name, bases, dct)) try: @@ -69,55 +67,52 @@ def __new__(cls, class AnsweringMachine(Generic[_T], metaclass=ReferenceAM): function_name = "" - filter = None # type: Optional[str] - sniff_options = {"store": 0} # type: Dict[str, Any] + filter: str | None = None + sniff_options: dict[str, Any] = {"store": 0} sniff_options_list = ["store", "iface", "count", "promisc", "filter", "type", "prn", "stop_filter", "opened_socket"] - send_options = {"verbose": 0} # type: Dict[str, Any] + send_options: dict[str, Any] = {"verbose": 0} send_options_list = ["iface", "inter", "loop", "verbose", "socket"] send_function = staticmethod(sendp) - def __init__(self, **kargs): - # type: (Any) -> None + def __init__(self, **kargs: Any) -> None: self.mode = 0 self.verbose = kargs.get("verbose", conf.verb >= 0) if self.filter: kargs.setdefault("filter", self.filter) kargs.setdefault("prn", self.reply) - self.optam1 = {} # type: Dict[str, Any] - self.optam2 = {} # type: Dict[str, Any] - self.optam0 = {} # type: Dict[str, Any] + self.optam1: dict[str, Any] = {} + self.optam2: dict[str, Any] = {} + self.optam0: dict[str, Any] = {} doptsend, doptsniff = self.parse_all_options(1, kargs) self.defoptsend = self.send_options.copy() self.defoptsend.update(doptsend) self.defoptsniff = self.sniff_options.copy() self.defoptsniff.update(doptsniff) - self.optsend = {} # type: Dict[str, Any] - self.optsniff = {} # type: Dict[str, Any] + self.optsend: dict[str, Any] = {} + self.optsniff: dict[str, Any] = {} - def __getattr__(self, attr): - # type: (str) -> Any + def __getattr__(self, attr: str) -> Any: for dct in [self.optam2, self.optam1]: if attr in dct: return dct[attr] raise AttributeError(attr) - def __setattr__(self, attr, val): - # type: (str, Any) -> None + def __setattr__(self, attr: str, val: Any) -> None: mode = self.__dict__.get("mode", 0) if mode == 0: self.__dict__[attr] = val else: [self.optam1, self.optam2][mode - 1][attr] = val - def parse_options(self): - # type: () -> None + def parse_options(self) -> None: pass - def parse_all_options(self, mode, kargs): - # type: (int, Any) -> Tuple[Dict[str, Any], Dict[str, Any]] - sniffopt = {} # type: Dict[str, Any] - sendopt = {} # type: Dict[str, Any] + def parse_all_options( + self, mode: int, kargs: Any + ) -> tuple[dict[str, Any], dict[str, Any]]: + sniffopt: dict[str, Any] = {} + sendopt: dict[str, Any] = {} for k in list(kargs): # use list(): kargs is modified in the loop if k in self.sniff_options_list: sniffopt[k] = kargs[k] @@ -139,32 +134,34 @@ def parse_all_options(self, mode, kargs): self.__dict__["mode"] = omode return sendopt, sniffopt - def is_request(self, req): - # type: (Packet) -> int + def is_request(self, req: Packet) -> int: return 1 @abc.abstractmethod - def make_reply(self, req): - # type: (Packet) -> _T + def make_reply(self, req: Packet) -> _T: pass - def send_reply(self, reply, send_function=None): - # type: (_T, Optional[Callable[..., None]]) -> None + def send_reply( + self, reply: _T, send_function: Callable[..., None] | None = None + ) -> None: if send_function: send_function(reply) else: self.send_function(reply, **self.optsend) - def print_reply(self, req, reply): - # type: (Packet, _T) -> None + def print_reply(self, req: Packet, reply: _T) -> None: if isinstance(reply, PacketList): print("%s ==> %s" % (req.summary(), [res.summary() for res in reply])) else: print("%s ==> %s" % (req.summary(), reply.summary())) - def reply(self, pkt, send_function=None, address=None): - # type: (Packet, Optional[Callable[..., None]], Optional[Any]) -> None + def reply( + self, + pkt: Packet, + send_function: Callable[..., None] | None = None, + address: Any | None = None + ) -> None: if not self.is_request(pkt): return if address: @@ -182,22 +179,19 @@ def reply(self, pkt, send_function=None, address=None): if self.verbose: self.print_reply(pkt, reply) - def run(self, *args, **kargs): - # type: (Any, Any) -> None + def run(self, *args: Any, **kargs: Any) -> None: warnings.warn( "run() method deprecated. The instance is now callable", DeprecationWarning ) self(*args, **kargs) - def bg(self, *args, **kwargs): - # type: (Any, Any) -> AsyncSniffer + def bg(self, *args: Any, **kwargs: Any) -> AsyncSniffer: kwargs.setdefault("bg", True) self(*args, **kwargs) return self.sniffer - def __call__(self, *args, **kargs): - # type: (Any, Any) -> None + def __call__(self, *args: Any, **kargs: Any) -> None: bg = kargs.pop("bg", False) optsend, optsniff = self.parse_all_options(2, kargs) self.optsend = self.defoptsend.copy() @@ -213,12 +207,10 @@ def __call__(self, *args, **kargs): except KeyboardInterrupt: print("Interrupted by user") - def sniff(self): - # type: () -> None + def sniff(self) -> None: sniff(**self.optsniff) - def sniff_bg(self): - # type: () -> None + def sniff_bg(self) -> None: self.sniffer = AsyncSniffer(**self.optsniff) self.sniffer.start() @@ -230,17 +222,14 @@ class AnsweringMachineTCP(AnsweringMachine[Packet]): """ TYPE = socket.SOCK_STREAM - def parse_options(self, port=80, cls=conf.raw_layer): - # type: (int, Type[Packet]) -> None + def parse_options(self, port: int = 80, cls: type[Packet] = conf.raw_layer) -> None: self.port = port self.cls = cls - def close(self): - # type: () -> None + def close(self) -> None: pass - def sniff(self): - # type: () -> None + def sniff(self) -> None: from scapy.supersocket import StreamSocket ssock = socket.socket(socket.AF_INET, self.TYPE) try: @@ -274,13 +263,11 @@ def sniff(self): self.close() ssock.close() - def sniff_bg(self): - # type: () -> None + def sniff_bg(self) -> None: self.sniffer = threading.Thread(target=self.sniff) # type: ignore self.sniffer.start() - def make_reply(self, req, address=None): - # type: (Packet, Optional[Any]) -> Packet + def make_reply(self, req: Packet, address: Any | None = None) -> Packet: return req