aboutsummaryrefslogtreecommitdiffstats
path: root/gnuradio-runtime/include/gnuradio/basic_block.h
blob: de1cbe41f6f6f1b6e0f1570b7ba2ba0cce2a82e7 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
/* -*- c++ -*- */
/*
 * Copyright 2006,2008,2009,2011,2013 Free Software Foundation, Inc.
 *
 * This file is part of GNU Radio
 *
 * GNU Radio is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3, or (at your option)
 * any later version.
 *
 * GNU Radio is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with GNU Radio; see the file COPYING.  If not, write to
 * the Free Software Foundation, Inc., 51 Franklin Street,
 * Boston, MA 02110-1301, USA.
 */

#ifndef INCLUDED_GR_BASIC_BLOCK_H
#define INCLUDED_GR_BASIC_BLOCK_H

#include <gnuradio/api.h>
#include <gnuradio/io_signature.h>
#include <gnuradio/msg_accepter.h>
#include <gnuradio/runtime_types.h>
#include <gnuradio/sptr_magic.h>
#include <gnuradio/thread/thread.h>
#include <boost/enable_shared_from_this.hpp>
#include <boost/foreach.hpp>
#include <boost/function.hpp>
#include <boost/thread/condition_variable.hpp>
#include <deque>
#include <iostream>
#include <map>
#include <string>

#ifdef GR_CTRLPORT
#include <gnuradio/rpcregisterhelpers.h>
#endif

namespace gr {

/*!
 * \brief The abstract base class for all signal processing blocks.
 * \ingroup internal
 *
 * Basic blocks are the bare abstraction of an entity that has a
 * name, a set of inputs and outputs, and a message queue.  These
 * are never instantiated directly; rather, this is the abstract
 * parent class of both hier_block2, which is a recursive
 * container, and block, which implements actual signal
 * processing functions.
 */
class GR_RUNTIME_API basic_block : public msg_accepter,
                                   public boost::enable_shared_from_this<basic_block>
{
    //! Callable type stored per input message port; invoked with the message.
    typedef boost::function<void(pmt::pmt_t)> msg_handler_t;

private:
    //! Maps a message port id to the handler registered via set_msg_handler().
    typedef std::map<pmt::pmt_t, msg_handler_t, pmt::comparator> d_msg_handlers_t;
    d_msg_handlers_t d_msg_handlers;

    typedef std::deque<pmt::pmt_t> msg_queue_t;
    typedef std::map<pmt::pmt_t, msg_queue_t, pmt::comparator> msg_queue_map_t;
    typedef std::map<pmt::pmt_t, msg_queue_t, pmt::comparator>::iterator
        msg_queue_map_itr;
    std::map<pmt::pmt_t, boost::shared_ptr<boost::condition_variable>, pmt::comparator>
        msg_queue_ready;

    // NOTE: none of the inline member functions defined in this header lock
    // this mutex themselves.
    gr::thread::mutex mutex; //!< protects all vars

protected:
    friend class flowgraph;
    friend class flat_flowgraph; // TODO: will be redundant
    friend class tpb_thread_body;

    //! Marker value read and written through color() / set_color().
    enum vcolor { WHITE, GREY, BLACK };

    std::string d_name;
    gr::io_signature::sptr d_input_signature;
    gr::io_signature::sptr d_output_signature;
    long d_unique_id;
    long d_symbolic_id;
    std::string d_symbol_name;
    std::string d_symbol_alias; // empty means "no alias set" (see alias_set())
    vcolor d_color;
    bool d_rpc_set; // set by rpc_set(), queried by is_rpc_set()

    //! One message queue per port id.
    msg_queue_map_t msg_queue;
    std::vector<rpcbasic_sptr> d_rpc_vars; // container for all RPC variables

    /*!
     * \brief Default constructor; allows pure virtual interface sub-classes.
     *
     * The scalar members are given placeholder values (ids of -1, WHITE,
     * RPC not set) so that unique_id(), symbolic_id(), color() and
     * is_rpc_set() never read an indeterminate value on an object built
     * through this constructor.
     */
    basic_block(void)
        : d_unique_id(-1), d_symbolic_id(-1), d_color(WHITE), d_rpc_set(false)
    {
    }

    //! Protected constructor prevents instantiation by non-derived classes
    basic_block(const std::string& name,
                gr::io_signature::sptr input_signature,
                gr::io_signature::sptr output_signature);

    //! may only be called during constructor
    void set_input_signature(gr::io_signature::sptr iosig) { d_input_signature = iosig; }

    //! may only be called during constructor
    void set_output_signature(gr::io_signature::sptr iosig)
    {
        d_output_signature = iosig;
    }

    /*!
     * \brief Allow the flowgraph to set for sorting and partitioning
     */
    void set_color(vcolor color) { d_color = color; }
    vcolor color() const { return d_color; }

    /*!
     * \brief Tests if there is a handler attached to port \p which_port
     */
    virtual bool has_msg_handler(pmt::pmt_t which_port)
    {
        return (d_msg_handlers.find(which_port) != d_msg_handlers.end());
    }

    /*!
     * \brief Called by the runtime system to dispatch messages.
     *
     * Invokes the handler registered for \p which_port with \p msg; does
     * nothing when has_msg_handler() reports no handler for that port.
     *
     * The thread-safety guarantees mentioned in set_msg_handler are
     * implemented by the callers of this method.
     */
    virtual void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg)
    {
        // AA Update this
        // The check goes through the virtual has_msg_handler() on purpose so
        // that an override in a derived class is honoured.
        if (has_msg_handler(which_port)) {   // Is there a handler?
            d_msg_handlers[which_port](msg); // Yes, invoke it.
        }
    }

    // Message passing interface
    pmt::pmt_t d_message_subscribers;

public:
    pmt::pmt_t message_subscribers(pmt::pmt_t port);
    virtual ~basic_block();
    long unique_id() const { return d_unique_id; }
    long symbolic_id() const { return d_symbolic_id; }

    /*! The name of the block */
    std::string name() const { return d_name; }

    /*!
     * The symbolic name of the block, which is used in the
     * block_registry. The name is assigned by the block's constructor
     * and never changes during the life of the block.
     */
    std::string symbol_name() const { return d_symbol_name; }

    /*!
     * Returns a printable identifier of the form "name(unique_id)".
     */
    std::string identifier() const
    {
        return this->name() + "(" + std::to_string(this->unique_id()) + ")";
    }

    gr::io_signature::sptr input_signature() const { return d_input_signature; }
    gr::io_signature::sptr output_signature() const { return d_output_signature; }
    basic_block_sptr to_basic_block(); // Needed for Python type coercion

    /*!
     * True if the block has an alias (see set_block_alias).
     */
    bool alias_set() const { return !d_symbol_alias.empty(); }

    /*!
     * Returns the block's alias as a string; falls back to symbol_name()
     * when no alias has been set.
     */
    std::string alias() const { return alias_set() ? d_symbol_alias : symbol_name(); }

    /*!
     * Returns the block's alias as PMT.
     */
    pmt::pmt_t alias_pmt() const { return pmt::intern(alias()); }

    /*!
     * Sets a new alias for the block; also adds an entry into the
     * block_registry to get the block using either the alias or the
     * original symbol name.
     */
    void set_block_alias(std::string name);

    // ** Message passing interface **
    void message_port_register_in(pmt::pmt_t port_id);
    void message_port_register_out(pmt::pmt_t port_id);
    void message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg);
    void message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target);
    void message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target);

    // The three message_port_is_hier* queries below always answer false in
    // this base class; they are virtual so derived classes can override them.
    virtual bool message_port_is_hier(pmt::pmt_t port_id)
    {
        (void)port_id;
        return false;
    }
    virtual bool message_port_is_hier_in(pmt::pmt_t port_id)
    {
        (void)port_id;
        return false;
    }
    virtual bool message_port_is_hier_out(pmt::pmt_t port_id)
    {
        (void)port_id;
        return false;
    }

    /*!
     * \brief Get input message port names.
     *
     * Returns the available input message ports for a block. The
     * return object is a PMT vector that is filled with PMT symbols.
     */
    pmt::pmt_t message_ports_in();

    /*!
     * \brief Get output message port names.
     *
     * Returns the available output message ports for a block. The
     * return object is a PMT vector that is filled with PMT symbols.
     */
    pmt::pmt_t message_ports_out();

    /*!
     * Accept msg, place in queue, arrange for thread to be awakened if it's not already.
     */
    void _post(pmt::pmt_t which_port, pmt::pmt_t msg);

    /*!
     * \brief Is the queue of \p which_port empty?
     *
     * \throws std::runtime_error if \p which_port has no queue.
     */
    bool empty_p(pmt::pmt_t which_port)
    {
        // Single lookup: reuse the iterator instead of find() + operator[].
        msg_queue_map_itr queue = msg_queue.find(which_port);
        if (queue == msg_queue.end())
            throw std::runtime_error("port does not exist!");
        return queue->second.empty();
    }

    //! Are the queues of all ports empty? (true when there are no ports)
    bool empty_p()
    {
        // Inspect the queue we are already iterating over rather than looking
        // the key up again, and stop at the first non-empty queue.
        BOOST_FOREACH (const msg_queue_map_t::value_type& i, msg_queue) {
            if (!i.second.empty())
                return false;
        }
        return true;
    }

    /*!
     * \brief True if \p which_port is empty or has no handler attached.
     *
     * \throws std::runtime_error (from empty_p) if \p which_port has no queue.
     */
    bool empty_handled_p(pmt::pmt_t which_port)
    {
        return (empty_p(which_port) || !has_msg_handler(which_port));
    }

    //! are all msg ports with handlers empty?
    bool empty_handled_p()
    {
        bool rv = true;
        BOOST_FOREACH (msg_queue_map_t::value_type& i, msg_queue) {
            rv &= empty_handled_p(i.first);
        }
        return rv;
    }

    /*!
     * \brief How many messages in the queue of \p which_port?
     *
     * \throws std::runtime_error if \p which_port has no queue.
     */
    size_t nmsgs(pmt::pmt_t which_port)
    {
        // Single lookup: reuse the iterator instead of find() + operator[].
        msg_queue_map_itr queue = msg_queue.find(which_port);
        if (queue == msg_queue.end())
            throw std::runtime_error("port does not exist!");
        return queue->second.size();
    }

    //! Acquires and releases the mutex
    void insert_tail(pmt::pmt_t which_port, pmt::pmt_t msg);
    /*!
     * \returns returns pmt at head of queue or pmt::pmt_t() if empty.
     */
    pmt::pmt_t delete_head_nowait(pmt::pmt_t which_port);

    /*!
     * Returns an iterator to the first queued message of \p which_port.
     *
     * NOTE: uses std::map::operator[], so an empty queue is created for a
     * port id that is not present yet instead of reporting an error.
     */
    msg_queue_t::iterator get_iterator(pmt::pmt_t which_port)
    {
        return msg_queue[which_port].begin();
    }

    /*!
     * Removes the message at \p it from the queue of \p which_port.
     *
     * \p it must be a valid iterator into that very queue. Like
     * get_iterator(), this goes through std::map::operator[].
     */
    void erase_msg(pmt::pmt_t which_port, msg_queue_t::iterator it)
    {
        msg_queue[which_port].erase(it);
    }

    /*!
     * True if \p which_port has a message queue or is a key of the
     * message-subscribers dictionary.
     */
    virtual bool has_msg_port(pmt::pmt_t which_port)
    {
        if (msg_queue.find(which_port) != msg_queue.end()) {
            return true;
        }
        if (pmt::dict_has_key(d_message_subscribers, which_port)) {
            return true;
        }
        return false;
    }

    //! Read-only access to the port-id -> message-queue map.
    const msg_queue_map_t& get_msg_map(void) const { return msg_queue; }

#ifdef GR_CTRLPORT
    /*!
     * \brief Add an RPC variable (get or set).
     *
     * Using controlport, we create new getters/setters and need to
     * store them. Each block has a vector to do this, and these never
     * need to be accessed again once they are registered with the RPC
     * backend. This function takes a
     * boost::shared_ptr<rpcbasic_base> so that when the block is
     * deleted, all RPC registered variables are cleaned up.
     *
     * \param s an rpcbasic_sptr of the new RPC variable register to store.
     */
    void add_rpc_variable(rpcbasic_sptr s) { d_rpc_vars.push_back(s); }
#endif /* GR_CTRLPORT */

    /*!
     * \brief Set up the RPC registered variables.
     *
     * This must be overloaded by a block that wants to use
     * controlport. This is where rpcbasic_register_{get,set} pointers
     * are created, which then get wrapped as shared pointers
     * (rpcbasic_sptr(...)) and stored using add_rpc_variable.
     *
     * The base implementation does nothing.
     */
    virtual void setup_rpc() {}

    /*!
     * \brief Ask if this block has been registered to the RPC.
     *
     * We can only register a block once, so we use this to protect us
     * from calling it multiple times.
     */
    bool is_rpc_set() { return d_rpc_set; }

    /*!
     * \brief When the block is registered with the RPC, set this.
     */
    void rpc_set() { d_rpc_set = true; }

    /*!
     * \brief Confirm that ninputs and noutputs is an acceptable combination.
     *
     * \param ninputs   number of input streams connected
     * \param noutputs  number of output streams connected
     *
     * \returns true if this is a valid configuration for this block.
     *
     * This function is called by the runtime system whenever the
     * topology changes. Most classes do not need to override this.
     * This check is in addition to the constraints specified by the
     * input and output gr::io_signatures.
     */
    virtual bool check_topology(int ninputs, int noutputs)
    {
        (void)ninputs;
        (void)noutputs;
        return true;
    }

    /*!
     * \brief Set the callback that is fired when messages are available.
     *
     * \p msg_handler can be any kind of function pointer or function object
     * that has the signature:
     * <pre>
     *    void msg_handler(pmt::pmt_t msg);
     * </pre>
     *
     * (You may want to use boost::bind to massage your callable into
     * the correct form.  See gr::blocks::nop for an example that sets
     * up a class method as the callback.)
     *
     * Blocks that desire to handle messages must call this method in
     * their constructors to register the handler that will be invoked
     * when messages are available.
     *
     * If the block inherits from block, the runtime system will
     * ensure that msg_handler is called in a thread-safe manner, such
     * that work and msg_handler will never be called concurrently.
     * This allows msg_handler to update state variables without
     * having to worry about thread-safety issues with work,
     * general_work or another invocation of msg_handler.
     *
     * If the block inherits from hier_block2, the runtime system
     * will ensure that no reentrant calls are made to msg_handler.
     *
     * \throws std::runtime_error if \p which_port has no message queue.
     */
    template <typename T>
    void set_msg_handler(pmt::pmt_t which_port, T msg_handler)
    {
        if (msg_queue.find(which_port) == msg_queue.end()) {
            throw std::runtime_error(
                "attempt to set_msg_handler() on bad input message port!");
        }
        d_msg_handlers[which_port] = msg_handler_t(msg_handler);
    }

    // The remaining members are pure virtual: every concrete derived class
    // has to provide them.
    virtual void set_processor_affinity(const std::vector<int>& mask) = 0;

    virtual void unset_processor_affinity() = 0;

    virtual std::vector<int> processor_affinity() = 0;

    virtual void set_log_level(std::string level) = 0;

    virtual std::string log_level() = 0;
};

//! Orders two blocks by their unique_id(); both pointers must be non-null.
inline bool operator<(basic_block_sptr lhs, basic_block_sptr rhs)
{
    const long lhs_id = lhs->unique_id();
    const long rhs_id = rhs->unique_id();
    return lhs_id < rhs_id;
}

//! A vector of shared pointers to basic blocks.
typedef std::vector<basic_block_sptr> basic_block_vector_t;
//! Mutable iterator over a basic_block_vector_t.
typedef basic_block_vector_t::iterator basic_block_viter_t;

// NOTE(review): defined outside this header; judging by the name it reports
// how many basic_block objects are currently allocated -- confirm against the
// implementation.
GR_RUNTIME_API long basic_block_ncurrently_allocated();

//! Streams the block's identifier() ("name(unique_id)"); the pointer must be non-null.
inline std::ostream& operator<<(std::ostream& os, basic_block_sptr basic_block)
{
    return os << basic_block->identifier();
}

} /* namespace gr */

#endif /* INCLUDED_GR_BASIC_BLOCK_H */