aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/tools/testing/selftests/drivers/net/lib/py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/testing/selftests/drivers/net/lib/py')
-rw-r--r--tools/testing/selftests/drivers/net/lib/py/__init__.py19
-rw-r--r--tools/testing/selftests/drivers/net/lib/py/env.py282
-rw-r--r--tools/testing/selftests/drivers/net/lib/py/load.py58
-rw-r--r--tools/testing/selftests/drivers/net/lib/py/remote.py15
-rw-r--r--tools/testing/selftests/drivers/net/lib/py/remote_netns.py21
-rw-r--r--tools/testing/selftests/drivers/net/lib/py/remote_ssh.py39
6 files changed, 434 insertions, 0 deletions
diff --git a/tools/testing/selftests/drivers/net/lib/py/__init__.py b/tools/testing/selftests/drivers/net/lib/py/__init__.py
new file mode 100644
index 000000000000..401e70f7f136
--- /dev/null
+++ b/tools/testing/selftests/drivers/net/lib/py/__init__.py
@@ -0,0 +1,19 @@
+# SPDX-License-Identifier: GPL-2.0
+
+import sys
+from pathlib import Path
+
+KSFT_DIR = (Path(__file__).parent / "../../../..").resolve()
+
+try:
+ sys.path.append(KSFT_DIR.as_posix())
+ from net.lib.py import *
+except ModuleNotFoundError as e:
+ ksft_pr("Failed importing `net` library from kernel sources")
+ ksft_pr(str(e))
+ ktap_result(True, comment="SKIP")
+ sys.exit(4)
+
+from .env import *
+from .load import *
+from .remote import Remote
diff --git a/tools/testing/selftests/drivers/net/lib/py/env.py b/tools/testing/selftests/drivers/net/lib/py/env.py
new file mode 100644
index 000000000000..3bccddf8cbc5
--- /dev/null
+++ b/tools/testing/selftests/drivers/net/lib/py/env.py
@@ -0,0 +1,282 @@
+# SPDX-License-Identifier: GPL-2.0
+
+import os
+import time
+from pathlib import Path
+from lib.py import KsftSkipEx, KsftXfailEx
+from lib.py import ksft_setup
+from lib.py import cmd, ethtool, ip, CmdExitFailure
+from lib.py import NetNS, NetdevSimDev
+from .remote import Remote
+
+
+class NetDrvEnvBase:
+ """
+ Base class for a NIC / host environments
+
+ Attributes:
+ test_dir: Path to the source directory of the test
+ net_lib_dir: Path to the net/lib directory
+ """
+ def __init__(self, src_path):
+ self.src_path = Path(src_path)
+ self.test_dir = self.src_path.parent.resolve()
+ self.net_lib_dir = (Path(__file__).parent / "../../../../net/lib").resolve()
+
+ self.env = self._load_env_file()
+
+ def _load_env_file(self):
+ env = os.environ.copy()
+
+ src_dir = Path(self.src_path).parent.resolve()
+ if not (src_dir / "net.config").exists():
+ return ksft_setup(env)
+
+ with open((src_dir / "net.config").as_posix(), 'r') as fp:
+ for line in fp.readlines():
+ full_file = line
+ # Strip comments
+ pos = line.find("#")
+ if pos >= 0:
+ line = line[:pos]
+ line = line.strip()
+ if not line:
+ continue
+ pair = line.split('=', maxsplit=1)
+ if len(pair) != 2:
+ raise Exception("Can't parse configuration line:", full_file)
+ env[pair[0]] = pair[1]
+ return ksft_setup(env)
+
+
+class NetDrvEnv(NetDrvEnvBase):
+ """
+ Class for a single NIC / host env, with no remote end
+ """
+ def __init__(self, src_path, nsim_test=None, **kwargs):
+ super().__init__(src_path)
+
+ self._ns = None
+
+ if 'NETIF' in self.env:
+ if nsim_test is True:
+ raise KsftXfailEx("Test only works on netdevsim")
+
+ self.dev = ip("-d link show dev " + self.env['NETIF'], json=True)[0]
+ else:
+ if nsim_test is False:
+ raise KsftXfailEx("Test does not work on netdevsim")
+
+ self._ns = NetdevSimDev(**kwargs)
+ self.dev = self._ns.nsims[0].dev
+ self.ifname = self.dev['ifname']
+ self.ifindex = self.dev['ifindex']
+
+ def __enter__(self):
+ ip(f"link set dev {self.dev['ifname']} up")
+
+ return self
+
+ def __exit__(self, ex_type, ex_value, ex_tb):
+ """
+ __exit__ gets called at the end of a "with" block.
+ """
+ self.__del__()
+
+ def __del__(self):
+ if self._ns:
+ self._ns.remove()
+ self._ns = None
+
+
+class NetDrvEpEnv(NetDrvEnvBase):
+ """
+ Class for an environment with a local device and "remote endpoint"
+ which can be used to send traffic in.
+
+ For local testing it creates two network namespaces and a pair
+ of netdevsim devices.
+ """
+
+ # Network prefixes used for local tests
+ nsim_v4_pfx = "192.0.2."
+ nsim_v6_pfx = "2001:db8::"
+
+ def __init__(self, src_path, nsim_test=None):
+ super().__init__(src_path)
+
+ self._stats_settle_time = None
+
+ # Things we try to destroy
+ self.remote = None
+ # These are for local testing state
+ self._netns = None
+ self._ns = None
+ self._ns_peer = None
+
+ self.addr_v = { "4": None, "6": None }
+ self.remote_addr_v = { "4": None, "6": None }
+
+ if "NETIF" in self.env:
+ if nsim_test is True:
+ raise KsftXfailEx("Test only works on netdevsim")
+ self._check_env()
+
+ self.dev = ip("-d link show dev " + self.env['NETIF'], json=True)[0]
+
+ self.addr_v["4"] = self.env.get("LOCAL_V4")
+ self.addr_v["6"] = self.env.get("LOCAL_V6")
+ self.remote_addr_v["4"] = self.env.get("REMOTE_V4")
+ self.remote_addr_v["6"] = self.env.get("REMOTE_V6")
+ kind = self.env["REMOTE_TYPE"]
+ args = self.env["REMOTE_ARGS"]
+ else:
+ if nsim_test is False:
+ raise KsftXfailEx("Test does not work on netdevsim")
+
+ self.create_local()
+
+ self.dev = self._ns.nsims[0].dev
+
+ self.addr_v["4"] = self.nsim_v4_pfx + "1"
+ self.addr_v["6"] = self.nsim_v6_pfx + "1"
+ self.remote_addr_v["4"] = self.nsim_v4_pfx + "2"
+ self.remote_addr_v["6"] = self.nsim_v6_pfx + "2"
+ kind = "netns"
+ args = self._netns.name
+
+ self.remote = Remote(kind, args, src_path)
+
+ self.addr_ipver = "6" if self.addr_v["6"] else "4"
+ self.addr = self.addr_v[self.addr_ipver]
+ self.remote_addr = self.remote_addr_v[self.addr_ipver]
+
+ # Bracketed addresses, some commands need IPv6 to be inside []
+ self.baddr = f"[{self.addr_v['6']}]" if self.addr_v["6"] else self.addr_v["4"]
+ self.remote_baddr = f"[{self.remote_addr_v['6']}]" if self.remote_addr_v["6"] else self.remote_addr_v["4"]
+
+ self.ifname = self.dev['ifname']
+ self.ifindex = self.dev['ifindex']
+
+ # resolve remote interface name
+ self.remote_ifname = self.resolve_remote_ifc()
+
+ self._required_cmd = {}
+
+ def create_local(self):
+ self._netns = NetNS()
+ self._ns = NetdevSimDev()
+ self._ns_peer = NetdevSimDev(ns=self._netns)
+
+ with open("/proc/self/ns/net") as nsfd0, \
+ open("/var/run/netns/" + self._netns.name) as nsfd1:
+ ifi0 = self._ns.nsims[0].ifindex
+ ifi1 = self._ns_peer.nsims[0].ifindex
+ NetdevSimDev.ctrl_write('link_device',
+ f'{nsfd0.fileno()}:{ifi0} {nsfd1.fileno()}:{ifi1}')
+
+ ip(f" addr add dev {self._ns.nsims[0].ifname} {self.nsim_v4_pfx}1/24")
+ ip(f"-6 addr add dev {self._ns.nsims[0].ifname} {self.nsim_v6_pfx}1/64 nodad")
+ ip(f" link set dev {self._ns.nsims[0].ifname} up")
+
+ ip(f" addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v4_pfx}2/24", ns=self._netns)
+ ip(f"-6 addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v6_pfx}2/64 nodad", ns=self._netns)
+ ip(f" link set dev {self._ns_peer.nsims[0].ifname} up", ns=self._netns)
+
+ def _check_env(self):
+ vars_needed = [
+ ["LOCAL_V4", "LOCAL_V6"],
+ ["REMOTE_V4", "REMOTE_V6"],
+ ["REMOTE_TYPE"],
+ ["REMOTE_ARGS"]
+ ]
+ missing = []
+
+ for choice in vars_needed:
+ for entry in choice:
+ if entry in self.env:
+ break
+ else:
+ missing.append(choice)
+ # Make sure v4 / v6 configs are symmetric
+ if ("LOCAL_V6" in self.env) != ("REMOTE_V6" in self.env):
+ missing.append(["LOCAL_V6", "REMOTE_V6"])
+ if ("LOCAL_V4" in self.env) != ("REMOTE_V4" in self.env):
+ missing.append(["LOCAL_V4", "REMOTE_V4"])
+ if missing:
+ raise Exception("Invalid environment, missing configuration:", missing,
+ "Please see tools/testing/selftests/drivers/net/README.rst")
+
+ def resolve_remote_ifc(self):
+ v4 = v6 = None
+ if self.remote_addr_v["4"]:
+ v4 = ip("addr show to " + self.remote_addr_v["4"], json=True, host=self.remote)
+ if self.remote_addr_v["6"]:
+ v6 = ip("addr show to " + self.remote_addr_v["6"], json=True, host=self.remote)
+ if v4 and v6 and v4[0]["ifname"] != v6[0]["ifname"]:
+ raise Exception("Can't resolve remote interface name, v4 and v6 don't match")
+ if (v4 and len(v4) > 1) or (v6 and len(v6) > 1):
+ raise Exception("Can't resolve remote interface name, multiple interfaces match")
+ return v6[0]["ifname"] if v6 else v4[0]["ifname"]
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, ex_type, ex_value, ex_tb):
+ """
+ __exit__ gets called at the end of a "with" block.
+ """
+ self.__del__()
+
+ def __del__(self):
+ if self._ns:
+ self._ns.remove()
+ self._ns = None
+ if self._ns_peer:
+ self._ns_peer.remove()
+ self._ns_peer = None
+ if self._netns:
+ del self._netns
+ self._netns = None
+ if self.remote:
+ del self.remote
+ self.remote = None
+
+ def require_ipver(self, ipver):
+ if not self.addr_v[ipver] or not self.remote_addr_v[ipver]:
+ raise KsftSkipEx(f"Test requires IPv{ipver} connectivity")
+
+ def _require_cmd(self, comm, key, host=None):
+ cached = self._required_cmd.get(comm, {})
+ if cached.get(key) is None:
+ cached[key] = cmd("command -v -- " + comm, fail=False,
+ shell=True, host=host).ret == 0
+ self._required_cmd[comm] = cached
+ return cached[key]
+
+ def require_cmd(self, comm, local=True, remote=False):
+ if local:
+ if not self._require_cmd(comm, "local"):
+ raise KsftSkipEx("Test requires command: " + comm)
+ if remote:
+ if not self._require_cmd(comm, "remote"):
+ raise KsftSkipEx("Test requires (remote) command: " + comm)
+
+ def wait_hw_stats_settle(self):
+ """
+ Wait for HW stats to become consistent, some devices DMA HW stats
+ periodically so events won't be reflected until next sync.
+ Good drivers will tell us via ethtool what their sync period is.
+ """
+ if self._stats_settle_time is None:
+ data = {}
+ try:
+ data = ethtool("-c " + self.ifname, json=True)[0]
+ except CmdExitFailure as e:
+ if "Operation not supported" not in e.cmd.stderr:
+ raise
+
+ self._stats_settle_time = 0.025 + \
+ data.get('stats-block-usecs', 0) / 1000 / 1000
+
+ time.sleep(self._stats_settle_time)
diff --git a/tools/testing/selftests/drivers/net/lib/py/load.py b/tools/testing/selftests/drivers/net/lib/py/load.py
new file mode 100644
index 000000000000..d9c10613ae67
--- /dev/null
+++ b/tools/testing/selftests/drivers/net/lib/py/load.py
@@ -0,0 +1,58 @@
+# SPDX-License-Identifier: GPL-2.0
+
+import time
+
+from lib.py import ksft_pr, cmd, ip, rand_port, wait_port_listen
+
+class GenerateTraffic:
+ def __init__(self, env, port=None):
+ env.require_cmd("iperf3", remote=True)
+
+ self.env = env
+
+ if port is None:
+ port = rand_port()
+ self._iperf_server = cmd(f"iperf3 -s -1 -p {port}", background=True)
+ wait_port_listen(port)
+ time.sleep(0.1)
+ self._iperf_client = cmd(f"iperf3 -c {env.addr} -P 16 -p {port} -t 86400",
+ background=True, host=env.remote)
+
+ # Wait for traffic to ramp up
+ if not self._wait_pkts(pps=1000):
+ self.stop(verbose=True)
+ raise Exception("iperf3 traffic did not ramp up")
+
+ def _wait_pkts(self, pkt_cnt=None, pps=None):
+ """
+ Wait until we've seen pkt_cnt or until traffic ramps up to pps.
+ Only one of pkt_cnt or pss can be specified.
+ """
+ pkt_start = ip("-s link show dev " + self.env.ifname, json=True)[0]["stats64"]["rx"]["packets"]
+ for _ in range(50):
+ time.sleep(0.1)
+ pkt_now = ip("-s link show dev " + self.env.ifname, json=True)[0]["stats64"]["rx"]["packets"]
+ if pps:
+ if pkt_now - pkt_start > pps / 10:
+ return True
+ pkt_start = pkt_now
+ elif pkt_cnt:
+ if pkt_now - pkt_start > pkt_cnt:
+ return True
+ return False
+
+ def wait_pkts_and_stop(self, pkt_cnt):
+ failed = not self._wait_pkts(pkt_cnt=pkt_cnt)
+ self.stop(verbose=failed)
+
+ def stop(self, verbose=None):
+ self._iperf_client.process(terminate=True)
+ if verbose:
+ ksft_pr(">> Client:")
+ ksft_pr(self._iperf_client.stdout)
+ ksft_pr(self._iperf_client.stderr)
+ self._iperf_server.process(terminate=True)
+ if verbose:
+ ksft_pr(">> Server:")
+ ksft_pr(self._iperf_server.stdout)
+ ksft_pr(self._iperf_server.stderr)
diff --git a/tools/testing/selftests/drivers/net/lib/py/remote.py b/tools/testing/selftests/drivers/net/lib/py/remote.py
new file mode 100644
index 000000000000..b1780b987722
--- /dev/null
+++ b/tools/testing/selftests/drivers/net/lib/py/remote.py
@@ -0,0 +1,15 @@
+# SPDX-License-Identifier: GPL-2.0
+
+import os
+import importlib
+
+_modules = {}
+
+def Remote(kind, args, src_path):
+ global _modules
+
+ if kind not in _modules:
+ _modules[kind] = importlib.import_module("..remote_" + kind, __name__)
+
+ dir_path = os.path.abspath(src_path + "/../")
+ return getattr(_modules[kind], "Remote")(args, dir_path)
diff --git a/tools/testing/selftests/drivers/net/lib/py/remote_netns.py b/tools/testing/selftests/drivers/net/lib/py/remote_netns.py
new file mode 100644
index 000000000000..7d5eeb0271bc
--- /dev/null
+++ b/tools/testing/selftests/drivers/net/lib/py/remote_netns.py
@@ -0,0 +1,21 @@
+# SPDX-License-Identifier: GPL-2.0
+
+import os
+import subprocess
+
+from lib.py import cmd
+
+
+class Remote:
+ def __init__(self, name, dir_path):
+ self.name = name
+ self.dir_path = dir_path
+
+ def cmd(self, comm):
+ return subprocess.Popen(["ip", "netns", "exec", self.name, "bash", "-c", comm],
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+ def deploy(self, what):
+ if os.path.isabs(what):
+ return what
+ return os.path.abspath(self.dir_path + "/" + what)
diff --git a/tools/testing/selftests/drivers/net/lib/py/remote_ssh.py b/tools/testing/selftests/drivers/net/lib/py/remote_ssh.py
new file mode 100644
index 000000000000..924addde19a3
--- /dev/null
+++ b/tools/testing/selftests/drivers/net/lib/py/remote_ssh.py
@@ -0,0 +1,39 @@
+# SPDX-License-Identifier: GPL-2.0
+
+import os
+import string
+import subprocess
+import random
+
+from lib.py import cmd
+
+
+class Remote:
+ def __init__(self, name, dir_path):
+ self.name = name
+ self.dir_path = dir_path
+ self._tmpdir = None
+
+ def __del__(self):
+ if self._tmpdir:
+ cmd("rm -rf " + self._tmpdir, host=self)
+ self._tmpdir = None
+
+ def cmd(self, comm):
+ return subprocess.Popen(["ssh", "-q", self.name, comm],
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+ def _mktmp(self):
+ return ''.join(random.choice(string.ascii_lowercase) for _ in range(8))
+
+ def deploy(self, what):
+ if not self._tmpdir:
+ self._tmpdir = "/tmp/" + self._mktmp()
+ cmd("mkdir " + self._tmpdir, host=self)
+ file_name = self._tmpdir + "/" + self._mktmp() + os.path.basename(what)
+
+ if not os.path.isabs(what):
+ what = os.path.abspath(self.dir_path + "/" + what)
+
+ cmd(f"scp {what} {self.name}:{file_name}")
+ return file_name