aboutsummaryrefslogtreecommitdiffstats
path: root/gr/lib
diff options
context:
space:
mode:
Diffstat (limited to 'gr/lib')
-rw-r--r--gr/lib/buffer_cpu_host.cc96
-rw-r--r--gr/lib/buffer_cuda.cc162
-rw-r--r--gr/lib/buffer_cuda_pinned.cc80
-rw-r--r--gr/lib/buffer_cuda_sm.cc164
-rw-r--r--gr/lib/buffer_net_zmq.cc175
-rw-r--r--gr/lib/buffer_sm.cc374
-rw-r--r--gr/lib/meson.build3
7 files changed, 0 insertions, 1054 deletions
diff --git a/gr/lib/buffer_cpu_host.cc b/gr/lib/buffer_cpu_host.cc
deleted file mode 100644
index 0623d166f..000000000
--- a/gr/lib/buffer_cpu_host.cc
+++ /dev/null
@@ -1,96 +0,0 @@
-#include <string.h>
-#include <algorithm>
-#include <cstdint>
-#include <memory>
-#include <mutex>
-#include <vector>
-
-#include <gnuradio/buffer_cpu_host.h>
-
-namespace gr {
-buffer_cpu_host::buffer_cpu_host(size_t num_items,
- size_t item_size,
- buffer_cpu_host_type type,
- std::shared_ptr<buffer_properties> buf_properties)
- : gr::buffer_sm(num_items, item_size, buf_properties), _transfer_type(type)
-{
- _host_buffer.resize(_buf_size);
- _device_buffer.resize(_buf_size);
-
- set_type("buffer_cpu_host_" + std::to_string((int)_transfer_type));
-}
-
-buffer_uptr buffer_cpu_host::make(size_t num_items,
- size_t item_size,
- std::shared_ptr<buffer_properties> buffer_properties)
-{
- auto cbp = std::static_pointer_cast<buffer_cpu_host_properties>(buffer_properties);
- if (cbp != nullptr) {
- return buffer_uptr(new buffer_cpu_host(
- num_items, item_size, cbp->buffer_type(), buffer_properties));
- }
- else {
- throw std::runtime_error(
- "Failed to cast buffer properties to buffer_cpu_host_properties");
- }
-}
-
-void* buffer_cpu_host::read_ptr(size_t index)
-{
- if (_transfer_type == buffer_cpu_host_type::D2H ||
- _transfer_type == buffer_cpu_host_type::H2H) {
- return (void*)&_host_buffer[index];
- }
- else {
- return (void*)&_device_buffer[index];
- }
-}
-void* buffer_cpu_host::write_ptr()
-{
- if (_transfer_type == buffer_cpu_host_type::H2D ||
- _transfer_type == buffer_cpu_host_type::H2H) {
- return (void*)&_host_buffer[_write_index];
- }
- else {
- return (void*)&_device_buffer[_write_index];
- }
-}
-
-void buffer_cpu_host::post_write(int num_items)
-{
- std::lock_guard<std::mutex> guard(_buf_mutex);
-
- size_t bytes_written = num_items * _item_size;
- size_t wi1 = _write_index;
-
- // num_items were written to the buffer
-
- if (_transfer_type == buffer_cpu_host_type::H2D) {
- memcpy(&_device_buffer[wi1], &_host_buffer[wi1], bytes_written);
- }
- else if (_transfer_type == buffer_cpu_host_type::D2H) {
- memcpy(&_host_buffer[wi1], &_device_buffer[wi1], bytes_written);
- }
-
- // advance the write pointer
- _write_index += bytes_written;
- if (_write_index == _buf_size) {
- _write_index = 0;
- }
- if (_write_index > _buf_size) {
- throw std::runtime_error("buffer_sm: Wrote too far into buffer");
- }
- _total_written += num_items;
-}
-
-buffer_reader_uptr
-buffer_cpu_host::add_reader(std::shared_ptr<buffer_properties> buf_props, size_t itemsize)
-{
- auto r =
- std::make_unique<buffer_cpu_host_reader>(this, buf_props, itemsize, _write_index);
- _readers.push_back(r.get());
- return r;
-}
-
-
-} // namespace gr
diff --git a/gr/lib/buffer_cuda.cc b/gr/lib/buffer_cuda.cc
deleted file mode 100644
index 73a1e61f4..000000000
--- a/gr/lib/buffer_cuda.cc
+++ /dev/null
@@ -1,162 +0,0 @@
-#include <cuda.h>
-#include <cuda_runtime.h>
-#include <string.h>
-#include <algorithm>
-#include <cstdint>
-#include <memory>
-#include <mutex>
-#include <vector>
-
-#include <gnuradio/buffer_cuda.h>
-
-
-namespace gr {
-buffer_cuda::buffer_cuda(size_t num_items,
- size_t item_size,
- buffer_cuda_type type,
- std::shared_ptr<buffer_properties> buf_properties)
- : gr::buffer(num_items, item_size, buf_properties), _type(type)
-{
- // _host_buffer.resize(_buf_size * 2); // double circular buffer
- cudaMallocHost(
- &_host_buffer,
- _buf_size *
- 2); // double circular buffer - should do something more intelligent here
- cudaMalloc(
- &_device_buffer,
- _buf_size *
- 2); // double circular buffer - should do something more intelligent here
- set_type("buffer_cuda_" + std::to_string((int)_type));
-
- cudaStreamCreate(&stream);
-}
-buffer_cuda::~buffer_cuda()
-{
- cudaFree(_device_buffer);
- cudaFree(_host_buffer);
-}
-
-buffer_uptr buffer_cuda::make(size_t num_items,
- size_t item_size,
- std::shared_ptr<buffer_properties> buffer_properties)
-{
- auto cbp = std::static_pointer_cast<buffer_cuda_properties>(buffer_properties);
- if (cbp != nullptr) {
- return buffer_uptr(
- new buffer_cuda(num_items, item_size, cbp->buffer_type(), buffer_properties));
- }
- else {
- throw std::runtime_error(
- "Failed to cast buffer properties to buffer_cuda_properties");
- }
-}
-
-void* buffer_cuda::read_ptr(size_t index)
-{
- if (_type == buffer_cuda_type::D2H) {
- return (void*)&_host_buffer[index];
- }
- else {
- return (void*)&_device_buffer[index];
- }
-}
-void* buffer_cuda::write_ptr()
-{
- if (_type == buffer_cuda_type::H2D) {
- return (void*)&_host_buffer[_write_index];
- }
- else {
- return (void*)&_device_buffer[_write_index];
- }
-}
-
-void buffer_cuda_reader::post_read(int num_items)
-{
- std::lock_guard<std::mutex> guard(_rdr_mutex);
- // advance the read pointer
- _read_index += num_items * _itemsize;
- if (_read_index >= _buffer->buf_size()) {
- _read_index -= _buffer->buf_size();
- }
- _total_read += num_items;
-}
-void buffer_cuda::post_write(int num_items)
-{
- std::lock_guard<std::mutex> guard(_buf_mutex);
-
- size_t bytes_written = num_items * _item_size;
- size_t wi1 = _write_index;
- size_t wi2 = _write_index + _buf_size;
- // num_items were written to the buffer
- // copy the data to the second half of the buffer
-
- size_t num_bytes_1 = std::min(_buf_size - wi1, bytes_written);
- size_t num_bytes_2 = bytes_written - num_bytes_1;
-
- if (_type == buffer_cuda_type::H2D) {
- cudaMemcpyAsync(&_device_buffer[wi1],
- &_host_buffer[wi1],
- bytes_written,
- cudaMemcpyHostToDevice,
- stream);
-
- // memcpy(&_host_buffer[wi2], &_host_buffer[wi1], num_bytes_1);
- cudaMemcpyAsync(&_device_buffer[wi2],
- &_device_buffer[wi1],
- num_bytes_1,
- cudaMemcpyDeviceToDevice,
- stream);
- if (num_bytes_2) {
- // memcpy(&_host_buffer[0], &_host_buffer[_buf_size], num_bytes_2);
- cudaMemcpyAsync(&_device_buffer[0],
- &_device_buffer[_buf_size],
- num_bytes_2,
- cudaMemcpyDeviceToDevice,
- stream);
- }
- }
- else if (_type == buffer_cuda_type::D2H) {
- cudaMemcpyAsync(&_host_buffer[wi1],
- &_device_buffer[wi1],
- bytes_written,
- cudaMemcpyDeviceToHost,
- stream);
-
- memcpy(&_host_buffer[wi2], &_host_buffer[wi1], num_bytes_1);
-
- if (num_bytes_2) {
- memcpy(&_host_buffer[0], &_host_buffer[_buf_size], num_bytes_2);
- }
- }
- else // D2D
- {
- cudaMemcpyAsync(&_device_buffer[wi2],
- &_device_buffer[wi1],
- num_bytes_1,
- cudaMemcpyDeviceToDevice);
- if (num_bytes_2)
- cudaMemcpyAsync(&_device_buffer[0],
- &_device_buffer[_buf_size],
- num_bytes_2,
- cudaMemcpyDeviceToDevice,
- stream);
- }
- // advance the write pointer
- _write_index += bytes_written;
- if (_write_index >= _buf_size) {
- _write_index -= _buf_size;
- }
- _total_written += num_items;
- cudaStreamSynchronize(stream);
-}
-
-buffer_reader_uptr buffer_cuda::add_reader(std::shared_ptr<buffer_properties> buf_props,
- size_t itemsize)
-{
- auto r =
- std::make_unique<buffer_cuda_reader>(this, buf_props, itemsize, _write_index);
- _readers.push_back(r.get());
- return r;
-}
-
-} // namespace gr
diff --git a/gr/lib/buffer_cuda_pinned.cc b/gr/lib/buffer_cuda_pinned.cc
deleted file mode 100644
index 3a1de05de..000000000
--- a/gr/lib/buffer_cuda_pinned.cc
+++ /dev/null
@@ -1,80 +0,0 @@
-#include <string.h>
-#include <algorithm>
-#include <cstdint>
-#include <memory>
-#include <mutex>
-#include <vector>
-
-#include <cuda.h>
-#include <cuda_runtime.h>
-
-#include <gnuradio/buffer_cuda_pinned.h>
-
-namespace gr {
-buffer_cuda_pinned::buffer_cuda_pinned(size_t num_items,
- size_t item_size,
- std::shared_ptr<buffer_properties> buf_properties)
- : buffer(num_items, item_size, buf_properties)
-{
- if (!cudaHostAlloc((void**)&_pinned_buffer, _buf_size * 2, 0) == cudaSuccess) {
- throw std::runtime_error("Failed to allocate CUDA pinned memory");
- }
-}
-buffer_cuda_pinned::~buffer_cuda_pinned() { cudaFree(_pinned_buffer); }
-
-buffer_uptr buffer_cuda_pinned::make(size_t num_items,
- size_t item_size,
- std::shared_ptr<buffer_properties> buffer_properties)
-{
- return buffer_uptr(new buffer_cuda_pinned(num_items, item_size, buffer_properties));
-}
-
-void* buffer_cuda_pinned::read_ptr(size_t index) { return (void*)&_pinned_buffer[index]; }
-void* buffer_cuda_pinned::write_ptr() { return (void*)&_pinned_buffer[_write_index]; }
-
-void buffer_cuda_pinned_reader::post_read(int num_items)
-{
- std::lock_guard<std::mutex> guard(_rdr_mutex);
- // advance the read pointer
- _read_index += num_items * _itemsize;
- if (_read_index >= _buffer->buf_size()) {
- _read_index -= _buffer->buf_size();
- }
- _total_read += num_items;
-}
-
-void buffer_cuda_pinned::post_write(int num_items)
-{
- std::lock_guard<std::mutex> guard(_buf_mutex);
-
- size_t bytes_written = num_items * _item_size;
- size_t wi1 = _write_index;
- size_t wi2 = _write_index + _buf_size;
- // num_items were written to the buffer
- // copy the data to the second half of the buffer
-
- size_t num_bytes_1 = std::min(_buf_size - wi1, bytes_written);
- size_t num_bytes_2 = bytes_written - num_bytes_1;
-
- memcpy(&_pinned_buffer[wi2], &_pinned_buffer[wi1], num_bytes_1);
- if (num_bytes_2)
- memcpy(&_pinned_buffer[0], &_pinned_buffer[_buf_size], num_bytes_2);
-
- // advance the write pointer
- _write_index += bytes_written;
- if (_write_index >= _buf_size) {
- _write_index -= _buf_size;
- }
-}
-
-buffer_reader_uptr
-buffer_cuda_pinned::add_reader(std::shared_ptr<buffer_properties> buf_props,
- size_t itemsize)
-{
- auto r = std::make_unique<buffer_cuda_pinned_reader>(
- this, buf_props, itemsize, _write_index);
- _readers.push_back(r.get());
- return r;
-}
-
-} // namespace gr
diff --git a/gr/lib/buffer_cuda_sm.cc b/gr/lib/buffer_cuda_sm.cc
deleted file mode 100644
index 7e8bc88d9..000000000
--- a/gr/lib/buffer_cuda_sm.cc
+++ /dev/null
@@ -1,164 +0,0 @@
-#include <cuda.h>
-#include <cuda_runtime.h>
-#include <string.h>
-#include <algorithm>
-#include <cstdint>
-#include <memory>
-#include <mutex>
-#include <vector>
-
-#include <gnuradio/buffer_cuda_sm.h>
-
-namespace gr {
-buffer_cuda_sm::buffer_cuda_sm(size_t num_items,
- size_t item_size,
- buffer_cuda_sm_type type,
- std::shared_ptr<buffer_properties> buf_properties)
- : gr::buffer_sm(num_items, item_size, buf_properties), _type(type)
-{
- // _host_buffer.resize(_buf_size * 2); // double circular buffer
- cudaMallocHost(&_host_buffer, _buf_size);
- cudaMalloc(&_device_buffer, _buf_size);
- set_type("buffer_cuda_sm_" + std::to_string((int)_type));
-
- cudaStreamCreate(&stream);
-}
-buffer_cuda_sm::~buffer_cuda_sm()
-{
- cudaFree(_device_buffer);
- cudaFree(_host_buffer);
-}
-
-buffer_uptr buffer_cuda_sm::make(size_t num_items,
- size_t item_size,
- std::shared_ptr<buffer_properties> buffer_properties)
-{
- auto cbp = std::static_pointer_cast<buffer_cuda_sm_properties>(buffer_properties);
- if (cbp != nullptr) {
- return buffer_uptr(new buffer_cuda_sm(
- num_items, item_size, cbp->buffer_type(), buffer_properties));
- }
- else {
- throw std::runtime_error(
- "Failed to cast buffer properties to buffer_cuda_sm_properties");
- }
-}
-
-void* buffer_cuda_sm::read_ptr(size_t index)
-{
- if (_type == buffer_cuda_sm_type::D2H) {
- return (void*)&_host_buffer[index];
- }
- else {
- return (void*)&_device_buffer[index];
- }
-}
-void* buffer_cuda_sm::write_ptr()
-{
- if (_type == buffer_cuda_sm_type::H2D) {
- return (void*)&_host_buffer[_write_index];
- }
- else {
- return (void*)&_device_buffer[_write_index];
- }
-}
-
-void buffer_cuda_sm::post_write(int num_items)
-{
- std::lock_guard<std::mutex> guard(_buf_mutex);
-
- size_t bytes_written = num_items * _item_size;
- size_t wi1 = _write_index;
-
- // num_items were written to the buffer
-
- if (_type == buffer_cuda_sm_type::H2D) {
- cudaMemcpyAsync(&_device_buffer[wi1],
- &_host_buffer[wi1],
- bytes_written,
- cudaMemcpyHostToDevice,
- stream);
- }
- else if (_type == buffer_cuda_sm_type::D2H) {
- cudaMemcpyAsync(&_host_buffer[wi1],
- &_device_buffer[wi1],
- bytes_written,
- cudaMemcpyDeviceToHost,
- stream);
- }
-
- // advance the write pointer
- _write_index += bytes_written;
- if (_write_index == _buf_size) {
- _write_index = 0;
- }
- if (_write_index > _buf_size) {
- throw std::runtime_error("buffer_sm: Wrote too far into buffer");
- }
- _total_written += num_items;
- cudaStreamSynchronize(stream);
-}
-
-buffer_reader_uptr
-buffer_cuda_sm::add_reader(std::shared_ptr<buffer_properties> buf_props, size_t itemsize)
-{
- auto r =
- std::make_unique<buffer_cuda_sm_reader>(this, buf_props, itemsize, _write_index);
- _readers.push_back(r.get());
- return r;
-}
-
-void* buffer_cuda_sm::cuda_memcpy(void* dest, const void* src, std::size_t count)
-{
- cudaError_t rc = cudaSuccess;
- rc = cudaMemcpy(dest, src, count, cudaMemcpyDeviceToDevice);
- if (rc) {
- std::ostringstream msg;
- msg << "Error performing cudaMemcpy: " << cudaGetErrorName(rc) << " -- "
- << cudaGetErrorString(rc);
- throw std::runtime_error(msg.str());
- }
-
- return dest;
-}
-
-void* buffer_cuda_sm::cuda_memmove(void* dest, const void* src, std::size_t count)
-{
- // Would a kernel that checks for overlap and then copies front-to-back or
- // back-to-front be faster than using cudaMemcpy with a temp buffer?
-
- // Allocate temp buffer
- void* tempBuffer = nullptr;
- cudaError_t rc = cudaSuccess;
- rc = cudaMalloc((void**)&tempBuffer, count);
- if (rc) {
- std::ostringstream msg;
- msg << "Error allocating device buffer: " << cudaGetErrorName(rc) << " -- "
- << cudaGetErrorString(rc);
- throw std::runtime_error(msg.str());
- }
-
- // First copy data from source to temp buffer
- rc = cudaMemcpy(tempBuffer, src, count, cudaMemcpyDeviceToDevice);
- if (rc) {
- std::ostringstream msg;
- msg << "Error performing cudaMemcpy: " << cudaGetErrorName(rc) << " -- "
- << cudaGetErrorString(rc);
- throw std::runtime_error(msg.str());
- }
-
- // Then copy data from temp buffer to destination to avoid overlap
- rc = cudaMemcpy(dest, tempBuffer, count, cudaMemcpyDeviceToDevice);
- if (rc) {
- std::ostringstream msg;
- msg << "Error performing cudaMemcpy: " << cudaGetErrorName(rc) << " -- "
- << cudaGetErrorString(rc);
- throw std::runtime_error(msg.str());
- }
-
- cudaFree(tempBuffer);
-
- return dest;
-}
-
-} // namespace gr
diff --git a/gr/lib/buffer_net_zmq.cc b/gr/lib/buffer_net_zmq.cc
deleted file mode 100644
index 0236e028a..000000000
--- a/gr/lib/buffer_net_zmq.cc
+++ /dev/null
@@ -1,175 +0,0 @@
-#include <gnuradio/buffer_cpu_vmcirc.h>
-#include <gnuradio/buffer_net_zmq.h>
-#include <nlohmann/json.hpp>
-#include <chrono>
-#include <thread>
-namespace gr {
-
-
-std::shared_ptr<buffer_properties>
-buffer_net_zmq_properties::make_from_params(const std::string& json_str)
-{
- auto json_obj = nlohmann::json::parse(json_str);
- return make(json_obj["ipaddr"], json_obj["port"]);
-}
-
-buffer_uptr buffer_net_zmq::make(size_t num_items,
- size_t item_size,
- std::shared_ptr<buffer_properties> buffer_properties)
-{
-
- auto zbp = std::static_pointer_cast<buffer_net_zmq_properties>(buffer_properties);
- if (zbp != nullptr) {
- return buffer_uptr(
- new buffer_net_zmq(num_items, item_size, buffer_properties, zbp->port()));
- }
- else {
- throw std::runtime_error(
- "Failed to cast buffer properties to buffer_net_zmq_properties");
- }
-}
-
-buffer_net_zmq::buffer_net_zmq(size_t num_items,
- size_t item_size,
- std::shared_ptr<buffer_properties> buf_properties,
- int port)
- : buffer(num_items, item_size, buf_properties),
- _context(1),
- _socket(_context, zmq::socket_type::push)
-{
- gr::configure_default_loggers(d_logger, d_debug_logger, "buffer_net_zmq");
- set_type("buffer_net_zmq");
- _buffer.resize(_buf_size);
- _socket.set(zmq::sockopt::sndhwm, 1);
- _socket.set(zmq::sockopt::rcvhwm, 1);
- std::string endpoint = "tcp://*:" + std::to_string(port);
- std::cout << "snd_endpoint: " << endpoint << std::endl;
- _socket.bind(endpoint);
-}
-
-
-/****************************************************************************/
-/* READER METHODS */
-/****************************************************************************/
-
-
-buffer_reader_uptr
-buffer_net_zmq_reader::make(size_t itemsize, std::shared_ptr<buffer_properties> buf_props)
-{
- auto zbp = std::static_pointer_cast<buffer_net_zmq_properties>(buf_props);
- if (zbp != nullptr) {
- return std::make_unique<buffer_net_zmq_reader>(
- buf_props, itemsize, zbp->ipaddr(), zbp->port());
- }
- else {
- throw std::runtime_error(
- "Failed to cast buffer properties to buffer_net_zmq_properties");
- }
-}
-
-buffer_net_zmq_reader::buffer_net_zmq_reader(std::shared_ptr<buffer_properties> buf_props,
- size_t itemsize,
- const std::string& ipaddr,
- int port)
- : buffer_reader(nullptr, buf_props, itemsize, 0),
- _context(1),
- _socket(_context, zmq::socket_type::pull)
-{
- gr::configure_default_loggers(d_logger, d_debug_logger, "buffer_net_zmq_reader");
- auto bufprops = std::make_shared<buffer_cpu_vmcirc_properties>();
- _circbuf = gr::buffer_cpu_vmcirc::make(
- 8192,
- itemsize,
- bufprops); // FIXME - make nitems a buffer reader factory parameter
- _circbuf_rdr = _circbuf->add_reader(bufprops, itemsize);
-
- // auto b = (float *)_circbuf->write_ptr();
-
- // for (int i=0; i<8192; i++)
- // {
- // b[i] = i;
- // }
- // _circbuf->post_write(8192);
-
- // buffer_info_t info;
- // _circbuf_rdr->read_info(info);
- // auto br = (float *)_circbuf_rdr->read_ptr();
-
-
- // _circbuf_rdr->post_read(4096);
- // br = (float *) _circbuf_rdr->read_ptr();
-
-
- std::string endpoint = "tcp://" + ipaddr + ":" + std::to_string(port);
- d_debug_logger->debug("rcv_endpoint: {}", endpoint);
- _socket.set(zmq::sockopt::sndhwm, 1);
- _socket.set(zmq::sockopt::rcvhwm, 1);
- // _socket.setsockopt(ZMQ_SUBSCRIBE, "", 0);
- _socket.connect(endpoint);
- d_debug_logger->debug(" ... connected");
-
- std::thread t([this]() {
- while (!this->_recv_done) {
- // zmq::message_t msg{};
- // See how much room we have in the circular buffer
- buffer_info_t wi;
- _circbuf->write_info(wi);
-
- auto n_bytes_left_in_msg = _msg.size() - _msg_idx;
- auto n_bytes_in_circbuf = wi.n_items * wi.item_size;
- auto bytes_to_write = std::min(n_bytes_in_circbuf, n_bytes_left_in_msg);
- auto items_to_write = bytes_to_write / wi.item_size;
- bytes_to_write = items_to_write * wi.item_size;
-
- if (n_bytes_in_circbuf <= 0) {
- std::this_thread::sleep_for(std::chrono::milliseconds(1));
- continue;
- }
-
- if (bytes_to_write > 0) {
- memcpy(wi.ptr, (uint8_t*)_msg.data() + _msg_idx, bytes_to_write);
- d_debug_logger->debug("copied {} items", bytes_to_write / wi.item_size);
- _msg_idx += bytes_to_write;
- n_bytes_left_in_msg = _msg.size() - _msg_idx;
- _circbuf->post_write(items_to_write);
- notify_scheduler();
- }
-
- if (n_bytes_left_in_msg == 0) {
- _msg.rebuild();
- d_debug_logger->debug("going into recv");
- auto r = _socket.recv(_msg, zmq::recv_flags::none);
- if (r) {
- d_debug_logger->debug("received msg with size {} items",
- _msg.size() / wi.item_size);
- _msg_idx = 0;
- }
- }
- // GR_LOG_DEBUG(d_debug_logger, "recv: {}", wi.n_items);
- // auto ret = this->_socket.recv(
- // zmq::mutable_buffer(_circbuf->write_ptr(), wi.n_items * wi.item_size),
- // zmq::recv_flags::none);
-
- // _circbuf->post_write(wi.n_items);
- // notify_scheduler();
- // auto recbuf = *ret;
- // assert(recbuf.size == wi.n_items * wi.item_size);
-
- // GR_LOG_DEBUG(d_debug_logger, "nbytesrcv: {}", recbuf.size);
- // std::cout << " ---> msg received " << msg.size() << " bytes" <<
- // std::endl;
- }
- });
-
- t.detach();
-}
-
-std::string buffer_net_zmq_properties::to_json()
-{
- nlohmann::json j = { { "id", "buffer_net_zmq_properties" },
- { "parameters", { { "ipaddr", _ipaddr }, { "port", _port } } } };
-
- return j.dump();
-}
-
-} // namespace gr
diff --git a/gr/lib/buffer_sm.cc b/gr/lib/buffer_sm.cc
deleted file mode 100644
index 6766b1062..000000000
--- a/gr/lib/buffer_sm.cc
+++ /dev/null
@@ -1,374 +0,0 @@
-#include <gnuradio/buffer_sm.h>
-
-#include <gnuradio/logger.h>
-
-namespace gr {
-
-buffer_reader_uptr buffer_sm::add_reader(std::shared_ptr<buffer_properties> buf_props,
- size_t itemsize)
-{
- auto r = std::make_unique<buffer_sm_reader>(this, itemsize, buf_props, _write_index);
- _readers.push_back(r.get());
- return r;
-}
-
-buffer_sm::buffer_sm(size_t num_items,
- size_t item_size,
- std::shared_ptr<buffer_properties> buf_properties)
- : buffer(num_items, item_size, buf_properties)
-{
- _buffer.resize(_buf_size); // singly mapped buffer
- _raw_buffer = _buffer.data();
- _write_index = 0;
-
- set_type("buffer_sm");
-
-
- gr::configure_default_loggers(d_logger, d_debug_logger, _type);
-}
-
-buffer_uptr buffer_sm::make(size_t num_items,
- size_t item_size,
- std::shared_ptr<buffer_properties> buffer_properties)
-{
- return buffer_uptr(new buffer_sm(num_items, item_size, buffer_properties));
-}
-
-void* buffer_sm::read_ptr(size_t index) { return (void*)&_raw_buffer[index]; }
-void* buffer_sm::write_ptr() { return (void*)&_raw_buffer[_write_index]; }
-
-void buffer_sm::post_write(int num_items)
-{
- std::scoped_lock guard(_buf_mutex);
-
- size_t bytes_written = num_items * _item_size;
- // num_items were written to the buffer
-
- // advance the write pointer
- _write_index += bytes_written;
- if (_write_index == _buf_size) {
- _write_index = 0;
- }
- if (_write_index > _buf_size) {
- throw std::runtime_error("buffer_sm: Wrote too far into buffer");
- }
- _total_written += num_items;
-}
-
-bool buffer_sm::output_blkd_cb_ready(int output_multiple)
-{
- uint32_t space_avail = 0;
- {
- std::unique_lock<std::mutex>(*this->mutex());
- space_avail = space_available();
- }
- return ((space_avail > 0) &&
- ((space_avail / output_multiple) * output_multiple == 0));
-}
-
-bool buffer_sm::output_blocked_callback_logic(bool force, memmove_func_t memmove_func)
-{
- auto space_avail = space_available();
-
- // if (((space_avail > 0) && ((space_avail / output_multiple) * output_multiple ==
- // 0)) ||
- if ((space_avail > 0) || force) {
- // Find reader with the smallest read index
- uint32_t min_read_idx = _readers[0]->read_index();
- uint64_t min_read_idx_nitems = _readers[0]->total_read();
- for (size_t idx = 1; idx < _readers.size(); ++idx) {
- // Record index of reader with minimum read-index
- if (_readers[idx]->read_index() < min_read_idx) {
- min_read_idx = _readers[idx]->read_index();
- min_read_idx_nitems = _readers[idx]->total_read();
- }
- }
-
- d_debug_logger->debug("output_blocked_callback, space_avail {}, min_read_idx {}, "
- "_write_index {}",
- space_avail,
- min_read_idx,
- _write_index);
-
- // Make sure we have enough room to start writing back at the beginning
- if ((min_read_idx == 0) || (min_read_idx > _write_index) ||
- (min_read_idx == _write_index && min_read_idx_nitems != total_written())) {
- return false;
- }
-
- // Determine how much "to be read" data needs to be moved
- auto to_move_bytes = _write_index - min_read_idx;
-
- if (to_move_bytes > min_read_idx) {
- return false;
- }
-
-
- d_debug_logger->debug("output_blocked_callback, moving {} bytes", to_move_bytes);
-
- // Shift "to be read" data back to the beginning of the buffer
- std::memmove(_raw_buffer, _raw_buffer + (min_read_idx), to_move_bytes);
-
- // Adjust write index and each reader index
- _write_index -= min_read_idx;
-
- for (size_t idx = 0; idx < _readers.size(); ++idx) {
- d_debug_logger->debug("output_blocked_callback,setting _read_index to {}",
- _readers[idx]->read_index() - min_read_idx);
- _readers[idx]->set_read_index(_readers[idx]->read_index() - min_read_idx);
- }
-
- return true;
- }
-
- return false;
-}
-
-bool buffer_sm::output_blocked_callback(bool force)
-{
- std::scoped_lock guard(_buf_mutex);
-
- return output_blocked_callback_logic(force, std::memmove);
-}
-
-size_t buffer_sm::space_available()
-{
- // Find the max number of items available across readers
-
- uint64_t min_items_read = std::numeric_limits<uint64_t>::max();
- size_t min_read_idx = 0;
- for (size_t idx = 0; idx < _readers.size(); idx++) {
- std::scoped_lock lck{ *(_readers[idx]->mutex()) };
- auto total_read = _readers[idx]->total_read();
- if (total_read < min_items_read) {
- min_items_read = total_read;
- min_read_idx = _readers[idx]->read_index();
- }
- }
-
- size_t space = (_buf_size - _write_index) / _item_size;
-
- if (min_read_idx == _write_index) {
- // If the (min) read index and write index are equal then the buffer
- // is either completely empty or completely full depending on if
- // the number of items read matches the number written
- if (min_items_read != total_written()) {
- space = 0;
- }
- }
- else if (min_read_idx > _write_index) {
- space = (min_read_idx - _write_index) / _item_size;
- }
-
- if (space == 0)
- return space;
- // Only half fill the buffer
- // Leave extra space in case the reader gets stuck and needs realignment
-
- space = std::min(space, _num_items);
-
- return space;
-}
-
-bool buffer_sm::write_info(buffer_info_t& info)
-{
- std::scoped_lock guard(_buf_mutex);
-
- info.ptr = write_ptr();
- info.n_items = space_available();
- if (info.n_items < 0)
- info.n_items = 0;
- info.item_size = _item_size;
- info.total_items = _total_written;
-
- return true;
-}
-
-bool buffer_sm::adjust_buffer_data(memcpy_func_t memcpy_func, memmove_func_t memmove_func)
-{
-
- // Find reader with the smallest read index that is greater than the
- // write index
- // auto min_reader_index = std::numeric_limits<size_t>::max();
- auto min_read_idx = std::numeric_limits<size_t>::max();
- for (size_t idx = 0; idx < _readers.size(); ++idx) {
- if (_readers[idx]->read_index() > write_index()) {
- // Record index of reader with minimum read-index
- // FIXME: What if one of the readers has wrapped back around?
- // -- in that case this should use items_available() callback
- if (_readers[idx]->read_index() < min_read_idx) {
- min_read_idx = _readers[idx]->read_index();
- // min_reader_index = idx;
- }
- }
- }
-
- // Note items_avail might be zero, that's okay.
- auto max_bytes_avail = _buf_size - min_read_idx;
- auto max_items_avail = max_bytes_avail / _item_size;
- auto gap = min_read_idx - _write_index;
- if (_write_index > min_read_idx || max_bytes_avail > gap) {
- return false;
- }
-
- // GR_LOG_DEBUG(d_debug_logger,
- // "adust_buffer_data: max_bytes_avail {}, gap {}",
- // max_bytes_avail,
- // gap);
-
-
- // Shift existing data down to make room for blocked data at end of buffer
- auto move_data_size = _write_index;
- auto dest = _raw_buffer + max_bytes_avail;
- memmove_func(dest, _raw_buffer, move_data_size);
-
- // Next copy the data from the end of the buffer back to the beginning
- auto avail_data_size = max_bytes_avail;
- auto src = _raw_buffer + min_read_idx;
- memcpy_func(_raw_buffer, src, avail_data_size);
-
- // Finally adjust all reader pointers
- for (size_t idx = 0; idx < _readers.size(); ++idx) {
- // GR_LOG_DEBUG(d_debug_logger,
- // "adjust_buffer_data,setting _read_index to {}",
- // _readers[idx]->read_index() - min_read_idx);
- _readers[idx]->set_read_index(max_items_avail - _readers[idx]->items_available());
- }
-
- // Now adjust write pointer
- _write_index += max_items_avail;
-
- return true;
-}
-
-
-buffer_sm_reader::buffer_sm_reader(buffer_sm* bufp,
- size_t itemsize,
- std::shared_ptr<buffer_properties> buf_props,
- size_t read_index)
- : buffer_reader(bufp, buf_props, itemsize, read_index), _buffer_sm(bufp)
-{
- gr::configure_default_loggers(d_logger, d_debug_logger, "buffer_sm_reader");
-}
-
-void buffer_sm_reader::post_read(int num_items)
-{
- std::scoped_lock guard(_rdr_mutex);
-
- // GR_LOG_DEBUG(
- // d_debug_logger, "post_read: _read_index {}, num_items {}", _read_index, num_items);
-
- // advance the read pointer
- _read_index += num_items * _itemsize; //_buffer->item_size();
- _total_read += num_items;
- if (_read_index == _buffer->buf_size()) {
- _read_index = 0;
- }
- if (_read_index > _buffer->buf_size()) {
- // GR_LOG_INFO(d_logger,
- // "too far: num_items {}, prev_index {}, post_index {}",
- // num_items,
- // _read_index - num_items * _buffer->item_size(),
- // _read_index);
-
- // // throw std::runtime_error("buffer_sm_reader: Wrote too far into buffer");
- }
-
- // GR_LOG_DEBUG(d_debug_logger, "post_read: _read_index {}", _read_index);
-}
-
-bool buffer_sm_reader::input_blocked_callback(size_t items_required)
-{
- // Only singly mapped buffers need to do anything with this callback
- std::scoped_lock guard(*(_buffer->mutex()));
-
- auto items_avail = items_available();
-
- // GR_LOG_DEBUG(d_debug_logger,
- // "input_blocked_callback: items_avail {}, _read_index {}, "
- // "_write_index {}, items_required {}",
- // items_avail,
- // _read_index,
- // _buffer->write_index(),
- // items_required);
-
- // GR_LOG_DEBUG(d_debug_logger,
- // "input_blocked_callback: total_written {}, total_read {}",
- // _buffer->total_written(),
- // total_read());
-
-
- // Maybe adjust read pointers from min read index?
- // This would mean that *all* readers must be > (passed) the write index
- if (items_avail < items_required && _buffer->write_index() < read_index()) {
- // GR_LOG_DEBUG(d_debug_logger, "Calling adjust_buffer_data ");
- return _buffer_sm->adjust_buffer_data(std::memcpy, std::memmove);
- }
-
- return false;
-}
-
-size_t buffer_sm_reader::bytes_available()
-{
- // Can only read up to to the write_index, or the end of the buffer
- // there is no wraparound
-
- size_t ret = 0;
-
- size_t w = _buffer->write_index();
- size_t r = _read_index;
-
- if (w < r) {
- ret = (_buffer->buf_size() - r);
- }
- else if (w == r && total_read() < _buffer->total_written()) {
- ret = (_buffer->buf_size() - r);
- }
- else {
- ret = (w - r);
- }
-
- // return ret;
-
- // GR_LOG_DEBUG(d_debug_logger,
- // "items_available: write_index {}, read_index {}, ret {}, total_read "
- // "{}, total_written {}",
- // w,
- // r,
- // ret,
- // total_read(),
- // _buffer->total_written());
-
- // if (_itemsize*(_buffer->total_written() - total_read()) < ret) {
- // d_debug_logger->debug(
- // "check_math {} {} {} {}",
- // _buffer->total_written() - total_read(),
- // ret,
- // total_read(),
- // _buffer->total_written());
- // }
-
- return ret; // in bytes
-}
-
-bool buffer_sm_reader::input_blkd_cb_ready(int items_required)
-{
- std::unique_lock<std::mutex>(*_buffer->mutex());
-
- return (
- ((_buffer->buf_size() * _itemsize - _read_index) < (uint32_t)items_required) &&
- (_buffer->write_index() < _read_index));
-}
-
-buffer_sm_properties::buffer_sm_properties() : buffer_properties()
-{
- _bff = buffer_sm::make;
-}
-
-std::shared_ptr<buffer_properties> buffer_sm_properties::make()
-{
- return std::static_pointer_cast<buffer_properties>(
- std::make_shared<buffer_sm_properties>());
-}
-
-} // namespace gr
diff --git a/gr/lib/meson.build b/gr/lib/meson.build
index a809cea2e..dbee9c491 100644
--- a/gr/lib/meson.build
+++ b/gr/lib/meson.build
@@ -32,7 +32,6 @@ runtime_sources = [
'block.cc',
'port.cc',
'buffer.cc',
- 'buffer_sm.cc',
'buffer_management.cc',
'buffer_cpu_simple.cc',
'realtime.cc',
@@ -56,13 +55,11 @@ runtime_sources = [
'buffer_cpu_vmcirc_sysv_shm.cc',
# mmap requires librt - FIXME - handle this a conditional dependency
'buffer_cpu_vmcirc_mmap_shm_open.cc',
- 'buffer_net_zmq.cc',
'sptr_magic.cc',
'hier_block.cc',
'prefs.cc',
'tag.cc',
'registry.cc',
- 'buffer_cpu_host.cc'
]
if IMPLEMENT_CUDA