PhoenixZMQ  6.0.0
Library which integrates the use of ZeroMQ
Loading...
Searching...
No Matches
phoenix_zmq.cpp
Go to the documentation of this file.
1/***************************************
2 Auteur : Pierre Aubert
3 Mail : pierre.aubert@lapp.in2p3.fr
4 Licence : CeCILL-C
5****************************************/
6
#include <cstdint>
#include <sstream>
#include "phoenix_zmq.h"
9
10
12
18zmq::socket_t* pzmq_createClientSocket(zmq::context_t & context, int type, const std::string & address, size_t port){
19 std::stringstream socketAddressData;
20 socketAddressData << "tcp://" << address <<":" << port;
21 zmq::socket_t *socket = new zmq::socket_t(context, type);
22 if(socket == NULL){return NULL;}
23 socket->connect(socketAddressData.str());
24
25#if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
26 socket->set(zmq::sockopt::linger, -1); //1 ms to stop
27#else
28 socket->setsockopt(ZMQ_LINGER, -1); //1 ms to stop
29#endif
30 if(type == ZMQ_SUB){
31#if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
32 socket->set(zmq::sockopt::subscribe, "");
33 socket->set(zmq::sockopt::conflate, 1);
34#else
35 socket->setsockopt(ZMQ_SUBSCRIBE, "", 0);
36 int conflate(1);
37 socket->setsockopt(ZMQ_CONFLATE, &conflate, sizeof(int));
38#endif
39 }
40 return socket;
41}
42
44
49zmq::socket_t* pzmq_createServerSocket(zmq::context_t & context, int type, size_t port){
50 std::stringstream socketAddressData;
51 socketAddressData << "tcp://127.0.0.1:" << port;
52 zmq::socket_t *socket = new zmq::socket_t(context, type);
53 if(socket != NULL){
54 socket->bind(socketAddressData.str());
55#if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
56 socket->set(zmq::sockopt::linger, -1); //1 ms to stop
57#else
58 socket->setsockopt(ZMQ_LINGER, -1); //1 ms to stop
59#endif
60 }
61 return socket;
62}
63
65
74zmq::socket_t* pzmq_createServerSocket(zmq::context_t & context, size_t port, int type, int nbBufferMessage, int bufferSizeByte,
75 size_t threadAffinity, ssize_t dataRate)
76{
77 zmq::socket_t* socket = pzmq_createServerSocket(context, type, port);
78 bool b(socket != NULL);
79 if(b){
80 pzmq_setBufferSize(socket, type, nbBufferMessage, dataRate, bufferSizeByte);
81 pzmq_setThreadAffinity(socket, threadAffinity);
82 }
83 return socket;
84}
85
86
88
98zmq::socket_t* pzmq_createClientSocket(zmq::context_t & context, const std::string & address, size_t port, int type, int nbBufferMessage,
99 int bufferSizeByte, size_t threadAffinity, ssize_t dataRate)
100{
101 zmq::socket_t* socket = pzmq_createClientSocket(context, type, address, port);
102 bool b(socket != NULL);
103 if(b){
104 pzmq_setBufferSize(socket, type, nbBufferMessage, dataRate, bufferSizeByte);
105 pzmq_setThreadAffinity(socket, threadAffinity);
106 }
107 return socket;
108}
109
110
112
114void pzmq_closeServerSocket(zmq::socket_t *& socket){
115 if(socket != NULL){
116 socket->close();
117 delete socket;
118 socket = NULL;
119 }
120}
121
123
126void pzmq_setNbMessageBuffer(zmq::socket_t* socket, int nbBufferMessage){
127 if(socket != NULL){
128#if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
129 socket->set(zmq::sockopt::rcvhwm, nbBufferMessage);
130#else
131 //See doc at http://api.zeromq.org/3-1:zmq-setsockopt
132 socket->setsockopt(ZMQ_RCVHWM, &nbBufferMessage, sizeof(int));
133 socket->setsockopt(ZMQ_SNDHWM, &nbBufferMessage, sizeof(int));
134#endif
135 }
136}
137
139
143void pzmq_setDataRate(zmq::socket_t* socket, int type, int dataRate){
144 if(socket != NULL && (type == ZMQ_PUB || type == ZMQ_SUB)){
145 int dataRateKbit(dataRate*8l);
146#if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
147 socket->set(zmq::sockopt::rate, dataRateKbit);
148#else
149 //See doc at http://api.zeromq.org/3-1:zmq-setsockopt
150 socket->setsockopt(ZMQ_RATE, dataRateKbit);
151#endif
152 }
153}
154
156
159void pzmq_setRecvBufferSize(zmq::socket_t* socket, int bufferSizeByte){
160 if(socket != NULL){
161#if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
162 socket->set(zmq::sockopt::rcvbuf, bufferSizeByte);
163#else
164 //See doc at http://api.zeromq.org/3-1:zmq-setsockopt
165 socket->setsockopt(ZMQ_RCVBUF, bufferSizeByte);
166#endif
167 }
168}
169
171
174void pzmq_setSendBufferSize(zmq::socket_t* socket, int bufferSizeByte){
175 if(socket != NULL){
176#if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
177 socket->set(zmq::sockopt::sndbuf, bufferSizeByte);
178#else
179 //See doc at http://api.zeromq.org/3-1:zmq-setsockopt
180 socket->setsockopt(ZMQ_SNDBUF, bufferSizeByte);
181#endif
182 }
183}
184
186
189void pzmq_setThreadAffinity(zmq::socket_t* socket, size_t threadAffinity){
190 if(socket != NULL){
191#if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
192 socket->set(zmq::sockopt::affinity, threadAffinity);
193#else
194 //See doc at http://api.zeromq.org/3-1:zmq-setsockopt
195 socket->setsockopt(ZMQ_AFFINITY, threadAffinity);
196#endif
197 }
198}
199
201
207void pzmq_setBufferSize(zmq::socket_t* socket, int type, int nbBufferMessage, int dataRate, size_t bufferSizeByte){
208 pzmq_setNbMessageBuffer(socket, nbBufferMessage);
209 pzmq_setDataRate(socket, type, dataRate);
210 if(type == ZMQ_PULL || type == ZMQ_SUB){
211 pzmq_setRecvBufferSize(socket, bufferSizeByte);
212 }else if(type == ZMQ_PUSH || type == ZMQ_PUB){
213 pzmq_setSendBufferSize(socket, bufferSizeByte);
214 }
215}
216
void pzmq_setSendBufferSize(zmq::socket_t *socket, int bufferSizeByte)
Set the size of the buffer to send messages.
void pzmq_closeServerSocket(zmq::socket_t *&socket)
Close the given server socket.
zmq::socket_t * pzmq_createServerSocket(zmq::context_t &context, int type, size_t port)
Create a server socket to be used by the SocketManagerZMQ.
void pzmq_setThreadAffinity(zmq::socket_t *socket, size_t threadAffinity)
Set the thread affinity of zmq.
void pzmq_setRecvBufferSize(zmq::socket_t *socket, int bufferSizeByte)
Set the size of the buffer used to receive messages.
void pzmq_setBufferSize(zmq::socket_t *socket, int type, int nbBufferMessage, int dataRate, size_t bufferSizeByte)
Set the number of buffered messages, the data rate and the send or receive buffer size of the socket.
zmq::socket_t * pzmq_createClientSocket(zmq::context_t &context, int type, const std::string &address, size_t port)
Create a client socket to be used by the SocketManagerZMQ.
void pzmq_setDataRate(zmq::socket_t *socket, int type, int dataRate)
Set the data rate of the socket.
void pzmq_setNbMessageBuffer(zmq::socket_t *socket, int nbBufferMessage)
Set the number of messages in the messages buffer.