| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | /*************************************** | ||
| 2 | Auteur : Pierre Aubert | ||
| 3 | Mail : pierre.aubert@lapp.in2p3.fr | ||
| 4 | Licence : CeCILL-C | ||
| 5 | ****************************************/ | ||
| 6 | |||
| 7 | #include "PZmqBackend.h" | ||
| 8 | |||
| 9 | ///Check the send result and convert it into PSendStatus | ||
| 10 | /** | ||
| 11 | * @param res : result of the zmq send | ||
| 12 | * @return corresponding PSendStatus | ||
| 13 | * | ||
| 14 | * ZMQ socket send has a bit of a strange implementation: | ||
| 15 | * @li if sending works: the send_result_t has a value | ||
| 16 | * @li if sending doesn't work due to EAGAIN error: the send_result_t is empty (no value) and error should be EAGAIN | ||
| 17 | * @li if sending doesn't work for another reason: an exception is thrown | ||
| 18 | * This function only checks the behaviour due to the send result, not the thrown error. | ||
| 19 | */ | ||
| 20 | 195 | PSendStatus::PSendStatus checkSendStatus(zmq::send_result_t res){ | |
| 21 |
2/2✓ Branch 0 (3→4) taken 20 times.
✓ Branch 1 (3→5) taken 175 times.
|
195 | if(res.has_value()){ |
| 22 | 20 | return PSendStatus::OK; | |
| 23 | }else{ | ||
| 24 | 175 | int err = zmq_errno(); | |
| 25 |
1/2✓ Branch 0 (6→7) taken 175 times.
✗ Branch 1 (6→8) not taken.
|
175 | if(err == EAGAIN){ |
| 26 | 175 | return PSendStatus::SOCKET_NOT_AVAILABLE; | |
| 27 | }else{ | ||
| 28 | ✗ | std::cerr << "Unknown ZMQ error in send: '" << err << "'"<< std::endl; | |
| 29 | ✗ | return PSendStatus::BROKEN_BACKEND; | |
| 30 | } | ||
| 31 | } | ||
| 32 | } | ||
| 33 | |||
| 34 | ///Check the recv result and convert it into PRecvStatus | ||
| 35 | /** @param res : result of the zmq recv | ||
| 36 | * @return corresponding PRecvStatus | ||
| 37 | * ZMQ socket recv has a bit of a strange implementation: | ||
| 38 | * @li if recv works: the recv_result_t has a value | ||
| 39 | * @li if recv doesn't work due to EAGAIN error: the recv_result_t is empty (no value) and error should be EAGAIN | ||
| 40 | * @li if recv doesn't work for another reason: an exception is thrown | ||
| 41 | * This function only checks the behaviour due to the recv result, not the thrown error. | ||
| 42 | */ | ||
| 43 | 195 | PRecvStatus::PRecvStatus checkRecvStatus(zmq::recv_result_t res){ | |
| 44 |
2/2✓ Branch 0 (3→4) taken 20 times.
✓ Branch 1 (3→5) taken 175 times.
|
195 | if(res.has_value()){ |
| 45 | 20 | return PRecvStatus::OK; | |
| 46 | }else{ | ||
| 47 | 175 | int err = zmq_errno(); | |
| 48 |
1/2✓ Branch 0 (6→7) taken 175 times.
✗ Branch 1 (6→8) not taken.
|
175 | if(err == EAGAIN){ |
| 49 | 175 | return PRecvStatus::NO_MESSAGE_RECEIVED; | |
| 50 | }else{ | ||
| 51 | ✗ | return PRecvStatus::BROKEN_SOCKET; | |
| 52 | } | ||
| 53 | } | ||
| 54 | } | ||
| 55 | |||
| 56 | ///Convert a send flag into zmq flag | ||
| 57 | /** @param flag : generic PSendFlag | ||
| 58 | * @return corresponding zmq::send_flags | ||
| 59 | */ | ||
| 60 | 196 | zmq::send_flags convertToSendFlag(PSendFlag::PSendFlag flag){ | |
| 61 |
2/2✓ Branch 0 (2→3) taken 186 times.
✓ Branch 1 (2→4) taken 10 times.
|
196 | if(flag == PSendFlag::NON_BLOCK){return zmq::send_flags::dontwait;} |
| 62 | 10 | else{return zmq::send_flags::none;} | |
| 63 | } | ||
| 64 | |||
| 65 | ///Convert a recv flag into zmq flag | ||
| 66 | /** @param flag : generic PRecvFlag | ||
| 67 | * @return corresponding zmq::recv_flags | ||
| 68 | */ | ||
| 69 | 197 | zmq::recv_flags convertToRecvFlag(PRecvFlag::PRecvFlag flag){ | |
| 70 |
2/2✓ Branch 0 (2→3) taken 187 times.
✓ Branch 1 (2→4) taken 10 times.
|
197 | if(flag == PRecvFlag::NON_BLOCK){return zmq::recv_flags::dontwait;} |
| 71 | 10 | else{return zmq::recv_flags::none;} | |
| 72 | } | ||
| 73 | |||
| 74 | ///Create param for a client socket | ||
| 75 | /** @param context : zmq context where to create socket | ||
| 76 | * @param type : type of the connection (ZMQ_PULL, ZMQ_PUSH, etc) | ||
| 77 | * @param nbBufferMessage : number of messages to be buffered | ||
| 78 | * @param bufferSizeByte : size of the zmq buffer in bytes | ||
| 79 | * @param threadAffinity : bit mask which determines which threads from the 0MQ I/O thread pool associated with the socket's context shall handle newly created connections (1 : means first, 2 : means second, 3 : means first and second, etc) | ||
| 80 | * @param dataRate : expected data rate (in kilobytes per second) | ||
| 81 | * @param linger : linger period for socket shutdown | ||
| 82 | * @return corresponding PZmqParam | ||
| 83 | */ | ||
| 84 | 6 | PZmqParam pzmq_createParamClient(int type, int nbBufferMessage, int bufferSizeByte, size_t threadAffinity, ssize_t dataRate, int linger){ | |
| 85 | PZmqParam param; | ||
| 86 | 6 | param.type = type; | |
| 87 | 6 | param.nbBufferMessage = nbBufferMessage; | |
| 88 | 6 | param.bufferSizeByte = bufferSizeByte; | |
| 89 | 6 | param.threadAffinity = threadAffinity; | |
| 90 | 6 | param.dataRate = dataRate; | |
| 91 | 6 | param.linger = linger; | |
| 92 | 6 | return param; | |
| 93 | } | ||
| 94 | |||
| 95 | ///Create param for a client socket | ||
| 96 | /** @param type : type of the connection (ZMQ_PULL, ZMQ_PUSH, etc) | ||
| 97 | * @param nbBufferMessage : number of messages to be buffered | ||
| 98 | * @param bufferSizeByte : size of the zmq buffer in bytes | ||
| 99 | * @param threadAffinity : bit mask which determines which threads from the 0MQ I/O thread pool associated with the socket's context shall handle newly created connections (1 : means first, 2 : means second, 3 : means first and second, etc) | ||
| 100 | * @param dataRate : expected data rate (in kilobytes per second) | ||
| 101 | * @param linger : linger period for socket shutdown | ||
| 102 | * @return corresponding PZmqParam | ||
| 103 | */ | ||
| 104 | 7 | PZmqParam pzmq_createParamServer(int type, int nbBufferMessage, int bufferSizeByte, size_t threadAffinity, ssize_t dataRate, int linger){ | |
| 105 | PZmqParam param; | ||
| 106 | 7 | param.type = type; | |
| 107 | 7 | param.nbBufferMessage = nbBufferMessage; | |
| 108 | 7 | param.bufferSizeByte = bufferSizeByte; | |
| 109 | 7 | param.threadAffinity = threadAffinity; | |
| 110 | 7 | param.dataRate = dataRate; | |
| 111 | 7 | param.linger = linger; | |
| 112 | 7 | return param; | |
| 113 | } | ||
| 114 | |||
| 115 | ///Default constructor of PZmqSocket | ||
| 116 | 13 | PZmqSocket::PZmqSocket() | |
| 117 | 13 | :p_socket(NULL) | |
| 118 | { | ||
| 119 | |||
| 120 | 13 | } | |
| 121 | |||
| 122 | ///Destructor of PZmqSocket | ||
| 123 | 13 | PZmqSocket::~PZmqSocket(){ | |
| 124 | 13 | close(); | |
| 125 | 13 | } | |
| 126 | |||
| 127 | ///Create a client socket | ||
| 128 | /** @param context : zmq context where to create socket | ||
| 129 | * @param socketParam : parameters of the server (hostname, port), the client has to connect to | ||
| 130 | * @param extraParam : extra customisable parameters for the creation of the socket (depends on the backend) | ||
| 131 | * @return true if the socket has been created, false otherwise | ||
| 132 | */ | ||
| 133 | 6 | bool PZmqSocket::createClientSocket(zmq::context_t & context, const PSocketParam & socketParam, const Param & extraParam){ | |
| 134 | 12 | p_socket = pzmq_createClientSocket(context, socketParam.hostname, socketParam.port, extraParam.type, extraParam.nbBufferMessage, | |
| 135 | 6 | extraParam.bufferSizeByte, extraParam.threadAffinity, extraParam.dataRate); | |
| 136 | #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471 | ||
| 137 | 6 | p_socket->set(zmq::sockopt::rcvtimeo, socketParam.recvTimeOut); | |
| 138 | 6 | p_socket->set(zmq::sockopt::sndtimeo, socketParam.sendTimeOut); | |
| 139 | 6 | p_socket->set(zmq::sockopt::linger, extraParam.linger); //number of ms to stop | |
| 140 | #else | ||
| 141 | p_socket->setsockopt(ZMQ_RCVTIMEO, &socketParam.recvTimeOut, sizeof(int)); | ||
| 142 | p_socket->setsockopt(ZMQ_SNDTIMEO, &socketParam.sendTimeOut, sizeof(int)); | ||
| 143 | p_socket->setsockopt(ZMQ_LINGER, &extraParam.linger, sizeof(int)); //number of ms to stop | ||
| 144 | #endif | ||
| 145 | 6 | return p_socket != NULL; | |
| 146 | } | ||
| 147 | |||
| 148 | ///Create a server socket | ||
| 149 | /** @param context : zmq context where to create socket | ||
| 150 | * @param socketParam : parameters of the server (hostname, port), the client has to connect to | ||
| 151 | * @param extraParam : extra customisable parameters for the creation of the socket (depends on the backend) | ||
| 152 | * @return true if the socket has been created, false otherwise | ||
| 153 | */ | ||
| 154 | 7 | bool PZmqSocket::createServerSocket(zmq::context_t & context, const PSocketParam & socketParam, const Param & extraParam){ | |
| 155 | 14 | p_socket = pzmq_createServerSocket(context, socketParam.port, extraParam.type, extraParam.nbBufferMessage, extraParam.bufferSizeByte, | |
| 156 | 7 | extraParam.threadAffinity, extraParam.dataRate); | |
| 157 | #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471 | ||
| 158 | 7 | p_socket->set(zmq::sockopt::rcvtimeo, socketParam.recvTimeOut); | |
| 159 | 7 | p_socket->set(zmq::sockopt::sndtimeo, socketParam.sendTimeOut); | |
| 160 | 7 | p_socket->set(zmq::sockopt::linger, extraParam.linger); //number of ms to stop | |
| 161 | #else | ||
| 162 | p_socket->setsockopt(ZMQ_RCVTIMEO, &socketParam.recvTimeOut, sizeof(int)); | ||
| 163 | p_socket->setsockopt(ZMQ_SNDTIMEO, &socketParam.sendTimeOut, sizeof(int)); | ||
| 164 | p_socket->setsockopt(ZMQ_LINGER, &extraParam.linger, sizeof(int)); //number of ms to stop | ||
| 165 | #endif | ||
| 166 | 7 | return p_socket != NULL; | |
| 167 | } | ||
| 168 | |||
| 169 | ///Send data with the socket | ||
| 170 | /** @param msg : message to be sent with the socket | ||
| 171 | * @param flag : sending flag (BLOCK, NON_BLOCK) | ||
| 172 | * @return status of the send | ||
| 173 | */ | ||
| 174 | 196 | PSendStatus::PSendStatus PZmqSocket::sendMsg(Message & msg, PSendFlag::PSendFlag flag){ | |
| 175 | try { | ||
| 176 |
2/2✓ Branch 0 (2→3) taken 196 times.
✓ Branch 2 (3→4) taken 195 times.
|
196 | zmq::send_result_t result = p_socket->send(msg, convertToSendFlag(flag)); |
| 177 |
1/1✓ Branch 0 (4→5) taken 195 times.
|
195 | return checkSendStatus(result); |
| 178 |
1/2✗ Branch 0 (9→10) not taken.
✓ Branch 1 (9→11) taken 1 times.
|
1 | }catch(const zmq::error_t& e) { |
| 179 |
1/2✗ Branch 0 (13→14) not taken.
✓ Branch 1 (13→15) taken 1 times.
|
1 | if(e.num() == ENOTSOCK){ |
| 180 | ✗ | return PSendStatus::BROKEN_SOCKET; | |
| 181 |
2/10✗ Branch 0 (16→17) not taken.
✓ Branch 1 (16→23) taken 1 times.
✗ Branch 2 (18→19) not taken.
✗ Branch 3 (18→23) not taken.
✗ Branch 4 (20→21) not taken.
✗ Branch 5 (20→23) not taken.
✗ Branch 6 (22→23) not taken.
✗ Branch 7 (22→24) not taken.
✓ Branch 8 (25→26) taken 1 times.
✗ Branch 9 (25→27) not taken.
|
1 | }else if(e.num() == ENOTSUP || e.num() == EFSM || e.num() == ETERM || e.num() == EINVAL){ |
| 182 | 1 | return PSendStatus::BROKEN_BACKEND; | |
| 183 | ✗ | }else if(e.num() == EINTR){ | |
| 184 | ✗ | return PSendStatus::SIGNAL_INTERRUPTION; | |
| 185 | ✗ | }else if(e.num() == EHOSTUNREACH){ | |
| 186 | ✗ | return PSendStatus::NO_ROUTE_TO_RECEIVER; | |
| 187 | }else{ | ||
| 188 | ✗ | std::cerr << "Unknown ZMQ error in send: " << e.what() << std::endl; | |
| 189 | ✗ | return PSendStatus::BROKEN_BACKEND; | |
| 190 | } | ||
| 191 | 1 | } | |
| 192 | } | ||
| 193 | |||
| 194 | ///Receive data with the socket | ||
| 195 | /** @param msg : message to be received with the socket | ||
| 196 | * @param flag : receiving flag (BLOCK, NON_BLOCK) | ||
| 197 | * @return status of the recv | ||
| 198 | */ | ||
| 199 | 197 | PRecvStatus::PRecvStatus PZmqSocket::recvMsg(Message & msg, PRecvFlag::PRecvFlag flag){ | |
| 200 | try { | ||
| 201 |
2/2✓ Branch 0 (2→3) taken 197 times.
✓ Branch 2 (3→4) taken 195 times.
|
197 | zmq::recv_result_t result = p_socket->recv(msg, convertToRecvFlag(flag)); |
| 202 |
1/1✓ Branch 0 (4→5) taken 195 times.
|
195 | return checkRecvStatus(result); |
| 203 |
1/2✗ Branch 0 (9→10) not taken.
✓ Branch 1 (9→11) taken 2 times.
|
2 | }catch(const zmq::error_t& e){ |
| 204 |
1/2✗ Branch 0 (13→14) not taken.
✓ Branch 1 (13→19) taken 2 times.
|
2 | if(e.num() == ENOTSOCK){ |
| 205 | ✗ | std::cerr << "ZMQ error in recv: " << e.what() << std::endl; | |
| 206 | ✗ | return PRecvStatus::BROKEN_SOCKET; | |
| 207 | } | ||
| 208 |
2/10✗ Branch 0 (20→21) not taken.
✓ Branch 1 (20→27) taken 2 times.
✗ Branch 2 (22→23) not taken.
✗ Branch 3 (22→27) not taken.
✗ Branch 4 (24→25) not taken.
✗ Branch 5 (24→27) not taken.
✗ Branch 6 (26→27) not taken.
✗ Branch 7 (26→28) not taken.
✓ Branch 8 (29→30) taken 2 times.
✗ Branch 9 (29→35) not taken.
|
2 | else if(e.num() == ENOTSUP || e.num() == EFSM || e.num() == ETERM || e.num() == EINVAL){ |
| 209 |
3/3✓ Branch 0 (30→31) taken 2 times.
✓ Branch 2 (32→33) taken 2 times.
✓ Branch 4 (33→34) taken 2 times.
|
2 | std::cerr << "ZMQ error in recv: " << e.what() << std::endl; |
| 210 | 2 | return PRecvStatus::BROKEN_BACKEND; | |
| 211 | } | ||
| 212 | ✗ | else if(e.num() == EINTR){ | |
| 213 | ✗ | std::cerr << "ZMQ error in recv: " << e.what() << std::endl; | |
| 214 | ✗ | return PRecvStatus::SIGNAL_INTERRUPTION; | |
| 215 | } | ||
| 216 | else{ | ||
| 217 | ✗ | std::cerr << "Unknown ZMQ error in recv: " << e.what() << std::endl; | |
| 218 | ✗ | return PRecvStatus::BROKEN_BACKEND; | |
| 219 | } | ||
| 220 | 2 | } | |
| 221 | } | ||
| 222 | |||
| 223 | ///Check if the socket is connected | ||
| 224 | /** @return true if the socket is connected, false otherwise | ||
| 225 | */ | ||
| 226 | 1 | bool PZmqSocket::isConnected() const{ | |
| 227 |
1/2✓ Branch 0 (2→3) taken 1 times.
✗ Branch 1 (2→5) not taken.
|
1 | if(p_socket != NULL){ |
| 228 | 1 | return p_socket->handle() != NULL; | |
| 229 | } | ||
| 230 | ✗ | return false; | |
| 231 | } | ||
| 232 | |||
| 233 | ///Close the socket | ||
| 234 | 38 | void PZmqSocket::close(){ | |
| 235 |
2/2✓ Branch 0 (2→3) taken 13 times.
✓ Branch 1 (2→8) taken 25 times.
|
38 | if(p_socket != NULL){ |
| 236 | 13 | p_socket->close(); | |
| 237 |
1/2✓ Branch 0 (4→5) taken 13 times.
✗ Branch 1 (4→7) not taken.
|
13 | delete p_socket; |
| 238 | 13 | p_socket = NULL; | |
| 239 | } | ||
| 240 | 38 | } | |
| 241 | |||
| 242 | ///Default constructor of PZmqSocketGenerator setting the number of threads for zmq I/O to 1 | ||
| 243 | 11 | PZmqSocketGenerator::PZmqSocketGenerator() | |
| 244 | 11 | :p_context(1) | |
| 245 | { | ||
| 246 | |||
| 247 | 11 | } | |
| 248 | |||
| 249 | ///Create a client parameter | ||
| 250 | /** @return corresponding PZmqSocketGenerator::Param (or PZmqParam) | ||
| 251 | */ | ||
| 252 | 6 | PZmqSocketGenerator::Param PZmqSocketGenerator::client(){ | |
| 253 | 6 | return pzmq_createParamClient(ZMQ_PULL); | |
| 254 | } | ||
| 255 | |||
| 256 | ///Create a server parameter | ||
| 257 | /** @return corresponding PZmqSocketGenerator::Param (or PZmqParam) | ||
| 258 | */ | ||
| 259 | 7 | PZmqSocketGenerator::Param PZmqSocketGenerator::server(){ | |
| 260 | 7 | return pzmq_createParamServer(ZMQ_PUSH); | |
| 261 | } | ||
| 262 | |||
| 263 | ///Create a client socket | ||
| 264 | /** @param[out] socket : socket to be created | ||
| 265 | * @param socketParam : parameters of the server (hostname, port), the client has to connect to | ||
| 266 | * @param port : port to be used for the connection | ||
| 267 | * @param param : extra customisable parameters for the creation of the socket (depends on the backend) | ||
| 268 | * @return true if the socket has been created, false otherwise | ||
| 269 | */ | ||
| 270 | 6 | bool PZmqSocketGenerator::createClientSocket(PZmqSocketGenerator::Socket & socket, const PSocketParam & socketParam, const PZmqParam & param){ | |
| 271 | 6 | return socket.createClientSocket(p_context, socketParam, param); | |
| 272 | } | ||
| 273 | |||
| 274 | ///Create a server socket | ||
| 275 | /** @param[out] socket : socket to be created | ||
| 276 | * @param socketParam : parameters of the server (hostname, port), the client has to connect to | ||
| 277 | * @param param : extra customisable parameters for the creation of the socket (depends on the backend) | ||
| 278 | * @return true if the socket has been created, false otherwise | ||
| 279 | */ | ||
| 280 | 7 | bool PZmqSocketGenerator::createServerSocket(PZmqSocketGenerator::Socket & socket, const PSocketParam & socketParam, const PZmqParam & param){ | |
| 281 | 7 | return socket.createServerSocket(p_context, socketParam, param); | |
| 282 | } | ||
| 283 | |||
| 284 | ///Copy current backend message data into mock message | ||
| 285 | /** @param[out] mockMsg : mock message | ||
| 286 | * @param msg : message of the current backend to be converted | ||
| 287 | */ | ||
| 288 | ✗ | void PZmqSocketGenerator::msgToMock(DataStreamMsg & mockMsg, const PZmqSocketGenerator::Message & msg){ | |
| 289 | ✗ | size_t dataSize(msg.size()); | |
| 290 | ✗ | mockMsg.resize(dataSize); | |
| 291 | ✗ | memcpy(mockMsg.data(), (const void*)msg.data(), dataSize); | |
| 292 | ✗ | } | |
| 293 | |||
| 294 | ///Copy mock message data into current backend message | ||
| 295 | /** @param[out] msg : message of the current backend to be converted | ||
| 296 | * @param mockMsg : mock message | ||
| 297 | */ | ||
| 298 | ✗ | void PZmqSocketGenerator::mockToMsg(PZmqSocketGenerator::Message & msg, DataStreamMsg & mockMsg){ | |
| 299 | ✗ | size_t dataSize(mockMsg.size()); | |
| 300 | ✗ | msg.rebuild(dataSize); | |
| 301 | ✗ | memcpy((void*)msg.data(), mockMsg.data(), dataSize); | |
| 302 | ✗ | } | |
| 303 |