PhoenixZMQ  2.0.0
Library which integrates zeromq use in Phoenix
PZmqBackend.cpp
Go to the documentation of this file.
1 /***************************************
2  Auteur : Pierre Aubert
3  Mail : pierre.aubert@lapp.in2p3.fr
4  Licence : CeCILL-C
5 ****************************************/
6 
7 #include "PZmqBackend.h"
8 
10 
20 PZmqParam pzmq_createParamClient(const std::string & address, size_t port, zmq::context_t* context, int type, int nbBufferMessage,
21  int bufferSizeByte, size_t threadAffinity, ssize_t dataRate)
22 {
23  PZmqParam param;
24  param.address = address;
25  param.port = port;
26  param.context = context;
27  param.type = type;
28  param.nbBufferMessage = nbBufferMessage;
29  param.bufferSizeByte = bufferSizeByte;
30  param.threadAffinity = threadAffinity;
31  param.dataRate = dataRate;
32  return param;
33 }
34 
36 
46 PZmqParam pzmq_createParamServer(const std::string & address, size_t port, zmq::context_t* context, int type, int nbBufferMessage,
47  int bufferSizeByte, size_t threadAffinity, ssize_t dataRate)
48 {
49  PZmqParam param;
50  param.address = address;
51  param.port = port;
52  param.context = context;
53  param.type = type;
54  param.nbBufferMessage = nbBufferMessage;
55  param.bufferSizeByte = bufferSizeByte;
56  param.threadAffinity = threadAffinity;
57  param.dataRate = dataRate;
58  return param;
59 }
60 
63 
64 }
65 
67 
77 PZmqBackend::Param PZmqBackend::client(const std::string & address, size_t port, zmq::context_t* context, int type, int nbBufferMessage, int bufferSizeByte, size_t threadAffinity, ssize_t dataRate){
78  return pzmq_createParamClient(address, port, context, type, nbBufferMessage, bufferSizeByte, threadAffinity, dataRate);
79 }
80 
82 
92 PZmqBackend::Param PZmqBackend::server(const std::string & address, size_t port, zmq::context_t* context, int type, int nbBufferMessage, int bufferSizeByte, size_t threadAffinity, ssize_t dataRate){
93  return pzmq_createParamServer(address, port, context, type, nbBufferMessage, bufferSizeByte, threadAffinity, dataRate);
94 }
95 
97 
103 bool PZmqBackend::createClientSocket(PZmqBackend::Socket & socket, const std::string & address, size_t port, const PZmqParam & param){
104  socket = pzmq_createClientSocket(*param.context, address, port, param.type, param.nbBufferMessage,
105  param.bufferSizeByte, param.threadAffinity, param.dataRate);
106  return socket != NULL;
107 }
108 
110 
116 bool PZmqBackend::createServerSocket(PZmqBackend::Socket & socket, const std::string & address, size_t port, const PZmqParam & param){
117  socket = pzmq_createServerSocket(*param.context, port, param.type, param.nbBufferMessage, param.bufferSizeByte,
118  param.threadAffinity, param.dataRate);
119  return socket != NULL;
120 }
121 
123 
128  socket = pzmq_createClientSocket(*param.context, param.address, param.port, param.type, param.nbBufferMessage,
129  param.bufferSizeByte, param.threadAffinity, param.dataRate);
130  return socket != NULL;
131 }
132 
134 
139  socket = pzmq_createServerSocket(*param.context, param.port, param.type, param.nbBufferMessage, param.bufferSizeByte,
140  param.threadAffinity, param.dataRate);
141  return socket != NULL;
142 }
143 
145 
149  if(flag == PSendFlag::NON_BLOCK){return zmq::send_flags::dontwait;}
150  else{return zmq::send_flags::none;}
151 }
152 
154 
160  if(socket != NULL){
161  zmq::send_result_t res = socket->send(msg, convertToSendFlag(flag));
162  return res.has_value(); //Seems, if there is no value, there is also no error
163  }else{
164  return false;
165  }
166 }
167 
169 
173  if(flag == PRecvFlag::NON_BLOCK){return zmq::recv_flags::dontwait;}
174  else{return zmq::recv_flags::none;}
175 }
176 
178 
184  if(socket != NULL){
185  zmq::recv_result_t res = socket->recv(msg, convertToRecvFlag(flag));
186  return res.has_value(); //Seems, if there is no value, there is also no error
187  }else{
188  return false;
189  }
190 }
191 
193 
196 void PZmqBackend::msgResize(PZmqBackend::Message & msg, size_t sizeMsg){
197  msg.rebuild(sizeMsg);
198 }
199 
201 
205  return msg.size();
206 }
207 
209 
213  return (const DataStreamIter)msg.data();
214 }
215 
217 
221  return (DataStreamIter)msg.data();
222 }
223 
225 
228  pzmq_closeServerSocket(socket);
229 }
230 
232 
236  if(socket != NULL){
237 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
238  return socket->handle() != NULL;
239 #else
240  return socket->connected();
241 #endif
242  }else{
243  return false;
244  }
245 }
246 
248 
252  size_t dataSize(PZmqBackend::msgSize(msg));
253  mockMsg.resize(dataSize);
254  memcpy(mockMsg.data(), PZmqBackend::msgData(msg), dataSize);
255 }
256 
258 
262  size_t dataSize(mockMsg.size());
263  PZmqBackend::msgResize(msg, dataSize);
264  memcpy(PZmqBackend::msgData(msg), mockMsg.data(), dataSize);
265 }
266 
267 
268 
269 
270 
zmq::send_flags convertToSendFlag(PSendFlag::PSendFlag flag)
Convert a send flag into zmq flag.
PZmqParam pzmq_createParamServer(const std::string &address, size_t port, zmq::context_t *context, int type, int nbBufferMessage, int bufferSizeByte, size_t threadAffinity, ssize_t dataRate)
Create param for a client socket.
Definition: PZmqBackend.cpp:46
zmq::recv_flags convertToRecvFlag(PRecvFlag::PRecvFlag flag)
Convert a recv flag into zmq flag.
PZmqParam pzmq_createParamClient(const std::string &address, size_t port, zmq::context_t *context, int type, int nbBufferMessage, int bufferSizeByte, size_t threadAffinity, ssize_t dataRate)
Create param for a client socket.
Definition: PZmqBackend.cpp:20
static void close(Socket &socket)
Close the given socket.
static Param server(const std::string &address, size_t port, zmq::context_t *context=NULL, int type=ZMQ_PUSH, int nbBufferMessage=10000, int bufferSizeByte=1000000, size_t threadAffinity=0lu, ssize_t dataRate=200000l)
Create a server parameter.
Definition: PZmqBackend.cpp:92
static Param client(const std::string &address, size_t port, zmq::context_t *context=NULL, int type=ZMQ_PULL, int nbBufferMessage=10000, int bufferSizeByte=1000000, size_t threadAffinity=0lu, ssize_t dataRate=200000l)
Create a client parameter.
Definition: PZmqBackend.cpp:77
static size_t msgSize(const Message &msg)
Get the size of a message.
PZmqBackend()
Default constructor of PZmqBackend.
Definition: PZmqBackend.cpp:62
static void msgToMock(DataStreamMsg &mockMsg, const Message &msg)
Copy current backend message data into mock message.
static void msgResize(Message &msg, size_t sizeMsg)
Resize a message.
static bool createClientSocket(Socket &socket, const std::string &address, size_t port, const PZmqParam &param)
Create a client socket.
static bool recv(Socket &socket, Message &msg, PRecvFlag::PRecvFlag flag)
Recieve message from the given socket.
static const DataStreamIter msgData(const Message &msg)
Get the data of a message.
static void mockToMsg(Message &msg, DataStreamMsg &mockMsg)
Copy mock message data into current backend message.
zmq::socket_t * Socket
Define the socket of the backend used by the PAbstractSocketManager.
Definition: PZmqBackend.h:41
static bool isConnected(const Socket &socket)
Close the given socket.
static bool send(Socket &socket, Message &msg, PSendFlag::PSendFlag flag)
Send message on the given socket.
zmq::message_t Message
Define the type of message used by the PAbstractSocketManager.
Definition: PZmqBackend.h:43
static bool createServerSocket(Socket &socket, const std::string &address, size_t port, const PZmqParam &param)
Create a server socket.
std::vector< DataStreamType > DataStreamMsg
DataStreamType * DataStreamIter
PRecvFlag
describe the recieving flag of the Socket
Definition: PSocketFlag.h:20
PSendFlag
describe the sending flag of the Socket
Definition: PSocketFlag.h:12
zmq::socket_t * pzmq_createServerSocket(zmq::context_t &context, int type, size_t port)
Create a server socket to be used by the SocketManagerZMQ.
Definition: phoenix_zmq.cpp:48
zmq::socket_t * pzmq_createClientSocket(zmq::context_t &context, int type, const std::string &address, size_t port)
Create a client socket to be used by the SocketManagerZMQ.
Definition: phoenix_zmq.cpp:18
void pzmq_closeServerSocket(zmq::socket_t *&socket)
Close the given server socket.
Set of parameters to be passed to create a socket with zmq backend.
Definition: PZmqBackend.h:15
zmq::context_t * context
Context.
Definition: PZmqBackend.h:17
ssize_t dataRate
Data rate.
Definition: PZmqBackend.h:31
int nbBufferMessage
Number of messages in the buffer.
Definition: PZmqBackend.h:25
int bufferSizeByte
Size of the message buffer in bytes.
Definition: PZmqBackend.h:27
size_t port
Connection port.
Definition: PZmqBackend.h:21
int type
Socket type.
Definition: PZmqBackend.h:23
size_t threadAffinity
Mask of threads which deal with reconnection.
Definition: PZmqBackend.h:29
std::string address
Host address.
Definition: PZmqBackend.h:19