PhoenixZMQ  7.2.0
Library which integrates zeromq use
Loading...
Searching...
No Matches
PZmqBackend.cpp
Go to the documentation of this file.
1/***************************************
2 Auteur : Pierre Aubert
3 Mail : pierre.aubert@lapp.in2p3.fr
4 Licence : CeCILL-C
5****************************************/
6
7#include "PZmqBackend.h"
8
10
20PSendStatus::PSendStatus checkSendStatus(zmq::send_result_t res){
21 if(res.has_value()){
22 return PSendStatus::OK;
23 }else{
24 int err = zmq_errno();
25 if(err == EAGAIN){
26 return PSendStatus::SOCKET_NOT_AVAILABLE;
27 }else{
28 std::cerr << "Unknown ZMQ error in send: '" << err << "'"<< std::endl;
29 return PSendStatus::BROKEN_BACKEND;
30 }
31 }
32}
33
35
43PRecvStatus::PRecvStatus checkRecvStatus(zmq::recv_result_t res){
44 if(res.has_value()){
45 return PRecvStatus::OK;
46 }else{
47 int err = zmq_errno();
48 if(err == EAGAIN){
49 return PRecvStatus::NO_MESSAGE_RECEIVED;
50 }else{
51 return PRecvStatus::BROKEN_SOCKET;
52 }
53 }
54}
55
57
60zmq::send_flags convertToSendFlag(PSendFlag::PSendFlag flag){
61 if(flag == PSendFlag::NON_BLOCK){return zmq::send_flags::dontwait;}
62 else{return zmq::send_flags::none;}
63}
64
66
69zmq::recv_flags convertToRecvFlag(PRecvFlag::PRecvFlag flag){
70 if(flag == PRecvFlag::NON_BLOCK){return zmq::recv_flags::dontwait;}
71 else{return zmq::recv_flags::none;}
72}
73
75
84PZmqParam pzmq_createParamClient(int type, int nbBufferMessage, int bufferSizeByte, size_t threadAffinity, ssize_t dataRate, int linger){
85 PZmqParam param;
86 param.type = type;
87 param.nbBufferMessage = nbBufferMessage;
88 param.bufferSizeByte = bufferSizeByte;
89 param.threadAffinity = threadAffinity;
90 param.dataRate = dataRate;
91 param.linger = linger;
92 return param;
93}
94
96
104PZmqParam pzmq_createParamServer(int type, int nbBufferMessage, int bufferSizeByte, size_t threadAffinity, ssize_t dataRate, int linger){
105 PZmqParam param;
106 param.type = type;
107 param.nbBufferMessage = nbBufferMessage;
108 param.bufferSizeByte = bufferSizeByte;
109 param.threadAffinity = threadAffinity;
110 param.dataRate = dataRate;
111 param.linger = linger;
112 return param;
113}
114
117 :p_socket(NULL)
118{
119
120}
121
126
128
133bool PZmqSocket::createClientSocket(zmq::context_t & context, const PSocketParam & socketParam, const Param & extraParam){
134 p_socket = pzmq_createClientSocket(context, socketParam.hostname, socketParam.port, extraParam.type, extraParam.nbBufferMessage,
135 extraParam.bufferSizeByte, extraParam.threadAffinity, extraParam.dataRate);
136#if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
137 p_socket->set(zmq::sockopt::rcvtimeo, socketParam.recvTimeOut);
138 p_socket->set(zmq::sockopt::sndtimeo, socketParam.sendTimeOut);
139 p_socket->set(zmq::sockopt::linger, extraParam.linger); //number of ms to stop
140#else
141 p_socket->setsockopt(ZMQ_RCVTIMEO, &socketParam.recvTimeOut, sizeof(int));
142 p_socket->setsockopt(ZMQ_SNDTIMEO, &socketParam.sendTimeOut, sizeof(int));
143 p_socket->setsockopt(ZMQ_LINGER, extraParam.linger); //number of ms to stop
144#endif
145 return p_socket != NULL;
146}
147
149
154bool PZmqSocket::createServerSocket(zmq::context_t & context, const PSocketParam & socketParam, const Param & extraParam){
155 p_socket = pzmq_createServerSocket(context, socketParam.port, extraParam.type, extraParam.nbBufferMessage, extraParam.bufferSizeByte,
156 extraParam.threadAffinity, extraParam.dataRate);
157#if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
158 p_socket->set(zmq::sockopt::rcvtimeo, socketParam.recvTimeOut);
159 p_socket->set(zmq::sockopt::sndtimeo, socketParam.sendTimeOut);
160 p_socket->set(zmq::sockopt::linger, extraParam.linger); //number of ms to stop
161#else
162 p_socket->setsockopt(ZMQ_RCVTIMEO, &socketParam.recvTimeOut, sizeof(int));
163 p_socket->setsockopt(ZMQ_SNDTIMEO, &socketParam.sendTimeOut, sizeof(int));
164 p_socket->setsockopt(ZMQ_LINGER, extraParam.linger); //number of ms to stop
165#endif
166 return p_socket != NULL;
167}
168
170
174PSendStatus::PSendStatus PZmqSocket::sendMsg(Message & msg, PSendFlag::PSendFlag flag){
175 try {
176 zmq::send_result_t result = p_socket->send(msg, convertToSendFlag(flag));
177 return checkSendStatus(result);
178 }catch(const zmq::error_t& e) {
179 if(e.num() == ENOTSOCK){
180 return PSendStatus::BROKEN_SOCKET;
181 }else if(e.num() == ENOTSUP || e.num() == EFSM || e.num() == ETERM || e.num() == EINVAL){
182 return PSendStatus::BROKEN_BACKEND;
183 }else if(e.num() == EINTR){
184 return PSendStatus::SIGNAL_INTERRUPTION;
185 }else if(e.num() == EHOSTUNREACH){
186 return PSendStatus::NO_ROUTE_TO_RECEIVER;
187 }else{
188 std::cerr << "Unknown ZMQ error in send: " << e.what() << std::endl;
189 return PSendStatus::BROKEN_BACKEND;
190 }
191 }
192}
193
195
199PRecvStatus::PRecvStatus PZmqSocket::recvMsg(Message & msg, PRecvFlag::PRecvFlag flag){
200 try {
201 zmq::recv_result_t result = p_socket->recv(msg, convertToRecvFlag(flag));
202 return checkRecvStatus(result);
203 }catch(const zmq::error_t& e){
204 if(e.num() == ENOTSOCK){
205 std::cerr << "ZMQ error in recv: " << e.what() << std::endl;
206 return PRecvStatus::BROKEN_SOCKET;
207 }
208 else if(e.num() == ENOTSUP || e.num() == EFSM || e.num() == ETERM || e.num() == EINVAL){
209 std::cerr << "ZMQ error in recv: " << e.what() << std::endl;
210 return PRecvStatus::BROKEN_BACKEND;
211 }
212 else if(e.num() == EINTR){
213 std::cerr << "ZMQ error in recv: " << e.what() << std::endl;
214 return PRecvStatus::SIGNAL_INTERRUPTION;
215 }
216 else{
217 std::cerr << "Unknown ZMQ error in recv: " << e.what() << std::endl;
218 return PRecvStatus::BROKEN_BACKEND;
219 }
220 }
221}
222
224
227 if(p_socket != NULL){
228 return p_socket->handle() != NULL;
229 }
230 return false;
231}
232
235 if(p_socket != NULL){
236 p_socket->close();
237 delete p_socket;
238 p_socket = NULL;
239 }
240}
241
248
250
255
257
262
264
270bool PZmqSocketGenerator::createClientSocket(PZmqSocketGenerator::Socket & socket, const PSocketParam & socketParam, const PZmqParam & param){
271 return socket.createClientSocket(p_context, socketParam, param);
272}
273
275
280bool PZmqSocketGenerator::createServerSocket(PZmqSocketGenerator::Socket & socket, const PSocketParam & socketParam, const PZmqParam & param){
281 return socket.createServerSocket(p_context, socketParam, param);
282}
283
285
288void PZmqSocketGenerator::msgToMock(DataStreamMsg & mockMsg, const PZmqSocketGenerator::Message & msg){
289 size_t dataSize(msg.size());
290 mockMsg.resize(dataSize);
291 memcpy(mockMsg.data(), (const void*)msg.data(), dataSize);
292}
293
295
299 size_t dataSize(mockMsg.size());
300 msg.rebuild(dataSize);
301 memcpy((void*)msg.data(), mockMsg.data(), dataSize);
302}
zmq::send_flags convertToSendFlag(PSendFlag::PSendFlag flag)
Convert a send flag into zmq flag.
zmq::recv_flags convertToRecvFlag(PRecvFlag::PRecvFlag flag)
Convert a recv flag into zmq flag.
PRecvStatus::PRecvStatus checkRecvStatus(zmq::recv_result_t res)
Check the recv result and convert it into PRecvStatus.
PZmqParam pzmq_createParamServer(int type, int nbBufferMessage, int bufferSizeByte, size_t threadAffinity, ssize_t dataRate, int linger)
Create param for a client socket.
PZmqParam pzmq_createParamClient(int type, int nbBufferMessage, int bufferSizeByte, size_t threadAffinity, ssize_t dataRate, int linger)
Create param for a client socket.
PSendStatus::PSendStatus checkSendStatus(zmq::send_result_t res)
Check the send result and convert it into PSendStatus.
PRecvStatus::PRecvStatus checkRecvStatus(zmq::recv_result_t res)
Check the recv result and convert it into PRecvStatus.
PZmqParam pzmq_createParamServer(int type, int nbBufferMessage=10000, int bufferSizeByte=1000000, size_t threadAffinity=0lu, ssize_t dataRate=200000l, int linger=-1)
Create param for a client socket.
PZmqParam pzmq_createParamClient(int type, int nbBufferMessage=10000, int bufferSizeByte=1000000, size_t threadAffinity=0lu, ssize_t dataRate=200000l, int linger=-1)
Create param for a client socket.
PSendStatus::PSendStatus checkSendStatus(zmq::send_result_t res)
Check the send result and convert it into PSendStatus.
zmq::context_t p_context
Context ZMQ.
Definition PZmqBackend.h:91
PZmqParam Param
Define the type of extra parameters which can be used to create a Socket used by the PAbstractSocketM...
Definition PZmqBackend.h:76
PZmqSocket Socket
Define the socket of the backend used by the PAbstractSocketManager.
Definition PZmqBackend.h:72
bool createServerSocket(Socket &socket, const PSocketParam &socketParam, const PZmqParam &param)
Create a server socket.
bool createClientSocket(Socket &socket, const PSocketParam &socketParam, const PZmqParam &param)
Create a client socket.
static void mockToMsg(Message &msg, DataStreamMsg &mockMsg)
Copy mock message data into current backend message.
zmq::message_t Message
Define the type of message used by the PAbstractSocketManager.
Definition PZmqBackend.h:74
PZmqSocketGenerator()
Default constructor of PZmqSocketGenerator setting the number of threads for zmq I/O to 1.
static void msgToMock(DataStreamMsg &mockMsg, const Message &msg)
Copy current backend message data into mock message.
static Param server()
Create a server parameter.
static Param client()
Create a client parameter.
zmq::socket_t * p_socket
ZMQ Socket.
Definition PZmqBackend.h:65
PZmqSocket()
Default constructor of PZmqSocket.
bool isConnected() const
Check if the socket is connected.
bool createClientSocket(zmq::context_t &context, const PSocketParam &socketParam, const Param &extraParam)
Create a client socket.
PSendStatus::PSendStatus sendMsg(Message &msg, PSendFlag::PSendFlag flag=PSendFlag::BLOCK)
Send data with the socket.
void close()
Close the socket.
bool createServerSocket(zmq::context_t &context, const PSocketParam &socketParam, const Param &extraParam)
Create a server socket.
PZmqParam Param
Define the type of extra parameters which can be used to create a Socket used by the PAbstractSocketM...
Definition PZmqBackend.h:45
virtual ~PZmqSocket()
Destructor of PZmqSocket.
PRecvStatus::PRecvStatus recvMsg(Message &msg, PRecvFlag::PRecvFlag flag=PRecvFlag::BLOCK)
Receive data with the socket.
zmq::message_t Message
Define the type of message used by the PAbstractSocketManager.
Definition PZmqBackend.h:43
zmq::socket_t * pzmq_createServerSocket(zmq::context_t &context, int type, size_t port)
Create a server socket to be used by the SocketManagerZMQ.
zmq::socket_t * pzmq_createClientSocket(zmq::context_t &context, int type, const std::string &address, size_t port)
Create a client socket to be used by the SocketManagerZMQ.
Set of parameters to be passed to create a socket with zmq backend.
Definition PZmqBackend.h:17
ssize_t dataRate
Data rate.
Definition PZmqBackend.h:27
int nbBufferMessage
Number of messages in the buffer.
Definition PZmqBackend.h:21
int bufferSizeByte
Size of the message buffer in bytes.
Definition PZmqBackend.h:23
int type
Socket type.
Definition PZmqBackend.h:19
int linger
linger period for socket shutdown
Definition PZmqBackend.h:29
size_t threadAffinity
Mask of threads which deal with reconnection.
Definition PZmqBackend.h:25