PhoenixZMQ  2.0.0
Library which integrates zeromq use in Phoenix
phoenix_zmq.cpp
Go to the documentation of this file.
1 /***************************************
2  Auteur : Pierre Aubert
3  Mail : pierre.aubert@lapp.in2p3.fr
4  Licence : CeCILL-C
5 ****************************************/
6 
7 #include <sstream>
8 #include "phoenix_zmq.h"
9 
10 
12 
18 zmq::socket_t* pzmq_createClientSocket(zmq::context_t & context, int type, const std::string & address, size_t port){
19  std::stringstream socketAddressData;
20  socketAddressData << "tcp://" << address <<":" << port;
21  zmq::socket_t *socket = new zmq::socket_t(context, type);
22  socket->connect(socketAddressData.str());
23 
24 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
25  socket->set(zmq::sockopt::linger, 1); //1 ms to stop
26 #else
27  socket->setsockopt(ZMQ_LINGER, 1); //1 ms to stop
28 #endif
29  if(type == ZMQ_SUB){
30 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
31  socket->set(zmq::sockopt::subscribe, "");
32  socket->set(zmq::sockopt::conflate, 1);
33 #else
34  socket->setsockopt(ZMQ_SUBSCRIBE, "", 0);
35  int conflate(1);
36  socket->setsockopt(ZMQ_CONFLATE, &conflate, sizeof(int));
37 #endif
38  }
39  return socket;
40 }
41 
43 
48 zmq::socket_t* pzmq_createServerSocket(zmq::context_t & context, int type, size_t port){
49  std::stringstream socketAddressData;
50  socketAddressData << "tcp://*:" << port;
51  zmq::socket_t *socket = new zmq::socket_t(context, type);
52  socket->bind(socketAddressData.str());
53  return socket;
54 }
55 
57 
66 zmq::socket_t* pzmq_createServerSocket(zmq::context_t & context, size_t port, int type, int nbBufferMessage, int bufferSizeByte,
67  size_t threadAffinity, ssize_t dataRate)
68 {
69  zmq::socket_t* socket = pzmq_createServerSocket(context, type, port);
70  bool b(socket != NULL);
71  if(b){
72  pzmq_setBufferSize(socket, type, nbBufferMessage, dataRate, bufferSizeByte);
73  pzmq_setThreadAffinity(socket, threadAffinity);
74  }
75  return socket;
76 }
77 
78 
80 
90 zmq::socket_t* pzmq_createClientSocket(zmq::context_t & context, const std::string & address, size_t port, int type, int nbBufferMessage,
91  int bufferSizeByte, size_t threadAffinity, ssize_t dataRate)
92 {
93  zmq::socket_t* socket = pzmq_createClientSocket(context, type, address, port);
94  bool b(socket != NULL);
95  if(b){
96  pzmq_setBufferSize(socket, type, nbBufferMessage, dataRate, bufferSizeByte);
97  pzmq_setThreadAffinity(socket, threadAffinity);
98  }
99  return socket;
100 }
101 
103 
105 void pzmq_closeServerSocket(zmq::socket_t *& socket){
106  if(socket != NULL){
107  delete socket;
108  socket = NULL;
109  }
110 }
111 
113 
116 void pzmq_setNbMessageBuffer(zmq::socket_t* socket, int nbBufferMessage){
117  if(socket != NULL){
118 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
119  socket->set(zmq::sockopt::rcvhwm, nbBufferMessage);
120 #else
121  //See doc at http://api.zeromq.org/3-1:zmq-setsockopt
122  socket->setsockopt(ZMQ_RCVHWM, &nbBufferMessage, sizeof(int));
123  socket->setsockopt(ZMQ_SNDHWM, &nbBufferMessage, sizeof(int));
124 #endif
125  }
126 }
127 
129 
133 void pzmq_setDataRate(zmq::socket_t* socket, int type, int dataRate){
134  if(socket != NULL && (type == ZMQ_PUB || type == ZMQ_SUB)){
135  int dataRateKbit(dataRate*8l);
136 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
137  socket->set(zmq::sockopt::rate, dataRateKbit);
138 #else
139  //See doc at http://api.zeromq.org/3-1:zmq-setsockopt
140  socket->setsockopt(ZMQ_RATE, dataRateKbit);
141 #endif
142  }
143 }
144 
146 
149 void pzmq_setRecvBufferSize(zmq::socket_t* socket, int bufferSizeByte){
150  if(socket != NULL){
151 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
152  socket->set(zmq::sockopt::rcvbuf, bufferSizeByte);
153 #else
154  //See doc at http://api.zeromq.org/3-1:zmq-setsockopt
155  socket->setsockopt(ZMQ_RCVBUF, bufferSizeByte);
156 #endif
157  }
158 }
159 
161 
164 void pzmq_setSendBufferSize(zmq::socket_t* socket, int bufferSizeByte){
165  if(socket != NULL){
166 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
167  socket->set(zmq::sockopt::sndbuf, bufferSizeByte);
168 #else
169  //See doc at http://api.zeromq.org/3-1:zmq-setsockopt
170  socket->setsockopt(ZMQ_SNDBUF, bufferSizeByte);
171 #endif
172  }
173 }
174 
176 
179 void pzmq_setThreadAffinity(zmq::socket_t* socket, size_t threadAffinity){
180  if(socket != NULL){
181 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
182  socket->set(zmq::sockopt::affinity, threadAffinity);
183 #else
184  //See doc at http://api.zeromq.org/3-1:zmq-setsockopt
185  socket->setsockopt(ZMQ_AFFINITY, threadAffinity);
186 #endif
187  }
188 }
189 
191 
197 void pzmq_setBufferSize(zmq::socket_t* socket, int type, int nbBufferMessage, int dataRate, size_t bufferSizeByte){
198  pzmq_setNbMessageBuffer(socket, nbBufferMessage);
199  pzmq_setDataRate(socket, type, dataRate);
200  if(type == ZMQ_PULL || type == ZMQ_SUB){
201  pzmq_setRecvBufferSize(socket, bufferSizeByte);
202  }else if(type == ZMQ_PUSH || type == ZMQ_PUB){
203  pzmq_setSendBufferSize(socket, bufferSizeByte);
204  }
205 }
206 
void pzmq_setSendBufferSize(zmq::socket_t *socket, int bufferSizeByte)
Set the size of the buffer to send messages.
zmq::socket_t * pzmq_createServerSocket(zmq::context_t &context, int type, size_t port)
Create a server socket to be used by the SocketManagerZMQ.
Definition: phoenix_zmq.cpp:48
zmq::socket_t * pzmq_createClientSocket(zmq::context_t &context, int type, const std::string &address, size_t port)
Create a client socket to be used by the SocketManagerZMQ.
Definition: phoenix_zmq.cpp:18
void pzmq_closeServerSocket(zmq::socket_t *&socket)
Close the given server socket.
void pzmq_setThreadAffinity(zmq::socket_t *socket, size_t threadAffinity)
Set the thread affinity of zmq.
void pzmq_setRecvBufferSize(zmq::socket_t *socket, int bufferSizeByte)
Set the size of the buffer used to receive messages.
void pzmq_setBufferSize(zmq::socket_t *socket, int type, int nbBufferMessage, int dataRate, size_t bufferSizeByte)
Set the message limit, the data rate and the send or receive buffer size of the socket.
void pzmq_setDataRate(zmq::socket_t *socket, int type, int dataRate)
Set the data rate of the socket.
void pzmq_setNbMessageBuffer(zmq::socket_t *socket, int nbBufferMessage)
Set the number of messages in the messages buffer.