PhoenixZMQ  6.0.0
Library which integrates zeromq use
Loading...
Searching...
No Matches
PZmqBackend.cpp
Go to the documentation of this file.
1/***************************************
2 Auteur : Pierre Aubert
3 Mail : pierre.aubert@lapp.in2p3.fr
4 Licence : CeCILL-C
5****************************************/
6
7#include "PZmqBackend.h"
8
10
20PSendStatus::PSendStatus checkSendStatus(zmq::send_result_t res){
21 if(res.has_value()){
22 return PSendStatus::OK;
23 }
24 else{
25 int err = zmq_errno();
26 if(err == EAGAIN){
27 return PSendStatus::SOCKET_NOT_AVAILABLE;
28 }
29 else{
30 std::cerr << "Unknown ZMQ error in send: '" << err << "'"<< std::endl;
31 return PSendStatus::BROKEN_BACKEND;
32 }
33 }
34}
35
37
46PRecvStatus::PRecvStatus checkRecvStatus(zmq::recv_result_t res){
47 if(res.has_value()){
48 return PRecvStatus::OK;
49 }
50 else{
51 int err = zmq_errno();
52 if(err == EAGAIN){
53 return PRecvStatus::NO_MESSAGE_RECEIVED;
54 }
55 else{
56 return PRecvStatus::BROKEN_SOCKET;
57 }
58 }
59}
60
62
65zmq::send_flags convertToSendFlag(PSendFlag::PSendFlag flag){
66 if(flag == PSendFlag::NON_BLOCK){return zmq::send_flags::dontwait;}
67 else{return zmq::send_flags::none;}
68}
69
71
74zmq::recv_flags convertToRecvFlag(PRecvFlag::PRecvFlag flag){
75 if(flag == PRecvFlag::NON_BLOCK){return zmq::recv_flags::dontwait;}
76 else{return zmq::recv_flags::none;}
77}
78
80
88PZmqParam pzmq_createParamClient(int type, int nbBufferMessage,
89 int bufferSizeByte, size_t threadAffinity, ssize_t dataRate)
90{
91 PZmqParam param;
92 param.type = type;
93 param.nbBufferMessage = nbBufferMessage;
94 param.bufferSizeByte = bufferSizeByte;
95 param.threadAffinity = threadAffinity;
96 param.dataRate = dataRate;
97 return param;
98}
99
101
108PZmqParam pzmq_createParamServer(int type, int nbBufferMessage,
109 int bufferSizeByte, size_t threadAffinity, ssize_t dataRate)
110{
111 PZmqParam param;
112 param.type = type;
113 param.nbBufferMessage = nbBufferMessage;
114 param.bufferSizeByte = bufferSizeByte;
115 param.threadAffinity = threadAffinity;
116 param.dataRate = dataRate;
117 return param;
118}
119
122:p_socket(nullptr)
123{
124
125}
126
131
133
138bool PZmqSocket::createClientSocket(zmq::context_t & context, const PSocketParam & socketParam, const Param & extraParam){
139 p_socket = pzmq_createClientSocket(context, socketParam.hostname, socketParam.port, extraParam.type, extraParam.nbBufferMessage,
140 extraParam.bufferSizeByte, extraParam.threadAffinity, extraParam.dataRate);
141#if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
142 p_socket->set(zmq::sockopt::rcvtimeo, socketParam.recvTimeOut);
143 p_socket->set(zmq::sockopt::sndtimeo, socketParam.sendTimeOut);
144#else
145 p_socket->setsockopt(ZMQ_RCVTIMEO, &socketParam.recvTimeOut, sizeof(int));
146 p_socket->setsockopt(ZMQ_SNDTIMEO, &socketParam.sendTimeOut, sizeof(int));
147#endif
148 return p_socket != nullptr;
149}
150
152
157bool PZmqSocket::createServerSocket(zmq::context_t & context, const PSocketParam & socketParam, const Param & extraParam){
158 p_socket = pzmq_createServerSocket(context, socketParam.port, extraParam.type, extraParam.nbBufferMessage, extraParam.bufferSizeByte,
159 extraParam.threadAffinity, extraParam.dataRate);
160#if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
161 p_socket->set(zmq::sockopt::rcvtimeo, socketParam.recvTimeOut);
162 p_socket->set(zmq::sockopt::sndtimeo, socketParam.sendTimeOut);
163#else
164 p_socket->setsockopt(ZMQ_RCVTIMEO, &socketParam.recvTimeOut, sizeof(int));
165 p_socket->setsockopt(ZMQ_SNDTIMEO, &socketParam.sendTimeOut, sizeof(int));
166#endif
167 return p_socket != nullptr;
168}
169
171
175PSendStatus::PSendStatus PZmqSocket::sendMsg(Message & msg, PSendFlag::PSendFlag flag){
176 try {
177 zmq::send_result_t result = p_socket->send(msg, convertToSendFlag(flag));
178 return checkSendStatus(result);
179 }
180 catch(const zmq::error_t& e) {
181
182 if(e.num() == ENOTSOCK){
183 return PSendStatus::BROKEN_SOCKET;
184 }
185 else if(e.num() == ENOTSUP || e.num() == EFSM || e.num() == ETERM || e.num() == EINVAL){
186 return PSendStatus::BROKEN_BACKEND;
187 }
188 else if(e.num() == EINTR){
189 return PSendStatus::SIGNAL_INTERRUPTION;
190 }
191 else if(e.num() == EHOSTUNREACH){
192 return PSendStatus::NO_ROUTE_TO_RECEIVER;
193 }
194 else{
195 std::cerr << "Unknown ZMQ error in send: " << e.what() << std::endl;
196 return PSendStatus::BROKEN_BACKEND;
197 }
198 }
199}
200
202
206PRecvStatus::PRecvStatus PZmqSocket::recvMsg(Message & msg, PRecvFlag::PRecvFlag flag){
207 try {
208 zmq::recv_result_t result = p_socket->recv(msg, convertToRecvFlag(flag));
209 return checkRecvStatus(result);
210 } catch(const zmq::error_t& e)
211 {
212 if(e.num() == ENOTSOCK){
213 return PRecvStatus::BROKEN_SOCKET;
214 }
215 else if(e.num() == ENOTSUP || e.num() == EFSM || e.num() == ETERM || e.num() == EINVAL){
216 return PRecvStatus::BROKEN_BACKEND;
217 }
218 else if(e.num() == EINTR){
219 return PRecvStatus::SIGNAL_INTERRUPTION;
220 }
221 else{
222 std::cerr << "Unknown ZMQ error in recv: " << e.what() << std::endl;
223 return PRecvStatus::BROKEN_BACKEND;
224 }
225 }
226}
227
229
232 if(p_socket != nullptr){
233 return p_socket->handle() != nullptr;
234 }
235 return false;
236}
237
240 if(p_socket != nullptr){
241 p_socket->close();
242 delete p_socket;
243 p_socket = nullptr;
244 }
245}
246
249:p_context(1)
250{
251
252}
253
255
260
262
267
269
275bool PZmqBackend::createClientSocket(PZmqBackend::Socket & socket, const PSocketParam & socketParam, const PZmqParam & param){
276 return socket.createClientSocket(p_context, socketParam, param);
277}
278
280
285bool PZmqBackend::createServerSocket(PZmqBackend::Socket & socket, const PSocketParam & socketParam, const PZmqParam & param){
286 return socket.createServerSocket(p_context, socketParam, param);
287}
288
290
293void PZmqBackend::msgToMock(DataStreamMsg & mockMsg, const PZmqBackend::Message & msg){
294 size_t dataSize(msg.size());
295 mockMsg.resize(dataSize);
296 memcpy(mockMsg.data(), (const void*)msg.data(), dataSize);
297}
298
300
303void PZmqBackend::mockToMsg(PZmqBackend::Message & msg, DataStreamMsg & mockMsg){
304 size_t dataSize(mockMsg.size());
305 msg.rebuild(dataSize);
306 memcpy((void*)msg.data(), mockMsg.data(), dataSize);
307}
zmq::send_flags convertToSendFlag(PSendFlag::PSendFlag flag)
Convert a send flag into zmq flag.
PZmqParam pzmq_createParamClient(int type, int nbBufferMessage, int bufferSizeByte, size_t threadAffinity, ssize_t dataRate)
Create param for a client socket.
zmq::recv_flags convertToRecvFlag(PRecvFlag::PRecvFlag flag)
Convert a recv flag into zmq flag.
PRecvStatus::PRecvStatus checkRecvStatus(zmq::recv_result_t res)
Check the recv result and convert it into PRecvStatus.
PSendStatus::PSendStatus checkSendStatus(zmq::send_result_t res)
Check the send result and convert it into PSendStatus.
PZmqParam pzmq_createParamServer(int type, int nbBufferMessage, int bufferSizeByte, size_t threadAffinity, ssize_t dataRate)
Create param for a client socket.
PRecvStatus::PRecvStatus checkRecvStatus(zmq::recv_result_t res)
Check the recv result and convert it into PRecvStatus.
PZmqParam pzmq_createParamServer(int type, int nbBufferMessage=10000, int bufferSizeByte=1000000, size_t threadAffinity=0lu, ssize_t dataRate=200000l)
Create param for a client socket.
PZmqParam pzmq_createParamClient(int type, int nbBufferMessage=10000, int bufferSizeByte=1000000, size_t threadAffinity=0lu, ssize_t dataRate=200000l)
Create param for a client socket.
PSendStatus::PSendStatus checkSendStatus(zmq::send_result_t res)
Check the send result and convert it into PSendStatus.
zmq::context_t p_context
Context ZMQ.
Definition PZmqBackend.h:89
PZmqParam Param
Define the type of extra parameters which can be used to create a Socket used by the PAbstractSocketM...
Definition PZmqBackend.h:74
PZmqBackend()
Default constructor of PZmqBackend setting the number of threads for zmq I/O to 1.
void msgToMock(DataStreamMsg &mockMsg, const Message &msg)
Copy current backend message data into mock message.
bool createClientSocket(Socket &socket, const PSocketParam &socketParam, const PZmqParam &param)
Create a client socket.
static Param client()
Create a client parameter.
bool createServerSocket(Socket &socket, const PSocketParam &socketParam, const PZmqParam &param)
Create a server socket.
PZmqSocket Socket
Define the socket of the backend used by the PAbstractSocketManager.
Definition PZmqBackend.h:70
void mockToMsg(Message &msg, DataStreamMsg &mockMsg)
Copy mock message data into current backend message.
static Param server()
Create a server parameter.
zmq::message_t Message
Define the type of message used by the PAbstractSocketManager.
Definition PZmqBackend.h:72
zmq::socket_t * p_socket
ZMQ Socket.
Definition PZmqBackend.h:63
PZmqSocket()
Default constructor of PZmqSocket.
bool isConnected() const
Check if the socket is connected.
bool createClientSocket(zmq::context_t &context, const PSocketParam &socketParam, const Param &extraParam)
Create a client socket.
PSendStatus::PSendStatus sendMsg(Message &msg, PSendFlag::PSendFlag flag=PSendFlag::BLOCK)
Send data with the socket.
void close()
Close the socket.
bool createServerSocket(zmq::context_t &context, const PSocketParam &socketParam, const Param &extraParam)
Create a server socket.
PZmqParam Param
Define the type of extra parameters which can be used to create a Socket used by the PAbstractSocketM...
Definition PZmqBackend.h:43
virtual ~PZmqSocket()
Destructor of PZmqSocket.
PRecvStatus::PRecvStatus recvMsg(Message &msg, PRecvFlag::PRecvFlag flag=PRecvFlag::BLOCK)
Receive data with the socket.
zmq::message_t Message
Define the type of message used by the PAbstractSocketManager.
Definition PZmqBackend.h:41
zmq::socket_t * pzmq_createServerSocket(zmq::context_t &context, int type, size_t port)
Create a server socket to be used by the SocketManagerZMQ.
zmq::socket_t * pzmq_createClientSocket(zmq::context_t &context, int type, const std::string &address, size_t port)
Create a client socket to be used by the SocketManagerZMQ.
Set of parameters to be passed to create a socket with zmq backend.
Definition PZmqBackend.h:17
ssize_t dataRate
Data rate.
Definition PZmqBackend.h:27
int nbBufferMessage
Number of messages in the buffer.
Definition PZmqBackend.h:21
int bufferSizeByte
Size of the message buffer in bytes.
Definition PZmqBackend.h:23
int type
Socket type.
Definition PZmqBackend.h:19
size_t threadAffinity
Mask of threads which deal with reconnection.
Definition PZmqBackend.h:25