#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0
#
# Program to allow users to fuzz test Hyper-V drivers
# by interfacing with Hyper-V debugfs attributes.
# Current test methods available:
#       1. delay testing
#
# Current file/directory structure of Hyper-V debugfs, as consumed by
# this script (attribute files live either directly in a device
# directory or one sub-directory below it):
#       /sys/kernel/debug/hyperv/UUID
#       /sys/kernel/debug/hyperv/UUID/<attribute file>
#       /sys/kernel/debug/hyperv/UUID/<sub-directory>/<attribute file>
#
# author: Branden Bonaby

import os
import cmd
import argparse
import glob
from argparse import RawDescriptionHelpFormatter
from argparse import RawTextHelpFormatter
from enum import Enum

# Do not change unless, you change the debugfs attributes
# in /drivers/hv/debugfs.c. All fuzz testing
# attributes will start with "fuzz_test".

# The Hyper-V debugfs root must exist before anything else can be done;
# bail out immediately (at import time) when it is missing or unreadable.
debugfs_hyperv_path = "/sys/kernel/debug/hyperv"
if not os.path.isdir(debugfs_hyperv_path):
    print("{} doesn't exist/check permissions".format(debugfs_hyperv_path))
    exit(-1)


# Values written to / read from a device's test-state attribute.
class dev_state(Enum):
    off = 0
    on = 1


# File names, that correspond to the files created in
# /drivers/hv/debugfs.c
class f_names(Enum):
    state_f = "fuzz_test_state"
    buff_f = "fuzz_test_buffer_interrupt_delay"
    mess_f = "fuzz_test_message_delay"


# Both single_actions and all_actions are used
# for error checking and to allow for some subparser
# names to be abbreviated. Do not abbreviate the
# test method names, as it will become less intuitive
# as to what the user can do. If you do decide to
# abbreviate the test method name, make sure the main
# function reflects this change.
# Actions (and their abbreviations / flags) that operate on every device
# and therefore must NOT be combined with a path.
all_actions = [
    "disable_all",
    "D",
    "enable_all",
    "view_all",
    "V",
]

# Actions (and their abbreviations / flags) that operate on one device
# and therefore REQUIRE a path.
single_actions = [
    "disable_single",
    "d",
    "enable_single",
    "view_single",
    "v",
]


def main():
    """Parse the command line and dispatch to the requested action.

    Exits with status -1 when no sub-command was given or when argument
    validation fails.
    """
    file_map = recursive_file_lookup(debugfs_hyperv_path, dict())
    args = parse_args()
    if not args.action:
        print ("Error, no options selected...exiting")
        raise SystemExit(-1)

    # Names of every option that was actually supplied (truthy value),
    # plus the sub-command name exactly as typed (may be an alias).
    arg_set = {k for (k, v) in vars(args).items() if v and k != "action"}
    arg_set.add(args.action)

    path = args.path if "path" in arg_set else None
    if path and path[-1] == "/":
        # file_map keys have no trailing slash
        path = path[:-1]
    validate_args_path(path, arg_set, file_map)

    if path and "enable_single" in arg_set:
        state_path = locate_state(path, file_map)
        set_test_state(state_path, dev_state.on.value, args.quiet)

    # Use subparsers as the key for different actions
    if "delay" in arg_set:
        validate_delay_values(args.delay_time)
        if args.enable_all:
            set_delay_all_devices(file_map, args.delay_time, args.quiet)
        else:
            set_delay_values(path, file_map, args.delay_time, args.quiet)
    elif "disable_all" in arg_set or "D" in arg_set:
        disable_all_testing(file_map)
    elif "disable_single" in arg_set or "d" in arg_set:
        disable_testing_single_device(path, file_map)
    elif "view_all" in arg_set or "V" in arg_set:
        get_all_devices_test_status(file_map)
    elif "view_single" in arg_set or "v" in arg_set:
        get_device_test_values(path, file_map)


def locate_state(device, file_map):
    """Return the path of the test-state attribute file of *device*.

    Raises KeyError when the device (or its state file) is not in
    *file_map*.
    """
    return file_map[device][f_names.state_f.value]


def validate_delay_values(delay):
    """Check that the two delay values are acceptable; exit(-1) if not.

    *delay* is a two-element sequence: [buffer interrupt delay,
    message delay]. Each value must be -1 or in the range 1..1000, and
    the two values must not both be -1.
    """
    if delay[0] == -1 and delay[1] == -1:
        print("\nError, At least 1 value must be greater than 0")
        raise SystemExit(-1)

    for value in delay:
        if value < -1 or value == 0 or value > 1000:
            print("\nError, Values must be equal to -1 "
                  "or be > 0 and <= 1000")
            raise SystemExit(-1)


def validate_args_path(path, arg_set, file_map):
    """Check that the path option matches the selected action.

    Exits with status -1 when a single-device action has no path, when
    an all-devices action was given a path, or when the path of a
    single-device action is not a key of *file_map*.
    """
    wants_single = any(item in arg_set for item in single_actions)
    wants_all = any(item in arg_set for item in all_actions)

    if not path and wants_single:
        print("Error, path (-p) REQUIRED for the specified option. "
              "Use (-h) to check usage.")
        raise SystemExit(-1)
    elif path and wants_all:
        print("Error, path (-p) NOT REQUIRED for the specified option. "
              "Use (-h) to check usage."
              )
        raise SystemExit(-1)
    elif path not in file_map and wants_single:
        print("Error, path '{}' not a valid vmbus device".format(path))
        raise SystemExit(-1)


def get_device_test_values(path, file_map):
    """Print "<file name> = <value>" for every test file of one device."""
    for name, file_location in file_map[path].items():
        print(name + " = " + str(read_test_files(file_location)))


def recursive_file_lookup(path, file_map):
    """Fill *file_map* with the vmbus devices found below *path*.

    The resulting structure is
    {device directory: {file name: file path}}. *file_map* is updated
    in place and also returned.
    """
    # glob is called without recursive=True, so "**" behaves like "*":
    # the pattern lists the entries one level below any directory whose
    # name starts with *path*; deeper levels are reached by the explicit
    # recursion below.
    for f_path in glob.iglob(path + '**/*'):
        if os.path.isfile(f_path):
            if f_path.rsplit("/", 2)[0] == debugfs_hyperv_path:
                # file sits directly inside the device directory
                directory = f_path.rsplit("/", 1)[0]
            else:
                # file sits in a sub-directory; key it by the
                # directory two levels up
                directory = f_path.rsplit("/", 2)[0]
            f_name = f_path.split("/")[-1]
            file_map.setdefault(directory, {})[f_name] = f_path
        elif os.path.isdir(f_path):
            recursive_file_lookup(f_path, file_map)
    return file_map


def get_all_devices_test_status(file_map):
    """Print whether testing is ON or OFF for every device in *file_map*."""
    for device in file_map:
        # Compare by value: identity comparison against an int literal
        # is implementation-dependent and warns on modern Python.
        if get_test_state(locate_state(device, file_map)) == 1:
            status = "ON"
        else:
            status = "OFF"
        print("Testing = {} for: {}".format(status, device.split("/")[5]))


def read_test_files(path):
    """Return the first line of the file at *path* converted to int.

    *path* must be an absolute path. Prints an error and exits with
    status -1 on I/O failure or when the content is not an integer.
    """
    try:
        with open(path, "r") as f:
            file_value = f.readline().strip()
        return int(file_value)
    except IOError as e:
        print("I/O error({0}): {1} on file {2}"
              .format(e.errno, e.strerror, path))
        raise SystemExit(-1)
    except ValueError:
        print ("Element to int conversion error in: \n{}".format(path))
        raise SystemExit(-1)


def write_test_files(path, value):
    """Write *value* (formatted with str.format) to the file at *path*.

    *path* must be an absolute path. Prints an error and exits with
    status -1 on I/O failure.
    """
    try:
        with open(path, "w") as f:
            f.write("{}".format(value))
    except IOError as e:
        print("I/O error({0}): {1} on file {2}"
              .format(e.errno, e.strerror, path))
        raise SystemExit(-1)


def set_test_state(state_path, state_value, quiet):
    """Write *state_value* to the state file and report the new state.

    The state is read back from the file after writing; the report is
    suppressed when *quiet* is true.
    """
    write_test_files(state_path, state_value)
    testing_on = get_test_state(state_path) == 1
    if not quiet:
        print("Testing = {} for device: {}"
              .format("ON" if testing_on else "OFF",
                      state_path.split("/")[5]))


def get_test_state(state_path):
    """Return the integer stored in the state file.

    state == 1 - test = ON
    state == 0 - test = OFF
    """
    return read_test_files(state_path)


def set_delay_values(device, file_map, delay_length, quiet):
    """Write delay values into the delay attributes of one device.

    *delay_length* is [buffer interrupt delay, message delay]. A value
    is written only when it lies in 0..1000, so -1 leaves the current
    value of that attribute untouched. Unless *quiet* is true, both
    values are read back and printed afterwards.
    """
    try:
        interrupt = file_map[device][f_names.buff_f.value]
        message = file_map[device][f_names.mess_f.value]

        # delay[0]- buffer interrupt delay, delay[1]- message delay
        if 0 <= delay_length[0] <= 1000:
            write_test_files(interrupt, delay_length[0])
        if 0 <= delay_length[1] <= 1000:
            write_test_files(message, delay_length[1])
        if not quiet:
            print("Buffer delay testing = {} for: {}"
                  .format(read_test_files(interrupt),
                          interrupt.split("/")[5]))
            print("Message delay testing = {} for: {}"
                  .format(read_test_files(message),
                          message.split("/")[5]))
    except IOError as e:
        print("I/O error({0}): {1} on files {2}{3}"
              .format(e.errno, e.strerror, interrupt, message))
        raise SystemExit(-1)


def set_delay_all_devices(file_map, delay, quiet):
    """Turn testing on and apply *delay* for every device in *file_map*."""
    for device in file_map:
        set_test_state(locate_state(device, file_map),
                       dev_state.on.value,
                       quiet)
        set_delay_values(device, file_map, delay, quiet)

# disable all testing on a SINGLE device.
def disable_testing_single_device(device, file_map):
    """Write the "off" value into every test file of *device*.

    *device* is a key of *file_map*; a confirmation naming the last
    path component of *device* is printed once all files are written.
    """
    for file_location in file_map[device].values():
        write_test_files(file_location, dev_state.off.value)
    print("ALL testing now OFF for {}".format(device.split("/")[-1]))


# disable all testing on ALL devices
def disable_all_testing(file_map):
    """Disable testing on every device present in *file_map*."""
    for device in file_map:
        disable_testing_single_device(device, file_map)


def parse_args(argv=None):
    """Build the command-line parser and parse *argv*.

    argv: list of argument strings, or None (the default) to parse
          sys.argv[1:] as before.

    Returns the argparse.Namespace. Its ``action`` attribute holds the
    sub-command exactly as typed (full name or alias), or None when no
    sub-command was given.
    """
    parser = argparse.ArgumentParser(
        prog = "vmbus_testing",
        usage = "\n"
                "%(prog)s [delay] [-h] [-e|-E] -t [-p]\n"
                "%(prog)s [view_all | V] [-h]\n"
                "%(prog)s [disable_all | D] [-h]\n"
                "%(prog)s [disable_single | d] [-h|-p]\n"
                "%(prog)s [view_single | v] [-h|-p]\n"
                "%(prog)s --version\n",
        description = "\nUse lsvmbus to get vmbus device type "
                      "information.\n" "\nThe debugfs root path is "
                      "/sys/kernel/debug/hyperv",
        formatter_class = argparse.RawDescriptionHelpFormatter)
    subparsers = parser.add_subparsers(dest = "action")
    parser.add_argument("--version", action = "version",
                        version = '%(prog)s 0.1.0')
    parser.add_argument("-q", "--quiet", action = "store_true",
                        help = "silence none important test messages."
                               " This will only work when enabling testing"
                               " on a device.")

    # Use the path parser to hold the --path attribute so it can
    # be shared between subparsers. Also do the same for the state
    # parser, as all testing methods will use --enable_all and
    # enable_single.
    path_parser = argparse.ArgumentParser(add_help=False)
    path_parser.add_argument("-p", "--path", metavar = "",
                             help = "Debugfs path to a vmbus device. The path "
                                    "must be the absolute path to the device.")

    state_parser = argparse.ArgumentParser(add_help=False)
    state_group = state_parser.add_mutually_exclusive_group(required = True)
    state_group.add_argument("-E", "--enable_all", action = "store_const",
                             const = "enable_all",
                             help = "Enable the specified test type "
                                    "on ALL vmbus devices.")
    state_group.add_argument("-e", "--enable_single",
                             action = "store_const",
                             const = "enable_single",
                             help = "Enable the specified test type on a "
                                    "SINGLE vmbus device.")

    parser_delay = subparsers.add_parser(
        "delay",
        parents = [state_parser, path_parser],
        help = "Delay the ring buffer interrupt or the "
               "ring buffer message reads in microseconds.",
        prog = "vmbus_testing",
        usage = "%(prog)s [-h]\n"
                "%(prog)s -E -t [value] [value]\n"
                "%(prog)s -e -t [value] [value] -p",
        description = "Delay the ring buffer interrupt for "
                      "vmbus devices, or delay the ring buffer message "
                      "reads for vmbus devices (both in microseconds). This "
                      "is only on the host to guest channel.")
    parser_delay.add_argument(
        "-t", "--delay_time",
        metavar = "", nargs = 2,
        type = check_range, default = [0, 0], required = (True),
        help = "Set [buffer] & [message] delay time. "
               "Value constraints: -1 == value "
               "or 0 < value <= 1000.\n"
               "Use -1 to keep the previous value for that delay "
               "type, or a value > 0 <= 1000 to change the delay "
               "time.")

    # The remaining sub-commands take no options of their own, so the
    # parser objects returned by add_parser() are not kept.
    subparsers.add_parser(
        "disable_all",
        aliases = ['D'], prog = "vmbus_testing",
        usage = "%(prog)s [disable_all | D] -h\n"
                "%(prog)s [disable_all | D]\n",
        help = "Disable ALL testing on ALL vmbus devices.",
        description = "Disable ALL testing on ALL vmbus "
                      "devices.")

    subparsers.add_parser(
        "disable_single",
        aliases = ['d'],
        parents = [path_parser], prog = "vmbus_testing",
        usage = "%(prog)s [disable_single | d] -h\n"
                "%(prog)s [disable_single | d] -p\n",
        help = "Disable ALL testing on a SINGLE vmbus device.",
        description = "Disable ALL testing on a SINGLE vmbus "
                      "device.")

    subparsers.add_parser(
        "view_all",
        aliases = ['V'],
        help = "View the test state for ALL vmbus devices.",
        prog = "vmbus_testing",
        usage = "%(prog)s [view_all | V] -h\n"
                "%(prog)s [view_all | V]\n",
        description = "This shows the test state for ALL the "
                      "vmbus devices.")

    subparsers.add_parser(
        "view_single",
        aliases = ['v'], parents = [path_parser],
        help = "View the test values for a SINGLE vmbus "
               "device.",
        description = "This shows the test values for a SINGLE "
                      "vmbus device.",
        prog = "vmbus_testing",
        usage = "%(prog)s [view_single | v] -h\n"
                "%(prog)s [view_single | v] -p")

    return parser.parse_args(argv)


# value checking for range checking input in parser
def check_range(arg1):
    """argparse ``type=`` callback for a single delay value.

    Returns *arg1* converted to int when it is -1 or within 1..1000.
    Raises argparse.ArgumentTypeError when *arg1* is not an integer or
    lies outside that set; 0 is rejected too, as the error message and
    the option help both state ``0 < value``.
    """
    try:
        val = int(arg1)
    except ValueError as err:
        raise argparse.ArgumentTypeError(str(err))
    if val < -1 or val == 0 or val > 1000:
        message = ("\n\nvalue must be -1 or 0 < value <= 1000. "
                   "Value program received: {}\n").format(val)
        raise argparse.ArgumentTypeError(message)
    return val


if __name__ == "__main__":
    main()