22 return PSendStatus::OK;
24 int err = zmq_errno();
26 return PSendStatus::SOCKET_NOT_AVAILABLE;
28 std::cerr <<
"Unknown ZMQ error in send: '" << err <<
"'"<< std::endl;
29 return PSendStatus::BROKEN_BACKEND;
45 return PRecvStatus::OK;
47 int err = zmq_errno();
49 return PRecvStatus::NO_MESSAGE_RECEIVED;
51 return PRecvStatus::BROKEN_SOCKET;
61 if(flag == PSendFlag::NON_BLOCK){
return zmq::send_flags::dontwait;}
62 else{
return zmq::send_flags::none;}
70 if(flag == PRecvFlag::NON_BLOCK){
return zmq::recv_flags::dontwait;}
71 else{
return zmq::recv_flags::none;}
136#if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
137 p_socket->set(zmq::sockopt::rcvtimeo, socketParam.recvTimeOut);
138 p_socket->set(zmq::sockopt::sndtimeo, socketParam.sendTimeOut);
141 p_socket->setsockopt(ZMQ_RCVTIMEO, &socketParam.recvTimeOut,
sizeof(
int));
142 p_socket->setsockopt(ZMQ_SNDTIMEO, &socketParam.sendTimeOut,
sizeof(
int));
157#if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
158 p_socket->set(zmq::sockopt::rcvtimeo, socketParam.recvTimeOut);
159 p_socket->set(zmq::sockopt::sndtimeo, socketParam.sendTimeOut);
162 p_socket->setsockopt(ZMQ_RCVTIMEO, &socketParam.recvTimeOut,
sizeof(
int));
163 p_socket->setsockopt(ZMQ_SNDTIMEO, &socketParam.sendTimeOut,
sizeof(
int));
178 }
catch(
const zmq::error_t& e) {
179 if(e.num() == ENOTSOCK){
180 return PSendStatus::BROKEN_SOCKET;
181 }
else if(e.num() == ENOTSUP || e.num() == EFSM || e.num() == ETERM || e.num() == EINVAL){
182 return PSendStatus::BROKEN_BACKEND;
183 }
else if(e.num() == EINTR){
184 return PSendStatus::SIGNAL_INTERRUPTION;
185 }
else if(e.num() == EHOSTUNREACH){
186 return PSendStatus::NO_ROUTE_TO_RECEIVER;
188 std::cerr <<
"Unknown ZMQ error in send: " << e.what() << std::endl;
189 return PSendStatus::BROKEN_BACKEND;
203 }
catch(
const zmq::error_t& e){
204 if(e.num() == ENOTSOCK){
205 std::cerr <<
"ZMQ error in recv: " << e.what() << std::endl;
206 return PRecvStatus::BROKEN_SOCKET;
208 else if(e.num() == ENOTSUP || e.num() == EFSM || e.num() == ETERM || e.num() == EINVAL){
209 std::cerr <<
"ZMQ error in recv: " << e.what() << std::endl;
210 return PRecvStatus::BROKEN_BACKEND;
212 else if(e.num() == EINTR){
213 std::cerr <<
"ZMQ error in recv: " << e.what() << std::endl;
214 return PRecvStatus::SIGNAL_INTERRUPTION;
217 std::cerr <<
"Unknown ZMQ error in recv: " << e.what() << std::endl;
218 return PRecvStatus::BROKEN_BACKEND;
289 size_t dataSize(msg.size());
290 mockMsg.resize(dataSize);
291 memcpy(mockMsg.data(), (
const void*)msg.data(), dataSize);
299 size_t dataSize(mockMsg.size());
300 msg.rebuild(dataSize);
301 memcpy((
void*)msg.data(), mockMsg.data(), dataSize);
zmq::send_flags convertToSendFlag(PSendFlag::PSendFlag flag)
Convert a send flag into zmq flag.
zmq::recv_flags convertToRecvFlag(PRecvFlag::PRecvFlag flag)
Convert a recv flag into zmq flag.
PRecvStatus::PRecvStatus checkRecvStatus(zmq::recv_result_t res)
Check the recv result and convert it into PRecvStatus.
PZmqParam pzmq_createParamServer(int type, int nbBufferMessage, int bufferSizeByte, size_t threadAffinity, ssize_t dataRate, int linger)
Create param for a server socket.
PZmqParam pzmq_createParamClient(int type, int nbBufferMessage, int bufferSizeByte, size_t threadAffinity, ssize_t dataRate, int linger)
Create param for a client socket.
PSendStatus::PSendStatus checkSendStatus(zmq::send_result_t res)
Check the send result and convert it into PSendStatus.
PRecvStatus::PRecvStatus checkRecvStatus(zmq::recv_result_t res)
Check the recv result and convert it into PRecvStatus.
PZmqParam pzmq_createParamServer(int type, int nbBufferMessage=10000, int bufferSizeByte=1000000, size_t threadAffinity=0lu, ssize_t dataRate=200000l, int linger=-1)
Create param for a server socket.
PZmqParam pzmq_createParamClient(int type, int nbBufferMessage=10000, int bufferSizeByte=1000000, size_t threadAffinity=0lu, ssize_t dataRate=200000l, int linger=-1)
Create param for a client socket.
PSendStatus::PSendStatus checkSendStatus(zmq::send_result_t res)
Check the send result and convert it into PSendStatus.
zmq::context_t p_context
Context ZMQ.
PZmqParam Param
Define the type of extra parameters which can be used to create a Socket used by the PAbstractSocketManager.
PZmqSocket Socket
Define the socket of the backend used by the PAbstractSocketManager.
bool createServerSocket(Socket &socket, const PSocketParam &socketParam, const PZmqParam &param)
Create a server socket.
bool createClientSocket(Socket &socket, const PSocketParam &socketParam, const PZmqParam &param)
Create a client socket.
static void mockToMsg(Message &msg, DataStreamMsg &mockMsg)
Copy mock message data into current backend message.
zmq::message_t Message
Define the type of message used by the PAbstractSocketManager.
PZmqSocketGenerator()
Default constructor of PZmqSocketGenerator setting the number of threads for zmq I/O to 1.
static void msgToMock(DataStreamMsg &mockMsg, const Message &msg)
Copy current backend message data into mock message.
static Param server()
Create a server parameter.
static Param client()
Create a client parameter.
zmq::socket_t * p_socket
ZMQ Socket.
PZmqSocket()
Default constructor of PZmqSocket.
bool isConnected() const
Check if the socket is connected.
bool createClientSocket(zmq::context_t &context, const PSocketParam &socketParam, const Param &extraParam)
Create a client socket.
PSendStatus::PSendStatus sendMsg(Message &msg, PSendFlag::PSendFlag flag=PSendFlag::BLOCK)
Send data with the socket.
void close()
Close the socket.
bool createServerSocket(zmq::context_t &context, const PSocketParam &socketParam, const Param &extraParam)
Create a server socket.
PZmqParam Param
Define the type of extra parameters which can be used to create a Socket used by the PAbstractSocketManager.
virtual ~PZmqSocket()
Destructor of PZmqSocket.
PRecvStatus::PRecvStatus recvMsg(Message &msg, PRecvFlag::PRecvFlag flag=PRecvFlag::BLOCK)
Receive data with the socket.
zmq::message_t Message
Define the type of message used by the PAbstractSocketManager.
zmq::socket_t * pzmq_createServerSocket(zmq::context_t &context, int type, size_t port)
Create a server socket to be used by the SocketManagerZMQ.
zmq::socket_t * pzmq_createClientSocket(zmq::context_t &context, int type, const std::string &address, size_t port)
Create a client socket to be used by the SocketManagerZMQ.
Set of parameters to be passed to create a socket with zmq backend.
ssize_t dataRate
Data rate.
int nbBufferMessage
Number of messages in the buffer.
int bufferSizeByte
Size of the message buffer in bytes.
int linger
Linger period for socket shutdown.
size_t threadAffinity
Mask of threads which deal with reconnection.