Plugins

ETenal edited this page Mar 11, 2024 · 20 revisions

SyzBridge provides a comprehensive interface for reproducing upstream kernel bugs. Existing bug assessment tools can be integrated easily as standalone SyzBridge plugins.

In fact, all the current functionalities of SyzBridge (reproducing bugs on downstream, extracting kernel trace, automatically tuning the original PoCs, etc.) were implemented as one or more plugins. These plugins can share data and collaborate with each other. SyzBridge also provides rich APIs for common functionalities, such as launching VMs or detecting kernel crashes, among others.

  • RawBugReproduce: RawBugReproduce runs the upstream PoC on downstream without any adaptation and monitors for kernel crashes.

  • SyzFeatureMinimize: SyzFeatureMinimize minimizes the testcase features. These features (for example, repeat and sandbox) control how the PoC works. Minimizing them gives the PoC the best chance of accommodating different downstream distro environments.

  • TraceAnalysis: TraceAnalysis extracts PoC execution traces from upstream and downstream distro. Those traces are later fed to ModulesAnalysis for further analysis.

  • ModulesAnalysis: ModulesAnalysis takes the kernel traces from TraceAnalysis, and compares the upstream trace with the downstream traces. The differences in the two traces reflect the potential missing modules.

  • BugReproduce: BugReproduce takes the results from previous plugins, and makes adaptations to the upstream PoC and then tests it on downstream.
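
The data flow described in this list implies an execution order between plugins. The sketch below is not SyzBridge code; it derives one valid order with Python's standard graphlib module. The dependency edges are assumptions read off the descriptions (only "ModulesAnalysis consumes TraceAnalysis output" is stated explicitly), and SyzBridge's own scheduler may resolve dependencies differently.

```python
from graphlib import TopologicalSorter

# Hypothetical dependency map: plugin -> plugins whose results it consumes.
# Only the TraceAnalysis -> ModulesAnalysis edge is stated in the text;
# the BugReproduce edges are an assumption based on "results from previous plugins".
DEPENDENCIES = {
    "RawBugReproduce": [],
    "SyzFeatureMinimize": [],
    "TraceAnalysis": [],
    "ModulesAnalysis": ["TraceAnalysis"],
    "BugReproduce": ["SyzFeatureMinimize", "ModulesAnalysis"],
}


def execution_order(dependencies):
    """Return plugin names ordered so every plugin comes after its dependencies."""
    return list(TopologicalSorter(dependencies).static_order())


order = execution_order(DEPENDENCIES)
print(order)
```

Any order that places TraceAnalysis before ModulesAnalysis, and both SyzFeatureMinimize and ModulesAnalysis before BugReproduce, satisfies this map.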

Workflow

A blank plugin has the following entries:

  • prepare() is the common entry for preparing a plugin. It takes no arguments; users pass customized arguments through the SyzBridge configuration file. prepare() can pass arguments on to prepare_on_demand() if necessary.

  • prepare_on_demand() is used to set up variables and conditions. It's also handy when you are testing a plugin by passing the arguments directly to the plugin without a config file.

  • success() can only be called once for each case. It makes the current case move to the succeed category.

  • run() is the main entry of a plugin. It's where you implement the main functionality of your plugin. Returning True creates a success stamp; otherwise, no stamp is created.

  • generate_report() executes after run() returns True.

  • cleanup() executes after the plugin has finished.

from plugins import AnalysisModule

class Template(AnalysisModule):
    NAME = "Template"
    REPORT_START = "======================Template Report======================"
    REPORT_END =   "==================================================================="
    REPORT_NAME = "Report_Template"
    DEPENDENCY_PLUGINS = []

    def __init__(self):
        super().__init__()
        
    def prepare(self):
        plugin = self.cfg.get_plugin(self.NAME)
        if plugin is None:
            self.err_msg("No such plugin {}".format(self.NAME))
            return False
        try:
            self.greeting = int(plugin.greeting)
        except AttributeError:
            self.err_msg("Failed to get greeting")
            return False
        return self.prepare_on_demand()
    
    def prepare_on_demand(self):
        self._prepared = True
        return True
    
    def success(self):
        return self._move_to_success

    def run(self):
        """
        do something
        True: plugin return successfully
        False: something goes wrong, stamp will not be created
        """
        self.logger.info("Hello you, {}".format(self.greeting))
        return True
    
    def generate_report(self):
        final_report = "\n".join(self.report)
        self.info_msg(final_report)
        self._write_to(final_report, self.REPORT_NAME)
    
    def _write_to(self, content, name):
        file_path = "{}/{}".format(self.path_case_plugin, name)
        super()._write_to(content, file_path)

    def cleanup(self):
        super().cleanup()
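
To make the call order of these entries concrete, the following self-contained sketch drives a toy plugin through the same lifecycle. MiniPlugin and drive are hypothetical stand-ins written for this page; they neither import nor reproduce SyzBridge's AnalysisModule or its scheduler.

```python
class MiniPlugin:
    """Toy plugin exposing the same entries as the template (not AnalysisModule)."""

    def __init__(self):
        self.calls = []      # records the order in which entries are invoked
        self.report = []
        self.greeting = None

    def prepare_on_demand(self, greeting):
        # Arguments are passed in directly, so no configuration file is needed.
        self.calls.append("prepare_on_demand")
        self.greeting = greeting
        return True

    def run(self):
        self.calls.append("run")
        self.report.append("Hello you, {}".format(self.greeting))
        return True

    def generate_report(self):
        self.calls.append("generate_report")
        return "\n".join(self.report)

    def cleanup(self):
        self.calls.append("cleanup")


def drive(plugin, **kwargs):
    """Hypothetical driver: prepare, run, report only on success, always clean up."""
    report = None
    try:
        if plugin.prepare_on_demand(**kwargs) and plugin.run():
            report = plugin.generate_report()
    finally:
        plugin.cleanup()
    return report


plugin = MiniPlugin()
result = drive(plugin, greeting=42)
print(plugin.calls)
print(result)
```

Calling prepare_on_demand() directly, as drive does here, mirrors the testing use case described above: the plugin receives its arguments without a configuration file.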

Write your own plugin

This is a tutorial for writing a bug bisection plugin. This plugin tests whether a bug is reproducible on a list of decreasing kernel versions (e.g., v5.7, v5.6, v5.5...). If the bug fails to reproduce on a certain kernel version, it often indicates that the vulnerable code was introduced later (e.g., if the bug reproduced on v5.7 and v5.6 but failed on v5.5, it means the buggy commit appeared between v5.5 and v5.6).
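
The search strategy itself is independent of SyzBridge and can be sketched as a plain function. In the sketch below, first_non_reproducing and the outcomes table are hypothetical names used only for illustration; in the real plugin, the reproduces check is a kernel build followed by a VM run.

```python
def first_non_reproducing(versions, reproduces):
    """Walk versions from newest to oldest and stop at the first one where the bug fails.

    Returns (last_reproducing, first_failing); either element may be None.
    """
    last_reproducing = None
    for version in versions:
        if not reproduces(version):
            return last_reproducing, version
        last_reproducing = version
    return last_reproducing, None


# Hypothetical outcome table standing in for real reproduction runs.
outcomes = {"v5.7": True, "v5.6": True, "v5.5": False, "v5.4": False}
versions = ["v5.7", "v5.6", "v5.5", "v5.4"]

last_reproducing, first_failing = first_non_reproducing(versions, outcomes.get)
print("buggy commit appeared between {} and {}".format(first_failing, last_reproducing))
```

This is a linear scan rather than a binary search, which matches the one-version-at-a-time approach the tutorial follows.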

The plugin folder contains at least two files: __init__.py and name_of_the_plugin.py.

__init__.py defines DESCRIPTION, ENABLE, and AS_SERVICE. DESCRIPTION is the plugin description displayed in SyzBridge's help information. ENABLE determines whether the plugin is enabled. Enabling a plugin doesn't automatically apply it to the workflow; users must pass the argument --name-of-the-plugin to use it. For more details, refer to Run-SyzBridge. AS_SERVICE determines whether the plugin is treated as a service. A service plugin can't be invoked through the command line and is typically used by other plugins as a library. For example, the SyzkallerInterface plugin provides the SyzkallerInterface class for use by other plugins.

# __init__.py
from .bug_bisection import BugBisection

DESCRIPTION = "BugBisection is a plugin that determines the first buggy kernel version"
ENABLE = True
AS_SERVICE = False
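
The effect of ENABLE and AS_SERVICE on the command line can be illustrated with a small argparse sketch. This is not how SyzBridge registers plugins internally; the PLUGINS table, the build_parser helper, the placeholder descriptions, and the rule that derives the flag from the folder name are all assumptions made for illustration.

```python
import argparse
from types import SimpleNamespace

# Hypothetical stand-ins for the metadata each plugin's __init__.py exposes.
PLUGINS = {
    "bug_bisection": SimpleNamespace(
        DESCRIPTION="BugBisection is a plugin that determines the first buggy kernel version",
        ENABLE=True, AS_SERVICE=False),
    "syzkaller_interface": SimpleNamespace(
        DESCRIPTION="placeholder description for a service plugin",
        ENABLE=True, AS_SERVICE=True),
    "disabled_plugin": SimpleNamespace(
        DESCRIPTION="placeholder description for a disabled plugin",
        ENABLE=False, AS_SERVICE=False),
}


def build_parser(plugins):
    """Add one opt-in flag per plugin that is enabled and is not a service."""
    parser = argparse.ArgumentParser(prog="toy-syzbridge")
    for name, meta in plugins.items():
        if meta.ENABLE and not meta.AS_SERVICE:
            # Assumed naming rule: folder name with underscores turned into hyphens.
            parser.add_argument("--" + name.replace("_", "-"),
                                action="store_true", help=meta.DESCRIPTION)
    return parser


parser = build_parser(PLUGINS)
args = parser.parse_args(["--bug-bisection"])
print(args.bug_bisection)
```

Because the flag uses store_true, an enabled plugin still stays out of the workflow unless its flag is passed explicitly.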

Now, we start writing the main functions of the plugin. In prepare(), we retrieve repro_timeout, the number of reproduction attempts, and the kernel_version list.

    def prepare(self):
        plugin = self.cfg.get_plugin(self.NAME)
        if plugin is None:
            self.err_msg("No such plugin {}".format(self.NAME))
            return False
        # timeout is mandatory
        try:
            self.repro_timeout = int(plugin.timeout)
        except AttributeError:
            self.err_msg("Failed to get timeout")
            return False
        # attempt is optional and defaults to 3
        try:
            self.repro_attempt = int(plugin.attempt)
        except AttributeError:
            self.repro_attempt = 3
        # kernel_version is mandatory
        try:
            self.kernel_version = plugin.kernel_version
        except AttributeError:
            self.err_msg("Failed to get kernel_version")
            return False
        return self.prepare_on_demand()

In the configuration file, the plugin config is defined as follows:

"BugBisection": {
     "kernel_version": {"e5fd3e6": ["v5.7", "v5.6", "v5.5", "v5.4", "v5.3", "v5.2", "v5.1", "v5.0"]},
     "timeout": 300,
     "attempt": 10
}
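
The same mandatory and optional rules can be tried outside SyzBridge with plain JSON. The sketch below wraps the fragment in an enclosing object so that it parses on its own; the layout of the rest of a real configuration file is not shown here, and load_bisection_settings is a hypothetical helper, not a SyzBridge API.

```python
import json

# The fragment above, wrapped in an enclosing object so it is valid JSON by itself.
CONFIG_TEXT = """
{
  "BugBisection": {
    "kernel_version": {"e5fd3e6": ["v5.7", "v5.6", "v5.5", "v5.4", "v5.3", "v5.2", "v5.1", "v5.0"]},
    "timeout": 300,
    "attempt": 10
  }
}
"""


def load_bisection_settings(text, case_hash, default_attempt=3):
    """Return (timeout, attempt, versions) for one case, or None if a required key is missing."""
    section = json.loads(text).get("BugBisection")
    if section is None or "timeout" not in section or "kernel_version" not in section:
        return None
    versions = section["kernel_version"].get(case_hash)
    if versions is None:
        return None
    attempt = int(section.get("attempt", default_attempt))
    return int(section["timeout"]), attempt, versions


settings = load_bisection_settings(CONFIG_TEXT, "e5fd3e6")
print(settings)
```

Note that kernel_version is keyed by case hash, so each bug can be given its own list of versions to test.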

Next, we write the main entry function, run().

The first step is to determine whether the PoC is 32-bit or 64-bit. Then, we iterate over the versions in the kernel_version list for the current case. For each version, we call build_upstream_kernel to compile the Linux kernel and then try to reproduce the bug in test_poc. The loop stops at the first version on which the bug does not reproduce.

    def run(self):
        i386 = False
        if regx_match(r'386', self.case["manager"]):
            i386 = True
        for version in self.kernel_version[self.case_hash]:
            self.logger.info("Now testing {}".format(version))
            if self.build_upstream_kernel(kernel_version=version) != 0:
                self.err_msg("Failed to build upstream kernel")
                return False
            if not self.test_poc(i386=i386, version=version):
                self.report.append("Bug doesn't reproduce on {}".format(version))
                return True
            self.report.append("Bug triggered on {}".format(version))
        return True

Compiling the Linux kernel is straightforward. SyzBridge provides an API called build_mainline_kernel for building the Linux kernel. We specify the desired kernel version in the commit argument and preserve the original kernel configuration by setting keep_ori_config to True. For more information about the other arguments of build_mainline_kernel, please refer to the API Reference.

    def build_upstream_kernel(self, kernel_version):
        if self._check_stamp("BUILD_KERNEL"):
            self._remove_stamp("BUILD_KERNEL")
        ret = self.build_mainline_kernel(commit=kernel_version, keep_ori_config=True)
        if ret == 0:
            self._create_stamp("BUILD_SYZ_FEATURE_MINIMIZE_KERNEL")
        return ret
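
One plausible reason for removing the BUILD_KERNEL stamp before each build is that a stamped step would otherwise be skipped. The toy model below illustrates that idea under two loud assumptions: that stamps behave like empty marker files, and that a build is skipped when its stamp exists. StampStore, fake_build, and build_all are hypothetical and do not reflect SyzBridge's actual implementation.

```python
import os
import tempfile


class StampStore:
    """Toy marker-file store: a stamp exists if an empty file with that name exists."""

    def __init__(self, directory):
        self.directory = directory

    def _path(self, name):
        return os.path.join(self.directory, name)

    def check(self, name):
        return os.path.exists(self._path(name))

    def create(self, name):
        open(self._path(name), "w").close()

    def remove(self, name):
        if self.check(name):
            os.remove(self._path(name))


def fake_build(store, version, built):
    """Pretend build that is skipped when the BUILD_KERNEL stamp already exists (assumption)."""
    if store.check("BUILD_KERNEL"):
        return 0
    built.append(version)
    store.create("BUILD_KERNEL")
    return 0


def build_all(versions, reset_stamp):
    """Build every version, optionally clearing the stamp first, and return what was built."""
    built = []
    with tempfile.TemporaryDirectory() as workdir:
        store = StampStore(workdir)
        for version in versions:
            if reset_stamp:
                store.remove("BUILD_KERNEL")
            fake_build(store, version, built)
    return built


print(build_all(["v5.7", "v5.6"], reset_stamp=False))
print(build_all(["v5.7", "v5.6"], reset_stamp=True))
```

In this model, only the first version is built when the stamp is left in place, while clearing it before every build causes each version to be compiled.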

We want to run the PoC on an upstream kernel in a virtual machine and monitor for kernel crashes if they occur. Fortunately, SyzBridge takes care of most of these steps.

First, we obtain the kernel from Config().get_kernel_by_name(). The self.kernel variable is derived from self.case['kernel'], which ultimately originates from Syzbot. Config().get_kernel_by_name() returns a Vendor object, which contains a Reproducer object that simplifies bug reproduction.

The reproduce() function takes several arguments:

  • func is the callback function that is invoked once the virtual machine has booted. Use it to decide what to do when the VM is ready: how to reproduce the bug, or any preparation that is needed first.

  • func_args passes arguments to the callback function.

  • vm_tag isn't mandatory, but it helps with identification.

  • timeout specifies the maximum running time for the VM; the VM is terminated when the limit is reached.

  • attempt indicates the number of rounds of reproduction.

  • root, when set to True, reproduces the PoC as the root user.

  • work_dir is the working directory where the reproducing log and VM log are saved.

  • c_hash should be the bug hash value and is used in multiple places, such as the log file name.

The return values of reproduce() include the crash context, triggerability status, and any extra output, if available.

    def test_poc(self, i386, version):
        upstream = self.cfg.get_kernel_by_name(self.kernel)
        if upstream is None:
            self.logger.error("Fail to get {} kernel".format(self.kernel))
            return False
        upstream.repro.init_logger(self.logger)
        # The VM timeout is repro_timeout plus 100, leaving headroom beyond the PoC run itself.
        _, triggered, _ = upstream.repro.reproduce(
            func=self._capture_crash, func_args=(i386,), vm_tag='test {}'.format(version),
            timeout=self.repro_timeout + 100, attempt=self.repro_attempt, root=True,
            work_dir=self.path_case_plugin, c_hash=self.case_hash)
        self.info_msg("crash triggered: {}".format(triggered))
        return triggered

The final missing piece is the callback function _capture_crash(). A callback for reproduce() has two mandatory arguments: qemu, an object of the VM() class that lets you operate the VM (e.g., uploading files to or downloading files from it), and root, which indicates whether the PoC is being reproduced as the root user. Any further arguments are user-defined; here, we pass only i386 to the callback function.

To run the PoC, we upload the PoC source code into the VM, compile the source code into a Linux executable binary, and ultimately execute the PoC program. It's important to note that you don't need to worry about how to capture the kernel crash; SyzBridge takes care of this for you. Normally, SyzBridge captures every kernel crash report, but you can also specify which crashes to capture or exclude by setting the include and exclude options in the configuration file. See more details here

    def _capture_crash(self, qemu: VM, root: bool, i386: bool):
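        # Upload the PoC source into the VM.
        qemu.upload(user='root', src=["{}/poc.c".format(self.path_case_plugin)], dst="~/poc.c", wait=True)
        # Compile it, as a 32-bit binary if the PoC targets i386.
        if i386:
            qemu.command(cmds="gcc -m32 -pthread -o poc poc.c", user="root", wait=True)
        else:
            qemu.command(cmds="gcc -pthread -o poc poc.c", user="root", wait=True)
        # Run the PoC; crash detection is handled by SyzBridge, not by this callback.
        qemu.command(cmds="./poc", user="root", wait=True, timeout=self.repro_timeout)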
        return
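
The callback contract described above (qemu and root first, followed by the entries of func_args) can be exercised without a VM. The sketch below uses hypothetical stand-ins, FakeVM and fake_reproduce, that only record commands. It assumes the callback is invoked once per attempt and hard-codes "no crash", so it illustrates the calling convention only, not SyzBridge's reproduce() implementation.

```python
class FakeVM:
    """Stand-in for the VM object; records commands instead of running them."""

    def __init__(self):
        self.commands = []

    def command(self, cmds, user, wait=True, timeout=None):
        self.commands.append((user, cmds))


def fake_reproduce(func, func_args=(), attempt=1, root=True):
    """Toy reproduce(): call func(qemu, root, *func_args) once per attempt (assumption).

    Returns a 3-tuple shaped like the one described above:
    (crash_context, triggered, extra_output). Crash monitoring is replaced
    by a hard-coded "no crash" result.
    """
    vms = []
    for _ in range(attempt):
        qemu = FakeVM()
        func(qemu, root, *func_args)
        vms.append(qemu)
    return [], False, vms


def capture_crash(qemu, root, i386):
    # "user" is a hypothetical non-root account name used only in this sketch.
    user = "root" if root else "user"
    flags = "-m32 " if i386 else ""
    qemu.command(cmds="gcc {}-pthread -o poc poc.c".format(flags), user=user)
    qemu.command(cmds="./poc", user=user)


_, triggered, vms = fake_reproduce(capture_crash, func_args=(True,), attempt=2)
print(triggered)
print(vms[0].commands)
```

Passing func_args=(True,) makes i386 the third positional argument of the callback, which is the same way test_poc hands i386 to _capture_crash.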

The final plugin can be found here.