GCC Code Coverage Report


Directory: ./
File: src/phoenix_zmq.cpp
Date: 2026-01-23 17:10:06
Exec Total Coverage
Lines: 60 70 85.7%
Functions: 10 11 90.9%
Branches: 36 65 55.4%

Line Branch Exec Source
1 /***************************************
2 Auteur : Pierre Aubert
3 Mail : pierre.aubert@lapp.in2p3.fr
4 Licence : CeCILL-C
5 ****************************************/
6
7 #include <sstream>
8 #include "phoenix_zmq.h"
9
10
11 ///Create a client socket to be used by the SocketManagerZMQ
12 /** @param context : zeromq context which defines the number of threads to be used in the data transfer
13 * @param type : type of the socket (ZMQ_PULL, ZMQ_PUSH, etc)
14 * @param address : address of the socket (example localhost or 127.0.0.1)
15 * @param port : port to be used
16 * @return created socket
17 */
18 6 zmq::socket_t* pzmq_createClientSocket(zmq::context_t & context, int type, const std::string & address, size_t port){
19
1/1
✓ Branch 0 (2→3) taken 6 times.
6 std::stringstream socketAddressData;
20
4/4
✓ Branch 0 (3→4) taken 6 times.
✓ Branch 2 (4→5) taken 6 times.
✓ Branch 4 (5→6) taken 6 times.
✓ Branch 6 (6→7) taken 6 times.
6 socketAddressData << "tcp://" << address <<":" << port;
21
3/6
✓ Branch 0 (7→8) taken 6 times.
✓ Branch 2 (8→9) taken 6 times.
✗ Branch 4 (9→10) not taken.
✓ Branch 5 (9→11) taken 6 times.
✗ Branch 6 (25→26) not taken.
✗ Branch 7 (25→27) not taken.
6 zmq::socket_t *socket = new zmq::socket_t(context, type);
22
1/2
✗ Branch 0 (11→12) not taken.
✓ Branch 1 (11→13) taken 6 times.
6 if(socket == NULL){return NULL;}
23
2/2
✓ Branch 0 (13→14) taken 6 times.
✓ Branch 2 (14→15) taken 6 times.
6 socket->connect(socketAddressData.str());
24
25 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
26
1/1
✓ Branch 0 (16→17) taken 6 times.
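6 socket->set(zmq::sockopt::linger, -1); //-1 : infinite linger, pending messages are not discarded when the socket is closed
27 #else
28 socket->setsockopt(ZMQ_LINGER, -1); //-1 : infinite linger, pending messages are not discarded when the socket is closed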
29 #endif
30
1/2
✗ Branch 0 (17→18) not taken.
✓ Branch 1 (17→21) taken 6 times.
6 if(type == ZMQ_SUB){
31 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
32 socket->set(zmq::sockopt::subscribe, "");
33 socket->set(zmq::sockopt::conflate, 1);
34 #else
35 socket->setsockopt(ZMQ_SUBSCRIBE, "", 0);
36 int conflate(1);
37 socket->setsockopt(ZMQ_CONFLATE, &conflate, sizeof(int));
38 #endif
39 }
40 6 return socket;
41 6 }
42
43 ///Create a server socket to be used by the SocketManagerZMQ
44 /** @param context : zeromq context which defines the number of threads to be used in the data transfer
45 * @param type : type of the socket (ZMQ_PULL, ZMQ_PUSH, etc)
46 * @param port : port to be used
47 * @return created socket
48 */
49 7 zmq::socket_t* pzmq_createServerSocket(zmq::context_t & context, int type, size_t port){
50
1/1
✓ Branch 0 (2→3) taken 7 times.
7 std::stringstream socketAddressData;
51
2/2
✓ Branch 0 (3→4) taken 7 times.
✓ Branch 2 (4→5) taken 7 times.
7 socketAddressData << "tcp://127.0.0.1:" << port;
52
3/6
✓ Branch 0 (5→6) taken 7 times.
✓ Branch 2 (6→7) taken 7 times.
✗ Branch 4 (7→8) not taken.
✓ Branch 5 (7→9) taken 7 times.
✗ Branch 6 (18→19) not taken.
✗ Branch 7 (18→20) not taken.
7 zmq::socket_t *socket = new zmq::socket_t(context, type);
53
1/2
✓ Branch 0 (9→10) taken 7 times.
✗ Branch 1 (9→15) not taken.
7 if(socket != NULL){
54
2/2
✓ Branch 0 (10→11) taken 7 times.
✓ Branch 2 (11→12) taken 7 times.
7 socket->bind(socketAddressData.str());
55 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
56
1/1
✓ Branch 0 (13→14) taken 7 times.
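7 socket->set(zmq::sockopt::linger, -1); //-1 : infinite linger, pending messages are not discarded when the socket is closed
57 #else
58 socket->setsockopt(ZMQ_LINGER, -1); //-1 : infinite linger, pending messages are not discarded when the socket is closed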
59 #endif
60 }
61 7 return socket;
62 7 }
63
64 ///Add a server socket to the manager
65 /** @param context : zmq context where to create socket
66 * @param port : port to be used
67 * @param type : type of the connection (ZMQ_PULL, ZMQ_PUSH, etc)
68 * @param nbBufferMessage : number of messages to be buffered
69 * @param bufferSizeByte : size of the zmq buffer in bytes
70 * @param threadAffinity : bit mask which determines which threads from the 0MQ I/O thread pool associated with the socket's context shall handle newly created connections (1 : means first, 2 : means second, 3 : means first and second, etc)
71 * @param dataRate : expected data rate (in kilobytes per second)
72 * @return zmq socket
73 */
74 7 zmq::socket_t* pzmq_createServerSocket(zmq::context_t & context, size_t port, int type, int nbBufferMessage, int bufferSizeByte,
75 size_t threadAffinity, ssize_t dataRate)
76 {
77 7 zmq::socket_t* socket = pzmq_createServerSocket(context, type, port);
78 7 bool b(socket != NULL);
79
1/2
✓ Branch 0 (3→4) taken 7 times.
✗ Branch 1 (3→6) not taken.
7 if(b){
80 7 pzmq_setBufferSize(socket, type, nbBufferMessage, dataRate, bufferSizeByte);
81 7 pzmq_setThreadAffinity(socket, threadAffinity);
82 }
83 7 return socket;
84 }
85
86
87 ///Add a client socket to the manager
88 /** @param context : zmq context where to create socket
89 * @param address : address of the server to be connected to
90 * @param port : port to be used
91 * @param type : type of the connection (ZMQ_PULL, ZMQ_PUSH, etc)
92 * @param nbBufferMessage : number of messages to be buffered
93 * @param bufferSizeByte : size of the zmq buffer in bytes
94 * @param threadAffinity : bit mask which determines which threads from the 0MQ I/O thread pool associated with the socket's context shall handle newly created connections (1 : means first, 2 : means second, 3 : means first and second, etc)
95 * @param dataRate : expected data rate (in kilobytes per second)
96 * @return zmq socket
97 */
98 6 zmq::socket_t* pzmq_createClientSocket(zmq::context_t & context, const std::string & address, size_t port, int type, int nbBufferMessage,
99 int bufferSizeByte, size_t threadAffinity, ssize_t dataRate)
100 {
101 6 zmq::socket_t* socket = pzmq_createClientSocket(context, type, address, port);
102 6 bool b(socket != NULL);
103
1/2
✓ Branch 0 (3→4) taken 6 times.
✗ Branch 1 (3→6) not taken.
6 if(b){
104 6 pzmq_setBufferSize(socket, type, nbBufferMessage, dataRate, bufferSizeByte);
105 6 pzmq_setThreadAffinity(socket, threadAffinity);
106 }
107 6 return socket;
108 }
109
110
111 ///Close the given server socket
112 /** @param[out] socket : pointer to the server socket to be closed (will be set to NULL at the end of the function)
113 */
114 void pzmq_closeServerSocket(zmq::socket_t *& socket){
115 if(socket != NULL){
116 socket->close();
117 delete socket;
118 socket = NULL;
119 }
120 }
121
122 ///Set the number of messages in the messages buffer
123 /** @param[out] socket : socket to be modified
124 * @param nbBufferMessage : number of messages to be buffered
125 */
126 13 void pzmq_setNbMessageBuffer(zmq::socket_t* socket, int nbBufferMessage){
127
1/2
✓ Branch 0 (2→3) taken 13 times.
✗ Branch 1 (2→4) not taken.
13 if(socket != NULL){
128 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
129 13 socket->set(zmq::sockopt::rcvhwm, nbBufferMessage);
130 #else
131 //See doc at http://api.zeromq.org/3-1:zmq-setsockopt
132 socket->setsockopt(ZMQ_RCVHWM, &nbBufferMessage, sizeof(int));
133 socket->setsockopt(ZMQ_SNDHWM, &nbBufferMessage, sizeof(int));
134 #endif
135 }
136 13 }
137
138 ///Set the data rate of the socket
139 /** @param[out] socket : socket to be modified
140 * @param type : type of the socket to be used
141 * @param dataRate : expected data rate (in kilobytes per second)
142 */
143 13 void pzmq_setDataRate(zmq::socket_t* socket, int type, int dataRate){
144
3/6
✓ Branch 0 (2→3) taken 13 times.
✗ Branch 1 (2→7) not taken.
✓ Branch 2 (3→4) taken 13 times.
✗ Branch 3 (3→5) not taken.
✗ Branch 4 (4→5) not taken.
✓ Branch 5 (4→7) taken 13 times.
13 if(socket != NULL && (type == ZMQ_PUB || type == ZMQ_SUB)){
145 int dataRateKbit(dataRate*8l);
146 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
147 socket->set(zmq::sockopt::rate, dataRateKbit);
148 #else
149 //See doc at http://api.zeromq.org/3-1:zmq-setsockopt
150 socket->setsockopt(ZMQ_RATE, dataRateKbit);
151 #endif
152 }
153 13 }
154
155 ///Set the size of the buffer used to receive messages
156 /** @param[out] socket : socket to be modified
157 * @param bufferSizeByte : size of the zmq buffer in bytes
158 */
159 6 void pzmq_setRecvBufferSize(zmq::socket_t* socket, int bufferSizeByte){
160
1/2
✓ Branch 0 (2→3) taken 6 times.
✗ Branch 1 (2→4) not taken.
6 if(socket != NULL){
161 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
162 6 socket->set(zmq::sockopt::rcvbuf, bufferSizeByte);
163 #else
164 //See doc at http://api.zeromq.org/3-1:zmq-setsockopt
165 socket->setsockopt(ZMQ_RCVBUF, bufferSizeByte);
166 #endif
167 }
168 6 }
169
170 ///Set the size of the buffer to send messages
171 /** @param[out] socket : socket to be modified
172 * @param bufferSizeByte : size of the zmq buffer in bytes
173 */
174 7 void pzmq_setSendBufferSize(zmq::socket_t* socket, int bufferSizeByte){
175
1/2
✓ Branch 0 (2→3) taken 7 times.
✗ Branch 1 (2→4) not taken.
7 if(socket != NULL){
176 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
177 7 socket->set(zmq::sockopt::sndbuf, bufferSizeByte);
178 #else
179 //See doc at http://api.zeromq.org/3-1:zmq-setsockopt
180 socket->setsockopt(ZMQ_SNDBUF, bufferSizeByte);
181 #endif
182 }
183 7 }
184
185 ///Set the thread affinity of zmq
186 /** @param socket : socket to be modified
187 * @param threadAffinity : bit mask which determines which threads from the 0MQ I/O thread pool associated with the socket's context shall handle newly created connections (1 : means first, 2 : means second, 3 : means first and second, etc)
188 */
189 13 void pzmq_setThreadAffinity(zmq::socket_t* socket, size_t threadAffinity){
190
1/2
✓ Branch 0 (2→3) taken 13 times.
✗ Branch 1 (2→4) not taken.
13 if(socket != NULL){
191 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
192 13 socket->set(zmq::sockopt::affinity, threadAffinity);
193 #else
194 //See doc at http://api.zeromq.org/3-1:zmq-setsockopt
195 socket->setsockopt(ZMQ_AFFINITY, threadAffinity);
196 #endif
197 }
198 13 }
199
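200 ///Set the buffering parameters of the socket (number of buffered messages, data rate, and receive or send buffer size depending on the socket type)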
201 /** @param[out] socket : socket to be modified
202 * @param type : type of the socket to be used
203 * @param nbBufferMessage : number of messages to be buffered
204 * @param dataRate : expected data rate (in kilobytes per second)
205 * @param bufferSizeByte : size of the zmq buffer in bytes
206 */
207 13 void pzmq_setBufferSize(zmq::socket_t* socket, int type, int nbBufferMessage, int dataRate, size_t bufferSizeByte){
208 13 pzmq_setNbMessageBuffer(socket, nbBufferMessage);
209 13 pzmq_setDataRate(socket, type, dataRate);
210
3/4
✓ Branch 0 (4→5) taken 7 times.
✓ Branch 1 (4→6) taken 6 times.
✗ Branch 2 (5→6) not taken.
✓ Branch 3 (5→7) taken 7 times.
13 if(type == ZMQ_PULL || type == ZMQ_SUB){
211 6 pzmq_setRecvBufferSize(socket, bufferSizeByte);
212
1/4
✗ Branch 0 (7→8) not taken.
✓ Branch 1 (7→9) taken 7 times.
✗ Branch 2 (8→9) not taken.
✗ Branch 3 (8→10) not taken.
7 }else if(type == ZMQ_PUSH || type == ZMQ_PUB){
213 7 pzmq_setSendBufferSize(socket, bufferSizeByte);
214 }
215 13 }
216
217