aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMartin Anderseck <martin.anderseck@ni.com>2023-12-21 16:22:01 +0100
committerAki Tomita <121511582+atomita-ni@users.noreply.github.com>2024-02-27 11:29:00 -0600
commitad6607a8fe21434d0b7d89703a5c7541262faa7b (patch)
treed811d9f2e3f625f60b8333995e05d5dc04499596
parentcmake: Change ENABLE_PYMOD_UTILS default to ON (diff)
downloaduhd-ad6607a8fe21434d0b7d89703a5c7541262faa7b.tar.xz
uhd-ad6607a8fe21434d0b7d89703a5c7541262faa7b.zip
python: Add DramReceiver class
This class is a utility to more easily using an available replay block as a receiver
-rw-r--r--host/python/uhd/usrp/dram_utils.py474
1 files changed, 429 insertions, 45 deletions
diff --git a/host/python/uhd/usrp/dram_utils.py b/host/python/uhd/usrp/dram_utils.py
index 7479875af..e20138e9f 100644
--- a/host/python/uhd/usrp/dram_utils.py
+++ b/host/python/uhd/usrp/dram_utils.py
@@ -10,7 +10,7 @@ UHD Extension, helpers, and utilities for doing more with the available PL DRAM
import time
from uhd import rfnoc
from uhd.usrp import StreamArgs
-from uhd.types import TXMetadata, StreamMode, StreamCMD
+from uhd.types import TXMetadata, RXMetadata, RXMetadataErrorCode, StreamMode, StreamCMD, TimeSpec
def enumerate_radios(graph, radio_chans):
"""
@@ -71,26 +71,35 @@ class DramTransmitter:
- "0/Radio:0" (single string, channel 0 by default)
- ("0/Radio#0", 0) (tuple of block id and channel number)
replay_blockid -- If specified, use this replay block
+ replay_ports -- List of replay ports to use. Must be at least as long as the
+ list of radio channels. If not specified, use all ports.
+ Therefore this should be used if TX and RX are going to be used.
cpu_format -- For the upload process, the data format to be used
- mem_regions -- A list of (memory start, memory size) tuples, one per channel.
- If left out, the memory is split up evenly among channels.
+ mem_regions -- A list of (memory start, memory size) tuples, one per replay block port.
+ If left out, the memory is split up evenly among available replay block ports.
"""
def __init__(self,
rfnoc_graph,
- radio_chans=None,
+ radio_chans,
replay_blockid=None,
+ replay_ports=None,
cpu_format='fc32',
mem_regions=None,
):
# We make replay_blocks a list so we can support multiple replay blocks
- # (on multiple motherboards) in the future without changing APIs
+ # (not only on multiple motherboards) in the future without changing APIs
self.replay_blocks = [find_replay_block(rfnoc_graph, replay_blockid)]
self.word_size = max(x.get_word_size() for x in self.replay_blocks)
# In the radio, we always use sc16 regardless of cpu_format
- self.item_size = 4
+ self.bytes_per_sample = 4
+ if replay_ports is None:
+ replay_ports = list(range(len(radio_chans)))
+ self.replay_ports = replay_ports
self.reconnect(rfnoc_graph, radio_chans, mem_regions)
- self.replay_in_port = 0
+ # Since for multi-channel we nevertheless only use one input port to upload the
+ # data we always take the first of the given replay ports
+ self.replay_in_port = self.replay_ports[0]
stream_args = StreamArgs(cpu_format, "sc16")
self.tx_streamer = rfnoc_graph.create_tx_streamer(1, stream_args)
@@ -104,29 +113,48 @@ class DramTransmitter:
"""
Reconnect the replay block to a new set of radio channels.
"""
- # Disconnect the replay block
- active_conns = rfnoc_graph.enumerate_active_connections()
- for conn in active_conns:
- if conn.src_blockid == self.replay_blocks[0].get_unique_id():
- rfnoc_graph.disconnect(
- conn.src_blockid,
- conn.src_port,
- conn.dst_blockid,
- conn.dst_port,
- )
+ self.replay_ports = self._sanitize_replay_ports(self.replay_ports)
# Map requested radio_chans to the outputs of this replay block
self.radio_chan_pairs = enumerate_radios(rfnoc_graph, radio_chans)
- if self.replay_blocks[0].get_num_output_ports() < len(self.radio_chan_pairs):
+ if len(self.replay_ports) < len(self.radio_chan_pairs):
raise RuntimeError(
- f"Replay block {self.replay_blocks[0].get_unique_id()} only has "
- f"{self.replay_blocks[0].get_num_output_ports()} ports, "
- f"but {len(self.radio_chan_pairs)} radio channels were requested!")
+ f"{len(self.radio_chan_pairs)} radio channels were requested to be "
+ f"used with {len(self.replay_ports)} replay ports on "
+ f"Replay block {self.replay_blocks[0].get_unique_id()}! "
+ "There must be at least as many replay block ports as radio channels.")
+
+ # Disconnect the ports of the replay block that we want to use later on
+ active_conns = rfnoc_graph.enumerate_active_connections()
+ for conn in active_conns:
+ # If any of the connections that we require are taken already we will
+ # disconnect them here. That is especially important if both the transmitter
+ # and the receiver should be used.
+ if (conn.src_blockid == self.replay_blocks[0].get_unique_id() and \
+ conn.src_port in self.replay_ports) or \
+ (conn.dst_blockid == self.replay_blocks[0].get_unique_id() and \
+ conn.dst_port in self.replay_ports):
+ if "Streamer" in conn.dst_blockid:
+ # Need to use streamer disconnect method for streamers
+ rfnoc_graph.disconnect(
+ conn.dst_blockid, conn.dst_port
+ )
+ elif "Streamer" in conn.src_blockid:
+ pass
+ else:
+ # Disconnect for all other kinds of blocks
+ rfnoc_graph.disconnect(
+ conn.src_blockid,
+ conn.src_port,
+ conn.dst_blockid,
+ conn.dst_port,
+ )
+
self.duc_chan_pairs = []
for replay_port_idx, rcp in enumerate(self.radio_chan_pairs):
edge = rfnoc.connect_through_blocks(
rfnoc_graph,
- self.replay_blocks[0].get_unique_id(), replay_port_idx,
- rcp[0].get_unique_id(), rcp[1])
+ self.replay_blocks[0].get_unique_id(), self.replay_ports[replay_port_idx],
+ rcp[0].get_unique_id(), rcp[1], True)
duc_edge = \
next(filter(lambda edge: rfnoc.BlockID(edge.dst_blockid).match("DUC"), edge), None)
if duc_edge:
@@ -136,17 +164,20 @@ class DramTransmitter:
))
else:
self.duc_chan_pairs.append(None)
- self.replay_ports = list(range(len(self.radio_chan_pairs)))
- # Assign memory regions to each output port
+ # Assign memory regions to each output port (and leave space for potential input ports)
if mem_regions is None:
- mem_stride = self.replay_blocks[0].get_mem_size() // len(self.radio_chan_pairs)
+ mem_stride = self.replay_blocks[0].get_mem_size() \
+ // self.replay_blocks[0].get_num_output_ports()
mem_stride -= mem_stride % self.word_size
self.mem_regions = [
(idx * mem_stride, mem_stride)
- for idx, _ in enumerate(self.radio_chan_pairs)
+ for idx in range(self.replay_blocks[0].get_num_output_ports())
]
else:
- self.mem_regions = mem_regions
+ # Create mem_regions list prepopulated with 0 and then fill it with the
+ # given mem_regions. This way we can also handle the case where only
+ # a subset of the available replay ports is used.
+ self.mem_regions = self._sanitize_mem_regions(mem_regions)
# This stores how much data we have currently uploaded to memory. We
# initialize this with the full memory, not zero.
self.upload_size = [x[1] for x in self.mem_regions]
@@ -157,7 +188,8 @@ class DramTransmitter:
Helper function to turn ports into a valid list of ports on the replay
block.
"""
- ports = ports or self.replay_ports
+ if ports is None:
+ ports = self.replay_ports
if isinstance(ports, int):
ports = [ports]
if any((port >= self.replay_blocks[0].get_num_output_ports() for port in ports)):
@@ -167,6 +199,23 @@ class DramTransmitter:
f"{ports}")
return ports
+ def _sanitize_mem_regions(self, custom_mem_regions=None):
+ """
+ Helper function to make a list of memory regions valid for this replay
+ block and with given custom memory regions.
+ """
+ # If called without custom regions just return what we already have
+ if custom_mem_regions is None:
+ return self.mem_regions
+ mem_regions = [(0,0)] * self.replay_blocks[0].get_num_output_ports()
+ # If custom region is big enough for all possible ports, use it
+ if len(mem_regions) == len(custom_mem_regions):
+ return custom_mem_regions
+ # Otherwise we have to distribute the given regions among the available ports
+ for idx, mem_region in enumerate(custom_mem_regions):
+ if idx < len(self.replay_ports):
+ mem_regions[self.replay_ports[idx]] = mem_region
+ return mem_regions
def _upload(self, waveform, mem_start, mem_size):
"""
@@ -190,13 +239,10 @@ class DramTransmitter:
assert mem_size % self.word_size == 0, \
f"Memory region size (0x{mem_size:X}) is not aligned with " \
f"word size ({self.word_size})!"
- num_items = min(len(waveform), int(mem_size) // waveform.dtype.itemsize)
- if len(waveform.shape) == 2:
- waveform = waveform[:, :num_items]
- elif len(waveform.shape) == 1:
- waveform = waveform[:num_items]
+ num_items = min(len(waveform), int(mem_size) // self.bytes_per_sample)
+ waveform = waveform[:num_items]
- num_bytes = num_items * self.item_size
+ num_bytes = num_items * self.bytes_per_sample
in_port = self.replay_in_port
# Configure DRAM block for recording
self.replay_blocks[0].record(mem_start, num_bytes, in_port)
@@ -250,15 +296,29 @@ class DramTransmitter:
ports = None
else:
ports = self._sanitize_replay_ports(ports)
- mem_regions = [ self.mem_regions[port] for port in ports]
+ # Sanitize again in case self.mem_regions was changed from the outside
+ mem_regions = self._sanitize_mem_regions(self.mem_regions)
+ mem_regions = [ mem_regions[port] for port in ports]
if len(mem_regions) == 2 and \
isinstance(mem_regions, (tuple, list)) and \
isinstance(mem_regions[0], int):
mem_regions = [mem_regions]
+
+ # If this method is called from send() then we will work on a per-channel
+ # basis. Otherwise we will walk through the mem_regions that we have put
+ # together above.
+ if len(waveform.shape) == 1:
+ waveform = [waveform] * len(mem_regions)
+ if len(waveform) < len(mem_regions):
+ raise RuntimeError("Number of waveforms in waveform array does not match "
+ "the number of memory regions!")
for region_idx, mem_region in enumerate(mem_regions):
- bytes_uploaded = self._upload(waveform, *mem_region)
- if ports:
- self.upload_size[region_idx] = bytes_uploaded
+ # Since by default we slice the dram per input/output port, we only
+ # want to upload to the memory regions that we are actually using.
+ if ports is None or region_idx < len(ports):
+ bytes_uploaded = self._upload(waveform[region_idx], *mem_region)
+ if ports:
+ self.upload_size[ports[region_idx]] = bytes_uploaded
def issue_stream_cmd(self, stream_cmd, ports=None):
"""
@@ -279,17 +339,17 @@ class DramTransmitter:
loop around the available sample data.
"""
ports = self._sanitize_replay_ports(ports)
+ mem_regions = self._sanitize_mem_regions(self.mem_regions)
for port in ports:
if stream_cmd.stream_mode in (
StreamMode.start_cont,
StreamMode.num_done,
StreamMode.num_more):
- mem_region = self.mem_regions[port]
+ mem_region = mem_regions[port]
mem_size = min(self.upload_size[port], mem_region[1])
self.replay_blocks[0].config_play(mem_region[0], mem_size, port)
self.replay_blocks[0].issue_stream_cmd(stream_cmd, port)
-
def send(self, data, metadata, timeout=0.1):
"""
This is a wrapper around upload() and issue_stream_cmd() that can be
@@ -319,14 +379,15 @@ class DramTransmitter:
if (num_chans > 1 and len(data.shape) != 2) or (data.shape[0] < num_chans):
raise RuntimeError(
f"Number of TX channels {num_chans} does not match the dimensions "
- f"of the data array (xxx {data.shape})")
- # f"of the data array ({data.shape[0] if len(data.shape) == 2 else 1})")
+ f"of the data array ({data.shape[0] if len(data.shape) == 2 else 1})")
# First upload the data
if num_chans == 1:
- self.upload(data, 0)
+ if len(data.shape) == 2:
+ data = data[0]
+ self.upload(data, self.replay_ports[0])
else:
for chan, _ in enumerate(self.radio_chan_pairs):
- self.upload(data[chan], chan)
+ self.upload(data[chan], self.replay_ports[chan])
# Then trigger stream command
stream_cmd = StreamCMD(StreamMode.num_done)
stream_cmd.stream_now = not metadata.has_time_spec
@@ -341,3 +402,326 @@ class DramTransmitter:
This emulates TxStreamer.recv_async_msg().
"""
return self.replay_blocks[0].get_play_async_metadata(timeout)
+
+class DramReceiver:
+ """
+ Helper class to stream data from one or more radios to DRAM.
+
+ This can be useful when setting up a USRP as a receiver.
+
+ NOTE: This assumes we are using a single memory bank, until UHD is upgraded
+ to better handle multiple memory banks.
+
+ Arguments:
+ rfnoc_graph -- The graph object
+ radio_chans -- A list of radio channels to connect to the DRAM, in one of
+ these formats:
+ - "0/Radio:0#0" (single string)
+ - "0/Radio:0" (single string, channel 0 by default)
+ - ("0/Radio#0", 0) (tuple of block id and channel number)
+ replay_blockid -- If specified, use this replay block
+ replay_ports -- List of replay ports to use. Must be at least as long as the
+ list of radio channels. If not specified, use all ports.
+ Therefore this should be used if TX and RX are going to be used.
+ cpu_format -- Desired data format of the downloaded, received data on the host.
+ mem_regions -- A list of (memory start, memory size) tuples, one per replay block port.
+ If left out, the memory is split up evenly among available replay block ports.
+ throttle -- Throttle factor for the streamer. This is a value between 0 and
+ 1 or a percentage in the range (0%, 100%] that is passed as string.
+ """
+ def __init__(self,
+ rfnoc_graph,
+ radio_chans,
+ replay_blockid=None,
+ replay_ports=None,
+ cpu_format='fc32',
+ mem_regions=None,
+ throttle="0.1"
+ ):
+ # We make replay_blocks a list so we can support multiple replay blocks
+ # (not only on multiple motherboards) in the future without changing APIs
+ self.replay_blocks = [find_replay_block(rfnoc_graph, replay_blockid)]
+ self.word_size = max(x.get_word_size() for x in self.replay_blocks)
+ # In the radio, we always use sc16 regardless of cpu_format
+ self.bytes_per_sample = 4
+ stream_args = StreamArgs(cpu_format, "sc16")
+ stream_args.args['throttle'] = throttle
+ if replay_ports is None:
+ replay_ports = list(range(len(radio_chans)))
+ self.replay_ports = replay_ports
+ self.receive_metadata = None
+
+ self.reconnect(rfnoc_graph, radio_chans, mem_regions)
+ # We only use the first of the given replay ports to download the data sequentially.
+ self.replay_out_port = self.replay_ports[0]
+
+ self.rx_streamer = rfnoc_graph.create_rx_streamer(1, stream_args)
+
+ rfnoc_graph.connect(
+ self.replay_blocks[0].get_unique_id(), self.replay_out_port,
+ self.rx_streamer, 0)
+ rfnoc_graph.commit()
+
+ def reconnect(self, rfnoc_graph, radio_chans, mem_regions=None):
+ """
+ Reconnect the replay block to a new set of radio channels.
+ """
+ self.replay_ports = self._sanitize_replay_ports(self.replay_ports)
+ # Map requested radio_chans to the inputs of this replay block
+ self.radio_chan_pairs = enumerate_radios(rfnoc_graph, radio_chans)
+ if len(self.replay_ports) < len(self.radio_chan_pairs):
+ raise RuntimeError(
+ f"{len(self.radio_chan_pairs)} radio channels were requested to be "
+ f"used with {len(self.replay_ports)} replay ports on "
+ f"Replay block {self.replay_blocks[0].get_unique_id()}!"
+ "There must be at least as many replay block ports as radio channels.")
+
+ # Disconnect the ports of the replay block that we want to use later on
+ active_conns = rfnoc_graph.enumerate_active_connections()
+ for conn in active_conns:
+ # If any of the connections that we require are taken already we will
+ # disconnect them here. That is especially important if both the transmitter
+ # and the receiver should be used.
+ if (conn.src_blockid == self.replay_blocks[0].get_unique_id() and \
+ conn.src_port in self.replay_ports) or \
+ (conn.dst_blockid == self.replay_blocks[0].get_unique_id() and \
+ conn.dst_port in self.replay_ports):
+ if "Streamer" in conn.src_blockid:
+ # Need to use streamer disconnect method for streamers
+ rfnoc_graph.disconnect(
+ conn.dst_blockid, conn.dst_port
+ )
+ elif "Streamer" in conn.dst_blockid:
+ pass
+ else:
+ # Disconnect for all other blocks
+ rfnoc_graph.disconnect(
+ conn.src_blockid,
+ conn.src_port,
+ conn.dst_blockid,
+ conn.dst_port,
+ )
+
+ self.ddc_chan_pairs = []
+ for replay_port_idx, rcp in enumerate(self.radio_chan_pairs):
+ edge = rfnoc.connect_through_blocks(
+ rfnoc_graph,
+ rcp[0].get_unique_id(), rcp[1],
+ self.replay_blocks[0].get_unique_id(), self.replay_ports[replay_port_idx], True)
+ ddc_edge = \
+ next(filter(lambda edge: rfnoc.BlockID(edge.dst_blockid).match("DDC"), edge), None)
+ if ddc_edge:
+ self.ddc_chan_pairs.append((
+ rfnoc.DdcBlockControl(rfnoc_graph.get_block(ddc_edge.dst_blockid)),
+ ddc_edge.dst_port
+ ))
+ else:
+ self.ddc_chan_pairs.append(None)
+
+ # Assign memory regions to each input port (and leave space for potential output ports)
+ if mem_regions is None:
+ mem_stride = self.replay_blocks[0].get_mem_size() \
+ // self.replay_blocks[0].get_num_input_ports()
+ mem_stride -= mem_stride % self.word_size
+ self.mem_regions = [
+ (idx * mem_stride, mem_stride)
+ for idx in range(self.replay_blocks[0].get_num_input_ports())
+ ]
+ else:
+ self.mem_regions = self._sanitize_mem_regions(mem_regions)
+ # This stores how much data we have currently downloaded to memory. We
+ # initialize this with the full memory, not zero.
+ self.download_size = [x[1] for x in self.mem_regions]
+
+
+ def _sanitize_replay_ports(self, ports):
+ """
+ Helper function to turn ports into a valid list of ports on the replay
+ block.
+ """
+ if ports is None:
+ ports = self.replay_ports
+ if isinstance(ports, int):
+ ports = [ports]
+ if any((port >= self.replay_blocks[0].get_num_input_ports() for port in ports)):
+ raise RuntimeError(
+ f"Invalid input port on replay block! Available ports: "
+ f"{self.replay_blocks[0].get_num_input_ports()} Requested: "
+ f"{ports}")
+ return ports
+
+ def _sanitize_mem_regions(self, custom_mem_regions=None):
+ """
+ Helper function to make a list of memory regions valid for this replay
+ block and with given custom memory regions.
+ """
+ # If called without custom regions just return what we already have
+ if custom_mem_regions is None:
+ return self.mem_regions
+ mem_regions = [(0,0)] * self.replay_blocks[0].get_num_input_ports()
+ # If custom region is big enough for all possible ports, use it
+ if len(mem_regions) == len(custom_mem_regions):
+ return custom_mem_regions
+ # Otherwise we have to distribute the given regions among the available ports
+ for idx, mem_region in enumerate(custom_mem_regions):
+ if idx < len(self.replay_ports):
+ mem_regions[self.replay_ports[idx]] = mem_region
+ return mem_regions
+
+ def _download(self, waveform, mem_start, mem_size):
+ """
+ Download helper function
+
+ Arguments:
+ waveform: 1-dimensional numpy array with waveform data. Must be of the
+ same data type as specified during the constructor.
+ mem_start: Memory address where the data should be downloaded from
+ mem_size: Amount of available memory. This is the maximum length that
+ a captured waveform can have.
+ """
+ # Sanitize parameters
+ assert mem_start < self.replay_blocks[0].get_mem_size(), \
+ f"Invalid memory start location: {mem_start}"
+ assert mem_start + mem_size <= self.replay_blocks[0].get_mem_size(), \
+ f"Invalid memory range: {mem_start} - {mem_start + mem_size}"
+ assert mem_start % self.word_size == 0, \
+ f"Memory region start (0x{mem_start:X}) is not aligned with " \
+ f"word size ({self.word_size})!"
+ assert mem_size % self.word_size == 0, \
+ f"Memory region size (0x{mem_size:X}) is not aligned with " \
+ f"word size ({self.word_size})!"
+ num_items = min(len(waveform), int(mem_size) // self.bytes_per_sample)
+ num_bytes = num_items * self.bytes_per_sample
+ out_port = self.replay_out_port
+ stream_cmd = StreamCMD(StreamMode.num_done)
+ stream_cmd.num_samps = num_items
+ stream_cmd.time_spec = TimeSpec(0.0)
+ self.replay_blocks[0].config_play(mem_start, num_bytes, out_port)
+ self.rx_streamer.issue_stream_cmd(stream_cmd)
+
+ if not self.receive_metadata:
+ self.receive_metadata = RXMetadata()
+ num_items = self.rx_streamer.recv(waveform, self.receive_metadata, 15.0)
+ if self.receive_metadata.error_code != RXMetadataErrorCode.none:
+ # While the error code might be overwritten by the next call to _download(),
+ # returning 0 will lead to recv() returning 0, too, which indicates an error.
+ return 0
+ return num_items
+
+ def download(self, waveform, ports=None, mem_regions=None):
+ """
+ Download a waveform from memory to host
+
+ Arguments:
+ ports: If this argument is given, then we use the mem_regions attribute
+ of this class to identify where the waveform is stored. If ports
+ is a list, then the waveform will be downloaded once per port from
+ the corresponding memory region.
+ mem_regions: If this argument is given, ports is ignored. This will
+ directly specify the memory regions stored in this object.
+ NOTE: This class attempts to keep track of how many samples
+ have been sampled into memory. If mem_regions is provided,
+ then the mapping between ports and memory regions is lost,
+ and this class cannot know how many samples are available
+ for a given port. Therefore, use this argument to
+ deliberately override where sample data should be read from.
+
+ If port is not specified, download from all ports.
+ """
+ if mem_regions:
+ ports = None
+ else:
+ ports = self._sanitize_replay_ports(ports)
+ # Sanitize again in case self.mem_regions was changed from the outside
+ mem_regions = self._sanitize_mem_regions(self.mem_regions)
+ mem_regions = [ mem_regions[port] for port in ports]
+ if len(mem_regions) == 2 and \
+ isinstance(mem_regions, (tuple, list)) and \
+ isinstance(mem_regions[0], int):
+ mem_regions = [mem_regions]
+
+ # If this method is called from recv() then we will work on a per-channel
+ # basis. Otherwise we will walk through the mem_regions that we have put
+ # together above.
+ if len(waveform.shape) == 1:
+ bytes_downloaded = self._download(waveform, *mem_regions[0])
+ self.download_size[ports[0]] = bytes_downloaded
+ else:
+ for region_idx, mem_region in enumerate(mem_regions):
+ if ports is None or region_idx < len(self.radio_chan_pairs):
+ bytes_downloaded = self._download(waveform[region_idx], *mem_region)
+ if ports:
+ self.download_size[ports[region_idx]] = bytes_downloaded
+
+ def issue_stream_cmd(self, stream_cmd, ports=None):
+ """
+ Issue a command to start or stop the streaming to DRAM.
+
+ If ports is not specified, issue the stream command on all ports.
+ """
+ assert stream_cmd.stream_mode == StreamMode.num_done, \
+ f"Invalid stream mode: {stream_cmd.stream_mode}"
+ ports = self._sanitize_replay_ports(ports)
+ mem_regions = self._sanitize_mem_regions(self.mem_regions)
+ # Create a copy of the pointer to the original stream command to be able to edit it in case
+ # we have to adjust the number of samples that the radio will send.
+ tmp_stream_cmd = stream_cmd
+ for idx, rcp in enumerate(self.radio_chan_pairs):
+ stream_cmd = tmp_stream_cmd
+ # Flush data on output buffer
+ flush_timeout = time.monotonic() + .25
+ while time.monotonic() < flush_timeout:
+ if self.replay_blocks[0].get_record_fullness(ports[idx]) == 0:
+ break
+ self.replay_blocks[0].record_restart(ports[idx])
+ mem_region = mem_regions[ports[idx]]
+ mem_size = min(stream_cmd.num_samps * self.bytes_per_sample, mem_region[1])
+ self.replay_blocks[0].record(mem_region[0], mem_size, ports[idx])
+ # In case we're using a DDC, we need to adjust the number of samples that the radio
+ # will send, so that after down-converting it meets what the replay block expects.
+ if self.ddc_chan_pairs[idx]:
+ rate_ratio = self.ddc_chan_pairs[idx][0].get_input_rate(
+ self.ddc_chan_pairs[idx][1]) / \
+ self.ddc_chan_pairs[idx][0].get_output_rate(
+ self.ddc_chan_pairs[idx][1])
+ stream_cmd = StreamCMD(StreamMode.num_done)
+ stream_cmd.num_samps = int(tmp_stream_cmd.num_samps * rate_ratio)
+ stream_cmd.stream_now = tmp_stream_cmd.stream_now
+ stream_cmd.time_spec = tmp_stream_cmd.time_spec
+ rcp[0].issue_stream_cmd(stream_cmd, rcp[1])
+
+ timeout = time.monotonic() + 15.0
+ while any((self.replay_blocks[0].get_record_fullness(ports[idx]) < mem_size
+ for idx, _ in enumerate(self.radio_chan_pairs))):
+ time.sleep(0.200)
+ if time.monotonic() > timeout:
+ raise RuntimeError("Timeout while loading replay buffer!")
+
+ def recv(self, data, metadata, timeout=0.1):
+ """
+ This is a wrapper around download() that can be used to use this class
+ like you would use an RxStreamer object.
+
+ Unlike RxStreamer.recv(), you cannot call this repeatedly with small
+ packets. Every call to this method will overwrite the previous call's
+ data.
+
+ Unlike RxStreamer.recv(), DramReceiver.recv() streams the received data
+ from the replay block to the host on a per-channel basis. Therefore it
+ generates RxMetadata for each channel which is not merged into a single
+ metadata object. Instead, the last received metadata object is stored.
+ If no error occured this should be the same for all channels. If an error
+ occurs, the _download() method will throw an error which differs from the
+ behavior of the RxStreamer.recv() method.
+
+ The timeout parameter is unused, it is only there to retain the call
+ signature compatibility. Time specs are pulled from the RX metadata object.
+ """
+ num_chans = len(self.radio_chan_pairs)
+ self.receive_metadata = metadata
+ if num_chans == 1:
+ self.download(data, self.replay_ports[0])
+ else:
+ for idx, _ in enumerate(self.radio_chan_pairs):
+ self.download(data[idx], self.replay_ports[idx])
+ return min(self.download_size)