19 std::stringstream socketAddressData;
20 socketAddressData <<
"tcp://" << address <<
":" << port;
21 zmq::socket_t *socket =
new zmq::socket_t(context, type);
22 if(socket == NULL){
return NULL;}
23 socket->connect(socketAddressData.str());
25#if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
26 socket->set(zmq::sockopt::linger, -1);
28 socket->setsockopt(ZMQ_LINGER, -1);
31#if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
32 socket->set(zmq::sockopt::subscribe,
"");
33 socket->set(zmq::sockopt::conflate, 1);
35 socket->setsockopt(ZMQ_SUBSCRIBE,
"", 0);
37 socket->setsockopt(ZMQ_CONFLATE, &conflate,
sizeof(
int));
50 std::stringstream socketAddressData;
51 socketAddressData <<
"tcp://127.0.0.1:" << port;
52 zmq::socket_t *socket =
new zmq::socket_t(context, type);
54 socket->bind(socketAddressData.str());
55#if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
56 socket->set(zmq::sockopt::linger, -1);
58 socket->setsockopt(ZMQ_LINGER, -1);
74zmq::socket_t*
pzmq_createServerSocket(zmq::context_t & context,
size_t port,
int type,
int nbBufferMessage,
int bufferSizeByte,
75 size_t threadAffinity, ssize_t dataRate)
78 bool b(socket != NULL);
98zmq::socket_t*
pzmq_createClientSocket(zmq::context_t & context,
const std::string & address,
size_t port,
int type,
int nbBufferMessage,
99 int bufferSizeByte,
size_t threadAffinity, ssize_t dataRate)
102 bool b(socket != NULL);
128#if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
129 socket->set(zmq::sockopt::rcvhwm, nbBufferMessage);
132 socket->setsockopt(ZMQ_RCVHWM, &nbBufferMessage,
sizeof(
int));
133 socket->setsockopt(ZMQ_SNDHWM, &nbBufferMessage,
sizeof(
int));
144 if(socket != NULL && (type == ZMQ_PUB || type == ZMQ_SUB)){
145 int dataRateKbit(dataRate*8l);
146#if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
147 socket->set(zmq::sockopt::rate, dataRateKbit);
150 socket->setsockopt(ZMQ_RATE, dataRateKbit);
161#if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
162 socket->set(zmq::sockopt::rcvbuf, bufferSizeByte);
165 socket->setsockopt(ZMQ_RCVBUF, bufferSizeByte);
176#if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
177 socket->set(zmq::sockopt::sndbuf, bufferSizeByte);
180 socket->setsockopt(ZMQ_SNDBUF, bufferSizeByte);
191#if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
192 socket->set(zmq::sockopt::affinity, threadAffinity);
195 socket->setsockopt(ZMQ_AFFINITY, threadAffinity);
207void pzmq_setBufferSize(zmq::socket_t* socket,
int type,
int nbBufferMessage,
int dataRate,
size_t bufferSizeByte){
210 if(type == ZMQ_PULL || type == ZMQ_SUB){
212 }
else if(type == ZMQ_PUSH || type == ZMQ_PUB){
void pzmq_setSendBufferSize(zmq::socket_t *socket, int bufferSizeByte)
Set the size of the buffer to send messages.
void pzmq_closeServerSocket(zmq::socket_t *&socket)
Close the given server socket.
zmq::socket_t * pzmq_createServerSocket(zmq::context_t &context, int type, size_t port)
Create a server socket to be used by the SocketManagerZMQ.
void pzmq_setThreadAffinity(zmq::socket_t *socket, size_t threadAffinity)
Set the thread affinity of zmq.
void pzmq_setRecvBufferSize(zmq::socket_t *socket, int bufferSizeByte)
Set the size of the buffer to receive messages.
void pzmq_setBufferSize(zmq::socket_t *socket, int type, int nbBufferMessage, int dataRate, size_t bufferSizeByte)
Set the size of the buffer to send or receive messages, depending on the socket type.
zmq::socket_t * pzmq_createClientSocket(zmq::context_t &context, int type, const std::string &address, size_t port)
Create a client socket to be used by the SocketManagerZMQ.
void pzmq_setDataRate(zmq::socket_t *socket, int type, int dataRate)
Set the data rate of the socket.
void pzmq_setNbMessageBuffer(zmq::socket_t *socket, int nbBufferMessage)
Set the number of messages in the messages buffer.