| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | /*************************************** | ||
| 2 | Auteur : Pierre Aubert | ||
| 3 | Mail : pierre.aubert@lapp.in2p3.fr | ||
| 4 | Licence : CeCILL-C | ||
| 5 | ****************************************/ | ||
| 6 | |||
| 7 | #include "PZmqBackend.h" | ||
| 8 | |||
| 9 | ///Check the send result and convert it into PSendStatus | ||
| 10 | /** | ||
| 11 | * @param res : result of the zmq send | ||
| 12 | * @return corresponding PSendStatus | ||
| 13 | * | ||
| 14 | * ZMQ socket send has a bit of a strange implementation: | ||
| 15 | * @li if sending works: the send_result_t has a value | ||
| 16 | * @li if sending doesn't work due to EAGAIN error: the send_result_t is empty (no value) and error should be EAGAIN | ||
| 17 | * @li if sending doesn't work for another reason: an exception is thrown | ||
| 18 | * This function only checks the behaviour due to the send result, not the thrown error. | ||
| 19 | */ | ||
| 20 | 193 | PSendStatus::PSendStatus checkSendStatus(zmq::send_result_t res){ | |
| 21 |
2/2✓ Branch 0 (3→4) taken 20 times.
✓ Branch 1 (3→5) taken 173 times.
|
193 | if(res.has_value()){ |
| 22 | 20 | return PSendStatus::OK; | |
| 23 | } | ||
| 24 | else{ | ||
| 25 | 173 | int err = zmq_errno(); | |
| 26 |
1/2✓ Branch 0 (6→7) taken 173 times.
✗ Branch 1 (6→8) not taken.
|
173 | if(err == EAGAIN){ |
| 27 | 173 | return PSendStatus::SOCKET_NOT_AVAILABLE; | |
| 28 | } | ||
| 29 | else{ | ||
| 30 | ✗ | std::cerr << "Unknown ZMQ error in send: '" << err << "'"<< std::endl; | |
| 31 | ✗ | return PSendStatus::BROKEN_BACKEND; | |
| 32 | } | ||
| 33 | } | ||
| 34 | } | ||
| 35 | |||
| 36 | ///Check the recv result and convert it into PRecvStatus | ||
| 37 | /** @param res : result of the zmq recv | ||
| 38 | * @return corresponding PRecvStatus | ||
| 39 | * | ||
| 40 | * ZMQ socket recv has a bit of a strange implementation: | ||
| 41 | * @li if recv works: the recv_result_t has a value | ||
| 42 | * @li if recv doesn't work due to EAGAIN error: the recv_result_t is empty (no value) and error should be EAGAIN | ||
| 43 | * @li if recv doesn't work for another reason: an exception is thrown | ||
| 44 | * This function only checks the behaviour due to the recv result, not the thrown error. | ||
| 45 | */ | ||
| 46 | 195 | PRecvStatus::PRecvStatus checkRecvStatus(zmq::recv_result_t res){ | |
| 47 |
2/2✓ Branch 0 (3→4) taken 20 times.
✓ Branch 1 (3→5) taken 175 times.
|
195 | if(res.has_value()){ |
| 48 | 20 | return PRecvStatus::OK; | |
| 49 | } | ||
| 50 | else{ | ||
| 51 | 175 | int err = zmq_errno(); | |
| 52 |
1/2✓ Branch 0 (6→7) taken 175 times.
✗ Branch 1 (6→8) not taken.
|
175 | if(err == EAGAIN){ |
| 53 | 175 | return PRecvStatus::NO_MESSAGE_RECEIVED; | |
| 54 | } | ||
| 55 | else{ | ||
| 56 | ✗ | return PRecvStatus::BROKEN_SOCKET; | |
| 57 | } | ||
| 58 | } | ||
| 59 | } | ||
| 60 | |||
| 61 | ///Convert a send flag into zmq flag | ||
| 62 | /** @param flag : generic PSendFlag | ||
| 63 | * @return corresponding zmq::send_flags | ||
| 64 | */ | ||
| 65 | 194 | zmq::send_flags convertToSendFlag(PSendFlag::PSendFlag flag){ | |
| 66 |
2/2✓ Branch 0 (2→3) taken 184 times.
✓ Branch 1 (2→4) taken 10 times.
|
194 | if(flag == PSendFlag::NON_BLOCK){return zmq::send_flags::dontwait;} |
| 67 | 10 | else{return zmq::send_flags::none;} | |
| 68 | } | ||
| 69 | |||
| 70 | ///Convert a recv flag into zmq flag | ||
| 71 | /** @param flag : generic PRecvFlag | ||
| 72 | * @return corresponding zmq::recv_flags | ||
| 73 | */ | ||
| 74 | 197 | zmq::recv_flags convertToRecvFlag(PRecvFlag::PRecvFlag flag){ | |
| 75 |
2/2✓ Branch 0 (2→3) taken 187 times.
✓ Branch 1 (2→4) taken 10 times.
|
197 | if(flag == PRecvFlag::NON_BLOCK){return zmq::recv_flags::dontwait;} |
| 76 | 10 | else{return zmq::recv_flags::none;} | |
| 77 | } | ||
| 78 | |||
| 79 | ///Create param for a client socket | ||
| 80 | /** | ||
| 81 | * @param type : type of the connection (ZMQ_PULL, ZMQ_PUSH, etc) | ||
| 82 | * @param nbBufferMessage : number of messages to be buffered | ||
| 83 | * @param bufferSizeByte : size of the zmq buffer in bytes | ||
| 84 | * @param threadAffinity : bit mask which determines which threads from the 0MQ I/O thread pool associated with the socket's context shall handle newly created connections (1 : means first, 2 : means second, 3 : means first and second, etc) | ||
| 85 | * @param dataRate : expected data rate (in kilobytes per second) | ||
| 86 | * @return corresponding PZmqParam | ||
| 87 | */ | ||
| 88 | 6 | PZmqParam pzmq_createParamClient(int type, int nbBufferMessage, | |
| 89 | int bufferSizeByte, size_t threadAffinity, ssize_t dataRate) | ||
| 90 | { | ||
| 91 | PZmqParam param; | ||
| 92 | 6 | param.type = type; | |
| 93 | 6 | param.nbBufferMessage = nbBufferMessage; | |
| 94 | 6 | param.bufferSizeByte = bufferSizeByte; | |
| 95 | 6 | param.threadAffinity = threadAffinity; | |
| 96 | 6 | param.dataRate = dataRate; | |
| 97 | 6 | return param; | |
| 98 | } | ||
| 99 | |||
| 100 | ///Create param for a server socket | ||
| 101 | /** @param type : type of the connection (ZMQ_PULL, ZMQ_PUSH, etc) | ||
| 102 | * @param nbBufferMessage : number of messages to be buffered | ||
| 103 | * @param bufferSizeByte : size of the zmq buffer in bytes | ||
| 104 | * @param threadAffinity : bit mask which determines which threads from the 0MQ I/O thread pool associated with the socket's context shall handle newly created connections (1 : means first, 2 : means second, 3 : means first and second, etc) | ||
| 105 | * @param dataRate : expected data rate (in kilobytes per second) | ||
| 106 | * @return corresponding PZmqParam | ||
| 107 | */ | ||
| 108 | 7 | PZmqParam pzmq_createParamServer(int type, int nbBufferMessage, | |
| 109 | int bufferSizeByte, size_t threadAffinity, ssize_t dataRate) | ||
| 110 | { | ||
| 111 | PZmqParam param; | ||
| 112 | 7 | param.type = type; | |
| 113 | 7 | param.nbBufferMessage = nbBufferMessage; | |
| 114 | 7 | param.bufferSizeByte = bufferSizeByte; | |
| 115 | 7 | param.threadAffinity = threadAffinity; | |
| 116 | 7 | param.dataRate = dataRate; | |
| 117 | 7 | return param; | |
| 118 | } | ||
| 119 | |||
| 120 | ///Default constructor of PZmqSocket | ||
| 121 | 13 | PZmqSocket::PZmqSocket() | |
| 122 | 13 | :p_socket(nullptr) | |
| 123 | { | ||
| 124 | |||
| 125 | 13 | } | |
| 126 | |||
| 127 | ///Destructor of PZmqSocket | ||
| 128 | 13 | PZmqSocket::~PZmqSocket(){ | |
| 129 | 13 | close(); | |
| 130 | 13 | } | |
| 131 | |||
| 132 | ///Create a client socket | ||
| 133 | /** @param context : zmq context where to create socket | ||
| 134 | * @param socketParam : parameters of the server (hostname, port), the client has to connect to | ||
| 135 | * @param extraParam : extra customisable parameters for the creation of the socket (depends on the backend) | ||
| 136 | * @return true if the socket has been created, false otherwise | ||
| 137 | */ | ||
| 138 | 6 | bool PZmqSocket::createClientSocket(zmq::context_t & context, const PSocketParam & socketParam, const Param & extraParam){ | |
| 139 | 12 | p_socket = pzmq_createClientSocket(context, socketParam.hostname, socketParam.port, extraParam.type, extraParam.nbBufferMessage, | |
| 140 | 6 | extraParam.bufferSizeByte, extraParam.threadAffinity, extraParam.dataRate); | |
| 141 | #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471 | ||
| 142 | 6 | p_socket->set(zmq::sockopt::rcvtimeo, socketParam.recvTimeOut); | |
| 143 | 6 | p_socket->set(zmq::sockopt::sndtimeo, socketParam.sendTimeOut); | |
| 144 | #else | ||
| 145 | p_socket->setsockopt(ZMQ_RCVTIMEO, &socketParam.recvTimeOut, sizeof(int)); | ||
| 146 | p_socket->setsockopt(ZMQ_SNDTIMEO, &socketParam.sendTimeOut, sizeof(int)); | ||
| 147 | #endif | ||
| 148 | 6 | return p_socket != nullptr; | |
| 149 | } | ||
| 150 | |||
| 151 | ///Create a server socket | ||
| 152 | /** @param context : zmq context where to create socket | ||
| 153 | * @param socketParam : parameters of the server (hostname, port), the client has to connect to | ||
| 154 | * @param extraParam : extra customisable parameters for the creation of the socket (depends on the backend) | ||
| 155 | * @return true if the socket has been created, false otherwise | ||
| 156 | */ | ||
| 157 | 7 | bool PZmqSocket::createServerSocket(zmq::context_t & context, const PSocketParam & socketParam, const Param & extraParam){ | |
| 158 | 14 | p_socket = pzmq_createServerSocket(context, socketParam.port, extraParam.type, extraParam.nbBufferMessage, extraParam.bufferSizeByte, | |
| 159 | 7 | extraParam.threadAffinity, extraParam.dataRate); | |
| 160 | #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471 | ||
| 161 | 7 | p_socket->set(zmq::sockopt::rcvtimeo, socketParam.recvTimeOut); | |
| 162 | 7 | p_socket->set(zmq::sockopt::sndtimeo, socketParam.sendTimeOut); | |
| 163 | #else | ||
| 164 | p_socket->setsockopt(ZMQ_RCVTIMEO, &socketParam.recvTimeOut, sizeof(int)); | ||
| 165 | p_socket->setsockopt(ZMQ_SNDTIMEO, &socketParam.sendTimeOut, sizeof(int)); | ||
| 166 | #endif | ||
| 167 | 7 | return p_socket != nullptr; | |
| 168 | } | ||
| 169 | |||
| 170 | ///Send data with the socket | ||
| 171 | /** @param msg : message to be sent with the socket | ||
| 172 | * @param flag : sending flag (BLOCK, NON_BLOCK) | ||
| 173 | * @return status of the send | ||
| 174 | */ | ||
| 175 | 194 | PSendStatus::PSendStatus PZmqSocket::sendMsg(Message & msg, PSendFlag::PSendFlag flag){ | |
| 176 | try { | ||
| 177 |
2/2✓ Branch 0 (2→3) taken 194 times.
✓ Branch 2 (3→4) taken 193 times.
|
194 | zmq::send_result_t result = p_socket->send(msg, convertToSendFlag(flag)); |
| 178 |
1/1✓ Branch 0 (4→5) taken 193 times.
|
193 | return checkSendStatus(result); |
| 179 | } | ||
| 180 |
1/2✗ Branch 0 (9→10) not taken.
✓ Branch 1 (9→11) taken 1 times.
|
1 | catch(const zmq::error_t& e) { |
| 181 | |||
| 182 |
1/2✗ Branch 0 (13→14) not taken.
✓ Branch 1 (13→15) taken 1 times.
|
1 | if(e.num() == ENOTSOCK){ |
| 183 | ✗ | return PSendStatus::BROKEN_SOCKET; | |
| 184 | } | ||
| 185 |
2/10✗ Branch 0 (16→17) not taken.
✓ Branch 1 (16→23) taken 1 times.
✗ Branch 2 (18→19) not taken.
✗ Branch 3 (18→23) not taken.
✗ Branch 4 (20→21) not taken.
✗ Branch 5 (20→23) not taken.
✗ Branch 6 (22→23) not taken.
✗ Branch 7 (22→24) not taken.
✓ Branch 8 (25→26) taken 1 times.
✗ Branch 9 (25→27) not taken.
|
1 | else if(e.num() == ENOTSUP || e.num() == EFSM || e.num() == ETERM || e.num() == EINVAL){ |
| 186 | 1 | return PSendStatus::BROKEN_BACKEND; | |
| 187 | } | ||
| 188 | ✗ | else if(e.num() == EINTR){ | |
| 189 | ✗ | return PSendStatus::SIGNAL_INTERRUPTION; | |
| 190 | } | ||
| 191 | ✗ | else if(e.num() == EHOSTUNREACH){ | |
| 192 | ✗ | return PSendStatus::NO_ROUTE_TO_RECEIVER; | |
| 193 | } | ||
| 194 | else{ | ||
| 195 | ✗ | std::cerr << "Unknown ZMQ error in send: " << e.what() << std::endl; | |
| 196 | ✗ | return PSendStatus::BROKEN_BACKEND; | |
| 197 | } | ||
| 198 | 1 | } | |
| 199 | } | ||
| 200 | |||
| 201 | ///Receive data with the socket | ||
| 202 | /** @param msg : message to be received with the socket | ||
| 203 | * @param flag : receiving flag (BLOCK, NON_BLOCK) | ||
| 204 | * @return status of the recv | ||
| 205 | */ | ||
| 206 | 197 | PRecvStatus::PRecvStatus PZmqSocket::recvMsg(Message & msg, PRecvFlag::PRecvFlag flag){ | |
| 207 | try { | ||
| 208 |
2/2✓ Branch 0 (2→3) taken 197 times.
✓ Branch 2 (3→4) taken 195 times.
|
197 | zmq::recv_result_t result = p_socket->recv(msg, convertToRecvFlag(flag)); |
| 209 |
1/1✓ Branch 0 (4→5) taken 195 times.
|
195 | return checkRecvStatus(result); |
| 210 |
1/2✗ Branch 0 (9→10) not taken.
✓ Branch 1 (9→11) taken 2 times.
|
2 | } catch(const zmq::error_t& e) |
| 211 | { | ||
| 212 |
1/2✗ Branch 0 (13→14) not taken.
✓ Branch 1 (13→15) taken 2 times.
|
2 | if(e.num() == ENOTSOCK){ |
| 213 | ✗ | return PRecvStatus::BROKEN_SOCKET; | |
| 214 | } | ||
| 215 |
2/10✗ Branch 0 (16→17) not taken.
✓ Branch 1 (16→23) taken 2 times.
✗ Branch 2 (18→19) not taken.
✗ Branch 3 (18→23) not taken.
✗ Branch 4 (20→21) not taken.
✗ Branch 5 (20→23) not taken.
✗ Branch 6 (22→23) not taken.
✗ Branch 7 (22→24) not taken.
✓ Branch 8 (25→26) taken 2 times.
✗ Branch 9 (25→27) not taken.
|
2 | else if(e.num() == ENOTSUP || e.num() == EFSM || e.num() == ETERM || e.num() == EINVAL){ |
| 216 | 2 | return PRecvStatus::BROKEN_BACKEND; | |
| 217 | } | ||
| 218 | ✗ | else if(e.num() == EINTR){ | |
| 219 | ✗ | return PRecvStatus::SIGNAL_INTERRUPTION; | |
| 220 | } | ||
| 221 | else{ | ||
| 222 | ✗ | std::cerr << "Unknown ZMQ error in recv: " << e.what() << std::endl; | |
| 223 | ✗ | return PRecvStatus::BROKEN_BACKEND; | |
| 224 | } | ||
| 225 | 2 | } | |
| 226 | } | ||
| 227 | |||
| 228 | ///Check if the socket is connected | ||
| 229 | /** @return true if the socket is connected, false otherwise | ||
| 230 | */ | ||
| 231 | 1 | bool PZmqSocket::isConnected() const{ | |
| 232 |
1/2✓ Branch 0 (2→3) taken 1 times.
✗ Branch 1 (2→5) not taken.
|
1 | if(p_socket != nullptr){ |
| 233 | 1 | return p_socket->handle() != nullptr; | |
| 234 | } | ||
| 235 | ✗ | return false; | |
| 236 | } | ||
| 237 | |||
| 238 | ///Close the socket | ||
| 239 | 38 | void PZmqSocket::close(){ | |
| 240 |
2/2✓ Branch 0 (2→3) taken 13 times.
✓ Branch 1 (2→8) taken 25 times.
|
38 | if(p_socket != nullptr){ |
| 241 | 13 | p_socket->close(); | |
| 242 |
1/2✓ Branch 0 (4→5) taken 13 times.
✗ Branch 1 (4→7) not taken.
|
13 | delete p_socket; |
| 243 | 13 | p_socket = nullptr; | |
| 244 | } | ||
| 245 | 38 | } | |
| 246 | |||
| 247 | ///Default constructor of PZmqBackend setting the number of threads for zmq I/O to 1 | ||
| 248 | 11 | PZmqBackend::PZmqBackend() | |
| 249 | 11 | :p_context(1) | |
| 250 | { | ||
| 251 | |||
| 252 | 11 | } | |
| 253 | |||
| 254 | ///Create a client parameter | ||
| 255 | /** @return corresponding PZmqBackend::Param (or PZmqParam) | ||
| 256 | */ | ||
| 257 | 6 | PZmqBackend::Param PZmqBackend::client(){ | |
| 258 | 6 | return pzmq_createParamClient(ZMQ_PULL); | |
| 259 | } | ||
| 260 | |||
| 261 | ///Create a server parameter | ||
| 262 | /** @return corresponding PZmqBackend::Param (or PZmqParam) | ||
| 263 | */ | ||
| 264 | 7 | PZmqBackend::Param PZmqBackend::server(){ | |
| 265 | 7 | return pzmq_createParamServer(ZMQ_PUSH); | |
| 266 | } | ||
| 267 | |||
| 268 | ///Create a client socket | ||
| 269 | /** @param[out] socket : socket to be created | ||
| 270 | * @param socketParam : parameters of the server (hostname, port), the client has to connect to | ||
| 271 | * (the port to be used for the connection is taken from socketParam.port) | ||
| 272 | * @param param : extra customisable parameters for the creation of the socket (depends on the backend) | ||
| 273 | * @return true if the socket has been created, false otherwise | ||
| 274 | */ | ||
| 275 | 6 | bool PZmqBackend::createClientSocket(PZmqBackend::Socket & socket, const PSocketParam & socketParam, const PZmqParam & param){ | |
| 276 | 6 | return socket.createClientSocket(p_context, socketParam, param); | |
| 277 | } | ||
| 278 | |||
| 279 | ///Create a server socket | ||
| 280 | /** @param[out] socket : socket to be created | ||
| 281 | * @param socketParam : parameters of the server (hostname, port), the client has to connect to | ||
| 282 | * @param param : extra customisable parameters for the creation of the socket (depends on the backend) | ||
| 283 | * @return true if the socket has been created, false otherwise | ||
| 284 | */ | ||
| 285 | 7 | bool PZmqBackend::createServerSocket(PZmqBackend::Socket & socket, const PSocketParam & socketParam, const PZmqParam & param){ | |
| 286 | 7 | return socket.createServerSocket(p_context, socketParam, param); | |
| 287 | } | ||
| 288 | |||
| 289 | ///Copy current backend message data into mock message | ||
| 290 | /** @param[out] mockMsg : mock message | ||
| 291 | * @param msg : message of the current backend to be converted | ||
| 292 | */ | ||
| 293 | ✗ | void PZmqBackend::msgToMock(DataStreamMsg & mockMsg, const PZmqBackend::Message & msg){ | |
| 294 | ✗ | size_t dataSize(msg.size()); | |
| 295 | ✗ | mockMsg.resize(dataSize); | |
| 296 | ✗ | memcpy(mockMsg.data(), (const void*)msg.data(), dataSize); | |
| 297 | ✗ | } | |
| 298 | |||
| 299 | ///Copy mock message data into current backend message | ||
| 300 | /** @param[out] msg : message of the current backend to be converted | ||
| 301 | * @param mockMsg : mock message | ||
| 302 | */ | ||
| 303 | ✗ | void PZmqBackend::mockToMsg(PZmqBackend::Message & msg, DataStreamMsg & mockMsg){ | |
| 304 | ✗ | size_t dataSize(mockMsg.size()); | |
| 305 | ✗ | msg.rebuild(dataSize); | |
| 306 | ✗ | memcpy((void*)msg.data(), mockMsg.data(), dataSize); | |
| 307 | ✗ | } | |
| 308 |