GCC Code Coverage Report


Directory: ./
File: src/PZmqBackend.cpp
Date: 2025-06-03 16:44:11
Exec Total Coverage
Lines: 26 81 32.1%
Branches: 2 20 10.0%

Line Branch Exec Source
1 /***************************************
2 Auteur : Pierre Aubert
3 Mail : pierre.aubert@lapp.in2p3.fr
4 Licence : CeCILL-C
5 ****************************************/
6
7 #include "PZmqBackend.h"
8
9 ///Create param for a client socket
10 /** @param address : address of the server to be connected to
11 * @param port : port to be used
12 * @param context : zmq context where to create socket
13 * @param type : type of the connection (ZMQ_PULL, ZMQ_PUSH, etc)
14 * @param nbBufferMessage : number of messages to be buffered
15 * @param bufferSizeByte : size of the zmq buffer in bytes
16 * @param threadAffinity : bit mask which determines which threads from the 0MQ I/O thread pool associated with the socket's context shall handle newly created connections (1 : means first, 2 : means second, 3 : means first and second, etc)
17 * @param dataRate : expected data rate (in kilobytes per second)
18 * @return corresponding PZmqParam
19 */
20 1 PZmqParam pzmq_createParamClient(const std::string & address, size_t port, zmq::context_t* context, int type, int nbBufferMessage,
21 int bufferSizeByte, size_t threadAffinity, ssize_t dataRate)
22 {
23 1 PZmqParam param;
24
1/1
✓ Branch 1 taken 1 times.
1 param.address = address;
25 1 param.port = port;
26 1 param.context = context;
27 1 param.type = type;
28 1 param.nbBufferMessage = nbBufferMessage;
29 1 param.bufferSizeByte = bufferSizeByte;
30 1 param.threadAffinity = threadAffinity;
31 1 param.dataRate = dataRate;
32 1 return param;
33 }
34
35 ///Create param for a client socket
36 /** @param address : address of the server to be connected to
37 * @param port : port to be used
38 * @param context : zmq context where to create socket
39 * @param type : type of the connection (ZMQ_PULL, ZMQ_PUSH, etc)
40 * @param nbBufferMessage : number of messages to be buffered
41 * @param bufferSizeByte : size of the zmq buffer in bytes
42 * @param threadAffinity : bit mask which determines which threads from the 0MQ I/O thread pool associated with the socket's context shall handle newly created connections (1 : means first, 2 : means second, 3 : means first and second, etc)
43 * @param dataRate : expected data rate (in kilobytes per second)
44 * @return corresponding PZmqParam
45 */
46 1 PZmqParam pzmq_createParamServer(const std::string & address, size_t port, zmq::context_t* context, int type, int nbBufferMessage,
47 int bufferSizeByte, size_t threadAffinity, ssize_t dataRate)
48 {
49 1 PZmqParam param;
50
1/1
✓ Branch 1 taken 1 times.
1 param.address = address;
51 1 param.port = port;
52 1 param.context = context;
53 1 param.type = type;
54 1 param.nbBufferMessage = nbBufferMessage;
55 1 param.bufferSizeByte = bufferSizeByte;
56 1 param.threadAffinity = threadAffinity;
57 1 param.dataRate = dataRate;
58 1 return param;
59 }
60
61 ///Default constructor of PZmqBackend
62 PZmqBackend::PZmqBackend(){
63
64 }
65
66 ///Create a client parameter
67 /** @param address : address to be connected to
68 * @param port : port to be used
69 * @param context : zmq context where to create socket
70 * @param type : type of the connection (ZMQ_PULL, ZMQ_PUSH, etc)
71 * @param nbBufferMessage : number of messages to be buffered
72 * @param bufferSizeByte : size of the zmq buffer in bytes
73 * @param threadAffinity : bit mask which determines which threads from the 0MQ I/O thread pool associated with the socket's context shall handle newly created connections (1 : means first, 2 : means second, 3 : means first and second, etc)
74 * @param dataRate : expected data rate (in kilobytes per second)
75 * @return corresponding PZmqBackend::Param (or PZmqParam)
76 */
77 1 PZmqBackend::Param PZmqBackend::client(const std::string & address, size_t port, zmq::context_t* context, int type, int nbBufferMessage, int bufferSizeByte, size_t threadAffinity, ssize_t dataRate){
78 1 return pzmq_createParamClient(address, port, context, type, nbBufferMessage, bufferSizeByte, threadAffinity, dataRate);
79 }
80
81 ///Create a server parameter
82 /** @param address : address to be connected to
83 * @param port : port to be used
84 * @param context : zmq context where to create socket
85 * @param type : type of the connection (ZMQ_PULL, ZMQ_PUSH, etc)
86 * @param nbBufferMessage : number of messages to be buffered
87 * @param bufferSizeByte : size of the zmq buffer in bytes
88 * @param threadAffinity : bit mask which determines which threads from the 0MQ I/O thread pool associated with the socket's context shall handle newly created connections (1 : means first, 2 : means second, 3 : means first and second, etc)
89 * @param dataRate : expected data rate (in kilobytes per second)
90 * @return corresponding PZmqBackend::Param (or PZmqParam)
91 */
92 1 PZmqBackend::Param PZmqBackend::server(const std::string & address, size_t port, zmq::context_t* context, int type, int nbBufferMessage, int bufferSizeByte, size_t threadAffinity, ssize_t dataRate){
93 1 return pzmq_createParamServer(address, port, context, type, nbBufferMessage, bufferSizeByte, threadAffinity, dataRate);
94 }
95
96 ///Create a client socket
97 /** @param[out] socket : socket to be created
98 * @param address : address of the server, the client has to connect to
99 * @param port : port to be used for the connection
100 * @param param : extra customisable parameters for the creation of the socket (depends on the backend)
101 * @return true if the socket has been created, false otherwise
102 */
103 bool PZmqBackend::createClientSocket(PZmqBackend::Socket & socket, const std::string & address, size_t port, const PZmqParam & param){
104 socket = pzmq_createClientSocket(*param.context, address, port, param.type, param.nbBufferMessage,
105 param.bufferSizeByte, param.threadAffinity, param.dataRate);
106 return socket != NULL;
107 }
108
109 ///Create a server socket
110 /** @param[out] socket : socket to be created
111 * @param address : address of the server, the client has to connect to
112 * @param port : port to be used for the connection
113 * @param param : extra customisable parameters for the creation of the socket (depends on the backend)
114 * @return true if the socket has been created, false otherwise
115 */
116 bool PZmqBackend::createServerSocket(PZmqBackend::Socket & socket, const std::string & address, size_t port, const PZmqParam & param){
117 socket = pzmq_createServerSocket(*param.context, port, param.type, param.nbBufferMessage, param.bufferSizeByte,
118 param.threadAffinity, param.dataRate);
119 return socket != NULL;
120 }
121
122 ///Create a client socket
123 /** @param[out] socket : socket to be created
124 * @param param : extra customisable parameters for the creation of the socket (depends on the backend)
125 * @return true if the socket has been created, false otherwise
126 */
127 bool PZmqBackend::createClientSocket(Socket & socket, const PZmqBackend::Param & param){
128 socket = pzmq_createClientSocket(*param.context, param.address, param.port, param.type, param.nbBufferMessage,
129 param.bufferSizeByte, param.threadAffinity, param.dataRate);
130 return socket != NULL;
131 }
132
133 ///Create a server socket
134 /** @param[out] socket : socket to be created
135 * @param param : extra customisable parameters for the creation of the socket (depends on the backend)
136 * @return true if the socket has been created, false otherwise
137 */
138 bool PZmqBackend::createServerSocket(Socket & socket, const PZmqBackend::Param & param){
139 socket = pzmq_createServerSocket(*param.context, param.port, param.type, param.nbBufferMessage, param.bufferSizeByte,
140 param.threadAffinity, param.dataRate);
141 return socket != NULL;
142 }
143
144 ///Convert a send flag into zmq flag
145 /** @param flag : generic PSendFlag
146 * @return corresponding zmq::send_flags
147 */
148 zmq::send_flags convertToSendFlag(PSendFlag::PSendFlag flag){
149 if(flag == PSendFlag::NON_BLOCK){return zmq::send_flags::dontwait;}
150 else{return zmq::send_flags::none;}
151 }
152
153 ///Send message on the given socket
154 /** @param socket : socket to be used
155 * @param msg : message to be sent
156 * @param flag : flag to be used to send the message (BLOCK, NON_BLOCK, etc)
157 * @return true on success, false otherwise
158 */
159 bool PZmqBackend::send(PZmqBackend::Socket & socket, PZmqBackend::Message & msg, PSendFlag::PSendFlag flag){
160 if(socket != NULL){
161 zmq::send_result_t res = socket->send(msg, convertToSendFlag(flag));
162 return res.has_value(); //Seems, if there is no value, there is also no error
163 }else{
164 return false;
165 }
166 }
167
168 ///Convert a recv flag into zmq flag
169 /** @param flag : generic PRecvFlag
170 * @return corresponding zmq::recv_flags
171 */
172 zmq::recv_flags convertToRecvFlag(PRecvFlag::PRecvFlag flag){
173 if(flag == PRecvFlag::NON_BLOCK){return zmq::recv_flags::dontwait;}
174 else{return zmq::recv_flags::none;}
175 }
176
177 ///Recieve message from the given socket
178 /** @param socket : socket to be used
179 * @param msg : message to be recieved
180 * @param flag : flags to be used to send the message (BLOCK, NON_BLOCK, etc)
181 * @return true on success, false otherwise
182 */
183 bool PZmqBackend::recv(PZmqBackend::Socket & socket, PZmqBackend::Message & msg, PRecvFlag::PRecvFlag flag){
184 if(socket != NULL){
185 zmq::recv_result_t res = socket->recv(msg, convertToRecvFlag(flag));
186 return res.has_value(); //Seems, if there is no value, there is also no error
187 }else{
188 return false;
189 }
190 }
191
192 ///Resize a message
193 /** @param[out] msg : message to be resized
194 * @param sizeMsg : new size of the message
195 */
196 void PZmqBackend::msgResize(PZmqBackend::Message & msg, size_t sizeMsg){
197 msg.rebuild(sizeMsg);
198 }
199
200 ///Get the size of a message
201 /** @param msg : message to be used
202 * @return size of the message in bytes
203 */
204 size_t PZmqBackend::msgSize(const PZmqBackend::Message & msg){
205 return msg.size();
206 }
207
208 ///Get the data of a message
209 /** @param msg : message to be used
210 * @return data of the message in bytes
211 */
212 const DataStreamIter PZmqBackend::msgData(const PZmqBackend::Message & msg){
213 return (const DataStreamIter)msg.data();
214 }
215
216 ///Get the data of a message
217 /** @param msg : message to be used
218 * @return data of the message in bytes
219 */
220 DataStreamIter PZmqBackend::msgData(PZmqBackend::Message & msg){
221 return (DataStreamIter)msg.data();
222 }
223
224 ///Close the given socket
225 /** @param[out] socket : socket to be closed
226 */
227 void PZmqBackend::close(PZmqBackend::Socket & socket){
228 pzmq_closeServerSocket(socket);
229 }
230
231 ///Close the given socket
232 /** @param socket : socket to be checked
233 * @return true if the socket is connected, false otherwise
234 */
235 bool PZmqBackend::isConnected(const PZmqBackend::Socket & socket){
236 if(socket != NULL){
237 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
238 return socket->handle() != NULL;
239 #else
240 return socket->connected();
241 #endif
242 }else{
243 return false;
244 }
245 }
246
247 ///Copy current backend message data into mock message
248 /** @param[out] mockMsg : mock message
249 * @param msg : message of the current backend to be converted
250 */
251 void PZmqBackend::msgToMock(DataStreamMsg & mockMsg, const PZmqBackend::Message & msg){
252 size_t dataSize(PZmqBackend::msgSize(msg));
253 mockMsg.resize(dataSize);
254 memcpy(mockMsg.data(), PZmqBackend::msgData(msg), dataSize);
255 }
256
257 ///Copy mock message data into current backend message
258 /** @param[out] msg : message of the current backend to be converted
259 * @param mockMsg : mock message
260 */
261 void PZmqBackend::mockToMsg(PZmqBackend::Message & msg, DataStreamMsg & mockMsg){
262 size_t dataSize(mockMsg.size());
263 PZmqBackend::msgResize(msg, dataSize);
264 memcpy(PZmqBackend::msgData(msg), mockMsg.data(), dataSize);
265 }
266
267
268
269
270
271