# SPDX-License-Identifier: GPL-2.0
#
# Runs UML kernel, collects output, and handles errors.
#
# Copyright (C) 2019, Google LLC.
# Author: Felix Guo
# Author: Brendan Higgins

import importlib.abc
import importlib.util
import logging
import subprocess
import os
import shlex
import shutil
import signal
import threading
from typing import Iterator, List, Optional, Tuple

import kunit_config
from kunit_printer import stdout
import qemu_config

# File names below are joined onto the build directory by the get_*_path()
# helpers; the *_CONFIG_PATH constants are used as given (relative paths).
KCONFIG_PATH = '.config'
KUNITCONFIG_PATH = '.kunitconfig'
OLD_KUNITCONFIG_PATH = 'last_used_kunitconfig'
DEFAULT_KUNITCONFIG_PATH = 'tools/testing/kunit/configs/default.config'
ALL_TESTS_CONFIG_PATH = 'tools/testing/kunit/configs/all_tests.config'
UML_KCONFIG_PATH = 'tools/testing/kunit/configs/arch_uml.config'
OUTFILE_PATH = 'test.log'
ABS_TOOL_PATH = os.path.abspath(os.path.dirname(__file__))
QEMU_CONFIGS_DIR = os.path.join(ABS_TOOL_PATH, 'qemu_configs')


class ConfigError(Exception):
    """Represents an error trying to configure the Linux kernel."""


class BuildError(Exception):
    """Represents an error trying to build the Linux kernel."""


class LinuxSourceTreeOperations:
    """An abstraction over command line operations performed on a source tree."""

    def __init__(self, linux_arch: str, cross_compile: Optional[str]):
        """Stores the values later passed to make as ARCH= and CROSS_COMPILE=."""
        self._linux_arch = linux_arch
        self._cross_compile = cross_compile

    def make_mrproper(self) -> None:
        """Runs `make mrproper`.

        Raises:
            ConfigError: if make could not be executed or exited non-zero
                (the message is then make's combined stdout/stderr).
        """
        try:
            subprocess.check_output(['make', 'mrproper'], stderr=subprocess.STDOUT)
        except OSError as e:
            raise ConfigError('Could not call make command: ' + str(e)) from e
        except subprocess.CalledProcessError as e:
            raise ConfigError(e.output.decode()) from e

    def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
        """Returns the config to build with; the base class returns it unchanged."""
        return base_kunitconfig

    def make_olddefconfig(self, build_dir: str, make_options) -> None:
        """Runs `make olddefconfig` with O=build_dir.

        Args:
            build_dir: directory passed to make as O=.
            make_options: extra arguments appended to the make command line;
                may be None or empty.

        Raises:
            ConfigError: if make could not be executed or exited non-zero.
        """
        command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, 'olddefconfig']
        if self._cross_compile:
            command += ['CROSS_COMPILE=' + self._cross_compile]
        if make_options:
            command.extend(make_options)
        print('Populating config with:\n$', ' '.join(command))
        try:
            subprocess.check_output(command, stderr=subprocess.STDOUT)
        except OSError as e:
            raise ConfigError('Could not call make command: ' + str(e)) from e
        except subprocess.CalledProcessError as e:
            raise ConfigError(e.output.decode()) from e

    def make(self, jobs, build_dir: str, make_options) -> None:
        """Runs make with O=build_dir and --jobs=jobs.

        make's stdout is discarded; its stderr is captured and printed after a
        successful build if it is non-empty.

        Args:
            jobs: value for make's --jobs option (converted with str()).
            build_dir: directory passed to make as O=.
            make_options: extra arguments appended to the make command line;
                may be None or empty.

        Raises:
            BuildError: if make could not be executed, or exited non-zero (the
                message is then make's stderr).
        """
        command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, '--jobs=' + str(jobs)]
        if make_options:
            command.extend(make_options)
        if self._cross_compile:
            command += ['CROSS_COMPILE=' + self._cross_compile]
        print('Building with:\n$', ' '.join(command))
        # Popen() itself never raises CalledProcessError; a failing build is
        # detected through the return code below.
        try:
            proc = subprocess.Popen(command,
                                    stderr=subprocess.PIPE,
                                    stdout=subprocess.DEVNULL)
        except OSError as e:
            raise BuildError('Could not call execute make: ' + str(e)) from e
        _, stderr = proc.communicate()
        if proc.returncode != 0:
            raise BuildError(stderr.decode())
        if stderr:  # likely only due to build warnings
            print(stderr.decode())

    def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
        """Starts the built kernel; must be overridden by subclasses."""
        raise RuntimeError('not implemented!')


class LinuxSourceTreeOperationsQemu(LinuxSourceTreeOperations):
    """Source tree operations for kernels that are booted with qemu-system-*."""

    def __init__(self, qemu_arch_params: qemu_config.QemuArchParams, cross_compile: Optional[str]):
        """Copies the architecture settings out of qemu_arch_params."""
        super().__init__(linux_arch=qemu_arch_params.linux_arch,
                         cross_compile=cross_compile)
        self._kconfig = qemu_arch_params.kconfig
        self._qemu_arch = qemu_arch_params.qemu_arch
        self._kernel_path = qemu_arch_params.kernel_path
        self._kernel_command_line = qemu_arch_params.kernel_command_line + ' kunit_shutdown=reboot'
        self._extra_qemu_params = qemu_arch_params.extra_qemu_params

    def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
        """Returns the architecture's kconfig with base_kunitconfig merged in."""
        kconfig = kunit_config.parse_from_string(self._kconfig)
        kconfig.merge_in_entries(base_kunitconfig)
        return kconfig

    def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
        """Boots the kernel image under build_dir in QEMU.

        Args:
            params: kernel command line parameters; they are placed before the
                architecture's own kernel command line in -append.
            build_dir: directory the kernel image path is relative to.

        Returns:
            The QEMU process, with stdin/stdout piped in text mode and stderr
            redirected into stdout.
        """
        kernel_path = os.path.join(build_dir, self._kernel_path)
        qemu_command = ['qemu-system-' + self._qemu_arch,
                        '-nodefaults',
                        '-m', '1024',
                        '-kernel', kernel_path,
                        '-append', ' '.join(params + [self._kernel_command_line]),
                        '-no-reboot',
                        '-nographic',
                        '-serial', 'stdio'] + self._extra_qemu_params
        # Note: shlex.join() does what we want, but requires python 3.8+.
        print('Running tests with:\n$', ' '.join(shlex.quote(arg) for arg in qemu_command))
        return subprocess.Popen(qemu_command,
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT,
                                text=True, errors='backslashreplace')


class LinuxSourceTreeOperationsUml(LinuxSourceTreeOperations):
    """Source tree operations for User Mode Linux (ARCH=um) kernels."""

    def __init__(self, cross_compile=None):
        super().__init__(linux_arch='um', cross_compile=cross_compile)

    def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
        """Returns the config parsed from UML_KCONFIG_PATH with base_kunitconfig merged in."""
        kconfig = kunit_config.parse_file(UML_KCONFIG_PATH)
        kconfig.merge_in_entries(base_kunitconfig)
        return kconfig

    def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
        """Runs the Linux UML binary. Must be named 'linux'.

        Args:
            params: kernel command line parameters. The list is not modified.
            build_dir: directory containing the 'linux' binary.

        Returns:
            The UML process, with stdin/stdout piped in text mode and stderr
            redirected into stdout.
        """
        linux_bin = os.path.join(build_dir, 'linux')
        # Build a new list rather than extending `params`, so the caller's list
        # does not grow every time a kernel is started.
        uml_params = params + ['mem=1G', 'console=tty', 'kunit_shutdown=halt']
        return subprocess.Popen([linux_bin] + uml_params,
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT,
                                text=True, errors='backslashreplace')


def get_kconfig_path(build_dir: str) -> str:
    """Returns the path of the .config file inside build_dir."""
    return os.path.join(build_dir, KCONFIG_PATH)


def get_kunitconfig_path(build_dir: str) -> str:
    """Returns the path of the .kunitconfig file inside build_dir."""
    return os.path.join(build_dir, KUNITCONFIG_PATH)


def get_old_kunitconfig_path(build_dir: str) -> str:
    """Returns the path of the last-used kunitconfig record inside build_dir."""
    return os.path.join(build_dir, OLD_KUNITCONFIG_PATH)


def get_parsed_kunitconfig(build_dir: str,
                           kunitconfig_paths: Optional[List[str]]=None) -> kunit_config.Kconfig:
    """Parses the kunitconfig(s) to use for build_dir.

    With no kunitconfig_paths, build_dir's .kunitconfig is parsed; if it does
    not exist yet it is first created as a copy of DEFAULT_KUNITCONFIG_PATH.

    Otherwise every given path is parsed and merged. A path that is a
    directory refers to the .kunitconfig file inside it.

    Raises:
        ConfigError: if a given path does not exist, or if two of the given
            files conflict on an option.
    """
    if not kunitconfig_paths:
        path = get_kunitconfig_path(build_dir)
        if not os.path.exists(path):
            shutil.copyfile(DEFAULT_KUNITCONFIG_PATH, path)
        return kunit_config.parse_file(path)

    merged = kunit_config.Kconfig()

    for path in kunitconfig_paths:
        if os.path.isdir(path):
            path = os.path.join(path, KUNITCONFIG_PATH)
        if not os.path.exists(path):
            raise ConfigError(f'Specified kunitconfig ({path}) does not exist')

        partial = kunit_config.parse_file(path)
        diff = merged.conflicting_options(partial)
        if diff:
            diff_str = '\n\n'.join(f'{a}\n vs from {path}\n{b}' for a, b in diff)
            raise ConfigError(f'Multiple values specified for {len(diff)} options in kunitconfig:\n{diff_str}')
        merged.merge_in_entries(partial)
    return merged


def get_outfile_path(build_dir: str) -> str:
    """Returns the path of the kernel output log inside build_dir."""
    return os.path.join(build_dir, OUTFILE_PATH)


def _default_qemu_config_path(arch: str) -> str:
    """Returns the path of QEMU_CONFIGS_DIR/<arch>.py.

    Raises:
        ConfigError: if that file does not exist; the message lists the
            architectures that do have a config in QEMU_CONFIGS_DIR.
    """
    config_path = os.path.join(QEMU_CONFIGS_DIR, arch + '.py')
    if os.path.isfile(config_path):
        return config_path

    options = [f[:-3] for f in os.listdir(QEMU_CONFIGS_DIR) if f.endswith('.py')]
    raise ConfigError(arch + ' is not a valid arch, options are ' + str(sorted(options)))


def _get_qemu_ops(config_path: str,
                  extra_qemu_args: Optional[List[str]],
                  cross_compile: Optional[str]) -> Tuple[str, LinuxSourceTreeOperations]:
    """Loads a qemu_config module from config_path and builds QEMU operations.

    Args:
        config_path: Python file that defines a module-level QEMU_ARCH.
        extra_qemu_args: extra arguments appended to the config's
            extra_qemu_params; may be None or empty.
        cross_compile: value passed to make as CROSS_COMPILE=, if any.

    Returns:
        A (linux_arch, operations) tuple.

    Raises:
        ValueError: if config_path cannot be loaded as a Python module or the
            module does not define QEMU_ARCH.
    """
    # The module name/path has very little to do with where the actual file
    # exists (I learned this through experimentation and could not find it
    # anywhere in the Python documentation).
    #
    # Basically, we completely ignore the actual file location of the config
    # we are loading and just tell Python that the module lives in the
    # QEMU_CONFIGS_DIR for import purposes regardless of where it actually
    # exists as a file.
    module_path = '.' + os.path.join(os.path.basename(QEMU_CONFIGS_DIR), os.path.basename(config_path))
    spec = importlib.util.spec_from_file_location(module_path, config_path)
    # No spec is returned when no loader matches the file (e.g. the path does
    # not end in a Python source suffix). Report that explicitly rather than
    # relying on an assert, which is stripped under `python -O`.
    if spec is None:
        raise ValueError('Could not load qemu_config module from: ' + config_path)
    config = importlib.util.module_from_spec(spec)
    # See https://github.com/python/typeshed/pull/2626 for context.
    assert isinstance(spec.loader, importlib.abc.Loader)
    spec.loader.exec_module(config)

    if not hasattr(config, 'QEMU_ARCH'):
        raise ValueError('qemu_config module missing "QEMU_ARCH": ' + config_path)
    params: qemu_config.QemuArchParams = config.QEMU_ARCH  # type: ignore
    if extra_qemu_args:
        params.extra_qemu_params.extend(extra_qemu_args)
    return params.linux_arch, LinuxSourceTreeOperationsQemu(
        params, cross_compile=cross_compile)


class LinuxSourceTree:
    """Represents a Linux kernel source tree with KUnit tests."""

    def __init__(
            self,
            build_dir: str,
            kunitconfig_paths: Optional[List[str]]=None,
            kconfig_add: Optional[List[str]]=None,
            arch=None,
            cross_compile=None,
            qemu_config_path=None,
            extra_qemu_args=None) -> None:
        """Selects the operations backend and loads the kunitconfig.

        Note: this installs self.signal_handler as the process-wide SIGINT
        handler.

        Args:
            build_dir: kernel build directory.
            kunitconfig_paths: kunitconfig files/directories to merge; see
                get_parsed_kunitconfig().
            kconfig_add: extra Kconfig lines merged on top of the kunitconfig.
            arch: architecture name; 'um' is used when it is None. Ignored if
                qemu_config_path is given.
            cross_compile: value passed to make as CROSS_COMPILE=, if any.
            qemu_config_path: explicit qemu_config file to load. When given,
                the architecture is taken from that file.
            extra_qemu_args: extra arguments for the QEMU command line.
        """
        signal.signal(signal.SIGINT, self.signal_handler)
        if qemu_config_path:
            self._arch, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)
        else:
            self._arch = 'um' if arch is None else arch
            if self._arch == 'um':
                self._ops = LinuxSourceTreeOperationsUml(cross_compile=cross_compile)
            else:
                qemu_config_path = _default_qemu_config_path(self._arch)
                _, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)

        self._kconfig = get_parsed_kunitconfig(build_dir, kunitconfig_paths)
        if kconfig_add:
            kconfig = kunit_config.parse_from_string('\n'.join(kconfig_add))
            self._kconfig.merge_in_entries(kconfig)

    def arch(self) -> str:
        """Returns the architecture name this tree is configured for."""
        return self._arch

    def clean(self) -> bool:
        """Runs `make mrproper`; logs the error and returns False on failure."""
        try:
            self._ops.make_mrproper()
        except ConfigError as e:
            logging.error(e)
            return False
        return True

    def validate_config(self, build_dir: str) -> bool:
        """Checks that build_dir's .config contains every requested option.

        Returns:
            True if the requested config is a subset of the .config; otherwise
            logs the missing entries and returns False.
        """
        kconfig_path = get_kconfig_path(build_dir)
        validated_kconfig = kunit_config.parse_file(kconfig_path)
        if self._kconfig.is_subset_of(validated_kconfig):
            return True
        missing = set(self._kconfig.as_entries()) - set(validated_kconfig.as_entries())
        # Sort the entries so the message does not depend on set ordering.
        message = 'Not all Kconfig options selected in kunitconfig were in the generated .config.\n' \
                  'This is probably due to unsatisfied dependencies.\n' \
                  'Missing: ' + ', '.join(sorted(str(e) for e in missing))
        if self._arch == 'um':
            message += '\nNote: many Kconfig options aren\'t available on UML. You can try running ' \
                       'on a different architecture with something like "--arch=x86_64".'
        logging.error(message)
        return False

    def build_config(self, build_dir: str, make_options) -> bool:
        """Writes a fresh .config into build_dir and validates it.

        Creates build_dir if needed, writes the arch-adjusted config, runs
        `make olddefconfig`, validates the result, and finally records the
        config that was used in the last-used kunitconfig file.

        Returns:
            True on success; False if configuring or validation failed.
        """
        kconfig_path = get_kconfig_path(build_dir)
        if build_dir and not os.path.exists(build_dir):
            os.mkdir(build_dir)
        try:
            self._kconfig = self._ops.make_arch_config(self._kconfig)
            self._kconfig.write_to_file(kconfig_path)
            self._ops.make_olddefconfig(build_dir, make_options)
        except ConfigError as e:
            logging.error(e)
            return False
        if not self.validate_config(build_dir):
            return False

        old_path = get_old_kunitconfig_path(build_dir)
        if os.path.exists(old_path):
            os.remove(old_path)  # write_to_file appends to the file
        self._kconfig.write_to_file(old_path)
        return True

    def _kunitconfig_changed(self, build_dir: str) -> bool:
        """Returns True if there is no last-used kunitconfig or it differs from the current one."""
        old_path = get_old_kunitconfig_path(build_dir)
        if not os.path.exists(old_path):
            return True

        old_kconfig = kunit_config.parse_file(old_path)
        return old_kconfig != self._kconfig

    def build_reconfig(self, build_dir: str, make_options) -> bool:
        """Creates a new .config unless the existing one is still usable.

        The existing .config is kept only if the requested config is a subset
        of it and the kunitconfig has not changed since the last run;
        otherwise the .config is (re)generated with build_config().
        """
        kconfig_path = get_kconfig_path(build_dir)
        if not os.path.exists(kconfig_path):
            print('Generating .config ...')
            return self.build_config(build_dir, make_options)

        existing_kconfig = kunit_config.parse_file(kconfig_path)
        self._kconfig = self._ops.make_arch_config(self._kconfig)

        if self._kconfig.is_subset_of(existing_kconfig) and not self._kunitconfig_changed(build_dir):
            return True
        print('Regenerating .config ...')
        os.remove(kconfig_path)
        return self.build_config(build_dir, make_options)

    def build_kernel(self, jobs, build_dir: str, make_options) -> bool:
        """Runs `make olddefconfig` and then builds the kernel.

        Returns:
            False if configuring or building failed (the error is logged);
            otherwise the result of validate_config().
        """
        try:
            self._ops.make_olddefconfig(build_dir, make_options)
            self._ops.make(jobs, build_dir, make_options)
        except (ConfigError, BuildError) as e:
            logging.error(e)
            return False
        return self.validate_config(build_dir)

    def run_kernel(self, args=None, build_dir='', filter_glob='', timeout=None) -> Iterator[str]:
        """Starts the kernel and yields its output line by line.

        Every line is also written to the output log in build_dir.

        Args:
            args: kernel command line parameters. The list is not modified.
            build_dir: kernel build directory.
            filter_glob: if non-empty, passed to the kernel as
                kunit.filter_glob=.
            timeout: passed to Popen.wait() as its timeout; if the wait fails
                (e.g. the timeout expires) the kernel process is terminated.
        """
        # Work on a copy so the parameters added here do not accumulate in the
        # caller's list across repeated calls.
        args = list(args) if args else []
        if filter_glob:
            args.append('kunit.filter_glob=' + filter_glob)
        args.append('kunit.enable=1')

        process = self._ops.start(args, build_dir)
        assert process.stdout is not None  # tell mypy it's set

        # Enforce the timeout in a background thread.
        def _wait_proc():
            try:
                process.wait(timeout=timeout)
            except Exception as e:
                print(e)
                process.terminate()
                process.wait()
        waiter = threading.Thread(target=_wait_proc)
        waiter.start()

        output = open(get_outfile_path(build_dir), 'w')
        try:
            # Tee the output to the file and to our caller in real time.
            for line in process.stdout:
                output.write(line)
                yield line
        # This runs even if our caller doesn't consume every line.
        finally:
            # Flush any leftover output to the file
            output.write(process.stdout.read())
            output.close()
            process.stdout.close()

            waiter.join()
            subprocess.call(['stty', 'sane'])

    def signal_handler(self, unused_sig, unused_frame) -> None:
        """SIGINT handler: logs the interruption and resets the terminal with `stty sane`."""
        logging.error('Build interruption occurred. Cleaning console.')
        subprocess.call(['stty', 'sane'])