Skip to content

Commit

Permalink
Fix rpmsg tests (Bugfix) (#1184)
Browse files Browse the repository at this point in the history
* restructure pingpong test and add unittest

restructure pingpong test and add unittest
  • Loading branch information
stanley31huang authored Apr 19, 2024
1 parent 14059dc commit c397ca0
Show file tree
Hide file tree
Showing 4 changed files with 285 additions and 118 deletions.
269 changes: 161 additions & 108 deletions contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpmsg_tests.py
Original file line number Diff line number Diff line change
@@ -1,46 +1,18 @@
#!/usr/bin/env python3

import argparse
import datetime
import os
import sys
import time
import re
import subprocess
import shlex
import select
import logging
import threading
from systemd import journal
from pathlib import Path
from serial_test import serial_init, client_mode


def init_logger():
"""
Set the logger to log DEBUG and INFO to stdout, and
WARNING, ERROR, CRITICAL to stderr.
"""
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
logger_format = "%(asctime)s %(levelname)-8s %(message)s"
date_format = "%Y-%m-%d %H:%M:%S"

# Log DEBUG and INFO to stdout, others to stderr
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setFormatter(logging.Formatter(logger_format, date_format))

stderr_handler = logging.StreamHandler(sys.stderr)
stderr_handler.setFormatter(logging.Formatter(logger_format, date_format))

stdout_handler.setLevel(logging.DEBUG)
stderr_handler.setLevel(logging.WARNING)

# Add a filter to the stdout handler to limit log records to
# INFO level and below
stdout_handler.addFilter(lambda record: record.levelno <= logging.INFO)

root_logger.addHandler(stderr_handler)
root_logger.addHandler(stdout_handler)

return root_logger
import serial_test


RPMSG_ROOT = "/sys/bus/rpmsg/devices"
Expand Down Expand Up @@ -151,91 +123,145 @@ def detect_arm_processor_type():
return arm_cpu_type


def pingpong_test(cpu_type):
"""
Probe ping-pong kernel module for RPMSG ping-pong test
class RpmsgPingPongTest:

def __init__(
self,
kernel_module,
probe_cmd,
pingpong_event_pattern,
pingpong_end_pattern,
expected_count,
):
self.kernel_module = kernel_module
self.probe_cmd = probe_cmd
self.pingpong_event_pattern = pingpong_event_pattern
self.pingpong_end_pattern = pingpong_end_pattern
self.expected_count = expected_count
self._init_logger()

def _init_logger(self):
self.log_reader = journal.Reader()
self.log_reader.this_boot()
self.log_reader.seek_tail()
self.log_reader.get_previous()

self._poller = select.poll()
self._poller.register(self.log_reader, self.log_reader.get_events())

def lookup_pingpong_logs(self):
keep_looking = True
for entry in self.log_reader:
logging.info(entry["MESSAGE"])
if entry["MESSAGE"] == "":
continue

Raises:
SystemExit: if ping pong event count is not expected
"""
if re.search(self.pingpong_end_pattern, entry["MESSAGE"]):
keep_looking = False
break
else:
result = re.search(
self.pingpong_event_pattern, entry["MESSAGE"]
)
if result and result.groups()[0] in self.rpmsg_channels:
self.pingpong_events.append(entry["MESSAGE"])

logging.info("## Probe pingpong kernel module")
if cpu_type == "imx":
kernel_module = "imx_rpmsg_pingpong"
probe_cmd = "modprobe {}".format(kernel_module)
check_pattern = r"get .* \(src: (\w*)\)"
expected_count = 51
elif cpu_type == "ti":
kernel_module = "rpmsg_client_sample"
probe_cmd = "modprobe {} count=100".format(kernel_module)
check_pattern = r".*ti.ipc4.ping-pong.*src: (\w*)\)"
expected_count = 100
else:
raise SystemExit("Unexpected CPU type.")
rpmsg_end_pattern = "rpmsg.*: goodbye!"

# Unload module is needed
try:
subprocess.run(
"lsmod | grep {} && modprobe -r {}".format(
kernel_module, kernel_module
),
shell=True,
)
except subprocess.CalledProcessError:
pass
return keep_looking

rpmsg_channels = get_rpmsg_channel()
def monitor_journal_pingpong_logs(self):

log_reader = journal.Reader()
log_reader.seek_tail()
log_reader.get_previous()
start_time = time.time()
logging.info("# start time: %s", start_time)

poll = select.poll()
poll.register(log_reader, log_reader.get_events())
self.pingpong_events = []

start_time = datetime.datetime.now()
logging.info("# start time: %s", start_time)
logging.info("# probe pingpong module with '%s'", probe_cmd)
try:
subprocess.run(probe_cmd, shell=True)
except subprocess.CalledProcessError:
pass
while self._poller.poll(1000):
if self.log_reader.process() == journal.APPEND:
if self.lookup_pingpong_logs() is False:
return self.pingpong_events

pingpong_events = []
needed_break = False
while poll.poll():
if log_reader.process() != journal.APPEND:
continue
cur_time = time.time()
if (cur_time - start_time) > 60:
return self.pingpong_events

for entry in log_reader:
logging.info(entry["MESSAGE"])
if entry["MESSAGE"] == "":
continue
def pingpong_test(self):
"""
Probe ping-pong kernel module for RPMSG ping-pong test
search_end = re.search(rpmsg_end_pattern, entry["MESSAGE"])
search_pattern = re.search(check_pattern, entry["MESSAGE"])
cur_time = datetime.datetime.now()
Raises:
SystemExit: if ping pong event count is not expected
"""

if search_pattern and search_pattern.groups()[0] in rpmsg_channels:
pingpong_events.append(entry["MESSAGE"])
elif search_end or (cur_time - start_time).total_seconds() > 60:
needed_break = True
break
logging.info("# Start ping pong test")
# Unload module is needed
try:
logging.info("# Unload pingpong kernel module if needed")
subprocess.run(
"lsmod | grep {} && modprobe -r {}".format(
self.kernel_module, self.kernel_module
),
shell=True,
)
except subprocess.CalledProcessError:
pass

self.rpmsg_channels = get_rpmsg_channel()

try:
thread = threading.Thread(
target=self.monitor_journal_pingpong_logs
)
thread.start()
logging.info("# probe pingpong module with '%s'", self.probe_cmd)

subprocess.Popen(shlex.split(self.probe_cmd))
thread.join()

self._poller.unregister(self.log_reader)
self.log_reader.close()
except subprocess.CalledProcessError:
pass

logging.info("# check Ping pong records")
if len(self.pingpong_events) != self.expected_count:
logging.info(
"ping-pong count is not match. expected %s, actual: %s",
self.expected_count,
len(self.pingpong_events),
)
raise SystemExit("The ping-pong message is not match.")
else:
logging.info("ping-pong logs count is match")

if needed_break:
break

logging.info("## Check Ping pong test is finish")
if len(pingpong_events) != expected_count:
logging.info(
"ping-pong count is not match. expected %s, actual: %s",
expected_count,
len(pingpong_events),
def pingpong_test(cpu_type):
"""
RPMSG ping-pong test
Raises:
SystemExit: if ping pong event count is not expected
"""

if cpu_type == "imx":
test_obj = RpmsgPingPongTest(
"imx_rpmsg_pingpong",
"modprobe imx_rpmsg_pingpong",
r"get .* \(src: (\w*)\)",
r"rpmsg.*: goodbye!",
51,
)
elif cpu_type == "ti":
test_obj = RpmsgPingPongTest(
"rpmsg_client_sample",
"modprobe rpmsg_client_sample count=100",
r".*ti.ipc4.ping-pong.*\(src: (\w*)\)",
r"rpmsg.*: goodbye!",
100,
)
raise SystemExit("The ping-pong message is not match.")
else:
logging.info("ping-pong logs count is match")
raise SystemExit("Unexpected CPU type.")

test_obj.pingpong_test()


def rpmsg_tty_test_supported(cpu_type):
Expand Down Expand Up @@ -310,22 +336,26 @@ def serial_tty_test(cpu_type, data_size):
path_obj = Path("/dev")
rpmsg_devs = check_rpmsg_tty_devices(path_obj, check_pattern, probe_cmd)
if rpmsg_devs:
client_mode(serial_init(str(rpmsg_devs[0])), data_size)
serial_dev = serial_test.Serial(
str(rpmsg_devs[0]), "rpmsg-tty", [], 115200, 8, "N", 1, 3, 1024
)
serial_test.client_mode(serial_dev, data_size)
else:
raise SystemExit("No RPMSG TTY devices found.")


def main():
def register_arguments():
parser = argparse.ArgumentParser(description="RPMSG related test")
parser.add_argument(
"--type",
help="RPMSG tests",
required=True,
choices=["detect", "pingpong", "serial-tty"],
)
args = parser.parse_args()
init_logger()
return parser.parse_args()


def main(args):
if args.type == "detect":
check_rpmsg_device()
elif args.type == "pingpong":
Expand All @@ -335,4 +365,27 @@ def main():


if __name__ == "__main__":
main()

root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
logger_format = "%(asctime)s %(levelname)-8s %(message)s"
date_format = "%Y-%m-%d %H:%M:%S"

# Log DEBUG and INFO to stdout, others to stderr
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setFormatter(logging.Formatter(logger_format, date_format))

stderr_handler = logging.StreamHandler(sys.stderr)
stderr_handler.setFormatter(logging.Formatter(logger_format, date_format))

stdout_handler.setLevel(logging.DEBUG)
stderr_handler.setLevel(logging.WARNING)

# Add a filter to the stdout handler to limit log records to
# INFO level and below
stdout_handler.addFilter(lambda record: record.levelno <= logging.INFO)

root_logger.addHandler(stderr_handler)
root_logger.addHandler(stdout_handler)

main(register_arguments())
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ def send(self, data: bytes) -> None:
def recv(self) -> bytes:
rcv = ""
try:
self.ser.rts = False
rcv = self.ser.read(self.data_size)
if rcv:
logging.info("Received: {}".format(rcv.decode()))
Expand Down
Loading

0 comments on commit c397ca0

Please sign in to comment.