aboutsummaryrefslogtreecommitdiffstats
path: root/host/python/uhd/usrp/dram_utils.py
blob: e20138e9ff4122c3b9421acec36034e16e22278e (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
#
# Copyright 2023 Ettus Research, a National Instruments Brand
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
"""
UHD Extension, helpers, and utilities for doing more with the available PL DRAM
"""

import time
from uhd import rfnoc
from uhd.usrp import StreamArgs
from uhd.types import TXMetadata, RXMetadata, RXMetadataErrorCode, StreamMode, StreamCMD, TimeSpec

def enumerate_radios(graph, radio_chans):
    """
    Resolve radio/channel specifications into radio block controllers.

    Arguments:
    graph -- Object providing find_blocks() and get_block() (the RFNoC graph)
    radio_chans -- Iterable of radio/channel specifications. Every item is
                   either a string ("0/Radio#0:0", or "0/Radio#0" which
                   selects channel 0), or a (block ID, channel) 2-tuple.

    Returns a list of (rfnoc.RadioControl, channel) tuples, in the same order
    as radio_chans.

    Raises RuntimeError if a specification has an unsupported form, or if it
    names a block that graph.find_blocks("Radio") does not list.
    """
    def parse_spec(spec):
        """
        Normalize one specification to a (radio_block_id, radio_chan) tuple:
        - "0/Radio#0:0" -> ("0/Radio#0", 0)
        - "0/Radio#0" -> ("0/Radio#0", 0)      (channel 0 is the default)
        - ("0/Radio#0", 0) -> ("0/Radio#0", 0)
        """
        if isinstance(spec, str):
            if ':' not in spec:
                # No channel given: fall back to channel 0
                return (spec, 0)
            fields = spec.split(':', 2)
            return (fields[0], int(fields[1]))
        if isinstance(spec, tuple) and len(spec) == 2:
            # Already a pair, hand it back untouched
            return spec
        raise RuntimeError(f"Unknown radio channel pair specification: {spec}")

    # All specifications are parsed before the graph is queried
    requested = [parse_spec(spec) for spec in radio_chans]
    known_radios = graph.find_blocks("Radio")
    resolved = []
    for block_id, chan in requested:
        if block_id not in known_radios:
            raise RuntimeError(f"'{block_id}' is not a valid radio block ID!")
        resolved.append((rfnoc.RadioControl(graph.get_block(block_id)), chan))
    return resolved

def find_replay_block(graph, replay_blockid=None):
    """
    Return a controller for a specific replay block, or for the first one found.

    Arguments:
    graph -- The RFNoC graph to search
    replay_blockid -- Block ID of the replay block to use. If None (the
                      default), the first entry returned by
                      graph.find_blocks("Replay") is used.

    Returns an rfnoc.ReplayBlockControl wrapping the selected block.

    Raises RuntimeError if no block ID was given and the graph reports no
    replay block.
    """
    if replay_blockid is None:
        blocklist = graph.find_blocks("Replay")
        if not blocklist:
            raise RuntimeError("No Replay block found on RFNoC graph!")
        # "Any" replay block means: the first one the graph lists
        replay_blockid = blocklist[0]
    return rfnoc.ReplayBlockControl(graph.get_block(replay_blockid))

class DramTransmitter:
    """
    Helper class to stream data from DRAM to one or more radios.

    This can be useful when setting up a USRP as a transmitter, with a waveform
    preloaded into memory.

    NOTE: This assumes we are using a single memory bank, until UHD is upgraded
    to better handle multiple memory banks.

    Arguments:
    rfnoc_graph -- The graph object
    radio_chans -- A list of radio channels to connect to the DRAM, each in one
                   of these formats:
                   - "0/Radio#0:0" (single string)
                   - "0/Radio#0" (single string, channel 0 by default)
                   - ("0/Radio#0", 0) (tuple of block id and channel number)
    replay_blockid -- If specified, use this replay block
    replay_ports -- List of replay ports to use. Must be at least as long as the
                    list of radio channels. If not specified, ports 0 through
                    len(radio_chans) - 1 are used. Therefore, this should be
                    specified explicitly if both TX and RX are going to be used.
    cpu_format -- For the upload process, the data format to be used
    mem_regions -- A list of (memory start, memory size) tuples, one per replay block port.
                   If left out, the memory is split up evenly among available replay block ports.
    """
    def __init__(self,
                 rfnoc_graph,
                 radio_chans,
                 replay_blockid=None,
                 replay_ports=None,
                 cpu_format='fc32',
                 mem_regions=None,
                 ):
        """
        Connect a replay block to the requested radio channels and create the
        TX streamer that is used to upload waveforms into DRAM.

        See the class documentation for a description of the arguments.
        """
        # replay_blocks is kept as a list so that multiple replay blocks (not
        # only on multiple motherboards) can be supported in the future
        # without changing APIs
        self.replay_blocks = [find_replay_block(rfnoc_graph, replay_blockid)]
        self.word_size = max(blk.get_word_size() for blk in self.replay_blocks)
        # In the radio, we always use sc16 regardless of cpu_format
        self.bytes_per_sample = 4
        # Without an explicit list, take one replay port per radio channel,
        # counting up from port 0
        self.replay_ports = \
            list(range(len(radio_chans))) if replay_ports is None else replay_ports

        # reconnect() also sanitizes self.replay_ports (e.g., turns a single
        # int into a list), so it has to run before the port list is indexed
        self.reconnect(rfnoc_graph, radio_chans, mem_regions)
        # Even for multi-channel operation, only one input port is used to
        # upload the data: always the first of the given replay ports
        self.replay_in_port = self.replay_ports[0]

        self.tx_streamer = rfnoc_graph.create_tx_streamer(
            1, StreamArgs(cpu_format, "sc16"))
        rfnoc_graph.connect(
            self.tx_streamer, 0,
            self.replay_blocks[0].get_unique_id(), self.replay_in_port)
        rfnoc_graph.commit()

    def reconnect(self, rfnoc_graph, radio_chans, mem_regions=None):
        """
        Reconnect the replay block to a new set of radio channels.

        Arguments:
        rfnoc_graph -- The graph object
        radio_chans -- List of radio channels, in any of the formats accepted
                       by enumerate_radios()
        mem_regions -- Optional list of (memory start, memory size) tuples. If
                       None, the memory is split up evenly among all output
                       ports of the replay block.

        Active connections that touch one of the replay ports in use are
        disconnected first, except for connections that originate at a
        streamer, which are left alone.

        This (re-)assigns self.replay_ports, self.radio_chan_pairs,
        self.duc_chan_pairs, self.mem_regions and self.upload_size.

        Raises RuntimeError if there are fewer replay ports than radio channels.
        """
        # Normalizes the port list (a single int becomes a list) and validates it
        self.replay_ports = self._sanitize_replay_ports(self.replay_ports)
        # Map requested radio_chans to the outputs of this replay block
        self.radio_chan_pairs = enumerate_radios(rfnoc_graph, radio_chans)
        if len(self.replay_ports) < len(self.radio_chan_pairs):
            raise RuntimeError(
                f"{len(self.radio_chan_pairs)} radio channels were requested to be "
                f"used with {len(self.replay_ports)} replay ports on "
                f"Replay block {self.replay_blocks[0].get_unique_id()}! "
                "There must be at least as many replay block ports as radio channels.")

        # Disconnect the ports of the replay block that we want to use later on
        active_conns = rfnoc_graph.enumerate_active_connections()
        for conn in active_conns:
            # If any of the connections that we require are taken already we will
            # disconnect them here. That is especially important if both the transmitter
            # and the receiver should be used.
            # A connection is affected if it starts or ends at our replay block
            # on one of the ports in self.replay_ports.
            if (conn.src_blockid == self.replay_blocks[0].get_unique_id() and \
                conn.src_port in self.replay_ports) or \
               (conn.dst_blockid == self.replay_blocks[0].get_unique_id() and \
                conn.dst_port in self.replay_ports):
                if "Streamer" in conn.dst_blockid:
                    # Need to use streamer disconnect method for streamers
                    rfnoc_graph.disconnect(
                        conn.dst_blockid, conn.dst_port
                    )
                elif "Streamer" in conn.src_blockid:
                    # Connections coming from a streamer are deliberately kept
                    pass
                else:
                    # Disconnect for all other kinds of blocks
                    rfnoc_graph.disconnect(
                        conn.src_blockid,
                        conn.src_port,
                        conn.dst_blockid,
                        conn.dst_port,
                    )

        # One entry per radio channel: a (DUC controller, DUC port) tuple, or
        # None if the path to that radio contains no DUC
        self.duc_chan_pairs = []
        for replay_port_idx, rcp in enumerate(self.radio_chan_pairs):
            # The n-th radio channel is fed from the n-th replay port in use.
            # 'edge' holds the edges of the path that was connected.
            edge = rfnoc.connect_through_blocks(
                rfnoc_graph,
                self.replay_blocks[0].get_unique_id(), self.replay_ports[replay_port_idx],
                rcp[0].get_unique_id(), rcp[1], True)
            # Pick the first edge that ends at a DUC block, if there is one.
            # Note that the lambda argument shadows the outer 'edge' variable.
            duc_edge = \
                next(filter(lambda edge: rfnoc.BlockID(edge.dst_blockid).match("DUC"), edge), None)
            if duc_edge:
                self.duc_chan_pairs.append((
                    rfnoc.DucBlockControl(rfnoc_graph.get_block(duc_edge.dst_blockid)),
                    duc_edge.dst_port
                ))
            else:
                self.duc_chan_pairs.append(None)
        # Assign memory regions to each output port (and leave space for potential input ports)
        if mem_regions is None:
            mem_stride = self.replay_blocks[0].get_mem_size() \
                // self.replay_blocks[0].get_num_output_ports()
            # Round the stride down to a multiple of the word size, so that
            # every region starts and ends word-aligned
            mem_stride -= mem_stride % self.word_size
            self.mem_regions = [
                (idx * mem_stride, mem_stride)
                for idx in range(self.replay_blocks[0].get_num_output_ports())
            ]
        else:
            # _sanitize_mem_regions() creates a list prepopulated with (0, 0)
            # and fills in the given mem_regions. This way we can also handle
            # the case where only a subset of the available replay ports is used.
            self.mem_regions = self._sanitize_mem_regions(mem_regions)
        # This stores how much data we have currently uploaded to memory. We
        # initialize this with the full memory, not zero, so that playback
        # without a preceding upload() covers the whole memory region.
        self.upload_size = [x[1] for x in self.mem_regions]


    def _sanitize_replay_ports(self, ports):
        """
        Helper function to turn ports into a valid list of ports on the replay
        block.

        Arguments:
        ports -- None (use self.replay_ports), a single port number, or a
                 list of port numbers

        Returns the list of ports. If a list was passed in (or taken from
        self.replay_ports), that same list object is returned.

        Raises RuntimeError if any port is negative, or not smaller than the
        number of output ports of the replay block.
        """
        if ports is None:
            ports = self.replay_ports
        if isinstance(ports, int):
            ports = [ports]
        # Query the block once instead of once per port
        num_ports = self.replay_blocks[0].get_num_output_ports()
        # Negative values are rejected as well: they would otherwise silently
        # index self.mem_regions and self.upload_size from the end.
        if any(not 0 <= port < num_ports for port in ports):
            raise RuntimeError(
                    f"Invalid output port on replay block! Available ports: "
                    f"{num_ports} Requested: "
                    f"{ports}")
        return ports

    def _sanitize_mem_regions(self, custom_mem_regions=None):
        """
        Helper function that produces a per-port list of memory regions for
        this replay block.

        Arguments:
        custom_mem_regions -- None, or a list of (memory start, memory size)
                              tuples

        Returns:
        - self.mem_regions, if custom_mem_regions is None
        - custom_mem_regions itself, if it has exactly one entry per output
          port of the replay block
        - otherwise a new list with one entry per output port, where the n-th
          custom region is stored at index self.replay_ports[n] and all other
          entries are (0, 0). Custom regions beyond the number of replay
          ports in use are dropped.
        """
        if custom_mem_regions is None:
            # Nothing custom requested, keep what we already have
            return self.mem_regions
        per_port_regions = [(0, 0)] * self.replay_blocks[0].get_num_output_ports()
        if len(custom_mem_regions) == len(per_port_regions):
            # One region per available port: use the list as it is
            return custom_mem_regions
        # Otherwise, hand out the given regions to the ports in use, in order.
        # zip() stops as soon as we run out of either ports or regions.
        for port, region in zip(self.replay_ports, custom_mem_regions):
            per_port_regions[port] = region
        return per_port_regions

    def _upload(self, waveform, mem_start, mem_size):
        """
        Upload helper function

        Arguments:
        waveform: 1-dimensional numpy array with waveform data. Must be of the
                  same data type as specified during the constructor.
        mem_start: Memory address where the data should be uploaded to
        mem_size: Amount of available memory. If the data in waveforms exceeds
                  this value, then waveform will be only partially uploaded.

        Returns the number of bytes that were recorded into memory.

        Raises AssertionError if the memory region is out of bounds, negative,
        or not aligned with the word size. Raises RuntimeError if the streamer
        did not accept all samples, or if the replay block did not report the
        expected fullness in time.
        """
        replay = self.replay_blocks[0]
        # Sanitize parameters. This uses explicit raises instead of assert
        # statements, because the latter are stripped when Python runs with
        # -O, which would let invalid memory ranges through to the hardware.
        # AssertionError is retained as the exception type for compatibility.
        mem_total = replay.get_mem_size()
        if not 0 <= mem_start < mem_total:
            raise AssertionError(f"Invalid memory start location: {mem_start}")
        if mem_size < 0 or mem_start + mem_size > mem_total:
            raise AssertionError(
                f"Invalid memory range: {mem_start} - {mem_start + mem_size}")
        if mem_start % self.word_size != 0:
            raise AssertionError(
                f"Memory region start (0x{mem_start:X}) is not aligned with "
                f"word size ({self.word_size})!")
        if mem_size % self.word_size != 0:
            raise AssertionError(
                f"Memory region size (0x{mem_size:X}) is not aligned with "
                f"word size ({self.word_size})!")
        # Truncate the waveform to what fits into the memory region
        num_items = min(len(waveform), int(mem_size) // self.bytes_per_sample)
        waveform = waveform[:num_items]

        num_bytes = num_items * self.bytes_per_sample
        in_port = self.replay_in_port
        # Configure DRAM block for recording
        replay.record(mem_start, num_bytes, in_port)
        # Flush data on input buffer: restart the recording until the block
        # reports an empty record buffer, or until 250 ms have passed
        flush_timeout = time.monotonic() + .25
        while time.monotonic() < flush_timeout:
            if replay.get_record_fullness(in_port) == 0:
                break
            replay.record_restart(in_port)
        # Upload data as a single burst
        tx_md = TXMetadata()
        tx_md.start_of_burst = True
        tx_md.end_of_burst = True
        if self.tx_streamer.send(waveform, tx_md, 10.0) != num_items:
            raise RuntimeError("Unable to upload all data without errors!")
        # Make sure DRAM is fully populated: poll the fullness for up to 20 s
        upload_timeout = time.monotonic() + 20.0
        while time.monotonic() < upload_timeout and \
                replay.get_record_fullness(in_port) < num_bytes:
            time.sleep(.05)
        fullness = replay.get_record_fullness(in_port)
        if fullness != num_bytes:
            raise RuntimeError(
                f"DRAM fullness did not reach expected levels! "
                f"{fullness}/{num_bytes} bytes.")
        return num_bytes

    def upload(self, waveform, ports=None, mem_regions=None):
        """
        Store a waveform to memory

        Arguments:
        waveform: The sample data. If it is 1-dimensional (according to its
                  shape attribute), the same waveform is uploaded to every
                  selected memory region. Otherwise, waveform[n] is uploaded
                  to the n-th memory region.
        ports: If this argument is given, then we use the mem_regions attribute
               of this class to identify where to store the waveform. If ports
               is a list, then the waveform will be uploaded once per port to
               the corresponding memory region.
        mem_regions: If this argument is given, ports is ignored. This will
                     directly specify the memory regions to write to, either
                     as a single (start, size) pair or as a list of such
                     pairs.
                     NOTE: This class attempts to keep track of how many samples
                     have been uploaded into memory. If mem_regions is provided,
                     then the mapping between ports and memory regions is lost,
                     and this class cannot know how many samples are available
                     for a given port. Therefore, use this argument to
                     deliberately override where sample data should be stored,
                     e.g., to override sections of previously uploaded sample
                     data.

        If neither ports nor mem_regions is specified, upload to all ports in
        self.replay_ports.

        Raises RuntimeError if waveform holds fewer sub-arrays than there are
        memory regions to fill.
        """
        if mem_regions:
            # Explicit regions win, the port mapping is dropped
            ports = None
        else:
            ports = self._sanitize_replay_ports(ports)
            # Sanitize again in case self.mem_regions was changed from the outside
            known_regions = self._sanitize_mem_regions(self.mem_regions)
            mem_regions = [known_regions[port] for port in ports]
        # A bare (start, size) pair is wrapped into a one-element list
        is_single_region = \
            len(mem_regions) == 2 and \
            isinstance(mem_regions, (tuple, list)) and \
            isinstance(mem_regions[0], int)
        if is_single_region:
            mem_regions = [mem_regions]

        # If this method is called from send() then we will work on a per-channel
        # basis. Otherwise we will walk through the mem_regions that we have put
        # together above.
        num_regions = len(mem_regions)
        if len(waveform.shape) == 1:
            waveform = [waveform] * num_regions
        if len(waveform) < num_regions:
            raise RuntimeError("Number of waveforms in waveform array does not match "
                               "the number of memory regions!")
        for region_idx, mem_region in enumerate(mem_regions):
            # Since by default we slice the dram per input/output port, we only
            # want to upload to the memory regions that we are actually using.
            if ports is not None and region_idx >= len(ports):
                continue
            bytes_uploaded = self._upload(waveform[region_idx], *mem_region)
            if ports:
                # Remember how much valid data this port has for playback
                self.upload_size[ports[region_idx]] = bytes_uploaded

    def issue_stream_cmd(self, stream_cmd, ports=None):
        """
        Issue a command to start or stop the streaming from DRAM.

        Arguments:
        stream_cmd: The stream command to forward to the replay block
        ports: A port number or a list of port numbers. If ports is not
               specified, issue the stream command on all ports in
               self.replay_ports.

        Under the hood, this will call replay_block_control::config_play() (if
        the stream command is a start-command) and replay_block_control::issue_stream_cmd().

        Note that this will configure the replay block for playback every time
        it is called with a start-command. If upload() was called previously,
        then this class knows how many samples are available for a given port,
        and it will only use the occupied memory space. This means that if a
        subset of the available memory region for a port is occupied with valid
        sample data, and the number of samples requested in this stream command
        is either unlimited or larger than the available sample data, the replay
        block will correctly loop around the available sample data.
        """
        ports = self._sanitize_replay_ports(ports)
        regions = self._sanitize_mem_regions(self.mem_regions)
        replay = self.replay_blocks[0]
        for port in ports:
            starts_playback = stream_cmd.stream_mode in (
                StreamMode.start_cont,
                StreamMode.num_done,
                StreamMode.num_more)
            if starts_playback:
                region = regions[port]
                # Never play back more than what was uploaded for this port,
                # and never more than the memory region holds
                play_size = min(self.upload_size[port], region[1])
                replay.config_play(region[0], play_size, port)
            replay.issue_stream_cmd(stream_cmd, port)

    def send(self, data, metadata, timeout=0.1):
        """
        This is a wrapper around upload() and issue_stream_cmd() that can be
        used to use this class like you would use a TxStreamer object.

        Compared to calling those two functions directly, this is less flexible.
        It requires 'data' to be of the right shape (meaning that it needs as
        many sub-arrays as there are channels, just like when calling send() on
        a TxStreamer object), and it will only stream the data once (again, as
        if with a TX streamer object).

        Unlike TxStreamer.send(), you cannot call this repeatedly with small
        packets. Every call to this method will overwrite the previous call's
        data.

        The timeout parameter is unused, it is only there to retain the call
        signature compatibility. Time specs are pulled from the TX metadata object.

        Returns the number of samples per channel in data, or 0 if data is
        empty. Raises RuntimeError if the shape of data does not provide
        enough channels.
        """
        num_chans = len(self.radio_chan_pairs)
        ndim = len(data.shape)
        if ndim == 1:
            num_samps = len(data)
        else:
            num_samps = data.shape[1]
        if num_samps == 0 or any(dim == 0 for dim in data.shape):
            # When streaming to radio, we sometimes send an empty burst with an
            # EOB to receive the ACK. With the DRAM replay, this doesn't work so
            # well, and we always send an EOB from here anyway, so we just
            # pretend like we did this.
            return 0
        needs_2d_data = num_chans > 1 and ndim != 2
        if needs_2d_data or data.shape[0] < num_chans:
            raise RuntimeError(
                f"Number of TX channels {num_chans} does not match the dimensions "
                f"of the data array ({data.shape[0] if len(data.shape) == 2 else 1})")
        # First upload the data, one replay port per channel
        if num_chans == 1:
            # Single channel: accept both a flat array and a 2D array, of
            # which only the first row is used
            self.upload(data[0] if ndim == 2 else data, self.replay_ports[0])
        else:
            for chan in range(num_chans):
                self.upload(data[chan], self.replay_ports[chan])
        # Then trigger stream command
        stream_cmd = StreamCMD(StreamMode.num_done)
        stream_cmd.stream_now = not metadata.has_time_spec
        stream_cmd.time_spec = metadata.time_spec
        stream_cmd.num_samps = num_samps
        # Not passing any ports defaults to "all ports"
        self.issue_stream_cmd(stream_cmd)
        return num_samps

    def recv_async_msg(self, timeout=0.1):
        """
        Emulate TxStreamer.recv_async_msg() by returning whatever the replay
        block's get_play_async_metadata() returns for the given timeout.
        """
        replay_block = self.replay_blocks[0]
        return replay_block.get_play_async_metadata(timeout)

class DramReceiver:
    """
    Helper class to stream data from one or more radios to DRAM.

    This can be useful when setting up a USRP as a receiver.

    NOTE: This assumes we are using a single memory bank, until UHD is upgraded
    to better handle multiple memory banks.

    Arguments:
    rfnoc_graph -- The graph object
    radio_chans -- A list of radio channels to connect to the DRAM, in one of
                  these formats:
                  - "0/Radio:0#0" (single string)
                  - "0/Radio:0" (single string, channel 0 by default)
                  - ("0/Radio#0", 0) (tuple of block id and channel number)
    replay_blockid -- If specified, use this replay block
    replay_ports -- List of replay ports to use. Must be at least as long as the
                    list of radio channels. If not specified, the first
                    len(radio_chans) ports (starting at port 0) are used.
                    Therefore this should be specified explicitly if both TX and
                    RX are going to be used.
    cpu_format -- Desired data format of the downloaded, received data on the host.
    mem_regions -- A list of (memory start, memory size) tuples, one per replay block port.
                   If left out, the memory is split up evenly among available replay block ports.
    throttle -- Throttle factor for the streamer. This is a value between 0 and
                1 or a percentage in the range (0%, 100%] that is passed as string.
    """
    def __init__(self,
                 rfnoc_graph,
                 radio_chans,
                 replay_blockid=None,
                 replay_ports=None,
                 cpu_format='fc32',
                 mem_regions=None,
                 throttle="0.1"
                 ):
        """
        Locate the replay block, wire the requested radio channels into it and
        attach a single-channel RX streamer to its first replay port.
        """
        # replay_blocks stays a list so that several replay blocks (not only on
        # multiple motherboards) can be supported later without an API change
        replay_block = find_replay_block(rfnoc_graph, replay_blockid)
        self.replay_blocks = [replay_block]
        self.word_size = max(blk.get_word_size() for blk in self.replay_blocks)
        # The radio always produces sc16, no matter which cpu_format is chosen
        self.bytes_per_sample = 4
        stream_args = StreamArgs(cpu_format, "sc16")
        stream_args.args['throttle'] = throttle
        self.replay_ports = (
            list(range(len(radio_chans))) if replay_ports is None else replay_ports)
        self.receive_metadata = None

        self.reconnect(rfnoc_graph, radio_chans, mem_regions)
        # Data is downloaded sequentially, always through the first of the
        # given replay ports.
        self.replay_out_port = self.replay_ports[0]

        self.rx_streamer = rfnoc_graph.create_rx_streamer(1, stream_args)

        rfnoc_graph.connect(
            replay_block.get_unique_id(), self.replay_out_port,
            self.rx_streamer, 0)
        rfnoc_graph.commit()

    def reconnect(self, rfnoc_graph, radio_chans, mem_regions=None):
        """
        Reconnect the replay block to a new set of radio channels.

        Existing connections on the replay ports used by this object are
        disconnected first (except connections whose destination is a
        streamer, which are left alone), then every radio channel is connected
        to one replay port via rfnoc.connect_through_blocks().

        Arguments:
        rfnoc_graph -- The graph object on which connections are changed
        radio_chans -- Radio channels, passed on to enumerate_radios()
        mem_regions -- Optional list of (memory start, memory size) tuples. If
                       None, the replay block memory is divided evenly among all
                       of its input ports.

        This (re-)initializes self.replay_ports, self.radio_chan_pairs,
        self.ddc_chan_pairs, self.mem_regions and self.download_size.

        Raises RuntimeError if fewer replay ports than radio channels are
        available (or if _sanitize_replay_ports() rejects a port).
        """
        self.replay_ports = self._sanitize_replay_ports(self.replay_ports)
        # Map requested radio_chans to the inputs of this replay block
        self.radio_chan_pairs = enumerate_radios(rfnoc_graph, radio_chans)
        if len(self.replay_ports) < len(self.radio_chan_pairs):
            # NOTE(review): the adjacent string literals below are joined
            # without a separating space ("...!There must be ...").
            raise RuntimeError(
                f"{len(self.radio_chan_pairs)} radio channels were requested to be "
                f"used with {len(self.replay_ports)} replay ports on "
                f"Replay block {self.replay_blocks[0].get_unique_id()}!"
                "There must be at least as many replay block ports as radio channels.")

        # Disconnect the ports of the replay block that we want to use later on
        active_conns = rfnoc_graph.enumerate_active_connections()
        for conn in active_conns:
            # If any of the connections that we require are taken already we will
            # disconnect them here. That is especially important if both the transmitter
            # and the receiver should be used.
            # A connection is affected if our replay block is its source or its
            # destination on one of the ports in self.replay_ports.
            if (conn.src_blockid == self.replay_blocks[0].get_unique_id() and \
                conn.src_port in self.replay_ports) or \
               (conn.dst_blockid == self.replay_blocks[0].get_unique_id() and \
                conn.dst_port in self.replay_ports):
                if "Streamer" in conn.src_blockid:
                    # Need to use streamer disconnect method for streamers
                    # (two-argument form, identified by destination block/port)
                    rfnoc_graph.disconnect(
                        conn.dst_blockid, conn.dst_port
                    )
                elif "Streamer" in conn.dst_blockid:
                    # Connections that end in a streamer are deliberately not
                    # touched here.
                    pass
                else:
                    # Disconnect for all other blocks
                    rfnoc_graph.disconnect(
                        conn.src_blockid,
                        conn.src_port,
                        conn.dst_blockid,
                        conn.dst_port,
                    )

        # One entry per radio channel: (DDC block controller, DDC port) if the
        # new connection ends in a DDC block, otherwise None.
        self.ddc_chan_pairs = []
        for replay_port_idx, rcp in enumerate(self.radio_chan_pairs):
            # rcp is a (radio block, channel) pair. The value returned here is
            # iterated below as a sequence of graph edges.
            # NOTE(review): meaning of the trailing True argument is not visible
            # from this file section -- confirm against connect_through_blocks().
            edge = rfnoc.connect_through_blocks(
                rfnoc_graph,
                rcp[0].get_unique_id(), rcp[1],
                self.replay_blocks[0].get_unique_id(), self.replay_ports[replay_port_idx], True)
            # Pick the first edge whose destination block ID matches "DDC"
            # (None if there is no such edge). Note that the lambda parameter
            # shadows the outer 'edge' variable.
            ddc_edge = \
                next(filter(lambda edge: rfnoc.BlockID(edge.dst_blockid).match("DDC"), edge), None)
            if ddc_edge:
                self.ddc_chan_pairs.append((
                    rfnoc.DdcBlockControl(rfnoc_graph.get_block(ddc_edge.dst_blockid)),
                    ddc_edge.dst_port
                ))
            else:
                self.ddc_chan_pairs.append(None)

        # Assign memory regions to each input port (and leave space for potential output ports)
        if mem_regions is None:
            # Even split of the total memory, rounded down to a multiple of the
            # word size so that every region start and size stays word-aligned.
            mem_stride = self.replay_blocks[0].get_mem_size() \
                // self.replay_blocks[0].get_num_input_ports()
            mem_stride -= mem_stride % self.word_size
            self.mem_regions = [
                (idx * mem_stride, mem_stride)
                for idx in range(self.replay_blocks[0].get_num_input_ports())
            ]
        else:
            self.mem_regions = self._sanitize_mem_regions(mem_regions)
        # This stores how much data we have currently downloaded to memory. We
        # initialize this with the full memory, not zero.
        self.download_size = [x[1] for x in self.mem_regions]


    def _sanitize_replay_ports(self, ports):
        """
        Helper function to turn ports into a valid list of ports on the replay
        block.
        """
        if ports is None:
            ports = self.replay_ports
        if isinstance(ports, int):
            ports = [ports]
        if any((port >= self.replay_blocks[0].get_num_input_ports() for port in ports)):
            raise RuntimeError(
                    f"Invalid input port on replay block! Available ports: "
                    f"{self.replay_blocks[0].get_num_input_ports()} Requested: "
                    f"{ports}")
        return ports

    def _sanitize_mem_regions(self, custom_mem_regions=None):
        """
        Helper function to make a list of memory regions valid for this replay
        block and with given custom memory regions.
        """
        # If called without custom regions just return what we already have
        if custom_mem_regions is None:
            return self.mem_regions
        mem_regions = [(0,0)] * self.replay_blocks[0].get_num_input_ports()
        # If custom region is big enough for all possible ports, use it
        if len(mem_regions) == len(custom_mem_regions):
            return custom_mem_regions
        # Otherwise we have to distribute the given regions among the available ports
        for idx, mem_region in enumerate(custom_mem_regions):
            if idx < len(self.replay_ports):
                mem_regions[self.replay_ports[idx]] = mem_region
        return mem_regions

    def _download(self, waveform, mem_start, mem_size):
        """
        Play back one memory region and receive it into a host buffer.

        Arguments:
        waveform: 1-dimensional numpy array that receives the data. Must be of
                  the same data type as specified during the constructor.
        mem_start: Memory address (in bytes) at which playback starts
        mem_size: Size of the memory region in bytes. Together with the length
                  of waveform this limits how many samples are requested.

        Returns the number of items reported by the RX streamer, or 0 if the RX
        metadata carries an error code.
        """
        replay = self.replay_blocks[0]
        # Reject regions that fall outside the memory or are not word-aligned
        assert mem_start < replay.get_mem_size(), \
            f"Invalid memory start location: {mem_start}"
        assert mem_start + mem_size <= replay.get_mem_size(), \
            f"Invalid memory range: {mem_start} - {mem_start + mem_size}"
        assert mem_start % self.word_size == 0, (
            f"Memory region start (0x{mem_start:X}) is not aligned with "
            f"word size ({self.word_size})!")
        assert mem_size % self.word_size == 0, (
            f"Memory region size (0x{mem_size:X}) is not aligned with "
            f"word size ({self.word_size})!")
        # Request no more samples than fit into the buffer or the region
        region_capacity = int(mem_size) // self.bytes_per_sample
        requested_items = min(len(waveform), region_capacity)
        play_cmd = StreamCMD(StreamMode.num_done)
        play_cmd.num_samps = requested_items
        play_cmd.time_spec = TimeSpec(0.0)
        replay.config_play(
            mem_start, requested_items * self.bytes_per_sample, self.replay_out_port)
        self.rx_streamer.issue_stream_cmd(play_cmd)

        # Reuse the metadata object handed in via recv(), if there is one
        if not self.receive_metadata:
            self.receive_metadata = RXMetadata()
        rx_metadata = self.receive_metadata
        received_items = self.rx_streamer.recv(waveform, rx_metadata, 15.0)
        # The error code may be overwritten by the next _download() call, but a
        # return value of 0 propagates up to recv(), where it signals an error.
        return 0 if rx_metadata.error_code != RXMetadataErrorCode.none else received_items

    def download(self, waveform, ports=None, mem_regions=None):
        """
        Download a waveform from memory to host

        Arguments:
        ports: If this argument is given, then we use the mem_regions attribute
               of this class to identify where the waveform is stored. If ports
               is a list, then the waveform will be downloaded once per port from
               the corresponding memory region.
        mem_regions: If this argument is given, ports is ignored. This will
                     directly specify the memory regions stored in this object.
                     NOTE: This class attempts to keep track of how many samples
                     have been sampled into memory. If mem_regions is provided,
                     then the mapping between ports and memory regions is lost,
                     and this class cannot know how many samples are available
                     for a given port. Therefore, use this argument to
                     deliberately override where sample data should be read from.

        If port is not specified, download from all ports.
        """
        if mem_regions:
            ports = None
        else:
            ports = self._sanitize_replay_ports(ports)
            # Sanitize again in case self.mem_regions was changed from the outside
            mem_regions = self._sanitize_mem_regions(self.mem_regions)
            mem_regions = [ mem_regions[port] for port in ports]
        if len(mem_regions) == 2 and \
                isinstance(mem_regions, (tuple, list)) and \
                isinstance(mem_regions[0], int):
            mem_regions = [mem_regions]

        # If this method is called from recv() then we will work on a per-channel
        # basis. Otherwise we will walk through the mem_regions that we have put
        # together above.
        if len(waveform.shape) == 1:
            bytes_downloaded = self._download(waveform, *mem_regions[0])
            self.download_size[ports[0]] = bytes_downloaded
        else:
            for region_idx, mem_region in enumerate(mem_regions):
                if ports is None or region_idx < len(self.radio_chan_pairs):
                    bytes_downloaded = self._download(waveform[region_idx], *mem_region)
                    if ports:
                        self.download_size[ports[region_idx]] = bytes_downloaded

    def issue_stream_cmd(self, stream_cmd, ports=None):
        """
        Issue a command to start or stop the streaming to DRAM.

        For every radio channel, the corresponding replay port is flushed and
        armed for recording, and the stream command is forwarded to the radio.
        The call then blocks until every armed port has recorded the amount of
        data it was armed for.

        Arguments:
        stream_cmd -- Stream command; its stream mode must be StreamMode.num_done.
                      The object itself is not modified.
        ports -- Replay ports to record on, one per radio channel (in order).
                 If ports is not specified, self.replay_ports is used.

        Raises RuntimeError if the buffers are not filled within 15 seconds.
        """
        assert stream_cmd.stream_mode == StreamMode.num_done, \
            f"Invalid stream mode: {stream_cmd.stream_mode}"
        ports = self._sanitize_replay_ports(ports)
        mem_regions = self._sanitize_mem_regions(self.mem_regions)
        replay = self.replay_blocks[0]
        # (port, number of bytes to record) for every port that gets armed.
        # The sizes can differ between ports if the memory regions differ, so
        # each port has to be checked against its own size further down.
        pending = []
        for idx, rcp in enumerate(self.radio_chan_pairs):
            port = ports[idx]
            # Flush data on output buffer: restart the recording until the
            # buffer reports empty, but give up after 250 ms
            flush_timeout = time.monotonic() + .25
            while time.monotonic() < flush_timeout:
                if replay.get_record_fullness(port) == 0:
                    break
                replay.record_restart(port)
            mem_region = mem_regions[port]
            # Never record more than the memory region of this port can hold
            mem_size = min(stream_cmd.num_samps * self.bytes_per_sample, mem_region[1])
            replay.record(mem_region[0], mem_size, port)
            pending.append((port, mem_size))
            # In case we're using a DDC, we need to adjust the number of samples that the radio
            # will send, so that after down-converting it meets what the replay block expects.
            # A fresh command object is used for that so the caller's command stays untouched.
            radio_cmd = stream_cmd
            if self.ddc_chan_pairs[idx]:
                ddc, ddc_chan = self.ddc_chan_pairs[idx]
                rate_ratio = ddc.get_input_rate(ddc_chan) / ddc.get_output_rate(ddc_chan)
                radio_cmd = StreamCMD(StreamMode.num_done)
                radio_cmd.num_samps = int(stream_cmd.num_samps * rate_ratio)
                radio_cmd.stream_now = stream_cmd.stream_now
                radio_cmd.time_spec = stream_cmd.time_spec
            rcp[0].issue_stream_cmd(radio_cmd, rcp[1])

        # Wait until every port has recorded what it was armed for
        timeout = time.monotonic() + 15.0
        while any(replay.get_record_fullness(port) < expected_bytes
                  for port, expected_bytes in pending):
            time.sleep(0.200)
            if time.monotonic() > timeout:
                raise RuntimeError("Timeout while loading replay buffer!")

    def recv(self, data, metadata, timeout=0.1):
        """
        This is a wrapper around download() that can be used to use this class
        like you would use an RxStreamer object.

        Unlike RxStreamer.recv(), you cannot call this repeatedly with small
        packets. Every call to this method will overwrite the previous call's
        data.

        Unlike RxStreamer.recv(), DramReceiver.recv() streams the received data
        from the replay block to the host on a per-channel basis. Therefore it
        generates RxMetadata for each channel which is not merged into a single
        metadata object. Instead, the last received metadata object is stored.
        If no error occured this should be the same for all channels. If an error
        occurs, the _download() method will throw an error which differs from the
        behavior of the RxStreamer.recv() method.

        The timeout parameter is unused, it is only there to retain the call
        signature compatibility. Time specs are pulled from the RX metadata object.
        """
        num_chans = len(self.radio_chan_pairs)
        self.receive_metadata = metadata
        if num_chans == 1:
            self.download(data, self.replay_ports[0])
        else:
            for idx, _ in enumerate(self.radio_chan_pairs):
                self.download(data[idx], self.replay_ports[idx])
        return min(self.download_size)