diff options
Diffstat (limited to '')
-rw-r--r-- | tools/testing/kunit/kunit_parser.py | 1094 |
1 files changed, 780 insertions, 314 deletions
diff --git a/tools/testing/kunit/kunit_parser.py b/tools/testing/kunit/kunit_parser.py index 8019e3dd4c32..ce34be15c929 100644 --- a/tools/testing/kunit/kunit_parser.py +++ b/tools/testing/kunit/kunit_parser.py @@ -1,359 +1,825 @@ # SPDX-License-Identifier: GPL-2.0 # -# Parses test results from a kernel dmesg log. +# Parses KTAP test results from a kernel dmesg log and incrementally prints +# results with reader-friendly format. Stores and returns test results in a +# Test object. # # Copyright (C) 2019, Google LLC. # Author: Felix Guo <felixguoxiuping@gmail.com> # Author: Brendan Higgins <brendanhiggins@google.com> +# Author: Rae Moar <rmoar@google.com> +from __future__ import annotations +from dataclasses import dataclass import re +import textwrap -from collections import namedtuple -from datetime import datetime from enum import Enum, auto -from functools import reduce -from typing import List - -TestResult = namedtuple('TestResult', ['status','suites','log']) - -class TestSuite(object): - def __init__(self): - self.status = None - self.name = None - self.cases = [] - - def __str__(self): - return 'TestSuite(' + self.status + ',' + self.name + ',' + str(self.cases) + ')' - - def __repr__(self): +from typing import Iterable, Iterator, List, Optional, Tuple + +from kunit_printer import stdout + +class Test: + """ + A class to represent a test parsed from KTAP results. All KTAP + results within a test log are stored in a main Test object as + subtests. + + Attributes: + status : TestStatus - status of the test + name : str - name of the test + expected_count : int - expected number of subtests (0 if single + test case and None if unknown expected number of subtests) + subtests : List[Test] - list of subtests + log : List[str] - log of KTAP lines that correspond to the test + counts : TestCounts - counts of the test statuses and errors of + subtests or of the test itself if the test is a single + test case. + """ + def __init__(self) -> None: + """Creates Test object with default attributes.""" + self.status = TestStatus.TEST_CRASHED + self.name = '' + self.expected_count = 0 # type: Optional[int] + self.subtests = [] # type: List[Test] + self.log = [] # type: List[str] + self.counts = TestCounts() + + def __str__(self) -> str: + """Returns string representation of a Test class object.""" + return (f'Test({self.status}, {self.name}, {self.expected_count}, ' + f'{self.subtests}, {self.log}, {self.counts})') + + def __repr__(self) -> str: + """Returns string representation of a Test class object.""" return str(self) -class TestCase(object): - def __init__(self): - self.status = None - self.name = '' - self.log = [] + def add_error(self, error_message: str) -> None: + """Records an error that occurred while parsing this test.""" + self.counts.errors += 1 + stdout.print_with_timestamp(stdout.red('[ERROR]') + f' Test: {self.name}: {error_message}') - def __str__(self): - return 'TestCase(' + self.status + ',' + self.name + ',' + str(self.log) + ')' - - def __repr__(self): - return str(self) + def ok_status(self) -> bool: + """Returns true if the status was ok, i.e. passed or skipped.""" + return self.status in (TestStatus.SUCCESS, TestStatus.SKIPPED) class TestStatus(Enum): + """An enumeration class to represent the status of a test.""" SUCCESS = auto() FAILURE = auto() + SKIPPED = auto() TEST_CRASHED = auto() NO_TESTS = auto() FAILURE_TO_PARSE_TESTS = auto() -kunit_start_re = re.compile(r'TAP version [0-9]+$') -kunit_end_re = re.compile('(List of all partitions:|' - 'Kernel panic - not syncing: VFS:)') - -def isolate_kunit_output(kernel_output): - started = False - for line in kernel_output: - if kunit_start_re.search(line): - prefix_len = len(line.split('TAP version')[0]) - started = True - yield line[prefix_len:] if prefix_len > 0 else line - elif kunit_end_re.search(line): - break - elif started: - yield line[prefix_len:] if prefix_len > 0 else line - -def raw_output(kernel_output): - for line in kernel_output: - print(line) - yield line - -DIVIDER = '=' * 60 - -RESET = '\033[0;0m' - -def red(text): - return '\033[1;31m' + text + RESET - -def yellow(text): - return '\033[1;33m' + text + RESET - -def green(text): - return '\033[1;32m' + text + RESET - -def print_with_timestamp(message): - print('[%s] %s' % (datetime.now().strftime('%H:%M:%S'), message)) - -def format_suite_divider(message): - return '======== ' + message + ' ========' - -def print_suite_divider(message): - print_with_timestamp(DIVIDER) - print_with_timestamp(format_suite_divider(message)) - -def print_log(log): - for m in log: - print_with_timestamp(m) - -TAP_ENTRIES = re.compile(r'^(TAP|[\s]*ok|[\s]*not ok|[\s]*[0-9]+\.\.[0-9]+|[\s]*#).*$') - -def consume_non_diagnositic(lines: List[str]) -> None: - while lines and not TAP_ENTRIES.match(lines[0]): - lines.pop(0) +@dataclass +class TestCounts: + """ + Tracks the counts of statuses of all test cases and any errors within + a Test. + """ + passed: int = 0 + failed: int = 0 + crashed: int = 0 + skipped: int = 0 + errors: int = 0 + + def __str__(self) -> str: + """Returns the string representation of a TestCounts object.""" + statuses = [('passed', self.passed), ('failed', self.failed), + ('crashed', self.crashed), ('skipped', self.skipped), + ('errors', self.errors)] + return f'Ran {self.total()} tests: ' + \ + ', '.join(f'{s}: {n}' for s, n in statuses if n > 0) + + def total(self) -> int: + """Returns the total number of test cases within a test + object, where a test case is a test with no subtests. + """ + return (self.passed + self.failed + self.crashed + + self.skipped) + + def add_subtest_counts(self, counts: TestCounts) -> None: + """ + Adds the counts of another TestCounts object to the current + TestCounts object. Used to add the counts of a subtest to the + parent test. + + Parameters: + counts - a different TestCounts object whose counts + will be added to the counts of the TestCounts object + """ + self.passed += counts.passed + self.failed += counts.failed + self.crashed += counts.crashed + self.skipped += counts.skipped + self.errors += counts.errors + + def get_status(self) -> TestStatus: + """Returns the aggregated status of a Test using test + counts. + """ + if self.total() == 0: + return TestStatus.NO_TESTS + if self.crashed: + # Crashes should take priority. + return TestStatus.TEST_CRASHED + if self.failed: + return TestStatus.FAILURE + if self.passed: + # No failures or crashes, looks good! + return TestStatus.SUCCESS + # We have only skipped tests. + return TestStatus.SKIPPED + + def add_status(self, status: TestStatus) -> None: + """Increments the count for `status`.""" + if status == TestStatus.SUCCESS: + self.passed += 1 + elif status == TestStatus.FAILURE: + self.failed += 1 + elif status == TestStatus.SKIPPED: + self.skipped += 1 + elif status != TestStatus.NO_TESTS: + self.crashed += 1 + +class LineStream: + """ + A class to represent the lines of kernel output. + Provides a lazy peek()/pop() interface over an iterator of + (line#, text). + """ + _lines: Iterator[Tuple[int, str]] + _next: Tuple[int, str] + _need_next: bool + _done: bool + + def __init__(self, lines: Iterator[Tuple[int, str]]): + """Creates a new LineStream that wraps the given iterator.""" + self._lines = lines + self._done = False + self._need_next = True + self._next = (0, '') + + def _get_next(self) -> None: + """Advances the LineSteam to the next line, if necessary.""" + if not self._need_next: + return + try: + self._next = next(self._lines) + except StopIteration: + self._done = True + finally: + self._need_next = False + + def peek(self) -> str: + """Returns the current line, without advancing the LineStream. + """ + self._get_next() + return self._next[1] + + def pop(self) -> str: + """Returns the current line and advances the LineStream to + the next line. + """ + s = self.peek() + if self._done: + raise ValueError(f'LineStream: going past EOF, last line was {s}') + self._need_next = True + return s + + def __bool__(self) -> bool: + """Returns True if stream has more lines.""" + self._get_next() + return not self._done + + # Only used by kunit_tool_test.py. + def __iter__(self) -> Iterator[str]: + """Empties all lines stored in LineStream object into + Iterator object and returns the Iterator object. + """ + while bool(self): + yield self.pop() + + def line_number(self) -> int: + """Returns the line number of the current line.""" + self._get_next() + return self._next[0] + +# Parsing helper methods: + +KTAP_START = re.compile(r'\s*KTAP version ([0-9]+)$') +TAP_START = re.compile(r'\s*TAP version ([0-9]+)$') +KTAP_END = re.compile(r'\s*(List of all partitions:|' + 'Kernel panic - not syncing: VFS:|reboot: System halted)') +EXECUTOR_ERROR = re.compile(r'\s*kunit executor: (.*)$') + +def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream: + """Extracts KTAP lines from the kernel output.""" + def isolate_ktap_output(kernel_output: Iterable[str]) \ + -> Iterator[Tuple[int, str]]: + line_num = 0 + started = False + for line in kernel_output: + line_num += 1 + line = line.rstrip() # remove trailing \n + if not started and KTAP_START.search(line): + # start extracting KTAP lines and set prefix + # to number of characters before version line + prefix_len = len( + line.split('KTAP version')[0]) + started = True + yield line_num, line[prefix_len:] + elif not started and TAP_START.search(line): + # start extracting KTAP lines and set prefix + # to number of characters before version line + prefix_len = len(line.split('TAP version')[0]) + started = True + yield line_num, line[prefix_len:] + elif started and KTAP_END.search(line): + # stop extracting KTAP lines + break + elif started: + # remove the prefix, if any. + line = line[prefix_len:] + yield line_num, line + elif EXECUTOR_ERROR.search(line): + yield line_num, line + return LineStream(lines=isolate_ktap_output(kernel_output)) + +KTAP_VERSIONS = [1] +TAP_VERSIONS = [13, 14] + +def check_version(version_num: int, accepted_versions: List[int], + version_type: str, test: Test) -> None: + """ + Adds error to test object if version number is too high or too + low. + + Parameters: + version_num - The inputted version number from the parsed KTAP or TAP + header line + accepted_version - List of accepted KTAP or TAP versions + version_type - 'KTAP' or 'TAP' depending on the type of + version line. + test - Test object for current test being parsed + """ + if version_num < min(accepted_versions): + test.add_error(f'{version_type} version lower than expected!') + elif version_num > max(accepted_versions): + test.add_error(f'{version_type} version higer than expected!') + +def parse_ktap_header(lines: LineStream, test: Test) -> bool: + """ + Parses KTAP/TAP header line and checks version number. + Returns False if fails to parse KTAP/TAP header line. + + Accepted formats: + - 'KTAP version [version number]' + - 'TAP version [version number]' + + Parameters: + lines - LineStream of KTAP output to parse + test - Test object for current test being parsed + + Return: + True if successfully parsed KTAP/TAP header line + """ + ktap_match = KTAP_START.match(lines.peek()) + tap_match = TAP_START.match(lines.peek()) + if ktap_match: + version_num = int(ktap_match.group(1)) + check_version(version_num, KTAP_VERSIONS, 'KTAP', test) + elif tap_match: + version_num = int(tap_match.group(1)) + check_version(version_num, TAP_VERSIONS, 'TAP', test) + else: + return False + lines.pop() + return True -def save_non_diagnositic(lines: List[str], test_case: TestCase) -> None: - while lines and not TAP_ENTRIES.match(lines[0]): - test_case.log.append(lines[0]) - lines.pop(0) +TEST_HEADER = re.compile(r'^\s*# Subtest: (.*)$') -OkNotOkResult = namedtuple('OkNotOkResult', ['is_ok','description', 'text']) +def parse_test_header(lines: LineStream, test: Test) -> bool: + """ + Parses test header and stores test name in test object. + Returns False if fails to parse test header line. -OK_NOT_OK_SUBTEST = re.compile(r'^[\s]+(ok|not ok) [0-9]+ - (.*)$') + Accepted format: + - '# Subtest: [test name]' -OK_NOT_OK_MODULE = re.compile(r'^(ok|not ok) ([0-9]+) - (.*)$') + Parameters: + lines - LineStream of KTAP output to parse + test - Test object for current test being parsed -def parse_ok_not_ok_test_case(lines: List[str], test_case: TestCase) -> bool: - save_non_diagnositic(lines, test_case) - if not lines: - test_case.status = TestStatus.TEST_CRASHED - return True - line = lines[0] - match = OK_NOT_OK_SUBTEST.match(line) - while not match and lines: - line = lines.pop(0) - match = OK_NOT_OK_SUBTEST.match(line) - if match: - test_case.log.append(lines.pop(0)) - test_case.name = match.group(2) - if test_case.status == TestStatus.TEST_CRASHED: - return True - if match.group(1) == 'ok': - test_case.status = TestStatus.SUCCESS - else: - test_case.status = TestStatus.FAILURE - return True - else: + Return: + True if successfully parsed test header line + """ + match = TEST_HEADER.match(lines.peek()) + if not match: return False - -SUBTEST_DIAGNOSTIC = re.compile(r'^[\s]+# .*?: (.*)$') -DIAGNOSTIC_CRASH_MESSAGE = 'kunit test case crashed!' - -def parse_diagnostic(lines: List[str], test_case: TestCase) -> bool: - save_non_diagnositic(lines, test_case) - if not lines: + test.name = match.group(1) + lines.pop() + return True + +TEST_PLAN = re.compile(r'^\s*1\.\.([0-9]+)') + +def parse_test_plan(lines: LineStream, test: Test) -> bool: + """ + Parses test plan line and stores the expected number of subtests in + test object. Reports an error if expected count is 0. + Returns False and sets expected_count to None if there is no valid test + plan. + + Accepted format: + - '1..[number of subtests]' + + Parameters: + lines - LineStream of KTAP output to parse + test - Test object for current test being parsed + + Return: + True if successfully parsed test plan line + """ + match = TEST_PLAN.match(lines.peek()) + if not match: + test.expected_count = None return False - line = lines[0] - match = SUBTEST_DIAGNOSTIC.match(line) - if match: - test_case.log.append(lines.pop(0)) - if match.group(1) == DIAGNOSTIC_CRASH_MESSAGE: - test_case.status = TestStatus.TEST_CRASHED - return True - else: + expected_count = int(match.group(1)) + test.expected_count = expected_count + lines.pop() + return True + +TEST_RESULT = re.compile(r'^\s*(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$') + +TEST_RESULT_SKIP = re.compile(r'^\s*(ok|not ok) ([0-9]+) (- )?(.*) # SKIP(.*)$') + +def peek_test_name_match(lines: LineStream, test: Test) -> bool: + """ + Matches current line with the format of a test result line and checks + if the name matches the name of the current test. + Returns False if fails to match format or name. + + Accepted format: + - '[ok|not ok] [test number] [-] [test name] [optional skip + directive]' + + Parameters: + lines - LineStream of KTAP output to parse + test - Test object for current test being parsed + + Return: + True if matched a test result line and the name matching the + expected test name + """ + line = lines.peek() + match = TEST_RESULT.match(line) + if not match: + return False + name = match.group(4) + return name == test.name + +def parse_test_result(lines: LineStream, test: Test, + expected_num: int) -> bool: + """ + Parses test result line and stores the status and name in the test + object. Reports an error if the test number does not match expected + test number. + Returns False if fails to parse test result line. + + Note that the SKIP directive is the only direction that causes a + change in status. + + Accepted format: + - '[ok|not ok] [test number] [-] [test name] [optional skip + directive]' + + Parameters: + lines - LineStream of KTAP output to parse + test - Test object for current test being parsed + expected_num - expected test number for current test + + Return: + True if successfully parsed a test result line. + """ + line = lines.peek() + match = TEST_RESULT.match(line) + skip_match = TEST_RESULT_SKIP.match(line) + + # Check if line matches test result line format + if not match: return False + lines.pop() -def parse_test_case(lines: List[str]) -> TestCase: - test_case = TestCase() - save_non_diagnositic(lines, test_case) - while parse_diagnostic(lines, test_case): - pass - if parse_ok_not_ok_test_case(lines, test_case): - return test_case + # Set name of test object + if skip_match: + test.name = skip_match.group(4) else: - return None - -SUBTEST_HEADER = re.compile(r'^[\s]+# Subtest: (.*)$') - -def parse_subtest_header(lines: List[str]) -> str: - consume_non_diagnositic(lines) - if not lines: - return None - match = SUBTEST_HEADER.match(lines[0]) - if match: - lines.pop(0) - return match.group(1) + test.name = match.group(4) + + # Check test num + num = int(match.group(2)) + if num != expected_num: + test.add_error(f'Expected test number {expected_num} but found {num}') + + # Set status of test object + status = match.group(1) + if skip_match: + test.status = TestStatus.SKIPPED + elif status == 'ok': + test.status = TestStatus.SUCCESS else: - return None + test.status = TestStatus.FAILURE + return True -SUBTEST_PLAN = re.compile(r'[\s]+[0-9]+\.\.([0-9]+)') +def parse_diagnostic(lines: LineStream) -> List[str]: + """ + Parse lines that do not match the format of a test result line or + test header line and returns them in list. -def parse_subtest_plan(lines: List[str]) -> int: - consume_non_diagnositic(lines) - match = SUBTEST_PLAN.match(lines[0]) - if match: - lines.pop(0) - return int(match.group(1)) - else: - return None - -def max_status(left: TestStatus, right: TestStatus) -> TestStatus: - if left == TestStatus.TEST_CRASHED or right == TestStatus.TEST_CRASHED: - return TestStatus.TEST_CRASHED - elif left == TestStatus.FAILURE or right == TestStatus.FAILURE: - return TestStatus.FAILURE - elif left != TestStatus.SUCCESS: - return left - elif right != TestStatus.SUCCESS: - return right - else: - return TestStatus.SUCCESS + Line formats that are not parsed: + - '# Subtest: [test name]' + - '[ok|not ok] [test number] [-] [test name] [optional skip + directive]' + - 'KTAP version [version number]' -def parse_ok_not_ok_test_suite(lines: List[str], - test_suite: TestSuite, - expected_suite_index: int) -> bool: - consume_non_diagnositic(lines) - if not lines: - test_suite.status = TestStatus.TEST_CRASHED - return False - line = lines[0] - match = OK_NOT_OK_MODULE.match(line) - if match: - lines.pop(0) - if match.group(1) == 'ok': - test_suite.status = TestStatus.SUCCESS - else: - test_suite.status = TestStatus.FAILURE - suite_index = int(match.group(2)) - if suite_index != expected_suite_index: - print_with_timestamp( - red('[ERROR] ') + 'expected_suite_index ' + - str(expected_suite_index) + ', but got ' + - str(suite_index)) - return True - else: - return False + Parameters: + lines - LineStream of KTAP output to parse -def bubble_up_errors(to_status, status_container_list) -> TestStatus: - status_list = map(to_status, status_container_list) - return reduce(max_status, status_list, TestStatus.SUCCESS) + Return: + Log of diagnostic lines + """ + log = [] # type: List[str] + non_diagnostic_lines = [TEST_RESULT, TEST_HEADER, KTAP_START, TAP_START, TEST_PLAN] + while lines and not any(re.match(lines.peek()) + for re in non_diagnostic_lines): + log.append(lines.pop()) + return log -def bubble_up_test_case_errors(test_suite: TestSuite) -> TestStatus: - max_test_case_status = bubble_up_errors(lambda x: x.status, test_suite.cases) - return max_status(max_test_case_status, test_suite.status) -def parse_test_suite(lines: List[str], expected_suite_index: int) -> TestSuite: - if not lines: - return None - consume_non_diagnositic(lines) - test_suite = TestSuite() - test_suite.status = TestStatus.SUCCESS - name = parse_subtest_header(lines) - if not name: - return None - test_suite.name = name - expected_test_case_num = parse_subtest_plan(lines) - if not expected_test_case_num: - return None - while expected_test_case_num > 0: - test_case = parse_test_case(lines) - if not test_case: - break - test_suite.cases.append(test_case) - expected_test_case_num -= 1 - if parse_ok_not_ok_test_suite(lines, test_suite, expected_suite_index): - test_suite.status = bubble_up_test_case_errors(test_suite) - return test_suite - elif not lines: - print_with_timestamp(red('[ERROR] ') + 'ran out of lines before end token') - return test_suite - else: - print('failed to parse end of suite' + lines[0]) - return None +# Printing helper methods: -TAP_HEADER = re.compile(r'^TAP version 14$') +DIVIDER = '=' * 60 -def parse_tap_header(lines: List[str]) -> bool: - consume_non_diagnositic(lines) - if TAP_HEADER.match(lines[0]): - lines.pop(0) - return True +def format_test_divider(message: str, len_message: int) -> str: + """ + Returns string with message centered in fixed width divider. + + Example: + '===================== message example =====================' + + Parameters: + message - message to be centered in divider line + len_message - length of the message to be printed such that + any characters of the color codes are not counted + + Return: + String containing message centered in fixed width divider + """ + default_count = 3 # default number of dashes + len_1 = default_count + len_2 = default_count + difference = len(DIVIDER) - len_message - 2 # 2 spaces added + if difference > 0: + # calculate number of dashes for each side of the divider + len_1 = int(difference / 2) + len_2 = difference - len_1 + return ('=' * len_1) + f' {message} ' + ('=' * len_2) + +def print_test_header(test: Test) -> None: + """ + Prints test header with test name and optionally the expected number + of subtests. + + Example: + '=================== example (2 subtests) ===================' + + Parameters: + test - Test object representing current test being printed + """ + message = test.name + if message != "": + # Add a leading space before the subtest counts only if a test name + # is provided using a "# Subtest" header line. + message += " " + if test.expected_count: + if test.expected_count == 1: + message += '(1 subtest)' + else: + message += f'({test.expected_count} subtests)' + stdout.print_with_timestamp(format_test_divider(message, len(message))) + +def print_log(log: Iterable[str]) -> None: + """Prints all strings in saved log for test in yellow.""" + formatted = textwrap.dedent('\n'.join(log)) + for line in formatted.splitlines(): + stdout.print_with_timestamp(stdout.yellow(line)) + +def format_test_result(test: Test) -> str: + """ + Returns string with formatted test result with colored status and test + name. + + Example: + '[PASSED] example' + + Parameters: + test - Test object representing current test being printed + + Return: + String containing formatted test result + """ + if test.status == TestStatus.SUCCESS: + return stdout.green('[PASSED] ') + test.name + if test.status == TestStatus.SKIPPED: + return stdout.yellow('[SKIPPED] ') + test.name + if test.status == TestStatus.NO_TESTS: + return stdout.yellow('[NO TESTS RUN] ') + test.name + if test.status == TestStatus.TEST_CRASHED: + print_log(test.log) + return stdout.red('[CRASHED] ') + test.name + print_log(test.log) + return stdout.red('[FAILED] ') + test.name + +def print_test_result(test: Test) -> None: + """ + Prints result line with status of test. + + Example: + '[PASSED] example' + + Parameters: + test - Test object representing current test being printed + """ + stdout.print_with_timestamp(format_test_result(test)) + +def print_test_footer(test: Test) -> None: + """ + Prints test footer with status of test. + + Example: + '===================== [PASSED] example =====================' + + Parameters: + test - Test object representing current test being printed + """ + message = format_test_result(test) + stdout.print_with_timestamp(format_test_divider(message, + len(message) - stdout.color_len())) + + + +def _summarize_failed_tests(test: Test) -> str: + """Tries to summarize all the failing subtests in `test`.""" + + def failed_names(test: Test, parent_name: str) -> List[str]: + # Note: we use 'main' internally for the top-level test. + if not parent_name or parent_name == 'main': + full_name = test.name + else: + full_name = parent_name + '.' + test.name + + if not test.subtests: # this is a leaf node + return [full_name] + + # If all the children failed, just say this subtest failed. + # Don't summarize it down "the top-level test failed", though. + failed_subtests = [sub for sub in test.subtests if not sub.ok_status()] + if parent_name and len(failed_subtests) == len(test.subtests): + return [full_name] + + all_failures = [] # type: List[str] + for t in failed_subtests: + all_failures.extend(failed_names(t, full_name)) + return all_failures + + failures = failed_names(test, '') + # If there are too many failures, printing them out will just be noisy. + if len(failures) > 10: # this is an arbitrary limit + return '' + + return 'Failures: ' + ', '.join(failures) + + +def print_summary_line(test: Test) -> None: + """ + Prints summary line of test object. Color of line is dependent on + status of test. Color is green if test passes, yellow if test is + skipped, and red if the test fails or crashes. Summary line contains + counts of the statuses of the tests subtests or the test itself if it + has no subtests. + + Example: + "Testing complete. Passed: 2, Failed: 0, Crashed: 0, Skipped: 0, + Errors: 0" + + test - Test object representing current test being printed + """ + if test.status == TestStatus.SUCCESS: + color = stdout.green + elif test.status in (TestStatus.SKIPPED, TestStatus.NO_TESTS): + color = stdout.yellow else: - return False - -TEST_PLAN = re.compile(r'[0-9]+\.\.([0-9]+)') - -def parse_test_plan(lines: List[str]) -> int: - consume_non_diagnositic(lines) - match = TEST_PLAN.match(lines[0]) - if match: - lines.pop(0) - return int(match.group(1)) + color = stdout.red + stdout.print_with_timestamp(color(f'Testing complete. {test.counts}')) + + # Summarize failures that might have gone off-screen since we had a lot + # of tests (arbitrarily defined as >=100 for now). + if test.ok_status() or test.counts.total() < 100: + return + summarized = _summarize_failed_tests(test) + if not summarized: + return + stdout.print_with_timestamp(color(summarized)) + +# Other methods: + +def bubble_up_test_results(test: Test) -> None: + """ + If the test has subtests, add the test counts of the subtests to the + test and check if any of the tests crashed and if so set the test + status to crashed. Otherwise if the test has no subtests add the + status of the test to the test counts. + + Parameters: + test - Test object for current test being parsed + """ + subtests = test.subtests + counts = test.counts + status = test.status + for t in subtests: + counts.add_subtest_counts(t.counts) + if counts.total() == 0: + counts.add_status(status) + elif test.counts.get_status() == TestStatus.TEST_CRASHED: + test.status = TestStatus.TEST_CRASHED + +def parse_test(lines: LineStream, expected_num: int, log: List[str], is_subtest: bool) -> Test: + """ + Finds next test to parse in LineStream, creates new Test object, + parses any subtests of the test, populates Test object with all + information (status, name) about the test and the Test objects for + any subtests, and then returns the Test object. The method accepts + three formats of tests: + + Accepted test formats: + + - Main KTAP/TAP header + + Example: + + KTAP version 1 + 1..4 + [subtests] + + - Subtest header (must include either the KTAP version line or + "# Subtest" header line) + + Example (preferred format with both KTAP version line and + "# Subtest" line): + + KTAP version 1 + # Subtest: name + 1..3 + [subtests] + ok 1 name + + Example (only "# Subtest" line): + + # Subtest: name + 1..3 + [subtests] + ok 1 name + + Example (only KTAP version line, compliant with KTAP v1 spec): + + KTAP version 1 + 1..3 + [subtests] + ok 1 name + + - Test result line + + Example: + + ok 1 - test + + Parameters: + lines - LineStream of KTAP output to parse + expected_num - expected test number for test to be parsed + log - list of strings containing any preceding diagnostic lines + corresponding to the current test + is_subtest - boolean indicating whether test is a subtest + + Return: + Test object populated with characteristics and any subtests + """ + test = Test() + test.log.extend(log) + + # Parse any errors prior to parsing tests + err_log = parse_diagnostic(lines) + test.log.extend(err_log) + + if not is_subtest: + # If parsing the main/top-level test, parse KTAP version line and + # test plan + test.name = "main" + ktap_line = parse_ktap_header(lines, test) + test.log.extend(parse_diagnostic(lines)) + parse_test_plan(lines, test) + parent_test = True else: - return None - -def bubble_up_suite_errors(test_suite_list: List[TestSuite]) -> TestStatus: - return bubble_up_errors(lambda x: x.status, test_suite_list) - -def parse_test_result(lines: List[str]) -> TestResult: - consume_non_diagnositic(lines) - if not lines or not parse_tap_header(lines): - return TestResult(TestStatus.NO_TESTS, [], lines) - expected_test_suite_num = parse_test_plan(lines) - if not expected_test_suite_num: - return TestResult(TestStatus.FAILURE_TO_PARSE_TESTS, [], lines) - test_suites = [] - for i in range(1, expected_test_suite_num + 1): - test_suite = parse_test_suite(lines, i) - if test_suite: - test_suites.append(test_suite) + # If not the main test, attempt to parse a test header containing + # the KTAP version line and/or subtest header line + ktap_line = parse_ktap_header(lines, test) + subtest_line = parse_test_header(lines, test) + parent_test = (ktap_line or subtest_line) + if parent_test: + # If KTAP version line and/or subtest header is found, attempt + # to parse test plan and print test header + test.log.extend(parse_diagnostic(lines)) + parse_test_plan(lines, test) + print_test_header(test) + expected_count = test.expected_count + subtests = [] + test_num = 1 + while parent_test and (expected_count is None or test_num <= expected_count): + # Loop to parse any subtests. + # Break after parsing expected number of tests or + # if expected number of tests is unknown break when test + # result line with matching name to subtest header is found + # or no more lines in stream. + sub_log = parse_diagnostic(lines) + sub_test = Test() + if not lines or (peek_test_name_match(lines, test) and + is_subtest): + if expected_count and test_num <= expected_count: + # If parser reaches end of test before + # parsing expected number of subtests, print + # crashed subtest and record error + test.add_error('missing expected subtest!') + sub_test.log.extend(sub_log) + test.counts.add_status( + TestStatus.TEST_CRASHED) + print_test_result(sub_test) + else: + test.log.extend(sub_log) + break else: - print_with_timestamp( - red('[ERROR] ') + ' expected ' + - str(expected_test_suite_num) + - ' test suites, but got ' + str(i - 2)) - break - test_suite = parse_test_suite(lines, -1) - if test_suite: - print_with_timestamp(red('[ERROR] ') + - 'got unexpected test suite: ' + test_suite.name) - if test_suites: - return TestResult(bubble_up_suite_errors(test_suites), test_suites, lines) - else: - return TestResult(TestStatus.NO_TESTS, [], lines) - -def print_and_count_results(test_result: TestResult) -> None: - total_tests = 0 - failed_tests = 0 - crashed_tests = 0 - for test_suite in test_result.suites: - if test_suite.status == TestStatus.SUCCESS: - print_suite_divider(green('[PASSED] ') + test_suite.name) - elif test_suite.status == TestStatus.TEST_CRASHED: - print_suite_divider(red('[CRASHED] ' + test_suite.name)) + sub_test = parse_test(lines, test_num, sub_log, True) + subtests.append(sub_test) + test_num += 1 + test.subtests = subtests + if is_subtest: + # If not main test, look for test result line + test.log.extend(parse_diagnostic(lines)) + if test.name != "" and not peek_test_name_match(lines, test): + test.add_error('missing subtest result line!') else: - print_suite_divider(red('[FAILED] ') + test_suite.name) - for test_case in test_suite.cases: - total_tests += 1 - if test_case.status == TestStatus.SUCCESS: - print_with_timestamp(green('[PASSED] ') + test_case.name) - elif test_case.status == TestStatus.TEST_CRASHED: - crashed_tests += 1 - print_with_timestamp(red('[CRASHED] ' + test_case.name)) - print_log(map(yellow, test_case.log)) - print_with_timestamp('') - else: - failed_tests += 1 - print_with_timestamp(red('[FAILED] ') + test_case.name) - print_log(map(yellow, test_case.log)) - print_with_timestamp('') - return total_tests, failed_tests, crashed_tests - -def parse_run_tests(kernel_output) -> TestResult: - total_tests = 0 - failed_tests = 0 - crashed_tests = 0 - test_result = parse_test_result(list(isolate_kunit_output(kernel_output))) - if test_result.status == TestStatus.NO_TESTS: - print(red('[ERROR] ') + yellow('no tests run!')) - elif test_result.status == TestStatus.FAILURE_TO_PARSE_TESTS: - print(red('[ERROR] ') + yellow('could not parse test results!')) + parse_test_result(lines, test, expected_num) + + # Check for there being no subtests within parent test + if parent_test and len(subtests) == 0: + # Don't override a bad status if this test had one reported. + # Assumption: no subtests means CRASHED is from Test.__init__() + if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS): + print_log(test.log) + test.status = TestStatus.NO_TESTS + test.add_error('0 tests run!') + + # Add statuses to TestCounts attribute in Test object + bubble_up_test_results(test) + if parent_test and is_subtest: + # If test has subtests and is not the main test object, print + # footer. + print_test_footer(test) + elif is_subtest: + print_test_result(test) + return test + +def parse_run_tests(kernel_output: Iterable[str]) -> Test: + """ + Using kernel output, extract KTAP lines, parse the lines for test + results and print condensed test results and summary line. + + Parameters: + kernel_output - Iterable object contains lines of kernel output + + Return: + Test - the main test object with all subtests. + """ + stdout.print_with_timestamp(DIVIDER) + lines = extract_tap_lines(kernel_output) + test = Test() + if not lines: + test.name = '<missing>' + test.add_error('Could not find any KTAP output. Did any KUnit tests run?') + test.status = TestStatus.FAILURE_TO_PARSE_TESTS else: - (total_tests, - failed_tests, - crashed_tests) = print_and_count_results(test_result) - print_with_timestamp(DIVIDER) - fmt = green if test_result.status == TestStatus.SUCCESS else red - print_with_timestamp( - fmt('Testing complete. %d tests run. %d failed. %d crashed.' % - (total_tests, failed_tests, crashed_tests))) - return test_result + test = parse_test(lines, 0, [], False) + if test.status != TestStatus.NO_TESTS: + test.status = test.counts.get_status() + stdout.print_with_timestamp(DIVIDER) + print_summary_line(test) + return test |