253 lines
5.3 KiB
C++
253 lines
5.3 KiB
C++
#ifndef INCLUDE_DI_ZMQ_H_
|
|
#define INCLUDE_DI_ZMQ_H_
|
|
|
|
#include <zmq.h>
|
|
|
|
#include <algorithm>
|
|
#include <cassert>
|
|
#include <memory>
|
|
#include <string>
|
|
|
|
// In order to prevent unused variable warnings when building in non-debug
|
|
// mode use this macro to make assertions.
|
|
#ifdef DEBUG_BUILD
|
|
# define ZMQ_ASSERT(expression) assert(expression)
|
|
#else
|
|
# define ZMQ_ASSERT(expression) (void)(expression)
|
|
#endif
|
|
|
|
namespace Di {
|
|
namespace Zmq {
|
|
|
|
class Context {
|
|
friend class Socket;
|
|
|
|
public:
|
|
inline Context() {
|
|
ptr = zmq_ctx_new();
|
|
ZMQ_ASSERT(ptr != NULL);
|
|
}
|
|
|
|
inline explicit Context(int io_threads_, int max_sockets_ = ZMQ_MAX_SOCKETS_DFLT) {
|
|
ptr = zmq_ctx_new();
|
|
ZMQ_ASSERT(ptr != NULL);
|
|
|
|
int rc = zmq_ctx_set(ptr, ZMQ_IO_THREADS, io_threads_);
|
|
ZMQ_ASSERT(rc == 0);
|
|
|
|
rc = zmq_ctx_set(ptr, ZMQ_MAX_SOCKETS, max_sockets_);
|
|
ZMQ_ASSERT(rc == 0);
|
|
}
|
|
|
|
Context(Context &&rhs) = delete;
|
|
Context(const Context &rhs) = delete;
|
|
void operator =(const Context&) = delete;
|
|
|
|
inline Context &operator =(Context &&rhs) {
|
|
std::swap(ptr, rhs.ptr);
|
|
return *this;
|
|
}
|
|
|
|
inline ~Context() {
|
|
close();
|
|
}
|
|
|
|
inline void close() {
|
|
if (ptr == NULL)
|
|
return;
|
|
int rc = zmq_ctx_destroy(ptr);
|
|
ZMQ_ASSERT(rc == 0);
|
|
ptr = NULL;
|
|
}
|
|
|
|
// Be careful with this, it's probably only useful for
|
|
// using the C api together with an existing C++ api.
|
|
// Normally you should never need to use this.
|
|
inline operator void*() {
|
|
return ptr;
|
|
}
|
|
|
|
private:
|
|
void *ptr;
|
|
};
|
|
|
|
class Socket {
|
|
public:
|
|
inline Socket(const std::shared_ptr<Context> &context_, int type_) {
|
|
ptr = zmq_socket(context_->ptr, type_);
|
|
ZMQ_ASSERT(ptr != nullptr);
|
|
}
|
|
|
|
inline Socket(Socket&& rhs)
|
|
: ptr(rhs.ptr) {
|
|
rhs.ptr = NULL;
|
|
}
|
|
|
|
inline Socket& operator=(Socket&& rhs) {
|
|
std::swap(ptr, rhs.ptr);
|
|
return *this;
|
|
}
|
|
|
|
inline ~Socket() {
|
|
close();
|
|
}
|
|
|
|
inline operator void*() {
|
|
return ptr;
|
|
}
|
|
|
|
inline void close() {
|
|
if (ptr == nullptr)
|
|
// already closed
|
|
return;
|
|
int rc = zmq_close(ptr);
|
|
ZMQ_ASSERT(rc == 0);
|
|
ptr = nullptr;
|
|
}
|
|
|
|
inline void setsockopt(int option_, const void *optval_, size_t optvallen_) {
|
|
int rc = zmq_setsockopt(ptr, option_, optval_, optvallen_);
|
|
ZMQ_ASSERT(rc == 0);
|
|
}
|
|
|
|
inline void getsockopt(int option_, void *optval_, size_t *optvallen_) {
|
|
int rc = zmq_getsockopt(ptr, option_, optval_, optvallen_);
|
|
ZMQ_ASSERT(rc == 0);
|
|
}
|
|
|
|
inline bool bind(const std::string &addr) {
|
|
int rc = zmq_bind(ptr, addr.c_str());
|
|
return (rc == 0);
|
|
}
|
|
|
|
inline bool bind(const char *addr) {
|
|
int rc = zmq_bind(ptr, addr);
|
|
return (rc == 0);
|
|
}
|
|
|
|
inline bool unbind(const std::string &addr) {
|
|
int rc = zmq_unbind(ptr, addr.c_str());
|
|
return (rc == 0);
|
|
}
|
|
|
|
inline bool unbind(const char *addr) {
|
|
int rc = zmq_unbind(ptr, addr);
|
|
return (rc == 0);
|
|
}
|
|
|
|
inline bool connect(const std::string &addr) {
|
|
int rc = zmq_connect(ptr, addr.c_str());
|
|
return (rc == 0);
|
|
}
|
|
|
|
inline bool connect(const char *addr) {
|
|
int rc = zmq_connect(ptr, addr);
|
|
return (rc == 0);
|
|
}
|
|
|
|
inline bool disconnect(const std::string &addr) {
|
|
int rc = zmq_disconnect(ptr, addr.c_str());
|
|
return (rc == 0);
|
|
}
|
|
|
|
inline bool disconnect(const char *addr) {
|
|
int rc = zmq_disconnect(ptr, addr);
|
|
return (rc == 0);
|
|
}
|
|
|
|
inline bool connected() {
|
|
return (ptr != NULL);
|
|
}
|
|
|
|
inline int pollIn(time_t timeout_ms) {
|
|
zmq_pollitem_t items[] = {{ ptr, 0, ZMQ_POLLIN, 0 }};
|
|
int rc = zmq_poll(items, 1, timeout_ms);
|
|
|
|
if (rc == -1)
|
|
return -1;
|
|
|
|
if (items[0].revents & ZMQ_POLLIN)
|
|
return 1;
|
|
|
|
return 0;
|
|
}
|
|
|
|
inline bool send(const std::string &buf, int flags_ = 0) {
|
|
int nbytes = zmq_send(ptr, buf.data(), buf.size(), flags_);
|
|
if (nbytes >= 0)
|
|
return true;
|
|
return false;
|
|
}
|
|
|
|
inline bool send(zmq_msg_t *msg_, int flags_ = 0) {
|
|
int nbytes = zmq_send(ptr, zmq_msg_data(msg_), zmq_msg_size(msg_), flags_);
|
|
if (nbytes >= 0)
|
|
return true;
|
|
return false;
|
|
}
|
|
|
|
inline bool send(const std::shared_ptr<std::string> &buf, int flags_ = 0) {
|
|
int nbytes = zmq_send(ptr, buf->data(), buf->size(), flags_);
|
|
if (nbytes >= 0)
|
|
return true;
|
|
return false;
|
|
}
|
|
|
|
inline ssize_t send(const void *buf_, size_t len_, int flags_ = 0) {
|
|
int nbytes = zmq_send(ptr, buf_, len_, flags_);
|
|
if (nbytes >= 0)
|
|
return static_cast<ssize_t>(nbytes);
|
|
if (zmq_errno() == EAGAIN)
|
|
return 0;
|
|
return -1;
|
|
}
|
|
|
|
inline ssize_t recv(void *buf_, size_t len_, int flags_ = 0) {
|
|
int nbytes = zmq_recv(ptr, buf_, len_, flags_);
|
|
if (nbytes >= 0)
|
|
return static_cast<ssize_t>(nbytes);
|
|
if (zmq_errno() == EAGAIN)
|
|
return 0;
|
|
return -1;
|
|
}
|
|
|
|
inline ssize_t recv(zmq_msg_t *msg_, int flags_ = 0) {
|
|
zmq_msg_init(msg_);
|
|
return zmq_msg_recv(msg_, ptr, flags_);
|
|
}
|
|
|
|
inline std::shared_ptr<std::string> recv(bool *more, int flags_ = 0) {
|
|
zmq_msg_t msg;
|
|
std::shared_ptr<std::string> retString = nullptr;
|
|
|
|
zmq_msg_init(&msg);
|
|
|
|
int nbytes = zmq_msg_recv(&msg, ptr, flags_);
|
|
if (more) {
|
|
*more = (zmq_msg_more(&msg) != 0);
|
|
}
|
|
if (nbytes >= 0) {
|
|
if (zmq_msg_size(&msg)) {
|
|
retString = std::make_shared<std::string>(static_cast<const char *>(
|
|
zmq_msg_data(&msg)), zmq_msg_size(&msg));
|
|
} else {
|
|
retString = std::make_shared<std::string>();
|
|
}
|
|
}
|
|
|
|
zmq_msg_close(&msg);
|
|
return retString;
|
|
}
|
|
|
|
private:
|
|
void *ptr;
|
|
|
|
Socket(const Socket&) = delete;
|
|
void operator =(const Socket&) = delete;
|
|
};
|
|
|
|
} // namespace Zmq
|
|
} // namespace Di
|
|
|
|
#endif // INCLUDE_DI_ZMQ_H_
|