Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HINTY] Modernise typing in scapy/ansmachine.py #4650

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 45 additions & 58 deletions scapy/ansmachine.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
# Answering machines #
########################

from __future__ import annotations
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We no longer support Python 2, so this is not wanted.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your review!

This is still necessary if you want to use modern type hints such as dict[str] instead of Dict[str] https://peps.python.org/pep-0585/

I can revert to Dict if you prefer, it just requires an extra from typing import Dict


import abc
import functools
import threading
Expand All @@ -26,10 +28,8 @@
from typing import (
Any,
Callable,
Dict,
Generic,
Optional,
Tuple,
Type,
TypeVar,
cast,
Expand All @@ -39,12 +39,10 @@


class ReferenceAM(type):
def __new__(cls,
name, # type: str
bases, # type: Tuple[type, ...]
dct # type: Dict[str, Any]
):
# type: (...) -> Type['AnsweringMachine[_T]']
def __new__(cls, name: str,
bases: tuple[type, ...],
dct: dict[str, Any]
) -> type[AnsweringMachine[_T]]:
obj = cast('Type[AnsweringMachine[_T]]',
super(ReferenceAM, cls).__new__(cls, name, bases, dct))
try:
Expand All @@ -69,55 +67,52 @@ def __new__(cls,

class AnsweringMachine(Generic[_T], metaclass=ReferenceAM):
function_name = ""
filter = None # type: Optional[str]
sniff_options = {"store": 0} # type: Dict[str, Any]
filter: str | None = None
sniff_options: dict[str, Any] = {"store": 0}
sniff_options_list = ["store", "iface", "count", "promisc", "filter",
"type", "prn", "stop_filter", "opened_socket"]
send_options = {"verbose": 0} # type: Dict[str, Any]
send_options: dict[str, Any] = {"verbose": 0}
send_options_list = ["iface", "inter", "loop", "verbose", "socket"]
send_function = staticmethod(sendp)

def __init__(self, **kargs):
# type: (Any) -> None
def __init__(self, **kargs: Any) -> None:
self.mode = 0
self.verbose = kargs.get("verbose", conf.verb >= 0)
if self.filter:
kargs.setdefault("filter", self.filter)
kargs.setdefault("prn", self.reply)
self.optam1 = {} # type: Dict[str, Any]
self.optam2 = {} # type: Dict[str, Any]
self.optam0 = {} # type: Dict[str, Any]
self.optam1: dict[str, Any] = {}
self.optam2: dict[str, Any] = {}
self.optam0: dict[str, Any] = {}
doptsend, doptsniff = self.parse_all_options(1, kargs)
self.defoptsend = self.send_options.copy()
self.defoptsend.update(doptsend)
self.defoptsniff = self.sniff_options.copy()
self.defoptsniff.update(doptsniff)
self.optsend = {} # type: Dict[str, Any]
self.optsniff = {} # type: Dict[str, Any]
self.optsend: dict[str, Any] = {}
self.optsniff: dict[str, Any] = {}

def __getattr__(self, attr):
# type: (str) -> Any
def __getattr__(self, attr: str) -> Any:
for dct in [self.optam2, self.optam1]:
if attr in dct:
return dct[attr]
raise AttributeError(attr)

def __setattr__(self, attr, val):
# type: (str, Any) -> None
def __setattr__(self, attr: str, val: Any) -> None:
mode = self.__dict__.get("mode", 0)
if mode == 0:
self.__dict__[attr] = val
else:
[self.optam1, self.optam2][mode - 1][attr] = val

def parse_options(self):
# type: () -> None
def parse_options(self) -> None:
pass

def parse_all_options(self, mode, kargs):
# type: (int, Any) -> Tuple[Dict[str, Any], Dict[str, Any]]
sniffopt = {} # type: Dict[str, Any]
sendopt = {} # type: Dict[str, Any]
def parse_all_options(
self, mode: int, kargs: Any
) -> tuple[dict[str, Any], dict[str, Any]]:
sniffopt: dict[str, Any] = {}
sendopt: dict[str, Any] = {}
for k in list(kargs): # use list(): kargs is modified in the loop
if k in self.sniff_options_list:
sniffopt[k] = kargs[k]
Expand All @@ -139,32 +134,34 @@ def parse_all_options(self, mode, kargs):
self.__dict__["mode"] = omode
return sendopt, sniffopt

def is_request(self, req):
# type: (Packet) -> int
def is_request(self, req: Packet) -> int:
return 1

@abc.abstractmethod
def make_reply(self, req):
# type: (Packet) -> _T
def make_reply(self, req: Packet) -> _T:
pass

def send_reply(self, reply, send_function=None):
# type: (_T, Optional[Callable[..., None]]) -> None
def send_reply(
self, reply: _T, send_function: Callable[..., None] | None = None
) -> None:
if send_function:
send_function(reply)
else:
self.send_function(reply, **self.optsend)

def print_reply(self, req, reply):
# type: (Packet, _T) -> None
def print_reply(self, req: Packet, reply: _T) -> None:
if isinstance(reply, PacketList):
print("%s ==> %s" % (req.summary(),
[res.summary() for res in reply]))
else:
print("%s ==> %s" % (req.summary(), reply.summary()))

def reply(self, pkt, send_function=None, address=None):
# type: (Packet, Optional[Callable[..., None]], Optional[Any]) -> None
def reply(
self,
pkt: Packet,
send_function: Callable[..., None] | None = None,
address: Any | None = None
) -> None:
if not self.is_request(pkt):
return
if address:
Expand All @@ -182,22 +179,19 @@ def reply(self, pkt, send_function=None, address=None):
if self.verbose:
self.print_reply(pkt, reply)

def run(self, *args, **kargs):
# type: (Any, Any) -> None
def run(self, *args: Any, **kargs: Any) -> None:
warnings.warn(
"run() method deprecated. The instance is now callable",
DeprecationWarning
)
self(*args, **kargs)

def bg(self, *args, **kwargs):
# type: (Any, Any) -> AsyncSniffer
def bg(self, *args: Any, **kwargs: Any) -> AsyncSniffer:
kwargs.setdefault("bg", True)
self(*args, **kwargs)
return self.sniffer

def __call__(self, *args, **kargs):
# type: (Any, Any) -> None
def __call__(self, *args: Any, **kargs: Any) -> None:
bg = kargs.pop("bg", False)
optsend, optsniff = self.parse_all_options(2, kargs)
self.optsend = self.defoptsend.copy()
Expand All @@ -213,12 +207,10 @@ def __call__(self, *args, **kargs):
except KeyboardInterrupt:
print("Interrupted by user")

def sniff(self):
# type: () -> None
def sniff(self) -> None:
sniff(**self.optsniff)

def sniff_bg(self):
# type: () -> None
def sniff_bg(self) -> None:
self.sniffer = AsyncSniffer(**self.optsniff)
self.sniffer.start()

Expand All @@ -230,17 +222,14 @@ class AnsweringMachineTCP(AnsweringMachine[Packet]):
"""
TYPE = socket.SOCK_STREAM

def parse_options(self, port=80, cls=conf.raw_layer):
# type: (int, Type[Packet]) -> None
def parse_options(self, port: int = 80, cls: type[Packet] = conf.raw_layer) -> None:
self.port = port
self.cls = cls

def close(self):
# type: () -> None
def close(self) -> None:
pass

def sniff(self):
# type: () -> None
def sniff(self) -> None:
from scapy.supersocket import StreamSocket
ssock = socket.socket(socket.AF_INET, self.TYPE)
try:
Expand Down Expand Up @@ -274,13 +263,11 @@ def sniff(self):
self.close()
ssock.close()

def sniff_bg(self):
# type: () -> None
def sniff_bg(self) -> None:
self.sniffer = threading.Thread(target=self.sniff) # type: ignore
self.sniffer.start()

def make_reply(self, req, address=None):
# type: (Packet, Optional[Any]) -> Packet
def make_reply(self, req: Packet, address: Any | None = None) -> Packet:
return req


Expand Down