aboutsummaryrefslogtreecommitdiffstats
path: root/tools/testing/kunit/kunit_parser.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--tools/testing/kunit/kunit_parser.py244
1 files changed, 91 insertions, 153 deletions
diff --git a/tools/testing/kunit/kunit_parser.py b/tools/testing/kunit/kunit_parser.py
index 3355196d0515..1ae873e3e341 100644
--- a/tools/testing/kunit/kunit_parser.py
+++ b/tools/testing/kunit/kunit_parser.py
@@ -11,16 +11,14 @@
from __future__ import annotations
import re
+import sys
-from collections import namedtuple
-from datetime import datetime
from enum import Enum, auto
-from functools import reduce
from typing import Iterable, Iterator, List, Optional, Tuple
-TestResult = namedtuple('TestResult', ['status','test','log'])
+from kunit_printer import stdout
-class Test(object):
+class Test:
"""
A class to represent a test parsed from KTAP results. All KTAP
results within a test log are stored in a main Test object as
@@ -48,10 +46,8 @@ class Test(object):
def __str__(self) -> str:
"""Returns string representation of a Test class object."""
- return ('Test(' + str(self.status) + ', ' + self.name +
- ', ' + str(self.expected_count) + ', ' +
- str(self.subtests) + ', ' + str(self.log) + ', ' +
- str(self.counts) + ')')
+ return (f'Test({self.status}, {self.name}, {self.expected_count}, '
+ f'{self.subtests}, {self.log}, {self.counts})')
def __repr__(self) -> str:
"""Returns string representation of a Test class object."""
@@ -60,7 +56,7 @@ class Test(object):
def add_error(self, error_message: str) -> None:
"""Records an error that occurred while parsing this test."""
self.counts.errors += 1
- print_error('Test ' + self.name + ': ' + error_message)
+ stdout.print_with_timestamp(stdout.red('[ERROR]') + f' Test: {self.name}: {error_message}')
class TestStatus(Enum):
"""An enumeration class to represent the status of a test."""
@@ -94,13 +90,12 @@ class TestCounts:
self.errors = 0
def __str__(self) -> str:
- """Returns the string representation of a TestCounts object.
- """
- return ('Passed: ' + str(self.passed) +
- ', Failed: ' + str(self.failed) +
- ', Crashed: ' + str(self.crashed) +
- ', Skipped: ' + str(self.skipped) +
- ', Errors: ' + str(self.errors))
+ """Returns the string representation of a TestCounts object."""
+ statuses = [('passed', self.passed), ('failed', self.failed),
+ ('crashed', self.crashed), ('skipped', self.skipped),
+ ('errors', self.errors)]
+ return f'Ran {self.total()} tests: ' + \
+ ', '.join(f'{s}: {n}' for s, n in statuses if n > 0)
def total(self) -> int:
"""Returns the total number of test cases within a test
@@ -131,31 +126,19 @@ class TestCounts:
"""
if self.total() == 0:
return TestStatus.NO_TESTS
- elif self.crashed:
- # If one of the subtests crash, the expected status
- # of the Test is crashed.
+ if self.crashed:
+ # Crashes should take priority.
return TestStatus.TEST_CRASHED
- elif self.failed:
- # Otherwise if one of the subtests fail, the
- # expected status of the Test is failed.
+ if self.failed:
return TestStatus.FAILURE
- elif self.passed:
- # Otherwise if one of the subtests pass, the
- # expected status of the Test is passed.
+ if self.passed:
+ # No failures or crashes, looks good!
return TestStatus.SUCCESS
- else:
- # Finally, if none of the subtests have failed,
- # crashed, or passed, the expected status of the
- # Test is skipped.
- return TestStatus.SKIPPED
+ # We have only skipped tests.
+ return TestStatus.SKIPPED
def add_status(self, status: TestStatus) -> None:
- """
- Increments count of inputted status.
-
- Parameters:
- status - status to be added to the TestCounts object
- """
+ """Increments the count for `status`."""
if status == TestStatus.SUCCESS:
self.passed += 1
elif status == TestStatus.FAILURE:
@@ -168,42 +151,51 @@ class TestCounts:
class LineStream:
"""
A class to represent the lines of kernel output.
- Provides a peek()/pop() interface over an iterator of
+ Provides a lazy peek()/pop() interface over an iterator of
(line#, text).
"""
_lines: Iterator[Tuple[int, str]]
_next: Tuple[int, str]
+ _need_next: bool
_done: bool
def __init__(self, lines: Iterator[Tuple[int, str]]):
"""Creates a new LineStream that wraps the given iterator."""
self._lines = lines
self._done = False
+ self._need_next = True
self._next = (0, '')
- self._get_next()
def _get_next(self) -> None:
- """Advances the LineSteam to the next line."""
+ """Advances the LineSteam to the next line, if necessary."""
+ if not self._need_next:
+ return
try:
self._next = next(self._lines)
except StopIteration:
self._done = True
+ finally:
+ self._need_next = False
def peek(self) -> str:
"""Returns the current line, without advancing the LineStream.
"""
+ self._get_next()
return self._next[1]
def pop(self) -> str:
"""Returns the current line and advances the LineStream to
the next line.
"""
- n = self._next
- self._get_next()
- return n[1]
+ s = self.peek()
+ if self._done:
+ raise ValueError(f'LineStream: going past EOF, last line was {s}')
+ self._need_next = True
+ return s
def __bool__(self) -> bool:
"""Returns True if stream has more lines."""
+ self._get_next()
return not self._done
# Only used by kunit_tool_test.py.
@@ -216,6 +208,7 @@ class LineStream:
def line_number(self) -> int:
"""Returns the line number of the current line."""
+ self._get_next()
return self._next[0]
# Parsing helper methods:
@@ -225,7 +218,7 @@ TAP_START = re.compile(r'TAP version ([0-9]+)$')
KTAP_END = re.compile('(List of all partitions:|'
'Kernel panic - not syncing: VFS:|reboot: System halted)')
-def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
+def extract_tap_lines(kernel_output: Iterable[str], lstrip=True) -> LineStream:
"""Extracts KTAP lines from the kernel output."""
def isolate_ktap_output(kernel_output: Iterable[str]) \
-> Iterator[Tuple[int, str]]:
@@ -251,9 +244,11 @@ def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
# stop extracting KTAP lines
break
elif started:
- # remove prefix and any indention and yield
- # line with line number
- line = line[prefix_len:].lstrip()
+ # remove the prefix and optionally any leading
+ # whitespace. Our parsing logic relies on this.
+ line = line[prefix_len:]
+ if lstrip:
+ line = line.lstrip()
yield line_num, line
return LineStream(lines=isolate_ktap_output(kernel_output))
@@ -275,11 +270,9 @@ def check_version(version_num: int, accepted_versions: List[int],
test - Test object for current test being parsed
"""
if version_num < min(accepted_versions):
- test.add_error(version_type +
- ' version lower than expected!')
+ test.add_error(f'{version_type} version lower than expected!')
elif version_num > max(accepted_versions):
- test.add_error(
- version_type + ' version higher than expected!')
+ test.add_error(f'{version_type} version higer than expected!')
def parse_ktap_header(lines: LineStream, test: Test) -> bool:
"""
@@ -340,8 +333,8 @@ def parse_test_plan(lines: LineStream, test: Test) -> bool:
"""
Parses test plan line and stores the expected number of subtests in
test object. Reports an error if expected count is 0.
- Returns False and reports missing test plan error if fails to parse
- test plan.
+ Returns False and sets expected_count to None if there is no valid test
+ plan.
Accepted format:
- '1..[number of subtests]'
@@ -356,14 +349,10 @@ def parse_test_plan(lines: LineStream, test: Test) -> bool:
match = TEST_PLAN.match(lines.peek())
if not match:
test.expected_count = None
- test.add_error('missing plan line!')
return False
test.log.append(lines.pop())
expected_count = int(match.group(1))
test.expected_count = expected_count
- if expected_count == 0:
- test.status = TestStatus.NO_TESTS
- test.add_error('0 tests run!')
return True
TEST_RESULT = re.compile(r'^(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$')
@@ -393,7 +382,7 @@ def peek_test_name_match(lines: LineStream, test: Test) -> bool:
if not match:
return False
name = match.group(4)
- return (name == test.name)
+ return name == test.name
def parse_test_result(lines: LineStream, test: Test,
expected_num: int) -> bool:
@@ -436,8 +425,7 @@ def parse_test_result(lines: LineStream, test: Test,
# Check test num
num = int(match.group(2))
if num != expected_num:
- test.add_error('Expected test number ' +
- str(expected_num) + ' but found ' + str(num))
+ test.add_error(f'Expected test number {expected_num} but found {num}')
# Set status of test object
status = match.group(1)
@@ -471,51 +459,11 @@ def parse_diagnostic(lines: LineStream) -> List[str]:
log.append(lines.pop())
return log
-DIAGNOSTIC_CRASH_MESSAGE = re.compile(r'^# .*?: kunit test case crashed!$')
-
-def parse_crash_in_log(test: Test) -> bool:
- """
- Iterate through the lines of the log to parse for crash message.
- If crash message found, set status to crashed and return True.
- Otherwise return False.
-
- Parameters:
- test - Test object for current test being parsed
-
- Return:
- True if crash message found in log
- """
- for line in test.log:
- if DIAGNOSTIC_CRASH_MESSAGE.match(line):
- test.status = TestStatus.TEST_CRASHED
- return True
- return False
-
# Printing helper methods:
DIVIDER = '=' * 60
-RESET = '\033[0;0m'
-
-def red(text: str) -> str:
- """Returns inputted string with red color code."""
- return '\033[1;31m' + text + RESET
-
-def yellow(text: str) -> str:
- """Returns inputted string with yellow color code."""
- return '\033[1;33m' + text + RESET
-
-def green(text: str) -> str:
- """Returns inputted string with green color code."""
- return '\033[1;32m' + text + RESET
-
-ANSI_LEN = len(red(''))
-
-def print_with_timestamp(message: str) -> None:
- """Prints message with timestamp at beginning."""
- print('[%s] %s' % (datetime.now().strftime('%H:%M:%S'), message))
-
def format_test_divider(message: str, len_message: int) -> str:
"""
Returns string with message centered in fixed width divider.
@@ -539,7 +487,7 @@ def format_test_divider(message: str, len_message: int) -> str:
# calculate number of dashes for each side of the divider
len_1 = int(difference / 2)
len_2 = difference - len_1
- return ('=' * len_1) + ' ' + message + ' ' + ('=' * len_2)
+ return ('=' * len_1) + f' {message} ' + ('=' * len_2)
def print_test_header(test: Test) -> None:
"""
@@ -555,22 +503,15 @@ def print_test_header(test: Test) -> None:
message = test.name
if test.expected_count:
if test.expected_count == 1:
- message += (' (' + str(test.expected_count) +
- ' subtest)')
+ message += ' (1 subtest)'
else:
- message += (' (' + str(test.expected_count) +
- ' subtests)')
- print_with_timestamp(format_test_divider(message, len(message)))
+ message += f' ({test.expected_count} subtests)'
+ stdout.print_with_timestamp(format_test_divider(message, len(message)))
def print_log(log: Iterable[str]) -> None:
- """
- Prints all strings in saved log for test in yellow.
-
- Parameters:
- log - Iterable object with all strings saved in log for test
- """
+ """Prints all strings in saved log for test in yellow."""
for m in log:
- print_with_timestamp(yellow(m))
+ stdout.print_with_timestamp(stdout.yellow(m))
def format_test_result(test: Test) -> str:
"""
@@ -587,15 +528,16 @@ def format_test_result(test: Test) -> str:
String containing formatted test result
"""
if test.status == TestStatus.SUCCESS:
- return (green('[PASSED] ') + test.name)
- elif test.status == TestStatus.SKIPPED:
- return (yellow('[SKIPPED] ') + test.name)
- elif test.status == TestStatus.TEST_CRASHED:
+ return stdout.green('[PASSED] ') + test.name
+ if test.status == TestStatus.SKIPPED:
+ return stdout.yellow('[SKIPPED] ') + test.name
+ if test.status == TestStatus.NO_TESTS:
+ return stdout.yellow('[NO TESTS RUN] ') + test.name
+ if test.status == TestStatus.TEST_CRASHED:
print_log(test.log)
- return (red('[CRASHED] ') + test.name)
- else:
- print_log(test.log)
- return (red('[FAILED] ') + test.name)
+ return stdout.red('[CRASHED] ') + test.name
+ print_log(test.log)
+ return stdout.red('[FAILED] ') + test.name
def print_test_result(test: Test) -> None:
"""
@@ -607,7 +549,7 @@ def print_test_result(test: Test) -> None:
Parameters:
test - Test object representing current test being printed
"""
- print_with_timestamp(format_test_result(test))
+ stdout.print_with_timestamp(format_test_result(test))
def print_test_footer(test: Test) -> None:
"""
@@ -620,8 +562,8 @@ def print_test_footer(test: Test) -> None:
test - Test object representing current test being printed
"""
message = format_test_result(test)
- print_with_timestamp(format_test_divider(message,
- len(message) - ANSI_LEN))
+ stdout.print_with_timestamp(format_test_divider(message,
+ len(message) - stdout.color_len()))
def print_summary_line(test: Test) -> None:
"""
@@ -638,25 +580,12 @@ def print_summary_line(test: Test) -> None:
test - Test object representing current test being printed
"""
if test.status == TestStatus.SUCCESS:
- color = green
- elif test.status == TestStatus.SKIPPED or test.status == TestStatus.NO_TESTS:
- color = yellow
+ color = stdout.green
+ elif test.status in (TestStatus.SKIPPED, TestStatus.NO_TESTS):
+ color = stdout.yellow
else:
- color = red
- counts = test.counts
- print_with_timestamp(color('Testing complete. ' + str(counts)))
-
-def print_error(error_message: str) -> None:
- """
- Prints error message with error format.
-
- Example:
- "[ERROR] Test example: missing test plan!"
-
- Parameters:
- error_message - message describing error
- """
- print_with_timestamp(red('[ERROR] ') + error_message)
+ color = stdout.red
+ stdout.print_with_timestamp(color(f'Testing complete. {test.counts}'))
# Other methods:
@@ -670,7 +599,6 @@ def bubble_up_test_results(test: Test) -> None:
Parameters:
test - Test object for current test being parsed
"""
- parse_crash_in_log(test)
subtests = test.subtests
counts = test.counts
status = test.status
@@ -732,6 +660,7 @@ def parse_test(lines: LineStream, expected_num: int, log: List[str]) -> Test:
# test plan
test.name = "main"
parse_test_plan(lines, test)
+ parent_test = True
else:
# If KTAP/TAP header is not found, test must be subtest
# header or test result line so parse attempt to parser
@@ -745,7 +674,7 @@ def parse_test(lines: LineStream, expected_num: int, log: List[str]) -> Test:
expected_count = test.expected_count
subtests = []
test_num = 1
- while expected_count is None or test_num <= expected_count:
+ while parent_test and (expected_count is None or test_num <= expected_count):
# Loop to parse any subtests.
# Break after parsing expected number of tests or
# if expected number of tests is unknown break when test
@@ -780,9 +709,18 @@ def parse_test(lines: LineStream, expected_num: int, log: List[str]) -> Test:
parse_test_result(lines, test, expected_num)
else:
test.add_error('missing subtest result line!')
+
+ # Check for there being no tests
+ if parent_test and len(subtests) == 0:
+ # Don't override a bad status if this test had one reported.
+ # Assumption: no subtests means CRASHED is from Test.__init__()
+ if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS):
+ test.status = TestStatus.NO_TESTS
+ test.add_error('0 tests run!')
+
# Add statuses to TestCounts attribute in Test object
bubble_up_test_results(test)
- if parent_test:
+ if parent_test and not main:
# If test has subtests and is not the main test object, print
# footer.
print_test_footer(test)
@@ -790,28 +728,28 @@ def parse_test(lines: LineStream, expected_num: int, log: List[str]) -> Test:
print_test_result(test)
return test
-def parse_run_tests(kernel_output: Iterable[str]) -> TestResult:
+def parse_run_tests(kernel_output: Iterable[str]) -> Test:
"""
Using kernel output, extract KTAP lines, parse the lines for test
- results and print condensed test results and summary line .
+ results and print condensed test results and summary line.
Parameters:
kernel_output - Iterable object contains lines of kernel output
Return:
- TestResult - Tuple containg status of main test object, main test
- object with all subtests, and log of all KTAP lines.
+ Test - the main test object with all subtests.
"""
- print_with_timestamp(DIVIDER)
+ stdout.print_with_timestamp(DIVIDER)
lines = extract_tap_lines(kernel_output)
test = Test()
if not lines:
- test.add_error('invalid KTAP input!')
+ test.name = '<missing>'
+ test.add_error('could not find any KTAP output!')
test.status = TestStatus.FAILURE_TO_PARSE_TESTS
else:
test = parse_test(lines, 0, [])
if test.status != TestStatus.NO_TESTS:
test.status = test.counts.get_status()
- print_with_timestamp(DIVIDER)
+ stdout.print_with_timestamp(DIVIDER)
print_summary_line(test)
- return TestResult(test.status, test, lines)
+ return test