aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSteven Koo <steven.koo@ni.com>2022-06-20 10:47:18 -0500
committerskooNI <60897865+skooNI@users.noreply.github.com>2022-07-20 15:57:20 -0500
commit770711c40a482d1e87d75393dd3fe95d75efa379 (patch)
tree87e877dffdc4740c47904caa1118fa267b89f0c5
parentfpga: Add READMEs describing Lattice and ADI IP sourcing (diff)
downloaduhd-770711c40a482d1e87d75393dd3fe95d75efa379.tar.xz
uhd-770711c40a482d1e87d75393dd3fe95d75efa379.zip
ci: add devtest e320 support
This commit adds devtest support for e320 via tftp. The e320 has a hardware incompatibility with sdmuxes that we use for the n3xx devices, which makes them unreliable. Instead this loads a small Linux OS into the e320 system memory and reimages the sd card from there. Signed-off-by: Steven Koo <steven.koo@ni.com>
-rw-r--r--.ci/templates/job-uhd-devtest-rhombus.yml19
-rw-r--r--.ci/templates/job-uhd-devtest.yml41
-rw-r--r--.ci/templates/stages-uhd-pipeline.yml6
-rwxr-xr-x.ci/templates/tests/rhombus-labgrid/crossbar/places.yaml16
-rw-r--r--.ci/templates/tests/rhombus-labgrid/device-configs/rhombus-e320-0.yml14
-rwxr-xr-x.ci/templates/tests/rhombus-labgrid/exporter-conf/exporter.yaml13
-rw-r--r--.ci/utils/httpd.py63
-rw-r--r--.ci/utils/mutex_hardware.py124
-rw-r--r--.ci/utils/tftp.py98
9 files changed, 380 insertions, 14 deletions
diff --git a/.ci/templates/job-uhd-devtest-rhombus.yml b/.ci/templates/job-uhd-devtest-rhombus.yml
index 1395b0de0..3348e4fff 100644
--- a/.ci/templates/job-uhd-devtest-rhombus.yml
+++ b/.ci/templates/job-uhd-devtest-rhombus.yml
@@ -10,7 +10,7 @@ parameters:
default: current
- name: testDevices
type: string
- default: 'x3xx,b2xx,n3xx'
+ default: 'x3xx,b2xx,n3xx,e320'
jobs:
- template: job-uhd-devtest.yml
@@ -136,3 +136,20 @@ jobs:
devtestPattern: 'n3x0'
devSDImage: gnuradio-image-ni-sulfur-rev11-mender.sdimg.bz2
devLabgridConfig: .ci/templates/tests/rhombus-labgrid/device-configs/rhombus-n321-0.yml
+
+ ${{ if contains(parameters.testDevices, 'e320') }}:
+ rhombus-e320-0:
+ devAgent: rhombus-e320-0
+ devType: 'e3xx'
+ devModel: 'e320'
+ devName: rhombus-e320-0
+ devSerial: '31A8171'
+ devHostname: 'ni-e320-31a8171'
+ devBus: 'ip'
+ devAddr: '192.168.20.7'
+ sfpAddrs: '192.168.20.7'
+ devFpga: 'XG'
+ devtestPattern: 'e320'
+ devInitramfsImage: fitImage-manufacturing
+ devSDImage: gnuradio-image-ni-neon-rev2-mender.sdimg.bz2
+ devLabgridConfig: .ci/templates/tests/rhombus-labgrid/device-configs/rhombus-e320-0.yml
diff --git a/.ci/templates/job-uhd-devtest.yml b/.ci/templates/job-uhd-devtest.yml
index 2b284c7ab..3e095c5f3 100644
--- a/.ci/templates/job-uhd-devtest.yml
+++ b/.ci/templates/job-uhd-devtest.yml
@@ -67,18 +67,30 @@ jobs:
cleanDestinationFolder: true
- download: ${{ parameters.uhdArtifactSource }}
- artifact: $(devType)-images
+ artifact: n3xx-images
# Only sync the bz2 sdimg since the bmap
# is incompatible with mender
- patterns: '**/*.bz2'
+ patterns: |
+ **/*.bz2
+ fitImage-manufacturing
displayName: Download $(devType)-images artifact
condition: and(succeeded(), eq(variables.devType, 'n3xx'))
+ - download: ${{ parameters.uhdArtifactSource }}
+ artifact: e320-images
+ # Only sync the bz2 sdimg since the bmap
+ # is incompatible with mender
+ patterns: |
+ **/*.bz2
+ fitImage-manufacturing
+ displayName: Download $(devType)-images artifact
+ condition: and(succeeded(), eq(variables.devModel, 'e320'))
+
- script: |
cd $(Build.BinariesDirectory)/uhddev/build
mkdir -p fpga_images
rm -rf fpga_images/*
- python3 utils/uhd_images_downloader.py -t $(devModel) -i fpga_images \
+ python3 utils/uhd_images_downloader.py -t $(devModel)_fpga -i fpga_images \
-b $(sdr-fileserver)
if [ "$(devType)" = "b200" ]; then
python3 utils/uhd_images_downloader.py -t b2xx_common -i fpga_images \
@@ -93,7 +105,7 @@ jobs:
export LD_LIBRARY_PATH=$(Build.BinariesDirectory)/uhddev/build/lib:$LD_LIBRARY_PATH
export UHD_IMAGES_DIR=$(Build.BinariesDirectory)/uhddev/build/fpga_images
python3 ${{ parameters.uhdSrcDir }}/.ci/utils/mutex_hardware.py \
- --sdimage $(devType),$(devModel),$(uhd_artifact_directory)/$(devType)-images/$(devSDImage),${{ parameters.uhdSrcDir }}/$(devLabgridConfig),$(devHostname) \
+ --sdimage_sdmux $(devType),$(devModel),$(uhd_artifact_directory)/$(devType)-images/$(devSDImage),${{ parameters.uhdSrcDir }}/$(devLabgridConfig),$(devHostname) \
--fpgas $(devFpga) \
--sfp_addrs $(sfpAddrs) \
${{ parameters.redisHost }} $(devName) \
@@ -114,6 +126,27 @@ jobs:
export LD_LIBRARY_PATH=$(Build.BinariesDirectory)/uhddev/build/lib:$LD_LIBRARY_PATH
export UHD_IMAGES_DIR=$(Build.BinariesDirectory)/uhddev/build/fpga_images
python3 ${{ parameters.uhdSrcDir }}/.ci/utils/mutex_hardware.py \
+ --sdimage_tftp $(devType),$(devModel),$(uhd_artifact_directory)/e320-images/$(devSDImage),$(uhd_artifact_directory)/e320-images/$(devInitramfsImage),${{ parameters.uhdSrcDir }}/$(devLabgridConfig) \
+ --fpgas $(devFpga) \
+ --sfp_addrs $(sfpAddrs) \
+ ${{ parameters.redisHost }} $(devName) \
+ "$(Build.BinariesDirectory)/uhddev/build/utils/uhd_usrp_probe --args addr=$(devAddr)" \
+ "python3 ${{ parameters.uhdSrcDir }}/host/tests/devtest/run_testsuite.py \
+ --src-dir ${{ parameters.uhdSrcDir }}/host/tests/devtest \
+ --devtest-pattern $(devtestPattern) --args addr=$(devAddr),type=$(devType) \
+ --build-type Release --build-dir $(Build.BinariesDirectory)/uhddev/build \
+ --python-interp python3 --xml"
+ continueOnError: true
+ condition: and(succeeded(), eq(variables.devModel, 'e320'), eq(variables.devBus, 'ip'))
+ displayName: Run e320 devtest on $(devName)
+
+ - script: |
+ mkdir -p $(Common.TestResultsDirectory)/devtest
+ cd $(Common.TestResultsDirectory)/devtest
+ export PATH=$(Build.BinariesDirectory)/uhddev/build/utils:$(Build.BinariesDirectory)/uhddev/build/examples:$PATH
+ export LD_LIBRARY_PATH=$(Build.BinariesDirectory)/uhddev/build/lib:$LD_LIBRARY_PATH
+ export UHD_IMAGES_DIR=$(Build.BinariesDirectory)/uhddev/build/fpga_images
+ python3 ${{ parameters.uhdSrcDir }}/.ci/utils/mutex_hardware.py \
${{ parameters.redisHost }} $(devName) \
"$(Build.BinariesDirectory)/uhddev/build/utils/uhd_usrp_probe --args serial=$(devSerial)" \
"$(Build.BinariesDirectory)/uhddev/build/utils/uhd_image_loader --args serial=$(devSerial),type=$(devType)" \
diff --git a/.ci/templates/stages-uhd-pipeline.yml b/.ci/templates/stages-uhd-pipeline.yml
index 0b30012b4..a93ec5185 100644
--- a/.ci/templates/stages-uhd-pipeline.yml
+++ b/.ci/templates/stages-uhd-pipeline.yml
@@ -238,8 +238,8 @@ stages:
uhdSrcDir: $(Build.SourcesDirectory)
testDevices: 'x3xx,b2xx'
-- stage: devtest_uhd_n3xx_stage
- displayName: devtest UHD n3xx
+- stage: devtest_uhd_n3xx_e320_stage
+ displayName: devtest UHD n3xx e320
dependsOn:
- build_uhd_stage_linux
- build_uhd_embedded_system_images
@@ -248,7 +248,7 @@ stages:
parameters:
testOS: ubuntu2004
uhdSrcDir: $(Build.SourcesDirectory)
- testDevices: 'n3xx'
+ testDevices: 'n3xx,e320'
- stage: test_uhd_x4xx_stage
displayName: Test UHD x4xx
diff --git a/.ci/templates/tests/rhombus-labgrid/crossbar/places.yaml b/.ci/templates/tests/rhombus-labgrid/crossbar/places.yaml
index baf68bbd1..2fbf5052b 100755
--- a/.ci/templates/tests/rhombus-labgrid/crossbar/places.yaml
+++ b/.ci/templates/tests/rhombus-labgrid/crossbar/places.yaml
@@ -32,3 +32,19 @@ rhombus-n321-0:
reservation: null
tags: {}
+rhombus-e320-0:
+ acquired: null
+ acquired_resources: []
+ aliases: []
+ allowed: []
+ changed: 1654034475.1935894
+ comment: ''
+ created: 1654034136.0077882
+ matches:
+ - cls: '*'
+ exporter: '*'
+ group: rhombus-e320-0-group
+ name: null
+ rename: null
+ reservation: null
+ tags: {}
diff --git a/.ci/templates/tests/rhombus-labgrid/device-configs/rhombus-e320-0.yml b/.ci/templates/tests/rhombus-labgrid/device-configs/rhombus-e320-0.yml
new file mode 100644
index 000000000..e9c934817
--- /dev/null
+++ b/.ci/templates/tests/rhombus-labgrid/device-configs/rhombus-e320-0.yml
@@ -0,0 +1,14 @@
+targets:
+ main:
+ resources:
+ RemotePlace:
+ name: 'rhombus-e320-0'
+ drivers:
+ - SerialDriver:
+ name: 'linux_serial_driver'
+ bindings:
+ port: 'console-linux'
+ - SerialDriver:
+ name: 'scu_serial_driver'
+ bindings:
+ port: 'console-scu'
diff --git a/.ci/templates/tests/rhombus-labgrid/exporter-conf/exporter.yaml b/.ci/templates/tests/rhombus-labgrid/exporter-conf/exporter.yaml
index f13ebbe07..c15385aac 100755
--- a/.ci/templates/tests/rhombus-labgrid/exporter-conf/exporter.yaml
+++ b/.ci/templates/tests/rhombus-labgrid/exporter-conf/exporter.yaml
@@ -32,3 +32,16 @@ rhombus-n321-0-group:
match:
ID_SERIAL_SHORT: '000000001140'
+rhombus-e320-0-group:
+ console-scu:
+ cls: USBSerialPort
+ match:
+ ID_SERIAL: 'Silicon_Labs_CP2105_Dual_USB_to_UART_Bridge_Controller_0097841B'
+ ID_USB_INTERFACE_NUM: '00'
+ speed: 115200
+ console-linux:
+ cls: USBSerialPort
+ match:
+ ID_SERIAL: 'Silicon_Labs_CP2105_Dual_USB_to_UART_Bridge_Controller_0097841B'
+ ID_USB_INTERFACE_NUM: '01'
+ speed: 115200
diff --git a/.ci/utils/httpd.py b/.ci/utils/httpd.py
new file mode 100644
index 000000000..53741566e
--- /dev/null
+++ b/.ci/utils/httpd.py
@@ -0,0 +1,63 @@
+import http.server
+import time
+import os
+import pyroute2
+import socket
+import socketserver
+import threading
+from functools import partial
+from pathlib import Path
+
+class ThreadingHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
+ pass
+
+class HTTPServer:
+ def __init__(self, path, remote_ip):
+ self.path = path
+ self.port = None
+ self.old_path = None
+ self.httpd = None
+
+ with pyroute2.IPRoute() as ipr:
+ r = ipr.route('get', dst=remote_ip)
+ for attr in r[0]['attrs']:
+ if attr[0] == 'RTA_PREFSRC':
+ self.ip = attr[1]
+ with socket.socket() as s:
+ s.bind(('', 0))
+ self.port = s.getsockname()[1]
+
+ def get_url(self, filename):
+ path = Path(self.path) / filename
+ assert path.exists()
+ return f"http://{self.ip}:{self.port}/{filename}"
+
+ def __enter__(self):
+ def start_server():
+ Handler = http.server.SimpleHTTPRequestHandler
+ self.httpd = ThreadingHTTPServer(("", self.port), Handler)
+ self.httpd.serve_forever()
+
+ # Kind of annoying, but to work with older pythons where
+ # SimpleHTTPRequestHandler doesn't take a directory parameter but only
+ # serves the current directory:
+ self.old_path = os.getcwd()
+ os.chdir(self.path)
+
+ self.thread = threading.Thread(target=start_server)
+ self.thread.start()
+ return self
+
+ def __exit__(self, type, value, exc):
+ if self.httpd is not None:
+ self.httpd.shutdown()
+ self.httpd.server_close()
+ if self.old_path is not None:
+ os.chdir(self.old_path)
+
+if __name__ == '__main__':
+ with HTTPServer("/tmp", "127.0.0.1") as server:
+ print("server ip", server.ip)
+ print("server port", server.port)
+ time.sleep(300)
+
diff --git a/.ci/utils/mutex_hardware.py b/.ci/utils/mutex_hardware.py
index 485f0fcbd..9f48ed22e 100644
--- a/.ci/utils/mutex_hardware.py
+++ b/.ci/utils/mutex_hardware.py
@@ -8,17 +8,22 @@ import labgrid
import os
import pathlib
import shlex
+import socket
import subprocess
import sys
import time
from fabric import Connection
+from httpd import HTTPServer
from pottery import Redlock
from redis import Redis
+from tftp import TFTPServer
bitfile_name = "usrp_{}_fpga_{}.bit"
def jtag_x3xx(dev_type, dev_model, jtag_server, jtag_serial, fpga_folder, fpga, redis_server):
+ if dev_model not in ["x300", "x310"]:
+ raise RuntimeError(f'{dev_type} not supported with jtag_x3xx')
remote_working_dir = "pipeline_fpga"
vivado_program_jtag = "/opt/Xilinx/Vivado_Lab/2020.1/bin/vivado_lab -mode batch -source {}/viv_hardware_utils.tcl -nolog -nojournal -tclargs program".format(
remote_working_dir)
@@ -50,7 +55,12 @@ def set_sfp_addrs(mgmt_addr, sfp_addrs):
dut.run(f"ip link set sfp{idx} up")
time.sleep(30)
-def flash_sdimage(dev_model, sdimage_path, labgrid_device_yaml, mgmt_addr, sfp_addrs):
+def flash_sdimage_sdmux(dev_model, sdimage_path, labgrid_device_yaml, mgmt_addr, sfp_addrs):
+ """ This method uses an sdmux (https://linux-automation.com/en/products/usb-sd-mux.html)
+ to reimage the sd card.
+ """
+ if dev_model not in ["n300", "n310", "n320", "n321"]:
+ raise RuntimeError(f'{dev_model} not supported with sdimage_sdmux')
subprocess.run(shlex.split(f"labgrid-client -c {labgrid_device_yaml} release --kick"))
subprocess.run(shlex.split(f"labgrid-client -c {labgrid_device_yaml} acquire"))
env = labgrid.Environment(labgrid_device_yaml)
@@ -101,6 +111,97 @@ def flash_sdimage(dev_model, sdimage_path, labgrid_device_yaml, mgmt_addr, sfp_a
subprocess.run(shlex.split(f"labgrid-client -c {labgrid_device_yaml} release"))
+def flash_sdimage_tftp(dev_model, sdimage_path, initramfs_path, labgrid_device_yaml, sfp_addrs, redis_server):
+ """ This method uses tftp to boot the device into a small Linux envionment to
+ write to the device's sd card. This method is used on the E320 since it has
+ a hardware incompatibility with sdmuxes.
+ """
+ if dev_model not in ["e320"]:
+ raise RuntimeError(f'{dev_model} not supported with sdimage_tftp')
+
+ if dev_model == "e320":
+ dev_ram_address = '0x20000000'
+ dev_bootm_config = 'conf@zynq-ni-${mboard}.dtb'
+
+ subprocess.run(shlex.split(f"labgrid-client -c {labgrid_device_yaml} release --kick"))
+ subprocess.run(shlex.split(f"labgrid-client -c {labgrid_device_yaml} acquire"))
+ env = labgrid.Environment(labgrid_device_yaml)
+ target = env.get_target()
+
+ cp_scu = target.get_driver(labgrid.protocol.ConsoleProtocol, name="scu_serial_driver")
+ cp_linux = target.get_driver(labgrid.protocol.ConsoleProtocol, name="linux_serial_driver")
+
+ print("Powering down DUT", flush=True)
+ cp_scu.write("\napshutdown\n".encode())
+ time.sleep(10)
+
+ print("Powering on DUT", flush=True)
+ cp_scu.write("\npowerbtn\n".encode())
+ # Sometimes it requires multiple powerbtn calls to turn on device
+ try:
+ cp_linux.expect("Enter 'noautoboot' to enter prompt without timeout", timeout=5)
+ except Exception:
+ print("Device didn't power on with first attempt. Trying again...", flush=True)
+ cp_scu.write("\npowerbtn\n".encode())
+ cp_linux.expect("Enter 'noautoboot' to enter prompt without timeout", timeout=5)
+
+ print("Attempting to get into uboot console", flush=True)
+ cp_linux.write("noautoboot".encode())
+ # Handle if the watchdog triggers
+ try:
+ cp_linux.expect("Enter 'noautoboot' to enter prompt without timeout", timeout=30)
+ cp_linux.write("noautoboot".encode())
+ except Exception:
+ pass
+ cp_linux.expect("uboot>")
+ print("Waiting for NIC to come up", flush=True)
+ time.sleep(10)
+ cp_linux.write(f"setenv autoload no; dhcp;\n".encode())
+ cp_linux.expect("DHCP client bound to address")
+ expect_index, expect_before, expect_match , expect_after = cp_linux.expect(r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b")
+ mgmt_addr = expect_match[0].decode()
+ print(f"Dev got IP Address {mgmt_addr}")
+
+ with TFTPServer(initramfs_path, mgmt_addr) as server:
+ time.sleep(10)
+ cp_linux.expect("uboot>")
+ cp_linux.write(f"setenv tftpdstp {server.port}\n".encode())
+ cp_linux.expect("uboot>")
+ print("TFTPing initramfs image", flush=True)
+ cp_linux.write(f"tftpboot {dev_ram_address} {server.ip}:{os.path.basename(initramfs_path)}\n".encode())
+ cp_linux.expect("uboot>", timeout=120)
+ print("Booting into initramfs", flush=True)
+ cp_linux.write(f"bootm {dev_ram_address}#{dev_bootm_config}\n".encode())
+ cp_linux.expect("mender login:", timeout=120)
+ print("Logging into Linux", flush=True)
+ cp_linux.write("root\n".encode())
+ cp_linux.expect("mender:~#")
+ print("Waiting for NIC to DHCP", flush=True)
+ time.sleep(10)
+
+ with HTTPServer(os.path.dirname(sdimage_path), mgmt_addr) as server:
+ print(f"Writing SD Card using {sdimage_path}", flush=True)
+ print("Running bmaptool... This will take awhile", flush=True)
+ cp_linux.write(f"bmaptool copy --nobmap {server.get_url(os.path.basename(sdimage_path))} /dev/mmcblk0\n".encode())
+ cp_linux.expect("mender:~#", timeout=1800)
+ cp_linux.write("echo bmaptool exit code: $?\n".encode())
+ cp_linux.expect("bmaptool exit code: 0", timeout=10)
+ time.sleep(10)
+ print("Rebooting into new image from sd card", flush=True)
+ cp_linux.write("reboot\n".encode())
+
+ print("Waiting 2 minutes for device to boot", flush=True)
+ time.sleep(120)
+ cp_linux.expect("login:", timeout=30)
+ known_hosts_path = os.path.expanduser("~/.ssh/known_hosts")
+ subprocess.run(shlex.split(f"ssh-keygen -f \"{known_hosts_path}\" -R \"{mgmt_addr}\""))
+
+ if sfp_addrs:
+ set_sfp_addrs(mgmt_addr, sfp_addrs)
+
+ subprocess.run(shlex.split(f"labgrid-client -c {labgrid_device_yaml} release"))
+ return mgmt_addr
+
def main(args):
redis_server = {Redis.from_url(
"redis://{}:6379/0".format(args.redis_server))}
@@ -108,13 +209,21 @@ def main(args):
with Redlock(key=args.dut_name, masters=redis_server, auto_release_time=1000 * 60 * args.dut_timeout):
print("Got mutex for {}".format(args.dut_name), flush=True)
- if args.sdimage:
- dev_type, dev_model, sdimage_path, labgrid_device_yaml, mgmt_addr = args.sdimage.split(',')
+ if args.sdimage_sdmux:
+ dev_type, dev_model, sdimage_path, labgrid_device_yaml, mgmt_addr = args.sdimage_sdmux.split(',')
+ if args.sfp_addrs:
+ sfp_addrs = args.sfp_addrs.split(',')
+ else:
+ sfp_addrs = None
+ flash_sdimage_sdmux(dev_model, sdimage_path, labgrid_device_yaml, mgmt_addr, sfp_addrs)
+
+ if args.sdimage_tftp:
+ dev_type, dev_model, sdimage_path, initramfs_path, labgrid_device_yaml = args.sdimage_tftp.split(',')
if args.sfp_addrs:
sfp_addrs = args.sfp_addrs.split(',')
else:
sfp_addrs = None
- flash_sdimage(dev_model, sdimage_path, labgrid_device_yaml, mgmt_addr, sfp_addrs)
+ mgmt_addr = flash_sdimage_tftp(dev_model, sdimage_path, initramfs_path, labgrid_device_yaml, sfp_addrs, redis_server)
if args.fpgas:
working_dir = os.getcwd()
@@ -146,14 +255,17 @@ def main(args):
if __name__ == "__main__":
parser = argparse.ArgumentParser()
+ group = parser.add_mutually_exclusive_group()
# jtag_x3xx will flash the fpga for a given jtag_serial using
# Vivado on jtag_server. It uses SSH to control jtag_server.
# Provide fpga_path as a local path and it will be copied
# to jtag_server.
- parser.add_argument("--jtag_x3xx", type=str,
+ group.add_argument("--jtag_x3xx", type=str,
help="dev_type,dev_model,user@jtag_server,jtag_serial,fpga_folder")
- parser.add_argument("--sdimage", type=str,
+ group.add_argument("--sdimage_sdmux", type=str,
help="dev_type,dev_model,sdimg_path,labgrid_device_yaml,mgmt_addr")
+ group.add_argument("--sdimage_tftp", type=str,
+ help="dev_type,dev_model,sdimg_path,initramfs_path,labgrid_device_yaml")
parser.add_argument("--sfp_addrs", type=str,
help="sfp0ip,sfp1ip,...")
parser.add_argument("--fpgas", type=str,
diff --git a/.ci/utils/tftp.py b/.ci/utils/tftp.py
new file mode 100644
index 000000000..44291cd93
--- /dev/null
+++ b/.ci/utils/tftp.py
@@ -0,0 +1,98 @@
+#!/usr/bin/env python3
+
+import asyncio
+import py3tftp.protocols
+import pyroute2
+import socket
+import threading
+from pathlib import Path
+
+
+class FileReaderSingle:
+ def __init__(self, path, fname_req, chunk_size=0):
+ self.path = path
+ # TODO: Should check fname_req against actual name
+ self.chunk_size = chunk_size
+ self._f = None
+ self._f = open(self.path, 'rb')
+ self.finished = False
+
+ def file_size(self):
+ return self.path.stat().st_size
+
+ def read_chunk(self, size=None):
+ size = size or self.chunk_size
+ if self.finished:
+ return b''
+
+ data = self._f.read(size)
+ if not data or (size > 0 and len(data) < size):
+ self._f.close()
+ self.finished = True
+
+ return data
+
+ def __del__(self):
+ if self._f and not self._f.closed:
+ self._f.close()
+
+
+class TFTPServerSingle(py3tftp.protocols.BaseTFTPServerProtocol):
+ def __init__(self, path, host_interface, loop, extra_opts):
+ super().__init__(host_interface, loop, extra_opts)
+ self.path = path
+
+ def select_protocol(self, packet):
+ if packet.is_rrq():
+ return py3tftp.protocols.RRQProtocol
+ raise py3tftp.protocols.ProtocolException("Unhandled protocol")
+
+ def select_file_handler(self, packet):
+ if packet.is_rrq():
+ return lambda filename, opts: FileReaderSingle(self.path, filename, opts)
+
+
+class TFTPServer:
+ """
+ Simple TFTP server, meant to be short-lived and capable of serving a single
+ file only
+ """
+ def __init__(self, filename, remote_ip, port=None):
+ self.path = Path(filename).absolute()
+ assert self.path.exists()
+ assert self.path.is_file()
+
+ self.filename = self.path.name
+
+ if port == None:
+ with socket.socket() as s:
+ s.bind(('', 0))
+ self.port = s.getsockname()[1]
+ else:
+ self.port = port
+
+ with pyroute2.IPRoute() as ipr:
+ r = ipr.route('get', dst=remote_ip)
+ for attr in r[0]['attrs']:
+ if attr[0] == 'RTA_PREFSRC':
+ self.ip = attr[1]
+
+ def __enter__(self):
+ self.loop = asyncio.new_event_loop()
+ listen = self.loop.create_datagram_endpoint(
+ lambda: TFTPServerSingle(self.path, self.ip, self.loop, {}),
+ local_addr=(self.ip, self.port))
+
+ def start_loop(loop):
+ asyncio.set_event_loop(loop)
+ loop.run_forever()
+
+ self.transport, protocol = self.loop.run_until_complete(listen)
+ self.thread = threading.Thread(target=start_loop, args=(self.loop,))
+ self.thread.start()
+ return self
+
+ def __exit__(self, type, value, exc):
+ self.transport.close()
+ self.loop.call_soon_threadsafe(self.loop.stop)
+ self.thread.join()