aboutsummaryrefslogtreecommitdiffstats
path: root/blocklib/zeromq
diff options
context:
space:
mode:
Diffstat (limited to 'blocklib/zeromq')
-rw-r--r--blocklib/zeromq/.gitignore1
-rw-r--r--blocklib/zeromq/include/gnuradio/zeromq/.gitignore1
-rw-r--r--blocklib/zeromq/include/gnuradio/zeromq/api.h18
-rw-r--r--blocklib/zeromq/include/gnuradio/zeromq/meson.build25
-rw-r--r--blocklib/zeromq/lib/.gitignore1
-rw-r--r--blocklib/zeromq/lib/base.cc236
-rw-r--r--blocklib/zeromq/lib/base.h89
-rw-r--r--blocklib/zeromq/lib/meson.build36
-rw-r--r--blocklib/zeromq/lib/tag_headers.cc89
-rw-r--r--blocklib/zeromq/lib/tag_headers.h28
-rw-r--r--blocklib/zeromq/lib/zmq_common_impl.h13
-rw-r--r--blocklib/zeromq/pub_sink/pub_sink.yml55
-rw-r--r--blocklib/zeromq/pub_sink/pub_sink_cpu.cc33
-rw-r--r--blocklib/zeromq/pub_sink/pub_sink_cpu.h28
-rw-r--r--blocklib/zeromq/pull_msg_source/pull_msg_source.yml37
-rw-r--r--blocklib/zeromq/pull_msg_source/pull_msg_source_cpu.cc96
-rw-r--r--blocklib/zeromq/pull_msg_source/pull_msg_source_cpu.h45
-rw-r--r--blocklib/zeromq/pull_source/pull_source.yml43
-rw-r--r--blocklib/zeromq/pull_source/pull_source_cpu.cc53
-rw-r--r--blocklib/zeromq/pull_source/pull_source_cpu.h24
-rw-r--r--blocklib/zeromq/push_msg_sink/push_msg_sink.yml38
-rw-r--r--blocklib/zeromq/push_msg_sink/push_msg_sink_cpu.cc50
-rw-r--r--blocklib/zeromq/push_msg_sink/push_msg_sink_cpu.h40
-rw-r--r--blocklib/zeromq/push_sink/push_sink.yml51
-rw-r--r--blocklib/zeromq/push_sink/push_sink_cpu.cc34
-rw-r--r--blocklib/zeromq/push_sink/push_sink_cpu.h25
-rw-r--r--blocklib/zeromq/python/gnuradio/zeromq/.gitignore1
-rw-r--r--blocklib/zeromq/python/gnuradio/zeromq/__init__.py9
-rw-r--r--blocklib/zeromq/python/gnuradio/zeromq/bindings/.gitignore1
-rw-r--r--blocklib/zeromq/python/gnuradio/zeromq/bindings/base_pybind.cc39
-rw-r--r--blocklib/zeromq/python/gnuradio/zeromq/bindings/meson.build2
-rw-r--r--blocklib/zeromq/rep_sink/rep_sink.yml51
-rw-r--r--blocklib/zeromq/rep_sink/rep_sink_cpu.cc74
-rw-r--r--blocklib/zeromq/rep_sink/rep_sink_cpu.h35
-rw-r--r--blocklib/zeromq/req_source/req_source.yml43
-rw-r--r--blocklib/zeromq/req_source/req_source_cpu.cc78
-rw-r--r--blocklib/zeromq/req_source/req_source_cpu.h37
-rw-r--r--blocklib/zeromq/sub_source/sub_source.yml49
-rw-r--r--blocklib/zeromq/sub_source/sub_source_cpu.cc60
-rw-r--r--blocklib/zeromq/sub_source/sub_source_cpu.h27
-rw-r--r--blocklib/zeromq/test/.gitignore1
-rw-r--r--blocklib/zeromq/test/meson.build10
-rw-r--r--blocklib/zeromq/test/qa_zeromq_pubsub.py110
-rw-r--r--blocklib/zeromq/test/qa_zeromq_pull_msg_source.py66
-rw-r--r--blocklib/zeromq/test/qa_zeromq_pushpull.py46
-rw-r--r--blocklib/zeromq/test/qa_zeromq_reqrep.py50
46 files changed, 0 insertions, 1978 deletions
diff --git a/blocklib/zeromq/.gitignore b/blocklib/zeromq/.gitignore
deleted file mode 100644
index 25d053a52..000000000
--- a/blocklib/zeromq/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-meson.build \ No newline at end of file
diff --git a/blocklib/zeromq/include/gnuradio/zeromq/.gitignore b/blocklib/zeromq/include/gnuradio/zeromq/.gitignore
deleted file mode 100644
index d53050d7d..000000000
--- a/blocklib/zeromq/include/gnuradio/zeromq/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-!meson.build
diff --git a/blocklib/zeromq/include/gnuradio/zeromq/api.h b/blocklib/zeromq/include/gnuradio/zeromq/api.h
deleted file mode 100644
index 29941d0f4..000000000
--- a/blocklib/zeromq/include/gnuradio/zeromq/api.h
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Copyright 2011 Free Software Foundation, Inc.
- *
- * This file is part of GNU Radio
- *
- * SPDX-License-Identifier: GPL-3.0-or-later
- *
- */
-
-#pragma once
-
-#include <gnuradio/attributes.h>
-
-#ifdef gnuradio_blocks_EXPORTS
-#define ZEROMQ_API __GR_ATTR_EXPORT
-#else
-#define ZEROMQ_API __GR_ATTR_IMPORT
-#endif
diff --git a/blocklib/zeromq/include/gnuradio/zeromq/meson.build b/blocklib/zeromq/include/gnuradio/zeromq/meson.build
deleted file mode 100644
index 9bb4be16a..000000000
--- a/blocklib/zeromq/include/gnuradio/zeromq/meson.build
+++ /dev/null
@@ -1,25 +0,0 @@
-zeromq_headers = [
- 'api.h'
-]
-
-install_headers(zeromq_headers, subdir : 'gnuradio/zeromq')
-
-cmake_conf = configuration_data()
-cmake_conf.set('libdir', join_paths(prefix,get_option('libdir')))
-cmake_conf.set('module', 'zeromq')
-cmake.configure_package_config_file(
- name : 'gnuradio-zeromq',
- input : join_paths(meson.source_root(),'cmake','Modules','gnuradioConfigModule.cmake.in'),
- install_dir : get_option('prefix') / get_option('libdir') / 'cmake' / 'gnuradio',
- configuration : cmake_conf
-)
-
-pkg = import('pkgconfig')
-libs = [] # the library/libraries users need to link against
-h = ['.'] # subdirectories of ${prefix}/${includedir} to add to header path
-pkg.generate(libraries : libs,
- subdirs : h,
- version : meson.project_version(),
- name : 'libgnuradio-zeromq',
- filebase : 'gnuradio-zeromq',
- description : 'GNU Radio ZeroMQ Blocks')
diff --git a/blocklib/zeromq/lib/.gitignore b/blocklib/zeromq/lib/.gitignore
deleted file mode 100644
index 01ecb66ff..000000000
--- a/blocklib/zeromq/lib/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-!meson.build \ No newline at end of file
diff --git a/blocklib/zeromq/lib/base.cc b/blocklib/zeromq/lib/base.cc
deleted file mode 100644
index 10a07cfa3..000000000
--- a/blocklib/zeromq/lib/base.cc
+++ /dev/null
@@ -1,236 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2016,2019 Free Software Foundation, Inc.
- *
- * This file is part of GNU Radio.
- *
- * SPDX-License-Identifier: GPL-3.0-or-later
- *
- */
-
-#ifdef HAVE_CONFIG_H
-#include "config.h"
-#endif
-
-#include "base.h"
-#include "tag_headers.h"
-
-namespace {
-constexpr int LINGER_DEFAULT = 1000; // 1 second.
-}
-
-namespace gr {
-namespace zeromq {
-
-base::base(int type, size_t itemsize, int timeout, bool pass_tags, const std::string& key)
- : d_context(1),
- d_socket(d_context, type),
- d_vsize(itemsize),
- d_timeout(timeout),
- d_pass_tags(pass_tags),
- d_key(key)
-{
- /* "Fix" timeout value (ms for new API, us for old API) */
- int major, minor, patch;
- zmq::version(&major, &minor, &patch);
-
- if (major < 3) {
- d_timeout *= 1000;
- }
-}
-
-std::string base::last_endpoint() const
-{
- return d_socket.get(zmq::sockopt::last_endpoint);
-}
-
-
-base_sink::base_sink(int type,
- size_t itemsize,
- const std::string& address,
- int timeout,
- bool pass_tags,
- int hwm,
- const std::string& key)
- : base(type, itemsize, timeout, pass_tags, key)
-{
- gr::configure_default_loggers(d_base_logger, d_base_debug_logger, "zmq_base_source");
-
- /* Set high watermark */
- if (hwm >= 0) {
- d_socket.set(zmq::sockopt::sndhwm, hwm);
- }
-
- /* Set ZMQ_LINGER so socket won't infinitely block during teardown */
- d_socket.set(zmq::sockopt::linger, LINGER_DEFAULT);
-
- /* Bind */
- d_socket.bind(address);
-}
-
-int base_sink::send_message(const void* in_buf,
- const int in_nitems,
- const uint64_t in_offset,
- const std::vector<tag_t>& tags)
-{
- /* Send key if it exists */
- if (!d_key.empty()) {
- zmq::message_t key_message(d_key.size());
- memcpy(key_message.data(), d_key.data(), d_key.size());
- d_socket.send(key_message, zmq::send_flags::sndmore);
- }
- /* Meta-data header */
- std::string header("");
- if (d_pass_tags) {
- // std::vector<gr::tag_t> tags;
- // get_tags_in_range(tags, 0, in_offset, in_offset + in_nitems);
- header = gen_tag_header(in_offset, tags);
- }
-
- /* Create message */
- size_t payload_len = in_nitems * d_vsize;
- size_t msg_len = d_pass_tags ? payload_len + header.length() : payload_len;
- zmq::message_t msg(msg_len);
-
- if (d_pass_tags) {
- memcpy(msg.data(), header.c_str(), header.length());
- memcpy((uint8_t*)msg.data() + header.length(), in_buf, payload_len);
- }
- else {
- memcpy(msg.data(), in_buf, payload_len);
- }
-
- /* Send */
- d_socket.send(msg, zmq::send_flags::none);
-
- /* Report back */
- return in_nitems;
-}
-
-base_source::base_source(int type,
- size_t itemsize,
- const std::string& address,
- int timeout,
- bool pass_tags,
- int hwm,
- const std::string& key)
- : base(type, itemsize, timeout, pass_tags, key),
- d_consumed_bytes(0),
- d_consumed_items(0)
-{
- /* Set high watermark */
- if (hwm >= 0) {
- d_socket.set(zmq::sockopt::rcvhwm, hwm);
- }
-
- /* Set ZMQ_LINGER so socket won't infinitely block during teardown */
- d_socket.set(zmq::sockopt::linger, LINGER_DEFAULT);
-
- /* Connect */
- d_socket.connect(address);
-}
-
-bool base_source::has_pending() { return d_msg.size() > d_consumed_bytes; }
-
-int base_source::flush_pending(block_work_output& work_output,
- const int out_nitems,
- const uint64_t out_offset)
-{
- /* How much to copy in this call */
- int to_copy_items =
- std::min(out_nitems, (int)((d_msg.size() - d_consumed_bytes) / d_vsize));
- int to_copy_bytes = d_vsize * to_copy_items;
- auto nw = work_output.nitems_written();
-
- /* Copy actual data */
- memcpy(work_output.items<uint8_t>() + out_offset * d_vsize,
- (uint8_t*)d_msg.data() + d_consumed_bytes,
- to_copy_bytes);
-
- /* Add tags matching this segment of samples */
- for (unsigned int i = 0; i < d_tags.size(); i++) {
- if ((d_tags[i].offset() >= (uint64_t)d_consumed_items) &&
- (d_tags[i].offset() < (uint64_t)d_consumed_items + to_copy_items)) {
- gr::tag_t nt = d_tags[i];
- nt.set_offset(nt.offset() + nw + out_offset - d_consumed_items);
- work_output.add_tag(nt);
- }
- }
-
- /* Update pointer */
- d_consumed_items += to_copy_items;
- d_consumed_bytes += to_copy_bytes;
-
- return to_copy_items;
-}
-
-bool base_source::load_message(bool wait)
-{
- /* Poll for input */
- zmq::pollitem_t items[] = { { static_cast<void*>(d_socket), 0, ZMQ_POLLIN, 0 } };
- zmq::poll(&items[0], 1, std::chrono::milliseconds{ wait ? d_timeout : 0 });
-
- if (!(items[0].revents & ZMQ_POLLIN))
- return false;
-
- /* Is this the start or continuation of a multi-part message? */
- auto more = d_socket.get(zmq::sockopt::rcvmore);
-
- /* Reset */
- d_msg.rebuild();
- d_tags.clear();
- d_consumed_items = 0;
- d_consumed_bytes = 0;
-
- /* Get the message */
- const bool ok = bool(d_socket.recv(d_msg));
-
- if (!ok) {
- // This shouldn't happen since we polled POLLIN, but ZMQ wants us to check
- // the return value.
- d_base_logger->warn("Failed to recv() message.");
- return false;
- }
-
- /* Throw away key and get the first message. Avoid blocking if a multi-part
- * message is not sent */
- if (!d_key.empty() && !more) {
- auto is_multipart = d_socket.get(zmq::sockopt::rcvmore);
-
- d_msg.rebuild();
- if (is_multipart) {
- const bool multi_ok = bool(d_socket.recv(d_msg));
-
- if (!multi_ok) {
- d_base_logger->error("Failure to receive multi-part message.");
- }
- }
- else {
- return false;
- }
- }
- /* Parse header from the first (or only) message of a multi-part message */
- if (d_pass_tags && !more) {
- uint64_t rcv_offset;
-
- /* Parse header */
- d_consumed_bytes = parse_tag_header(d_msg, rcv_offset, d_tags);
-
- /* Fixup the tags offset to be relative to the start of this message */
- for (unsigned int i = 0; i < d_tags.size(); i++) {
- d_tags[i].set_offset(d_tags[i].offset() - rcv_offset);
- }
- }
-
- /* Each message must contain an integer multiple of data vectors */
- if ((d_msg.size() - d_consumed_bytes) % d_vsize != 0) {
- throw std::runtime_error("Incompatible vector sizes: need a multiple of " +
- std::to_string(d_vsize) + " bytes per message");
- }
-
- /* We got one ! */
- return true;
-}
-
-} /* namespace zeromq */
-} /* namespace gr */
diff --git a/blocklib/zeromq/lib/base.h b/blocklib/zeromq/lib/base.h
deleted file mode 100644
index 7fb6d2ccb..000000000
--- a/blocklib/zeromq/lib/base.h
+++ /dev/null
@@ -1,89 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2016,2019 Free Software Foundation, Inc.
- * Copyright 2022 Josh Morman
- *
- * This file is part of GNU Radio.
- *
- * SPDX-License-Identifier: GPL-3.0-or-later
- *
- */
-
-#pragma once
-
-#include "zmq_common_impl.h"
-#include <gnuradio/block_work_io.h>
-#include <gnuradio/logger.h>
-#include <gnuradio/tag.h>
-
-namespace gr {
-namespace zeromq {
-
-class base
-{
-public:
- base(int type,
- size_t itemsize,
- int timeout,
- bool pass_tags,
- const std::string& key = "");
-
- void set_vsize(size_t vsize) { d_vsize = vsize; }
-
-protected:
- std::string last_endpoint() const;
- zmq::context_t d_context;
- zmq::socket_t d_socket;
- size_t d_vsize;
- int d_timeout;
- bool d_pass_tags;
- const std::string d_key;
-
- logger_ptr d_base_logger;
- logger_ptr d_base_debug_logger;
-};
-
-class base_sink : public base
-{
-public:
- base_sink(int type,
- size_t itemsize,
- const std::string& address,
- int timeout,
- bool pass_tags,
- int hwm,
- const std::string& key = "");
-
-protected:
- int send_message(const void* in_buf,
- const int in_nitems,
- const uint64_t in_offset,
- const std::vector<tag_t>& tags);
-};
-
-class base_source : public base
-{
-public:
- base_source(int type,
- size_t itemsize,
- const std::string& address,
- int timeout,
- bool pass_tags,
- int hwm,
- const std::string& key = "");
-
-protected:
- zmq::message_t d_msg;
- std::vector<gr::tag_t> d_tags;
- size_t d_consumed_bytes;
- int d_consumed_items;
-
- bool has_pending();
- int flush_pending(block_work_output& work_output,
- const int out_nitems,
- const uint64_t out_offset);
- bool load_message(bool wait);
-};
-
-} // namespace zeromq
-} // namespace gr
diff --git a/blocklib/zeromq/lib/meson.build b/blocklib/zeromq/lib/meson.build
deleted file mode 100644
index c3f036763..000000000
--- a/blocklib/zeromq/lib/meson.build
+++ /dev/null
@@ -1,36 +0,0 @@
-zeromq_deps += [gnuradio_gr_dep, volk_dep, fmt_dep, pmtf_dep, cppzmq_dep]
-zeromq_sources += ['base.cc', 'tag_headers.cc']
-block_cpp_args = ['-DHAVE_CPU']
-
-incdir = include_directories(['../include/gnuradio/zeromq','../include'])
-gnuradio_blocklib_zeromq_lib = library('gnuradio-blocklib-zeromq',
- zeromq_sources,
- include_directories : incdir,
- install : true,
- link_language: 'cpp',
- dependencies : zeromq_deps,
- cpp_args : block_cpp_args)
-
-gnuradio_blocklib_zeromq_dep = declare_dependency(include_directories : incdir,
- link_with : gnuradio_blocklib_zeromq_lib,
- dependencies : zeromq_deps)
-
-cmake_conf = configuration_data()
-cmake_conf.set('libdir', join_paths(prefix,get_option('libdir')))
-cmake_conf.set('module', 'zeromq')
-cmake.configure_package_config_file(
- name : 'gnuradio-zeromq',
- input : join_paths(meson.source_root(),'cmake','Modules','gnuradioConfigModule.cmake.in'),
- install_dir : get_option('prefix') / get_option('libdir') / 'cmake' / 'gnuradio',
- configuration : cmake_conf
-)
-
-pkg = import('pkgconfig')
-libs = [gnuradio_blocklib_zeromq_lib] # the library/libraries users need to link against
-h = ['.'] # subdirectories of ${prefix}/${includedir} to add to header path
-pkg.generate(libraries : libs,
- subdirs : h,
- version : meson.project_version(),
- name : 'libgnuradio-zeromq',
- filebase : 'gnuradio-zeromq',
- description : 'GNU Radio ZeroMQ Module')
diff --git a/blocklib/zeromq/lib/tag_headers.cc b/blocklib/zeromq/lib/tag_headers.cc
deleted file mode 100644
index fd814bead..000000000
--- a/blocklib/zeromq/lib/tag_headers.cc
+++ /dev/null
@@ -1,89 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2014 Free Software Foundation, Inc.
- *
- * This file is part of GNU Radio.
- *
- * SPDX-License-Identifier: GPL-3.0-or-later
- *
- */
-
-#include "zmq_common_impl.h"
-#include <gnuradio/tag.h>
-#include <cstring>
-#include <sstream>
-
-#define GR_HEADER_MAGIC 0x5FF0
-#define GR_HEADER_VERSION 0x02
-
-namespace gr {
-namespace zeromq {
-
-struct membuf : std::streambuf {
- membuf(void* b, size_t len)
- {
- char* bc = static_cast<char*>(b);
- this->setg(bc, bc, bc + len);
- }
-};
-
-std::string gen_tag_header(uint64_t offset, const std::vector<gr::tag_t>& tags)
-{
- std::stringbuf sb("");
- std::ostream ss(&sb);
-
- uint16_t header_magic = GR_HEADER_MAGIC;
- uint8_t header_version = GR_HEADER_VERSION;
- uint64_t ntags = (uint64_t)tags.size();
-
- ss.write((const char*)&header_magic, sizeof(uint16_t));
- ss.write((const char*)&header_version, sizeof(uint8_t));
- ss.write((const char*)&offset, sizeof(uint64_t));
- ss.write((const char*)&ntags, sizeof(uint64_t));
-
- for (auto& tag : tags) {
- tag.serialize(sb);
- }
-
- return sb.str();
-}
-
-size_t parse_tag_header(zmq::message_t& msg,
- uint64_t& offset_out,
- std::vector<gr::tag_t>& tags_out)
-{
- membuf sb(msg.data(), msg.size());
- std::istream iss(&sb);
-
- size_t min_len =
- sizeof(uint16_t) + sizeof(uint8_t) + sizeof(uint64_t) + sizeof(uint64_t);
- if (msg.size() < min_len)
- throw std::runtime_error("incoming zmq msg too small to hold gr tag header!");
-
- uint16_t header_magic;
- uint8_t header_version;
- uint64_t rcv_ntags;
-
- iss.read((char*)&header_magic, sizeof(uint16_t));
- iss.read((char*)&header_version, sizeof(uint8_t));
-
- if (header_magic != GR_HEADER_MAGIC)
- throw std::runtime_error("gr header magic does not match!");
-
- if (header_version != GR_HEADER_VERSION)
- throw std::runtime_error("gr header version does not match!");
-
- iss.read((char*)&offset_out, sizeof(uint64_t));
- iss.read((char*)&rcv_ntags, sizeof(uint64_t));
-
- for (size_t i = 0; i < rcv_ntags; i++) {
- gr::tag_t newtag = tag_t::deserialize(sb);
- tags_out.push_back(newtag);
- }
-
- return msg.size() - sb.in_avail();
-}
-} /* namespace zeromq */
-} /* namespace gr */
-
-// vim: ts=2 sw=2 expandtab
diff --git a/blocklib/zeromq/lib/tag_headers.h b/blocklib/zeromq/lib/tag_headers.h
deleted file mode 100644
index b8b7a09b1..000000000
--- a/blocklib/zeromq/lib/tag_headers.h
+++ /dev/null
@@ -1,28 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2014 Free Software Foundation, Inc.
- *
- * This file is part of GNU Radio.
- *
- * SPDX-License-Identifier: GPL-3.0-or-later
- *
- */
-
-#ifndef ZEROMQ_TAG_HEADERS_H
-#define ZEROMQ_TAG_HEADERS_H
-
-#include "zmq_common_impl.h"
-#include <gnuradio/tag.h>
-
-namespace gr {
-namespace zeromq {
-
-std::string gen_tag_header(uint64_t offset, const std::vector<gr::tag_t>& tags);
-size_t parse_tag_header(zmq::message_t& msg,
- uint64_t& offset_out,
- std::vector<gr::tag_t>& tags_out);
-
-} /* namespace zeromq */
-} /* namespace gr */
-
-#endif /* ZEROMQ_TAG_HEADERS_H */
diff --git a/blocklib/zeromq/lib/zmq_common_impl.h b/blocklib/zeromq/lib/zmq_common_impl.h
deleted file mode 100644
index fc08ef301..000000000
--- a/blocklib/zeromq/lib/zmq_common_impl.h
+++ /dev/null
@@ -1,13 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2019 Free Software Foundation, Inc.
- * Copyright 2022 Josh Morman
- *
- * This file is part of GNU Radio.
- *
- * SPDX-License-Identifier: GPL-3.0-or-later
- *
- */
-#pragma once
-
-#include <zmq.hpp>
diff --git a/blocklib/zeromq/pub_sink/pub_sink.yml b/blocklib/zeromq/pub_sink/pub_sink.yml
deleted file mode 100644
index f65b4a70f..000000000
--- a/blocklib/zeromq/pub_sink/pub_sink.yml
+++ /dev/null
@@ -1,55 +0,0 @@
-module: zeromq
-block: pub_sink
-label: PUB Sink
-blocktype: sync_block
-category: '[Core]/ZeroMQ Interfaces'
-
-# Example Parameters
-parameters:
- - id: address
- label: IP Address
- dtype: string
- settable: false
- - id: timeout
- label: Timeout
- dtype: int
- settable: false
- default: 100
- - id: pass_tags
- label: Pass Tags
- dtype: bool
- settable: false
- default: "false"
- - id: hwm
- label: HWM
- dtype: int
- settable: false
- default: -1
- - id: key
- label: Key
- dtype: string
- settable: false
- default: "\"\""
- - id: itemsize
- label: Item Size
- dtype: size
- settable: false
- default: 0
-
-callbacks:
-- id: last_endpoint
- return: std::string
- const: true
- # inherited: true
-
-ports:
- - domain: stream
- id: in
- direction: input
- type: untyped
- size: parameters/itemsize
-
-implementations:
-- id: cpu
-
-file_format: 1
diff --git a/blocklib/zeromq/pub_sink/pub_sink_cpu.cc b/blocklib/zeromq/pub_sink/pub_sink_cpu.cc
deleted file mode 100644
index 2a5d9d3f5..000000000
--- a/blocklib/zeromq/pub_sink/pub_sink_cpu.cc
+++ /dev/null
@@ -1,33 +0,0 @@
-#include "pub_sink_cpu.h"
-#include "pub_sink_cpu_gen.h"
-
-namespace gr {
-namespace zeromq {
-
-pub_sink_cpu::pub_sink_cpu(block_args args)
- : INHERITED_CONSTRUCTORS,
- base_sink(ZMQ_PUB,
- args.itemsize,
- args.address,
- args.timeout,
- args.pass_tags,
- args.hwm,
- args.key)
-{
-}
-
-work_return_t pub_sink_cpu::work(work_io& wio)
-{
- auto noutput_items = wio.inputs()[0].n_items;
- auto nread = wio.inputs()[0].nitems_read();
- auto nsent = send_message(wio.inputs()[0].raw_items(),
- noutput_items,
- nread,
- wio.inputs()[0].tags_in_window(0, noutput_items));
- wio.consume_each(nsent);
- return work_return_t::OK;
-}
-
-
-} // namespace zeromq
-} // namespace gr
diff --git a/blocklib/zeromq/pub_sink/pub_sink_cpu.h b/blocklib/zeromq/pub_sink/pub_sink_cpu.h
deleted file mode 100644
index e55d125e7..000000000
--- a/blocklib/zeromq/pub_sink/pub_sink_cpu.h
+++ /dev/null
@@ -1,28 +0,0 @@
-#pragma once
-
-#include "base.h"
-#include <gnuradio/zeromq/pub_sink.h>
-
-namespace gr {
-namespace zeromq {
-
-class pub_sink_cpu : public virtual pub_sink, public virtual base_sink
-{
-public:
- pub_sink_cpu(block_args args);
- work_return_t work(work_io&) override;
- std::string last_endpoint() const override { return base_sink::last_endpoint(); }
-
- // Since vsize can be set as 0, then inferred on flowgraph init, set it during start()
- bool start() override
- {
- set_vsize(this->input_stream_ports()[0]->itemsize());
- return pub_sink::start();
- }
-
-private:
- // private variables here
-};
-
-} // namespace zeromq
-} // namespace gr \ No newline at end of file
diff --git a/blocklib/zeromq/pull_msg_source/pull_msg_source.yml b/blocklib/zeromq/pull_msg_source/pull_msg_source.yml
deleted file mode 100644
index 3cde228af..000000000
--- a/blocklib/zeromq/pull_msg_source/pull_msg_source.yml
+++ /dev/null
@@ -1,37 +0,0 @@
-module: zeromq
-block: pull_msg_source
-label: Pull Msg Source
-blocktype: sync_block
-category: '[Core]/ZeroMQ Interfaces'
-
-parameters:
- - id: address
- label: IP Address
- dtype: string
- settable: false
- - id: timeout
- label: Timeout
- dtype: int
- settable: false
- default: 100
- - id: bind
- label: Bind
- dtype: bool
- settable: false
- default: "false"
-
-callbacks:
-- id: last_endpoint
- return: std::string
- const: true
-
-ports:
-- domain: message
- id: out
- direction: output
-
-implementations:
-- id: cpu
-# - id: cuda
-
-file_format: 1 \ No newline at end of file
diff --git a/blocklib/zeromq/pull_msg_source/pull_msg_source_cpu.cc b/blocklib/zeromq/pull_msg_source/pull_msg_source_cpu.cc
deleted file mode 100644
index 94c81afb6..000000000
--- a/blocklib/zeromq/pull_msg_source/pull_msg_source_cpu.cc
+++ /dev/null
@@ -1,96 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2013,2014,2019 Free Software Foundation, Inc.
- * Copyright 2022 Josh Morman
- *
- * This file is part of GNU Radio
- *
- * SPDX-License-Identifier: GPL-3.0-or-later
- *
- */
-
-#include "pull_msg_source_cpu.h"
-#include "pull_msg_source_cpu_gen.h"
-
-#include <chrono>
-
-namespace {
-constexpr int LINGER_DEFAULT = 1000; // 1 second.
-}
-
-namespace gr {
-namespace zeromq {
-
-pull_msg_source_cpu::pull_msg_source_cpu(block_args args)
- : INHERITED_CONSTRUCTORS,
- d_timeout(args.timeout),
- d_context(1),
- d_socket(d_context, ZMQ_PULL)
-{
-
- d_socket.set(zmq::sockopt::linger, LINGER_DEFAULT);
-
- if (args.bind) {
- d_socket.bind(args.address);
- }
- else {
- d_socket.connect(args.address);
- }
-}
-
-bool pull_msg_source_cpu::start()
-{
- d_finished = false;
- d_thread = std::thread([this] { readloop(); });
- return block::start();
-}
-
-bool pull_msg_source_cpu::stop()
-{
- d_finished = true;
- if (d_thread.joinable()) {
- d_thread.join();
- }
- return block::stop();
-}
-
-void pull_msg_source_cpu::readloop()
-{
- using namespace std::chrono_literals;
- while (!d_finished) {
-
- zmq::pollitem_t items[] = { { static_cast<void*>(d_socket), 0, ZMQ_POLLIN, 0 } };
- zmq::poll(&items[0], 1, std::chrono::milliseconds{ d_timeout });
-
- // If we got a reply, process
- if (items[0].revents & ZMQ_POLLIN) {
-
- // Receive data
- zmq::message_t msg;
- const bool ok = bool(d_socket.recv(msg));
-
- if (!ok) {
- // Should not happen, we've checked POLLIN.
- d_logger->error("Failed to receive message.");
- std::this_thread::sleep_for(100us);
- continue;
- }
-
- std::string buf(static_cast<char*>(msg.data()), msg.size());
- std::stringbuf sb(buf);
- try {
- auto m = pmtf::pmt::deserialize(sb);
- d_msg_out->post(m);
- } catch (...) { // Take out PMT specific exception for now
- d_logger->error("Invalid PMT message");
- }
- }
- else {
- std::this_thread::sleep_for(100us);
- }
- }
-}
-
-
-} // namespace zeromq
-} // namespace gr
diff --git a/blocklib/zeromq/pull_msg_source/pull_msg_source_cpu.h b/blocklib/zeromq/pull_msg_source/pull_msg_source_cpu.h
deleted file mode 100644
index a31704ffd..000000000
--- a/blocklib/zeromq/pull_msg_source/pull_msg_source_cpu.h
+++ /dev/null
@@ -1,45 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2013,2014,2019 Free Software Foundation, Inc.
- * Copyright 2022 Josh Morman
- *
- * This file is part of GNU Radio
- *
- * SPDX-License-Identifier: GPL-3.0-or-later
- *
- */
-
-#pragma once
-
-#include <gnuradio/zeromq/pull_msg_source.h>
-
-#include <zmq.hpp>
-#include <thread>
-
-namespace gr {
-namespace zeromq {
-
-class pull_msg_source_cpu : public virtual pull_msg_source
-{
-public:
- pull_msg_source_cpu(block_args args);
- bool start() override;
- bool stop() override;
- std::string last_endpoint() const override
- {
- return d_socket.get(zmq::sockopt::last_endpoint);
- }
-
-private:
- bool d_finished;
- int d_timeout; // microseconds, -1 is blocking
- zmq::context_t d_context;
- zmq::socket_t d_socket;
- std::thread d_thread;
-
-
- void readloop();
-};
-
-} // namespace zeromq
-} // namespace gr \ No newline at end of file
diff --git a/blocklib/zeromq/pull_source/pull_source.yml b/blocklib/zeromq/pull_source/pull_source.yml
deleted file mode 100644
index 491211407..000000000
--- a/blocklib/zeromq/pull_source/pull_source.yml
+++ /dev/null
@@ -1,43 +0,0 @@
-module: zeromq
-block: pull_source
-label: PULL Source
-blocktype: sync_block
-category: '[Core]/ZeroMQ Interfaces'
-# inherits: base
-
-parameters:
- - id: address
- label: IP Address
- dtype: string
- settable: false
- - id: timeout
- label: Timeout
- dtype: int
- settable: false
- default: 100
- - id: pass_tags
- label: Pass Tags
- dtype: bool
- settable: false
- default: "false"
- - id: hwm
- label: HWM
- dtype: int
- settable: false
- default: -1
- - id: itemsize
- label: Item Size
- dtype: size
- settable: false
- default: 0
-ports:
- - domain: stream
- id: out
- direction: output
- type: untyped
- size: parameters/itemsize
-
-implementations:
- - id: cpu
-
-file_format: 1
diff --git a/blocklib/zeromq/pull_source/pull_source_cpu.cc b/blocklib/zeromq/pull_source/pull_source_cpu.cc
deleted file mode 100644
index 6331db2cc..000000000
--- a/blocklib/zeromq/pull_source/pull_source_cpu.cc
+++ /dev/null
@@ -1,53 +0,0 @@
-#include "pull_source_cpu.h"
-#include "pull_source_cpu_gen.h"
-
-#include <chrono>
-#include <thread>
-
-namespace gr {
-namespace zeromq {
-
-pull_source_cpu::pull_source_cpu(block_args args)
- : INHERITED_CONSTRUCTORS,
- base_source(
- ZMQ_PULL, args.itemsize, args.address, args.timeout, args.pass_tags, args.hwm)
-{
-}
-
-work_return_t pull_source_cpu::work(work_io& wio)
-{
-
- auto noutput_items = wio.outputs()[0].n_items;
- bool first = true;
- size_t done = 0;
-
- /* Process as much as we can */
- while (1) {
- if (has_pending()) {
- /* Flush anything pending */
- done += flush_pending(wio.outputs()[0], noutput_items - done, done);
-
- /* No more space ? */
- if (done == noutput_items)
- break;
- }
- else {
- /* Try to get the next message */
- if (!load_message(first)) {
- // Launch a thread to come back and try again some time later
- come_back_later(100);
- break; /* No message, we're done for now */
- }
-
- /* Not the first anymore */
- first = false;
- }
- }
-
- wio.outputs()[0].n_produced = done;
- return work_return_t::OK;
-}
-
-
-} // namespace zeromq
-} // namespace gr
diff --git a/blocklib/zeromq/pull_source/pull_source_cpu.h b/blocklib/zeromq/pull_source/pull_source_cpu.h
deleted file mode 100644
index 157a9ba45..000000000
--- a/blocklib/zeromq/pull_source/pull_source_cpu.h
+++ /dev/null
@@ -1,24 +0,0 @@
-#pragma once
-
-#include "base.h"
-#include <gnuradio/zeromq/pull_source.h>
-
-namespace gr {
-namespace zeromq {
-
-class pull_source_cpu : public virtual pull_source, public virtual base_source
-{
-public:
- pull_source_cpu(block_args args);
- work_return_t work(work_io&) override;
-
- // Since vsize can be set as 0, then inferred on flowgraph init, set it during start()
- bool start() override
- {
- set_vsize(this->output_stream_ports()[0]->itemsize());
- return pull_source::start();
- }
-};
-
-} // namespace zeromq
-} // namespace gr \ No newline at end of file
diff --git a/blocklib/zeromq/push_msg_sink/push_msg_sink.yml b/blocklib/zeromq/push_msg_sink/push_msg_sink.yml
deleted file mode 100644
index 172631951..000000000
--- a/blocklib/zeromq/push_msg_sink/push_msg_sink.yml
+++ /dev/null
@@ -1,38 +0,0 @@
-module: zeromq
-block: push_msg_sink
-label: Push Msg Sink
-blocktype: block
-category: '[Core]/ZeroMQ Interfaces'
-
-# Example Parameters
-parameters:
- - id: address
- label: IP Address
- dtype: string
- settable: false
- - id: timeout
- label: Timeout
- dtype: int
- settable: false
- default: 100
- - id: bind
- label: Bind
- dtype: bool
- settable: false
- default: "true"
-
-callbacks:
-- id: last_endpoint
- return: std::string
- const: true
-
-ports:
-- domain: message
- id: in
- direction: input
-
-implementations:
-- id: cpu
-# - id: cuda
-
-file_format: 1
diff --git a/blocklib/zeromq/push_msg_sink/push_msg_sink_cpu.cc b/blocklib/zeromq/push_msg_sink/push_msg_sink_cpu.cc
deleted file mode 100644
index dd044868d..000000000
--- a/blocklib/zeromq/push_msg_sink/push_msg_sink_cpu.cc
+++ /dev/null
@@ -1,50 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2013,2014 Free Software Foundation, Inc.
- * Copyright 2022 Josh Morman
- *
- * This file is part of GNU Radio
- *
- * SPDX-License-Identifier: GPL-3.0-or-later
- *
- */
-
-#include "push_msg_sink_cpu.h"
-#include "push_msg_sink_cpu_gen.h"
-
-namespace {
-constexpr int LINGER_DEFAULT = 1000; // 1 second.
-}
-
-namespace gr {
-namespace zeromq {
-
-push_msg_sink_cpu::push_msg_sink_cpu(block_args args)
- : INHERITED_CONSTRUCTORS,
- d_timeout(args.timeout),
- d_context(1),
- d_socket(d_context, ZMQ_PUSH)
-{
-
- d_socket.set(zmq::sockopt::linger, LINGER_DEFAULT);
-
- if (args.bind) {
- d_socket.bind(args.address);
- }
- else {
- d_socket.connect(args.address);
- }
-}
-
-void push_msg_sink_cpu::handle_msg_in(pmtf::pmt msg)
-{
- std::stringbuf sb("");
- msg.serialize(sb);
- std::string s = sb.str();
- zmq::message_t zmsg(s.size());
- memcpy(zmsg.data(), s.c_str(), s.size());
- d_socket.send(zmsg, zmq::send_flags::none);
-}
-
-} // namespace zeromq
-} // namespace gr \ No newline at end of file
diff --git a/blocklib/zeromq/push_msg_sink/push_msg_sink_cpu.h b/blocklib/zeromq/push_msg_sink/push_msg_sink_cpu.h
deleted file mode 100644
index ad8e01d4e..000000000
--- a/blocklib/zeromq/push_msg_sink/push_msg_sink_cpu.h
+++ /dev/null
@@ -1,40 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2013,2014 Free Software Foundation, Inc.
- * Copyright 2022 Josh Morman
- *
- * This file is part of GNU Radio
- *
- * SPDX-License-Identifier: GPL-3.0-or-later
- *
- */
-
-#pragma once
-
-#include <gnuradio/zeromq/push_msg_sink.h>
-
-#include <zmq.hpp>
-
-namespace gr {
-namespace zeromq {
-
-class push_msg_sink_cpu : public virtual push_msg_sink
-{
-public:
- push_msg_sink_cpu(block_args args);
-
- std::string last_endpoint() const override
- {
- return d_socket.get(zmq::sockopt::last_endpoint);
- }
-
-private:
- int d_timeout; // microseconds, -1 is blocking
- zmq::context_t d_context;
- zmq::socket_t d_socket;
-
- void handle_msg_in(pmtf::pmt msg) override;
-};
-
-} // namespace zeromq
-} // namespace gr \ No newline at end of file
diff --git a/blocklib/zeromq/push_sink/push_sink.yml b/blocklib/zeromq/push_sink/push_sink.yml
deleted file mode 100644
index 9e45debb1..000000000
--- a/blocklib/zeromq/push_sink/push_sink.yml
+++ /dev/null
@@ -1,51 +0,0 @@
-module: zeromq
-block: push_sink
-label: PUSH Sink
-blocktype: sync_block
-category: '[Core]/ZeroMQ Interfaces'
-# inherits: gr::zeromq::base
-# includes:
-# - gnuradio/zeromq/base.h
-
-parameters:
- - id: address
- label: IP Address
- dtype: string
- settable: false
- - id: timeout
- label: Timeout
- dtype: int
- settable: false
- default: 100
- - id: pass_tags
- label: Pass Tags
- dtype: bool
- settable: false
- default: "false"
- - id: hwm
- label: HWM
- dtype: int
- settable: false
- default: -1
- - id: itemsize
- label: Item Size
- dtype: size
- settable: false
- default: 0
-callbacks:
-- id: last_endpoint
- return: std::string
- const: true
- # inherited: true
-
-ports:
- - domain: stream
- id: in
- direction: input
- type: untyped
- size: parameters/itemsize
-
-implementations:
- - id: cpu
-
-file_format: 1
diff --git a/blocklib/zeromq/push_sink/push_sink_cpu.cc b/blocklib/zeromq/push_sink/push_sink_cpu.cc
deleted file mode 100644
index 85de05df0..000000000
--- a/blocklib/zeromq/push_sink/push_sink_cpu.cc
+++ /dev/null
@@ -1,34 +0,0 @@
-#include "push_sink_cpu.h"
-#include "push_sink_cpu_gen.h"
-
-namespace gr {
-namespace zeromq {
-
-push_sink_cpu::push_sink_cpu(block_args args)
- : INHERITED_CONSTRUCTORS,
- base_sink(
- ZMQ_PUSH, args.itemsize, args.address, args.timeout, args.pass_tags, args.hwm)
-{
-}
-work_return_t push_sink_cpu::work(work_io& wio)
-
-{
- // Poll with a timeout (FIXME: scheduler can't wait for us)
- zmq::pollitem_t itemsout[] = { { static_cast<void*>(d_socket), 0, ZMQ_POLLOUT, 0 } };
- zmq::poll(&itemsout[0], 1, std::chrono::milliseconds{ d_timeout });
-
- // If we can send something, do it
- if (itemsout[0].revents & ZMQ_POLLOUT) {
- wio.inputs()[0].n_consumed =
- send_message(wio.inputs()[0].raw_items(),
- wio.inputs()[0].n_items,
- wio.inputs()[0].nitems_read(),
- wio.inputs()[0].tags_in_window(0, wio.inputs()[0].n_items));
- }
-
- return work_return_t::OK;
-}
-
-
-} // namespace zeromq
-} // namespace gr \ No newline at end of file
diff --git a/blocklib/zeromq/push_sink/push_sink_cpu.h b/blocklib/zeromq/push_sink/push_sink_cpu.h
deleted file mode 100644
index f58102cfa..000000000
--- a/blocklib/zeromq/push_sink/push_sink_cpu.h
+++ /dev/null
@@ -1,25 +0,0 @@
-#pragma once
-
-#include "base.h"
-#include <gnuradio/zeromq/push_sink.h>
-
-namespace gr {
-namespace zeromq {
-
-class push_sink_cpu : public virtual push_sink, public virtual base_sink
-{
-public:
- push_sink_cpu(block_args args);
- work_return_t work(work_io&) override;
- std::string last_endpoint() const override { return base_sink::last_endpoint(); }
-
- // Since vsize can be set as 0, then inferred on flowgraph init, set it during start()
- bool start() override
- {
- set_vsize(this->input_stream_ports()[0]->itemsize());
- return push_sink::start();
- }
-};
-
-} // namespace zeromq
-} // namespace gr \ No newline at end of file
diff --git a/blocklib/zeromq/python/gnuradio/zeromq/.gitignore b/blocklib/zeromq/python/gnuradio/zeromq/.gitignore
deleted file mode 100644
index 25d053a52..000000000
--- a/blocklib/zeromq/python/gnuradio/zeromq/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-meson.build \ No newline at end of file
diff --git a/blocklib/zeromq/python/gnuradio/zeromq/__init__.py b/blocklib/zeromq/python/gnuradio/zeromq/__init__.py
deleted file mode 100644
index ee01dd5ab..000000000
--- a/blocklib/zeromq/python/gnuradio/zeromq/__init__.py
+++ /dev/null
@@ -1,9 +0,0 @@
-
-import os
-
-try:
- from .zeromq_python import *
-except ImportError:
- dirname, filename = os.path.split(os.path.abspath(__file__))
- __path__.append(os.path.join(dirname, "bindings"))
- from .zeromq_python import *
diff --git a/blocklib/zeromq/python/gnuradio/zeromq/bindings/.gitignore b/blocklib/zeromq/python/gnuradio/zeromq/bindings/.gitignore
deleted file mode 100644
index 01ecb66ff..000000000
--- a/blocklib/zeromq/python/gnuradio/zeromq/bindings/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-!meson.build \ No newline at end of file
diff --git a/blocklib/zeromq/python/gnuradio/zeromq/bindings/base_pybind.cc b/blocklib/zeromq/python/gnuradio/zeromq/bindings/base_pybind.cc
deleted file mode 100644
index 59d6b86a8..000000000
--- a/blocklib/zeromq/python/gnuradio/zeromq/bindings/base_pybind.cc
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Copyright 2020 Free Software Foundation, Inc.
- *
- * This file is part of GNU Radio
- *
- * SPDX-License-Identifier: GPL-3.0-or-later
- *
- */
-
-/***********************************************************************************/
-/* This file is automatically generated using bindtool and can be manually edited */
-/* The following lines can be configured to regenerate this file during cmake */
-/* If manual edits are made, the following tags should be modified accordingly. */
-/* BINDTOOL_GEN_AUTOMATIC(0) */
-/* BINDTOOL_USE_PYGCCXML(0) */
-/* BINDTOOL_HEADER_FILE(firdes.h) */
-/* BINDTOOL_HEADER_FILE_HASH(10cf0c4b9664ba7e2931c2375c13c68c) */
-/***********************************************************************************/
-
-#include <pybind11/complex.h>
-#include <pybind11/pybind11.h>
-#include <pybind11/stl.h>
-
-namespace py = pybind11;
-
-#include <gnuradio/zeromq/base.h>
-
-void bind_base(py::module& m)
-{
- using base = gr::zeromq::base;
- using base_source = gr::zeromq::base_source;
- using base_sink = gr::zeromq::base_sink;
-
- py::class_<base, std::shared_ptr<base>> base_class(m, "base");
- py::class_<base_source, std::shared_ptr<base_source>> base_class(m, "base_source");
- py::class_<base_sink, std::shared_ptr<base_sink>> base_class(m, "base_sink");
-
- base.def("last_endpoint", &base::last_endpoint)
-}
diff --git a/blocklib/zeromq/python/gnuradio/zeromq/bindings/meson.build b/blocklib/zeromq/python/gnuradio/zeromq/bindings/meson.build
deleted file mode 100644
index 8a830aaec..000000000
--- a/blocklib/zeromq/python/gnuradio/zeromq/bindings/meson.build
+++ /dev/null
@@ -1,2 +0,0 @@
-# zeromq_pybind_sources = [files('base_pybind.cc')] + zeromq_pybind_sources
-# zeromq_pybind_names = ['base'] + zeromq_pybind_names \ No newline at end of file
diff --git a/blocklib/zeromq/rep_sink/rep_sink.yml b/blocklib/zeromq/rep_sink/rep_sink.yml
deleted file mode 100644
index 2d6a7647e..000000000
--- a/blocklib/zeromq/rep_sink/rep_sink.yml
+++ /dev/null
@@ -1,51 +0,0 @@
-module: zeromq
-block: rep_sink
-label: REP Sink
-blocktype: sync_block
-category: '[Core]/ZeroMQ Interfaces'
-# inherits: gr::zeromq::base
-# includes:
-# - gnuradio/zeromq/base.h
-
-parameters:
- - id: address
- label: IP Address
- dtype: string
- settable: false
- - id: timeout
- label: Timeout
- dtype: int
- settable: false
- default: 100
- - id: pass_tags
- label: Pass Tags
- dtype: bool
- settable: false
- default: "false"
- - id: hwm
- label: HWM
- dtype: int
- settable: false
- default: -1
- - id: itemsize
- label: Item Size
- dtype: size
- settable: false
- default: 0
-callbacks:
-- id: last_endpoint
- return: std::string
- const: true
- # inherited: true
-
-ports:
- - domain: stream
- id: in
- direction: input
- type: untyped
- size: parameters/itemsize
-
-implementations:
- - id: cpu
-
-file_format: 1
diff --git a/blocklib/zeromq/rep_sink/rep_sink_cpu.cc b/blocklib/zeromq/rep_sink/rep_sink_cpu.cc
deleted file mode 100644
index 7ad80d471..000000000
--- a/blocklib/zeromq/rep_sink/rep_sink_cpu.cc
+++ /dev/null
@@ -1,74 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2013,2014,2019 Free Software Foundation, Inc.
- *
- * This file is part of GNU Radio.
- *
- * SPDX-License-Identifier: GPL-3.0-or-later
- *
- */
-
-#include "rep_sink_cpu.h"
-#include "rep_sink_cpu_gen.h"
-
-namespace gr {
-namespace zeromq {
-
-rep_sink_cpu::rep_sink_cpu(block_args args)
- : INHERITED_CONSTRUCTORS,
- base_sink(
- ZMQ_REP, args.itemsize, args.address, args.timeout, args.pass_tags, args.hwm)
-{
-}
-work_return_t rep_sink_cpu::work(work_io& wio)
-{
- auto in = wio.inputs()[0].items<uint8_t>();
- auto noutput_items = wio.inputs()[0].n_items;
- bool first = true;
- int done = 0;
-
- /* Process as much as we can */
- while (1) {
- /* Wait for a small time (FIXME: scheduler can't wait for us) */
- /* We only wait if its the first iteration, for the others we'll
- * let the scheduler retry */
- zmq::pollitem_t items[] = { { static_cast<void*>(d_socket), 0, ZMQ_POLLIN, 0 } };
- zmq::poll(&items[0], 1, std::chrono::milliseconds{ (first ? d_timeout : 0) });
-
- /* If we don't have anything, we're done */
- if (!(items[0].revents & ZMQ_POLLIN))
- break;
-
- /* Get and parse the request */
- zmq::message_t request;
- bool ok = bool(d_socket.recv(request));
-
- if (!ok) {
- // Should not happen, we've checked POLLIN.
- d_logger->error("Failed to receive message.");
- break;
- }
-
- int nitems_send = noutput_items - done;
- if (request.size() >= sizeof(uint32_t)) {
- int req = (int)*(static_cast<uint32_t*>(request.data()));
- nitems_send = std::min(nitems_send, req);
- }
-
- /* Delegate the actual send */
- done += send_message(in + (done * d_vsize),
- nitems_send,
- wio.inputs()[0].nitems_read() + done,
- wio.inputs()[0].tags_in_window(0, noutput_items));
-
- /* Not the first anymore */
- first = false;
- }
-
- wio.inputs()[0].n_consumed = done;
- return work_return_t::OK;
-}
-
-
-} // namespace zeromq
-} // namespace gr
diff --git a/blocklib/zeromq/rep_sink/rep_sink_cpu.h b/blocklib/zeromq/rep_sink/rep_sink_cpu.h
deleted file mode 100644
index 98d106c4a..000000000
--- a/blocklib/zeromq/rep_sink/rep_sink_cpu.h
+++ /dev/null
@@ -1,35 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2013,2014,2019 Free Software Foundation, Inc.
- *
- * This file is part of GNU Radio.
- *
- * SPDX-License-Identifier: GPL-3.0-or-later
- *
- */
-
-#pragma once
-
-#include "base.h"
-#include <gnuradio/zeromq/rep_sink.h>
-
-namespace gr {
-namespace zeromq {
-
-class rep_sink_cpu : public virtual rep_sink, public virtual base_sink
-{
-public:
- rep_sink_cpu(block_args args);
- work_return_t work(work_io&) override;
- std::string last_endpoint() const override { return base_sink::last_endpoint(); }
-
- // Since vsize can be set as 0, then inferred on flowgraph init, set it during start()
- bool start() override
- {
- set_vsize(this->input_stream_ports()[0]->itemsize());
- return rep_sink::start();
- }
-};
-
-} // namespace zeromq
-} // namespace gr \ No newline at end of file
diff --git a/blocklib/zeromq/req_source/req_source.yml b/blocklib/zeromq/req_source/req_source.yml
deleted file mode 100644
index 8980ec8f3..000000000
--- a/blocklib/zeromq/req_source/req_source.yml
+++ /dev/null
@@ -1,43 +0,0 @@
-module: zeromq
-block: req_source
-label: REQ Source
-blocktype: sync_block
-category: '[Core]/ZeroMQ Interfaces'
-# inherits: base
-
-parameters:
- - id: address
- label: IP Address
- dtype: string
- settable: false
- - id: timeout
- label: Timeout
- dtype: int
- settable: false
- default: 100
- - id: pass_tags
- label: Pass Tags
- dtype: bool
- settable: false
- default: "false"
- - id: hwm
- label: HWM
- dtype: int
- settable: false
- default: -1
- - id: itemsize
- label: Item Size
- dtype: size
- settable: false
- default: 0
-ports:
- - domain: stream
- id: out
- direction: output
- type: untyped
- size: parameters/itemsize
-
-implementations:
- - id: cpu
-
-file_format: 1
diff --git a/blocklib/zeromq/req_source/req_source_cpu.cc b/blocklib/zeromq/req_source/req_source_cpu.cc
deleted file mode 100644
index 2fc6f86b1..000000000
--- a/blocklib/zeromq/req_source/req_source_cpu.cc
+++ /dev/null
@@ -1,78 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2013,2014 Free Software Foundation, Inc.
- *
- * This file is part of GNU Radio.
- *
- * SPDX-License-Identifier: GPL-3.0-or-later
- *
- */
-
-#include "req_source_cpu.h"
-#include "req_source_cpu_gen.h"
-
-#include <chrono>
-#include <thread>
-
-namespace gr {
-namespace zeromq {
-
-req_source_cpu::req_source_cpu(block_args args)
- : INHERITED_CONSTRUCTORS,
- base_source(
- ZMQ_REQ, args.itemsize, args.address, args.timeout, args.pass_tags, args.hwm)
-{
-}
-
-work_return_t req_source_cpu::work(work_io& wio)
-{
-
- auto noutput_items = wio.outputs()[0].n_items;
- bool first = true;
- size_t done = 0;
-
- /* Process as much as we can */
- while (1) {
- if (has_pending()) {
- /* Flush anything pending */
- done += flush_pending(wio.outputs()[0], noutput_items - done, done);
-
- /* No more space ? */
- if (done == noutput_items)
- break;
- }
- else {
-
- /* Send request if needed */
- if (!d_req_pending) {
- /* The REP/REQ pattern state machine guarantees we can send at this point
- */
- uint32_t req_len = noutput_items - done;
- zmq::message_t request(sizeof(uint32_t));
- memcpy((void*)request.data(), &req_len, sizeof(uint32_t));
- d_socket.send(request, zmq::send_flags::none);
- d_req_pending = true;
- }
-
- /* Try to get the next message */
- if (!load_message(first)) {
- // Launch a thread to come back and try again some time later
- come_back_later(100);
- break; /* No message, we're done for now */
- }
-
- /* Got response */
- d_req_pending = false;
-
- /* Not the first anymore */
- first = false;
- }
- }
-
- wio.outputs()[0].n_produced = done;
- return work_return_t::OK;
-}
-
-
-} // namespace zeromq
-} // namespace gr
diff --git a/blocklib/zeromq/req_source/req_source_cpu.h b/blocklib/zeromq/req_source/req_source_cpu.h
deleted file mode 100644
index 05cd2b6b8..000000000
--- a/blocklib/zeromq/req_source/req_source_cpu.h
+++ /dev/null
@@ -1,37 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2013,2014 Free Software Foundation, Inc.
- *
- * This file is part of GNU Radio.
- *
- * SPDX-License-Identifier: GPL-3.0-or-later
- *
- */
-
-#pragma once
-
-#include "base.h"
-#include <gnuradio/zeromq/req_source.h>
-
-namespace gr {
-namespace zeromq {
-
-class req_source_cpu : public virtual req_source, public virtual base_source
-{
-public:
- req_source_cpu(block_args args);
- work_return_t work(work_io&) override;
-
- // Since vsize can be set as 0, then inferred on flowgraph init, set it during start()
- bool start() override
- {
- set_vsize(this->output_stream_ports()[0]->itemsize());
- return req_source::start();
- }
-
-private:
- bool d_req_pending = false;
-};
-
-} // namespace zeromq
-} // namespace gr \ No newline at end of file
diff --git a/blocklib/zeromq/sub_source/sub_source.yml b/blocklib/zeromq/sub_source/sub_source.yml
deleted file mode 100644
index b461962c6..000000000
--- a/blocklib/zeromq/sub_source/sub_source.yml
+++ /dev/null
@@ -1,49 +0,0 @@
-module: zeromq
-block: sub_source
-label: SUB Source
-blocktype: sync_block
-category: '[Core]/ZeroMQ Interfaces'
-
-# Example Parameters
-parameters:
- - id: address
- label: IP Address
- dtype: string
- settable: false
- - id: timeout
- label: Timeout
- dtype: int
- settable: false
- default: 100
- - id: pass_tags
- label: Pass Tags
- dtype: bool
- settable: false
- default: "false"
- - id: hwm
- label: HWM
- dtype: int
- settable: false
- default: -1
- - id: key
- label: Key
- dtype: string
- settable: false
- default: "\"\""
- - id: itemsize
- label: Item Size
- dtype: size
- settable: false
- default: 0
-
-ports:
- - domain: stream
- id: out
- direction: output
- type: untyped
- size: parameters/itemsize
-
-implementations:
- - id: cpu
-
-file_format: 1 \ No newline at end of file
diff --git a/blocklib/zeromq/sub_source/sub_source_cpu.cc b/blocklib/zeromq/sub_source/sub_source_cpu.cc
deleted file mode 100644
index 47e8a05b4..000000000
--- a/blocklib/zeromq/sub_source/sub_source_cpu.cc
+++ /dev/null
@@ -1,60 +0,0 @@
-#include "sub_source_cpu.h"
-#include "sub_source_cpu_gen.h"
-
-#include <chrono>
-#include <thread>
-
-namespace gr {
-namespace zeromq {
-
-sub_source_cpu::sub_source_cpu(block_args args)
- : INHERITED_CONSTRUCTORS,
- base_source(ZMQ_SUB,
- args.itemsize,
- args.address,
- args.timeout,
- args.pass_tags,
- args.hwm,
- args.key)
-{
-
- /* Subscribe */
- d_socket.set(zmq::sockopt::subscribe, args.key);
-}
-
-work_return_t sub_source_cpu::work(work_io& wio)
-{
- auto noutput_items = wio.outputs()[0].n_items;
- bool first = true;
- size_t done = 0;
-
- /* Process as much as we can */
- while (1) {
- if (has_pending()) {
- /* Flush anything pending */
- done += flush_pending(wio.outputs()[0], noutput_items - done, done);
-
- /* No more space ? */
- if (done == noutput_items)
- break;
- }
- else {
- /* Try to get the next message */
- if (!load_message(first)) {
- // Launch a thread to come back and try again some time later
- come_back_later(100);
- break; /* No message, we're done for now */
- }
-
- /* Not the first anymore */
- first = false;
- }
- }
-
- wio.produce_each(done);
- return work_return_t::OK;
-}
-
-
-} // namespace zeromq
-} // namespace gr
diff --git a/blocklib/zeromq/sub_source/sub_source_cpu.h b/blocklib/zeromq/sub_source/sub_source_cpu.h
deleted file mode 100644
index 85ec087ac..000000000
--- a/blocklib/zeromq/sub_source/sub_source_cpu.h
+++ /dev/null
@@ -1,27 +0,0 @@
-#pragma once
-
-#include "base.h"
-#include <gnuradio/zeromq/sub_source.h>
-
-namespace gr {
-namespace zeromq {
-
-class sub_source_cpu : public virtual sub_source, public virtual base_source
-{
-public:
- sub_source_cpu(block_args args);
- work_return_t work(work_io&) override;
-
- // Since vsize can be set as 0, then inferred on flowgraph init, set it during start()
- bool start() override
- {
- set_vsize(this->output_stream_ports()[0]->itemsize());
- return sub_source::start();
- }
-
-private:
- // private variables here
-};
-
-} // namespace zeromq
-} // namespace gr \ No newline at end of file
diff --git a/blocklib/zeromq/test/.gitignore b/blocklib/zeromq/test/.gitignore
deleted file mode 100644
index 01ecb66ff..000000000
--- a/blocklib/zeromq/test/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-!meson.build \ No newline at end of file
diff --git a/blocklib/zeromq/test/meson.build b/blocklib/zeromq/test/meson.build
deleted file mode 100644
index f271695c2..000000000
--- a/blocklib/zeromq/test/meson.build
+++ /dev/null
@@ -1,10 +0,0 @@
-###################################################
-# QA
-###################################################
-
-if GR_ENABLE_PYTHON
- test('qa_zeromq_pushpull', py3, args : files('qa_zeromq_pushpull.py'), env: TEST_ENV)
- test('qa_zeromq_pull_msg_source', py3, args : files('qa_zeromq_pull_msg_source.py'), env: TEST_ENV)
- test('qa_zeromq_pubsub', py3, args : files('qa_zeromq_pubsub.py'), env: TEST_ENV)
- test('qa_zeromq_reqrep', py3, args : files('qa_zeromq_reqrep.py'), env: TEST_ENV)
-endif
diff --git a/blocklib/zeromq/test/qa_zeromq_pubsub.py b/blocklib/zeromq/test/qa_zeromq_pubsub.py
deleted file mode 100644
index 4334794d8..000000000
--- a/blocklib/zeromq/test/qa_zeromq_pubsub.py
+++ /dev/null
@@ -1,110 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-#
-# Copyright 2014 Free Software Foundation, Inc.
-#
-# This file is part of GNU Radio
-#
-# SPDX-License-Identifier: GPL-3.0-or-later
-#
-#
-
-
-from gnuradio import gr, gr_unittest
-from gnuradio import blocks, zeromq
-import pmtf
-import time
-
-
-def make_tag(key, value, offset, srcid=None):
-
- if srcid is None:
- tag = gr.tag_t(offset, {key: pmtf.pmt(value)})
- else:
- tag = gr.tag_t(offset, {key: pmtf.pmt(value), "srcid": pmtf.pmt(srcid)})
- return tag
-
-
-# def compare_tags(a, b):
-# return a.offset == b.offset and pmt.equal(a.key, b.key) and \
-# pmt.equal(a.value, b.value) and pmt.equal(a.srcid, b.srcid)
-
-
-class qa_zeromq_pubsub (gr_unittest.TestCase):
-
- def setUp(self):
- self.send_tb = gr.top_block()
- self.recv_tb = gr.top_block()
-
- def tearDown(self):
- self.send_tb = None
- self.recv_tb = None
-
- def test_001(self):
- vlen = 10
- src_data = list(range(vlen)) * 100
- src = blocks.vector_source_f(src_data, False, vlen)
- zeromq_pub_sink = zeromq.pub_sink("tcp://127.0.0.1:0", 0)
- address = zeromq_pub_sink.last_endpoint()
- zeromq_sub_source = zeromq.sub_source(address, 0)
- sink = blocks.vector_sink_f(vlen)
- self.send_tb.connect(src, zeromq_pub_sink)
- self.recv_tb.connect(zeromq_sub_source, sink)
- self.recv_tb.start()
- time.sleep(0.5)
- self.send_tb.start()
- time.sleep(0.5)
- self.recv_tb.stop()
- self.send_tb.stop()
- # self.recv_tb.wait()
- # self.send_tb.wait()
- self.assertFloatTuplesAlmostEqual(sink.data(), src_data)
-
- def test_002(self):
- # same as test_001, but insert a tag and set key filter
- vlen = 10
- src_data = list(range(vlen)) * 100
-
- src_tags = tuple([make_tag('key', 'val', 0, 'src'),
- make_tag('key', 'val', 1, 'src')])
-
- src = blocks.vector_source_f(src_data, False, vlen, tags=src_tags)
- zeromq_pub_sink = zeromq.pub_sink(
- "tcp://127.0.0.1:0",
- 0,
- pass_tags=True,
- key="filter_key")
- address = zeromq_pub_sink.last_endpoint()
- zeromq_sub_source = zeromq.sub_source(address, 0, pass_tags=True, key="filter_key")
- sink = blocks.vector_sink_f(vlen)
- self.send_tb.connect(src, zeromq_pub_sink)
- self.recv_tb.connect(zeromq_sub_source, sink)
-
- # start both flowgraphs
- self.recv_tb.start()
- time.sleep(0.5)
- self.send_tb.start()
- time.sleep(0.5)
- self.recv_tb.stop()
- self.send_tb.stop()
- self.recv_tb.wait()
- self.send_tb.wait()
-
- # compare data
- self.assertFloatTuplesAlmostEqual(sink.data(), src_data)
-
- # compare all tags
- rx_tags = sink.tags()
- self.assertEqual(len(src_tags), len(rx_tags))
-
- idx = 0
- for in_tag, out_tag in zip(src_tags, rx_tags):
- print(idx)
- print(in_tag)
- print(out_tag)
- self.assertTrue(in_tag == out_tag)
- idx += 1
-
-
-if __name__ == '__main__':
- gr_unittest.run(qa_zeromq_pubsub)
diff --git a/blocklib/zeromq/test/qa_zeromq_pull_msg_source.py b/blocklib/zeromq/test/qa_zeromq_pull_msg_source.py
deleted file mode 100644
index ff161b9c7..000000000
--- a/blocklib/zeromq/test/qa_zeromq_pull_msg_source.py
+++ /dev/null
@@ -1,66 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright 2020 Free Software Foundation, Inc.
-#
-# This file is part of GNU Radio
-#
-# SPDX-License-Identifier: GPL-3.0-or-later
-#
-#
-"""Unit tests for ZMQ PULL Message Source block"""
-
-import time
-import zmq
-
-from gnuradio import gr, gr_unittest, blocks, zeromq
-import pmtf
-
-
-class qa_zeromq_pull_msg_source(gr_unittest.TestCase):
- """Unit tests for ZMQ PULL Message Source block"""
-
- def setUp(self):
- addr = 'tcp://127.0.0.1'
- self.context = zmq.Context()
- self.zmq_sock = self.context.socket(zmq.PUSH)
- port = self.zmq_sock.bind_to_random_port(addr)
-
- self.zeromq_pull_msg_source = zeromq.pull_msg_source(
- ('%s:%s' % (addr, port)), 100)
- self.message_debug = blocks.message_debug()
- self.tb = gr.top_block()
- self.tb.connect(
- (self.zeromq_pull_msg_source, 'out'), (self.message_debug, 'store'))
-
- self.tb.start()
- time.sleep(0.1)
-
- def tearDown(self):
- self.tb.stop()
- self.tb.wait()
- self.zeromq_pull_msg_source = None
- self.message_debug = None
- self.tb = None
- self.zmq_sock.close()
- self.context.term()
-
- def test_valid_pmt(self):
- """Test receiving of valid PMT messages"""
- msg = pmtf.pmt('test_valid_pmt')
- self.zmq_sock.send(bytes(msg.serialize()))
- for _ in range(10):
- if self.message_debug.num_messages() > 0:
- break
- time.sleep(0.2)
- self.assertEqual(1, self.message_debug.num_messages())
- self.assertEqual(msg(), self.message_debug.get_message(0)())
-
- # def test_invalid_pmt(self):
- # """Test receiving of invalid PMT messages"""
- # self.zmq_sock.send_string('test_invalid_pmt')
- # time.sleep(0.1)
- # self.assertEqual(0, self.message_debug.num_messages()())
-
-
-if __name__ == '__main__':
- gr_unittest.run(qa_zeromq_pull_msg_source)
diff --git a/blocklib/zeromq/test/qa_zeromq_pushpull.py b/blocklib/zeromq/test/qa_zeromq_pushpull.py
deleted file mode 100644
index ea18ed7b1..000000000
--- a/blocklib/zeromq/test/qa_zeromq_pushpull.py
+++ /dev/null
@@ -1,46 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright 2014 Free Software Foundation, Inc.
-#
-# This file is part of GNU Radio
-#
-# SPDX-License-Identifier: GPL-3.0-or-later
-#
-#
-
-
-from gnuradio import gr, gr_unittest, blocks, zeromq
-import time
-
-
-class qa_zeromq_pushpull (gr_unittest.TestCase):
-
- def setUp(self):
- self.send_tb = gr.top_block()
- self.recv_tb = gr.top_block()
-
- def tearDown(self):
- self.send_tb = None
- self.recv_tb = None
-
- def test_001(self):
- vlen = 10
- src_data = list(range(vlen)) * 100
- src = blocks.vector_source_f(src_data, False, vlen)
- zeromq_push_sink = zeromq.push_sink("tcp://127.0.0.1:0")
- address = zeromq_push_sink.last_endpoint()
- zeromq_pull_source = zeromq.pull_source(address, 0)
- sink = blocks.vector_sink_f(vlen)
- self.send_tb.connect(src, zeromq_push_sink)
- self.recv_tb.connect(zeromq_pull_source, sink)
- self.recv_tb.start()
- # time.sleep(0.5)
- self.send_tb.start()
- time.sleep(0.5)
- self.recv_tb.stop()
- self.send_tb.stop()
- self.assertFloatTuplesAlmostEqual(sink.data(), src_data)
-
-
-if __name__ == '__main__':
- gr_unittest.run(qa_zeromq_pushpull)
diff --git a/blocklib/zeromq/test/qa_zeromq_reqrep.py b/blocklib/zeromq/test/qa_zeromq_reqrep.py
deleted file mode 100644
index 733bf2f03..000000000
--- a/blocklib/zeromq/test/qa_zeromq_reqrep.py
+++ /dev/null
@@ -1,50 +0,0 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-#
-# Copyright 2014 Free Software Foundation, Inc.
-#
-# This file is part of GNU Radio
-#
-# SPDX-License-Identifier: GPL-3.0-or-later
-#
-#
-
-
-from gnuradio import gr, gr_unittest
-from gnuradio import blocks, zeromq
-import time
-
-
-class qa_zeromq_reqrep (gr_unittest.TestCase):
-
- def setUp(self):
- self.send_tb = gr.top_block()
- self.recv_tb = gr.top_block()
-
- def tearDown(self):
- self.send_tb = None
- self.recv_tb = None
-
- def test_001(self):
- vlen = 10
- src_data = list(range(vlen)) * 100
- src = blocks.vector_source_f(src_data, False, vlen)
- zeromq_rep_sink = zeromq.rep_sink("tcp://127.0.0.1:0", 0)
- address = zeromq_rep_sink.last_endpoint()
- zeromq_req_source = zeromq.req_source(address, 0)
- sink = blocks.vector_sink_f(vlen)
- self.send_tb.connect(src, zeromq_rep_sink)
- self.recv_tb.connect(zeromq_req_source, sink)
- self.recv_tb.start()
- time.sleep(0.5)
- self.send_tb.start()
- time.sleep(0.5)
- self.recv_tb.stop()
- self.send_tb.stop()
- self.recv_tb.wait()
- self.send_tb.wait()
- self.assertFloatTuplesAlmostEqual(sink.data(), src_data)
-
-
-if __name__ == '__main__':
- gr_unittest.run(qa_zeromq_reqrep)