aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/rfnoc/chdr_ctrl_endpoint.cpp
blob: d337dec5e9d5421b58da34abd10fe6b878bc0ba5 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
//
// Copyright 2019 Ettus Research, a National Instruments Brand
//
// SPDX-License-Identifier: GPL-3.0-or-later
//

#include <uhd/exception.hpp>
#include <uhd/utils/log.hpp>
#include <uhd/utils/safe_call.hpp>
#include <uhd/utils/thread.hpp>
#include <uhdlib/rfnoc/chdr_ctrl_endpoint.hpp>
#include <uhdlib/rfnoc/chdr_packet.hpp>
#include <uhdlib/rfnoc/chdr_types.hpp>
#include <boost/format.hpp>
#include <atomic>
#include <mutex>
#include <thread>

using namespace uhd;
using namespace uhd::rfnoc;
using namespace uhd::rfnoc::chdr;


chdr_ctrl_endpoint::~chdr_ctrl_endpoint() = default;

class chdr_ctrl_endpoint_impl : public chdr_ctrl_endpoint
{
public:
    chdr_ctrl_endpoint_impl(chdr_ctrl_xport::sptr xport,
        const chdr::chdr_packet_factory& pkt_factory,
        sep_id_t my_epid)
        : _my_epid(my_epid)
        , _xport(xport)
        , _send_seqnum(0)
        , _send_pkt(pkt_factory.make_ctrl())
        , _recv_pkt(pkt_factory.make_ctrl())
        , _stop_recv_thread(false)
        , _recv_thread([this]() { recv_worker(); })
    {
        const std::string thread_name(str(boost::format("uhd_ctrl_ep%04x") % _my_epid));
        uhd::set_thread_name(&_recv_thread, thread_name);
        UHD_LOG_DEBUG("RFNOC",
            boost::format(
                "Started thread %s to process messages control messages on EPID %d")
                % thread_name % _my_epid);
    }

    virtual ~chdr_ctrl_endpoint_impl()
    {
        UHD_SAFE_CALL(
            // Interrupt buffer updater loop
            _stop_recv_thread = true;
            // Wait for loop to finish
            // No timeout on join. The recv loop is guaranteed
            // to terminate in a reasonable amount of time because
            // there are no timed blocks on the underlying.
            _recv_thread.join();
            // Flush base transport
            while (true) {
                auto buff = _xport->get_recv_buff(100);
                if (buff) {
                    _xport->release_recv_buff(std::move(buff));
                } else {
                    break;
                }
            }
            // Release child endpoints
            _endpoint_map.clear(););
    }

    virtual ctrlport_endpoint::sptr get_ctrlport_ep(sep_id_t dst_epid,
        uint16_t dst_port,
        size_t buff_capacity,
        size_t max_outstanding_async_msgs,
        const clock_iface& client_clk,
        const clock_iface& timebase_clk)
    {
        std::lock_guard<std::mutex> lock(_mutex);

        ep_map_key_t key{dst_epid, dst_port};
        // Function to send a control payload
        auto send_fn = [this, dst_epid](const ctrl_payload& payload, double timeout) {
            // Build header
            chdr_header header;
            header.set_pkt_type(PKT_TYPE_CTRL);
            header.set_num_mdata(0);
            header.set_seq_num(_send_seqnum++);
            header.set_dst_epid(dst_epid);
            // Acquire send buffer and send the packet
            std::lock_guard<std::mutex> lock(_send_mutex);
            auto send_buff = _xport->get_send_buff(timeout * 1000);
            _send_pkt->refresh(send_buff->data(), header, payload);
            send_buff->set_packet_size(header.get_length());
            _xport->release_send_buff(std::move(send_buff));
        };

        if (_endpoint_map.find(key) == _endpoint_map.end()) {
            ctrlport_endpoint::sptr ctrlport_ep = ctrlport_endpoint::make(send_fn,
                _my_epid,
                dst_port,
                buff_capacity,
                max_outstanding_async_msgs,
                client_clk,
                timebase_clk);
            _endpoint_map.insert(std::make_pair(key, ctrlport_ep));
            UHD_LOG_DEBUG("RFNOC",
                boost::format("Created ctrlport endpoint for port %d on EPID %d")
                    % dst_port % _my_epid);
            return ctrlport_ep;
        } else {
            return _endpoint_map.at(key);
        }
    }

    virtual size_t get_num_drops() const
    {
        return _num_drops;
    }

private:
    void recv_worker()
    {
        // Run forever:
        // - Pull packets from the base transport
        // - Route them based on the dst_port
        // - Pass them to the ctrlport_endpoint for additional processing
        while (not _stop_recv_thread) {
            // FIXME Move lock back once have threaded_io_service
            std::unique_lock<std::mutex> lock(_mutex);
            auto buff = _xport->get_recv_buff(0);
            if (buff) {
                // FIXME Move lock back to here once have threaded_io_service
                // std::lock_guard<std::mutex> lock(_mutex);
                try {
                    _recv_pkt->refresh(buff->data());
                    const ctrl_payload payload = _recv_pkt->get_payload();
                    ep_map_key_t key{payload.src_epid, payload.dst_port};
                    if (_endpoint_map.find(key) != _endpoint_map.end()) {
                        _endpoint_map.at(key)->handle_recv(payload);
                    }
                } catch (...) {
                    // Ignore all errors
                }
                _xport->release_recv_buff(std::move(buff));
            } else {
                // FIXME Move lock back to lock_guard once have threaded_io_service
                lock.unlock();
                // Be a good citizen and yield if no packet is processed
                static const size_t MIN_DUR = 1;
                boost::this_thread::sleep_for(boost::chrono::nanoseconds(MIN_DUR));
                // We call sleep(MIN_DUR) above instead of yield() to ensure that we
                // relinquish the current scheduler time slot.
                // yield() is a hint to the scheduler to end the time
                // slice early and schedule in another thread that is ready to run.
                // However in most situations, there will be no other thread and
                // this thread will continue to run which will rail a CPU core.
                // We call sleep(MIN_DUR=1) instead which will sleep for a minimum
                // time. Ideally we would like to use boost::chrono::.*seconds::min()
                // but that is bound to 0, which causes the sleep_for call to be a
                // no-op and thus useless to actually force a sleep.
            }
        }
    }

    using ep_map_key_t = std::pair<sep_id_t, uint16_t>;

    // The endpoint ID of this software endpoint
    const sep_id_t _my_epid;
    // Send/recv transports
    chdr_ctrl_xport::sptr _xport;
    // The curent sequence number for a send packet
    size_t _send_seqnum = 0;
    // The number of packets dropped
    size_t _num_drops = 0;
    // Packet containers
    chdr_ctrl_packet::uptr _send_pkt;
    chdr_ctrl_packet::cuptr _recv_pkt;
    // A collection of ctrlport endpoints (keyed by the port number)
    std::map<ep_map_key_t, ctrlport_endpoint::sptr> _endpoint_map;
    // A thread that will handle all responses and async message requests
    std::atomic_bool _stop_recv_thread;
    std::thread _recv_thread;
    // Mutex that protects all state in this class except for _send_pkt
    std::mutex _mutex;
    // Mutex that protects _send_pkt and _xport.send
    std::mutex _send_mutex;
};

chdr_ctrl_endpoint::uptr chdr_ctrl_endpoint::make(chdr_ctrl_xport::sptr xport,
    const chdr::chdr_packet_factory& pkt_factory,
    sep_id_t my_epid)
{
    return std::make_unique<chdr_ctrl_endpoint_impl>(xport, pkt_factory, my_epid);
}