diff options
Diffstat (limited to 'blocklib/zeromq')
46 files changed, 0 insertions, 1978 deletions
diff --git a/blocklib/zeromq/.gitignore b/blocklib/zeromq/.gitignore deleted file mode 100644 index 25d053a52..000000000 --- a/blocklib/zeromq/.gitignore +++ /dev/null @@ -1 +0,0 @@ -meson.build
\ No newline at end of file diff --git a/blocklib/zeromq/include/gnuradio/zeromq/.gitignore b/blocklib/zeromq/include/gnuradio/zeromq/.gitignore deleted file mode 100644 index d53050d7d..000000000 --- a/blocklib/zeromq/include/gnuradio/zeromq/.gitignore +++ /dev/null @@ -1 +0,0 @@ -!meson.build diff --git a/blocklib/zeromq/include/gnuradio/zeromq/api.h b/blocklib/zeromq/include/gnuradio/zeromq/api.h deleted file mode 100644 index 29941d0f4..000000000 --- a/blocklib/zeromq/include/gnuradio/zeromq/api.h +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright 2011 Free Software Foundation, Inc. - * - * This file is part of GNU Radio - * - * SPDX-License-Identifier: GPL-3.0-or-later - * - */ - -#pragma once - -#include <gnuradio/attributes.h> - -#ifdef gnuradio_blocks_EXPORTS -#define ZEROMQ_API __GR_ATTR_EXPORT -#else -#define ZEROMQ_API __GR_ATTR_IMPORT -#endif diff --git a/blocklib/zeromq/include/gnuradio/zeromq/meson.build b/blocklib/zeromq/include/gnuradio/zeromq/meson.build deleted file mode 100644 index 9bb4be16a..000000000 --- a/blocklib/zeromq/include/gnuradio/zeromq/meson.build +++ /dev/null @@ -1,25 +0,0 @@ -zeromq_headers = [ - 'api.h' -] - -install_headers(zeromq_headers, subdir : 'gnuradio/zeromq') - -cmake_conf = configuration_data() -cmake_conf.set('libdir', join_paths(prefix,get_option('libdir'))) -cmake_conf.set('module', 'zeromq') -cmake.configure_package_config_file( - name : 'gnuradio-zeromq', - input : join_paths(meson.source_root(),'cmake','Modules','gnuradioConfigModule.cmake.in'), - install_dir : get_option('prefix') / get_option('libdir') / 'cmake' / 'gnuradio', - configuration : cmake_conf -) - -pkg = import('pkgconfig') -libs = [] # the library/libraries users need to link against -h = ['.'] # subdirectories of ${prefix}/${includedir} to add to header path -pkg.generate(libraries : libs, - subdirs : h, - version : meson.project_version(), - name : 'libgnuradio-zeromq', - filebase : 'gnuradio-zeromq', - description : 'GNU Radio ZeroMQ Blocks') diff --git a/blocklib/zeromq/lib/.gitignore b/blocklib/zeromq/lib/.gitignore deleted file mode 100644 index 01ecb66ff..000000000 --- a/blocklib/zeromq/lib/.gitignore +++ /dev/null @@ -1 +0,0 @@ -!meson.build
\ No newline at end of file diff --git a/blocklib/zeromq/lib/base.cc b/blocklib/zeromq/lib/base.cc deleted file mode 100644 index 10a07cfa3..000000000 --- a/blocklib/zeromq/lib/base.cc +++ /dev/null @@ -1,236 +0,0 @@ -/* -*- c++ -*- */ -/* - * Copyright 2016,2019 Free Software Foundation, Inc. - * - * This file is part of GNU Radio. - * - * SPDX-License-Identifier: GPL-3.0-or-later - * - */ - -#ifdef HAVE_CONFIG_H -#include "config.h" -#endif - -#include "base.h" -#include "tag_headers.h" - -namespace { -constexpr int LINGER_DEFAULT = 1000; // 1 second. -} - -namespace gr { -namespace zeromq { - -base::base(int type, size_t itemsize, int timeout, bool pass_tags, const std::string& key) - : d_context(1), - d_socket(d_context, type), - d_vsize(itemsize), - d_timeout(timeout), - d_pass_tags(pass_tags), - d_key(key) -{ - /* "Fix" timeout value (ms for new API, us for old API) */ - int major, minor, patch; - zmq::version(&major, &minor, &patch); - - if (major < 3) { - d_timeout *= 1000; - } -} - -std::string base::last_endpoint() const -{ - return d_socket.get(zmq::sockopt::last_endpoint); -} - - -base_sink::base_sink(int type, - size_t itemsize, - const std::string& address, - int timeout, - bool pass_tags, - int hwm, - const std::string& key) - : base(type, itemsize, timeout, pass_tags, key) -{ - gr::configure_default_loggers(d_base_logger, d_base_debug_logger, "zmq_base_source"); - - /* Set high watermark */ - if (hwm >= 0) { - d_socket.set(zmq::sockopt::sndhwm, hwm); - } - - /* Set ZMQ_LINGER so socket won't infinitely block during teardown */ - d_socket.set(zmq::sockopt::linger, LINGER_DEFAULT); - - /* Bind */ - d_socket.bind(address); -} - -int base_sink::send_message(const void* in_buf, - const int in_nitems, - const uint64_t in_offset, - const std::vector<tag_t>& tags) -{ - /* Send key if it exists */ - if (!d_key.empty()) { - zmq::message_t key_message(d_key.size()); - memcpy(key_message.data(), d_key.data(), d_key.size()); - d_socket.send(key_message, zmq::send_flags::sndmore); - } - /* Meta-data header */ - std::string header(""); - if (d_pass_tags) { - // std::vector<gr::tag_t> tags; - // get_tags_in_range(tags, 0, in_offset, in_offset + in_nitems); - header = gen_tag_header(in_offset, tags); - } - - /* Create message */ - size_t payload_len = in_nitems * d_vsize; - size_t msg_len = d_pass_tags ? payload_len + header.length() : payload_len; - zmq::message_t msg(msg_len); - - if (d_pass_tags) { - memcpy(msg.data(), header.c_str(), header.length()); - memcpy((uint8_t*)msg.data() + header.length(), in_buf, payload_len); - } - else { - memcpy(msg.data(), in_buf, payload_len); - } - - /* Send */ - d_socket.send(msg, zmq::send_flags::none); - - /* Report back */ - return in_nitems; -} - -base_source::base_source(int type, - size_t itemsize, - const std::string& address, - int timeout, - bool pass_tags, - int hwm, - const std::string& key) - : base(type, itemsize, timeout, pass_tags, key), - d_consumed_bytes(0), - d_consumed_items(0) -{ - /* Set high watermark */ - if (hwm >= 0) { - d_socket.set(zmq::sockopt::rcvhwm, hwm); - } - - /* Set ZMQ_LINGER so socket won't infinitely block during teardown */ - d_socket.set(zmq::sockopt::linger, LINGER_DEFAULT); - - /* Connect */ - d_socket.connect(address); -} - -bool base_source::has_pending() { return d_msg.size() > d_consumed_bytes; } - -int base_source::flush_pending(block_work_output& work_output, - const int out_nitems, - const uint64_t out_offset) -{ - /* How much to copy in this call */ - int to_copy_items = - std::min(out_nitems, (int)((d_msg.size() - d_consumed_bytes) / d_vsize)); - int to_copy_bytes = d_vsize * to_copy_items; - auto nw = work_output.nitems_written(); - - /* Copy actual data */ - memcpy(work_output.items<uint8_t>() + out_offset * d_vsize, - (uint8_t*)d_msg.data() + d_consumed_bytes, - to_copy_bytes); - - /* Add tags matching this segment of samples */ - for (unsigned int i = 0; i < d_tags.size(); i++) { - if ((d_tags[i].offset() >= (uint64_t)d_consumed_items) && - (d_tags[i].offset() < (uint64_t)d_consumed_items + to_copy_items)) { - gr::tag_t nt = d_tags[i]; - nt.set_offset(nt.offset() + nw + out_offset - d_consumed_items); - work_output.add_tag(nt); - } - } - - /* Update pointer */ - d_consumed_items += to_copy_items; - d_consumed_bytes += to_copy_bytes; - - return to_copy_items; -} - -bool base_source::load_message(bool wait) -{ - /* Poll for input */ - zmq::pollitem_t items[] = { { static_cast<void*>(d_socket), 0, ZMQ_POLLIN, 0 } }; - zmq::poll(&items[0], 1, std::chrono::milliseconds{ wait ? d_timeout : 0 }); - - if (!(items[0].revents & ZMQ_POLLIN)) - return false; - - /* Is this the start or continuation of a multi-part message? */ - auto more = d_socket.get(zmq::sockopt::rcvmore); - - /* Reset */ - d_msg.rebuild(); - d_tags.clear(); - d_consumed_items = 0; - d_consumed_bytes = 0; - - /* Get the message */ - const bool ok = bool(d_socket.recv(d_msg)); - - if (!ok) { - // This shouldn't happen since we polled POLLIN, but ZMQ wants us to check - // the return value. - d_base_logger->warn("Failed to recv() message."); - return false; - } - - /* Throw away key and get the first message. Avoid blocking if a multi-part - * message is not sent */ - if (!d_key.empty() && !more) { - auto is_multipart = d_socket.get(zmq::sockopt::rcvmore); - - d_msg.rebuild(); - if (is_multipart) { - const bool multi_ok = bool(d_socket.recv(d_msg)); - - if (!multi_ok) { - d_base_logger->error("Failure to receive multi-part message."); - } - } - else { - return false; - } - } - /* Parse header from the first (or only) message of a multi-part message */ - if (d_pass_tags && !more) { - uint64_t rcv_offset; - - /* Parse header */ - d_consumed_bytes = parse_tag_header(d_msg, rcv_offset, d_tags); - - /* Fixup the tags offset to be relative to the start of this message */ - for (unsigned int i = 0; i < d_tags.size(); i++) { - d_tags[i].set_offset(d_tags[i].offset() - rcv_offset); - } - } - - /* Each message must contain an integer multiple of data vectors */ - if ((d_msg.size() - d_consumed_bytes) % d_vsize != 0) { - throw std::runtime_error("Incompatible vector sizes: need a multiple of " + - std::to_string(d_vsize) + " bytes per message"); - } - - /* We got one ! */ - return true; -} - -} /* namespace zeromq */ -} /* namespace gr */ diff --git a/blocklib/zeromq/lib/base.h b/blocklib/zeromq/lib/base.h deleted file mode 100644 index 7fb6d2ccb..000000000 --- a/blocklib/zeromq/lib/base.h +++ /dev/null @@ -1,89 +0,0 @@ -/* -*- c++ -*- */ -/* - * Copyright 2016,2019 Free Software Foundation, Inc. - * Copyright 2022 Josh Morman - * - * This file is part of GNU Radio. - * - * SPDX-License-Identifier: GPL-3.0-or-later - * - */ - -#pragma once - -#include "zmq_common_impl.h" -#include <gnuradio/block_work_io.h> -#include <gnuradio/logger.h> -#include <gnuradio/tag.h> - -namespace gr { -namespace zeromq { - -class base -{ -public: - base(int type, - size_t itemsize, - int timeout, - bool pass_tags, - const std::string& key = ""); - - void set_vsize(size_t vsize) { d_vsize = vsize; } - -protected: - std::string last_endpoint() const; - zmq::context_t d_context; - zmq::socket_t d_socket; - size_t d_vsize; - int d_timeout; - bool d_pass_tags; - const std::string d_key; - - logger_ptr d_base_logger; - logger_ptr d_base_debug_logger; -}; - -class base_sink : public base -{ -public: - base_sink(int type, - size_t itemsize, - const std::string& address, - int timeout, - bool pass_tags, - int hwm, - const std::string& key = ""); - -protected: - int send_message(const void* in_buf, - const int in_nitems, - const uint64_t in_offset, - const std::vector<tag_t>& tags); -}; - -class base_source : public base -{ -public: - base_source(int type, - size_t itemsize, - const std::string& address, - int timeout, - bool pass_tags, - int hwm, - const std::string& key = ""); - -protected: - zmq::message_t d_msg; - std::vector<gr::tag_t> d_tags; - size_t d_consumed_bytes; - int d_consumed_items; - - bool has_pending(); - int flush_pending(block_work_output& work_output, - const int out_nitems, - const uint64_t out_offset); - bool load_message(bool wait); -}; - -} // namespace zeromq -} // namespace gr diff --git a/blocklib/zeromq/lib/meson.build b/blocklib/zeromq/lib/meson.build deleted file mode 100644 index c3f036763..000000000 --- a/blocklib/zeromq/lib/meson.build +++ /dev/null @@ -1,36 +0,0 @@ -zeromq_deps += [gnuradio_gr_dep, volk_dep, fmt_dep, pmtf_dep, cppzmq_dep] -zeromq_sources += ['base.cc', 'tag_headers.cc'] -block_cpp_args = ['-DHAVE_CPU'] - -incdir = include_directories(['../include/gnuradio/zeromq','../include']) -gnuradio_blocklib_zeromq_lib = library('gnuradio-blocklib-zeromq', - zeromq_sources, - include_directories : incdir, - install : true, - link_language: 'cpp', - dependencies : zeromq_deps, - cpp_args : block_cpp_args) - -gnuradio_blocklib_zeromq_dep = declare_dependency(include_directories : incdir, - link_with : gnuradio_blocklib_zeromq_lib, - dependencies : zeromq_deps) - -cmake_conf = configuration_data() -cmake_conf.set('libdir', join_paths(prefix,get_option('libdir'))) -cmake_conf.set('module', 'zeromq') -cmake.configure_package_config_file( - name : 'gnuradio-zeromq', - input : join_paths(meson.source_root(),'cmake','Modules','gnuradioConfigModule.cmake.in'), - install_dir : get_option('prefix') / get_option('libdir') / 'cmake' / 'gnuradio', - configuration : cmake_conf -) - -pkg = import('pkgconfig') -libs = [gnuradio_blocklib_zeromq_lib] # the library/libraries users need to link against -h = ['.'] # subdirectories of ${prefix}/${includedir} to add to header path -pkg.generate(libraries : libs, - subdirs : h, - version : meson.project_version(), - name : 'libgnuradio-zeromq', - filebase : 'gnuradio-zeromq', - description : 'GNU Radio ZeroMQ Module') diff --git a/blocklib/zeromq/lib/tag_headers.cc b/blocklib/zeromq/lib/tag_headers.cc deleted file mode 100644 index fd814bead..000000000 --- a/blocklib/zeromq/lib/tag_headers.cc +++ /dev/null @@ -1,89 +0,0 @@ -/* -*- c++ -*- */ -/* - * Copyright 2014 Free Software Foundation, Inc. - * - * This file is part of GNU Radio. - * - * SPDX-License-Identifier: GPL-3.0-or-later - * - */ - -#include "zmq_common_impl.h" -#include <gnuradio/tag.h> -#include <cstring> -#include <sstream> - -#define GR_HEADER_MAGIC 0x5FF0 -#define GR_HEADER_VERSION 0x02 - -namespace gr { -namespace zeromq { - -struct membuf : std::streambuf { - membuf(void* b, size_t len) - { - char* bc = static_cast<char*>(b); - this->setg(bc, bc, bc + len); - } -}; - -std::string gen_tag_header(uint64_t offset, const std::vector<gr::tag_t>& tags) -{ - std::stringbuf sb(""); - std::ostream ss(&sb); - - uint16_t header_magic = GR_HEADER_MAGIC; - uint8_t header_version = GR_HEADER_VERSION; - uint64_t ntags = (uint64_t)tags.size(); - - ss.write((const char*)&header_magic, sizeof(uint16_t)); - ss.write((const char*)&header_version, sizeof(uint8_t)); - ss.write((const char*)&offset, sizeof(uint64_t)); - ss.write((const char*)&ntags, sizeof(uint64_t)); - - for (auto& tag : tags) { - tag.serialize(sb); - } - - return sb.str(); -} - -size_t parse_tag_header(zmq::message_t& msg, - uint64_t& offset_out, - std::vector<gr::tag_t>& tags_out) -{ - membuf sb(msg.data(), msg.size()); - std::istream iss(&sb); - - size_t min_len = - sizeof(uint16_t) + sizeof(uint8_t) + sizeof(uint64_t) + sizeof(uint64_t); - if (msg.size() < min_len) - throw std::runtime_error("incoming zmq msg too small to hold gr tag header!"); - - uint16_t header_magic; - uint8_t header_version; - uint64_t rcv_ntags; - - iss.read((char*)&header_magic, sizeof(uint16_t)); - iss.read((char*)&header_version, sizeof(uint8_t)); - - if (header_magic != GR_HEADER_MAGIC) - throw std::runtime_error("gr header magic does not match!"); - - if (header_version != GR_HEADER_VERSION) - throw std::runtime_error("gr header version does not match!"); - - iss.read((char*)&offset_out, sizeof(uint64_t)); - iss.read((char*)&rcv_ntags, sizeof(uint64_t)); - - for (size_t i = 0; i < rcv_ntags; i++) { - gr::tag_t newtag = tag_t::deserialize(sb); - tags_out.push_back(newtag); - } - - return msg.size() - sb.in_avail(); -} -} /* namespace zeromq */ -} /* namespace gr */ - -// vim: ts=2 sw=2 expandtab diff --git a/blocklib/zeromq/lib/tag_headers.h b/blocklib/zeromq/lib/tag_headers.h deleted file mode 100644 index b8b7a09b1..000000000 --- a/blocklib/zeromq/lib/tag_headers.h +++ /dev/null @@ -1,28 +0,0 @@ -/* -*- c++ -*- */ -/* - * Copyright 2014 Free Software Foundation, Inc. - * - * This file is part of GNU Radio. - * - * SPDX-License-Identifier: GPL-3.0-or-later - * - */ - -#ifndef ZEROMQ_TAG_HEADERS_H -#define ZEROMQ_TAG_HEADERS_H - -#include "zmq_common_impl.h" -#include <gnuradio/tag.h> - -namespace gr { -namespace zeromq { - -std::string gen_tag_header(uint64_t offset, const std::vector<gr::tag_t>& tags); -size_t parse_tag_header(zmq::message_t& msg, - uint64_t& offset_out, - std::vector<gr::tag_t>& tags_out); - -} /* namespace zeromq */ -} /* namespace gr */ - -#endif /* ZEROMQ_TAG_HEADERS_H */ diff --git a/blocklib/zeromq/lib/zmq_common_impl.h b/blocklib/zeromq/lib/zmq_common_impl.h deleted file mode 100644 index fc08ef301..000000000 --- a/blocklib/zeromq/lib/zmq_common_impl.h +++ /dev/null @@ -1,13 +0,0 @@ -/* -*- c++ -*- */ -/* - * Copyright 2019 Free Software Foundation, Inc. - * Copyright 2022 Josh Morman - * - * This file is part of GNU Radio. - * - * SPDX-License-Identifier: GPL-3.0-or-later - * - */ -#pragma once - -#include <zmq.hpp> diff --git a/blocklib/zeromq/pub_sink/pub_sink.yml b/blocklib/zeromq/pub_sink/pub_sink.yml deleted file mode 100644 index f65b4a70f..000000000 --- a/blocklib/zeromq/pub_sink/pub_sink.yml +++ /dev/null @@ -1,55 +0,0 @@ -module: zeromq -block: pub_sink -label: PUB Sink -blocktype: sync_block -category: '[Core]/ZeroMQ Interfaces' - -# Example Parameters -parameters: - - id: address - label: IP Address - dtype: string - settable: false - - id: timeout - label: Timeout - dtype: int - settable: false - default: 100 - - id: pass_tags - label: Pass Tags - dtype: bool - settable: false - default: "false" - - id: hwm - label: HWM - dtype: int - settable: false - default: -1 - - id: key - label: Key - dtype: string - settable: false - default: "\"\"" - - id: itemsize - label: Item Size - dtype: size - settable: false - default: 0 - -callbacks: -- id: last_endpoint - return: std::string - const: true - # inherited: true - -ports: - - domain: stream - id: in - direction: input - type: untyped - size: parameters/itemsize - -implementations: -- id: cpu - -file_format: 1 diff --git a/blocklib/zeromq/pub_sink/pub_sink_cpu.cc b/blocklib/zeromq/pub_sink/pub_sink_cpu.cc deleted file mode 100644 index 2a5d9d3f5..000000000 --- a/blocklib/zeromq/pub_sink/pub_sink_cpu.cc +++ /dev/null @@ -1,33 +0,0 @@ -#include "pub_sink_cpu.h" -#include "pub_sink_cpu_gen.h" - -namespace gr { -namespace zeromq { - -pub_sink_cpu::pub_sink_cpu(block_args args) - : INHERITED_CONSTRUCTORS, - base_sink(ZMQ_PUB, - args.itemsize, - args.address, - args.timeout, - args.pass_tags, - args.hwm, - args.key) -{ -} - -work_return_t pub_sink_cpu::work(work_io& wio) -{ - auto noutput_items = wio.inputs()[0].n_items; - auto nread = wio.inputs()[0].nitems_read(); - auto nsent = send_message(wio.inputs()[0].raw_items(), - noutput_items, - nread, - wio.inputs()[0].tags_in_window(0, noutput_items)); - wio.consume_each(nsent); - return work_return_t::OK; -} - - -} // namespace zeromq -} // namespace gr diff --git a/blocklib/zeromq/pub_sink/pub_sink_cpu.h b/blocklib/zeromq/pub_sink/pub_sink_cpu.h deleted file mode 100644 index e55d125e7..000000000 --- a/blocklib/zeromq/pub_sink/pub_sink_cpu.h +++ /dev/null @@ -1,28 +0,0 @@ -#pragma once - -#include "base.h" -#include <gnuradio/zeromq/pub_sink.h> - -namespace gr { -namespace zeromq { - -class pub_sink_cpu : public virtual pub_sink, public virtual base_sink -{ -public: - pub_sink_cpu(block_args args); - work_return_t work(work_io&) override; - std::string last_endpoint() const override { return base_sink::last_endpoint(); } - - // Since vsize can be set as 0, then inferred on flowgraph init, set it during start() - bool start() override - { - set_vsize(this->input_stream_ports()[0]->itemsize()); - return pub_sink::start(); - } - -private: - // private variables here -}; - -} // namespace zeromq -} // namespace gr
\ No newline at end of file diff --git a/blocklib/zeromq/pull_msg_source/pull_msg_source.yml b/blocklib/zeromq/pull_msg_source/pull_msg_source.yml deleted file mode 100644 index 3cde228af..000000000 --- a/blocklib/zeromq/pull_msg_source/pull_msg_source.yml +++ /dev/null @@ -1,37 +0,0 @@ -module: zeromq -block: pull_msg_source -label: Pull Msg Source -blocktype: sync_block -category: '[Core]/ZeroMQ Interfaces' - -parameters: - - id: address - label: IP Address - dtype: string - settable: false - - id: timeout - label: Timeout - dtype: int - settable: false - default: 100 - - id: bind - label: Bind - dtype: bool - settable: false - default: "false" - -callbacks: -- id: last_endpoint - return: std::string - const: true - -ports: -- domain: message - id: out - direction: output - -implementations: -- id: cpu -# - id: cuda - -file_format: 1
\ No newline at end of file diff --git a/blocklib/zeromq/pull_msg_source/pull_msg_source_cpu.cc b/blocklib/zeromq/pull_msg_source/pull_msg_source_cpu.cc deleted file mode 100644 index 94c81afb6..000000000 --- a/blocklib/zeromq/pull_msg_source/pull_msg_source_cpu.cc +++ /dev/null @@ -1,96 +0,0 @@ -/* -*- c++ -*- */ -/* - * Copyright 2013,2014,2019 Free Software Foundation, Inc. - * Copyright 2022 Josh Morman - * - * This file is part of GNU Radio - * - * SPDX-License-Identifier: GPL-3.0-or-later - * - */ - -#include "pull_msg_source_cpu.h" -#include "pull_msg_source_cpu_gen.h" - -#include <chrono> - -namespace { -constexpr int LINGER_DEFAULT = 1000; // 1 second. -} - -namespace gr { -namespace zeromq { - -pull_msg_source_cpu::pull_msg_source_cpu(block_args args) - : INHERITED_CONSTRUCTORS, - d_timeout(args.timeout), - d_context(1), - d_socket(d_context, ZMQ_PULL) -{ - - d_socket.set(zmq::sockopt::linger, LINGER_DEFAULT); - - if (args.bind) { - d_socket.bind(args.address); - } - else { - d_socket.connect(args.address); - } -} - -bool pull_msg_source_cpu::start() -{ - d_finished = false; - d_thread = std::thread([this] { readloop(); }); - return block::start(); -} - -bool pull_msg_source_cpu::stop() -{ - d_finished = true; - if (d_thread.joinable()) { - d_thread.join(); - } - return block::stop(); -} - -void pull_msg_source_cpu::readloop() -{ - using namespace std::chrono_literals; - while (!d_finished) { - - zmq::pollitem_t items[] = { { static_cast<void*>(d_socket), 0, ZMQ_POLLIN, 0 } }; - zmq::poll(&items[0], 1, std::chrono::milliseconds{ d_timeout }); - - // If we got a reply, process - if (items[0].revents & ZMQ_POLLIN) { - - // Receive data - zmq::message_t msg; - const bool ok = bool(d_socket.recv(msg)); - - if (!ok) { - // Should not happen, we've checked POLLIN. - d_logger->error("Failed to receive message."); - std::this_thread::sleep_for(100us); - continue; - } - - std::string buf(static_cast<char*>(msg.data()), msg.size()); - std::stringbuf sb(buf); - try { - auto m = pmtf::pmt::deserialize(sb); - d_msg_out->post(m); - } catch (...) { // Take out PMT specific exception for now - d_logger->error("Invalid PMT message"); - } - } - else { - std::this_thread::sleep_for(100us); - } - } -} - - -} // namespace zeromq -} // namespace gr diff --git a/blocklib/zeromq/pull_msg_source/pull_msg_source_cpu.h b/blocklib/zeromq/pull_msg_source/pull_msg_source_cpu.h deleted file mode 100644 index a31704ffd..000000000 --- a/blocklib/zeromq/pull_msg_source/pull_msg_source_cpu.h +++ /dev/null @@ -1,45 +0,0 @@ -/* -*- c++ -*- */ -/* - * Copyright 2013,2014,2019 Free Software Foundation, Inc. - * Copyright 2022 Josh Morman - * - * This file is part of GNU Radio - * - * SPDX-License-Identifier: GPL-3.0-or-later - * - */ - -#pragma once - -#include <gnuradio/zeromq/pull_msg_source.h> - -#include <zmq.hpp> -#include <thread> - -namespace gr { -namespace zeromq { - -class pull_msg_source_cpu : public virtual pull_msg_source -{ -public: - pull_msg_source_cpu(block_args args); - bool start() override; - bool stop() override; - std::string last_endpoint() const override - { - return d_socket.get(zmq::sockopt::last_endpoint); - } - -private: - bool d_finished; - int d_timeout; // microseconds, -1 is blocking - zmq::context_t d_context; - zmq::socket_t d_socket; - std::thread d_thread; - - - void readloop(); -}; - -} // namespace zeromq -} // namespace gr
\ No newline at end of file diff --git a/blocklib/zeromq/pull_source/pull_source.yml b/blocklib/zeromq/pull_source/pull_source.yml deleted file mode 100644 index 491211407..000000000 --- a/blocklib/zeromq/pull_source/pull_source.yml +++ /dev/null @@ -1,43 +0,0 @@ -module: zeromq -block: pull_source -label: PULL Source -blocktype: sync_block -category: '[Core]/ZeroMQ Interfaces' -# inherits: base - -parameters: - - id: address - label: IP Address - dtype: string - settable: false - - id: timeout - label: Timeout - dtype: int - settable: false - default: 100 - - id: pass_tags - label: Pass Tags - dtype: bool - settable: false - default: "false" - - id: hwm - label: HWM - dtype: int - settable: false - default: -1 - - id: itemsize - label: Item Size - dtype: size - settable: false - default: 0 -ports: - - domain: stream - id: out - direction: output - type: untyped - size: parameters/itemsize - -implementations: - - id: cpu - -file_format: 1 diff --git a/blocklib/zeromq/pull_source/pull_source_cpu.cc b/blocklib/zeromq/pull_source/pull_source_cpu.cc deleted file mode 100644 index 6331db2cc..000000000 --- a/blocklib/zeromq/pull_source/pull_source_cpu.cc +++ /dev/null @@ -1,53 +0,0 @@ -#include "pull_source_cpu.h" -#include "pull_source_cpu_gen.h" - -#include <chrono> -#include <thread> - -namespace gr { -namespace zeromq { - -pull_source_cpu::pull_source_cpu(block_args args) - : INHERITED_CONSTRUCTORS, - base_source( - ZMQ_PULL, args.itemsize, args.address, args.timeout, args.pass_tags, args.hwm) -{ -} - -work_return_t pull_source_cpu::work(work_io& wio) -{ - - auto noutput_items = wio.outputs()[0].n_items; - bool first = true; - size_t done = 0; - - /* Process as much as we can */ - while (1) { - if (has_pending()) { - /* Flush anything pending */ - done += flush_pending(wio.outputs()[0], noutput_items - done, done); - - /* No more space ? */ - if (done == noutput_items) - break; - } - else { - /* Try to get the next message */ - if (!load_message(first)) { - // Launch a thread to come back and try again some time later - come_back_later(100); - break; /* No message, we're done for now */ - } - - /* Not the first anymore */ - first = false; - } - } - - wio.outputs()[0].n_produced = done; - return work_return_t::OK; -} - - -} // namespace zeromq -} // namespace gr diff --git a/blocklib/zeromq/pull_source/pull_source_cpu.h b/blocklib/zeromq/pull_source/pull_source_cpu.h deleted file mode 100644 index 157a9ba45..000000000 --- a/blocklib/zeromq/pull_source/pull_source_cpu.h +++ /dev/null @@ -1,24 +0,0 @@ -#pragma once - -#include "base.h" -#include <gnuradio/zeromq/pull_source.h> - -namespace gr { -namespace zeromq { - -class pull_source_cpu : public virtual pull_source, public virtual base_source -{ -public: - pull_source_cpu(block_args args); - work_return_t work(work_io&) override; - - // Since vsize can be set as 0, then inferred on flowgraph init, set it during start() - bool start() override - { - set_vsize(this->output_stream_ports()[0]->itemsize()); - return pull_source::start(); - } -}; - -} // namespace zeromq -} // namespace gr
\ No newline at end of file diff --git a/blocklib/zeromq/push_msg_sink/push_msg_sink.yml b/blocklib/zeromq/push_msg_sink/push_msg_sink.yml deleted file mode 100644 index 172631951..000000000 --- a/blocklib/zeromq/push_msg_sink/push_msg_sink.yml +++ /dev/null @@ -1,38 +0,0 @@ -module: zeromq -block: push_msg_sink -label: Push Msg Sink -blocktype: block -category: '[Core]/ZeroMQ Interfaces' - -# Example Parameters -parameters: - - id: address - label: IP Address - dtype: string - settable: false - - id: timeout - label: Timeout - dtype: int - settable: false - default: 100 - - id: bind - label: Bind - dtype: bool - settable: false - default: "true" - -callbacks: -- id: last_endpoint - return: std::string - const: true - -ports: -- domain: message - id: in - direction: input - -implementations: -- id: cpu -# - id: cuda - -file_format: 1 diff --git a/blocklib/zeromq/push_msg_sink/push_msg_sink_cpu.cc b/blocklib/zeromq/push_msg_sink/push_msg_sink_cpu.cc deleted file mode 100644 index dd044868d..000000000 --- a/blocklib/zeromq/push_msg_sink/push_msg_sink_cpu.cc +++ /dev/null @@ -1,50 +0,0 @@ -/* -*- c++ -*- */ -/* - * Copyright 2013,2014 Free Software Foundation, Inc. - * Copyright 2022 Josh Morman - * - * This file is part of GNU Radio - * - * SPDX-License-Identifier: GPL-3.0-or-later - * - */ - -#include "push_msg_sink_cpu.h" -#include "push_msg_sink_cpu_gen.h" - -namespace { -constexpr int LINGER_DEFAULT = 1000; // 1 second. -} - -namespace gr { -namespace zeromq { - -push_msg_sink_cpu::push_msg_sink_cpu(block_args args) - : INHERITED_CONSTRUCTORS, - d_timeout(args.timeout), - d_context(1), - d_socket(d_context, ZMQ_PUSH) -{ - - d_socket.set(zmq::sockopt::linger, LINGER_DEFAULT); - - if (args.bind) { - d_socket.bind(args.address); - } - else { - d_socket.connect(args.address); - } -} - -void push_msg_sink_cpu::handle_msg_in(pmtf::pmt msg) -{ - std::stringbuf sb(""); - msg.serialize(sb); - std::string s = sb.str(); - zmq::message_t zmsg(s.size()); - memcpy(zmsg.data(), s.c_str(), s.size()); - d_socket.send(zmsg, zmq::send_flags::none); -} - -} // namespace zeromq -} // namespace gr
\ No newline at end of file diff --git a/blocklib/zeromq/push_msg_sink/push_msg_sink_cpu.h b/blocklib/zeromq/push_msg_sink/push_msg_sink_cpu.h deleted file mode 100644 index ad8e01d4e..000000000 --- a/blocklib/zeromq/push_msg_sink/push_msg_sink_cpu.h +++ /dev/null @@ -1,40 +0,0 @@ -/* -*- c++ -*- */ -/* - * Copyright 2013,2014 Free Software Foundation, Inc. - * Copyright 2022 Josh Morman - * - * This file is part of GNU Radio - * - * SPDX-License-Identifier: GPL-3.0-or-later - * - */ - -#pragma once - -#include <gnuradio/zeromq/push_msg_sink.h> - -#include <zmq.hpp> - -namespace gr { -namespace zeromq { - -class push_msg_sink_cpu : public virtual push_msg_sink -{ -public: - push_msg_sink_cpu(block_args args); - - std::string last_endpoint() const override - { - return d_socket.get(zmq::sockopt::last_endpoint); - } - -private: - int d_timeout; // microseconds, -1 is blocking - zmq::context_t d_context; - zmq::socket_t d_socket; - - void handle_msg_in(pmtf::pmt msg) override; -}; - -} // namespace zeromq -} // namespace gr
\ No newline at end of file diff --git a/blocklib/zeromq/push_sink/push_sink.yml b/blocklib/zeromq/push_sink/push_sink.yml deleted file mode 100644 index 9e45debb1..000000000 --- a/blocklib/zeromq/push_sink/push_sink.yml +++ /dev/null @@ -1,51 +0,0 @@ -module: zeromq -block: push_sink -label: PUSH Sink -blocktype: sync_block -category: '[Core]/ZeroMQ Interfaces' -# inherits: gr::zeromq::base -# includes: -# - gnuradio/zeromq/base.h - -parameters: - - id: address - label: IP Address - dtype: string - settable: false - - id: timeout - label: Timeout - dtype: int - settable: false - default: 100 - - id: pass_tags - label: Pass Tags - dtype: bool - settable: false - default: "false" - - id: hwm - label: HWM - dtype: int - settable: false - default: -1 - - id: itemsize - label: Item Size - dtype: size - settable: false - default: 0 -callbacks: -- id: last_endpoint - return: std::string - const: true - # inherited: true - -ports: - - domain: stream - id: in - direction: input - type: untyped - size: parameters/itemsize - -implementations: - - id: cpu - -file_format: 1 diff --git a/blocklib/zeromq/push_sink/push_sink_cpu.cc b/blocklib/zeromq/push_sink/push_sink_cpu.cc deleted file mode 100644 index 85de05df0..000000000 --- a/blocklib/zeromq/push_sink/push_sink_cpu.cc +++ /dev/null @@ -1,34 +0,0 @@ -#include "push_sink_cpu.h" -#include "push_sink_cpu_gen.h" - -namespace gr { -namespace zeromq { - -push_sink_cpu::push_sink_cpu(block_args args) - : INHERITED_CONSTRUCTORS, - base_sink( - ZMQ_PUSH, args.itemsize, args.address, args.timeout, args.pass_tags, args.hwm) -{ -} -work_return_t push_sink_cpu::work(work_io& wio) - -{ - // Poll with a timeout (FIXME: scheduler can't wait for us) - zmq::pollitem_t itemsout[] = { { static_cast<void*>(d_socket), 0, ZMQ_POLLOUT, 0 } }; - zmq::poll(&itemsout[0], 1, std::chrono::milliseconds{ d_timeout }); - - // If we can send something, do it - if (itemsout[0].revents & ZMQ_POLLOUT) { - wio.inputs()[0].n_consumed = - send_message(wio.inputs()[0].raw_items(), - wio.inputs()[0].n_items, - wio.inputs()[0].nitems_read(), - wio.inputs()[0].tags_in_window(0, wio.inputs()[0].n_items)); - } - - return work_return_t::OK; -} - - -} // namespace zeromq -} // namespace gr
\ No newline at end of file diff --git a/blocklib/zeromq/push_sink/push_sink_cpu.h b/blocklib/zeromq/push_sink/push_sink_cpu.h deleted file mode 100644 index f58102cfa..000000000 --- a/blocklib/zeromq/push_sink/push_sink_cpu.h +++ /dev/null @@ -1,25 +0,0 @@ -#pragma once - -#include "base.h" -#include <gnuradio/zeromq/push_sink.h> - -namespace gr { -namespace zeromq { - -class push_sink_cpu : public virtual push_sink, public virtual base_sink -{ -public: - push_sink_cpu(block_args args); - work_return_t work(work_io&) override; - std::string last_endpoint() const override { return base_sink::last_endpoint(); } - - // Since vsize can be set as 0, then inferred on flowgraph init, set it during start() - bool start() override - { - set_vsize(this->input_stream_ports()[0]->itemsize()); - return push_sink::start(); - } -}; - -} // namespace zeromq -} // namespace gr
\ No newline at end of file diff --git a/blocklib/zeromq/python/gnuradio/zeromq/.gitignore b/blocklib/zeromq/python/gnuradio/zeromq/.gitignore deleted file mode 100644 index 25d053a52..000000000 --- a/blocklib/zeromq/python/gnuradio/zeromq/.gitignore +++ /dev/null @@ -1 +0,0 @@ -meson.build
\ No newline at end of file diff --git a/blocklib/zeromq/python/gnuradio/zeromq/__init__.py b/blocklib/zeromq/python/gnuradio/zeromq/__init__.py deleted file mode 100644 index ee01dd5ab..000000000 --- a/blocklib/zeromq/python/gnuradio/zeromq/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ - -import os - -try: - from .zeromq_python import * -except ImportError: - dirname, filename = os.path.split(os.path.abspath(__file__)) - __path__.append(os.path.join(dirname, "bindings")) - from .zeromq_python import * diff --git a/blocklib/zeromq/python/gnuradio/zeromq/bindings/.gitignore b/blocklib/zeromq/python/gnuradio/zeromq/bindings/.gitignore deleted file mode 100644 index 01ecb66ff..000000000 --- a/blocklib/zeromq/python/gnuradio/zeromq/bindings/.gitignore +++ /dev/null @@ -1 +0,0 @@ -!meson.build
\ No newline at end of file diff --git a/blocklib/zeromq/python/gnuradio/zeromq/bindings/base_pybind.cc b/blocklib/zeromq/python/gnuradio/zeromq/bindings/base_pybind.cc deleted file mode 100644 index 59d6b86a8..000000000 --- a/blocklib/zeromq/python/gnuradio/zeromq/bindings/base_pybind.cc +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2020 Free Software Foundation, Inc. - * - * This file is part of GNU Radio - * - * SPDX-License-Identifier: GPL-3.0-or-later - * - */ - -/***********************************************************************************/ -/* This file is automatically generated using bindtool and can be manually edited */ -/* The following lines can be configured to regenerate this file during cmake */ -/* If manual edits are made, the following tags should be modified accordingly. */ -/* BINDTOOL_GEN_AUTOMATIC(0) */ -/* BINDTOOL_USE_PYGCCXML(0) */ -/* BINDTOOL_HEADER_FILE(firdes.h) */ -/* BINDTOOL_HEADER_FILE_HASH(10cf0c4b9664ba7e2931c2375c13c68c) */ -/***********************************************************************************/ - -#include <pybind11/complex.h> -#include <pybind11/pybind11.h> -#include <pybind11/stl.h> - -namespace py = pybind11; - -#include <gnuradio/zeromq/base.h> - -void bind_base(py::module& m) -{ - using base = gr::zeromq::base; - using base_source = gr::zeromq::base_source; - using base_sink = gr::zeromq::base_sink; - - py::class_<base, std::shared_ptr<base>> base_class(m, "base"); - py::class_<base_source, std::shared_ptr<base_source>> base_class(m, "base_source"); - py::class_<base_sink, std::shared_ptr<base_sink>> base_class(m, "base_sink"); - - base.def("last_endpoint", &base::last_endpoint) -} diff --git a/blocklib/zeromq/python/gnuradio/zeromq/bindings/meson.build b/blocklib/zeromq/python/gnuradio/zeromq/bindings/meson.build deleted file mode 100644 index 8a830aaec..000000000 --- a/blocklib/zeromq/python/gnuradio/zeromq/bindings/meson.build +++ /dev/null @@ -1,2 +0,0 @@ -# zeromq_pybind_sources = [files('base_pybind.cc')] + zeromq_pybind_sources -# zeromq_pybind_names = ['base'] + zeromq_pybind_names
\ No newline at end of file diff --git a/blocklib/zeromq/rep_sink/rep_sink.yml b/blocklib/zeromq/rep_sink/rep_sink.yml deleted file mode 100644 index 2d6a7647e..000000000 --- a/blocklib/zeromq/rep_sink/rep_sink.yml +++ /dev/null @@ -1,51 +0,0 @@ -module: zeromq -block: rep_sink -label: REP Sink -blocktype: sync_block -category: '[Core]/ZeroMQ Interfaces' -# inherits: gr::zeromq::base -# includes: -# - gnuradio/zeromq/base.h - -parameters: - - id: address - label: IP Address - dtype: string - settable: false - - id: timeout - label: Timeout - dtype: int - settable: false - default: 100 - - id: pass_tags - label: Pass Tags - dtype: bool - settable: false - default: "false" - - id: hwm - label: HWM - dtype: int - settable: false - default: -1 - - id: itemsize - label: Item Size - dtype: size - settable: false - default: 0 -callbacks: -- id: last_endpoint - return: std::string - const: true - # inherited: true - -ports: - - domain: stream - id: in - direction: input - type: untyped - size: parameters/itemsize - -implementations: - - id: cpu - -file_format: 1 diff --git a/blocklib/zeromq/rep_sink/rep_sink_cpu.cc b/blocklib/zeromq/rep_sink/rep_sink_cpu.cc deleted file mode 100644 index 7ad80d471..000000000 --- a/blocklib/zeromq/rep_sink/rep_sink_cpu.cc +++ /dev/null @@ -1,74 +0,0 @@ -/* -*- c++ -*- */ -/* - * Copyright 2013,2014,2019 Free Software Foundation, Inc. - * - * This file is part of GNU Radio. - * - * SPDX-License-Identifier: GPL-3.0-or-later - * - */ - -#include "rep_sink_cpu.h" -#include "rep_sink_cpu_gen.h" - -namespace gr { -namespace zeromq { - -rep_sink_cpu::rep_sink_cpu(block_args args) - : INHERITED_CONSTRUCTORS, - base_sink( - ZMQ_REP, args.itemsize, args.address, args.timeout, args.pass_tags, args.hwm) -{ -} -work_return_t rep_sink_cpu::work(work_io& wio) -{ - auto in = wio.inputs()[0].items<uint8_t>(); - auto noutput_items = wio.inputs()[0].n_items; - bool first = true; - int done = 0; - - /* Process as much as we can */ - while (1) { - /* Wait for a small time (FIXME: scheduler can't wait for us) */ - /* We only wait if its the first iteration, for the others we'll - * let the scheduler retry */ - zmq::pollitem_t items[] = { { static_cast<void*>(d_socket), 0, ZMQ_POLLIN, 0 } }; - zmq::poll(&items[0], 1, std::chrono::milliseconds{ (first ? d_timeout : 0) }); - - /* If we don't have anything, we're done */ - if (!(items[0].revents & ZMQ_POLLIN)) - break; - - /* Get and parse the request */ - zmq::message_t request; - bool ok = bool(d_socket.recv(request)); - - if (!ok) { - // Should not happen, we've checked POLLIN. - d_logger->error("Failed to receive message."); - break; - } - - int nitems_send = noutput_items - done; - if (request.size() >= sizeof(uint32_t)) { - int req = (int)*(static_cast<uint32_t*>(request.data())); - nitems_send = std::min(nitems_send, req); - } - - /* Delegate the actual send */ - done += send_message(in + (done * d_vsize), - nitems_send, - wio.inputs()[0].nitems_read() + done, - wio.inputs()[0].tags_in_window(0, noutput_items)); - - /* Not the first anymore */ - first = false; - } - - wio.inputs()[0].n_consumed = done; - return work_return_t::OK; -} - - -} // namespace zeromq -} // namespace gr diff --git a/blocklib/zeromq/rep_sink/rep_sink_cpu.h b/blocklib/zeromq/rep_sink/rep_sink_cpu.h deleted file mode 100644 index 98d106c4a..000000000 --- a/blocklib/zeromq/rep_sink/rep_sink_cpu.h +++ /dev/null @@ -1,35 +0,0 @@ -/* -*- c++ -*- */ -/* - * Copyright 2013,2014,2019 Free Software Foundation, Inc. - * - * This file is part of GNU Radio. - * - * SPDX-License-Identifier: GPL-3.0-or-later - * - */ - -#pragma once - -#include "base.h" -#include <gnuradio/zeromq/rep_sink.h> - -namespace gr { -namespace zeromq { - -class rep_sink_cpu : public virtual rep_sink, public virtual base_sink -{ -public: - rep_sink_cpu(block_args args); - work_return_t work(work_io&) override; - std::string last_endpoint() const override { return base_sink::last_endpoint(); } - - // Since vsize can be set as 0, then inferred on flowgraph init, set it during start() - bool start() override - { - set_vsize(this->input_stream_ports()[0]->itemsize()); - return rep_sink::start(); - } -}; - -} // namespace zeromq -} // namespace gr
\ No newline at end of file diff --git a/blocklib/zeromq/req_source/req_source.yml b/blocklib/zeromq/req_source/req_source.yml deleted file mode 100644 index 8980ec8f3..000000000 --- a/blocklib/zeromq/req_source/req_source.yml +++ /dev/null @@ -1,43 +0,0 @@ -module: zeromq -block: req_source -label: REQ Source -blocktype: sync_block -category: '[Core]/ZeroMQ Interfaces' -# inherits: base - -parameters: - - id: address - label: IP Address - dtype: string - settable: false - - id: timeout - label: Timeout - dtype: int - settable: false - default: 100 - - id: pass_tags - label: Pass Tags - dtype: bool - settable: false - default: "false" - - id: hwm - label: HWM - dtype: int - settable: false - default: -1 - - id: itemsize - label: Item Size - dtype: size - settable: false - default: 0 -ports: - - domain: stream - id: out - direction: output - type: untyped - size: parameters/itemsize - -implementations: - - id: cpu - -file_format: 1 diff --git a/blocklib/zeromq/req_source/req_source_cpu.cc b/blocklib/zeromq/req_source/req_source_cpu.cc deleted file mode 100644 index 2fc6f86b1..000000000 --- a/blocklib/zeromq/req_source/req_source_cpu.cc +++ /dev/null @@ -1,78 +0,0 @@ -/* -*- c++ -*- */ -/* - * Copyright 2013,2014 Free Software Foundation, Inc. - * - * This file is part of GNU Radio. - * - * SPDX-License-Identifier: GPL-3.0-or-later - * - */ - -#include "req_source_cpu.h" -#include "req_source_cpu_gen.h" - -#include <chrono> -#include <thread> - -namespace gr { -namespace zeromq { - -req_source_cpu::req_source_cpu(block_args args) - : INHERITED_CONSTRUCTORS, - base_source( - ZMQ_REQ, args.itemsize, args.address, args.timeout, args.pass_tags, args.hwm) -{ -} - -work_return_t req_source_cpu::work(work_io& wio) -{ - - auto noutput_items = wio.outputs()[0].n_items; - bool first = true; - size_t done = 0; - - /* Process as much as we can */ - while (1) { - if (has_pending()) { - /* Flush anything pending */ - done += flush_pending(wio.outputs()[0], noutput_items - done, done); - - /* No more space ? */ - if (done == noutput_items) - break; - } - else { - - /* Send request if needed */ - if (!d_req_pending) { - /* The REP/REQ pattern state machine guarantees we can send at this point - */ - uint32_t req_len = noutput_items - done; - zmq::message_t request(sizeof(uint32_t)); - memcpy((void*)request.data(), &req_len, sizeof(uint32_t)); - d_socket.send(request, zmq::send_flags::none); - d_req_pending = true; - } - - /* Try to get the next message */ - if (!load_message(first)) { - // Launch a thread to come back and try again some time later - come_back_later(100); - break; /* No message, we're done for now */ - } - - /* Got response */ - d_req_pending = false; - - /* Not the first anymore */ - first = false; - } - } - - wio.outputs()[0].n_produced = done; - return work_return_t::OK; -} - - -} // namespace zeromq -} // namespace gr diff --git a/blocklib/zeromq/req_source/req_source_cpu.h b/blocklib/zeromq/req_source/req_source_cpu.h deleted file mode 100644 index 05cd2b6b8..000000000 --- a/blocklib/zeromq/req_source/req_source_cpu.h +++ /dev/null @@ -1,37 +0,0 @@ -/* -*- c++ -*- */ -/* - * Copyright 2013,2014 Free Software Foundation, Inc. - * - * This file is part of GNU Radio. - * - * SPDX-License-Identifier: GPL-3.0-or-later - * - */ - -#pragma once - -#include "base.h" -#include <gnuradio/zeromq/req_source.h> - -namespace gr { -namespace zeromq { - -class req_source_cpu : public virtual req_source, public virtual base_source -{ -public: - req_source_cpu(block_args args); - work_return_t work(work_io&) override; - - // Since vsize can be set as 0, then inferred on flowgraph init, set it during start() - bool start() override - { - set_vsize(this->output_stream_ports()[0]->itemsize()); - return req_source::start(); - } - -private: - bool d_req_pending = false; -}; - -} // namespace zeromq -} // namespace gr
\ No newline at end of file diff --git a/blocklib/zeromq/sub_source/sub_source.yml b/blocklib/zeromq/sub_source/sub_source.yml deleted file mode 100644 index b461962c6..000000000 --- a/blocklib/zeromq/sub_source/sub_source.yml +++ /dev/null @@ -1,49 +0,0 @@ -module: zeromq -block: sub_source -label: SUB Source -blocktype: sync_block -category: '[Core]/ZeroMQ Interfaces' - -# Example Parameters -parameters: - - id: address - label: IP Address - dtype: string - settable: false - - id: timeout - label: Timeout - dtype: int - settable: false - default: 100 - - id: pass_tags - label: Pass Tags - dtype: bool - settable: false - default: "false" - - id: hwm - label: HWM - dtype: int - settable: false - default: -1 - - id: key - label: Key - dtype: string - settable: false - default: "\"\"" - - id: itemsize - label: Item Size - dtype: size - settable: false - default: 0 - -ports: - - domain: stream - id: out - direction: output - type: untyped - size: parameters/itemsize - -implementations: - - id: cpu - -file_format: 1
\ No newline at end of file diff --git a/blocklib/zeromq/sub_source/sub_source_cpu.cc b/blocklib/zeromq/sub_source/sub_source_cpu.cc deleted file mode 100644 index 47e8a05b4..000000000 --- a/blocklib/zeromq/sub_source/sub_source_cpu.cc +++ /dev/null @@ -1,60 +0,0 @@ -#include "sub_source_cpu.h" -#include "sub_source_cpu_gen.h" - -#include <chrono> -#include <thread> - -namespace gr { -namespace zeromq { - -sub_source_cpu::sub_source_cpu(block_args args) - : INHERITED_CONSTRUCTORS, - base_source(ZMQ_SUB, - args.itemsize, - args.address, - args.timeout, - args.pass_tags, - args.hwm, - args.key) -{ - - /* Subscribe */ - d_socket.set(zmq::sockopt::subscribe, args.key); -} - -work_return_t sub_source_cpu::work(work_io& wio) -{ - auto noutput_items = wio.outputs()[0].n_items; - bool first = true; - size_t done = 0; - - /* Process as much as we can */ - while (1) { - if (has_pending()) { - /* Flush anything pending */ - done += flush_pending(wio.outputs()[0], noutput_items - done, done); - - /* No more space ? */ - if (done == noutput_items) - break; - } - else { - /* Try to get the next message */ - if (!load_message(first)) { - // Launch a thread to come back and try again some time later - come_back_later(100); - break; /* No message, we're done for now */ - } - - /* Not the first anymore */ - first = false; - } - } - - wio.produce_each(done); - return work_return_t::OK; -} - - -} // namespace zeromq -} // namespace gr diff --git a/blocklib/zeromq/sub_source/sub_source_cpu.h b/blocklib/zeromq/sub_source/sub_source_cpu.h deleted file mode 100644 index 85ec087ac..000000000 --- a/blocklib/zeromq/sub_source/sub_source_cpu.h +++ /dev/null @@ -1,27 +0,0 @@ -#pragma once - -#include "base.h" -#include <gnuradio/zeromq/sub_source.h> - -namespace gr { -namespace zeromq { - -class sub_source_cpu : public virtual sub_source, public virtual base_source -{ -public: - sub_source_cpu(block_args args); - work_return_t work(work_io&) override; - - // Since vsize can be set as 0, then inferred on flowgraph init, set it during start() - bool start() override - { - set_vsize(this->output_stream_ports()[0]->itemsize()); - return sub_source::start(); - } - -private: - // private variables here -}; - -} // namespace zeromq -} // namespace gr
\ No newline at end of file diff --git a/blocklib/zeromq/test/.gitignore b/blocklib/zeromq/test/.gitignore deleted file mode 100644 index 01ecb66ff..000000000 --- a/blocklib/zeromq/test/.gitignore +++ /dev/null @@ -1 +0,0 @@ -!meson.build
\ No newline at end of file diff --git a/blocklib/zeromq/test/meson.build b/blocklib/zeromq/test/meson.build deleted file mode 100644 index f271695c2..000000000 --- a/blocklib/zeromq/test/meson.build +++ /dev/null @@ -1,10 +0,0 @@ -################################################### -# QA -################################################### - -if GR_ENABLE_PYTHON - test('qa_zeromq_pushpull', py3, args : files('qa_zeromq_pushpull.py'), env: TEST_ENV) - test('qa_zeromq_pull_msg_source', py3, args : files('qa_zeromq_pull_msg_source.py'), env: TEST_ENV) - test('qa_zeromq_pubsub', py3, args : files('qa_zeromq_pubsub.py'), env: TEST_ENV) - test('qa_zeromq_reqrep', py3, args : files('qa_zeromq_reqrep.py'), env: TEST_ENV) -endif diff --git a/blocklib/zeromq/test/qa_zeromq_pubsub.py b/blocklib/zeromq/test/qa_zeromq_pubsub.py deleted file mode 100644 index 4334794d8..000000000 --- a/blocklib/zeromq/test/qa_zeromq_pubsub.py +++ /dev/null @@ -1,110 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# -# Copyright 2014 Free Software Foundation, Inc. -# -# This file is part of GNU Radio -# -# SPDX-License-Identifier: GPL-3.0-or-later -# -# - - -from gnuradio import gr, gr_unittest -from gnuradio import blocks, zeromq -import pmtf -import time - - -def make_tag(key, value, offset, srcid=None): - - if srcid is None: - tag = gr.tag_t(offset, {key: pmtf.pmt(value)}) - else: - tag = gr.tag_t(offset, {key: pmtf.pmt(value), "srcid": pmtf.pmt(srcid)}) - return tag - - -# def compare_tags(a, b): -# return a.offset == b.offset and pmt.equal(a.key, b.key) and \ -# pmt.equal(a.value, b.value) and pmt.equal(a.srcid, b.srcid) - - -class qa_zeromq_pubsub (gr_unittest.TestCase): - - def setUp(self): - self.send_tb = gr.top_block() - self.recv_tb = gr.top_block() - - def tearDown(self): - self.send_tb = None - self.recv_tb = None - - def test_001(self): - vlen = 10 - src_data = list(range(vlen)) * 100 - src = blocks.vector_source_f(src_data, False, vlen) - zeromq_pub_sink = zeromq.pub_sink("tcp://127.0.0.1:0", 0) - address = zeromq_pub_sink.last_endpoint() - zeromq_sub_source = zeromq.sub_source(address, 0) - sink = blocks.vector_sink_f(vlen) - self.send_tb.connect(src, zeromq_pub_sink) - self.recv_tb.connect(zeromq_sub_source, sink) - self.recv_tb.start() - time.sleep(0.5) - self.send_tb.start() - time.sleep(0.5) - self.recv_tb.stop() - self.send_tb.stop() - # self.recv_tb.wait() - # self.send_tb.wait() - self.assertFloatTuplesAlmostEqual(sink.data(), src_data) - - def test_002(self): - # same as test_001, but insert a tag and set key filter - vlen = 10 - src_data = list(range(vlen)) * 100 - - src_tags = tuple([make_tag('key', 'val', 0, 'src'), - make_tag('key', 'val', 1, 'src')]) - - src = blocks.vector_source_f(src_data, False, vlen, tags=src_tags) - zeromq_pub_sink = zeromq.pub_sink( - "tcp://127.0.0.1:0", - 0, - pass_tags=True, - key="filter_key") - address = zeromq_pub_sink.last_endpoint() - zeromq_sub_source = zeromq.sub_source(address, 0, pass_tags=True, key="filter_key") - sink = blocks.vector_sink_f(vlen) - self.send_tb.connect(src, zeromq_pub_sink) - self.recv_tb.connect(zeromq_sub_source, sink) - - # start both flowgraphs - self.recv_tb.start() - time.sleep(0.5) - self.send_tb.start() - time.sleep(0.5) - self.recv_tb.stop() - self.send_tb.stop() - self.recv_tb.wait() - self.send_tb.wait() - - # compare data - self.assertFloatTuplesAlmostEqual(sink.data(), src_data) - - # compare all tags - rx_tags = sink.tags() - self.assertEqual(len(src_tags), len(rx_tags)) - - idx = 0 - for in_tag, out_tag in zip(src_tags, rx_tags): - print(idx) - print(in_tag) - print(out_tag) - self.assertTrue(in_tag == out_tag) - idx += 1 - - -if __name__ == '__main__': - gr_unittest.run(qa_zeromq_pubsub) diff --git a/blocklib/zeromq/test/qa_zeromq_pull_msg_source.py b/blocklib/zeromq/test/qa_zeromq_pull_msg_source.py deleted file mode 100644 index ff161b9c7..000000000 --- a/blocklib/zeromq/test/qa_zeromq_pull_msg_source.py +++ /dev/null @@ -1,66 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2020 Free Software Foundation, Inc. -# -# This file is part of GNU Radio -# -# SPDX-License-Identifier: GPL-3.0-or-later -# -# -"""Unit tests for ZMQ PULL Message Source block""" - -import time -import zmq - -from gnuradio import gr, gr_unittest, blocks, zeromq -import pmtf - - -class qa_zeromq_pull_msg_source(gr_unittest.TestCase): - """Unit tests for ZMQ PULL Message Source block""" - - def setUp(self): - addr = 'tcp://127.0.0.1' - self.context = zmq.Context() - self.zmq_sock = self.context.socket(zmq.PUSH) - port = self.zmq_sock.bind_to_random_port(addr) - - self.zeromq_pull_msg_source = zeromq.pull_msg_source( - ('%s:%s' % (addr, port)), 100) - self.message_debug = blocks.message_debug() - self.tb = gr.top_block() - self.tb.connect( - (self.zeromq_pull_msg_source, 'out'), (self.message_debug, 'store')) - - self.tb.start() - time.sleep(0.1) - - def tearDown(self): - self.tb.stop() - self.tb.wait() - self.zeromq_pull_msg_source = None - self.message_debug = None - self.tb = None - self.zmq_sock.close() - self.context.term() - - def test_valid_pmt(self): - """Test receiving of valid PMT messages""" - msg = pmtf.pmt('test_valid_pmt') - self.zmq_sock.send(bytes(msg.serialize())) - for _ in range(10): - if self.message_debug.num_messages() > 0: - break - time.sleep(0.2) - self.assertEqual(1, self.message_debug.num_messages()) - self.assertEqual(msg(), self.message_debug.get_message(0)()) - - # def test_invalid_pmt(self): - # """Test receiving of invalid PMT messages""" - # self.zmq_sock.send_string('test_invalid_pmt') - # time.sleep(0.1) - # self.assertEqual(0, self.message_debug.num_messages()()) - - -if __name__ == '__main__': - gr_unittest.run(qa_zeromq_pull_msg_source) diff --git a/blocklib/zeromq/test/qa_zeromq_pushpull.py b/blocklib/zeromq/test/qa_zeromq_pushpull.py deleted file mode 100644 index ea18ed7b1..000000000 --- a/blocklib/zeromq/test/qa_zeromq_pushpull.py +++ /dev/null @@ -1,46 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2014 Free Software Foundation, Inc. -# -# This file is part of GNU Radio -# -# SPDX-License-Identifier: GPL-3.0-or-later -# -# - - -from gnuradio import gr, gr_unittest, blocks, zeromq -import time - - -class qa_zeromq_pushpull (gr_unittest.TestCase): - - def setUp(self): - self.send_tb = gr.top_block() - self.recv_tb = gr.top_block() - - def tearDown(self): - self.send_tb = None - self.recv_tb = None - - def test_001(self): - vlen = 10 - src_data = list(range(vlen)) * 100 - src = blocks.vector_source_f(src_data, False, vlen) - zeromq_push_sink = zeromq.push_sink("tcp://127.0.0.1:0") - address = zeromq_push_sink.last_endpoint() - zeromq_pull_source = zeromq.pull_source(address, 0) - sink = blocks.vector_sink_f(vlen) - self.send_tb.connect(src, zeromq_push_sink) - self.recv_tb.connect(zeromq_pull_source, sink) - self.recv_tb.start() - # time.sleep(0.5) - self.send_tb.start() - time.sleep(0.5) - self.recv_tb.stop() - self.send_tb.stop() - self.assertFloatTuplesAlmostEqual(sink.data(), src_data) - - -if __name__ == '__main__': - gr_unittest.run(qa_zeromq_pushpull) diff --git a/blocklib/zeromq/test/qa_zeromq_reqrep.py b/blocklib/zeromq/test/qa_zeromq_reqrep.py deleted file mode 100644 index 733bf2f03..000000000 --- a/blocklib/zeromq/test/qa_zeromq_reqrep.py +++ /dev/null @@ -1,50 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -# -# Copyright 2014 Free Software Foundation, Inc. -# -# This file is part of GNU Radio -# -# SPDX-License-Identifier: GPL-3.0-or-later -# -# - - -from gnuradio import gr, gr_unittest -from gnuradio import blocks, zeromq -import time - - -class qa_zeromq_reqrep (gr_unittest.TestCase): - - def setUp(self): - self.send_tb = gr.top_block() - self.recv_tb = gr.top_block() - - def tearDown(self): - self.send_tb = None - self.recv_tb = None - - def test_001(self): - vlen = 10 - src_data = list(range(vlen)) * 100 - src = blocks.vector_source_f(src_data, False, vlen) - zeromq_rep_sink = zeromq.rep_sink("tcp://127.0.0.1:0", 0) - address = zeromq_rep_sink.last_endpoint() - zeromq_req_source = zeromq.req_source(address, 0) - sink = blocks.vector_sink_f(vlen) - self.send_tb.connect(src, zeromq_rep_sink) - self.recv_tb.connect(zeromq_req_source, sink) - self.recv_tb.start() - time.sleep(0.5) - self.send_tb.start() - time.sleep(0.5) - self.recv_tb.stop() - self.send_tb.stop() - self.recv_tb.wait() - self.send_tb.wait() - self.assertFloatTuplesAlmostEqual(sink.data(), src_data) - - -if __name__ == '__main__': - gr_unittest.run(qa_zeromq_reqrep) |