19 std::stringstream socketAddressData;
20 socketAddressData <<
"tcp://" << address <<
":" << port;
21 zmq::socket_t *socket =
new zmq::socket_t(context, type);
22 socket->connect(socketAddressData.str());
24 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
25 socket->set(zmq::sockopt::linger, 1);
27 socket->setsockopt(ZMQ_LINGER, 1);
30 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
31 socket->set(zmq::sockopt::subscribe,
"");
32 socket->set(zmq::sockopt::conflate, 1);
34 socket->setsockopt(ZMQ_SUBSCRIBE,
"", 0);
36 socket->setsockopt(ZMQ_CONFLATE, &conflate,
sizeof(
int));
49 std::stringstream socketAddressData;
50 socketAddressData <<
"tcp://*:" << port;
51 zmq::socket_t *socket =
new zmq::socket_t(context, type);
52 socket->bind(socketAddressData.str());
66 zmq::socket_t*
pzmq_createServerSocket(zmq::context_t & context,
size_t port,
int type,
int nbBufferMessage,
int bufferSizeByte,
67 size_t threadAffinity, ssize_t dataRate)
70 bool b(socket != NULL);
90 zmq::socket_t*
pzmq_createClientSocket(zmq::context_t & context,
const std::string & address,
size_t port,
int type,
int nbBufferMessage,
91 int bufferSizeByte,
size_t threadAffinity, ssize_t dataRate)
94 bool b(socket != NULL);
118 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
119 socket->set(zmq::sockopt::rcvhwm, nbBufferMessage);
122 socket->setsockopt(ZMQ_RCVHWM, &nbBufferMessage,
sizeof(
int));
123 socket->setsockopt(ZMQ_SNDHWM, &nbBufferMessage,
sizeof(
int));
134 if(socket != NULL && (type == ZMQ_PUB || type == ZMQ_SUB)){
135 int dataRateKbit(dataRate*8l);
136 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
137 socket->set(zmq::sockopt::rate, dataRateKbit);
140 socket->setsockopt(ZMQ_RATE, dataRateKbit);
151 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
152 socket->set(zmq::sockopt::rcvbuf, bufferSizeByte);
155 socket->setsockopt(ZMQ_RCVBUF, bufferSizeByte);
166 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
167 socket->set(zmq::sockopt::sndbuf, bufferSizeByte);
170 socket->setsockopt(ZMQ_SNDBUF, bufferSizeByte);
181 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
182 socket->set(zmq::sockopt::affinity, threadAffinity);
185 socket->setsockopt(ZMQ_AFFINITY, threadAffinity);
197 void pzmq_setBufferSize(zmq::socket_t* socket,
int type,
int nbBufferMessage,
int dataRate,
size_t bufferSizeByte){
200 if(type == ZMQ_PULL || type == ZMQ_SUB){
202 }
else if(type == ZMQ_PUSH || type == ZMQ_PUB){
void pzmq_setSendBufferSize(zmq::socket_t *socket, int bufferSizeByte)
Set the size of the buffer to send messages.
zmq::socket_t * pzmq_createServerSocket(zmq::context_t &context, int type, size_t port)
Create a server socket to be used by the SocketManagerZMQ.
zmq::socket_t * pzmq_createClientSocket(zmq::context_t &context, int type, const std::string &address, size_t port)
Create a client socket to be used by the SocketManagerZMQ.
void pzmq_closeServerSocket(zmq::socket_t *&socket)
Close the given server socket.
void pzmq_setThreadAffinity(zmq::socket_t *socket, size_t threadAffinity)
Set the thread affinity of zmq.
void pzmq_setRecvBufferSize(zmq::socket_t *socket, int bufferSizeByte)
Set the size of the buffer to recieved messages.
void pzmq_setBufferSize(zmq::socket_t *socket, int type, int nbBufferMessage, int dataRate, size_t bufferSizeByte)
Set the size of the buffer to send messages.
void pzmq_setDataRate(zmq::socket_t *socket, int type, int dataRate)
Set the data rate of the socket.
void pzmq_setNbMessageBuffer(zmq::socket_t *socket, int nbBufferMessage)
Set the number of messages in the messages buffer.