Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cd qemu-vexpress-a9 #577

Open
supperthomas opened this issue Feb 1, 2025 · 5 comments
Open

cd qemu-vexpress-a9 #577

supperthomas opened this issue Feb 1, 2025 · 5 comments

Comments

@supperthomas
Copy link
Owner

No description provided.

@supperthomas
Copy link
Owner Author

    def check_env(self):
        # 检查QEMU环境是否可用
        def exec_cmd(cmd):
            # 执行命令并返回输出文本
            text = ''
            p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
            out, err = p.communicate()
            for line in out.splitlines():
                text += str(line, encoding="utf8") + '\r\n'
            return text

        # 获取QEMU版本信息,检查其是否包含"version"
        result = exec_cmd('qemu-system-{} --version'.format(self.system))
        check_ok = (result.find('version') != -1)
        if check_ok:
            self.env_version = result
        return check_ok

@supperthomas
Copy link
Owner Author

    def run(self):
        # 运行QEMU
        if self.check_env():
            logger.info("QEMU environment check OK.")  # 环境检查通过
            logger.debug(self.env_version)
        else:
            logger.error("QEMU environment check FAILED.")  # 环境检查失败
            return

        # 构建运行QEMU的命令
        cmd = 'qemu-system-{} -nographic -M {} -kernel {}'.format(self.system, self.machine, self.elf_path)
        
        if self.sd_path != "None":
            cmd = cmd + ' -sd ' + self.sd_path

        # 根据平台设置不同的创建子进程方法
        if platform.system() == "Windows":
            self.sub_proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, bufsize=0,
                                             creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        else:
            self.sub_proc = subprocess.Popen("exec " + cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, bufsize=0,
                                             shell=True)
        event = threading.Event()  # 创建事件对象,用于线程间通信

1 similar comment
@supperthomas
Copy link
Owner Author

    def run(self):
        # 运行QEMU
        if self.check_env():
            logger.info("QEMU environment check OK.")  # 环境检查通过
            logger.debug(self.env_version)
        else:
            logger.error("QEMU environment check FAILED.")  # 环境检查失败
            return

        # 构建运行QEMU的命令
        cmd = 'qemu-system-{} -nographic -M {} -kernel {}'.format(self.system, self.machine, self.elf_path)
        
        if self.sd_path != "None":
            cmd = cmd + ' -sd ' + self.sd_path

        # 根据平台设置不同的创建子进程方法
        if platform.system() == "Windows":
            self.sub_proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, bufsize=0,
                                             creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        else:
            self.sub_proc = subprocess.Popen("exec " + cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, bufsize=0,
                                             shell=True)
        event = threading.Event()  # 创建事件对象,用于线程间通信

@supperthomas
Copy link
Owner Author

bsp_board_info:
  arch: arm
  toolchain: arm-none-eabi-gcc
  pre_build: |
    scons -c
    qemu-system-arm --version
  build_cmd: |
    scons -j8
  post_build: |
    scons --version
    qemu-system-arm --version
  run_cmd: qemu-system-arm -M vexpress-a9 -smp cpus=2 -kernel rtthread.bin -nographic -sd sd.bin
  qemu_flag: true
pkg.tools.coremark:
  kconfig:
    - CONFIG_PKG_USING_COREMARK=y
    - CONFIG_COREMARK_ITERATIONS=36000
  ci_build_run_flag : true
  buildcheckresult: "core_main"              #检查编译的log中是否有匹配字
  msh_cmd: |
    ps
    version
    core_mark
  msh_cmd_timeout: 60
  checkresult: 'CoreMark 1.0' #检查执行过程中的log是否有匹配字

@supperthomas
Copy link
Owner Author

supperthomas commented Feb 1, 2025

import subprocess
import threading
import time
import logging
import sys
import os
import shutil
import re
import multiprocessing
import yaml


# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
logger = logging.getLogger(__name__)

class QemuManager:
    def __init__(self, qemu_cmd, idle_timeout=5, checkresult=None):
        """
        初始化QEMU管理器
        :param qemu_cmd: QEMU启动命令
        :param idle_timeout: 日志空闲超时时间(秒)
        """
        self.qemu_cmd = qemu_cmd
        self.idle_timeout = idle_timeout
        self.qemu_process = None
        self.log_thread = None
        self.checkresult = checkresult
        self.last_log_time = time.time()
        self.logs = []
        self.running = False
        self.checkresult_found = False  # 标记是否找到checkresult
    def start_qemu(self):
        """启动QEMU进程"""
        logger.info("Starting QEMU...")
        self.qemu_process = subprocess.Popen(
            self.qemu_cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            shell=True,
            bufsize=0,
            text=True
        )
        self.running = True
        logger.info("QEMU started successfully.")

    def log_monitor(self):
        """监控QEMU输出日志"""
        logger.info("Starting log monitor...")
        while self.running:
            line = self.qemu_process.stdout.readline()
            if line:
                line = line.strip()
                self.logs.append(line)
                self.last_log_time = time.time()  # 更新最后日志时间
                logger.info(f"QEMU Output: {line}")  # 实时打印日志
            # 检查是否包含checkresult
            if self.checkresult and self.checkresult in line:
                logger.info(f"Checkresult '{self.checkresult}' found in logs. Success...")
                self.checkresult_found = True
                #self.running = False  # 停止监控
                #break
            else:
                time.sleep(0.1)

    def send_command(self, command):
        """向QEMU发送命令"""
        if not self.running or not self.qemu_process:
            logger.error("QEMU is not running.")
            return False

        logger.info(f"Sending command: {command}")
        try:
            self.qemu_process.stdin.write(command + "\n")
            self.qemu_process.stdin.flush()
            return True
        except Exception as e:
            logger.error(f"Failed to send command: {e}")
            return False

    def stop_qemu(self):
        """停止QEMU进程"""
        if self.qemu_process:
            logger.info("Stopping QEMU...")
            self.running = False
            self.qemu_process.terminate()
            self.qemu_process.wait(timeout=5)
            logger.info("QEMU stopped.")



    def run(self,commands):
        """主运行逻辑"""
        try:
            # 启动QEMU
            self.start_qemu()

            # 启动日志监控线程
            self.log_thread = threading.Thread(target=self.log_monitor, daemon=True)
            self.log_thread.start()

            # 等待QEMU启动完成
            time.sleep(5)

            for cmd in commands:
                if not self.send_command(cmd):
                    break
                time.sleep(2)  # 命令之间间隔2秒

            # 监控日志输出,超时退出
            while self.running:
                idle_time = time.time() - self.last_log_time
                if idle_time > self.idle_timeout :
                    if not self.checkresult_found:
                        logger.info(f"No logs for {self.idle_timeout} seconds. ::error:: Exiting...")
                    else:
                        logger.info(f"No logs for {self.idle_timeout} seconds. ::check success:: Exiting...")
                    break
                time.sleep(0.1)

        except KeyboardInterrupt:
            logger.info("Script interrupted by user.")
        except Exception as e:
            logger.error(f"An error occurred: {e}")
        finally:
            self.stop_qemu()
def run_command(command, timeout=None):
    """运行命令并返回输出和结果"""
    print(command)
    result = subprocess.run(command, shell=True, capture_output=True, text=True, timeout=timeout)
    return result.stdout, result.returncode

def check_output(output, check_string):
    """检查输出中是否包含指定字符串"""
    flag = check_string in output
    if flag == True:
        print('find string ' + check_string)
    else:
        print('::error:: can not find string ' + check_string + output)
    return flag

if __name__ == "__main__":
    # QEMU启动命令
    pre_build_commands = [
        "scons -c"
    ]
    post_build_command = "echo end"
    build_command = "scons"
    build_check_result = "core_main"
    check_result = "CoreMark 1.0"
    qemu_timeout_second = 50
    qemu_command = (
        "qemu-system-arm -M vexpress-a9 -smp cpus=2 -kernel rtthread.bin -nographic -sd sd.bin"
    )
    commands = ["version", "ps", "core_mark"]
    # 创建QEMU管理器并运行
    yml_files_content = []
    bsp_root = os.getcwd()
    print(bsp_root)
    directory = os.path.join(bsp_root , '.ci/attachconfig')
    if os.path.exists(directory):
        for root, dirs, files in os.walk(directory):
            for filename in files:
                print(filename)
                if filename.endswith('attachconfig.yml'):
                    file_path = os.path.join(root, filename)
                    print(file_path)
                    if os.path.exists(file_path):
                        try:
                            with open(file_path, 'r' ,encoding='utf-8') as file:
                                print('===========')
                                content = yaml.safe_load(file)
                                if content is None:
                                    continue
                                yml_files_content.append(content)
                        except yaml.YAMLError as e:
                            print(f"::error::Error parsing YAML file: {e}")
                            continue
                        except Exception as e:
                            print(f"::error::Error reading file: {e}")
                            continue

    for projects in yml_files_content:
            for name, details in projects.items():
                if(name == 'bsp_board_info'):
                    print(details)
                    pre_build_commands = details.get("pre_build").splitlines()
                    build_command = details.get("build_cmd").splitlines()
                    post_build_command = details.get("post_build").splitlines()
                    qemu_command = details.get("run_cmd")
                if details.get("kconfig") is not None:
                    if details.get("buildcheckresult")  is not None:
                        build_check_result = details.get("buildcheckresult")
                    if details.get("msh_cmd") is not None:
                        commands = details.get("msh_cmd").splitlines()
                    if details.get("checkresult") is not None:
                        check_result = details.get("checkresult")

    print(commands)
    print(check_result)
    print(build_check_result)
    print(qemu_command)
    print(pre_build_commands)
    #exit(1)

    # 执行 pre_build 命令
    for command in pre_build_commands:
        output, returncode = run_command(command)
        print(output)
        if returncode != 0:
            print(f"Pre-build command failed: {command}")
            print(output)
           # return
    for command in build_command:
        output, returncode = run_command(command)
        print(output)
        if returncode != 0 or not check_output(output, build_check_result):
                print("Build failed or build check result not found")
                print(output)
                #return
    print('==========')
    for command in post_build_command:
        output, returncode = run_command(command)
        print(output)
        if returncode != 0:
            print(f"Post-build command failed: {post_build_command}")
            print(output)
            #return

    print(qemu_command)
    #exit(1)
    qemu_manager = QemuManager(qemu_cmd=qemu_command, idle_timeout=qemu_timeout_second,checkresult=check_result)
    qemu_manager.run(commands)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant