aboutsummaryrefslogtreecommitdiffstats
path: root/gr-zeromq
diff options
context:
space:
mode:
authorThomas Habets <thomas@habets.se>2020-08-30 13:26:41 +0100
committerMarcus Müller <marcus@hostalia.de>2020-09-22 21:48:57 +0200
commit9b4e6276897493c10067544b6469a0fa40d9402e (patch)
tree4f200bedd371255a2f970103af22180daeccfe8d /gr-zeromq
parentgr-blocks: More graceful error handling of missing sndfile dependency. (diff)
downloadgnuradio-9b4e6276897493c10067544b6469a0fa40d9402e.tar.xz
gnuradio-9b4e6276897493c10067544b6469a0fa40d9402e.zip
zeromq: Remove manual memory management
I believe this fixes a memory leak, as the thread objects were never deleted.
Diffstat (limited to 'gr-zeromq')
-rw-r--r--gr-zeromq/lib/base_impl.cc54
-rw-r--r--gr-zeromq/lib/base_impl.h4
-rw-r--r--gr-zeromq/lib/pub_msg_sink_impl.cc24
-rw-r--r--gr-zeromq/lib/pub_msg_sink_impl.h6
-rw-r--r--gr-zeromq/lib/pull_msg_source_impl.cc28
-rw-r--r--gr-zeromq/lib/pull_msg_source_impl.h8
-rw-r--r--gr-zeromq/lib/push_msg_sink_impl.cc24
-rw-r--r--gr-zeromq/lib/push_msg_sink_impl.h6
-rw-r--r--gr-zeromq/lib/push_sink_impl.cc2
-rw-r--r--gr-zeromq/lib/rep_msg_sink_impl.cc32
-rw-r--r--gr-zeromq/lib/rep_msg_sink_impl.h8
-rw-r--r--gr-zeromq/lib/rep_sink_impl.cc6
-rw-r--r--gr-zeromq/lib/req_msg_source_impl.cc34
-rw-r--r--gr-zeromq/lib/req_msg_source_impl.h8
-rw-r--r--gr-zeromq/lib/req_source_impl.cc4
-rw-r--r--gr-zeromq/lib/sub_msg_source_impl.cc28
-rw-r--r--gr-zeromq/lib/sub_msg_source_impl.h8
-rw-r--r--gr-zeromq/lib/sub_source_impl.cc2
18 files changed, 127 insertions, 159 deletions
diff --git a/gr-zeromq/lib/base_impl.cc b/gr-zeromq/lib/base_impl.cc
index 67f800274..588949089 100644
--- a/gr-zeromq/lib/base_impl.cc
+++ b/gr-zeromq/lib/base_impl.cc
@@ -25,7 +25,12 @@ base_impl::base_impl(int type,
int timeout,
bool pass_tags,
const std::string& key)
- : d_vsize(itemsize * vlen), d_timeout(timeout), d_pass_tags(pass_tags), d_key(key)
+ : d_context(1),
+ d_socket(d_context, type),
+ d_vsize(itemsize * vlen),
+ d_timeout(timeout),
+ d_pass_tags(pass_tags),
+ d_key(key)
{
/* "Fix" timeout value (ms for new API, us for old API) */
int major, minor, patch;
@@ -34,24 +39,15 @@ base_impl::base_impl(int type,
if (major < 3) {
d_timeout *= 1000;
}
-
- /* Create context & socket */
- d_context = new zmq::context_t(1);
- d_socket = new zmq::socket_t(*d_context, type);
}
-base_impl::~base_impl()
-{
- d_socket->close();
- delete d_socket;
- delete d_context;
-}
+base_impl::~base_impl() {}
std::string base_impl::last_endpoint()
{
char addr[256];
size_t addr_len = sizeof(addr);
- d_socket->getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len);
+ d_socket.getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len);
return std::string(addr, addr_len - 1);
}
@@ -69,15 +65,15 @@ base_sink_impl::base_sink_impl(int type,
/* Set high watermark */
if (hwm >= 0) {
#ifdef ZMQ_SNDHWM
- d_socket->setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm));
+ d_socket.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm));
#else // major < 3
uint64_t tmp = hwm;
- d_socket->setsockopt(ZMQ_HWM, &tmp, sizeof(tmp));
+ d_socket.setsockopt(ZMQ_HWM, &tmp, sizeof(tmp));
#endif
}
/* Bind */
- d_socket->bind(address);
+ d_socket.bind(address);
}
int base_sink_impl::send_message(const void* in_buf,
@@ -89,9 +85,9 @@ int base_sink_impl::send_message(const void* in_buf,
zmq::message_t key_message(d_key.size());
memcpy(key_message.data(), d_key.data(), d_key.size());
#if USE_NEW_CPPZMQ_SEND_RECV
- d_socket->send(key_message, zmq::send_flags::sndmore);
+ d_socket.send(key_message, zmq::send_flags::sndmore);
#else
- d_socket->send(key_message, ZMQ_SNDMORE);
+ d_socket.send(key_message, ZMQ_SNDMORE);
#endif
}
/* Meta-data header */
@@ -116,9 +112,9 @@ int base_sink_impl::send_message(const void* in_buf,
/* Send */
#if USE_NEW_CPPZMQ_SEND_RECV
- d_socket->send(msg, zmq::send_flags::none);
+ d_socket.send(msg, zmq::send_flags::none);
#else
- d_socket->send(msg);
+ d_socket.send(msg);
#endif
/* Report back */
@@ -140,15 +136,15 @@ base_source_impl::base_source_impl(int type,
/* Set high watermark */
if (hwm >= 0) {
#ifdef ZMQ_RCVHWM
- d_socket->setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm));
+ d_socket.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm));
#else // major < 3
uint64_t tmp = hwm;
- d_socket->setsockopt(ZMQ_HWM, &tmp, sizeof(tmp));
+ d_socket.setsockopt(ZMQ_HWM, &tmp, sizeof(tmp));
#endif
}
/* Connect */
- d_socket->connect(address);
+ d_socket.connect(address);
}
bool base_source_impl::has_pending() { return d_msg.size() > d_consumed_bytes; }
@@ -185,7 +181,7 @@ int base_source_impl::flush_pending(void* out_buf,
bool base_source_impl::load_message(bool wait)
{
/* Poll for input */
- zmq::pollitem_t items[] = { { static_cast<void*>(*d_socket), 0, ZMQ_POLLIN, 0 } };
+ zmq::pollitem_t items[] = { { static_cast<void*>(d_socket), 0, ZMQ_POLLIN, 0 } };
zmq::poll(&items[0], 1, wait ? d_timeout : 0);
if (!(items[0].revents & ZMQ_POLLIN))
@@ -194,7 +190,7 @@ bool base_source_impl::load_message(bool wait)
/* Is this the start or continuation of a multi-part message? */
int64_t more = 0;
size_t more_len = sizeof(more);
- d_socket->getsockopt(ZMQ_RCVMORE, &more, &more_len);
+ d_socket.getsockopt(ZMQ_RCVMORE, &more, &more_len);
/* Reset */
d_msg.rebuild();
@@ -204,23 +200,23 @@ bool base_source_impl::load_message(bool wait)
/* Get the message */
#if USE_NEW_CPPZMQ_SEND_RECV
- d_socket->recv(d_msg);
+ d_socket.recv(d_msg);
#else
- d_socket->recv(&d_msg);
+ d_socket.recv(&d_msg);
#endif
/* Throw away key and get the first message. Avoid blocking if a multi-part
* message is not sent */
if (d_key.size() > 0 && !more) {
int64_t is_multipart;
- d_socket->getsockopt(ZMQ_RCVMORE, &is_multipart, &more_len);
+ d_socket.getsockopt(ZMQ_RCVMORE, &is_multipart, &more_len);
d_msg.rebuild();
if (is_multipart)
#if USE_NEW_CPPZMQ_SEND_RECV
- d_socket->recv(d_msg);
+ d_socket.recv(d_msg);
#else
- d_socket->recv(&d_msg);
+ d_socket.recv(&d_msg);
#endif
else
return false;
diff --git a/gr-zeromq/lib/base_impl.h b/gr-zeromq/lib/base_impl.h
index c9b5245fd..eb98ce1b3 100644
--- a/gr-zeromq/lib/base_impl.h
+++ b/gr-zeromq/lib/base_impl.h
@@ -30,8 +30,8 @@ public:
protected:
std::string last_endpoint();
- zmq::context_t* d_context;
- zmq::socket_t* d_socket;
+ zmq::context_t d_context;
+ zmq::socket_t d_socket;
size_t d_vsize;
int d_timeout;
bool d_pass_tags;
diff --git a/gr-zeromq/lib/pub_msg_sink_impl.cc b/gr-zeromq/lib/pub_msg_sink_impl.cc
index 232597e81..c88f9e8f4 100644
--- a/gr-zeromq/lib/pub_msg_sink_impl.cc
+++ b/gr-zeromq/lib/pub_msg_sink_impl.cc
@@ -28,7 +28,9 @@ pub_msg_sink_impl::pub_msg_sink_impl(char* address, int timeout, bool bind)
: gr::block("pub_msg_sink",
gr::io_signature::make(0, 0, 0),
gr::io_signature::make(0, 0, 0)),
- d_timeout(timeout)
+ d_timeout(timeout),
+ d_context(1),
+ d_socket(d_context, ZMQ_PUB)
{
int major, minor, patch;
zmq::version(&major, &minor, &patch);
@@ -37,28 +39,20 @@ pub_msg_sink_impl::pub_msg_sink_impl(char* address, int timeout, bool bind)
d_timeout = timeout * 1000;
}
- d_context = new zmq::context_t(1);
- d_socket = new zmq::socket_t(*d_context, ZMQ_PUB);
-
int time = 0;
- d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
+ d_socket.setsockopt(ZMQ_LINGER, &time, sizeof(time));
if (bind) {
- d_socket->bind(address);
+ d_socket.bind(address);
} else {
- d_socket->connect(address);
+ d_socket.connect(address);
}
message_port_register_in(pmt::mp("in"));
set_msg_handler(pmt::mp("in"), [this](pmt::pmt_t msg) { this->handler(msg); });
}
-pub_msg_sink_impl::~pub_msg_sink_impl()
-{
- d_socket->close();
- delete d_socket;
- delete d_context;
-}
+pub_msg_sink_impl::~pub_msg_sink_impl() {}
void pub_msg_sink_impl::handler(pmt::pmt_t msg)
{
@@ -69,9 +63,9 @@ void pub_msg_sink_impl::handler(pmt::pmt_t msg)
memcpy(zmsg.data(), s.c_str(), s.size());
#if USE_NEW_CPPZMQ_SEND_RECV
- d_socket->send(zmsg, zmq::send_flags::none);
+ d_socket.send(zmsg, zmq::send_flags::none);
#else
- d_socket->send(zmsg);
+ d_socket.send(zmsg);
#endif
}
diff --git a/gr-zeromq/lib/pub_msg_sink_impl.h b/gr-zeromq/lib/pub_msg_sink_impl.h
index 1c278ce70..49e547ea9 100644
--- a/gr-zeromq/lib/pub_msg_sink_impl.h
+++ b/gr-zeromq/lib/pub_msg_sink_impl.h
@@ -21,8 +21,8 @@ class pub_msg_sink_impl : public pub_msg_sink
{
private:
float d_timeout;
- zmq::context_t* d_context;
- zmq::socket_t* d_socket;
+ zmq::context_t d_context;
+ zmq::socket_t d_socket;
public:
pub_msg_sink_impl(char* address, int timeout, bool bind);
@@ -33,7 +33,7 @@ public:
{
char addr[256];
size_t addr_len = sizeof(addr);
- d_socket->getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len);
+ d_socket.getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len);
return std::string(addr, addr_len - 1);
}
};
diff --git a/gr-zeromq/lib/pull_msg_source_impl.cc b/gr-zeromq/lib/pull_msg_source_impl.cc
index 9a467e597..56dba6973 100644
--- a/gr-zeromq/lib/pull_msg_source_impl.cc
+++ b/gr-zeromq/lib/pull_msg_source_impl.cc
@@ -16,6 +16,7 @@
#include "tag_headers.h"
#include <gnuradio/io_signature.h>
#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/make_unique.hpp>
#include <boost/thread/thread.hpp>
namespace gr {
@@ -31,6 +32,8 @@ pull_msg_source_impl::pull_msg_source_impl(char* address, int timeout, bool bind
gr::io_signature::make(0, 0, 0),
gr::io_signature::make(0, 0, 0)),
d_timeout(timeout),
+ d_context(1),
+ d_socket(d_context, ZMQ_PULL),
d_port(pmt::mp("out"))
{
int major, minor, patch;
@@ -40,32 +43,25 @@ pull_msg_source_impl::pull_msg_source_impl(char* address, int timeout, bool bind
d_timeout = timeout * 1000;
}
- d_context = new zmq::context_t(1);
- d_socket = new zmq::socket_t(*d_context, ZMQ_PULL);
-
int time = 0;
- d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
+ d_socket.setsockopt(ZMQ_LINGER, &time, sizeof(time));
if (bind) {
- d_socket->bind(address);
+ d_socket.bind(address);
} else {
- d_socket->connect(address);
+ d_socket.connect(address);
}
message_port_register_out(d_port);
}
-pull_msg_source_impl::~pull_msg_source_impl()
-{
- d_socket->close();
- delete d_socket;
- delete d_context;
-}
+pull_msg_source_impl::~pull_msg_source_impl() {}
bool pull_msg_source_impl::start()
{
d_finished = false;
- d_thread = new boost::thread(boost::bind(&pull_msg_source_impl::readloop, this));
+ d_thread = boost::make_unique<boost::thread>(
+ boost::bind(&pull_msg_source_impl::readloop, this));
return true;
}
@@ -80,7 +76,7 @@ void pull_msg_source_impl::readloop()
{
while (!d_finished) {
- zmq::pollitem_t items[] = { { static_cast<void*>(*d_socket), 0, ZMQ_POLLIN, 0 } };
+ zmq::pollitem_t items[] = { { static_cast<void*>(d_socket), 0, ZMQ_POLLIN, 0 } };
zmq::poll(&items[0], 1, d_timeout);
// If we got a reply, process
@@ -89,9 +85,9 @@ void pull_msg_source_impl::readloop()
// Receive data
zmq::message_t msg;
#if USE_NEW_CPPZMQ_SEND_RECV
- d_socket->recv(msg);
+ d_socket.recv(msg);
#else
- d_socket->recv(&msg);
+ d_socket.recv(&msg);
#endif
std::string buf(static_cast<char*>(msg.data()), msg.size());
diff --git a/gr-zeromq/lib/pull_msg_source_impl.h b/gr-zeromq/lib/pull_msg_source_impl.h
index 4d84b1111..068028425 100644
--- a/gr-zeromq/lib/pull_msg_source_impl.h
+++ b/gr-zeromq/lib/pull_msg_source_impl.h
@@ -21,9 +21,9 @@ class pull_msg_source_impl : public pull_msg_source
{
private:
int d_timeout; // microseconds, -1 is blocking
- zmq::context_t* d_context;
- zmq::socket_t* d_socket;
- boost::thread* d_thread;
+ zmq::context_t d_context;
+ zmq::socket_t d_socket;
+ std::unique_ptr<boost::thread> d_thread;
const pmt::pmt_t d_port;
void readloop();
@@ -41,7 +41,7 @@ public:
{
char addr[256];
size_t addr_len = sizeof(addr);
- d_socket->getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len);
+ d_socket.getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len);
return std::string(addr, addr_len - 1);
}
};
diff --git a/gr-zeromq/lib/push_msg_sink_impl.cc b/gr-zeromq/lib/push_msg_sink_impl.cc
index b32bea620..08b2eff40 100644
--- a/gr-zeromq/lib/push_msg_sink_impl.cc
+++ b/gr-zeromq/lib/push_msg_sink_impl.cc
@@ -28,7 +28,9 @@ push_msg_sink_impl::push_msg_sink_impl(char* address, int timeout, bool bind)
: gr::block("push_msg_sink",
gr::io_signature::make(0, 0, 0),
gr::io_signature::make(0, 0, 0)),
- d_timeout(timeout)
+ d_timeout(timeout),
+ d_context(1),
+ d_socket(d_context, ZMQ_PUSH)
{
int major, minor, patch;
zmq::version(&major, &minor, &patch);
@@ -37,28 +39,20 @@ push_msg_sink_impl::push_msg_sink_impl(char* address, int timeout, bool bind)
d_timeout = timeout * 1000;
}
- d_context = new zmq::context_t(1);
- d_socket = new zmq::socket_t(*d_context, ZMQ_PUSH);
-
int time = 0;
- d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
+ d_socket.setsockopt(ZMQ_LINGER, &time, sizeof(time));
if (bind) {
- d_socket->bind(address);
+ d_socket.bind(address);
} else {
- d_socket->connect(address);
+ d_socket.connect(address);
}
message_port_register_in(pmt::mp("in"));
set_msg_handler(pmt::mp("in"), [this](pmt::pmt_t msg) { this->handler(msg); });
}
-push_msg_sink_impl::~push_msg_sink_impl()
-{
- d_socket->close();
- delete d_socket;
- delete d_context;
-}
+push_msg_sink_impl::~push_msg_sink_impl() {}
void push_msg_sink_impl::handler(pmt::pmt_t msg)
{
@@ -69,9 +63,9 @@ void push_msg_sink_impl::handler(pmt::pmt_t msg)
memcpy(zmsg.data(), s.c_str(), s.size());
#if USE_NEW_CPPZMQ_SEND_RECV
- d_socket->send(zmsg, zmq::send_flags::none);
+ d_socket.send(zmsg, zmq::send_flags::none);
#else
- d_socket->send(zmsg);
+ d_socket.send(zmsg);
#endif
}
diff --git a/gr-zeromq/lib/push_msg_sink_impl.h b/gr-zeromq/lib/push_msg_sink_impl.h
index a197e6856..57a3a2da3 100644
--- a/gr-zeromq/lib/push_msg_sink_impl.h
+++ b/gr-zeromq/lib/push_msg_sink_impl.h
@@ -21,8 +21,8 @@ class push_msg_sink_impl : public push_msg_sink
{
private:
float d_timeout;
- zmq::context_t* d_context;
- zmq::socket_t* d_socket;
+ zmq::context_t d_context;
+ zmq::socket_t d_socket;
public:
push_msg_sink_impl(char* address, int timeout, bool bind);
@@ -33,7 +33,7 @@ public:
{
char addr[256];
size_t addr_len = sizeof(addr);
- d_socket->getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len);
+ d_socket.getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len);
return std::string(addr, addr_len - 1);
}
};
diff --git a/gr-zeromq/lib/push_sink_impl.cc b/gr-zeromq/lib/push_sink_impl.cc
index fb3d3f323..427bb8abb 100644
--- a/gr-zeromq/lib/push_sink_impl.cc
+++ b/gr-zeromq/lib/push_sink_impl.cc
@@ -41,7 +41,7 @@ int push_sink_impl::work(int noutput_items,
gr_vector_void_star& output_items)
{
// Poll with a timeout (FIXME: scheduler can't wait for us)
- zmq::pollitem_t itemsout[] = { { static_cast<void*>(*d_socket), 0, ZMQ_POLLOUT, 0 } };
+ zmq::pollitem_t itemsout[] = { { static_cast<void*>(d_socket), 0, ZMQ_POLLOUT, 0 } };
zmq::poll(&itemsout[0], 1, d_timeout);
// If we can send something, do it
diff --git a/gr-zeromq/lib/rep_msg_sink_impl.cc b/gr-zeromq/lib/rep_msg_sink_impl.cc
index 5496d2625..5bb4f3b15 100644
--- a/gr-zeromq/lib/rep_msg_sink_impl.cc
+++ b/gr-zeromq/lib/rep_msg_sink_impl.cc
@@ -15,6 +15,7 @@
#include "rep_msg_sink_impl.h"
#include "tag_headers.h"
#include <gnuradio/io_signature.h>
+#include <boost/make_unique.hpp>
namespace gr {
namespace zeromq {
@@ -29,6 +30,8 @@ rep_msg_sink_impl::rep_msg_sink_impl(char* address, int timeout, bool bind)
gr::io_signature::make(0, 0, 0),
gr::io_signature::make(0, 0, 0)),
d_timeout(timeout),
+ d_context(1),
+ d_socket(d_context, ZMQ_REP),
d_port(pmt::mp("in"))
{
int major, minor, patch;
@@ -38,32 +41,25 @@ rep_msg_sink_impl::rep_msg_sink_impl(char* address, int timeout, bool bind)
d_timeout = timeout * 1000;
}
- d_context = new zmq::context_t(1);
- d_socket = new zmq::socket_t(*d_context, ZMQ_REP);
-
int time = 0;
- d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
+ d_socket.setsockopt(ZMQ_LINGER, &time, sizeof(time));
if (bind) {
- d_socket->bind(address);
+ d_socket.bind(address);
} else {
- d_socket->connect(address);
+ d_socket.connect(address);
}
message_port_register_in(d_port);
}
-rep_msg_sink_impl::~rep_msg_sink_impl()
-{
- d_socket->close();
- delete d_socket;
- delete d_context;
-}
+rep_msg_sink_impl::~rep_msg_sink_impl() {}
bool rep_msg_sink_impl::start()
{
d_finished = false;
- d_thread = new boost::thread(boost::bind(&rep_msg_sink_impl::readloop, this));
+ d_thread = boost::make_unique<boost::thread>(
+ boost::bind(&rep_msg_sink_impl::readloop, this));
return true;
}
@@ -83,7 +79,7 @@ void rep_msg_sink_impl::readloop()
// wait for query...
zmq::pollitem_t items[] = {
- { static_cast<void*>(*d_socket), 0, ZMQ_POLLIN, 0 }
+ { static_cast<void*>(d_socket), 0, ZMQ_POLLIN, 0 }
};
zmq::poll(&items[0], 1, d_timeout);
@@ -93,9 +89,9 @@ void rep_msg_sink_impl::readloop()
// receive data request
zmq::message_t request;
#if USE_NEW_CPPZMQ_SEND_RECV
- d_socket->recv(request);
+ d_socket.recv(request);
#else
- d_socket->recv(&request);
+ d_socket.recv(&request);
#endif
int req_output_items = *(static_cast<int*>(request.data()));
@@ -111,9 +107,9 @@ void rep_msg_sink_impl::readloop()
zmq::message_t zmsg(s.size());
memcpy(zmsg.data(), s.c_str(), s.size());
#if USE_NEW_CPPZMQ_SEND_RECV
- d_socket->send(zmsg, zmq::send_flags::none);
+ d_socket.send(zmsg, zmq::send_flags::none);
#else
- d_socket->send(zmsg);
+ d_socket.send(zmsg);
#endif
} // if req
} // while !empty
diff --git a/gr-zeromq/lib/rep_msg_sink_impl.h b/gr-zeromq/lib/rep_msg_sink_impl.h
index d42bc8077..b5685d440 100644
--- a/gr-zeromq/lib/rep_msg_sink_impl.h
+++ b/gr-zeromq/lib/rep_msg_sink_impl.h
@@ -21,9 +21,9 @@ class rep_msg_sink_impl : public rep_msg_sink
{
private:
int d_timeout;
- zmq::context_t* d_context;
- zmq::socket_t* d_socket;
- boost::thread* d_thread;
+ zmq::context_t d_context;
+ zmq::socket_t d_socket;
+ std::unique_ptr<boost::thread> d_thread;
bool d_finished;
const pmt::pmt_t d_port;
@@ -41,7 +41,7 @@ public:
{
char addr[256];
size_t addr_len = sizeof(addr);
- d_socket->getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len);
+ d_socket.getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len);
return std::string(addr, addr_len - 1);
}
};
diff --git a/gr-zeromq/lib/rep_sink_impl.cc b/gr-zeromq/lib/rep_sink_impl.cc
index 7f207f53a..920cc1ff2 100644
--- a/gr-zeromq/lib/rep_sink_impl.cc
+++ b/gr-zeromq/lib/rep_sink_impl.cc
@@ -49,7 +49,7 @@ int rep_sink_impl::work(int noutput_items,
/* Wait for a small time (FIXME: scheduler can't wait for us) */
/* We only wait if its the first iteration, for the others we'll
* let the scheduler retry */
- zmq::pollitem_t items[] = { { static_cast<void*>(*d_socket), 0, ZMQ_POLLIN, 0 } };
+ zmq::pollitem_t items[] = { { static_cast<void*>(d_socket), 0, ZMQ_POLLIN, 0 } };
zmq::poll(&items[0], 1, first ? d_timeout : 0);
/* If we don't have anything, we're done */
@@ -59,9 +59,9 @@ int rep_sink_impl::work(int noutput_items,
/* Get and parse the request */
zmq::message_t request;
#if USE_NEW_CPPZMQ_SEND_RECV
- d_socket->recv(request);
+ d_socket.recv(request);
#else
- d_socket->recv(&request);
+ d_socket.recv(&request);
#endif
int nitems_send = noutput_items - done;
diff --git a/gr-zeromq/lib/req_msg_source_impl.cc b/gr-zeromq/lib/req_msg_source_impl.cc
index a1d2bf76f..ff4405cb0 100644
--- a/gr-zeromq/lib/req_msg_source_impl.cc
+++ b/gr-zeromq/lib/req_msg_source_impl.cc
@@ -16,6 +16,7 @@
#include "tag_headers.h"
#include <gnuradio/io_signature.h>
#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/make_unique.hpp>
#include <boost/thread/thread.hpp>
namespace gr {
@@ -31,6 +32,8 @@ req_msg_source_impl::req_msg_source_impl(char* address, int timeout, bool bind)
gr::io_signature::make(0, 0, 0),
gr::io_signature::make(0, 0, 0)),
d_timeout(timeout),
+ d_context(1),
+ d_socket(d_context, ZMQ_REQ),
d_port(pmt::mp("out"))
{
int major, minor, patch;
@@ -40,32 +43,25 @@ req_msg_source_impl::req_msg_source_impl(char* address, int timeout, bool bind)
d_timeout = timeout * 1000;
}
- d_context = new zmq::context_t(1);
- d_socket = new zmq::socket_t(*d_context, ZMQ_REQ);
-
int time = 0;
- d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
+ d_socket.setsockopt(ZMQ_LINGER, &time, sizeof(time));
if (bind) {
- d_socket->bind(address);
+ d_socket.bind(address);
} else {
- d_socket->connect(address);
+ d_socket.connect(address);
}
message_port_register_out(d_port);
}
-req_msg_source_impl::~req_msg_source_impl()
-{
- d_socket->close();
- delete d_socket;
- delete d_context;
-}
+req_msg_source_impl::~req_msg_source_impl() {}
bool req_msg_source_impl::start()
{
d_finished = false;
- d_thread = new boost::thread(boost::bind(&req_msg_source_impl::readloop, this));
+ d_thread = boost::make_unique<boost::thread>(
+ boost::bind(&req_msg_source_impl::readloop, this));
return true;
}
@@ -82,7 +78,7 @@ void req_msg_source_impl::readloop()
// std::cout << "readloop\n";
zmq::pollitem_t itemsout[] = {
- { static_cast<void*>(*d_socket), 0, ZMQ_POLLOUT, 0 }
+ { static_cast<void*>(d_socket), 0, ZMQ_POLLOUT, 0 }
};
zmq::poll(&itemsout[0], 1, d_timeout);
@@ -93,13 +89,13 @@ void req_msg_source_impl::readloop()
zmq::message_t request(sizeof(int));
memcpy((void*)request.data(), &nmsg, sizeof(int));
#if USE_NEW_CPPZMQ_SEND_RECV
- d_socket->send(request, zmq::send_flags::none);
+ d_socket.send(request, zmq::send_flags::none);
#else
- d_socket->send(request);
+ d_socket.send(request);
#endif
}
- zmq::pollitem_t items[] = { { static_cast<void*>(*d_socket), 0, ZMQ_POLLIN, 0 } };
+ zmq::pollitem_t items[] = { { static_cast<void*>(d_socket), 0, ZMQ_POLLIN, 0 } };
zmq::poll(&items[0], 1, d_timeout);
// If we got a reply, process
@@ -107,9 +103,9 @@ void req_msg_source_impl::readloop()
// Receive data
zmq::message_t msg;
#if USE_NEW_CPPZMQ_SEND_RECV
- d_socket->recv(msg);
+ d_socket.recv(msg);
#else
- d_socket->recv(&msg);
+ d_socket.recv(&msg);
#endif
std::string buf(static_cast<char*>(msg.data()), msg.size());
diff --git a/gr-zeromq/lib/req_msg_source_impl.h b/gr-zeromq/lib/req_msg_source_impl.h
index 3ea44c632..e8b85c680 100644
--- a/gr-zeromq/lib/req_msg_source_impl.h
+++ b/gr-zeromq/lib/req_msg_source_impl.h
@@ -21,9 +21,9 @@ class req_msg_source_impl : public req_msg_source
{
private:
int d_timeout;
- zmq::context_t* d_context;
- zmq::socket_t* d_socket;
- boost::thread* d_thread;
+ zmq::context_t d_context;
+ zmq::socket_t d_socket;
+ std::unique_ptr<boost::thread> d_thread;
const pmt::pmt_t d_port;
void readloop();
@@ -41,7 +41,7 @@ public:
{
char addr[256];
size_t addr_len = sizeof(addr);
- d_socket->getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len);
+ d_socket.getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len);
return std::string(addr, addr_len - 1);
}
};
diff --git a/gr-zeromq/lib/req_source_impl.cc b/gr-zeromq/lib/req_source_impl.cc
index b200ed076..4ecaeb08d 100644
--- a/gr-zeromq/lib/req_source_impl.cc
+++ b/gr-zeromq/lib/req_source_impl.cc
@@ -66,9 +66,9 @@ int req_source_impl::work(int noutput_items,
zmq::message_t request(sizeof(uint32_t));
memcpy((void*)request.data(), &req_len, sizeof(uint32_t));
#if USE_NEW_CPPZMQ_SEND_RECV
- d_socket->send(request, zmq::send_flags::none);
+ d_socket.send(request, zmq::send_flags::none);
#else
- d_socket->send(request);
+ d_socket.send(request);
#endif
d_req_pending = true;
diff --git a/gr-zeromq/lib/sub_msg_source_impl.cc b/gr-zeromq/lib/sub_msg_source_impl.cc
index d5b06b4df..9c4b28309 100644
--- a/gr-zeromq/lib/sub_msg_source_impl.cc
+++ b/gr-zeromq/lib/sub_msg_source_impl.cc
@@ -16,6 +16,7 @@
#include "tag_headers.h"
#include <gnuradio/io_signature.h>
#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/make_unique.hpp>
#include <boost/thread/thread.hpp>
namespace gr {
@@ -31,6 +32,8 @@ sub_msg_source_impl::sub_msg_source_impl(char* address, int timeout, bool bind)
gr::io_signature::make(0, 0, 0),
gr::io_signature::make(0, 0, 0)),
d_timeout(timeout),
+ d_context(1),
+ d_socket(d_context, ZMQ_SUB),
d_port(pmt::mp("out"))
{
int major, minor, patch;
@@ -40,31 +43,24 @@ sub_msg_source_impl::sub_msg_source_impl(char* address, int timeout, bool bind)
d_timeout = timeout * 1000;
}
- d_context = new zmq::context_t(1);
- d_socket = new zmq::socket_t(*d_context, ZMQ_SUB);
-
- d_socket->setsockopt(ZMQ_SUBSCRIBE, "", 0);
+ d_socket.setsockopt(ZMQ_SUBSCRIBE, "", 0);
if (bind) {
- d_socket->bind(address);
+ d_socket.bind(address);
} else {
- d_socket->connect(address);
+ d_socket.connect(address);
}
message_port_register_out(d_port);
}
-sub_msg_source_impl::~sub_msg_source_impl()
-{
- d_socket->close();
- delete d_socket;
- delete d_context;
-}
+sub_msg_source_impl::~sub_msg_source_impl() {}
bool sub_msg_source_impl::start()
{
d_finished = false;
- d_thread = new boost::thread(boost::bind(&sub_msg_source_impl::readloop, this));
+ d_thread = boost::make_unique<boost::thread>(
+ boost::bind(&sub_msg_source_impl::readloop, this));
return true;
}
@@ -79,7 +75,7 @@ void sub_msg_source_impl::readloop()
{
while (!d_finished) {
- zmq::pollitem_t items[] = { { static_cast<void*>(*d_socket), 0, ZMQ_POLLIN, 0 } };
+ zmq::pollitem_t items[] = { { static_cast<void*>(d_socket), 0, ZMQ_POLLIN, 0 } };
zmq::poll(&items[0], 1, d_timeout);
// If we got a reply, process
@@ -88,9 +84,9 @@ void sub_msg_source_impl::readloop()
// Receive data
zmq::message_t msg;
#if USE_NEW_CPPZMQ_SEND_RECV
- d_socket->recv(msg);
+ d_socket.recv(msg);
#else
- d_socket->recv(&msg);
+ d_socket.recv(&msg);
#endif
std::string buf(static_cast<char*>(msg.data()), msg.size());
std::stringbuf sb(buf);
diff --git a/gr-zeromq/lib/sub_msg_source_impl.h b/gr-zeromq/lib/sub_msg_source_impl.h
index 6707a217d..52759c3f1 100644
--- a/gr-zeromq/lib/sub_msg_source_impl.h
+++ b/gr-zeromq/lib/sub_msg_source_impl.h
@@ -21,9 +21,9 @@ class sub_msg_source_impl : public sub_msg_source
{
private:
int d_timeout; // microseconds, -1 is blocking
- zmq::context_t* d_context;
- zmq::socket_t* d_socket;
- boost::thread* d_thread;
+ zmq::context_t d_context;
+ zmq::socket_t d_socket;
+ std::unique_ptr<boost::thread> d_thread;
const pmt::pmt_t d_port;
void readloop();
@@ -41,7 +41,7 @@ public:
{
char addr[256];
size_t addr_len = sizeof(addr);
- d_socket->getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len);
+ d_socket.getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len);
return std::string(addr, addr_len - 1);
}
};
diff --git a/gr-zeromq/lib/sub_source_impl.cc b/gr-zeromq/lib/sub_source_impl.cc
index ddc2eb7ad..168ac9eeb 100644
--- a/gr-zeromq/lib/sub_source_impl.cc
+++ b/gr-zeromq/lib/sub_source_impl.cc
@@ -44,7 +44,7 @@ sub_source_impl::sub_source_impl(size_t itemsize,
base_source_impl(ZMQ_SUB, itemsize, vlen, address, timeout, pass_tags, hwm, key)
{
/* Subscribe */
- d_socket->setsockopt(ZMQ_SUBSCRIBE, key.c_str(), key.size());
+ d_socket.setsockopt(ZMQ_SUBSCRIBE, key.c_str(), key.size());
}
int sub_source_impl::work(int noutput_items,