GCC Code Coverage Report


Directory: ./
File: src/PZmqBackend.cpp
Date: 2026-01-23 17:10:06
Exec Total Coverage
Lines: 86 112 76.8%
Functions: 19 22 86.4%
Branches: 28 64 43.8%

Line Branch Exec Source
1 /***************************************
2 Auteur : Pierre Aubert
3 Mail : pierre.aubert@lapp.in2p3.fr
4 Licence : CeCILL-C
5 ****************************************/
6
7 #include "PZmqBackend.h"
8
9 ///Check the send result and convert it into PSendStatus
10 /**
11 * @param res : result of the zmq send
12 * @return corresponding PSendStatus
13 *
14 * ZMQ socket send has a bit of a strange implementation:
15 * @li if sending works: the send_result_t has a value
16 * @li if sending doesn't work due to EAGAIN error: the send_result_t is empty (no value) and error should be EAGAIN
17 * @li if sending doesn't work for another reason: an exception is thrown
18 * This function only checks the behaviour due to the send result, not the thrown error.
19 */
20 193 PSendStatus::PSendStatus checkSendStatus(zmq::send_result_t res){
21
2/2
✓ Branch 0 (3→4) taken 20 times.
✓ Branch 1 (3→5) taken 173 times.
193 if(res.has_value()){
22 20 return PSendStatus::OK;
23 }
24 else{
25 173 int err = zmq_errno();
26
1/2
✓ Branch 0 (6→7) taken 173 times.
✗ Branch 1 (6→8) not taken.
173 if(err == EAGAIN){
27 173 return PSendStatus::SOCKET_NOT_AVAILABLE;
28 }
29 else{
30 std::cerr << "Unknown ZMQ error in send: '" << err << "'"<< std::endl;
31 return PSendStatus::BROKEN_BACKEND;
32 }
33 }
34 }
35
36 ///Check the recv result and convert it into PRecvStatus
37 /** @param res : result of the zmq recv
38 * @return corresponding PRecvStatus
39 *
40 * ZMQ socket recv has a bit of a strange implementation:
41 * @li if recv works: the recv_result_t has a value
42 * @li if recv doesn't work due to EAGAIN error: the recv_result_t is empty (no value) and error should be EAGAIN
43 * @li if recv doesn't work for another reason: an exception is thrown
44 * This function only checks the behaviour due to the recv result, not the thrown error.
45 */
46 195 PRecvStatus::PRecvStatus checkRecvStatus(zmq::recv_result_t res){
47
2/2
✓ Branch 0 (3→4) taken 20 times.
✓ Branch 1 (3→5) taken 175 times.
195 if(res.has_value()){
48 20 return PRecvStatus::OK;
49 }
50 else{
51 175 int err = zmq_errno();
52
1/2
✓ Branch 0 (6→7) taken 175 times.
✗ Branch 1 (6→8) not taken.
175 if(err == EAGAIN){
53 175 return PRecvStatus::NO_MESSAGE_RECEIVED;
54 }
55 else{
56 return PRecvStatus::BROKEN_SOCKET;
57 }
58 }
59 }
60
61 ///Convert a send flag into zmq flag
62 /** @param flag : generic PSendFlag
63 * @return corresponding zmq::send_flags
64 */
65 194 zmq::send_flags convertToSendFlag(PSendFlag::PSendFlag flag){
66
2/2
✓ Branch 0 (2→3) taken 184 times.
✓ Branch 1 (2→4) taken 10 times.
194 if(flag == PSendFlag::NON_BLOCK){return zmq::send_flags::dontwait;}
67 10 else{return zmq::send_flags::none;}
68 }
69
70 ///Convert a recv flag into zmq flag
71 /** @param flag : generic PRecvFlag
72 * @return corresponding zmq::recv_flags
73 */
74 197 zmq::recv_flags convertToRecvFlag(PRecvFlag::PRecvFlag flag){
75
2/2
✓ Branch 0 (2→3) taken 187 times.
✓ Branch 1 (2→4) taken 10 times.
197 if(flag == PRecvFlag::NON_BLOCK){return zmq::recv_flags::dontwait;}
76 10 else{return zmq::recv_flags::none;}
77 }
78
79 ///Create param for a client socket
80 /**
81 * @param type : type of the connection (ZMQ_PULL, ZMQ_PUSH, etc)
82 * @param nbBufferMessage : number of messages to be buffered
83 * @param bufferSizeByte : size of the zmq buffer in bytes
84 * @param threadAffinity : bit mask which determines which threads from the 0MQ I/O thread pool associated with the socket's context shall handle newly created connections (1 : means first, 2 : means second, 3 : means first and second, etc)
85 * @param dataRate : expected data rate (in kilobytes per second)
86 * @return corresponding PZmqParam
87 */
88 6 PZmqParam pzmq_createParamClient(int type, int nbBufferMessage,
89 int bufferSizeByte, size_t threadAffinity, ssize_t dataRate)
90 {
91 PZmqParam param;
92 6 param.type = type;
93 6 param.nbBufferMessage = nbBufferMessage;
94 6 param.bufferSizeByte = bufferSizeByte;
95 6 param.threadAffinity = threadAffinity;
96 6 param.dataRate = dataRate;
97 6 return param;
98 }
99
100 ///Create param for a server socket
101 /** @param type : type of the connection (ZMQ_PULL, ZMQ_PUSH, etc)
102 * @param nbBufferMessage : number of messages to be buffered
103 * @param bufferSizeByte : size of the zmq buffer in bytes
104 * @param threadAffinity : bit mask which determines which threads from the 0MQ I/O thread pool associated with the socket's context shall handle newly created connections (1 : means first, 2 : means second, 3 : means first and second, etc)
105 * @param dataRate : expected data rate (in kilobytes per second)
106 * @return corresponding PZmqParam
107 */
108 7 PZmqParam pzmq_createParamServer(int type, int nbBufferMessage,
109 int bufferSizeByte, size_t threadAffinity, ssize_t dataRate)
110 {
111 PZmqParam param;
112 7 param.type = type;
113 7 param.nbBufferMessage = nbBufferMessage;
114 7 param.bufferSizeByte = bufferSizeByte;
115 7 param.threadAffinity = threadAffinity;
116 7 param.dataRate = dataRate;
117 7 return param;
118 }
119
120 ///Default constructor of PZmqSocket
121 13 PZmqSocket::PZmqSocket()
122 13 :p_socket(nullptr)
123 {
124
125 13 }
126
127 ///Destructor of PZmqSocket
128 13 PZmqSocket::~PZmqSocket(){
129 13 close();
130 13 }
131
132 ///Create a client socket
133 /** @param context : zmq context where to create socket
134 * @param socketParam : parameters of the server (hostname, port), the client has to connect to
135 * @param extraParam : extra customisable parameters for the creation of the socket (depends on the backend)
136 * @return true if the socket has been created, false otherwise
137 */
138 6 bool PZmqSocket::createClientSocket(zmq::context_t & context, const PSocketParam & socketParam, const Param & extraParam){
139 12 p_socket = pzmq_createClientSocket(context, socketParam.hostname, socketParam.port, extraParam.type, extraParam.nbBufferMessage,
140 6 extraParam.bufferSizeByte, extraParam.threadAffinity, extraParam.dataRate);
141 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
142 6 p_socket->set(zmq::sockopt::rcvtimeo, socketParam.recvTimeOut);
143 6 p_socket->set(zmq::sockopt::sndtimeo, socketParam.sendTimeOut);
144 #else
145 p_socket->setsockopt(ZMQ_RCVTIMEO, &socketParam.recvTimeOut, sizeof(int));
146 p_socket->setsockopt(ZMQ_SNDTIMEO, &socketParam.sendTimeOut, sizeof(int));
147 #endif
148 6 return p_socket != nullptr;
149 }
150
151 ///Create a server socket
152 /** @param context : zmq context where to create socket
153 * @param socketParam : parameters of the server socket (only the port and the recv/send timeouts are used here)
154 * @param extraParam : extra customisable parameters for the creation of the socket (depends on the backend)
155 * @return true if the socket has been created, false otherwise
156 */
157 7 bool PZmqSocket::createServerSocket(zmq::context_t & context, const PSocketParam & socketParam, const Param & extraParam){
158 14 p_socket = pzmq_createServerSocket(context, socketParam.port, extraParam.type, extraParam.nbBufferMessage, extraParam.bufferSizeByte,
159 7 extraParam.threadAffinity, extraParam.dataRate);
160 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
161 7 p_socket->set(zmq::sockopt::rcvtimeo, socketParam.recvTimeOut);
162 7 p_socket->set(zmq::sockopt::sndtimeo, socketParam.sendTimeOut);
163 #else
164 p_socket->setsockopt(ZMQ_RCVTIMEO, &socketParam.recvTimeOut, sizeof(int));
165 p_socket->setsockopt(ZMQ_SNDTIMEO, &socketParam.sendTimeOut, sizeof(int));
166 #endif
167 7 return p_socket != nullptr;
168 }
169
170 ///Send data with the socket
171 /** @param msg : message to be sent with the socket
172 * @param flag : sending flag (BLOCK, NON_BLOCK)
173 * @return status of the send
174 */
175 194 PSendStatus::PSendStatus PZmqSocket::sendMsg(Message & msg, PSendFlag::PSendFlag flag){
176 try {
177
2/2
✓ Branch 0 (2→3) taken 194 times.
✓ Branch 2 (3→4) taken 193 times.
194 zmq::send_result_t result = p_socket->send(msg, convertToSendFlag(flag));
178
1/1
✓ Branch 0 (4→5) taken 193 times.
193 return checkSendStatus(result);
179 }
180
1/2
✗ Branch 0 (9→10) not taken.
✓ Branch 1 (9→11) taken 1 times.
1 catch(const zmq::error_t& e) {
181
182
1/2
✗ Branch 0 (13→14) not taken.
✓ Branch 1 (13→15) taken 1 times.
1 if(e.num() == ENOTSOCK){
183 return PSendStatus::BROKEN_SOCKET;
184 }
185
2/10
✗ Branch 0 (16→17) not taken.
✓ Branch 1 (16→23) taken 1 times.
✗ Branch 2 (18→19) not taken.
✗ Branch 3 (18→23) not taken.
✗ Branch 4 (20→21) not taken.
✗ Branch 5 (20→23) not taken.
✗ Branch 6 (22→23) not taken.
✗ Branch 7 (22→24) not taken.
✓ Branch 8 (25→26) taken 1 times.
✗ Branch 9 (25→27) not taken.
1 else if(e.num() == ENOTSUP || e.num() == EFSM || e.num() == ETERM || e.num() == EINVAL){
186 1 return PSendStatus::BROKEN_BACKEND;
187 }
188 else if(e.num() == EINTR){
189 return PSendStatus::SIGNAL_INTERRUPTION;
190 }
191 else if(e.num() == EHOSTUNREACH){
192 return PSendStatus::NO_ROUTE_TO_RECEIVER;
193 }
194 else{
195 std::cerr << "Unknown ZMQ error in send: " << e.what() << std::endl;
196 return PSendStatus::BROKEN_BACKEND;
197 }
198 1 }
199 }
200
201 ///Receive data with the socket
202 /** @param msg : message to be received with the socket
203 * @param flag : receiving flag (BLOCK, NON_BLOCK)
204 * @return status of the recv
205 */
206 197 PRecvStatus::PRecvStatus PZmqSocket::recvMsg(Message & msg, PRecvFlag::PRecvFlag flag){
207 try {
208
2/2
✓ Branch 0 (2→3) taken 197 times.
✓ Branch 2 (3→4) taken 195 times.
197 zmq::recv_result_t result = p_socket->recv(msg, convertToRecvFlag(flag));
209
1/1
✓ Branch 0 (4→5) taken 195 times.
195 return checkRecvStatus(result);
210
1/2
✗ Branch 0 (9→10) not taken.
✓ Branch 1 (9→11) taken 2 times.
2 } catch(const zmq::error_t& e)
211 {
212
1/2
✗ Branch 0 (13→14) not taken.
✓ Branch 1 (13→15) taken 2 times.
2 if(e.num() == ENOTSOCK){
213 return PRecvStatus::BROKEN_SOCKET;
214 }
215
2/10
✗ Branch 0 (16→17) not taken.
✓ Branch 1 (16→23) taken 2 times.
✗ Branch 2 (18→19) not taken.
✗ Branch 3 (18→23) not taken.
✗ Branch 4 (20→21) not taken.
✗ Branch 5 (20→23) not taken.
✗ Branch 6 (22→23) not taken.
✗ Branch 7 (22→24) not taken.
✓ Branch 8 (25→26) taken 2 times.
✗ Branch 9 (25→27) not taken.
2 else if(e.num() == ENOTSUP || e.num() == EFSM || e.num() == ETERM || e.num() == EINVAL){
216 2 return PRecvStatus::BROKEN_BACKEND;
217 }
218 else if(e.num() == EINTR){
219 return PRecvStatus::SIGNAL_INTERRUPTION;
220 }
221 else{
222 std::cerr << "Unknown ZMQ error in recv: " << e.what() << std::endl;
223 return PRecvStatus::BROKEN_BACKEND;
224 }
225 2 }
226 }
227
228 ///Check if the socket is connected
229 /** @return true if the socket is connected, false otherwise
230 */
231 1 bool PZmqSocket::isConnected() const{
232
1/2
✓ Branch 0 (2→3) taken 1 times.
✗ Branch 1 (2→5) not taken.
1 if(p_socket != nullptr){
233 1 return p_socket->handle() != nullptr;
234 }
235 return false;
236 }
237
238 ///Close the socket
239 38 void PZmqSocket::close(){
240
2/2
✓ Branch 0 (2→3) taken 13 times.
✓ Branch 1 (2→8) taken 25 times.
38 if(p_socket != nullptr){
241 13 p_socket->close();
242
1/2
✓ Branch 0 (4→5) taken 13 times.
✗ Branch 1 (4→7) not taken.
13 delete p_socket;
243 13 p_socket = nullptr;
244 }
245 38 }
246
247 ///Default constructor of PZmqBackend setting the number of threads for zmq I/O to 1
248 11 PZmqBackend::PZmqBackend()
249 11 :p_context(1)
250 {
251
252 11 }
253
254 ///Create a client parameter
255 /** @return corresponding PZmqBackend::Param (or PZmqParam)
256 */
257 6 PZmqBackend::Param PZmqBackend::client(){
258 6 return pzmq_createParamClient(ZMQ_PULL);
259 }
260
261 ///Create a server parameter
262 /** @return corresponding PZmqBackend::Param (or PZmqParam)
263 */
264 7 PZmqBackend::Param PZmqBackend::server(){
265 7 return pzmq_createParamServer(ZMQ_PUSH);
266 }
267
268 ///Create a client socket
269 /** @param[out] socket : socket to be created
270 * @param socketParam : parameters of the server (hostname, port), the client has to connect to
271 *
272 * @param param : extra customisable parameters for the creation of the socket (depends on the backend)
273 * @return true if the socket has been created, false otherwise
274 */
275 6 bool PZmqBackend::createClientSocket(PZmqBackend::Socket & socket, const PSocketParam & socketParam, const PZmqParam & param){
276 6 return socket.createClientSocket(p_context, socketParam, param);
277 }
278
279 ///Create a server socket
280 /** @param[out] socket : socket to be created
281 * @param socketParam : parameters of the server (hostname, port), the client has to connect to
282 * @param param : extra customisable parameters for the creation of the socket (depends on the backend)
283 * @return true if the socket has been created, false otherwise
284 */
285 7 bool PZmqBackend::createServerSocket(PZmqBackend::Socket & socket, const PSocketParam & socketParam, const PZmqParam & param){
286 7 return socket.createServerSocket(p_context, socketParam, param);
287 }
288
289 ///Copy current backend message data into mock message
290 /** @param[out] mockMsg : mock message
291 * @param msg : message of the current backend to be converted
292 */
293 void PZmqBackend::msgToMock(DataStreamMsg & mockMsg, const PZmqBackend::Message & msg){
294 size_t dataSize(msg.size());
295 mockMsg.resize(dataSize);
296 memcpy(mockMsg.data(), (const void*)msg.data(), dataSize);
297 }
298
299 ///Copy mock message data into current backend message
300 /** @param[out] msg : message of the current backend to be converted
301 * @param mockMsg : mock message
302 */
303 void PZmqBackend::mockToMsg(PZmqBackend::Message & msg, DataStreamMsg & mockMsg){
304 size_t dataSize(mockMsg.size());
305 msg.rebuild(dataSize);
306 memcpy((void*)msg.data(), mockMsg.data(), dataSize);
307 }
308