diff options
Diffstat (limited to 'gr/lib')
-rw-r--r-- | gr/lib/buffer_cpu_host.cc | 96 | ||||
-rw-r--r-- | gr/lib/buffer_cuda.cc | 162 | ||||
-rw-r--r-- | gr/lib/buffer_cuda_pinned.cc | 80 | ||||
-rw-r--r-- | gr/lib/buffer_cuda_sm.cc | 164 | ||||
-rw-r--r-- | gr/lib/buffer_net_zmq.cc | 175 | ||||
-rw-r--r-- | gr/lib/buffer_sm.cc | 374 | ||||
-rw-r--r-- | gr/lib/meson.build | 3 |
7 files changed, 0 insertions, 1054 deletions
diff --git a/gr/lib/buffer_cpu_host.cc b/gr/lib/buffer_cpu_host.cc deleted file mode 100644 index 0623d166f..000000000 --- a/gr/lib/buffer_cpu_host.cc +++ /dev/null @@ -1,96 +0,0 @@ -#include <string.h> -#include <algorithm> -#include <cstdint> -#include <memory> -#include <mutex> -#include <vector> - -#include <gnuradio/buffer_cpu_host.h> - -namespace gr { -buffer_cpu_host::buffer_cpu_host(size_t num_items, - size_t item_size, - buffer_cpu_host_type type, - std::shared_ptr<buffer_properties> buf_properties) - : gr::buffer_sm(num_items, item_size, buf_properties), _transfer_type(type) -{ - _host_buffer.resize(_buf_size); - _device_buffer.resize(_buf_size); - - set_type("buffer_cpu_host_" + std::to_string((int)_transfer_type)); -} - -buffer_uptr buffer_cpu_host::make(size_t num_items, - size_t item_size, - std::shared_ptr<buffer_properties> buffer_properties) -{ - auto cbp = std::static_pointer_cast<buffer_cpu_host_properties>(buffer_properties); - if (cbp != nullptr) { - return buffer_uptr(new buffer_cpu_host( - num_items, item_size, cbp->buffer_type(), buffer_properties)); - } - else { - throw std::runtime_error( - "Failed to cast buffer properties to buffer_cpu_host_properties"); - } -} - -void* buffer_cpu_host::read_ptr(size_t index) -{ - if (_transfer_type == buffer_cpu_host_type::D2H || - _transfer_type == buffer_cpu_host_type::H2H) { - return (void*)&_host_buffer[index]; - } - else { - return (void*)&_device_buffer[index]; - } -} -void* buffer_cpu_host::write_ptr() -{ - if (_transfer_type == buffer_cpu_host_type::H2D || - _transfer_type == buffer_cpu_host_type::H2H) { - return (void*)&_host_buffer[_write_index]; - } - else { - return (void*)&_device_buffer[_write_index]; - } -} - -void buffer_cpu_host::post_write(int num_items) -{ - std::lock_guard<std::mutex> guard(_buf_mutex); - - size_t bytes_written = num_items * _item_size; - size_t wi1 = _write_index; - - // num_items were written to the buffer - - if (_transfer_type == buffer_cpu_host_type::H2D) { - memcpy(&_device_buffer[wi1], &_host_buffer[wi1], bytes_written); - } - else if (_transfer_type == buffer_cpu_host_type::D2H) { - memcpy(&_host_buffer[wi1], &_device_buffer[wi1], bytes_written); - } - - // advance the write pointer - _write_index += bytes_written; - if (_write_index == _buf_size) { - _write_index = 0; - } - if (_write_index > _buf_size) { - throw std::runtime_error("buffer_sm: Wrote too far into buffer"); - } - _total_written += num_items; -} - -buffer_reader_uptr -buffer_cpu_host::add_reader(std::shared_ptr<buffer_properties> buf_props, size_t itemsize) -{ - auto r = - std::make_unique<buffer_cpu_host_reader>(this, buf_props, itemsize, _write_index); - _readers.push_back(r.get()); - return r; -} - - -} // namespace gr diff --git a/gr/lib/buffer_cuda.cc b/gr/lib/buffer_cuda.cc deleted file mode 100644 index 73a1e61f4..000000000 --- a/gr/lib/buffer_cuda.cc +++ /dev/null @@ -1,162 +0,0 @@ -#include <cuda.h> -#include <cuda_runtime.h> -#include <string.h> -#include <algorithm> -#include <cstdint> -#include <memory> -#include <mutex> -#include <vector> - -#include <gnuradio/buffer_cuda.h> - - -namespace gr { -buffer_cuda::buffer_cuda(size_t num_items, - size_t item_size, - buffer_cuda_type type, - std::shared_ptr<buffer_properties> buf_properties) - : gr::buffer(num_items, item_size, buf_properties), _type(type) -{ - // _host_buffer.resize(_buf_size * 2); // double circular buffer - cudaMallocHost( - &_host_buffer, - _buf_size * - 2); // double circular buffer - should do something more intelligent here - cudaMalloc( - &_device_buffer, - _buf_size * - 2); // double circular buffer - should do something more intelligent here - set_type("buffer_cuda_" + std::to_string((int)_type)); - - cudaStreamCreate(&stream); -} -buffer_cuda::~buffer_cuda() -{ - cudaFree(_device_buffer); - cudaFree(_host_buffer); -} - -buffer_uptr buffer_cuda::make(size_t num_items, - size_t item_size, - std::shared_ptr<buffer_properties> buffer_properties) -{ - auto cbp = std::static_pointer_cast<buffer_cuda_properties>(buffer_properties); - if (cbp != nullptr) { - return buffer_uptr( - new buffer_cuda(num_items, item_size, cbp->buffer_type(), buffer_properties)); - } - else { - throw std::runtime_error( - "Failed to cast buffer properties to buffer_cuda_properties"); - } -} - -void* buffer_cuda::read_ptr(size_t index) -{ - if (_type == buffer_cuda_type::D2H) { - return (void*)&_host_buffer[index]; - } - else { - return (void*)&_device_buffer[index]; - } -} -void* buffer_cuda::write_ptr() -{ - if (_type == buffer_cuda_type::H2D) { - return (void*)&_host_buffer[_write_index]; - } - else { - return (void*)&_device_buffer[_write_index]; - } -} - -void buffer_cuda_reader::post_read(int num_items) -{ - std::lock_guard<std::mutex> guard(_rdr_mutex); - // advance the read pointer - _read_index += num_items * _itemsize; - if (_read_index >= _buffer->buf_size()) { - _read_index -= _buffer->buf_size(); - } - _total_read += num_items; -} -void buffer_cuda::post_write(int num_items) -{ - std::lock_guard<std::mutex> guard(_buf_mutex); - - size_t bytes_written = num_items * _item_size; - size_t wi1 = _write_index; - size_t wi2 = _write_index + _buf_size; - // num_items were written to the buffer - // copy the data to the second half of the buffer - - size_t num_bytes_1 = std::min(_buf_size - wi1, bytes_written); - size_t num_bytes_2 = bytes_written - num_bytes_1; - - if (_type == buffer_cuda_type::H2D) { - cudaMemcpyAsync(&_device_buffer[wi1], - &_host_buffer[wi1], - bytes_written, - cudaMemcpyHostToDevice, - stream); - - // memcpy(&_host_buffer[wi2], &_host_buffer[wi1], num_bytes_1); - cudaMemcpyAsync(&_device_buffer[wi2], - &_device_buffer[wi1], - num_bytes_1, - cudaMemcpyDeviceToDevice, - stream); - if (num_bytes_2) { - // memcpy(&_host_buffer[0], &_host_buffer[_buf_size], num_bytes_2); - cudaMemcpyAsync(&_device_buffer[0], - &_device_buffer[_buf_size], - num_bytes_2, - cudaMemcpyDeviceToDevice, - stream); - } - } - else if (_type == buffer_cuda_type::D2H) { - cudaMemcpyAsync(&_host_buffer[wi1], - &_device_buffer[wi1], - bytes_written, - cudaMemcpyDeviceToHost, - stream); - - memcpy(&_host_buffer[wi2], &_host_buffer[wi1], num_bytes_1); - - if (num_bytes_2) { - memcpy(&_host_buffer[0], &_host_buffer[_buf_size], num_bytes_2); - } - } - else // D2D - { - cudaMemcpyAsync(&_device_buffer[wi2], - &_device_buffer[wi1], - num_bytes_1, - cudaMemcpyDeviceToDevice); - if (num_bytes_2) - cudaMemcpyAsync(&_device_buffer[0], - &_device_buffer[_buf_size], - num_bytes_2, - cudaMemcpyDeviceToDevice, - stream); - } - // advance the write pointer - _write_index += bytes_written; - if (_write_index >= _buf_size) { - _write_index -= _buf_size; - } - _total_written += num_items; - cudaStreamSynchronize(stream); -} - -buffer_reader_uptr buffer_cuda::add_reader(std::shared_ptr<buffer_properties> buf_props, - size_t itemsize) -{ - auto r = - std::make_unique<buffer_cuda_reader>(this, buf_props, itemsize, _write_index); - _readers.push_back(r.get()); - return r; -} - -} // namespace gr diff --git a/gr/lib/buffer_cuda_pinned.cc b/gr/lib/buffer_cuda_pinned.cc deleted file mode 100644 index 3a1de05de..000000000 --- a/gr/lib/buffer_cuda_pinned.cc +++ /dev/null @@ -1,80 +0,0 @@ -#include <string.h> -#include <algorithm> -#include <cstdint> -#include <memory> -#include <mutex> -#include <vector> - -#include <cuda.h> -#include <cuda_runtime.h> - -#include <gnuradio/buffer_cuda_pinned.h> - -namespace gr { -buffer_cuda_pinned::buffer_cuda_pinned(size_t num_items, - size_t item_size, - std::shared_ptr<buffer_properties> buf_properties) - : buffer(num_items, item_size, buf_properties) -{ - if (!cudaHostAlloc((void**)&_pinned_buffer, _buf_size * 2, 0) == cudaSuccess) { - throw std::runtime_error("Failed to allocate CUDA pinned memory"); - } -} -buffer_cuda_pinned::~buffer_cuda_pinned() { cudaFree(_pinned_buffer); } - -buffer_uptr buffer_cuda_pinned::make(size_t num_items, - size_t item_size, - std::shared_ptr<buffer_properties> buffer_properties) -{ - return buffer_uptr(new buffer_cuda_pinned(num_items, item_size, buffer_properties)); -} - -void* buffer_cuda_pinned::read_ptr(size_t index) { return (void*)&_pinned_buffer[index]; } -void* buffer_cuda_pinned::write_ptr() { return (void*)&_pinned_buffer[_write_index]; } - -void buffer_cuda_pinned_reader::post_read(int num_items) -{ - std::lock_guard<std::mutex> guard(_rdr_mutex); - // advance the read pointer - _read_index += num_items * _itemsize; - if (_read_index >= _buffer->buf_size()) { - _read_index -= _buffer->buf_size(); - } - _total_read += num_items; -} - -void buffer_cuda_pinned::post_write(int num_items) -{ - std::lock_guard<std::mutex> guard(_buf_mutex); - - size_t bytes_written = num_items * _item_size; - size_t wi1 = _write_index; - size_t wi2 = _write_index + _buf_size; - // num_items were written to the buffer - // copy the data to the second half of the buffer - - size_t num_bytes_1 = std::min(_buf_size - wi1, bytes_written); - size_t num_bytes_2 = bytes_written - num_bytes_1; - - memcpy(&_pinned_buffer[wi2], &_pinned_buffer[wi1], num_bytes_1); - if (num_bytes_2) - memcpy(&_pinned_buffer[0], &_pinned_buffer[_buf_size], num_bytes_2); - - // advance the write pointer - _write_index += bytes_written; - if (_write_index >= _buf_size) { - _write_index -= _buf_size; - } -} - -buffer_reader_uptr -buffer_cuda_pinned::add_reader(std::shared_ptr<buffer_properties> buf_props, - size_t itemsize) -{ - auto r = std::make_unique<buffer_cuda_pinned_reader>( - this, buf_props, itemsize, _write_index); - _readers.push_back(r.get()); - return r; -} - -} // namespace gr diff --git a/gr/lib/buffer_cuda_sm.cc b/gr/lib/buffer_cuda_sm.cc deleted file mode 100644 index 7e8bc88d9..000000000 --- a/gr/lib/buffer_cuda_sm.cc +++ /dev/null @@ -1,164 +0,0 @@ -#include <cuda.h> -#include <cuda_runtime.h> -#include <string.h> -#include <algorithm> -#include <cstdint> -#include <memory> -#include <mutex> -#include <vector> - -#include <gnuradio/buffer_cuda_sm.h> - -namespace gr { -buffer_cuda_sm::buffer_cuda_sm(size_t num_items, - size_t item_size, - buffer_cuda_sm_type type, - std::shared_ptr<buffer_properties> buf_properties) - : gr::buffer_sm(num_items, item_size, buf_properties), _type(type) -{ - // _host_buffer.resize(_buf_size * 2); // double circular buffer - cudaMallocHost(&_host_buffer, _buf_size); - cudaMalloc(&_device_buffer, _buf_size); - set_type("buffer_cuda_sm_" + std::to_string((int)_type)); - - cudaStreamCreate(&stream); -} -buffer_cuda_sm::~buffer_cuda_sm() -{ - cudaFree(_device_buffer); - cudaFree(_host_buffer); -} - -buffer_uptr buffer_cuda_sm::make(size_t num_items, - size_t item_size, - std::shared_ptr<buffer_properties> buffer_properties) -{ - auto cbp = std::static_pointer_cast<buffer_cuda_sm_properties>(buffer_properties); - if (cbp != nullptr) { - return buffer_uptr(new buffer_cuda_sm( - num_items, item_size, cbp->buffer_type(), buffer_properties)); - } - else { - throw std::runtime_error( - "Failed to cast buffer properties to buffer_cuda_sm_properties"); - } -} - -void* buffer_cuda_sm::read_ptr(size_t index) -{ - if (_type == buffer_cuda_sm_type::D2H) { - return (void*)&_host_buffer[index]; - } - else { - return (void*)&_device_buffer[index]; - } -} -void* buffer_cuda_sm::write_ptr() -{ - if (_type == buffer_cuda_sm_type::H2D) { - return (void*)&_host_buffer[_write_index]; - } - else { - return (void*)&_device_buffer[_write_index]; - } -} - -void buffer_cuda_sm::post_write(int num_items) -{ - std::lock_guard<std::mutex> guard(_buf_mutex); - - size_t bytes_written = num_items * _item_size; - size_t wi1 = _write_index; - - // num_items were written to the buffer - - if (_type == buffer_cuda_sm_type::H2D) { - cudaMemcpyAsync(&_device_buffer[wi1], - &_host_buffer[wi1], - bytes_written, - cudaMemcpyHostToDevice, - stream); - } - else if (_type == buffer_cuda_sm_type::D2H) { - cudaMemcpyAsync(&_host_buffer[wi1], - &_device_buffer[wi1], - bytes_written, - cudaMemcpyDeviceToHost, - stream); - } - - // advance the write pointer - _write_index += bytes_written; - if (_write_index == _buf_size) { - _write_index = 0; - } - if (_write_index > _buf_size) { - throw std::runtime_error("buffer_sm: Wrote too far into buffer"); - } - _total_written += num_items; - cudaStreamSynchronize(stream); -} - -buffer_reader_uptr -buffer_cuda_sm::add_reader(std::shared_ptr<buffer_properties> buf_props, size_t itemsize) -{ - auto r = - std::make_unique<buffer_cuda_sm_reader>(this, buf_props, itemsize, _write_index); - _readers.push_back(r.get()); - return r; -} - -void* buffer_cuda_sm::cuda_memcpy(void* dest, const void* src, std::size_t count) -{ - cudaError_t rc = cudaSuccess; - rc = cudaMemcpy(dest, src, count, cudaMemcpyDeviceToDevice); - if (rc) { - std::ostringstream msg; - msg << "Error performing cudaMemcpy: " << cudaGetErrorName(rc) << " -- " - << cudaGetErrorString(rc); - throw std::runtime_error(msg.str()); - } - - return dest; -} - -void* buffer_cuda_sm::cuda_memmove(void* dest, const void* src, std::size_t count) -{ - // Would a kernel that checks for overlap and then copies front-to-back or - // back-to-front be faster than using cudaMemcpy with a temp buffer? - - // Allocate temp buffer - void* tempBuffer = nullptr; - cudaError_t rc = cudaSuccess; - rc = cudaMalloc((void**)&tempBuffer, count); - if (rc) { - std::ostringstream msg; - msg << "Error allocating device buffer: " << cudaGetErrorName(rc) << " -- " - << cudaGetErrorString(rc); - throw std::runtime_error(msg.str()); - } - - // First copy data from source to temp buffer - rc = cudaMemcpy(tempBuffer, src, count, cudaMemcpyDeviceToDevice); - if (rc) { - std::ostringstream msg; - msg << "Error performing cudaMemcpy: " << cudaGetErrorName(rc) << " -- " - << cudaGetErrorString(rc); - throw std::runtime_error(msg.str()); - } - - // Then copy data from temp buffer to destination to avoid overlap - rc = cudaMemcpy(dest, tempBuffer, count, cudaMemcpyDeviceToDevice); - if (rc) { - std::ostringstream msg; - msg << "Error performing cudaMemcpy: " << cudaGetErrorName(rc) << " -- " - << cudaGetErrorString(rc); - throw std::runtime_error(msg.str()); - } - - cudaFree(tempBuffer); - - return dest; -} - -} // namespace gr diff --git a/gr/lib/buffer_net_zmq.cc b/gr/lib/buffer_net_zmq.cc deleted file mode 100644 index 0236e028a..000000000 --- a/gr/lib/buffer_net_zmq.cc +++ /dev/null @@ -1,175 +0,0 @@ -#include <gnuradio/buffer_cpu_vmcirc.h> -#include <gnuradio/buffer_net_zmq.h> -#include <nlohmann/json.hpp> -#include <chrono> -#include <thread> -namespace gr { - - -std::shared_ptr<buffer_properties> -buffer_net_zmq_properties::make_from_params(const std::string& json_str) -{ - auto json_obj = nlohmann::json::parse(json_str); - return make(json_obj["ipaddr"], json_obj["port"]); -} - -buffer_uptr buffer_net_zmq::make(size_t num_items, - size_t item_size, - std::shared_ptr<buffer_properties> buffer_properties) -{ - - auto zbp = std::static_pointer_cast<buffer_net_zmq_properties>(buffer_properties); - if (zbp != nullptr) { - return buffer_uptr( - new buffer_net_zmq(num_items, item_size, buffer_properties, zbp->port())); - } - else { - throw std::runtime_error( - "Failed to cast buffer properties to buffer_net_zmq_properties"); - } -} - -buffer_net_zmq::buffer_net_zmq(size_t num_items, - size_t item_size, - std::shared_ptr<buffer_properties> buf_properties, - int port) - : buffer(num_items, item_size, buf_properties), - _context(1), - _socket(_context, zmq::socket_type::push) -{ - gr::configure_default_loggers(d_logger, d_debug_logger, "buffer_net_zmq"); - set_type("buffer_net_zmq"); - _buffer.resize(_buf_size); - _socket.set(zmq::sockopt::sndhwm, 1); - _socket.set(zmq::sockopt::rcvhwm, 1); - std::string endpoint = "tcp://*:" + std::to_string(port); - std::cout << "snd_endpoint: " << endpoint << std::endl; - _socket.bind(endpoint); -} - - -/****************************************************************************/ -/* READER METHODS */ -/****************************************************************************/ - - -buffer_reader_uptr -buffer_net_zmq_reader::make(size_t itemsize, std::shared_ptr<buffer_properties> buf_props) -{ - auto zbp = std::static_pointer_cast<buffer_net_zmq_properties>(buf_props); - if (zbp != nullptr) { - return std::make_unique<buffer_net_zmq_reader>( - buf_props, itemsize, zbp->ipaddr(), zbp->port()); - } - else { - throw std::runtime_error( - "Failed to cast buffer properties to buffer_net_zmq_properties"); - } -} - -buffer_net_zmq_reader::buffer_net_zmq_reader(std::shared_ptr<buffer_properties> buf_props, - size_t itemsize, - const std::string& ipaddr, - int port) - : buffer_reader(nullptr, buf_props, itemsize, 0), - _context(1), - _socket(_context, zmq::socket_type::pull) -{ - gr::configure_default_loggers(d_logger, d_debug_logger, "buffer_net_zmq_reader"); - auto bufprops = std::make_shared<buffer_cpu_vmcirc_properties>(); - _circbuf = gr::buffer_cpu_vmcirc::make( - 8192, - itemsize, - bufprops); // FIXME - make nitems a buffer reader factory parameter - _circbuf_rdr = _circbuf->add_reader(bufprops, itemsize); - - // auto b = (float *)_circbuf->write_ptr(); - - // for (int i=0; i<8192; i++) - // { - // b[i] = i; - // } - // _circbuf->post_write(8192); - - // buffer_info_t info; - // _circbuf_rdr->read_info(info); - // auto br = (float *)_circbuf_rdr->read_ptr(); - - - // _circbuf_rdr->post_read(4096); - // br = (float *) _circbuf_rdr->read_ptr(); - - - std::string endpoint = "tcp://" + ipaddr + ":" + std::to_string(port); - d_debug_logger->debug("rcv_endpoint: {}", endpoint); - _socket.set(zmq::sockopt::sndhwm, 1); - _socket.set(zmq::sockopt::rcvhwm, 1); - // _socket.setsockopt(ZMQ_SUBSCRIBE, "", 0); - _socket.connect(endpoint); - d_debug_logger->debug(" ... connected"); - - std::thread t([this]() { - while (!this->_recv_done) { - // zmq::message_t msg{}; - // See how much room we have in the circular buffer - buffer_info_t wi; - _circbuf->write_info(wi); - - auto n_bytes_left_in_msg = _msg.size() - _msg_idx; - auto n_bytes_in_circbuf = wi.n_items * wi.item_size; - auto bytes_to_write = std::min(n_bytes_in_circbuf, n_bytes_left_in_msg); - auto items_to_write = bytes_to_write / wi.item_size; - bytes_to_write = items_to_write * wi.item_size; - - if (n_bytes_in_circbuf <= 0) { - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - continue; - } - - if (bytes_to_write > 0) { - memcpy(wi.ptr, (uint8_t*)_msg.data() + _msg_idx, bytes_to_write); - d_debug_logger->debug("copied {} items", bytes_to_write / wi.item_size); - _msg_idx += bytes_to_write; - n_bytes_left_in_msg = _msg.size() - _msg_idx; - _circbuf->post_write(items_to_write); - notify_scheduler(); - } - - if (n_bytes_left_in_msg == 0) { - _msg.rebuild(); - d_debug_logger->debug("going into recv"); - auto r = _socket.recv(_msg, zmq::recv_flags::none); - if (r) { - d_debug_logger->debug("received msg with size {} items", - _msg.size() / wi.item_size); - _msg_idx = 0; - } - } - // GR_LOG_DEBUG(d_debug_logger, "recv: {}", wi.n_items); - // auto ret = this->_socket.recv( - // zmq::mutable_buffer(_circbuf->write_ptr(), wi.n_items * wi.item_size), - // zmq::recv_flags::none); - - // _circbuf->post_write(wi.n_items); - // notify_scheduler(); - // auto recbuf = *ret; - // assert(recbuf.size == wi.n_items * wi.item_size); - - // GR_LOG_DEBUG(d_debug_logger, "nbytesrcv: {}", recbuf.size); - // std::cout << " ---> msg received " << msg.size() << " bytes" << - // std::endl; - } - }); - - t.detach(); -} - -std::string buffer_net_zmq_properties::to_json() -{ - nlohmann::json j = { { "id", "buffer_net_zmq_properties" }, - { "parameters", { { "ipaddr", _ipaddr }, { "port", _port } } } }; - - return j.dump(); -} - -} // namespace gr diff --git a/gr/lib/buffer_sm.cc b/gr/lib/buffer_sm.cc deleted file mode 100644 index 6766b1062..000000000 --- a/gr/lib/buffer_sm.cc +++ /dev/null @@ -1,374 +0,0 @@ -#include <gnuradio/buffer_sm.h> - -#include <gnuradio/logger.h> - -namespace gr { - -buffer_reader_uptr buffer_sm::add_reader(std::shared_ptr<buffer_properties> buf_props, - size_t itemsize) -{ - auto r = std::make_unique<buffer_sm_reader>(this, itemsize, buf_props, _write_index); - _readers.push_back(r.get()); - return r; -} - -buffer_sm::buffer_sm(size_t num_items, - size_t item_size, - std::shared_ptr<buffer_properties> buf_properties) - : buffer(num_items, item_size, buf_properties) -{ - _buffer.resize(_buf_size); // singly mapped buffer - _raw_buffer = _buffer.data(); - _write_index = 0; - - set_type("buffer_sm"); - - - gr::configure_default_loggers(d_logger, d_debug_logger, _type); -} - -buffer_uptr buffer_sm::make(size_t num_items, - size_t item_size, - std::shared_ptr<buffer_properties> buffer_properties) -{ - return buffer_uptr(new buffer_sm(num_items, item_size, buffer_properties)); -} - -void* buffer_sm::read_ptr(size_t index) { return (void*)&_raw_buffer[index]; } -void* buffer_sm::write_ptr() { return (void*)&_raw_buffer[_write_index]; } - -void buffer_sm::post_write(int num_items) -{ - std::scoped_lock guard(_buf_mutex); - - size_t bytes_written = num_items * _item_size; - // num_items were written to the buffer - - // advance the write pointer - _write_index += bytes_written; - if (_write_index == _buf_size) { - _write_index = 0; - } - if (_write_index > _buf_size) { - throw std::runtime_error("buffer_sm: Wrote too far into buffer"); - } - _total_written += num_items; -} - -bool buffer_sm::output_blkd_cb_ready(int output_multiple) -{ - uint32_t space_avail = 0; - { - std::unique_lock<std::mutex>(*this->mutex()); - space_avail = space_available(); - } - return ((space_avail > 0) && - ((space_avail / output_multiple) * output_multiple == 0)); -} - -bool buffer_sm::output_blocked_callback_logic(bool force, memmove_func_t memmove_func) -{ - auto space_avail = space_available(); - - // if (((space_avail > 0) && ((space_avail / output_multiple) * output_multiple == - // 0)) || - if ((space_avail > 0) || force) { - // Find reader with the smallest read index - uint32_t min_read_idx = _readers[0]->read_index(); - uint64_t min_read_idx_nitems = _readers[0]->total_read(); - for (size_t idx = 1; idx < _readers.size(); ++idx) { - // Record index of reader with minimum read-index - if (_readers[idx]->read_index() < min_read_idx) { - min_read_idx = _readers[idx]->read_index(); - min_read_idx_nitems = _readers[idx]->total_read(); - } - } - - d_debug_logger->debug("output_blocked_callback, space_avail {}, min_read_idx {}, " - "_write_index {}", - space_avail, - min_read_idx, - _write_index); - - // Make sure we have enough room to start writing back at the beginning - if ((min_read_idx == 0) || (min_read_idx > _write_index) || - (min_read_idx == _write_index && min_read_idx_nitems != total_written())) { - return false; - } - - // Determine how much "to be read" data needs to be moved - auto to_move_bytes = _write_index - min_read_idx; - - if (to_move_bytes > min_read_idx) { - return false; - } - - - d_debug_logger->debug("output_blocked_callback, moving {} bytes", to_move_bytes); - - // Shift "to be read" data back to the beginning of the buffer - std::memmove(_raw_buffer, _raw_buffer + (min_read_idx), to_move_bytes); - - // Adjust write index and each reader index - _write_index -= min_read_idx; - - for (size_t idx = 0; idx < _readers.size(); ++idx) { - d_debug_logger->debug("output_blocked_callback,setting _read_index to {}", - _readers[idx]->read_index() - min_read_idx); - _readers[idx]->set_read_index(_readers[idx]->read_index() - min_read_idx); - } - - return true; - } - - return false; -} - -bool buffer_sm::output_blocked_callback(bool force) -{ - std::scoped_lock guard(_buf_mutex); - - return output_blocked_callback_logic(force, std::memmove); -} - -size_t buffer_sm::space_available() -{ - // Find the max number of items available across readers - - uint64_t min_items_read = std::numeric_limits<uint64_t>::max(); - size_t min_read_idx = 0; - for (size_t idx = 0; idx < _readers.size(); idx++) { - std::scoped_lock lck{ *(_readers[idx]->mutex()) }; - auto total_read = _readers[idx]->total_read(); - if (total_read < min_items_read) { - min_items_read = total_read; - min_read_idx = _readers[idx]->read_index(); - } - } - - size_t space = (_buf_size - _write_index) / _item_size; - - if (min_read_idx == _write_index) { - // If the (min) read index and write index are equal then the buffer - // is either completely empty or completely full depending on if - // the number of items read matches the number written - if (min_items_read != total_written()) { - space = 0; - } - } - else if (min_read_idx > _write_index) { - space = (min_read_idx - _write_index) / _item_size; - } - - if (space == 0) - return space; - // Only half fill the buffer - // Leave extra space in case the reader gets stuck and needs realignment - - space = std::min(space, _num_items); - - return space; -} - -bool buffer_sm::write_info(buffer_info_t& info) -{ - std::scoped_lock guard(_buf_mutex); - - info.ptr = write_ptr(); - info.n_items = space_available(); - if (info.n_items < 0) - info.n_items = 0; - info.item_size = _item_size; - info.total_items = _total_written; - - return true; -} - -bool buffer_sm::adjust_buffer_data(memcpy_func_t memcpy_func, memmove_func_t memmove_func) -{ - - // Find reader with the smallest read index that is greater than the - // write index - // auto min_reader_index = std::numeric_limits<size_t>::max(); - auto min_read_idx = std::numeric_limits<size_t>::max(); - for (size_t idx = 0; idx < _readers.size(); ++idx) { - if (_readers[idx]->read_index() > write_index()) { - // Record index of reader with minimum read-index - // FIXME: What if one of the readers has wrapped back around? - // -- in that case this should use items_available() callback - if (_readers[idx]->read_index() < min_read_idx) { - min_read_idx = _readers[idx]->read_index(); - // min_reader_index = idx; - } - } - } - - // Note items_avail might be zero, that's okay. - auto max_bytes_avail = _buf_size - min_read_idx; - auto max_items_avail = max_bytes_avail / _item_size; - auto gap = min_read_idx - _write_index; - if (_write_index > min_read_idx || max_bytes_avail > gap) { - return false; - } - - // GR_LOG_DEBUG(d_debug_logger, - // "adust_buffer_data: max_bytes_avail {}, gap {}", - // max_bytes_avail, - // gap); - - - // Shift existing data down to make room for blocked data at end of buffer - auto move_data_size = _write_index; - auto dest = _raw_buffer + max_bytes_avail; - memmove_func(dest, _raw_buffer, move_data_size); - - // Next copy the data from the end of the buffer back to the beginning - auto avail_data_size = max_bytes_avail; - auto src = _raw_buffer + min_read_idx; - memcpy_func(_raw_buffer, src, avail_data_size); - - // Finally adjust all reader pointers - for (size_t idx = 0; idx < _readers.size(); ++idx) { - // GR_LOG_DEBUG(d_debug_logger, - // "adjust_buffer_data,setting _read_index to {}", - // _readers[idx]->read_index() - min_read_idx); - _readers[idx]->set_read_index(max_items_avail - _readers[idx]->items_available()); - } - - // Now adjust write pointer - _write_index += max_items_avail; - - return true; -} - - -buffer_sm_reader::buffer_sm_reader(buffer_sm* bufp, - size_t itemsize, - std::shared_ptr<buffer_properties> buf_props, - size_t read_index) - : buffer_reader(bufp, buf_props, itemsize, read_index), _buffer_sm(bufp) -{ - gr::configure_default_loggers(d_logger, d_debug_logger, "buffer_sm_reader"); -} - -void buffer_sm_reader::post_read(int num_items) -{ - std::scoped_lock guard(_rdr_mutex); - - // GR_LOG_DEBUG( - // d_debug_logger, "post_read: _read_index {}, num_items {}", _read_index, num_items); - - // advance the read pointer - _read_index += num_items * _itemsize; //_buffer->item_size(); - _total_read += num_items; - if (_read_index == _buffer->buf_size()) { - _read_index = 0; - } - if (_read_index > _buffer->buf_size()) { - // GR_LOG_INFO(d_logger, - // "too far: num_items {}, prev_index {}, post_index {}", - // num_items, - // _read_index - num_items * _buffer->item_size(), - // _read_index); - - // // throw std::runtime_error("buffer_sm_reader: Wrote too far into buffer"); - } - - // GR_LOG_DEBUG(d_debug_logger, "post_read: _read_index {}", _read_index); -} - -bool buffer_sm_reader::input_blocked_callback(size_t items_required) -{ - // Only singly mapped buffers need to do anything with this callback - std::scoped_lock guard(*(_buffer->mutex())); - - auto items_avail = items_available(); - - // GR_LOG_DEBUG(d_debug_logger, - // "input_blocked_callback: items_avail {}, _read_index {}, " - // "_write_index {}, items_required {}", - // items_avail, - // _read_index, - // _buffer->write_index(), - // items_required); - - // GR_LOG_DEBUG(d_debug_logger, - // "input_blocked_callback: total_written {}, total_read {}", - // _buffer->total_written(), - // total_read()); - - - // Maybe adjust read pointers from min read index? - // This would mean that *all* readers must be > (passed) the write index - if (items_avail < items_required && _buffer->write_index() < read_index()) { - // GR_LOG_DEBUG(d_debug_logger, "Calling adjust_buffer_data "); - return _buffer_sm->adjust_buffer_data(std::memcpy, std::memmove); - } - - return false; -} - -size_t buffer_sm_reader::bytes_available() -{ - // Can only read up to to the write_index, or the end of the buffer - // there is no wraparound - - size_t ret = 0; - - size_t w = _buffer->write_index(); - size_t r = _read_index; - - if (w < r) { - ret = (_buffer->buf_size() - r); - } - else if (w == r && total_read() < _buffer->total_written()) { - ret = (_buffer->buf_size() - r); - } - else { - ret = (w - r); - } - - // return ret; - - // GR_LOG_DEBUG(d_debug_logger, - // "items_available: write_index {}, read_index {}, ret {}, total_read " - // "{}, total_written {}", - // w, - // r, - // ret, - // total_read(), - // _buffer->total_written()); - - // if (_itemsize*(_buffer->total_written() - total_read()) < ret) { - // d_debug_logger->debug( - // "check_math {} {} {} {}", - // _buffer->total_written() - total_read(), - // ret, - // total_read(), - // _buffer->total_written()); - // } - - return ret; // in bytes -} - -bool buffer_sm_reader::input_blkd_cb_ready(int items_required) -{ - std::unique_lock<std::mutex>(*_buffer->mutex()); - - return ( - ((_buffer->buf_size() * _itemsize - _read_index) < (uint32_t)items_required) && - (_buffer->write_index() < _read_index)); -} - -buffer_sm_properties::buffer_sm_properties() : buffer_properties() -{ - _bff = buffer_sm::make; -} - -std::shared_ptr<buffer_properties> buffer_sm_properties::make() -{ - return std::static_pointer_cast<buffer_properties>( - std::make_shared<buffer_sm_properties>()); -} - -} // namespace gr diff --git a/gr/lib/meson.build b/gr/lib/meson.build index a809cea2e..dbee9c491 100644 --- a/gr/lib/meson.build +++ b/gr/lib/meson.build @@ -32,7 +32,6 @@ runtime_sources = [ 'block.cc', 'port.cc', 'buffer.cc', - 'buffer_sm.cc', 'buffer_management.cc', 'buffer_cpu_simple.cc', 'realtime.cc', @@ -56,13 +55,11 @@ runtime_sources = [ 'buffer_cpu_vmcirc_sysv_shm.cc', # mmap requires librt - FIXME - handle this a conditional dependency 'buffer_cpu_vmcirc_mmap_shm_open.cc', - 'buffer_net_zmq.cc', 'sptr_magic.cc', 'hier_block.cc', 'prefs.cc', 'tag.cc', 'registry.cc', - 'buffer_cpu_host.cc' ] if IMPLEMENT_CUDA |