aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/rfnoc/async_msg_handler.cpp
blob: 0f325b450143bab3f28a6e45aacdda4a5b489687 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
//
// Copyright 2018 Ettus Research, a National Instruments Company
//
// SPDX-License-Identifier: GPL-3.0-or-later
//

#include <uhd/exception.hpp>
#include <uhd/transport/bounded_buffer.hpp>
#include <uhd/transport/chdr.hpp>
#include <uhd/transport/zero_copy.hpp>
#include <uhd/utils/byteswap.hpp>
#include <uhd/utils/log.hpp>
#include <uhd/utils/tasks.hpp>
#include <uhdlib/rfnoc/async_msg_handler.hpp>
#include <boost/make_shared.hpp>
#include <mutex>

using namespace uhd;
using namespace uhd::rfnoc;

template <endianness_t _endianness>
class async_msg_handler_impl : public async_msg_handler
{
public:
    /************************************************************************
     * Types
     ***********************************************************************/
    typedef uhd::transport::bounded_buffer<async_msg_t> async_md_type;

    /************************************************************************
     * Structors
     ***********************************************************************/
    async_msg_handler_impl(uhd::transport::zero_copy_if::sptr recv,
        uhd::transport::zero_copy_if::sptr send,
        uhd::sid_t sid)
        : _rx_xport(recv), _tx_xport(send), _sid(sid)
    {
        // Launch receive thread
        _recv_msg_task = task::make([=]() { this->handle_async_msgs(); });
    }

    ~async_msg_handler_impl() {}

    /************************************************************************
     * API calls
     ***********************************************************************/
    int register_event_handler(
        const async_msg_t::event_code_t event_code, async_handler_type handler)
    {
        _event_handlers.insert(std::pair<async_msg_t::event_code_t, async_handler_type>(
            event_code, handler));
        return _event_handlers.count(event_code);
    }

    void post_async_msg(const async_msg_t& metadata)
    {
        std::lock_guard<std::mutex> lock(_mutex);

        for (const auto& event_handler : _event_handlers) {
            // If the event code in the message matches the event code used at
            // registration time, call the event handler
            if ((metadata.event_code & event_handler.first) == event_handler.first) {
                event_handler.second(metadata);
            }
        }

        // Print
        if (metadata.event_code & async_msg_t::EVENT_CODE_UNDERFLOW) {
            UHD_LOG_FASTPATH("U")
        } else if (metadata.event_code
                   & (async_msg_t::EVENT_CODE_SEQ_ERROR
                         | async_msg_t::EVENT_CODE_SEQ_ERROR_IN_BURST)) {
            UHD_LOG_FASTPATH("S")
        } else if (metadata.event_code
                   & (async_msg_t::EVENT_CODE_LATE_CMD_ERROR
                         | async_msg_t::EVENT_CODE_LATE_DATA_ERROR)) {
            UHD_LOG_FASTPATH("L")
        } else if (metadata.event_code & async_msg_t::EVENT_CODE_OVERRUN) {
            UHD_LOG_FASTPATH("O")
        }
    }

private: // methods
    /************************************************************************
     * Internals
     ***********************************************************************/
    /*! Packet receiver thread call.
     */
    void handle_async_msgs()
    {
        using namespace uhd::transport;
        managed_recv_buffer::sptr buff = _rx_xport->get_recv_buff();
        if (not buff)
            return;

        // Get packet info
        vrt::if_packet_info_t if_packet_info;
        if_packet_info.num_packet_words32 = buff->size() / sizeof(uint32_t);
        const uint32_t* packet_buff       = buff->cast<const uint32_t*>();

        // unpacking can fail
        uint32_t (*endian_conv)(uint32_t) = uhd::ntohx;
        try {
            if (_endianness == ENDIANNESS_BIG) {
                vrt::chdr::if_hdr_unpack_be(packet_buff, if_packet_info);
                endian_conv = uhd::ntohx;
            } else {
                vrt::chdr::if_hdr_unpack_le(packet_buff, if_packet_info);
                endian_conv = uhd::wtohx;
            }
        } catch (const uhd::value_error& ex) {
            UHD_LOGGER_ERROR("RFNOC")
                << "[async message handler] Error parsing async message packet: "
                << ex.what() << std::endl;
            return;
        }

        // We discard anything that's not actually a command or response packet.
        if (not(if_packet_info.packet_type & vrt::if_packet_info_t::PACKET_TYPE_CMD)
            or if_packet_info.num_packet_words32 == 0) {
            return;
        }

        const uint32_t* payload = packet_buff + if_packet_info.num_header_words32;
        async_msg_t metadata(if_packet_info.num_payload_words32 - 1);
        metadata.has_time_spec = if_packet_info.has_tsf;
        // FIXME: not hardcoding tick rate
        metadata.time_spec  = time_spec_t::from_ticks(if_packet_info.tsf, 1);
        metadata.event_code = async_msg_t::event_code_t(endian_conv(payload[0]) & 0xFFFF);
        metadata.sid        = if_packet_info.sid;

        // load user payload
        for (size_t i = 1; i < if_packet_info.num_payload_words32; i++) {
            metadata.payload[i - 1] = endian_conv(payload[i]);
        }

        this->post_async_msg(metadata);
    }

    uint32_t get_local_addr() const
    {
        return _sid.get_src();
    }

private: // members
    std::mutex _mutex;
    //! Store event handlers
    std::multimap<async_msg_t::event_code_t, async_handler_type> _event_handlers;
    //! port that receive messge
    uhd::transport::zero_copy_if::sptr _rx_xport;

    //! port that send out respond
    uhd::transport::zero_copy_if::sptr _tx_xport;

    //! The source part of \p _sid is the address of this async message handler.
    uhd::sid_t _sid;

    //! Stores the task that polls the Rx queue
    task::sptr _recv_msg_task;
};

async_msg_handler::sptr async_msg_handler::make(uhd::transport::zero_copy_if::sptr recv,
    uhd::transport::zero_copy_if::sptr send,
    uhd::sid_t sid,
    endianness_t endianness)
{
    if (endianness == ENDIANNESS_BIG) {
        return boost::make_shared<async_msg_handler_impl<ENDIANNESS_BIG>>(
            recv, send, sid);
    } else {
        return boost::make_shared<async_msg_handler_impl<ENDIANNESS_LITTLE>>(
            recv, send, sid);
    }
}