22 return PSendStatus::OK;
25 int err = zmq_errno();
27 return PSendStatus::SOCKET_NOT_AVAILABLE;
30 std::cerr <<
"Unknown ZMQ error in send: '" << err <<
"'"<< std::endl;
31 return PSendStatus::BROKEN_BACKEND;
48 return PRecvStatus::OK;
51 int err = zmq_errno();
53 return PRecvStatus::NO_MESSAGE_RECEIVED;
56 return PRecvStatus::BROKEN_SOCKET;
66 if(flag == PSendFlag::NON_BLOCK){
return zmq::send_flags::dontwait;}
67 else{
return zmq::send_flags::none;}
75 if(flag == PRecvFlag::NON_BLOCK){
return zmq::recv_flags::dontwait;}
76 else{
return zmq::recv_flags::none;}
89 int bufferSizeByte,
size_t threadAffinity, ssize_t dataRate)
109 int bufferSizeByte,
size_t threadAffinity, ssize_t dataRate)
141#if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
142 p_socket->set(zmq::sockopt::rcvtimeo, socketParam.recvTimeOut);
143 p_socket->set(zmq::sockopt::sndtimeo, socketParam.sendTimeOut);
145 p_socket->setsockopt(ZMQ_RCVTIMEO, &socketParam.recvTimeOut,
sizeof(
int));
146 p_socket->setsockopt(ZMQ_SNDTIMEO, &socketParam.sendTimeOut,
sizeof(
int));
160#if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
161 p_socket->set(zmq::sockopt::rcvtimeo, socketParam.recvTimeOut);
162 p_socket->set(zmq::sockopt::sndtimeo, socketParam.sendTimeOut);
164 p_socket->setsockopt(ZMQ_RCVTIMEO, &socketParam.recvTimeOut,
sizeof(
int));
165 p_socket->setsockopt(ZMQ_SNDTIMEO, &socketParam.sendTimeOut,
sizeof(
int));
180 catch(
const zmq::error_t& e) {
182 if(e.num() == ENOTSOCK){
183 return PSendStatus::BROKEN_SOCKET;
185 else if(e.num() == ENOTSUP || e.num() == EFSM || e.num() == ETERM || e.num() == EINVAL){
186 return PSendStatus::BROKEN_BACKEND;
188 else if(e.num() == EINTR){
189 return PSendStatus::SIGNAL_INTERRUPTION;
191 else if(e.num() == EHOSTUNREACH){
192 return PSendStatus::NO_ROUTE_TO_RECEIVER;
195 std::cerr <<
"Unknown ZMQ error in send: " << e.what() << std::endl;
196 return PSendStatus::BROKEN_BACKEND;
210 }
catch(
const zmq::error_t& e)
212 if(e.num() == ENOTSOCK){
213 return PRecvStatus::BROKEN_SOCKET;
215 else if(e.num() == ENOTSUP || e.num() == EFSM || e.num() == ETERM || e.num() == EINVAL){
216 return PRecvStatus::BROKEN_BACKEND;
218 else if(e.num() == EINTR){
219 return PRecvStatus::SIGNAL_INTERRUPTION;
222 std::cerr <<
"Unknown ZMQ error in recv: " << e.what() << std::endl;
223 return PRecvStatus::BROKEN_BACKEND;
233 return p_socket->handle() !=
nullptr;
294 size_t dataSize(msg.size());
295 mockMsg.resize(dataSize);
296 memcpy(mockMsg.data(), (
const void*)msg.data(), dataSize);
304 size_t dataSize(mockMsg.size());
305 msg.rebuild(dataSize);
306 memcpy((
void*)msg.data(), mockMsg.data(), dataSize);
zmq::send_flags convertToSendFlag(PSendFlag::PSendFlag flag)
Convert a send flag into zmq flag.
PZmqParam pzmq_createParamClient(int type, int nbBufferMessage, int bufferSizeByte, size_t threadAffinity, ssize_t dataRate)
Create param for a client socket.
zmq::recv_flags convertToRecvFlag(PRecvFlag::PRecvFlag flag)
Convert a recv flag into zmq flag.
PRecvStatus::PRecvStatus checkRecvStatus(zmq::recv_result_t res)
Check the recv result and convert it into PRecvStatus.
PSendStatus::PSendStatus checkSendStatus(zmq::send_result_t res)
Check the send result and convert it into PSendStatus.
PZmqParam pzmq_createParamServer(int type, int nbBufferMessage, int bufferSizeByte, size_t threadAffinity, ssize_t dataRate)
Create param for a client socket.
PRecvStatus::PRecvStatus checkRecvStatus(zmq::recv_result_t res)
Check the recv result and convert it into PRecvStatus.
PZmqParam pzmq_createParamServer(int type, int nbBufferMessage=10000, int bufferSizeByte=1000000, size_t threadAffinity=0lu, ssize_t dataRate=200000l)
Create param for a client socket.
PZmqParam pzmq_createParamClient(int type, int nbBufferMessage=10000, int bufferSizeByte=1000000, size_t threadAffinity=0lu, ssize_t dataRate=200000l)
Create param for a client socket.
PSendStatus::PSendStatus checkSendStatus(zmq::send_result_t res)
Check the send result and convert it into PSendStatus.
zmq::context_t p_context
Context ZMQ.
PZmqParam Param
Define the type of extra parameters which can be used to create a Socket used by the PAbstractSocketM...
PZmqBackend()
Default constructor of PZmqBackend setting the number of threads for zmq I/O to 1.
void msgToMock(DataStreamMsg &mockMsg, const Message &msg)
Copy current backend message data into mock message.
bool createClientSocket(Socket &socket, const PSocketParam &socketParam, const PZmqParam ¶m)
Create a client socket.
static Param client()
Create a client parameter.
bool createServerSocket(Socket &socket, const PSocketParam &socketParam, const PZmqParam ¶m)
Create a server socket.
PZmqSocket Socket
Define the socket of the backend used by the PAbstractSocketManager.
void mockToMsg(Message &msg, DataStreamMsg &mockMsg)
Copy mock message data into current backend message.
static Param server()
Create a server parameter.
zmq::message_t Message
Define the type of message used by the PAbstractSocketManager.
zmq::socket_t * p_socket
ZMQ Socket.
PZmqSocket()
Default constructor of PZmqSocket.
bool isConnected() const
Check if the socket is connected.
bool createClientSocket(zmq::context_t &context, const PSocketParam &socketParam, const Param &extraParam)
Create a client socket.
PSendStatus::PSendStatus sendMsg(Message &msg, PSendFlag::PSendFlag flag=PSendFlag::BLOCK)
Send data with the socket.
void close()
Close the socket.
bool createServerSocket(zmq::context_t &context, const PSocketParam &socketParam, const Param &extraParam)
Create a server socket.
PZmqParam Param
Define the type of extra parameters which can be used to create a Socket used by the PAbstractSocketM...
virtual ~PZmqSocket()
Destructor of PZmqSocket.
PRecvStatus::PRecvStatus recvMsg(Message &msg, PRecvFlag::PRecvFlag flag=PRecvFlag::BLOCK)
Receive data with the socket.
zmq::message_t Message
Define the type of message used by the PAbstractSocketManager.
zmq::socket_t * pzmq_createServerSocket(zmq::context_t &context, int type, size_t port)
Create a server socket to be used by the SocketManagerZMQ.
zmq::socket_t * pzmq_createClientSocket(zmq::context_t &context, int type, const std::string &address, size_t port)
Create a client socket to be used by the SocketManagerZMQ.
Set of parameters to be passed to create a socket with zmq backend.
ssize_t dataRate
Data rate.
int nbBufferMessage
Number of messages in the buffer.
int bufferSizeByte
Size of the message buffer in bytes.
size_t threadAffinity
Mask of threads which deal with reconnection.