Skip to content

Commit

Permalink
fix black8 issue
Browse files Browse the repository at this point in the history
fix black8 issue
  • Loading branch information
stanley31huang committed Jan 10, 2025
1 parent 88c2fcc commit 5a7259c
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ def __enter__(self):
self.enable_otg_module([OTG_MODULE])
self.usb_gadget_node = Path(
tempfile.TemporaryDirectory(
dir=self.root_path.joinpath("usb_gadget"),
prefix="udc_"
dir=self.root_path.joinpath("usb_gadget"), prefix="udc_"
).name
)
self.otg_setup()
Expand All @@ -87,7 +86,10 @@ def __exit__(self, exec, value, tb):
logging.debug("Clean up OTG configurations")
self._cleanup_usb_gadget()
cur_modules = [
mod for mod in self._get_child_modules() if mod not in self._child_modules]
mod
for mod in self._get_child_modules()
if mod not in self._child_modules
]
self.disable_otg_related_modules(cur_modules)
if self._child_modules:
self.enable_otg_module(self._child_modules)
Expand All @@ -104,11 +106,15 @@ def _get_child_modules(self):

def enable_otg_module(self, modules):
for module in modules:
subprocess.run("modprobe {}".format(module), shell=True, check=True)
subprocess.run(
"modprobe {}".format(module), shell=True, check=True
)

def disable_otg_related_modules(self, modules):
for module in modules:
subprocess.run("modprobe -r {}".format(module), shell=True, check=True)
subprocess.run(
"modprobe -r {}".format(module), shell=True, check=True
)

def otg_setup(self):
"""
Expand Down Expand Up @@ -150,8 +156,9 @@ def create_otg_function(self):
if not function_path.exists():
function_path.mkdir()

self.usb_gadget_node.joinpath(
"configs", "c.1", func_name).symlink_to(function_path, True)
self.usb_gadget_node.joinpath("configs", "c.1", func_name).symlink_to(
function_path, True
)

def _cleanup_usb_gadget(self):
func_name = "{}.0".format(self.OTG_FUNCTION)
Expand Down Expand Up @@ -207,7 +214,9 @@ def otg_setup(self):
logging.info("Create an USB image file for Mass Storage Test")
self._usb_img = tempfile.NamedTemporaryFile("+bw", delete=False)
subprocess.run(
"dd if=/dev/zero of={} bs=1M count=1024".format(self._usb_img.name),
"dd if=/dev/zero of={} bs=1M count=1024".format(
self._usb_img.name
),
shell=True,
check=True,
)
Expand All @@ -233,8 +242,9 @@ def create_otg_function(self):

function_path.joinpath("lun.0", "file").write_text(self._usb_img.name)

self.usb_gadget_node.joinpath(
"configs", "c.1", func_name).symlink_to(function_path, True)
self.usb_gadget_node.joinpath("configs", "c.1", func_name).symlink_to(
function_path, True
)

def detection_check_on_rpyc(self, rpyc_ip):
logging.info("USB drive detection on RPYC")
Expand All @@ -250,11 +260,13 @@ def detection_check_on_rpyc(self, rpyc_ip):

def function_check_with_rpyc(self, rpyc_ip):
logging.info("USB read/write testing on RPYC")
raise SystemExit(rpyc_client(
rpyc_ip,
"usb_storage_test",
self.usb_type,
))
raise SystemExit(
rpyc_client(
rpyc_ip,
"usb_storage_test",
self.usb_type,
)
)

def otg_test_process(self, rpyc_ip):
logging.info("Start Mass Storage Testing with OTG interface")
Expand Down Expand Up @@ -282,7 +294,9 @@ class OtgEthernetSetup(OtgConfigFsOperatorBase):
OTG_TARGET_MODULE = "usb_f_ecm"

def _collect_net_intfs(self):
return [os.path.basename(intf) for intf in glob.glob("/sys/class/net/*")]
return [
os.path.basename(intf) for intf in glob.glob("/sys/class/net/*")
]

def otg_setup(self):
self._net_intfs = self._collect_net_intfs()
Expand All @@ -301,7 +315,9 @@ def self_check(self):

otg_net_intf = [x for x in cur_net_intfs if x not in self._net_intfs]
if len(otg_net_intf) != 1:
logging.error("Found more than one new interface. %s", otg_net_intf)
logging.error(
"Found more than one new interface. %s", otg_net_intf
)
else:
logging.info("Found new network interface '%s'", otg_net_intf[0])
self._net_dev = otg_net_intf[0]
Expand All @@ -327,7 +343,7 @@ def function_check_with_rpyc(self, rpyc_ip):
logging.info("Ping from DUT to Target")
_module = SourceFileLoader(
"_",
os.path.join(CHECKBOX_BASE_PROVIDER, "bin/gateway_ping_test.py")
os.path.join(CHECKBOX_BASE_PROVIDER, "bin/gateway_ping_test.py"),
).load_module()
test_func = getattr(_module, "perform_ping_test")
ret = test_func([self._net_dev], "169.254.0.10")
Expand Down Expand Up @@ -367,7 +383,9 @@ def self_check(self):

otg_ser_intf = [x for x in cur_ser_intfs if x not in self._ser_intfs]
if len(otg_ser_intf) != 1:
logging.error("Found more than one new interface. %s", otg_ser_intf)
logging.error(
"Found more than one new interface. %s", otg_ser_intf
)
else:
logging.info("Found new network interface '%s'", otg_ser_intf[0])
self._serial_iface = otg_ser_intf[0]
Expand Down Expand Up @@ -415,8 +433,8 @@ def otg_test_process(self, rpyc_ip):
"N",
1,
3,
1024
)
1024,
),
)
t_thread.start()
time.sleep(3)
Expand Down Expand Up @@ -482,9 +500,7 @@ def register_arguments():
def main():
args = register_arguments()
if args.mode == "test":
otg_testing(
args.udc_node, args.type, args.rpyc_address, args.usb_type
)
otg_testing(args.udc_node, args.type, args.rpyc_address, args.usb_type)
elif args.mode == "info":
dump_otg_info(args.config)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ def rpyc_client(host, cmd, *args, **kwargs):
"""
for _ in range(2):
try:
conn = rpyc.connect(host, 60000, config={
"allow_all_attrs": True,
"allow_exposed_attrs": False
}
conn = rpyc.connect(
host,
60000,
config={"allow_all_attrs": True, "allow_exposed_attrs": False},
)
break
except ConnectionRefusedError:
Expand All @@ -47,4 +47,4 @@ def rpyc_client(host, cmd, *args, **kwargs):
except rpyc.core.vinegar.GenericException as exc:
raise SystemExit(
"Zapper host failed to process the requested command."
) from exc
) from exc
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from rpyc.utils.server import ThreadedServer



logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
handler_std = logging.StreamHandler(sys.stdout)
Expand All @@ -19,9 +18,7 @@
)
LIBS = {
"enable_serial_server": {
"source": os.path.join(
CHECKBOX_PROVIDER_CEOEM_PATH, "serial_test.py"
),
"source": os.path.join(CHECKBOX_PROVIDER_CEOEM_PATH, "serial_test.py"),
"function": "server_mode",
},
"serial_check": {
Expand Down Expand Up @@ -60,7 +57,9 @@ def wrap(*args, **kwargs):
if func.__name__ in dynamic_integrate_funcs:
args = args[1:]

with redirect_stderr(sio_stderr) as stderr, redirect_stdout(sio_stdout) as stdout:
with redirect_stderr(sio_stderr) as stderr, redirect_stdout(
sio_stdout
) as stdout:
try:
ret = func(*args, **kwargs)
except SystemExit as exp:
Expand All @@ -70,6 +69,7 @@ def wrap(*args, **kwargs):
cls.logs = "stdout logs: {}".format(stdout.getvalue())
cls.logs += "\nstderr logs: {}".format(stderr.getvalue())
return ret

return wrap


Expand All @@ -87,9 +87,7 @@ def _load_method_from_file(name, file, func):

def append_method_to_service(cls):
for key, value in LIBS.items():
func = _load_method_from_file(
key, value["source"], value["function"]
)
func = _load_method_from_file(key, value["source"], value["function"])
if func:
setattr(cls, key, capture_io_logs(func))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@ def _device_node_detect(func, device_type=""):
print(".", end="", flush=True)
attempts -= 1
print()
raise SystemExit(
"Failed to detect new {} interface".format(device_type)
)
raise SystemExit("Failed to detect new {} interface".format(device_type))


def configure_local_network(interface, net_info):
Expand Down

0 comments on commit 5a7259c

Please sign in to comment.