diff options
Diffstat (limited to '')
-rw-r--r-- | tools/testing/kunit/kunit_parser.py | 244 |
1 files changed, 91 insertions, 153 deletions
diff --git a/tools/testing/kunit/kunit_parser.py b/tools/testing/kunit/kunit_parser.py index 3355196d0515..1ae873e3e341 100644 --- a/tools/testing/kunit/kunit_parser.py +++ b/tools/testing/kunit/kunit_parser.py @@ -11,16 +11,14 @@ from __future__ import annotations import re +import sys -from collections import namedtuple -from datetime import datetime from enum import Enum, auto -from functools import reduce from typing import Iterable, Iterator, List, Optional, Tuple -TestResult = namedtuple('TestResult', ['status','test','log']) +from kunit_printer import stdout -class Test(object): +class Test: """ A class to represent a test parsed from KTAP results. All KTAP results within a test log are stored in a main Test object as @@ -48,10 +46,8 @@ class Test(object): def __str__(self) -> str: """Returns string representation of a Test class object.""" - return ('Test(' + str(self.status) + ', ' + self.name + - ', ' + str(self.expected_count) + ', ' + - str(self.subtests) + ', ' + str(self.log) + ', ' + - str(self.counts) + ')') + return (f'Test({self.status}, {self.name}, {self.expected_count}, ' + f'{self.subtests}, {self.log}, {self.counts})') def __repr__(self) -> str: """Returns string representation of a Test class object.""" @@ -60,7 +56,7 @@ class Test(object): def add_error(self, error_message: str) -> None: """Records an error that occurred while parsing this test.""" self.counts.errors += 1 - print_error('Test ' + self.name + ': ' + error_message) + stdout.print_with_timestamp(stdout.red('[ERROR]') + f' Test: {self.name}: {error_message}') class TestStatus(Enum): """An enumeration class to represent the status of a test.""" @@ -94,13 +90,12 @@ class TestCounts: self.errors = 0 def __str__(self) -> str: - """Returns the string representation of a TestCounts object. - """ - return ('Passed: ' + str(self.passed) + - ', Failed: ' + str(self.failed) + - ', Crashed: ' + str(self.crashed) + - ', Skipped: ' + str(self.skipped) + - ', Errors: ' + str(self.errors)) + """Returns the string representation of a TestCounts object.""" + statuses = [('passed', self.passed), ('failed', self.failed), + ('crashed', self.crashed), ('skipped', self.skipped), + ('errors', self.errors)] + return f'Ran {self.total()} tests: ' + \ + ', '.join(f'{s}: {n}' for s, n in statuses if n > 0) def total(self) -> int: """Returns the total number of test cases within a test @@ -131,31 +126,19 @@ class TestCounts: """ if self.total() == 0: return TestStatus.NO_TESTS - elif self.crashed: - # If one of the subtests crash, the expected status - # of the Test is crashed. + if self.crashed: + # Crashes should take priority. return TestStatus.TEST_CRASHED - elif self.failed: - # Otherwise if one of the subtests fail, the - # expected status of the Test is failed. + if self.failed: return TestStatus.FAILURE - elif self.passed: - # Otherwise if one of the subtests pass, the - # expected status of the Test is passed. + if self.passed: + # No failures or crashes, looks good! return TestStatus.SUCCESS - else: - # Finally, if none of the subtests have failed, - # crashed, or passed, the expected status of the - # Test is skipped. - return TestStatus.SKIPPED + # We have only skipped tests. + return TestStatus.SKIPPED def add_status(self, status: TestStatus) -> None: - """ - Increments count of inputted status. - - Parameters: - status - status to be added to the TestCounts object - """ + """Increments the count for `status`.""" if status == TestStatus.SUCCESS: self.passed += 1 elif status == TestStatus.FAILURE: @@ -168,42 +151,51 @@ class TestCounts: class LineStream: """ A class to represent the lines of kernel output. - Provides a peek()/pop() interface over an iterator of + Provides a lazy peek()/pop() interface over an iterator of (line#, text). """ _lines: Iterator[Tuple[int, str]] _next: Tuple[int, str] + _need_next: bool _done: bool def __init__(self, lines: Iterator[Tuple[int, str]]): """Creates a new LineStream that wraps the given iterator.""" self._lines = lines self._done = False + self._need_next = True self._next = (0, '') - self._get_next() def _get_next(self) -> None: - """Advances the LineSteam to the next line.""" + """Advances the LineSteam to the next line, if necessary.""" + if not self._need_next: + return try: self._next = next(self._lines) except StopIteration: self._done = True + finally: + self._need_next = False def peek(self) -> str: """Returns the current line, without advancing the LineStream. """ + self._get_next() return self._next[1] def pop(self) -> str: """Returns the current line and advances the LineStream to the next line. """ - n = self._next - self._get_next() - return n[1] + s = self.peek() + if self._done: + raise ValueError(f'LineStream: going past EOF, last line was {s}') + self._need_next = True + return s def __bool__(self) -> bool: """Returns True if stream has more lines.""" + self._get_next() return not self._done # Only used by kunit_tool_test.py. @@ -216,6 +208,7 @@ class LineStream: def line_number(self) -> int: """Returns the line number of the current line.""" + self._get_next() return self._next[0] # Parsing helper methods: @@ -225,7 +218,7 @@ TAP_START = re.compile(r'TAP version ([0-9]+)$') KTAP_END = re.compile('(List of all partitions:|' 'Kernel panic - not syncing: VFS:|reboot: System halted)') -def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream: +def extract_tap_lines(kernel_output: Iterable[str], lstrip=True) -> LineStream: """Extracts KTAP lines from the kernel output.""" def isolate_ktap_output(kernel_output: Iterable[str]) \ -> Iterator[Tuple[int, str]]: @@ -251,9 +244,11 @@ def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream: # stop extracting KTAP lines break elif started: - # remove prefix and any indention and yield - # line with line number - line = line[prefix_len:].lstrip() + # remove the prefix and optionally any leading + # whitespace. Our parsing logic relies on this. + line = line[prefix_len:] + if lstrip: + line = line.lstrip() yield line_num, line return LineStream(lines=isolate_ktap_output(kernel_output)) @@ -275,11 +270,9 @@ def check_version(version_num: int, accepted_versions: List[int], test - Test object for current test being parsed """ if version_num < min(accepted_versions): - test.add_error(version_type + - ' version lower than expected!') + test.add_error(f'{version_type} version lower than expected!') elif version_num > max(accepted_versions): - test.add_error( - version_type + ' version higher than expected!') + test.add_error(f'{version_type} version higer than expected!') def parse_ktap_header(lines: LineStream, test: Test) -> bool: """ @@ -340,8 +333,8 @@ def parse_test_plan(lines: LineStream, test: Test) -> bool: """ Parses test plan line and stores the expected number of subtests in test object. Reports an error if expected count is 0. - Returns False and reports missing test plan error if fails to parse - test plan. + Returns False and sets expected_count to None if there is no valid test + plan. Accepted format: - '1..[number of subtests]' @@ -356,14 +349,10 @@ def parse_test_plan(lines: LineStream, test: Test) -> bool: match = TEST_PLAN.match(lines.peek()) if not match: test.expected_count = None - test.add_error('missing plan line!') return False test.log.append(lines.pop()) expected_count = int(match.group(1)) test.expected_count = expected_count - if expected_count == 0: - test.status = TestStatus.NO_TESTS - test.add_error('0 tests run!') return True TEST_RESULT = re.compile(r'^(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$') @@ -393,7 +382,7 @@ def peek_test_name_match(lines: LineStream, test: Test) -> bool: if not match: return False name = match.group(4) - return (name == test.name) + return name == test.name def parse_test_result(lines: LineStream, test: Test, expected_num: int) -> bool: @@ -436,8 +425,7 @@ def parse_test_result(lines: LineStream, test: Test, # Check test num num = int(match.group(2)) if num != expected_num: - test.add_error('Expected test number ' + - str(expected_num) + ' but found ' + str(num)) + test.add_error(f'Expected test number {expected_num} but found {num}') # Set status of test object status = match.group(1) @@ -471,51 +459,11 @@ def parse_diagnostic(lines: LineStream) -> List[str]: log.append(lines.pop()) return log -DIAGNOSTIC_CRASH_MESSAGE = re.compile(r'^# .*?: kunit test case crashed!$') - -def parse_crash_in_log(test: Test) -> bool: - """ - Iterate through the lines of the log to parse for crash message. - If crash message found, set status to crashed and return True. - Otherwise return False. - - Parameters: - test - Test object for current test being parsed - - Return: - True if crash message found in log - """ - for line in test.log: - if DIAGNOSTIC_CRASH_MESSAGE.match(line): - test.status = TestStatus.TEST_CRASHED - return True - return False - # Printing helper methods: DIVIDER = '=' * 60 -RESET = '\033[0;0m' - -def red(text: str) -> str: - """Returns inputted string with red color code.""" - return '\033[1;31m' + text + RESET - -def yellow(text: str) -> str: - """Returns inputted string with yellow color code.""" - return '\033[1;33m' + text + RESET - -def green(text: str) -> str: - """Returns inputted string with green color code.""" - return '\033[1;32m' + text + RESET - -ANSI_LEN = len(red('')) - -def print_with_timestamp(message: str) -> None: - """Prints message with timestamp at beginning.""" - print('[%s] %s' % (datetime.now().strftime('%H:%M:%S'), message)) - def format_test_divider(message: str, len_message: int) -> str: """ Returns string with message centered in fixed width divider. @@ -539,7 +487,7 @@ def format_test_divider(message: str, len_message: int) -> str: # calculate number of dashes for each side of the divider len_1 = int(difference / 2) len_2 = difference - len_1 - return ('=' * len_1) + ' ' + message + ' ' + ('=' * len_2) + return ('=' * len_1) + f' {message} ' + ('=' * len_2) def print_test_header(test: Test) -> None: """ @@ -555,22 +503,15 @@ def print_test_header(test: Test) -> None: message = test.name if test.expected_count: if test.expected_count == 1: - message += (' (' + str(test.expected_count) + - ' subtest)') + message += ' (1 subtest)' else: - message += (' (' + str(test.expected_count) + - ' subtests)') - print_with_timestamp(format_test_divider(message, len(message))) + message += f' ({test.expected_count} subtests)' + stdout.print_with_timestamp(format_test_divider(message, len(message))) def print_log(log: Iterable[str]) -> None: - """ - Prints all strings in saved log for test in yellow. - - Parameters: - log - Iterable object with all strings saved in log for test - """ + """Prints all strings in saved log for test in yellow.""" for m in log: - print_with_timestamp(yellow(m)) + stdout.print_with_timestamp(stdout.yellow(m)) def format_test_result(test: Test) -> str: """ @@ -587,15 +528,16 @@ def format_test_result(test: Test) -> str: String containing formatted test result """ if test.status == TestStatus.SUCCESS: - return (green('[PASSED] ') + test.name) - elif test.status == TestStatus.SKIPPED: - return (yellow('[SKIPPED] ') + test.name) - elif test.status == TestStatus.TEST_CRASHED: + return stdout.green('[PASSED] ') + test.name + if test.status == TestStatus.SKIPPED: + return stdout.yellow('[SKIPPED] ') + test.name + if test.status == TestStatus.NO_TESTS: + return stdout.yellow('[NO TESTS RUN] ') + test.name + if test.status == TestStatus.TEST_CRASHED: print_log(test.log) - return (red('[CRASHED] ') + test.name) - else: - print_log(test.log) - return (red('[FAILED] ') + test.name) + return stdout.red('[CRASHED] ') + test.name + print_log(test.log) + return stdout.red('[FAILED] ') + test.name def print_test_result(test: Test) -> None: """ @@ -607,7 +549,7 @@ def print_test_result(test: Test) -> None: Parameters: test - Test object representing current test being printed """ - print_with_timestamp(format_test_result(test)) + stdout.print_with_timestamp(format_test_result(test)) def print_test_footer(test: Test) -> None: """ @@ -620,8 +562,8 @@ def print_test_footer(test: Test) -> None: test - Test object representing current test being printed """ message = format_test_result(test) - print_with_timestamp(format_test_divider(message, - len(message) - ANSI_LEN)) + stdout.print_with_timestamp(format_test_divider(message, + len(message) - stdout.color_len())) def print_summary_line(test: Test) -> None: """ @@ -638,25 +580,12 @@ def print_summary_line(test: Test) -> None: test - Test object representing current test being printed """ if test.status == TestStatus.SUCCESS: - color = green - elif test.status == TestStatus.SKIPPED or test.status == TestStatus.NO_TESTS: - color = yellow + color = stdout.green + elif test.status in (TestStatus.SKIPPED, TestStatus.NO_TESTS): + color = stdout.yellow else: - color = red - counts = test.counts - print_with_timestamp(color('Testing complete. ' + str(counts))) - -def print_error(error_message: str) -> None: - """ - Prints error message with error format. - - Example: - "[ERROR] Test example: missing test plan!" - - Parameters: - error_message - message describing error - """ - print_with_timestamp(red('[ERROR] ') + error_message) + color = stdout.red + stdout.print_with_timestamp(color(f'Testing complete. {test.counts}')) # Other methods: @@ -670,7 +599,6 @@ def bubble_up_test_results(test: Test) -> None: Parameters: test - Test object for current test being parsed """ - parse_crash_in_log(test) subtests = test.subtests counts = test.counts status = test.status @@ -732,6 +660,7 @@ def parse_test(lines: LineStream, expected_num: int, log: List[str]) -> Test: # test plan test.name = "main" parse_test_plan(lines, test) + parent_test = True else: # If KTAP/TAP header is not found, test must be subtest # header or test result line so parse attempt to parser @@ -745,7 +674,7 @@ def parse_test(lines: LineStream, expected_num: int, log: List[str]) -> Test: expected_count = test.expected_count subtests = [] test_num = 1 - while expected_count is None or test_num <= expected_count: + while parent_test and (expected_count is None or test_num <= expected_count): # Loop to parse any subtests. # Break after parsing expected number of tests or # if expected number of tests is unknown break when test @@ -780,9 +709,18 @@ def parse_test(lines: LineStream, expected_num: int, log: List[str]) -> Test: parse_test_result(lines, test, expected_num) else: test.add_error('missing subtest result line!') + + # Check for there being no tests + if parent_test and len(subtests) == 0: + # Don't override a bad status if this test had one reported. + # Assumption: no subtests means CRASHED is from Test.__init__() + if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS): + test.status = TestStatus.NO_TESTS + test.add_error('0 tests run!') + # Add statuses to TestCounts attribute in Test object bubble_up_test_results(test) - if parent_test: + if parent_test and not main: # If test has subtests and is not the main test object, print # footer. print_test_footer(test) @@ -790,28 +728,28 @@ def parse_test(lines: LineStream, expected_num: int, log: List[str]) -> Test: print_test_result(test) return test -def parse_run_tests(kernel_output: Iterable[str]) -> TestResult: +def parse_run_tests(kernel_output: Iterable[str]) -> Test: """ Using kernel output, extract KTAP lines, parse the lines for test - results and print condensed test results and summary line . + results and print condensed test results and summary line. Parameters: kernel_output - Iterable object contains lines of kernel output Return: - TestResult - Tuple containg status of main test object, main test - object with all subtests, and log of all KTAP lines. + Test - the main test object with all subtests. """ - print_with_timestamp(DIVIDER) + stdout.print_with_timestamp(DIVIDER) lines = extract_tap_lines(kernel_output) test = Test() if not lines: - test.add_error('invalid KTAP input!') + test.name = '<missing>' + test.add_error('could not find any KTAP output!') test.status = TestStatus.FAILURE_TO_PARSE_TESTS else: test = parse_test(lines, 0, []) if test.status != TestStatus.NO_TESTS: test.status = test.counts.get_status() - print_with_timestamp(DIVIDER) + stdout.print_with_timestamp(DIVIDER) print_summary_line(test) - return TestResult(test.status, test, lines) + return test |