Typed zeromq socket
Active · Public

Authored by bcs on Jan 1 2017, 4:25 PM.
// Owning wrapper around a libzmq context handle obtained from zmq_ctx_new().
//
// The handle is terminated in the destructor, so the wrapper is non-copyable:
// a copy would terminate the same handle a second time, and the destructor
// aborts the process when termination reports EFAULT.
class ZmqContext {
public:
    // Creates a new context, then sets ZMQ_IO_THREADS to `ioThreads` and
    // enables ZMQ_IPV6.
    //
    // Throws std::bad_alloc when zmq_ctx_new() returns nullptr. Failures of
    // the two option calls are not reported (their return value is ignored,
    // as in the original code).
    explicit ZmqContext(int ioThreads = 1) : ctx_(zmq_ctx_new()) {
        if (ctx_ == nullptr) throw std::bad_alloc();
        set_option(ZMQ_IO_THREADS, ioThreads);
        set_option(ZMQ_IPV6, true);
    }

    // Exactly one object may own (and terminate) the underlying handle.
    ZmqContext(const ZmqContext&) = delete;
    ZmqContext& operator=(const ZmqContext&) = delete;

    // Terminates the context, retrying zmq_ctx_term() until it returns 0.
    // An EFAULT error means the handle itself is invalid, so retrying can
    // never succeed; the process is aborted instead of spinning forever.
    // NOTE(review): the retry is presumably meant for EINTR - confirm.
    ~ZmqContext() {
        int rc = zmq_ctx_term(ctx_);
        while (rc != 0) {
            if (errno == EFAULT) abort();
            rc = zmq_ctx_term(ctx_);
        }
    }

    // Sets an integer context option via zmq_ctx_set().
    // Returns true when zmq_ctx_set() reports success (0).
    bool set_option(int opt, int optValue) {
        return zmq_ctx_set(ctx_, opt, optValue) == 0;
    }

    // Implicit conversion to the raw handle so the wrapper can be passed
    // directly to libzmq functions that take a `void*` context.
    operator void*() const {
        return ctx_;
    }

private:
    void* ctx_;  // raw handle from zmq_ctx_new(); never null after construction
};
// Holds a pointer to a globally accessible ZmqContext.
// The context registered through setContext() **must** stay alive for the
// lifetime of the program; this class stores the pointer but never owns or
// destroys the context.
class ZmqGlobalContext {
public:
    // Registers `ctx` as the context returned by every later allocate() call.
    static void setContext(ZmqContext* ctx) {
        globalContext_ = ctx;
    }

    // Returns the registered context.
    //
    // Throws std::runtime_error when no context is registered (the stored
    // pointer is null) instead of dereferencing a null pointer.
    ZmqContext& allocate() const {
        if (globalContext_ == nullptr) {
            throw std::runtime_error("ZmqGlobalContext: no context has been set");
        }
        return *globalContext_;
    }

private:
    // Declaration only; the definition of this static member is not part of
    // this block.
    static ZmqContext* globalContext_;
};
// Deleter functor for raw libzmq socket handles: logs a line to stdout and
// closes the socket with zmq_close(). A null handle is ignored.
struct ZmqSocketDeleter {
    void operator()(void* socket) {
        if (socket != nullptr) {
            std::cout << "closing socket\n";
            zmq_close(socket);
        }
    }
};
// Owning handle for a raw libzmq socket (`void*`); ZmqSocketDeleter is
// invoked on the held pointer when a non-null handle is destroyed or reset.
using ZmqSocket = std::unique_ptr<void, ZmqSocketDeleter>;
template<class T>
struct InprocSerde {
void writeMsg(zmq_msg_t* msg, const T& value) {
size_t storageSize = sizeof(T) + alignof(T);
zmq_msg_init_size(msg, storageSize);
void* p = zmq_msg_data(msg);
if (std::align(alignof(T), sizeof(T), p, storageSize)) {
new(p) T(value);
}
}
void writeMsg(zmq_msg_t* msg, T&& value) {
size_t storageSize = sizeof(T) + alignof(T);
zmq_msg_init_size(msg, storageSize);
void* p = zmq_msg_data(msg);
if (std::align(alignof(T), sizeof(T), p, storageSize)) {
new(p) T(std::move(value));
}
}
T readMsg(zmq_msg_t* msg) {
T* tmp1 = reinterpret_cast<T*>(zmq_msg_data(msg));
T tmp2(std::move(*tmp1));
try {
tmp1->~T();
zmq_msg_close(msg);
} catch (...) {
zmq_msg_close(msg);
throw;
}
return tmp2;
}
};
template<class T, int socketType, class MsgSerde=InprocSerde<T>, class ZmqContextAllocator=ZmqGlobalContext>
class ZmqTypedSocket {
public:
explicit ZmqTypedSocket(int sendHWM, int recvHWM) : socket_(zmq_socket(ZmqContextAllocator().allocate(), socketType)), serde_() {
if (!socket_) {
throw std::runtime_error("socket failed allocation");
}
set_option(ZMQ_SNDHWM, sendHWM);
set_option(ZMQ_RCVHWM, recvHWM);
}
bool connect(const std::string& addr) {
return zmq_connect(socket_.get(), addr.c_str()) == 0;
}
bool bind(const std::string& addr) {
return zmq_bind(socket_.get(), addr.c_str()) == 0;
}
bool set_option(int opt, int optValue) {
return zmq_setsockopt(socket_.get(), opt, &optValue, sizeof(int)) == 0;
}
bool set_option(int opt, const std::string& optValue) {
return zmq_setsockopt(socket_.get(), opt, optValue.c_str(), optValue.size()) == 0;
}
bool send(T&& value) {
static_assert(ZmqSocketTraits<socketType>::canSend, "this socket type can't send");
zmq_msg_t msg;
serde_.writeMsg(&msg, std::forward<T>(value));
return zmq_msg_send(&msg, socket_.get(), 0) > 0;
}
bool send(const T& value) {
static_assert(ZmqSocketTraits<socketType>::canSend, "this socket type can't send");
zmq_msg_t msg;
serde_.writeMsg(&msg, value);
return zmq_msg_send(&msg, socket_.get(), 0) > 0;
}
T recv() {
static_assert(ZmqSocketTraits<socketType>::canRecv, "this socket type can't recieve");
zmq_msg_t msg;
zmq_msg_init(&msg);
zmq_msg_recv(&msg, socket_.get(), 0);
return serde_.readMsg(&msg);
}
private:
ZmqSocket socket_;
MsgSerde serde_;
};
bcs created this paste. Jan 1 2017, 4:25 PM
bcs edited the content of this paste. (Show Details) Jan 1 2017, 7:13 PM
bcs changed the edit policy from "No One" to "bcs (Brian Smith)".