diff options
author | Jacob Gilbert <jacob.gilbert@protonmail.com> | 2021-03-13 18:10:44 -0800 |
---|---|---|
committer | mormj <34754695+mormj@users.noreply.github.com> | 2021-03-18 16:35:41 -0400 |
commit | a8a9913136a64da903f190493bdc117b5349625e (patch) | |
tree | cb14b39daa613397e324efd063f688c91e9b3bfa /gr-network | |
parent | gr-pdu: initial commit with the PDU module (diff) | |
download | gnuradio-a8a9913136a64da903f190493bdc117b5349625e.tar.xz gnuradio-a8a9913136a64da903f190493bdc117b5349625e.zip |
gr-pdu: move pdu blocks to gr::pdu
Moving the following from gr::blocks into gr-pdu:
- pdu_filter block
- pdu_remove block
- pdu_set block
- pdu_to_tagged_stream block
- random_pdu block
- tagged_stream_to_pdu block
Moving the following from gr::blocks into gr-network:
- socket_pdu block
- stream_pdu_base (noblock)
- tcp_connection (noblock)
- tuntap_pdu block
Moving the following from gr::blocks into gr:
- pdu (noblock, general PDU functions)
Signed-off-by: Jacob Gilbert <jacob.gilbert@protonmail.com>
Diffstat (limited to 'gr-network')
23 files changed, 1374 insertions, 5 deletions
diff --git a/gr-network/grc/CMakeLists.txt b/gr-network/grc/CMakeLists.txt index 8ac8bb68c..622624e4e 100644 --- a/gr-network/grc/CMakeLists.txt +++ b/gr-network/grc/CMakeLists.txt @@ -7,8 +7,10 @@ # install(FILES + network_socket_pdu.block.yml network_tcp_sink.block.yml network_tcp_source.block.yml + network_tuntap_pdu.block.yml network_udp_sink.block.yml network_udp_source.block.yml DESTINATION share/gnuradio/grc/blocks diff --git a/gr-network/grc/network_socket_pdu.block.yml b/gr-network/grc/network_socket_pdu.block.yml new file mode 100644 index 000000000..0033c769e --- /dev/null +++ b/gr-network/grc/network_socket_pdu.block.yml @@ -0,0 +1,57 @@ +id: network_socket_pdu +label: Socket PDU +category: '[Core]/Networking Tools' +flags: [ python, cpp ] + +parameters: +- id: type + label: Type + dtype: enum + default: TCP_SERVER + options: ["TCP_SERVER", "TCP_CLIENT", "UDP_SERVER", "UDP_CLIENT"] + option_labels: [TCP Server, TCP Client, UDP Server, UDP Client] +- id: host + label: Host + dtype: string +- id: port + label: Port + dtype: string + default: '52001' +- id: mtu + label: MTU + dtype: int + default: '10000' +- id: tcp_no_delay + label: TCP No Delay + dtype: enum + default: 'False' + options: ['True', 'False'] + option_labels: [Enabled, Disabled] + hide: ${ (( 'part' if (str(tcp_no_delay) == 'False') else 'none') if ((type == 'TCP_CLIENT') or (type == 'TCP_SERVER')) else 'all') } + +inputs: +- domain: message + id: pdus + optional: true + +outputs: +- domain: message + id: pdus + optional: true + +templates: + imports: from gnuradio import network + make: network.socket_pdu(${repr(type)}, ${host}, ${port}, ${mtu}, ${tcp_no_delay}) + +cpp_templates: + includes: ['#include <gnuradio/network/socket_pdu.h>'] + declarations: 'network::socket_pdu::sptr ${id};' + make: 'this->${id} = network::socket_pdu::make("${type}", ${host}, ${port}, ${mtu}, ${tcp_no_delay});' + translations: + 'True': 'true' + 'False': 'false' + +documentation: |- + For server modes, leave Host blank to bind to all interfaces (equivalent to 0.0.0.0). + +file_format: 1 diff --git a/gr-network/grc/network_tuntap_pdu.block.yml b/gr-network/grc/network_tuntap_pdu.block.yml new file mode 100644 index 000000000..1542b89a9 --- /dev/null +++ b/gr-network/grc/network_tuntap_pdu.block.yml @@ -0,0 +1,44 @@ +id: network_tuntap_pdu +label: TUNTAP PDU +category: '[Core]/Networking Tools' +flags: [ python, cpp ] + +parameters: +- id: ifn + label: Interface Name + dtype: string + default: tap0 +- id: mtu + label: MTU + dtype: int + default: '10000' +- id: istunflag + label: Flag + dtype: enum + default: 'False' + options: ['True', 'False'] + option_labels: [TUN(IP Packet), TAP(Ethernet Frame)] + +inputs: +- domain: message + id: pdus + optional: true + +outputs: +- domain: message + id: pdus + optional: true + +templates: + imports: from gnuradio import network + make: network.tuntap_pdu(${ifn}, ${mtu}, ${istunflag}) + +cpp_templates: + includes: ['#include <gnuradio/network/tuntap_pdu.h>'] + declarations: 'network::tuntap_pdu::sptr ${id};' + make: 'this->${id} = network::tuntap_pdu::make(${ifn}, ${mtu}, ${istunflag});' + translations: + 'True': 'true' + 'False': 'false' + +file_format: 1 diff --git a/gr-network/include/gnuradio/network/CMakeLists.txt b/gr-network/include/gnuradio/network/CMakeLists.txt index 8712ed00d..36ff1b48a 100644 --- a/gr-network/include/gnuradio/network/CMakeLists.txt +++ b/gr-network/include/gnuradio/network/CMakeLists.txt @@ -11,7 +11,9 @@ install(FILES api.h packet_headers.h + socket_pdu.h tcp_sink.h + tuntap_pdu.h udp_header_types.h udp_sink.h udp_source.h diff --git a/gr-network/include/gnuradio/network/socket_pdu.h b/gr-network/include/gnuradio/network/socket_pdu.h new file mode 100644 index 000000000..dba2522fa --- /dev/null +++ b/gr-network/include/gnuradio/network/socket_pdu.h @@ -0,0 +1,48 @@ +/* -*- c++ -*- */ +/* + * Copyright 2013 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ + +#ifndef INCLUDED_NETWORK_SOCKET_PDU_H +#define INCLUDED_NETWORK_SOCKET_PDU_H + +#include <gnuradio/block.h> +#include <gnuradio/network/api.h> + +namespace gr { +namespace network { + +/*! + * \brief Creates socket interface and translates traffic to PDUs + * \ingroup networking_tools_blk + */ +class NETWORK_API socket_pdu : virtual public block +{ +public: + // gr::network::socket_pdu::sptr + typedef std::shared_ptr<socket_pdu> sptr; + + /*! + * \brief Construct a SOCKET PDU interface + * \param type "TCP_SERVER", "TCP_CLIENT", "UDP_SERVER", or "UDP_CLIENT" + * \param addr network address to use + * \param port network port to use + * \param MTU maximum transmission unit + * \param tcp_no_delay TCP No Delay option (set to True to disable Nagle algorithm) + */ + static sptr make(std::string type, + std::string addr, + std::string port, + int MTU = 10000, + bool tcp_no_delay = false); +}; + +} /* namespace network */ +} /* namespace gr */ + +#endif /* INCLUDED_NETWORK_SOCKET_PDU_H */ diff --git a/gr-network/include/gnuradio/network/tuntap_pdu.h b/gr-network/include/gnuradio/network/tuntap_pdu.h new file mode 100644 index 000000000..6ef34aeee --- /dev/null +++ b/gr-network/include/gnuradio/network/tuntap_pdu.h @@ -0,0 +1,42 @@ +/* -*- c++ -*- */ +/* + * Copyright 2013 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ + +#ifndef INCLUDED_NETWORK_TUNTAP_PDU_H +#define INCLUDED_NETWORK_TUNTAP_PDU_H + +#include <gnuradio/block.h> +#include <gnuradio/network/api.h> + +namespace gr { +namespace network { + +/*! + * \brief Creates TUNTAP interface and translates traffic to PDUs + * \ingroup networking_tools_blk + */ +class NETWORK_API tuntap_pdu : virtual public block +{ +public: + // gr::network::tuntap_pdu::sptr + typedef std::shared_ptr<tuntap_pdu> sptr; + + /*! + * \brief Construct a TUNTAP PDU interface + * \param dev Device name to create + * \param MTU Maximum Transmission Unit size + * \param istunflag Flag to indicate TUN or Tap + */ + static sptr make(std::string dev, int MTU = 10000, bool istunflag = false); +}; + +} /* namespace network */ +} /* namespace gr */ + +#endif /* INCLUDED_NETWORK_TUNTAP_PDU_H */ diff --git a/gr-network/lib/CMakeLists.txt b/gr-network/lib/CMakeLists.txt index 52a267079..0eecda139 100644 --- a/gr-network/lib/CMakeLists.txt +++ b/gr-network/lib/CMakeLists.txt @@ -13,7 +13,11 @@ include(GrPlatform) #define LIB_SUFFIX list(APPEND network_sources + socket_pdu_impl.cc + stream_pdu_base.cc + tcp_connection.cc tcp_sink_impl.cc + tuntap_pdu_impl.cc udp_sink_impl.cc udp_source_impl.cc ) diff --git a/gr-network/lib/socket_pdu_impl.cc b/gr-network/lib/socket_pdu_impl.cc new file mode 100644 index 000000000..a9e854447 --- /dev/null +++ b/gr-network/lib/socket_pdu_impl.cc @@ -0,0 +1,271 @@ +/* -*- c++ -*- */ +/* + * Copyright 2013,2019 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "socket_pdu_impl.h" +#include "tcp_connection.h" +#include <gnuradio/io_signature.h> +#include <gnuradio/pdu.h> + +namespace gr { +namespace network { + +socket_pdu::sptr socket_pdu::make(std::string type, + std::string addr, + std::string port, + int MTU /*= 10000*/, + bool tcp_no_delay /*= false*/) +{ + return gnuradio::make_block_sptr<socket_pdu_impl>( + type, addr, port, MTU, tcp_no_delay); +} + +socket_pdu_impl::socket_pdu_impl(std::string type, + std::string addr, + std::string port, + int MTU /*= 10000*/, + bool tcp_no_delay /*= false*/) + : block("socket_pdu", io_signature::make(0, 0, 0), io_signature::make(0, 0, 0)), + d_tcp_no_delay(tcp_no_delay) +{ + d_rxbuf.resize(MTU); + + message_port_register_in(msgport_names::pdus()); + message_port_register_out(msgport_names::pdus()); + + if ((type == "TCP_SERVER") && + ((addr.empty()) || (addr == "0.0.0.0"))) { // Bind on all interfaces + int port_num = atoi(port.c_str()); + if (port_num == 0) + throw std::invalid_argument( + "gr::pdu:socket_pdu: invalid port for TCP_SERVER"); + d_tcp_endpoint = + boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port_num); + } else if ((type == "TCP_SERVER") || (type == "TCP_CLIENT")) { + boost::asio::ip::tcp::resolver resolver(d_io_service); + boost::asio::ip::tcp::resolver::query query( + boost::asio::ip::tcp::v4(), + addr, + port, + boost::asio::ip::resolver_query_base::passive); + d_tcp_endpoint = *resolver.resolve(query); + } else if ((type == "UDP_SERVER") && + ((addr.empty()) || (addr == "0.0.0.0"))) { // Bind on all interfaces + int port_num = atoi(port.c_str()); + if (port_num == 0) + throw std::invalid_argument( + "gr::pdu:socket_pdu: invalid port for UDP_SERVER"); + d_udp_endpoint = + boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), port_num); + } else if ((type == "UDP_SERVER") || (type == "UDP_CLIENT")) { + boost::asio::ip::udp::resolver resolver(d_io_service); + boost::asio::ip::udp::resolver::query query( + boost::asio::ip::udp::v4(), + addr, + port, + boost::asio::ip::resolver_query_base::passive); + + if (type == "UDP_SERVER") + d_udp_endpoint = *resolver.resolve(query); + else + d_udp_endpoint_other = *resolver.resolve(query); + } + + if (type == "TCP_SERVER") { + d_acceptor_tcp = std::make_shared<boost::asio::ip::tcp::acceptor>(d_io_service, + d_tcp_endpoint); + d_acceptor_tcp->set_option(boost::asio::ip::tcp::acceptor::reuse_address(true)); + + start_tcp_accept(); + + set_msg_handler(msgport_names::pdus(), + [this](pmt::pmt_t msg) { this->tcp_server_send(msg); }); + } else if (type == "TCP_CLIENT") { + boost::system::error_code error = boost::asio::error::host_not_found; + d_tcp_socket = std::make_shared<boost::asio::ip::tcp::socket>(d_io_service); + d_tcp_socket->connect(d_tcp_endpoint, error); + if (error) + throw boost::system::system_error(error); + d_tcp_socket->set_option(boost::asio::ip::tcp::no_delay(d_tcp_no_delay)); + + set_msg_handler(msgport_names::pdus(), + [this](pmt::pmt_t msg) { this->tcp_client_send(msg); }); + + d_tcp_socket->async_read_some( + boost::asio::buffer(d_rxbuf), + boost::bind(&socket_pdu_impl::handle_tcp_read, + this, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); + } else if (type == "UDP_SERVER") { + d_udp_socket = + std::make_shared<boost::asio::ip::udp::socket>(d_io_service, d_udp_endpoint); + d_udp_socket->async_receive_from( + boost::asio::buffer(d_rxbuf), + d_udp_endpoint_other, + boost::bind(&socket_pdu_impl::handle_udp_read, + this, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); + + set_msg_handler(msgport_names::pdus(), + [this](pmt::pmt_t msg) { this->udp_send(msg); }); + } else if (type == "UDP_CLIENT") { + d_udp_socket = + std::make_shared<boost::asio::ip::udp::socket>(d_io_service, d_udp_endpoint); + d_udp_socket->async_receive_from( + boost::asio::buffer(d_rxbuf), + d_udp_endpoint_other, + boost::bind(&socket_pdu_impl::handle_udp_read, + this, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); + + set_msg_handler(msgport_names::pdus(), + [this](pmt::pmt_t msg) { this->udp_send(msg); }); + } else + throw std::runtime_error("gr::pdu:socket_pdu: unknown socket type"); + + d_thread = gr::thread::thread(boost::bind(&socket_pdu_impl::run_io_service, this)); + d_started = true; +} + +socket_pdu_impl::~socket_pdu_impl() { stop(); } + +bool socket_pdu_impl::stop() +{ + if (d_started) { + d_io_service.stop(); + d_thread.interrupt(); + d_thread.join(); + } + d_started = false; + return true; +} + +void socket_pdu_impl::handle_tcp_read(const boost::system::error_code& error, + size_t bytes_transferred) +{ + if (!error) { + pmt::pmt_t vector = + pmt::init_u8vector(bytes_transferred, (const uint8_t*)&d_rxbuf[0]); + pmt::pmt_t pdu = pmt::cons(pmt::PMT_NIL, vector); + message_port_pub(msgport_names::pdus(), pdu); + + d_tcp_socket->async_read_some( + boost::asio::buffer(d_rxbuf), + boost::bind(&socket_pdu_impl::handle_tcp_read, + this, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); + } else + throw boost::system::system_error(error); +} + +void socket_pdu_impl::start_tcp_accept() +{ +#if (BOOST_VERSION >= 107000) + tcp_connection::sptr new_connection = + tcp_connection::make(d_io_service, d_rxbuf.size(), d_tcp_no_delay); +#else + tcp_connection::sptr new_connection = tcp_connection::make( + d_acceptor_tcp->get_io_service(), d_rxbuf.size(), d_tcp_no_delay); +#endif + + d_acceptor_tcp->async_accept(new_connection->socket(), + boost::bind(&socket_pdu_impl::handle_tcp_accept, + this, + new_connection, + boost::asio::placeholders::error)); +} + +void socket_pdu_impl::tcp_server_send(pmt::pmt_t msg) +{ + pmt::pmt_t vector = pmt::cdr(msg); + for (size_t i = 0; i < d_tcp_connections.size(); i++) + d_tcp_connections[i]->send(vector); +} + +void socket_pdu_impl::handle_tcp_accept(tcp_connection::sptr new_connection, + const boost::system::error_code& error) +{ + if (!error) { + // Garbage collect closed sockets + std::vector<tcp_connection::sptr>::iterator it = d_tcp_connections.begin(); + while (it != d_tcp_connections.end()) { + if (!(**it).socket().is_open()) + it = d_tcp_connections.erase(it); + else + ++it; + } + + new_connection->start(this); + d_tcp_connections.push_back(new_connection); + start_tcp_accept(); + } else + std::cout << error << std::endl; +} + +void socket_pdu_impl::tcp_client_send(pmt::pmt_t msg) +{ + pmt::pmt_t vector = pmt::cdr(msg); + size_t len = pmt::blob_length(vector); + size_t offset = 0; + std::vector<char> txbuf(std::min(len, d_rxbuf.size())); + while (offset < len) { + size_t send_len = std::min((len - offset), txbuf.size()); + memcpy(&txbuf[0], pmt::uniform_vector_elements(vector, offset), send_len); + offset += send_len; + d_tcp_socket->send(boost::asio::buffer(txbuf, send_len)); + } +} + +void socket_pdu_impl::udp_send(pmt::pmt_t msg) +{ + if (d_udp_endpoint_other.address().to_string() == "0.0.0.0") + return; + + pmt::pmt_t vector = pmt::cdr(msg); + size_t len = pmt::blob_length(vector); + size_t offset = 0; + std::vector<char> txbuf(std::min(len, d_rxbuf.size())); + while (offset < len) { + size_t send_len = std::min((len - offset), txbuf.size()); + memcpy(&txbuf[0], pmt::uniform_vector_elements(vector, offset), send_len); + offset += send_len; + d_udp_socket->send_to(boost::asio::buffer(txbuf, send_len), d_udp_endpoint_other); + } +} + +void socket_pdu_impl::handle_udp_read(const boost::system::error_code& error, + size_t bytes_transferred) +{ + if (!error) { + pmt::pmt_t vector = + pmt::init_u8vector(bytes_transferred, (const uint8_t*)&d_rxbuf[0]); + pmt::pmt_t pdu = pmt::cons(pmt::PMT_NIL, vector); + + message_port_pub(msgport_names::pdus(), pdu); + + d_udp_socket->async_receive_from( + boost::asio::buffer(d_rxbuf), + d_udp_endpoint_other, + boost::bind(&socket_pdu_impl::handle_udp_read, + this, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); + } +} + +} /* namespace network */ +} /* namespace gr */ diff --git a/gr-network/lib/socket_pdu_impl.h b/gr-network/lib/socket_pdu_impl.h new file mode 100644 index 000000000..9175290a4 --- /dev/null +++ b/gr-network/lib/socket_pdu_impl.h @@ -0,0 +1,68 @@ +/* -*- c++ -*- */ +/* + * Copyright 2013 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ + +#ifndef INCLUDED_NETWORK_SOCKET_PDU_IMPL_H +#define INCLUDED_NETWORK_SOCKET_PDU_IMPL_H + +#include "tcp_connection.h" +#include <gnuradio/network/socket_pdu.h> + +namespace gr { +namespace network { + +class socket_pdu_impl : public socket_pdu +{ +private: + boost::asio::io_service d_io_service; + std::vector<char> d_rxbuf; + void run_io_service() { d_io_service.run(); } + gr::thread::thread d_thread; + bool d_started; + + // TCP specific + boost::asio::ip::tcp::endpoint d_tcp_endpoint; + std::vector<tcp_connection::sptr> d_tcp_connections; + void handle_tcp_read(const boost::system::error_code& error, + size_t bytes_transferred); + const bool d_tcp_no_delay; + + // TCP server specific + std::shared_ptr<boost::asio::ip::tcp::acceptor> d_acceptor_tcp; + void start_tcp_accept(); + void tcp_server_send(pmt::pmt_t msg); + void handle_tcp_accept(tcp_connection::sptr new_connection, + const boost::system::error_code& error); + + // TCP client specific + std::shared_ptr<boost::asio::ip::tcp::socket> d_tcp_socket; + void tcp_client_send(pmt::pmt_t msg); + + // UDP specific + boost::asio::ip::udp::endpoint d_udp_endpoint; + boost::asio::ip::udp::endpoint d_udp_endpoint_other; + std::shared_ptr<boost::asio::ip::udp::socket> d_udp_socket; + void handle_udp_read(const boost::system::error_code& error, + size_t bytes_transferred); + void udp_send(pmt::pmt_t msg); + +public: + socket_pdu_impl(std::string type, + std::string addr, + std::string port, + int MTU = 10000, + bool tcp_no_delay = false); + ~socket_pdu_impl() override; + bool stop() override; +}; + +} /* namespace network */ +} /* namespace gr */ + +#endif /* INCLUDED_NETWORK_SOCKET_PDU_IMPL_H */ diff --git a/gr-network/lib/stream_pdu_base.cc b/gr-network/lib/stream_pdu_base.cc new file mode 100644 index 000000000..a6519ceda --- /dev/null +++ b/gr-network/lib/stream_pdu_base.cc @@ -0,0 +1,110 @@ +/* -*- c++ -*- */ +/* + * Copyright 2013 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#ifdef HAVE_IO_H +#include <io.h> +#endif + +#ifdef HAVE_WINDOWS_H +#include <winsock2.h> +#endif + +#include "stream_pdu_base.h" +#include <gnuradio/basic_block.h> +#include <gnuradio/logger.h> +#include <gnuradio/pdu.h> +#include <boost/format.hpp> + +static const long timeout_us = 100 * 1000; // 100ms + +namespace gr { +namespace network { + +stream_pdu_base::stream_pdu_base(int MTU) : d_fd(-1), d_started(false), d_finished(false) +{ + gr::configure_default_loggers(d_pdu_logger, d_pdu_debug_logger, "stream_pdu_base"); + // reserve space for rx buffer + d_rxbuf.resize(MTU, 0); +} + +stream_pdu_base::~stream_pdu_base() { stop_rxthread(); } + +void stream_pdu_base::start_rxthread(basic_block* blk, pmt::pmt_t port) +{ + d_blk = blk; + d_port = port; + d_thread = gr::thread::thread(std::bind(&stream_pdu_base::run, this)); + d_started = true; +} + +void stream_pdu_base::stop_rxthread() +{ + d_finished = true; + + if (d_started) { + d_thread.interrupt(); + d_thread.join(); + } +} + +void stream_pdu_base::run() +{ + while (!d_finished) { + if (!wait_ready()) + continue; + + const int result = read(d_fd, &d_rxbuf[0], d_rxbuf.size()); + if (result <= 0) + throw std::runtime_error("stream_pdu_base, bad socket read!"); + + pmt::pmt_t vector = pmt::init_u8vector(result, &d_rxbuf[0]); + pmt::pmt_t pdu = pmt::cons(pmt::PMT_NIL, vector); + + d_blk->message_port_pub(d_port, pdu); + } +} + +bool stream_pdu_base::wait_ready() +{ + // setup timeval for timeout + timeval tv; + tv.tv_sec = 0; + tv.tv_usec = timeout_us; + + // setup rset for timeout + fd_set rset; + FD_ZERO(&rset); + FD_SET(d_fd, &rset); + + // call select with timeout on receive socket + return ::select(d_fd + 1, &rset, NULL, NULL, &tv) > 0; +} + +void stream_pdu_base::send(pmt::pmt_t msg) +{ + pmt::pmt_t vector = pmt::cdr(msg); + size_t offset(0); + size_t itemsize(pdu::itemsize(pdu::type_from_pmt(vector))); + int len(pmt::length(vector) * itemsize); + + const int rv = write(d_fd, pmt::uniform_vector_elements(vector, offset), len); + if (rv != len) { + static auto msg = boost::format( + "stream_pdu_base::send(pdu) write failed! (d_fd=%d, len=%d, rv=%d)"); + GR_LOG_WARN(d_pdu_logger, msg % d_fd % len % rv); + } +} + +} /* namespace network */ +} /* namespace gr */ diff --git a/gr-network/lib/stream_pdu_base.h b/gr-network/lib/stream_pdu_base.h new file mode 100644 index 000000000..40940de7d --- /dev/null +++ b/gr-network/lib/stream_pdu_base.h @@ -0,0 +1,52 @@ +/* -*- c++ -*- */ +/* + * Copyright 2013 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ + +#ifndef INCLUDED_STREAM_PDU_BASE_H +#define INCLUDED_STREAM_PDU_BASE_H + +#include <gnuradio/basic_block.h> +#include <gnuradio/logger.h> +#include <gnuradio/thread/thread.h> +#include <pmt/pmt.h> + +class basic_block; + +namespace gr { +namespace network { + +class stream_pdu_base +{ +public: + stream_pdu_base(int MTU = 10000); + ~stream_pdu_base(); + +protected: + int d_fd; + bool d_started; + bool d_finished; + std::vector<uint8_t> d_rxbuf; + gr::thread::thread d_thread; + + pmt::pmt_t d_port; + basic_block* d_blk; + + void run(); + void send(pmt::pmt_t msg); + bool wait_ready(); + void start_rxthread(basic_block* blk, pmt::pmt_t rxport); + void stop_rxthread(); + + gr::logger_ptr d_pdu_logger, d_pdu_debug_logger; +}; + +} /* namespace network */ +} /* namespace gr */ + +#endif /* INCLUDED_STREAM_PDU_BASE_H */ diff --git a/gr-network/lib/tcp_connection.cc b/gr-network/lib/tcp_connection.cc new file mode 100644 index 000000000..dfb511e05 --- /dev/null +++ b/gr-network/lib/tcp_connection.cc @@ -0,0 +1,101 @@ +/* -*- c++ -*- */ +/* + * Copyright 2013 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "tcp_connection.h" +#include <gnuradio/basic_block.h> +#include <gnuradio/pdu.h> + +namespace gr { +namespace network { + +tcp_connection::sptr tcp_connection::make(boost::asio::io_service& io_service, + int MTU /*= 10000*/, + bool no_delay /*=false*/) +{ + return sptr(new tcp_connection(io_service, MTU, no_delay)); +} + +tcp_connection::tcp_connection(boost::asio::io_service& io_service, + int MTU /*= 10000*/, + bool no_delay /*=false*/) + : d_socket(io_service), d_buf(MTU), d_block(NULL), d_no_delay(no_delay) +{ + try { + d_socket.set_option(boost::asio::ip::tcp::no_delay(no_delay)); + } catch (...) { + // Silently ignore failure (socket might be current in accept stage) and try again + // in 'start' + } +} + +void tcp_connection::send(pmt::pmt_t vector) +{ + size_t len = pmt::blob_length(vector); + + // Asio async_write() requires the buffer to remain valid until the handler is called. + auto txbuf = std::make_shared<std::vector<char>>(len); + + size_t temp = 0; + memcpy(txbuf->data(), pmt::uniform_vector_elements(vector, temp), len); + + size_t offset = 0; + while (offset < len) { + // Limit the size of each write() to the MTU. + // FIXME: Note that this has the effect of breaking a large PDU into several + // smaller PDUs, each containing <= MTU bytes. Is this the desired behavior? + size_t send_len = std::min((len - offset), d_buf.size()); + boost::asio::async_write( + d_socket, + boost::asio::buffer(txbuf->data() + offset, send_len), + [txbuf](const boost::system::error_code& error, size_t bytes_transferred) {}); + offset += send_len; + } +} + +void tcp_connection::start(gr::basic_block* block) +{ + d_block = block; + d_socket.set_option(boost::asio::ip::tcp::no_delay(d_no_delay)); + d_socket.async_read_some(boost::asio::buffer(d_buf), + boost::bind(&tcp_connection::handle_read, + this, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); +} + +void tcp_connection::handle_read(const boost::system::error_code& error, + size_t bytes_transferred) +{ + if (!error) { + if (d_block) { + pmt::pmt_t vector = + pmt::init_u8vector(bytes_transferred, (const uint8_t*)&d_buf[0]); + pmt::pmt_t pdu = pmt::cons(pmt::PMT_NIL, vector); + + d_block->message_port_pub(msgport_names::pdus(), pdu); + } + + d_socket.async_read_some( + boost::asio::buffer(d_buf), + boost::bind(&tcp_connection::handle_read, + this, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); + } else { + d_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both); + d_socket.close(); + } +} +} /* namespace network */ +} /* namespace gr */ diff --git a/gr-network/lib/tcp_connection.h b/gr-network/lib/tcp_connection.h new file mode 100644 index 000000000..29a2bd79f --- /dev/null +++ b/gr-network/lib/tcp_connection.h @@ -0,0 +1,54 @@ +/* -*- c++ -*- */ +/* + * Copyright 2013 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ + +#ifndef INCLUDED_TCP_CONNECTION_H +#define INCLUDED_TCP_CONNECTION_H + +#include <pmt/pmt.h> +#include <boost/array.hpp> +#include <boost/asio.hpp> +#include <memory> + +namespace gr { + +class basic_block; + +namespace network { + +class tcp_connection +{ +private: + boost::asio::ip::tcp::socket d_socket; + std::vector<char> d_buf; + basic_block* d_block; + bool d_no_delay; + + tcp_connection(boost::asio::io_service& io_service, + int MTU = 10000, + bool no_delay = false); + + void handle_read(const boost::system::error_code& error, size_t bytes_transferred); + +public: + typedef std::shared_ptr<tcp_connection> sptr; + + static sptr + make(boost::asio::io_service& io_service, int MTU = 10000, bool no_delay = false); + + boost::asio::ip::tcp::socket& socket() { return d_socket; }; + + void start(gr::basic_block* block); + void send(pmt::pmt_t vector); +}; + +} /* namespace network */ +} /* namespace gr */ + +#endif /* INCLUDED_TCP_CONNECTION_H */ diff --git a/gr-network/lib/tuntap_pdu_impl.cc b/gr-network/lib/tuntap_pdu_impl.cc new file mode 100644 index 000000000..87c3750fd --- /dev/null +++ b/gr-network/lib/tuntap_pdu_impl.cc @@ -0,0 +1,172 @@ +/* -*- c++ -*- */ +/* + * Copyright 2013 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "tuntap_pdu_impl.h" +#include <gnuradio/io_signature.h> +#include <gnuradio/pdu.h> +#include <boost/format.hpp> + +#include <fcntl.h> +#include <sys/stat.h> +#include <sys/types.h> + +#if (defined(linux) || defined(__linux) || defined(__linux__)) +#include <arpa/inet.h> +#include <linux/if.h> +#include <sys/ioctl.h> +#endif + +namespace gr { +namespace network { + +tuntap_pdu::sptr tuntap_pdu::make(std::string dev, int MTU, bool istunflag) +{ +#if (defined(linux) || defined(__linux) || defined(__linux__)) + return gnuradio::make_block_sptr<tuntap_pdu_impl>(dev, MTU, istunflag); +#else + throw std::runtime_error("tuntap_pdu not implemented on this platform"); +#endif +} + +#if (defined(linux) || defined(__linux) || defined(__linux__)) +tuntap_pdu_impl::tuntap_pdu_impl(std::string dev, int MTU, bool istunflag) + : block("tuntap_pdu", io_signature::make(0, 0, 0), io_signature::make(0, 0, 0)), + stream_pdu_base(istunflag ? MTU : MTU + 14), + d_dev(dev), + d_istunflag(istunflag) +{ + // make the tuntap + char dev_cstr[IFNAMSIZ]; + memset(dev_cstr, 0x00, IFNAMSIZ); + strncpy(dev_cstr, dev.c_str(), IFNAMSIZ); + dev_cstr[IFNAMSIZ - 1] = '\0'; + + + bool istun = d_istunflag; + if (istun) { + d_fd = tun_alloc(dev_cstr, (IFF_TUN | IFF_NO_PI)); + } else { + d_fd = tun_alloc(dev_cstr, (IFF_TAP | IFF_NO_PI)); + } + + if (d_fd <= 0) + throw std::runtime_error( + "gr::tuntap_pdu::make: tun_alloc failed (are you running as root?)"); + + int err = set_mtu(dev_cstr, MTU); + if (err < 0) { + std::ostringstream msg; + msg << boost::format("failed to set MTU to %d. You should use ifconfig to set " + "the MTU. E.g., `$ sudo ifconfig %s mtu %d`") % + MTU % dev % MTU; + GR_LOG_ERROR(d_logger, msg.str()); + } + + + std::cout << boost::format("Allocated virtual ethernet interface: %s\n" + "You must now use ifconfig to set its IP address. E.g.,\n" + " $ sudo ifconfig %s 192.168.200.1\n" + "Be sure to use a different address in the same subnet " + "for each machine.\n") % + dev % dev + << std::endl; + + // set up output message port + message_port_register_out(msgport_names::pdus()); + start_rxthread(this, msgport_names::pdus()); + + // set up input message port + message_port_register_in(msgport_names::pdus()); + set_msg_handler(msgport_names::pdus(), [this](pmt::pmt_t msg) { this->send(msg); }); +} + +int tuntap_pdu_impl::tun_alloc(char* dev, int flags) +{ + struct ifreq ifr; + int fd, err; + const char* clonedev = "/dev/net/tun"; + + /* Arguments taken by the function: + * + * char *dev: the name of an interface (or '\0'). MUST have enough + * space to hold the interface name if '\0' is passed + * int flags: interface flags (eg, IFF_TUN etc.) + */ + + /* open the clone device */ + if ((fd = open(clonedev, O_RDWR)) < 0) + return fd; + + /* preparation of the struct ifr, of type "struct ifreq" */ + memset(&ifr, 0, sizeof(ifr)); + + ifr.ifr_flags = flags; /* IFF_TUN or IFF_TAP, plus maybe IFF_NO_PI */ + + /* if a device name was specified, put it in the structure; otherwise, + * the kernel will try to allocate the "next" device of the + * specified type + */ + if (*dev) + strncpy(ifr.ifr_name, dev, IFNAMSIZ - 1); + + /* try to create the device */ + if ((err = ioctl(fd, TUNSETIFF, (void*)&ifr)) < 0) { + close(fd); + return err; + } + + /* if the operation was successful, write back the name of the + * interface to the variable "dev", so the caller can know + * it. Note that the caller MUST reserve space in *dev (see calling + * code below) + */ + strcpy(dev, ifr.ifr_name); + + /* this is the special file descriptor that the caller will use to talk + * with the virtual interface + */ + return fd; +} + +int tuntap_pdu_impl::set_mtu(const char* dev, int MTU) +{ + struct ifreq ifr; + int sfd, err; + + /* MTU must be set by passing a socket fd to ioctl; + * create an arbitrary socket for this purpose + */ + if ((sfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) + return sfd; + + /* preparation of the struct ifr, of type "struct ifreq" */ + memset(&ifr, 0, sizeof(ifr)); + strncpy(ifr.ifr_name, dev, IFNAMSIZ); + ifr.ifr_name[IFNAMSIZ - 1] = '\0'; + ifr.ifr_addr.sa_family = AF_INET; /* address family */ + ifr.ifr_mtu = MTU; + + /* try to set MTU */ + if ((err = ioctl(sfd, SIOCSIFMTU, (void*)&ifr)) < 0) { + close(sfd); + return err; + } + + close(sfd); + return MTU; +} +#endif + +} /* namespace network */ +} /* namespace gr */ diff --git a/gr-network/lib/tuntap_pdu_impl.h b/gr-network/lib/tuntap_pdu_impl.h new file mode 100644 index 000000000..6811712fe --- /dev/null +++ b/gr-network/lib/tuntap_pdu_impl.h @@ -0,0 +1,41 @@ +/* -*- c++ -*- */ +/* + * Copyright 2013 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ + +#ifndef INCLUDED_NETWORK_TUNTAP_PDU_IMPL_H +#define INCLUDED_NETWORK_TUNTAP_PDU_IMPL_H + +#include "stream_pdu_base.h" +#include <gnuradio/network/tuntap_pdu.h> + +#if (defined(linux) || defined(__linux) || defined(__linux__)) +#include <linux/if_tun.h> +#endif + +namespace gr { +namespace network { + +class tuntap_pdu_impl : public tuntap_pdu, public stream_pdu_base +{ +#if (defined(linux) || defined(__linux) || defined(__linux__)) +private: + const std::string d_dev; + const bool d_istunflag; + int tun_alloc(char* dev, int flags); + int set_mtu(const char* dev, int MTU); + +public: + tuntap_pdu_impl(std::string dev, int MTU, bool istunflag); +#endif +}; + +} /* namespace network */ +} /* namespace gr */ + +#endif /* INCLUDED_NETWORK_TUNTAP_PDU_IMPL_H */ diff --git a/gr-network/python/network/CMakeLists.txt b/gr-network/python/network/CMakeLists.txt index a524e6390..c6feaffc2 100644 --- a/gr-network/python/network/CMakeLists.txt +++ b/gr-network/python/network/CMakeLists.txt @@ -27,8 +27,18 @@ GR_PYTHON_INSTALL( ######################################################################## # Handle the unit tests ######################################################################## -include(GrTest) - -set(GR_TEST_TARGET_DEPS gnuradio-network) +if(ENABLE_TESTING) + set(GR_TEST_TARGET_DEPS "") + set(GR_TEST_LIBRARY_DIRS "") + set(GR_TEST_PYTHON_DIRS + ${CMAKE_BINARY_DIR}/gnuradio-runtime/python + ) + include(GrTest) + file(GLOB py_qa_test_files "qa_*.py") + foreach(py_qa_test_file ${py_qa_test_files}) + get_filename_component(py_qa_test_name ${py_qa_test_file} NAME_WE) + GR_ADD_TEST(${py_qa_test_name} ${QA_PYTHON_EXECUTABLE} -B ${py_qa_test_file}) + endforeach(py_qa_test_file) +endif(ENABLE_TESTING) add_subdirectory(bindings) diff --git a/gr-network/python/network/bindings/CMakeLists.txt b/gr-network/python/network/bindings/CMakeLists.txt index 80f02c00b..d87f644fd 100644 --- a/gr-network/python/network/bindings/CMakeLists.txt +++ b/gr-network/python/network/bindings/CMakeLists.txt @@ -6,14 +6,16 @@ include(GrPybind) list(APPEND network_python_files # packet_headers_python.cc + socket_pdu_python.cc tcp_sink_python.cc + tuntap_pdu_python.cc # udp_header_types_python.cc udp_sink_python.cc udp_source_python.cc python_bindings.cc) -GR_PYBIND_MAKE_CHECK_HASH(network - ../../.. +GR_PYBIND_MAKE_CHECK_HASH(network + ../../.. gr::network "${network_python_files}") diff --git a/gr-network/python/network/bindings/docstrings/socket_pdu_pydoc_template.h b/gr-network/python/network/bindings/docstrings/socket_pdu_pydoc_template.h new file mode 100644 index 000000000..f7b9cb72b --- /dev/null +++ b/gr-network/python/network/bindings/docstrings/socket_pdu_pydoc_template.h @@ -0,0 +1,24 @@ +/* + * Copyright 2021 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ +#include "pydoc_macros.h" +#define D(...) DOC(gr, network, __VA_ARGS__) +/* + This file contains placeholders for docstrings for the Python bindings. + Do not edit! These were automatically extracted during the binding process + and will be overwritten during the build process + */ + + +static const char* __doc_gr_network_socket_pdu = R"doc()doc"; + + +static const char* __doc_gr_network_socket_pdu_socket_pdu = R"doc()doc"; + + +static const char* __doc_gr_network_socket_pdu_make = R"doc()doc"; diff --git a/gr-network/python/network/bindings/docstrings/tuntap_pdu_pydoc_template.h b/gr-network/python/network/bindings/docstrings/tuntap_pdu_pydoc_template.h new file mode 100644 index 000000000..ec5bf4be2 --- /dev/null +++ b/gr-network/python/network/bindings/docstrings/tuntap_pdu_pydoc_template.h @@ -0,0 +1,24 @@ +/* + * Copyright 2021 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ +#include "pydoc_macros.h" +#define D(...) DOC(gr, network, __VA_ARGS__) +/* + This file contains placeholders for docstrings for the Python bindings. + Do not edit! These were automatically extracted during the binding process + and will be overwritten during the build process + */ + + +static const char* __doc_gr_network_tuntap_pdu = R"doc()doc"; + + +static const char* __doc_gr_network_tuntap_pdu_tuntap_pdu = R"doc()doc"; + + +static const char* __doc_gr_network_tuntap_pdu_make = R"doc()doc"; diff --git a/gr-network/python/network/bindings/python_bindings.cc b/gr-network/python/network/bindings/python_bindings.cc index 3de111aa7..cacd4a1bf 100644 --- a/gr-network/python/network/bindings/python_bindings.cc +++ b/gr-network/python/network/bindings/python_bindings.cc @@ -16,7 +16,9 @@ namespace py = pybind11; // void bind_packet_headers(py::module&); +void bind_socket_pdu(py::module&); void bind_tcp_sink(py::module&); +void bind_tuntap_pdu(py::module&); // void bind_udp_header_types(py::module&); void bind_udp_sink(py::module&); void bind_udp_source(py::module&); @@ -41,7 +43,9 @@ PYBIND11_MODULE(network_python, m) py::module::import("gnuradio.gr"); // bind_packet_headers(m); + bind_socket_pdu(m); bind_tcp_sink(m); + bind_tuntap_pdu(m); // bind_udp_header_types(m); bind_udp_sink(m); bind_udp_source(m); diff --git a/gr-network/python/network/bindings/socket_pdu_python.cc b/gr-network/python/network/bindings/socket_pdu_python.cc new file mode 100644 index 000000000..a3fb134ec --- /dev/null +++ b/gr-network/python/network/bindings/socket_pdu_python.cc @@ -0,0 +1,49 @@ +/* + * Copyright 2021 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ + +/***********************************************************************************/ +/* This file is automatically generated using bindtool and can be manually edited */ +/* The following lines can be configured to regenerate this file during cmake */ +/* If manual edits are made, the following tags should be modified accordingly. */ +/* BINDTOOL_GEN_AUTOMATIC(0) */ +/* BINDTOOL_USE_PYGCCXML(0) */ +/* BINDTOOL_HEADER_FILE(socket_pdu.h) */ +/* BINDTOOL_HEADER_FILE_HASH(4c11b80b4561122125568a9ab76cea3a) */ +/***********************************************************************************/ + +#include <pybind11/complex.h> +#include <pybind11/pybind11.h> +#include <pybind11/stl.h> + +namespace py = pybind11; + +#include <gnuradio/network/socket_pdu.h> +// pydoc.h is automatically generated in the build directory +#include <socket_pdu_pydoc.h> + +void bind_socket_pdu(py::module& m) +{ + + using socket_pdu = gr::network::socket_pdu; + + + py::class_<socket_pdu, gr::block, gr::basic_block, std::shared_ptr<socket_pdu>>( + m, "socket_pdu", D(socket_pdu)) + + .def(py::init(&socket_pdu::make), + py::arg("type"), + py::arg("addr"), + py::arg("port"), + py::arg("MTU") = 10000, + py::arg("tcp_no_delay") = false, + D(socket_pdu, make)) + + + ; +} diff --git a/gr-network/python/network/bindings/tuntap_pdu_python.cc b/gr-network/python/network/bindings/tuntap_pdu_python.cc new file mode 100644 index 000000000..a5a09f251 --- /dev/null +++ b/gr-network/python/network/bindings/tuntap_pdu_python.cc @@ -0,0 +1,47 @@ +/* + * Copyright 2021 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ + +/***********************************************************************************/ +/* This file is automatically generated using bindtool and can be manually edited */ +/* The following lines can be configured to regenerate this file during cmake */ +/* If manual edits are made, the following tags should be modified accordingly. */ +/* BINDTOOL_GEN_AUTOMATIC(0) */ +/* BINDTOOL_USE_PYGCCXML(0) */ +/* BINDTOOL_HEADER_FILE(tuntap_pdu.h) */ +/* BINDTOOL_HEADER_FILE_HASH(26b7f31af528ff1bf62a965e6b0dd636) */ +/***********************************************************************************/ + +#include <pybind11/complex.h> +#include <pybind11/pybind11.h> +#include <pybind11/stl.h> + +namespace py = pybind11; + +#include <gnuradio/network/tuntap_pdu.h> +// pydoc.h is automatically generated in the build directory +#include <tuntap_pdu_pydoc.h> + +void bind_tuntap_pdu(py::module& m) +{ + + using tuntap_pdu = gr::network::tuntap_pdu; + + + py::class_<tuntap_pdu, gr::block, gr::basic_block, std::shared_ptr<tuntap_pdu>>( + m, "tuntap_pdu", D(tuntap_pdu)) + + .def(py::init(&tuntap_pdu::make), + py::arg("dev"), + py::arg("MTU") = 10000, + py::arg("istunflag") = false, + D(tuntap_pdu, make)) + + + ; +} diff --git a/gr-network/python/network/qa_socket_pdu.py b/gr-network/python/network/qa_socket_pdu.py new file mode 100644 index 000000000..136f7e4cc --- /dev/null +++ b/gr-network/python/network/qa_socket_pdu.py @@ -0,0 +1,141 @@ +#!/usr/bin/env python +# +# Copyright 2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +# + + +from gnuradio import gr, gr_unittest, blocks, pdu +from gnuradio import network +import random +import pmt +import time + + +class qa_socket_pdu (gr_unittest.TestCase): + + def setUp(self): + random.seed(0) + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_001(self): + # Test that blocks can be created and destroyed without hanging + port = str(random.Random().randint(0, 30000) + 10000) + self.pdu_send = network.socket_pdu("UDP_CLIENT", "localhost", port) + self.pdu_recv = network.socket_pdu("UDP_SERVER", "localhost", port) + self.pdu_send = None + self.pdu_recv = None + + def test_002(self): + # Send a PDU through a pair of UDP sockets + port = str(random.Random().randint(0, 30000) + 10000) + srcdata = (0x64, 0x6f, 0x67, 0x65) + data = pmt.init_u8vector(srcdata.__len__(), srcdata) + pdu_msg = pmt.cons(pmt.PMT_NIL, data) + + self.pdu_source = blocks.message_strobe(pdu_msg, 500) + self.pdu_recv = network.socket_pdu("UDP_SERVER", "localhost", port) + self.pdu_send = network.socket_pdu("UDP_CLIENT", "localhost", port) + + self.dbg = blocks.message_debug() + + self.tb.msg_connect(self.pdu_source, "strobe", self.pdu_send, "pdus") + self.tb.msg_connect(self.pdu_recv, "pdus", self.dbg, "store") + + self.tb.start() + time.sleep(1) + self.tb.stop() + self.tb.wait() + self.pdu_send = None + self.pdu_recv = None + + received = self.dbg.get_message(0) + received_data = pmt.cdr(received) + msg_data = [] + for i in range(4): + msg_data.append(pmt.u8vector_ref(received_data, i)) + self.assertEqual(srcdata, tuple(msg_data)) + + def test_003(self): + # Test that block stops when interacting with streaming interface + port = str(random.Random().randint(0, 30000) + 10000) + srcdata = ( + 0x73, + 0x75, + 0x63, + 0x68, + 0x74, + 0x65, + 0x73, + 0x74, + 0x76, + 0x65, + 0x72, + 0x79, + 0x70, + 0x61, + 0x73, + 0x73) + tag_dict = {"offset": 0} + tag_dict["key"] = pmt.intern("len") + tag_dict["value"] = pmt.from_long(8) + tag1 = gr.python_to_tag(tag_dict) + tag_dict["offset"] = 8 + tag2 = gr.python_to_tag(tag_dict) + tags = [tag1, tag2] + + src = blocks.vector_source_b(srcdata, False, 1, tags) + ts_to_pdu = pdu.tagged_stream_to_pdu(gr.types.byte_t, "len") + pdu_send = network.socket_pdu("UDP_CLIENT", "localhost", "4141") + #pdu_recv = network.socket_pdu("UDP_SERVER", "localhost", port) + pdu_to_ts = pdu.pdu_to_tagged_stream(gr.types.byte_t, "len") + head = blocks.head(gr.sizeof_char, 10) + sink = blocks.vector_sink_b(1) + + self.tb.connect(src, ts_to_pdu) + self.tb.msg_connect(ts_to_pdu, "pdus", pdu_send, "pdus") + # a UDP socket connects pdu_send to pdu_recv + # TODO: test that the recv socket can be destroyed from downstream + # that signals DONE. Also that we get the PDUs we sent + #self.tb.msg_connect(pdu_recv, "pdus", pdu_to_ts, "pdus") + #self.tb.connect(pdu_to_ts, head, sink) + self.tb.run() + + def test_004(self): + # Test that the TCP server can stream PDUs <= the MTU size. + port = str(random.Random().randint(0, 30000) + 10000) + mtu = 10000 + srcdata = tuple(x % 256 for x in range(mtu)) + data = pmt.init_u8vector(srcdata.__len__(), srcdata) + pdu_msg = pmt.cons(pmt.PMT_NIL, data) + + self.pdu_source = blocks.message_strobe(pdu_msg, 500) + self.pdu_send = network.socket_pdu("TCP_SERVER", "localhost", port, mtu) + self.pdu_recv = network.socket_pdu("TCP_CLIENT", "localhost", port, mtu) + self.pdu_sink = blocks.message_debug() + + self.tb.msg_connect(self.pdu_source, "strobe", self.pdu_send, "pdus") + self.tb.msg_connect(self.pdu_recv, "pdus", self.pdu_sink, "store") + + self.tb.start() + time.sleep(1) + self.tb.stop() + self.tb.wait() + + received = self.pdu_sink.get_message(0) + received_data = pmt.cdr(received) + msg_data = [] + for i in range(mtu): + msg_data.append(pmt.u8vector_ref(received_data, i)) + self.assertEqual(srcdata, tuple(msg_data)) + + +if __name__ == '__main__': + gr_unittest.run(qa_socket_pdu) |