Line |
Branch |
Exec |
Source |
1 |
|
|
/*************************************** |
2 |
|
|
Auteur : Pierre Aubert |
3 |
|
|
Mail : pierre.aubert@lapp.in2p3.fr |
4 |
|
|
Licence : CeCILL-C |
5 |
|
|
****************************************/ |
6 |
|
|
|
7 |
|
|
#include "PZmqBackend.h" |
8 |
|
|
|
9 |
|
|
///Create param for a client socket |
10 |
|
|
/** @param address : address of the server to be connected to |
11 |
|
|
* @param port : port to be used |
12 |
|
|
* @param context : zmq context where to create socket |
13 |
|
|
* @param type : type of the connection (ZMQ_PULL, ZMQ_PUSH, etc) |
14 |
|
|
* @param nbBufferMessage : number of messages to be buffered |
15 |
|
|
* @param bufferSizeByte : size of the zmq buffer in bytes |
16 |
|
|
* @param threadAffinity : bit mask which determines which threads from the 0MQ I/O thread pool associated with the socket's context shall handle newly created connections (1 : means first, 2 : means second, 3 : means first and second, etc) |
17 |
|
|
* @param dataRate : expected data rate (in kilobytes per second) |
18 |
|
|
* @return corresponding PZmqParam |
19 |
|
|
*/ |
20 |
|
1 |
PZmqParam pzmq_createParamClient(const std::string & address, size_t port, zmq::context_t* context, int type, int nbBufferMessage, |
21 |
|
|
int bufferSizeByte, size_t threadAffinity, ssize_t dataRate) |
22 |
|
|
{ |
23 |
|
1 |
PZmqParam param; |
24 |
1/1
✓ Branch 1 taken 1 times.
|
1 |
param.address = address; |
25 |
|
1 |
param.port = port; |
26 |
|
1 |
param.context = context; |
27 |
|
1 |
param.type = type; |
28 |
|
1 |
param.nbBufferMessage = nbBufferMessage; |
29 |
|
1 |
param.bufferSizeByte = bufferSizeByte; |
30 |
|
1 |
param.threadAffinity = threadAffinity; |
31 |
|
1 |
param.dataRate = dataRate; |
32 |
|
1 |
return param; |
33 |
|
|
} |
34 |
|
|
|
35 |
|
|
///Create param for a client socket |
36 |
|
|
/** @param address : address of the server to be connected to |
37 |
|
|
* @param port : port to be used |
38 |
|
|
* @param context : zmq context where to create socket |
39 |
|
|
* @param type : type of the connection (ZMQ_PULL, ZMQ_PUSH, etc) |
40 |
|
|
* @param nbBufferMessage : number of messages to be buffered |
41 |
|
|
* @param bufferSizeByte : size of the zmq buffer in bytes |
42 |
|
|
* @param threadAffinity : bit mask which determines which threads from the 0MQ I/O thread pool associated with the socket's context shall handle newly created connections (1 : means first, 2 : means second, 3 : means first and second, etc) |
43 |
|
|
* @param dataRate : expected data rate (in kilobytes per second) |
44 |
|
|
* @return corresponding PZmqParam |
45 |
|
|
*/ |
46 |
|
1 |
PZmqParam pzmq_createParamServer(const std::string & address, size_t port, zmq::context_t* context, int type, int nbBufferMessage, |
47 |
|
|
int bufferSizeByte, size_t threadAffinity, ssize_t dataRate) |
48 |
|
|
{ |
49 |
|
1 |
PZmqParam param; |
50 |
1/1
✓ Branch 1 taken 1 times.
|
1 |
param.address = address; |
51 |
|
1 |
param.port = port; |
52 |
|
1 |
param.context = context; |
53 |
|
1 |
param.type = type; |
54 |
|
1 |
param.nbBufferMessage = nbBufferMessage; |
55 |
|
1 |
param.bufferSizeByte = bufferSizeByte; |
56 |
|
1 |
param.threadAffinity = threadAffinity; |
57 |
|
1 |
param.dataRate = dataRate; |
58 |
|
1 |
return param; |
59 |
|
|
} |
60 |
|
|
|
61 |
|
|
///Default constructor of PZmqBackend |
62 |
|
✗ |
PZmqBackend::PZmqBackend(){ |
63 |
|
|
|
64 |
|
|
} |
65 |
|
|
|
66 |
|
|
///Create a client parameter |
67 |
|
|
/** @param address : address to be connected to |
68 |
|
|
* @param port : port to be used |
69 |
|
|
* @param context : zmq context where to create socket |
70 |
|
|
* @param type : type of the connection (ZMQ_PULL, ZMQ_PUSH, etc) |
71 |
|
|
* @param nbBufferMessage : number of messages to be buffered |
72 |
|
|
* @param bufferSizeByte : size of the zmq buffer in bytes |
73 |
|
|
* @param threadAffinity : bit mask which determines which threads from the 0MQ I/O thread pool associated with the socket's context shall handle newly created connections (1 : means first, 2 : means second, 3 : means first and second, etc) |
74 |
|
|
* @param dataRate : expected data rate (in kilobytes per second) |
75 |
|
|
* @return corresponding PZmqBackend::Param (or PZmqParam) |
76 |
|
|
*/ |
77 |
|
1 |
PZmqBackend::Param PZmqBackend::client(const std::string & address, size_t port, zmq::context_t* context, int type, int nbBufferMessage, int bufferSizeByte, size_t threadAffinity, ssize_t dataRate){ |
78 |
|
1 |
return pzmq_createParamClient(address, port, context, type, nbBufferMessage, bufferSizeByte, threadAffinity, dataRate); |
79 |
|
|
} |
80 |
|
|
|
81 |
|
|
///Create a server parameter |
82 |
|
|
/** @param address : address to be connected to |
83 |
|
|
* @param port : port to be used |
84 |
|
|
* @param context : zmq context where to create socket |
85 |
|
|
* @param type : type of the connection (ZMQ_PULL, ZMQ_PUSH, etc) |
86 |
|
|
* @param nbBufferMessage : number of messages to be buffered |
87 |
|
|
* @param bufferSizeByte : size of the zmq buffer in bytes |
88 |
|
|
* @param threadAffinity : bit mask which determines which threads from the 0MQ I/O thread pool associated with the socket's context shall handle newly created connections (1 : means first, 2 : means second, 3 : means first and second, etc) |
89 |
|
|
* @param dataRate : expected data rate (in kilobytes per second) |
90 |
|
|
* @return corresponding PZmqBackend::Param (or PZmqParam) |
91 |
|
|
*/ |
92 |
|
1 |
PZmqBackend::Param PZmqBackend::server(const std::string & address, size_t port, zmq::context_t* context, int type, int nbBufferMessage, int bufferSizeByte, size_t threadAffinity, ssize_t dataRate){ |
93 |
|
1 |
return pzmq_createParamServer(address, port, context, type, nbBufferMessage, bufferSizeByte, threadAffinity, dataRate); |
94 |
|
|
} |
95 |
|
|
|
96 |
|
|
///Create a client socket |
97 |
|
|
/** @param[out] socket : socket to be created |
98 |
|
|
* @param address : address of the server, the client has to connect to |
99 |
|
|
* @param port : port to be used for the connection |
100 |
|
|
* @param param : extra customisable parameters for the creation of the socket (depends on the backend) |
101 |
|
|
* @return true if the socket has been created, false otherwise |
102 |
|
|
*/ |
103 |
|
✗ |
bool PZmqBackend::createClientSocket(PZmqBackend::Socket & socket, const std::string & address, size_t port, const PZmqParam & param){ |
104 |
|
✗ |
socket = pzmq_createClientSocket(*param.context, address, port, param.type, param.nbBufferMessage, |
105 |
|
✗ |
param.bufferSizeByte, param.threadAffinity, param.dataRate); |
106 |
|
✗ |
return socket != NULL; |
107 |
|
|
} |
108 |
|
|
|
109 |
|
|
///Create a server socket |
110 |
|
|
/** @param[out] socket : socket to be created |
111 |
|
|
* @param address : address of the server, the client has to connect to |
112 |
|
|
* @param port : port to be used for the connection |
113 |
|
|
* @param param : extra customisable parameters for the creation of the socket (depends on the backend) |
114 |
|
|
* @return true if the socket has been created, false otherwise |
115 |
|
|
*/ |
116 |
|
✗ |
bool PZmqBackend::createServerSocket(PZmqBackend::Socket & socket, const std::string & address, size_t port, const PZmqParam & param){ |
117 |
|
✗ |
socket = pzmq_createServerSocket(*param.context, port, param.type, param.nbBufferMessage, param.bufferSizeByte, |
118 |
|
✗ |
param.threadAffinity, param.dataRate); |
119 |
|
✗ |
return socket != NULL; |
120 |
|
|
} |
121 |
|
|
|
122 |
|
|
///Create a client socket |
123 |
|
|
/** @param[out] socket : socket to be created |
124 |
|
|
* @param param : extra customisable parameters for the creation of the socket (depends on the backend) |
125 |
|
|
* @return true if the socket has been created, false otherwise |
126 |
|
|
*/ |
127 |
|
✗ |
bool PZmqBackend::createClientSocket(Socket & socket, const PZmqBackend::Param & param){ |
128 |
|
✗ |
socket = pzmq_createClientSocket(*param.context, param.address, param.port, param.type, param.nbBufferMessage, |
129 |
|
✗ |
param.bufferSizeByte, param.threadAffinity, param.dataRate); |
130 |
|
✗ |
return socket != NULL; |
131 |
|
|
} |
132 |
|
|
|
133 |
|
|
///Create a server socket |
134 |
|
|
/** @param[out] socket : socket to be created |
135 |
|
|
* @param param : extra customisable parameters for the creation of the socket (depends on the backend) |
136 |
|
|
* @return true if the socket has been created, false otherwise |
137 |
|
|
*/ |
138 |
|
✗ |
bool PZmqBackend::createServerSocket(Socket & socket, const PZmqBackend::Param & param){ |
139 |
|
✗ |
socket = pzmq_createServerSocket(*param.context, param.port, param.type, param.nbBufferMessage, param.bufferSizeByte, |
140 |
|
✗ |
param.threadAffinity, param.dataRate); |
141 |
|
✗ |
return socket != NULL; |
142 |
|
|
} |
143 |
|
|
|
144 |
|
|
///Convert a send flag into zmq flag |
145 |
|
|
/** @param flag : generic PSendFlag |
146 |
|
|
* @return corresponding zmq::send_flags |
147 |
|
|
*/ |
148 |
|
✗ |
zmq::send_flags convertToSendFlag(PSendFlag::PSendFlag flag){ |
149 |
|
✗ |
if(flag == PSendFlag::NON_BLOCK){return zmq::send_flags::dontwait;} |
150 |
|
✗ |
else{return zmq::send_flags::none;} |
151 |
|
|
} |
152 |
|
|
|
153 |
|
|
///Send message on the given socket |
154 |
|
|
/** @param socket : socket to be used |
155 |
|
|
* @param msg : message to be sent |
156 |
|
|
* @param flag : flag to be used to send the message (BLOCK, NON_BLOCK, etc) |
157 |
|
|
* @return true on success, false otherwise |
158 |
|
|
*/ |
159 |
|
✗ |
bool PZmqBackend::send(PZmqBackend::Socket & socket, PZmqBackend::Message & msg, PSendFlag::PSendFlag flag){ |
160 |
|
✗ |
if(socket != NULL){ |
161 |
|
✗ |
zmq::send_result_t res = socket->send(msg, convertToSendFlag(flag)); |
162 |
|
✗ |
return res.has_value(); //Seems, if there is no value, there is also no error |
163 |
|
|
}else{ |
164 |
|
✗ |
return false; |
165 |
|
|
} |
166 |
|
|
} |
167 |
|
|
|
168 |
|
|
///Convert a recv flag into zmq flag |
169 |
|
|
/** @param flag : generic PRecvFlag |
170 |
|
|
* @return corresponding zmq::recv_flags |
171 |
|
|
*/ |
172 |
|
✗ |
zmq::recv_flags convertToRecvFlag(PRecvFlag::PRecvFlag flag){ |
173 |
|
✗ |
if(flag == PRecvFlag::NON_BLOCK){return zmq::recv_flags::dontwait;} |
174 |
|
✗ |
else{return zmq::recv_flags::none;} |
175 |
|
|
} |
176 |
|
|
|
177 |
|
|
///Recieve message from the given socket |
178 |
|
|
/** @param socket : socket to be used |
179 |
|
|
* @param msg : message to be recieved |
180 |
|
|
* @param flag : flags to be used to send the message (BLOCK, NON_BLOCK, etc) |
181 |
|
|
* @return true on success, false otherwise |
182 |
|
|
*/ |
183 |
|
✗ |
bool PZmqBackend::recv(PZmqBackend::Socket & socket, PZmqBackend::Message & msg, PRecvFlag::PRecvFlag flag){ |
184 |
|
✗ |
if(socket != NULL){ |
185 |
|
✗ |
zmq::recv_result_t res = socket->recv(msg, convertToRecvFlag(flag)); |
186 |
|
✗ |
return res.has_value(); //Seems, if there is no value, there is also no error |
187 |
|
|
}else{ |
188 |
|
✗ |
return false; |
189 |
|
|
} |
190 |
|
|
} |
191 |
|
|
|
192 |
|
|
///Resize a message |
193 |
|
|
/** @param[out] msg : message to be resized |
194 |
|
|
* @param sizeMsg : new size of the message |
195 |
|
|
*/ |
196 |
|
✗ |
void PZmqBackend::msgResize(PZmqBackend::Message & msg, size_t sizeMsg){ |
197 |
|
✗ |
msg.rebuild(sizeMsg); |
198 |
|
|
} |
199 |
|
|
|
200 |
|
|
///Get the size of a message |
201 |
|
|
/** @param msg : message to be used |
202 |
|
|
* @return size of the message in bytes |
203 |
|
|
*/ |
204 |
|
✗ |
size_t PZmqBackend::msgSize(const PZmqBackend::Message & msg){ |
205 |
|
✗ |
return msg.size(); |
206 |
|
|
} |
207 |
|
|
|
208 |
|
|
///Get the data of a message |
209 |
|
|
/** @param msg : message to be used |
210 |
|
|
* @return data of the message in bytes |
211 |
|
|
*/ |
212 |
|
✗ |
const DataStreamIter PZmqBackend::msgData(const PZmqBackend::Message & msg){ |
213 |
|
✗ |
return (const DataStreamIter)msg.data(); |
214 |
|
|
} |
215 |
|
|
|
216 |
|
|
///Get the data of a message |
217 |
|
|
/** @param msg : message to be used |
218 |
|
|
* @return data of the message in bytes |
219 |
|
|
*/ |
220 |
|
✗ |
DataStreamIter PZmqBackend::msgData(PZmqBackend::Message & msg){ |
221 |
|
✗ |
return (DataStreamIter)msg.data(); |
222 |
|
|
} |
223 |
|
|
|
224 |
|
|
///Close the given socket |
225 |
|
|
/** @param[out] socket : socket to be closed |
226 |
|
|
*/ |
227 |
|
✗ |
void PZmqBackend::close(PZmqBackend::Socket & socket){ |
228 |
|
✗ |
pzmq_closeServerSocket(socket); |
229 |
|
|
} |
230 |
|
|
|
231 |
|
|
///Close the given socket |
232 |
|
|
/** @param socket : socket to be checked |
233 |
|
|
* @return true if the socket is connected, false otherwise |
234 |
|
|
*/ |
235 |
|
✗ |
bool PZmqBackend::isConnected(const PZmqBackend::Socket & socket){ |
236 |
|
✗ |
if(socket != NULL){ |
237 |
|
|
#if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471 |
238 |
|
✗ |
return socket->handle() != NULL; |
239 |
|
|
#else |
240 |
|
|
return socket->connected(); |
241 |
|
|
#endif |
242 |
|
|
}else{ |
243 |
|
✗ |
return false; |
244 |
|
|
} |
245 |
|
|
} |
246 |
|
|
|
247 |
|
|
///Copy current backend message data into mock message |
248 |
|
|
/** @param[out] mockMsg : mock message |
249 |
|
|
* @param msg : message of the current backend to be converted |
250 |
|
|
*/ |
251 |
|
✗ |
void PZmqBackend::msgToMock(DataStreamMsg & mockMsg, const PZmqBackend::Message & msg){ |
252 |
|
✗ |
size_t dataSize(PZmqBackend::msgSize(msg)); |
253 |
|
✗ |
mockMsg.resize(dataSize); |
254 |
|
✗ |
memcpy(mockMsg.data(), PZmqBackend::msgData(msg), dataSize); |
255 |
|
|
} |
256 |
|
|
|
257 |
|
|
///Copy mock message data into current backend message |
258 |
|
|
/** @param[out] msg : message of the current backend to be converted |
259 |
|
|
* @param mockMsg : mock message |
260 |
|
|
*/ |
261 |
|
✗ |
void PZmqBackend::mockToMsg(PZmqBackend::Message & msg, DataStreamMsg & mockMsg){ |
262 |
|
✗ |
size_t dataSize(mockMsg.size()); |
263 |
|
✗ |
PZmqBackend::msgResize(msg, dataSize); |
264 |
|
✗ |
memcpy(PZmqBackend::msgData(msg), mockMsg.data(), dataSize); |
265 |
|
|
} |
266 |
|
|
|
267 |
|
|
|
268 |
|
|
|
269 |
|
|
|
270 |
|
|
|
271 |
|
|
|