diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpmsg_tests.py b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpmsg_tests.py index df1ed67f04..0c96612007 100755 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpmsg_tests.py +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpmsg_tests.py @@ -1,46 +1,18 @@ #!/usr/bin/env python3 import argparse -import datetime import os import sys +import time import re import subprocess +import shlex import select import logging +import threading from systemd import journal from pathlib import Path -from serial_test import serial_init, client_mode - - -def init_logger(): - """ - Set the logger to log DEBUG and INFO to stdout, and - WARNING, ERROR, CRITICAL to stderr. - """ - root_logger = logging.getLogger() - root_logger.setLevel(logging.INFO) - logger_format = "%(asctime)s %(levelname)-8s %(message)s" - date_format = "%Y-%m-%d %H:%M:%S" - - # Log DEBUG and INFO to stdout, others to stderr - stdout_handler = logging.StreamHandler(sys.stdout) - stdout_handler.setFormatter(logging.Formatter(logger_format, date_format)) - - stderr_handler = logging.StreamHandler(sys.stderr) - stderr_handler.setFormatter(logging.Formatter(logger_format, date_format)) - - stdout_handler.setLevel(logging.DEBUG) - stderr_handler.setLevel(logging.WARNING) - - # Add a filter to the stdout handler to limit log records to - # INFO level and below - stdout_handler.addFilter(lambda record: record.levelno <= logging.INFO) - - root_logger.addHandler(stderr_handler) - root_logger.addHandler(stdout_handler) - - return root_logger +import serial_test RPMSG_ROOT = "/sys/bus/rpmsg/devices" @@ -151,91 +123,145 @@ def detect_arm_processor_type(): return arm_cpu_type -def pingpong_test(cpu_type): - """ - Probe ping-pong kernel module for RPMSG ping-pong test +class RpmsgPingPongTest: + + def __init__( + self, + kernel_module, + probe_cmd, + pingpong_event_pattern, + pingpong_end_pattern, + expected_count, + ): + self.kernel_module = kernel_module + self.probe_cmd = probe_cmd + self.pingpong_event_pattern = pingpong_event_pattern + self.pingpong_end_pattern = pingpong_end_pattern + self.expected_count = expected_count + self._init_logger() + + def _init_logger(self): + self.log_reader = journal.Reader() + self.log_reader.this_boot() + self.log_reader.seek_tail() + self.log_reader.get_previous() + + self._poller = select.poll() + self._poller.register(self.log_reader, self.log_reader.get_events()) + + def lookup_pingpong_logs(self): + keep_looking = True + for entry in self.log_reader: + logging.info(entry["MESSAGE"]) + if entry["MESSAGE"] == "": + continue - Raises: - SystemExit: if ping pong event count is not expected - """ + if re.search(self.pingpong_end_pattern, entry["MESSAGE"]): + keep_looking = False + break + else: + result = re.search( + self.pingpong_event_pattern, entry["MESSAGE"] + ) + if result and result.groups()[0] in self.rpmsg_channels: + self.pingpong_events.append(entry["MESSAGE"]) - logging.info("## Probe pingpong kernel module") - if cpu_type == "imx": - kernel_module = "imx_rpmsg_pingpong" - probe_cmd = "modprobe {}".format(kernel_module) - check_pattern = r"get .* \(src: (\w*)\)" - expected_count = 51 - elif cpu_type == "ti": - kernel_module = "rpmsg_client_sample" - probe_cmd = "modprobe {} count=100".format(kernel_module) - check_pattern = r".*ti.ipc4.ping-pong.*src: (\w*)\)" - expected_count = 100 - else: - raise SystemExit("Unexpected CPU type.") - rpmsg_end_pattern = "rpmsg.*: goodbye!" - - # Unload module is needed - try: - subprocess.run( - "lsmod | grep {} && modprobe -r {}".format( - kernel_module, kernel_module - ), - shell=True, - ) - except subprocess.CalledProcessError: - pass + return keep_looking - rpmsg_channels = get_rpmsg_channel() + def monitor_journal_pingpong_logs(self): - log_reader = journal.Reader() - log_reader.seek_tail() - log_reader.get_previous() + start_time = time.time() + logging.info("# start time: %s", start_time) - poll = select.poll() - poll.register(log_reader, log_reader.get_events()) + self.pingpong_events = [] - start_time = datetime.datetime.now() - logging.info("# start time: %s", start_time) - logging.info("# probe pingpong module with '%s'", probe_cmd) - try: - subprocess.run(probe_cmd, shell=True) - except subprocess.CalledProcessError: - pass + while self._poller.poll(1000): + if self.log_reader.process() == journal.APPEND: + if self.lookup_pingpong_logs() is False: + return self.pingpong_events - pingpong_events = [] - needed_break = False - while poll.poll(): - if log_reader.process() != journal.APPEND: - continue + cur_time = time.time() + if (cur_time - start_time) > 60: + return self.pingpong_events - for entry in log_reader: - logging.info(entry["MESSAGE"]) - if entry["MESSAGE"] == "": - continue + def pingpong_test(self): + """ + Probe ping-pong kernel module for RPMSG ping-pong test - search_end = re.search(rpmsg_end_pattern, entry["MESSAGE"]) - search_pattern = re.search(check_pattern, entry["MESSAGE"]) - cur_time = datetime.datetime.now() + Raises: + SystemExit: if ping pong event count is not expected + """ - if search_pattern and search_pattern.groups()[0] in rpmsg_channels: - pingpong_events.append(entry["MESSAGE"]) - elif search_end or (cur_time - start_time).total_seconds() > 60: - needed_break = True - break + logging.info("# Start ping pong test") + # Unload module is needed + try: + logging.info("# Unload pingpong kernel module if needed") + subprocess.run( + "lsmod | grep {} && modprobe -r {}".format( + self.kernel_module, self.kernel_module + ), + shell=True, + ) + except subprocess.CalledProcessError: + pass + + self.rpmsg_channels = get_rpmsg_channel() + + try: + thread = threading.Thread( + target=self.monitor_journal_pingpong_logs + ) + thread.start() + logging.info("# probe pingpong module with '%s'", self.probe_cmd) + + subprocess.Popen(shlex.split(self.probe_cmd)) + thread.join() + + self._poller.unregister(self.log_reader) + self.log_reader.close() + except subprocess.CalledProcessError: + pass + + logging.info("# check Ping pong records") + if len(self.pingpong_events) != self.expected_count: + logging.info( + "ping-pong count is not match. expected %s, actual: %s", + self.expected_count, + len(self.pingpong_events), + ) + raise SystemExit("The ping-pong message is not match.") + else: + logging.info("ping-pong logs count is match") - if needed_break: - break - logging.info("## Check Ping pong test is finish") - if len(pingpong_events) != expected_count: - logging.info( - "ping-pong count is not match. expected %s, actual: %s", - expected_count, - len(pingpong_events), +def pingpong_test(cpu_type): + """ + RPMSG ping-pong test + + Raises: + SystemExit: if ping pong event count is not expected + """ + + if cpu_type == "imx": + test_obj = RpmsgPingPongTest( + "imx_rpmsg_pingpong", + "modprobe imx_rpmsg_pingpong", + r"get .* \(src: (\w*)\)", + r"rpmsg.*: goodbye!", + 51, + ) + elif cpu_type == "ti": + test_obj = RpmsgPingPongTest( + "rpmsg_client_sample", + "modprobe rpmsg_client_sample count=100", + r".*ti.ipc4.ping-pong.*\(src: (\w*)\)", + r"rpmsg.*: goodbye!", + 100, ) - raise SystemExit("The ping-pong message is not match.") else: - logging.info("ping-pong logs count is match") + raise SystemExit("Unexpected CPU type.") + + test_obj.pingpong_test() def rpmsg_tty_test_supported(cpu_type): @@ -310,12 +336,15 @@ def serial_tty_test(cpu_type, data_size): path_obj = Path("/dev") rpmsg_devs = check_rpmsg_tty_devices(path_obj, check_pattern, probe_cmd) if rpmsg_devs: - client_mode(serial_init(str(rpmsg_devs[0])), data_size) + serial_dev = serial_test.Serial( + str(rpmsg_devs[0]), "rpmsg-tty", [], 115200, 8, "N", 1, 3, 1024 + ) + serial_test.client_mode(serial_dev, data_size) else: raise SystemExit("No RPMSG TTY devices found.") -def main(): +def register_arguments(): parser = argparse.ArgumentParser(description="RPMSG related test") parser.add_argument( "--type", @@ -323,9 +352,10 @@ def main(): required=True, choices=["detect", "pingpong", "serial-tty"], ) - args = parser.parse_args() - init_logger() + return parser.parse_args() + +def main(args): if args.type == "detect": check_rpmsg_device() elif args.type == "pingpong": @@ -335,4 +365,27 @@ def main(): if __name__ == "__main__": - main() + + root_logger = logging.getLogger() + root_logger.setLevel(logging.INFO) + logger_format = "%(asctime)s %(levelname)-8s %(message)s" + date_format = "%Y-%m-%d %H:%M:%S" + + # Log DEBUG and INFO to stdout, others to stderr + stdout_handler = logging.StreamHandler(sys.stdout) + stdout_handler.setFormatter(logging.Formatter(logger_format, date_format)) + + stderr_handler = logging.StreamHandler(sys.stderr) + stderr_handler.setFormatter(logging.Formatter(logger_format, date_format)) + + stdout_handler.setLevel(logging.DEBUG) + stderr_handler.setLevel(logging.WARNING) + + # Add a filter to the stdout handler to limit log records to + # INFO level and below + stdout_handler.addFilter(lambda record: record.levelno <= logging.INFO) + + root_logger.addHandler(stderr_handler) + root_logger.addHandler(stdout_handler) + + main(register_arguments()) diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/serial_test.py b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/serial_test.py index a7b7adb7fd..b4047ff419 100755 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/serial_test.py +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/serial_test.py @@ -105,7 +105,6 @@ def send(self, data: bytes) -> None: def recv(self) -> bytes: rcv = "" try: - self.ser.rts = False rcv = self.ser.read(self.data_size) if rcv: logging.info("Received: {}".format(rcv.decode())) diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/tests/test_rpmsg_tests.py b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/tests/test_rpmsg_tests.py index 24aaa85937..45d97a5dc0 100755 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/tests/test_rpmsg_tests.py +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/tests/test_rpmsg_tests.py @@ -1,10 +1,10 @@ import unittest import sys -from unittest.mock import patch, MagicMock +import argparse +from unittest.mock import patch, MagicMock, Mock sys.modules["systemd"] = MagicMock() -sys.modules["serial_test"] = MagicMock() -sys.modules["serial_test"].client_mode = MagicMock(side_effect=SystemExit(0)) + import rpmsg_tests @@ -96,21 +96,135 @@ def test_rpmsg_test_supported_with_other(self): with self.assertRaisesRegex(SystemExit, "Unexpected CPU type."): rpmsg_tests.rpmsg_tty_test_supported("mtk") + +class TestRpmsgPingPong(unittest.TestCase): + + @patch("rpmsg_tests.RpmsgPingPongTest.__init__") + @patch("rpmsg_tests.RpmsgPingPongTest.pingpong_test") + def test_pingpong_entry_imx(self, mock_pingpong, mock_init): + + mock_init.return_value = None + rpmsg_tests.pingpong_test("imx") + mock_init.assert_called_with( + "imx_rpmsg_pingpong", + "modprobe imx_rpmsg_pingpong", + r"get .* \(src: (\w*)\)", + r"rpmsg.*: goodbye!", + 51, + ) + mock_pingpong.assert_called_with() + + @patch("rpmsg_tests.RpmsgPingPongTest.__init__") + @patch("rpmsg_tests.RpmsgPingPongTest.pingpong_test") + def test_pingpong_entry_ti(self, mock_pingpong, mock_init): + + mock_init.return_value = None + rpmsg_tests.pingpong_test("ti") + mock_init.assert_called_with( + "rpmsg_client_sample", + "modprobe rpmsg_client_sample count=100", + r".*ti.ipc4.ping-pong.*\(src: (\w*)\)", + r"rpmsg.*: goodbye!", + 100, + ) + mock_pingpong.assert_called_with() + + @patch("rpmsg_tests.RpmsgPingPongTest.__init__") + @patch("rpmsg_tests.RpmsgPingPongTest.pingpong_test") + def test_pingpong_entry_failed(self, mock_pingpong, mock_init): + + mock_init.return_value = None + with self.assertRaises(SystemExit): + rpmsg_tests.pingpong_test("unknown") + mock_init.assert_not_called() + mock_pingpong.assert_not_called() + + +class TestRpmsgSerialTty(unittest.TestCase): + + @patch("serial_test.client_mode") + @patch("serial_test.Serial") @patch("rpmsg_tests.check_rpmsg_tty_devices") - def test_no_rpmsg_devices(self, mock_check_rpmsg_tty_devices): + def test_no_rpmsg_devices( + self, mock_check_rpmsg_tty_devices, mock_serial, mock_client_mode + ): """ No RPMSG TTY devices found and raise SystemExit """ + mock_serial.return_value = [] mock_check_rpmsg_tty_devices.return_value = [] with self.assertRaisesRegex(SystemExit, "No RPMSG TTY devices found."): rpmsg_tests.serial_tty_test("imx", 64) + mock_serial.assert_not_called() + mock_client_mode.assert_not_called() + @patch("serial_test.client_mode") + @patch("serial_test.Serial") @patch("rpmsg_tests.check_rpmsg_tty_devices") - def test_rpmsg_test_passed(self, mock_check_rpmsg_tty_devices): + def test_rpmsg_tty_test_passed( + self, mock_check_rpmsg_tty_devices, mock_serial, mock_client_mode + ): """ String-ECHO test passed through RPMSG TTY device """ - mock_check_rpmsg_tty_devices.return_value = ["/dev/ttyRPMSG30"] - - with self.assertRaises(SystemExit): - rpmsg_tests.serial_tty_test("imx", 64) + serial_dev = "serial-dev" + tty_device = "/dev/ttyRPMSG30" + mock_check_rpmsg_tty_devices.return_value = [tty_device] + mock_serial.return_value = serial_dev + + rpmsg_tests.serial_tty_test("imx", 64) + mock_serial.assert_called_with( + tty_device, "rpmsg-tty", [], 115200, 8, "N", 1, 3, 1024 + ) + mock_client_mode.assert_called_with(serial_dev, 64) + + @patch("rpmsg_tests.serial_tty_test") + @patch("rpmsg_tests.pingpong_test") + @patch("rpmsg_tests.check_rpmsg_device") + def test_launch_check_rpmsg_device( + self, mock_check_rpmsg, mock_pingpong, mock_stty + ): + + mock_args = Mock(return_value=argparse.Namespace(type="detect")) + rpmsg_tests.main(mock_args()) + mock_check_rpmsg.assert_called_with() + mock_pingpong.assert_not_called() + mock_stty.assert_not_called() + + @patch("rpmsg_tests.serial_tty_test") + @patch("rpmsg_tests.check_rpmsg_device") + @patch("rpmsg_tests.detect_arm_processor_type") + @patch("rpmsg_tests.pingpong_test") + def test_launch_pingpong_test( + self, mock_pingpong, mock_detect_arch, mock_check_rpmsg, mock_stty + ): + + mock_args = Mock(return_value=argparse.Namespace(type="pingpong")) + mock_detect_arch.return_value = "imx" + rpmsg_tests.main(mock_args()) + mock_pingpong.assert_called_with("imx") + mock_detect_arch.assert_called_with() + mock_check_rpmsg.assert_not_called() + mock_stty.assert_not_called() + + @patch("rpmsg_tests.check_rpmsg_device") + @patch("rpmsg_tests.pingpong_test") + @patch("rpmsg_tests.detect_arm_processor_type") + @patch("rpmsg_tests.serial_tty_test") + def test_launch_serial_tty_test( + self, mock_stty, mock_detect_arch, mock_pingpong, mock_check_rpmsg + ): + + mock_args = Mock(return_value=argparse.Namespace(type="serial-tty")) + mock_detect_arch.return_value = "imx" + rpmsg_tests.main(mock_args()) + mock_stty.assert_called_with("imx", 1024) + mock_detect_arch.assert_called_with() + mock_pingpong.assert_not_called() + mock_check_rpmsg.assert_not_called() + + def test_argument_parser(self): + sys.argv = ["rpmsg_tests.py", "--type", "detect"] + args = rpmsg_tests.register_arguments() + + self.assertEqual(args.type, "detect") diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/rpmsg/test-plan.pxu b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/rpmsg/test-plan.pxu index c081806f2a..f3c4782d9e 100644 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/rpmsg/test-plan.pxu +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/rpmsg/test-plan.pxu @@ -20,3 +20,4 @@ _description: Automated RPMSG framework tests include: ce-oem-rpmsg/detect-device ce-oem-rpmsg/serial-tty + ce-oem-rpmsg/pingpong