GCC Code Coverage Report


Directory: ./
File: src/PZmqBackend.cpp
Date: 2026-04-15 12:13:56
Exec Total Coverage
Lines: 91 119 76.5%
Functions: 19 22 86.4%
Branches: 31 73 42.5%

Line Branch Exec Source
1 /***************************************
2 Auteur : Pierre Aubert
3 Mail : pierre.aubert@lapp.in2p3.fr
4 Licence : CeCILL-C
5 ****************************************/
6
7 #include "PZmqBackend.h"
8
9 ///Check the send result and convert it into PSendStatus
10 /**
11 * @param res : result of the zmq send
12 * @return corresponding PSendStatus
13 *
14 * ZMQ socket send has a bit of a strange implementation:
15 * @li if sending works: the send_result_t has a value
16 * @li if sending doesn't work due to EAGAIN error: the send_result_t is empty (no value) and error should be EAGAIN
17 * @li if sending doesn't work for another reason: an exception is thrown
18 * This function only checks the behaviour due to the send result, not the thrown error.
19 */
20 195 PSendStatus::PSendStatus checkSendStatus(zmq::send_result_t res){
21
2/2
✓ Branch 0 (3→4) taken 20 times.
✓ Branch 1 (3→5) taken 175 times.
195 if(res.has_value()){
22 20 return PSendStatus::OK;
23 }else{
24 175 int err = zmq_errno();
25
1/2
✓ Branch 0 (6→7) taken 175 times.
✗ Branch 1 (6→8) not taken.
175 if(err == EAGAIN){
26 175 return PSendStatus::SOCKET_NOT_AVAILABLE;
27 }else{
28 std::cerr << "Unknown ZMQ error in send: '" << err << "'"<< std::endl;
29 return PSendStatus::BROKEN_BACKEND;
30 }
31 }
32 }
33
34 ///Check the recv result and convert it into PRecvStatus
35 /** @param res : result of the zmq recv
36 * @return corresponding PRecvStatus
37 * ZMQ socket recv has a bit of a strange implementation:
38 * @li if recv works: the recv_result_t has a value
39 * @li if recv doesn't work due to EAGAIN error: the recv_result_t is empty (no value) and error should be EAGAIN
40 * @li if recv doesn't work for another reason: an exception is thrown
41 * This function only checks the behaviour due to the recv result, not the thrown error.
42 */
43 195 PRecvStatus::PRecvStatus checkRecvStatus(zmq::recv_result_t res){
44
2/2
✓ Branch 0 (3→4) taken 20 times.
✓ Branch 1 (3→5) taken 175 times.
195 if(res.has_value()){
45 20 return PRecvStatus::OK;
46 }else{
47 175 int err = zmq_errno();
48
1/2
✓ Branch 0 (6→7) taken 175 times.
✗ Branch 1 (6→8) not taken.
175 if(err == EAGAIN){
49 175 return PRecvStatus::NO_MESSAGE_RECEIVED;
50 }else{
51 return PRecvStatus::BROKEN_SOCKET;
52 }
53 }
54 }
55
56 ///Convert a send flag into zmq flag
57 /** @param flag : generic PSendFlag
58 * @return corresponding zmq::send_flags
59 */
60 196 zmq::send_flags convertToSendFlag(PSendFlag::PSendFlag flag){
61
2/2
✓ Branch 0 (2→3) taken 186 times.
✓ Branch 1 (2→4) taken 10 times.
196 if(flag == PSendFlag::NON_BLOCK){return zmq::send_flags::dontwait;}
62 10 else{return zmq::send_flags::none;}
63 }
64
65 ///Convert a recv flag into zmq flag
66 /** @param flag : generic PRecvFlag
67 * @return corresponding zmq::recv_flags
68 */
69 197 zmq::recv_flags convertToRecvFlag(PRecvFlag::PRecvFlag flag){
70
2/2
✓ Branch 0 (2→3) taken 187 times.
✓ Branch 1 (2→4) taken 10 times.
197 if(flag == PRecvFlag::NON_BLOCK){return zmq::recv_flags::dontwait;}
71 10 else{return zmq::recv_flags::none;}
72 }
73
74 ///Create param for a client socket
75 /** @param context : zmq context where to create socket
76 * @param type : type of the connection (ZMQ_PULL, ZMQ_PUSH, etc)
77 * @param nbBufferMessage : number of messages to be buffered
78 * @param bufferSizeByte : size of the zmq buffer in bytes
79 * @param threadAffinity : bit mask which determines which threads from the 0MQ I/O thread pool associated with the socket's context shall handle newly created connections (1 : means first, 2 : means second, 3 : means first and second, etc)
80 * @param dataRate : expected data rate (in kilobytes per second)
81 * @param linger : linger period for socket shutdown
82 * @return corresponding PZmqParam
83 */
84 6 PZmqParam pzmq_createParamClient(int type, int nbBufferMessage, int bufferSizeByte, size_t threadAffinity, ssize_t dataRate, int linger){
85 PZmqParam param;
86 6 param.type = type;
87 6 param.nbBufferMessage = nbBufferMessage;
88 6 param.bufferSizeByte = bufferSizeByte;
89 6 param.threadAffinity = threadAffinity;
90 6 param.dataRate = dataRate;
91 6 param.linger = linger;
92 6 return param;
93 }
94
95 ///Create param for a client socket
96 /** @param type : type of the connection (ZMQ_PULL, ZMQ_PUSH, etc)
97 * @param nbBufferMessage : number of messages to be buffered
98 * @param bufferSizeByte : size of the zmq buffer in bytes
99 * @param threadAffinity : bit mask which determines which threads from the 0MQ I/O thread pool associated with the socket's context shall handle newly created connections (1 : means first, 2 : means second, 3 : means first and second, etc)
100 * @param dataRate : expected data rate (in kilobytes per second)
101 * @param linger : linger period for socket shutdown
102 * @return corresponding PZmqParam
103 */
104 7 PZmqParam pzmq_createParamServer(int type, int nbBufferMessage, int bufferSizeByte, size_t threadAffinity, ssize_t dataRate, int linger){
105 PZmqParam param;
106 7 param.type = type;
107 7 param.nbBufferMessage = nbBufferMessage;
108 7 param.bufferSizeByte = bufferSizeByte;
109 7 param.threadAffinity = threadAffinity;
110 7 param.dataRate = dataRate;
111 7 param.linger = linger;
112 7 return param;
113 }
114
115 ///Default constructor of PZmqSocket
116 13 PZmqSocket::PZmqSocket()
117 13 :p_socket(NULL)
118 {
119
120 13 }
121
122 ///Destructor of PZmqSocket
123 13 PZmqSocket::~PZmqSocket(){
124 13 close();
125 13 }
126
127 ///Create a client socket
128 /** @param context : zmq context where to create socket
129 * @param socketParam : parameters of the server (hostname, port), the client has to connect to
130 * @param extraParam : extra customisable parameters for the creation of the socket (depends on the backend)
131 * @return true if the socket has been created, false otherwise
132 */
133 6 bool PZmqSocket::createClientSocket(zmq::context_t & context, const PSocketParam & socketParam, const Param & extraParam){
134 12 p_socket = pzmq_createClientSocket(context, socketParam.hostname, socketParam.port, extraParam.type, extraParam.nbBufferMessage,
135 6 extraParam.bufferSizeByte, extraParam.threadAffinity, extraParam.dataRate);
136 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
137 6 p_socket->set(zmq::sockopt::rcvtimeo, socketParam.recvTimeOut);
138 6 p_socket->set(zmq::sockopt::sndtimeo, socketParam.sendTimeOut);
139 6 p_socket->set(zmq::sockopt::linger, extraParam.linger); //number of ms to stop
140 #else
141 p_socket->setsockopt(ZMQ_RCVTIMEO, &socketParam.recvTimeOut, sizeof(int));
142 p_socket->setsockopt(ZMQ_SNDTIMEO, &socketParam.sendTimeOut, sizeof(int));
143 p_socket->setsockopt(ZMQ_LINGER, &extraParam.linger, sizeof(int)); //number of ms to stop
144 #endif
145 6 return p_socket != NULL;
146 }
147
148 ///Create a server socket
149 /** @param context : zmq context where to create socket
150 * @param socketParam : parameters of the server (hostname, port), the client has to connect to
151 * @param extraParam : extra customisable parameters for the creation of the socket (depends on the backend)
152 * @return true if the socket has been created, false otherwise
153 */
154 7 bool PZmqSocket::createServerSocket(zmq::context_t & context, const PSocketParam & socketParam, const Param & extraParam){
155 14 p_socket = pzmq_createServerSocket(context, socketParam.port, extraParam.type, extraParam.nbBufferMessage, extraParam.bufferSizeByte,
156 7 extraParam.threadAffinity, extraParam.dataRate);
157 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
158 7 p_socket->set(zmq::sockopt::rcvtimeo, socketParam.recvTimeOut);
159 7 p_socket->set(zmq::sockopt::sndtimeo, socketParam.sendTimeOut);
160 7 p_socket->set(zmq::sockopt::linger, extraParam.linger); //number of ms to stop
161 #else
162 p_socket->setsockopt(ZMQ_RCVTIMEO, &socketParam.recvTimeOut, sizeof(int));
163 p_socket->setsockopt(ZMQ_SNDTIMEO, &socketParam.sendTimeOut, sizeof(int));
164 p_socket->setsockopt(ZMQ_LINGER, &extraParam.linger, sizeof(int)); //number of ms to stop
165 #endif
166 7 return p_socket != NULL;
167 }
168
169 ///Send data with the socket
170 /** @param msg : message to be sent with the socket
171 * @param flag : sending flag (BLOCK, NON_BLOCK)
172 * @return status of the send
173 */
174 196 PSendStatus::PSendStatus PZmqSocket::sendMsg(Message & msg, PSendFlag::PSendFlag flag){
175 try {
176
2/2
✓ Branch 0 (2→3) taken 196 times.
✓ Branch 2 (3→4) taken 195 times.
196 zmq::send_result_t result = p_socket->send(msg, convertToSendFlag(flag));
177
1/1
✓ Branch 0 (4→5) taken 195 times.
195 return checkSendStatus(result);
178
1/2
✗ Branch 0 (9→10) not taken.
✓ Branch 1 (9→11) taken 1 times.
1 }catch(const zmq::error_t& e) {
179
1/2
✗ Branch 0 (13→14) not taken.
✓ Branch 1 (13→15) taken 1 times.
1 if(e.num() == ENOTSOCK){
180 return PSendStatus::BROKEN_SOCKET;
181
2/10
✗ Branch 0 (16→17) not taken.
✓ Branch 1 (16→23) taken 1 times.
✗ Branch 2 (18→19) not taken.
✗ Branch 3 (18→23) not taken.
✗ Branch 4 (20→21) not taken.
✗ Branch 5 (20→23) not taken.
✗ Branch 6 (22→23) not taken.
✗ Branch 7 (22→24) not taken.
✓ Branch 8 (25→26) taken 1 times.
✗ Branch 9 (25→27) not taken.
1 }else if(e.num() == ENOTSUP || e.num() == EFSM || e.num() == ETERM || e.num() == EINVAL){
182 1 return PSendStatus::BROKEN_BACKEND;
183 }else if(e.num() == EINTR){
184 return PSendStatus::SIGNAL_INTERRUPTION;
185 }else if(e.num() == EHOSTUNREACH){
186 return PSendStatus::NO_ROUTE_TO_RECEIVER;
187 }else{
188 std::cerr << "Unknown ZMQ error in send: " << e.what() << std::endl;
189 return PSendStatus::BROKEN_BACKEND;
190 }
191 1 }
192 }
193
194 ///Receive data with the socket
195 /** @param msg : message to be received with the socket
196 * @param flag : receiving flag (BLOCK, NON_BLOCK)
197 * @return status of the recv
198 */
199 197 PRecvStatus::PRecvStatus PZmqSocket::recvMsg(Message & msg, PRecvFlag::PRecvFlag flag){
200 try {
201
2/2
✓ Branch 0 (2→3) taken 197 times.
✓ Branch 2 (3→4) taken 195 times.
197 zmq::recv_result_t result = p_socket->recv(msg, convertToRecvFlag(flag));
202
1/1
✓ Branch 0 (4→5) taken 195 times.
195 return checkRecvStatus(result);
203
1/2
✗ Branch 0 (9→10) not taken.
✓ Branch 1 (9→11) taken 2 times.
2 }catch(const zmq::error_t& e){
204
1/2
✗ Branch 0 (13→14) not taken.
✓ Branch 1 (13→19) taken 2 times.
2 if(e.num() == ENOTSOCK){
205 std::cerr << "ZMQ error in recv: " << e.what() << std::endl;
206 return PRecvStatus::BROKEN_SOCKET;
207 }
208
2/10
✗ Branch 0 (20→21) not taken.
✓ Branch 1 (20→27) taken 2 times.
✗ Branch 2 (22→23) not taken.
✗ Branch 3 (22→27) not taken.
✗ Branch 4 (24→25) not taken.
✗ Branch 5 (24→27) not taken.
✗ Branch 6 (26→27) not taken.
✗ Branch 7 (26→28) not taken.
✓ Branch 8 (29→30) taken 2 times.
✗ Branch 9 (29→35) not taken.
2 else if(e.num() == ENOTSUP || e.num() == EFSM || e.num() == ETERM || e.num() == EINVAL){
209
3/3
✓ Branch 0 (30→31) taken 2 times.
✓ Branch 2 (32→33) taken 2 times.
✓ Branch 4 (33→34) taken 2 times.
2 std::cerr << "ZMQ error in recv: " << e.what() << std::endl;
210 2 return PRecvStatus::BROKEN_BACKEND;
211 }
212 else if(e.num() == EINTR){
213 std::cerr << "ZMQ error in recv: " << e.what() << std::endl;
214 return PRecvStatus::SIGNAL_INTERRUPTION;
215 }
216 else{
217 std::cerr << "Unknown ZMQ error in recv: " << e.what() << std::endl;
218 return PRecvStatus::BROKEN_BACKEND;
219 }
220 2 }
221 }
222
223 ///Check if the socket is connected
224 /** @return true if the socket is connected, false otherwise
225 */
226 1 bool PZmqSocket::isConnected() const{
227
1/2
✓ Branch 0 (2→3) taken 1 times.
✗ Branch 1 (2→5) not taken.
1 if(p_socket != NULL){
228 1 return p_socket->handle() != NULL;
229 }
230 return false;
231 }
232
233 ///Close the socket
234 38 void PZmqSocket::close(){
235
2/2
✓ Branch 0 (2→3) taken 13 times.
✓ Branch 1 (2→8) taken 25 times.
38 if(p_socket != NULL){
236 13 p_socket->close();
237
1/2
✓ Branch 0 (4→5) taken 13 times.
✗ Branch 1 (4→7) not taken.
13 delete p_socket;
238 13 p_socket = NULL;
239 }
240 38 }
241
242 ///Default constructor of PZmqSocketGenerator setting the number of threads for zmq I/O to 1
243 11 PZmqSocketGenerator::PZmqSocketGenerator()
244 11 :p_context(1)
245 {
246
247 11 }
248
249 ///Create a client parameter
250 /** @return corresponding PZmqSocketGenerator::Param (or PZmqParam)
251 */
252 6 PZmqSocketGenerator::Param PZmqSocketGenerator::client(){
253 6 return pzmq_createParamClient(ZMQ_PULL);
254 }
255
256 ///Create a server parameter
257 /** @return corresponding PZmqSocketGenerator::Param (or PZmqParam)
258 */
259 7 PZmqSocketGenerator::Param PZmqSocketGenerator::server(){
260 7 return pzmq_createParamServer(ZMQ_PUSH);
261 }
262
263 ///Create a client socket
264 /** @param[out] socket : socket to be created
265 * @param socketParam : parameters of the server (hostname, port), the client has to connect to
266 * @param port : port to be used for the connection
267 * @param param : extra customisable parameters for the creation of the socket (depends on the backend)
268 * @return true if the socket has been created, false otherwise
269 */
270 6 bool PZmqSocketGenerator::createClientSocket(PZmqSocketGenerator::Socket & socket, const PSocketParam & socketParam, const PZmqParam & param){
271 6 return socket.createClientSocket(p_context, socketParam, param);
272 }
273
274 ///Create a server socket
275 /** @param[out] socket : socket to be created
276 * @param socketParam : parameters of the server (hostname, port), the client has to connect to
277 * @param param : extra customisable parameters for the creation of the socket (depends on the backend)
278 * @return true if the socket has been created, false otherwise
279 */
280 7 bool PZmqSocketGenerator::createServerSocket(PZmqSocketGenerator::Socket & socket, const PSocketParam & socketParam, const PZmqParam & param){
281 7 return socket.createServerSocket(p_context, socketParam, param);
282 }
283
284 ///Copy current backend message data into mock message
285 /** @param[out] mockMsg : mock message
286 * @param msg : message of the current backend to be converted
287 */
288 void PZmqSocketGenerator::msgToMock(DataStreamMsg & mockMsg, const PZmqSocketGenerator::Message & msg){
289 size_t dataSize(msg.size());
290 mockMsg.resize(dataSize);
291 memcpy(mockMsg.data(), (const void*)msg.data(), dataSize);
292 }
293
294 ///Copy mock message data into current backend message
295 /** @param[out] msg : message of the current backend to be converted
296 * @param mockMsg : mock message
297 */
298 void PZmqSocketGenerator::mockToMsg(PZmqSocketGenerator::Message & msg, DataStreamMsg & mockMsg){
299 size_t dataSize(mockMsg.size());
300 msg.rebuild(dataSize);
301 memcpy((void*)msg.data(), mockMsg.data(), dataSize);
302 }
303