#!/usr/bin/env python3 # SPDX-License-Identifier: GPL-2.0 # # A thin wrapper on top of the KUnit Kernel # # Copyright (C) 2019, Google LLC. # Author: Felix Guo # Author: Brendan Higgins import argparse import os import re import shlex import sys import time assert sys.version_info >= (3, 7), "Python version is too old" from dataclasses import dataclass from enum import Enum, auto from typing import Iterable, List, Optional, Sequence, Tuple import kunit_json import kunit_kernel import kunit_parser from kunit_printer import stdout class KunitStatus(Enum): SUCCESS = auto() CONFIG_FAILURE = auto() BUILD_FAILURE = auto() TEST_FAILURE = auto() @dataclass class KunitResult: status: KunitStatus elapsed_time: float @dataclass class KunitConfigRequest: build_dir: str make_options: Optional[List[str]] @dataclass class KunitBuildRequest(KunitConfigRequest): jobs: int @dataclass class KunitParseRequest: raw_output: Optional[str] json: Optional[str] @dataclass class KunitExecRequest(KunitParseRequest): build_dir: str timeout: int filter_glob: str filter: str filter_action: Optional[str] kernel_args: Optional[List[str]] run_isolated: Optional[str] list_tests: bool list_tests_attr: bool @dataclass class KunitRequest(KunitExecRequest, KunitBuildRequest): pass def get_kernel_root_path() -> str: path = sys.argv[0] if not __file__ else __file__ parts = os.path.realpath(path).split('tools/testing/kunit') if len(parts) != 2: sys.exit(1) return parts[0] def config_tests(linux: kunit_kernel.LinuxSourceTree, request: KunitConfigRequest) -> KunitResult: stdout.print_with_timestamp('Configuring KUnit Kernel ...') config_start = time.time() success = linux.build_reconfig(request.build_dir, request.make_options) config_end = time.time() status = KunitStatus.SUCCESS if success else KunitStatus.CONFIG_FAILURE return KunitResult(status, config_end - config_start) def build_tests(linux: kunit_kernel.LinuxSourceTree, request: KunitBuildRequest) -> KunitResult: stdout.print_with_timestamp('Building KUnit Kernel ...') build_start = time.time() success = linux.build_kernel(request.jobs, request.build_dir, request.make_options) build_end = time.time() status = KunitStatus.SUCCESS if success else KunitStatus.BUILD_FAILURE return KunitResult(status, build_end - build_start) def config_and_build_tests(linux: kunit_kernel.LinuxSourceTree, request: KunitBuildRequest) -> KunitResult: config_result = config_tests(linux, request) if config_result.status != KunitStatus.SUCCESS: return config_result return build_tests(linux, request) def _list_tests(linux: kunit_kernel.LinuxSourceTree, request: KunitExecRequest) -> List[str]: args = ['kunit.action=list'] if request.kernel_args: args.extend(request.kernel_args) output = linux.run_kernel(args=args, timeout=request.timeout, filter_glob=request.filter_glob, filter=request.filter, filter_action=request.filter_action, build_dir=request.build_dir) lines = kunit_parser.extract_tap_lines(output) # Hack! Drop the dummy TAP version header that the executor prints out. lines.pop() # Filter out any extraneous non-test output that might have gotten mixed in. return [l for l in output if re.match(r'^[^\s.]+\.[^\s.]+$', l)] def _list_tests_attr(linux: kunit_kernel.LinuxSourceTree, request: KunitExecRequest) -> Iterable[str]: args = ['kunit.action=list_attr'] if request.kernel_args: args.extend(request.kernel_args) output = linux.run_kernel(args=args, timeout=request.timeout, filter_glob=request.filter_glob, filter=request.filter, filter_action=request.filter_action, build_dir=request.build_dir) lines = kunit_parser.extract_tap_lines(output) # Hack! Drop the dummy TAP version header that the executor prints out. lines.pop() # Filter out any extraneous non-test output that might have gotten mixed in. return lines def _suites_from_test_list(tests: List[str]) -> List[str]: """Extracts all the suites from an ordered list of tests.""" suites = [] # type: List[str] for t in tests: parts = t.split('.', maxsplit=2) if len(parts) != 2: raise ValueError(f'internal KUnit error, test name should be of the form ".", got "{t}"') suite, _ = parts if not suites or suites[-1] != suite: suites.append(suite) return suites def exec_tests(linux: kunit_kernel.LinuxSourceTree, request: KunitExecRequest) -> KunitResult: filter_globs = [request.filter_glob] if request.list_tests: output = _list_tests(linux, request) for line in output: print(line.rstrip()) return KunitResult(status=KunitStatus.SUCCESS, elapsed_time=0.0) if request.list_tests_attr: attr_output = _list_tests_attr(linux, request) for line in attr_output: print(line.rstrip()) return KunitResult(status=KunitStatus.SUCCESS, elapsed_time=0.0) if request.run_isolated: tests = _list_tests(linux, request) if request.run_isolated == 'test': filter_globs = tests elif request.run_isolated == 'suite': filter_globs = _suites_from_test_list(tests) # Apply the test-part of the user's glob, if present. if '.' in request.filter_glob: test_glob = request.filter_glob.split('.', maxsplit=2)[1] filter_globs = [g + '.'+ test_glob for g in filter_globs] metadata = kunit_json.Metadata(arch=linux.arch(), build_dir=request.build_dir, def_config='kunit_defconfig') test_counts = kunit_parser.TestCounts() exec_time = 0.0 for i, filter_glob in enumerate(filter_globs): stdout.print_with_timestamp('Starting KUnit Kernel ({}/{})...'.format(i+1, len(filter_globs))) test_start = time.time() run_result = linux.run_kernel( args=request.kernel_args, timeout=request.timeout, filter_glob=filter_glob, filter=request.filter, filter_action=request.filter_action, build_dir=request.build_dir) _, test_result = parse_tests(request, metadata, run_result) # run_kernel() doesn't block on the kernel exiting. # That only happens after we get the last line of output from `run_result`. # So exec_time here actually contains parsing + execution time, which is fine. test_end = time.time() exec_time += test_end - test_start test_counts.add_subtest_counts(test_result.counts) if len(filter_globs) == 1 and test_counts.crashed > 0: bd = request.build_dir print('The kernel seems to have crashed; you can decode the stack traces with:') print('$ scripts/decode_stacktrace.sh {}/vmlinux {} < {} | tee {}/decoded.log | {} parse'.format( bd, bd, kunit_kernel.get_outfile_path(bd), bd, sys.argv[0])) kunit_status = _map_to_overall_status(test_counts.get_status()) return KunitResult(status=kunit_status, elapsed_time=exec_time) def _map_to_overall_status(test_status: kunit_parser.TestStatus) -> KunitStatus: if test_status in (kunit_parser.TestStatus.SUCCESS, kunit_parser.TestStatus.SKIPPED): return KunitStatus.SUCCESS return KunitStatus.TEST_FAILURE def parse_tests(request: KunitParseRequest, metadata: kunit_json.Metadata, input_data: Iterable[str]) -> Tuple[KunitResult, kunit_parser.Test]: parse_start = time.time() if request.raw_output: # Treat unparsed results as one passing test. fake_test = kunit_parser.Test() fake_test.status = kunit_parser.TestStatus.SUCCESS fake_test.counts.passed = 1 output: Iterable[str] = input_data if request.raw_output == 'all': pass elif request.raw_output == 'kunit': output = kunit_parser.extract_tap_lines(output) for line in output: print(line.rstrip()) parse_time = time.time() - parse_start return KunitResult(KunitStatus.SUCCESS, parse_time), fake_test # Actually parse the test results. test = kunit_parser.parse_run_tests(input_data) parse_time = time.time() - parse_start if request.json: json_str = kunit_json.get_json_result( test=test, metadata=metadata) if request.json == 'stdout': print(json_str) else: with open(request.json, 'w') as f: f.write(json_str) stdout.print_with_timestamp("Test results stored in %s" % os.path.abspath(request.json)) if test.status != kunit_parser.TestStatus.SUCCESS: return KunitResult(KunitStatus.TEST_FAILURE, parse_time), test return KunitResult(KunitStatus.SUCCESS, parse_time), test def run_tests(linux: kunit_kernel.LinuxSourceTree, request: KunitRequest) -> KunitResult: run_start = time.time() config_result = config_tests(linux, request) if config_result.status != KunitStatus.SUCCESS: return config_result build_result = build_tests(linux, request) if build_result.status != KunitStatus.SUCCESS: return build_result exec_result = exec_tests(linux, request) run_end = time.time() stdout.print_with_timestamp(( 'Elapsed time: %.3fs total, %.3fs configuring, %.3fs ' + 'building, %.3fs running\n') % ( run_end - run_start, config_result.elapsed_time, build_result.elapsed_time, exec_result.elapsed_time)) return exec_result # Problem: # $ kunit.py run --json # works as one would expect and prints the parsed test results as JSON. # $ kunit.py run --json suite_name # would *not* pass suite_name as the filter_glob and print as json. # argparse will consider it to be another way of writing # $ kunit.py run --json=suite_name # i.e. it would run all tests, and dump the json to a `suite_name` file. # So we hackily automatically rewrite --json => --json=stdout pseudo_bool_flag_defaults = { '--json': 'stdout', '--raw_output': 'kunit', } def massage_argv(argv: Sequence[str]) -> Sequence[str]: def massage_arg(arg: str) -> str: if arg not in pseudo_bool_flag_defaults: return arg return f'{arg}={pseudo_bool_flag_defaults[arg]}' return list(map(massage_arg, argv)) def get_default_jobs() -> int: return len(os.sched_getaffinity(0)) def add_common_opts(parser: argparse.ArgumentParser) -> None: parser.add_argument('--build_dir', help='As in the make command, it specifies the build ' 'directory.', type=str, default='.kunit', metavar='DIR') parser.add_argument('--make_options', help='X=Y make option, can be repeated.', action='append', metavar='X=Y') parser.add_argument('--alltests', help='Run all KUnit tests via tools/testing/kunit/configs/all_tests.config', action='store_true') parser.add_argument('--kunitconfig', help='Path to Kconfig fragment that enables KUnit tests.' ' If given a directory, (e.g. lib/kunit), "/.kunitconfig" ' 'will get automatically appended. If repeated, the files ' 'blindly concatenated, which might not work in all cases.', action='append', metavar='PATHS') parser.add_argument('--kconfig_add', help='Additional Kconfig options to append to the ' '.kunitconfig, e.g. CONFIG_KASAN=y. Can be repeated.', action='append', metavar='CONFIG_X=Y') parser.add_argument('--arch', help=('Specifies the architecture to run tests under. ' 'The architecture specified here must match the ' 'string passed to the ARCH make param, ' 'e.g. i386, x86_64, arm, um, etc. Non-UML ' 'architectures run on QEMU.'), type=str, default='um', metavar='ARCH') parser.add_argument('--cross_compile', help=('Sets make\'s CROSS_COMPILE variable; it should ' 'be set to a toolchain path prefix (the prefix ' 'of gcc and other tools in your toolchain, for ' 'example `sparc64-linux-gnu-` if you have the ' 'sparc toolchain installed on your system, or ' '`$HOME/toolchains/microblaze/gcc-9.2.0-nolibc/microblaze-linux/bin/microblaze-linux-` ' 'if you have downloaded the microblaze toolchain ' 'from the 0-day website to a directory in your ' 'home directory called `toolchains`).'), metavar='PREFIX') parser.add_argument('--qemu_config', help=('Takes a path to a path to a file containing ' 'a QemuArchParams object.'), type=str, metavar='FILE') parser.add_argument('--qemu_args', help='Additional QEMU arguments, e.g. "-smp 8"', action='append', metavar='') def add_build_opts(parser: argparse.ArgumentParser) -> None: parser.add_argument('--jobs', help='As in the make command, "Specifies the number of ' 'jobs (commands) to run simultaneously."', type=int, default=get_default_jobs(), metavar='N') def add_exec_opts(parser: argparse.ArgumentParser) -> None: parser.add_argument('--timeout', help='maximum number of seconds to allow for all tests ' 'to run. This does not include time taken to build the ' 'tests.', type=int, default=300, metavar='SECONDS') parser.add_argument('filter_glob', help='Filter which KUnit test suites/tests run at ' 'boot-time, e.g. list* or list*.*del_test', type=str, nargs='?', default='', metavar='filter_glob') parser.add_argument('--filter', help='Filter KUnit tests with attributes, ' 'e.g. module=example or speed>slow', type=str, default='') parser.add_argument('--filter_action', help='If set to skip, filtered tests will be skipped, ' 'e.g. --filter_action=skip. Otherwise they will not run.', type=str, choices=['skip']) parser.add_argument('--kernel_args', help='Kernel command-line parameters. Maybe be repeated', action='append', metavar='') parser.add_argument('--run_isolated', help='If set, boot the kernel for each ' 'individual suite/test. This is can be useful for debugging ' 'a non-hermetic test, one that might pass/fail based on ' 'what ran before it.', type=str, choices=['suite', 'test']) parser.add_argument('--list_tests', help='If set, list all tests that will be ' 'run.', action='store_true') parser.add_argument('--list_tests_attr', help='If set, list all tests and test ' 'attributes.', action='store_true') def add_parse_opts(parser: argparse.ArgumentParser) -> None: parser.add_argument('--raw_output', help='If set don\'t parse output from kernel. ' 'By default, filters to just KUnit output. Use ' '--raw_output=all to show everything', type=str, nargs='?', const='all', default=None, choices=['all', 'kunit']) parser.add_argument('--json', nargs='?', help='Prints parsed test results as JSON to stdout or a file if ' 'a filename is specified. Does nothing if --raw_output is set.', type=str, const='stdout', default=None, metavar='FILE') def tree_from_args(cli_args: argparse.Namespace) -> kunit_kernel.LinuxSourceTree: """Returns a LinuxSourceTree based on the user's arguments.""" # Allow users to specify multiple arguments in one string, e.g. '-smp 8' qemu_args: List[str] = [] if cli_args.qemu_args: for arg in cli_args.qemu_args: qemu_args.extend(shlex.split(arg)) kunitconfigs = cli_args.kunitconfig if cli_args.kunitconfig else [] if cli_args.alltests: # Prepend so user-specified options take prio if we ever allow # --kunitconfig options to have differing options. kunitconfigs = [kunit_kernel.ALL_TESTS_CONFIG_PATH] + kunitconfigs return kunit_kernel.LinuxSourceTree(cli_args.build_dir, kunitconfig_paths=kunitconfigs, kconfig_add=cli_args.kconfig_add, arch=cli_args.arch, cross_compile=cli_args.cross_compile, qemu_config_path=cli_args.qemu_config, extra_qemu_args=qemu_args) def run_handler(cli_args: argparse.Namespace) -> None: if not os.path.exists(cli_args.build_dir): os.mkdir(cli_args.build_dir) linux = tree_from_args(cli_args) request = KunitRequest(build_dir=cli_args.build_dir, make_options=cli_args.make_options, jobs=cli_args.jobs, raw_output=cli_args.raw_output, json=cli_args.json, timeout=cli_args.timeout, filter_glob=cli_args.filter_glob, filter=cli_args.filter, filter_action=cli_args.filter_action, kernel_args=cli_args.kernel_args, run_isolated=cli_args.run_isolated, list_tests=cli_args.list_tests, list_tests_attr=cli_args.list_tests_attr) result = run_tests(linux, request) if result.status != KunitStatus.SUCCESS: sys.exit(1) def config_handler(cli_args: argparse.Namespace) -> None: if cli_args.build_dir and ( not os.path.exists(cli_args.build_dir)): os.mkdir(cli_args.build_dir) linux = tree_from_args(cli_args) request = KunitConfigRequest(build_dir=cli_args.build_dir, make_options=cli_args.make_options) result = config_tests(linux, request) stdout.print_with_timestamp(( 'Elapsed time: %.3fs\n') % ( result.elapsed_time)) if result.status != KunitStatus.SUCCESS: sys.exit(1) def build_handler(cli_args: argparse.Namespace) -> None: linux = tree_from_args(cli_args) request = KunitBuildRequest(build_dir=cli_args.build_dir, make_options=cli_args.make_options, jobs=cli_args.jobs) result = config_and_build_tests(linux, request) stdout.print_with_timestamp(( 'Elapsed time: %.3fs\n') % ( result.elapsed_time)) if result.status != KunitStatus.SUCCESS: sys.exit(1) def exec_handler(cli_args: argparse.Namespace) -> None: linux = tree_from_args(cli_args) exec_request = KunitExecRequest(raw_output=cli_args.raw_output, build_dir=cli_args.build_dir, json=cli_args.json, timeout=cli_args.timeout, filter_glob=cli_args.filter_glob, filter=cli_args.filter, filter_action=cli_args.filter_action, kernel_args=cli_args.kernel_args, run_isolated=cli_args.run_isolated, list_tests=cli_args.list_tests, list_tests_attr=cli_args.list_tests_attr) result = exec_tests(linux, exec_request) stdout.print_with_timestamp(( 'Elapsed time: %.3fs\n') % (result.elapsed_time)) if result.status != KunitStatus.SUCCESS: sys.exit(1) def parse_handler(cli_args: argparse.Namespace) -> None: if cli_args.file is None: sys.stdin.reconfigure(errors='backslashreplace') # type: ignore kunit_output = sys.stdin # type: Iterable[str] else: with open(cli_args.file, 'r', errors='backslashreplace') as f: kunit_output = f.read().splitlines() # We know nothing about how the result was created! metadata = kunit_json.Metadata() request = KunitParseRequest(raw_output=cli_args.raw_output, json=cli_args.json) result, _ = parse_tests(request, metadata, kunit_output) if result.status != KunitStatus.SUCCESS: sys.exit(1) subcommand_handlers_map = { 'run': run_handler, 'config': config_handler, 'build': build_handler, 'exec': exec_handler, 'parse': parse_handler } def main(argv: Sequence[str]) -> None: parser = argparse.ArgumentParser( description='Helps writing and running KUnit tests.') subparser = parser.add_subparsers(dest='subcommand') # The 'run' command will config, build, exec, and parse in one go. run_parser = subparser.add_parser('run', help='Runs KUnit tests.') add_common_opts(run_parser) add_build_opts(run_parser) add_exec_opts(run_parser) add_parse_opts(run_parser) config_parser = subparser.add_parser('config', help='Ensures that .config contains all of ' 'the options in .kunitconfig') add_common_opts(config_parser) build_parser = subparser.add_parser('build', help='Builds a kernel with KUnit tests') add_common_opts(build_parser) add_build_opts(build_parser) exec_parser = subparser.add_parser('exec', help='Run a kernel with KUnit tests') add_common_opts(exec_parser) add_exec_opts(exec_parser) add_parse_opts(exec_parser) # The 'parse' option is special, as it doesn't need the kernel source # (therefore there is no need for a build_dir, hence no add_common_opts) # and the '--file' argument is not relevant to 'run', so isn't in # add_parse_opts() parse_parser = subparser.add_parser('parse', help='Parses KUnit results from a file, ' 'and parses formatted results.') add_parse_opts(parse_parser) parse_parser.add_argument('file', help='Specifies the file to read results from.', type=str, nargs='?', metavar='input_file') cli_args = parser.parse_args(massage_argv(argv)) if get_kernel_root_path(): os.chdir(get_kernel_root_path()) subcomand_handler = subcommand_handlers_map.get(cli_args.subcommand, None) if subcomand_handler is None: parser.print_help() return subcomand_handler(cli_args) if __name__ == '__main__': main(sys.argv[1:])