GCC Code Coverage Report


Directory: ./
File: src/phoenix_zmq.cpp
Date: 2025-06-03 16:44:11
Exec Total Coverage
Lines: 0 57 0.0%
Branches: 0 72 0.0%

Line Branch Exec Source
1 /***************************************
2 Auteur : Pierre Aubert
3 Mail : pierre.aubert@lapp.in2p3.fr
4 Licence : CeCILL-C
5 ****************************************/
6
7 #include <sstream>
8 #include "phoenix_zmq.h"
9
10
11 ///Create a client socket to be used by the SocketManagerZMQ
12 /** @param context : zeromq context which defines the number of thread to be used in the data transfert
13 * @param type : type of the socket (ZMQ_PULL, ZMQ_PUSH, etc)
14 * @param address : address of the socket (example localhost or 127.0.0.1)
15 * @param port : port to be used
16 * @return create socket
17 */
18 zmq::socket_t* pzmq_createClientSocket(zmq::context_t & context, int type, const std::string & address, size_t port){
19 std::stringstream socketAddressData;
20 socketAddressData << "tcp://" << address <<":" << port;
21 zmq::socket_t *socket = new zmq::socket_t(context, type);
22 socket->connect(socketAddressData.str());
23
24 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
25 socket->set(zmq::sockopt::linger, 1); //1 ms to stop
26 #else
27 socket->setsockopt(ZMQ_LINGER, 1); //1 ms to stop
28 #endif
29 if(type == ZMQ_SUB){
30 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
31 socket->set(zmq::sockopt::subscribe, "");
32 socket->set(zmq::sockopt::conflate, 1);
33 #else
34 socket->setsockopt(ZMQ_SUBSCRIBE, "", 0);
35 int conflate(1);
36 socket->setsockopt(ZMQ_CONFLATE, &conflate, sizeof(int));
37 #endif
38 }
39 return socket;
40 }
41
42 ///Create a server socket to be used by the SocketManagerZMQ
43 /** @param context : zeromq context which defines the number of thread to be used in the data transfert
44 * @param type : type of the socket (ZMQ_PULL, ZMQ_PUSH, etc)
45 * @param port : port to be used
46 * @return create socket
47 */
48 zmq::socket_t* pzmq_createServerSocket(zmq::context_t & context, int type, size_t port){
49 std::stringstream socketAddressData;
50 socketAddressData << "tcp://*:" << port;
51 zmq::socket_t *socket = new zmq::socket_t(context, type);
52 socket->bind(socketAddressData.str());
53 return socket;
54 }
55
56 ///Add a server socket to the manager
57 /** @param context : zmq context where to create socket
58 * @param port : port to be used
59 * @param type : type of the connection (ZMQ_PULL, ZMQ_PUSH, etc)
60 * @param nbBufferMessage : number of messages to be buffered
61 * @param bufferSizeByte : size of the zmq buffer in bytes
62 * @param threadAffinity : bit mask which determines which threads from the 0MQ I/O thread pool associated with the socket's context shall handle newly created connections (1 : means first, 2 : means second, 3 : means first and second, etc)
63 * @param dataRate : expected data rate (in kilobytes per second)
64 * @return zmq socket
65 */
66 zmq::socket_t* pzmq_createServerSocket(zmq::context_t & context, size_t port, int type, int nbBufferMessage, int bufferSizeByte,
67 size_t threadAffinity, ssize_t dataRate)
68 {
69 zmq::socket_t* socket = pzmq_createServerSocket(context, type, port);
70 bool b(socket != NULL);
71 if(b){
72 pzmq_setBufferSize(socket, type, nbBufferMessage, dataRate, bufferSizeByte);
73 pzmq_setThreadAffinity(socket, threadAffinity);
74 }
75 return socket;
76 }
77
78
79 ///Add a client socket to the manager
80 /** @param context : zmq context where to create socket
81 * @param address : address of the server to be connected to
82 * @param port : port to be used
83 * @param type : type of the connection (ZMQ_PULL, ZMQ_PUSH, etc)
84 * @param nbBufferMessage : number of messages to be buffered
85 * @param bufferSizeByte : size of the zmq buffer in bytes
86 * @param threadAffinity : bit mask which determines which threads from the 0MQ I/O thread pool associated with the socket's context shall handle newly created connections (1 : means first, 2 : means second, 3 : means first and second, etc)
87 * @param dataRate : expected data rate (in kilobytes per second)
88 * @return zmq socket
89 */
90 zmq::socket_t* pzmq_createClientSocket(zmq::context_t & context, const std::string & address, size_t port, int type, int nbBufferMessage,
91 int bufferSizeByte, size_t threadAffinity, ssize_t dataRate)
92 {
93 zmq::socket_t* socket = pzmq_createClientSocket(context, type, address, port);
94 bool b(socket != NULL);
95 if(b){
96 pzmq_setBufferSize(socket, type, nbBufferMessage, dataRate, bufferSizeByte);
97 pzmq_setThreadAffinity(socket, threadAffinity);
98 }
99 return socket;
100 }
101
102 ///Close the given server socket
103 /** @param[out] socket : pointer to the server socket to be closed (will be set to NULL at then end of the function)
104 */
105 void pzmq_closeServerSocket(zmq::socket_t *& socket){
106 if(socket != NULL){
107 delete socket;
108 socket = NULL;
109 }
110 }
111
112 ///Set the number of messages in the messages buffer
113 /** @param[out] socket : socket to be modified
114 * @param nbBufferMessage : number of messages to be buffered
115 */
116 void pzmq_setNbMessageBuffer(zmq::socket_t* socket, int nbBufferMessage){
117 if(socket != NULL){
118 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
119 socket->set(zmq::sockopt::rcvhwm, nbBufferMessage);
120 #else
121 //See doc at http://api.zeromq.org/3-1:zmq-setsockopt
122 socket->setsockopt(ZMQ_RCVHWM, &nbBufferMessage, sizeof(int));
123 socket->setsockopt(ZMQ_SNDHWM, &nbBufferMessage, sizeof(int));
124 #endif
125 }
126 }
127
128 ///Set the data rate of the socket
129 /** @param[out] socket : socket to be modified
130 * @param type : type of the socket to be used
131 * @param dataRate : expected data rate (in kilobytes per second)
132 */
133 void pzmq_setDataRate(zmq::socket_t* socket, int type, int dataRate){
134 if(socket != NULL && (type == ZMQ_PUB || type == ZMQ_SUB)){
135 int dataRateKbit(dataRate*8l);
136 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
137 socket->set(zmq::sockopt::rate, dataRateKbit);
138 #else
139 //See doc at http://api.zeromq.org/3-1:zmq-setsockopt
140 socket->setsockopt(ZMQ_RATE, dataRateKbit);
141 #endif
142 }
143 }
144
145 ///Set the size of the buffer to recieved messages
146 /** @param[out] socket : socket to be modified
147 * @param bufferSizeByte : size of the zmq buffer in bytes
148 */
149 void pzmq_setRecvBufferSize(zmq::socket_t* socket, int bufferSizeByte){
150 if(socket != NULL){
151 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
152 socket->set(zmq::sockopt::rcvbuf, bufferSizeByte);
153 #else
154 //See doc at http://api.zeromq.org/3-1:zmq-setsockopt
155 socket->setsockopt(ZMQ_RCVBUF, bufferSizeByte);
156 #endif
157 }
158 }
159
160 ///Set the size of the buffer to send messages
161 /** @param[out] socket : socket to be modified
162 * @param bufferSizeByte : size of the zmq buffer in bytes
163 */
164 void pzmq_setSendBufferSize(zmq::socket_t* socket, int bufferSizeByte){
165 if(socket != NULL){
166 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
167 socket->set(zmq::sockopt::sndbuf, bufferSizeByte);
168 #else
169 //See doc at http://api.zeromq.org/3-1:zmq-setsockopt
170 socket->setsockopt(ZMQ_SNDBUF, bufferSizeByte);
171 #endif
172 }
173 }
174
175 ///Set the thread affinity of zmq
176 /** @param socket : socket to be modified
177 * @param threadAffinity : bit mask which determines which threads from the 0MQ I/O thread pool associated with the socket's context shall handle newly created connections (1 : means first, 2 : means second, 3 : means first and second, etc)
178 */
179 void pzmq_setThreadAffinity(zmq::socket_t* socket, size_t threadAffinity){
180 if(socket != NULL){
181 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
182 socket->set(zmq::sockopt::affinity, threadAffinity);
183 #else
184 //See doc at http://api.zeromq.org/3-1:zmq-setsockopt
185 socket->setsockopt(ZMQ_AFFINITY, threadAffinity);
186 #endif
187 }
188 }
189
190 ///Set the size of the buffer to send messages
191 /** @param[out] socket : socket to be modified
192 * @param type : type of the socket to be used
193 * @param nbBufferMessage : number of messages to be buffered
194 * @param dataRate : expected data rate (in kilobytes per second)
195 * @param bufferSizeByte : size of the zmq buffer in bytes
196 */
197 void pzmq_setBufferSize(zmq::socket_t* socket, int type, int nbBufferMessage, int dataRate, size_t bufferSizeByte){
198 pzmq_setNbMessageBuffer(socket, nbBufferMessage);
199 pzmq_setDataRate(socket, type, dataRate);
200 if(type == ZMQ_PULL || type == ZMQ_SUB){
201 pzmq_setRecvBufferSize(socket, bufferSizeByte);
202 }else if(type == ZMQ_PUSH || type == ZMQ_PUB){
203 pzmq_setSendBufferSize(socket, bufferSizeByte);
204 }
205 }
206
207