PhoenixZMQ  2.0.0
Library which integrates zeromq use in Phoenix
phoenix_zmq.cpp File Reference
#include <sstream>
#include "phoenix_zmq.h"
+ Include dependency graph for phoenix_zmq.cpp:

Go to the source code of this file.

Functions

void pzmq_closeServerSocket (zmq::socket_t *&socket)
 Close the given server socket. More...
 
zmq::socket_t * pzmq_createClientSocket (zmq::context_t &context, const std::string &address, size_t port, int type, int nbBufferMessage, int bufferSizeByte, size_t threadAffinity, ssize_t dataRate)
 Add a client socket to the manager. More...
 
zmq::socket_t * pzmq_createClientSocket (zmq::context_t &context, int type, const std::string &address, size_t port)
 Create a client socket to be used by the SocketManagerZMQ. More...
 
zmq::socket_t * pzmq_createServerSocket (zmq::context_t &context, int type, size_t port)
 Create a server socket to be used by the SocketManagerZMQ. More...
 
zmq::socket_t * pzmq_createServerSocket (zmq::context_t &context, size_t port, int type, int nbBufferMessage, int bufferSizeByte, size_t threadAffinity, ssize_t dataRate)
 Add a server socket to the manager. More...
 
void pzmq_setBufferSize (zmq::socket_t *socket, int type, int nbBufferMessage, int dataRate, size_t bufferSizeByte)
 Configure the buffering of a socket: number of buffered messages, data rate, and the receive or send buffer size depending on the socket type. More...
 
void pzmq_setDataRate (zmq::socket_t *socket, int type, int dataRate)
 Set the data rate of the socket. More...
 
void pzmq_setNbMessageBuffer (zmq::socket_t *socket, int nbBufferMessage)
 Set the number of messages in the messages buffer. More...
 
void pzmq_setRecvBufferSize (zmq::socket_t *socket, int bufferSizeByte)
 Set the size of the buffer used for received messages. More...
 
void pzmq_setSendBufferSize (zmq::socket_t *socket, int bufferSizeByte)
 Set the size of the buffer to send messages. More...
 
void pzmq_setThreadAffinity (zmq::socket_t *socket, size_t threadAffinity)
 Set the thread affinity of zmq. More...
 

Function Documentation

◆ pzmq_closeServerSocket()

void pzmq_closeServerSocket ( zmq::socket_t *&  socket)

Close the given server socket.

Parameters
[out]socket: pointer to the server socket to be closed (will be set to NULL at the end of the function)

Definition at line 105 of file phoenix_zmq.cpp.

105  {
106  if(socket != NULL){
107  delete socket;
108  socket = NULL;
109  }
110 }

Referenced by PZmqBackend::close().

+ Here is the caller graph for this function:

◆ pzmq_createClientSocket() [1/2]

zmq::socket_t* pzmq_createClientSocket ( zmq::context_t &  context,
const std::string &  address,
size_t  port,
int  type,
int  nbBufferMessage,
int  bufferSizeByte,
size_t  threadAffinity,
ssize_t  dataRate 
)

Add a client socket to the manager.

Parameters
context: zmq context where to create socket
address: address of the server to be connected to
port: port to be used
type: type of the connection (ZMQ_PULL, ZMQ_PUSH, etc)
nbBufferMessage: number of messages to be buffered
bufferSizeByte: size of the zmq buffer in bytes
threadAffinity: bit mask which determines which threads from the 0MQ I/O thread pool associated with the socket's context shall handle newly created connections (1 : means first, 2 : means second, 3 : means first and second, etc)
dataRate: expected data rate (in kilobytes per second)
Returns
zmq socket

Definition at line 90 of file phoenix_zmq.cpp.

92 {
93  zmq::socket_t* socket = pzmq_createClientSocket(context, type, address, port);
94  bool b(socket != NULL);
95  if(b){
96  pzmq_setBufferSize(socket, type, nbBufferMessage, dataRate, bufferSizeByte);
97  pzmq_setThreadAffinity(socket, threadAffinity);
98  }
99  return socket;
100 }
zmq::socket_t * pzmq_createClientSocket(zmq::context_t &context, int type, const std::string &address, size_t port)
Create a client socket to be used by the SocketManagerZMQ.
Definition: phoenix_zmq.cpp:18
void pzmq_setThreadAffinity(zmq::socket_t *socket, size_t threadAffinity)
Set the thread affinity of zmq.
void pzmq_setBufferSize(zmq::socket_t *socket, int type, int nbBufferMessage, int dataRate, size_t bufferSizeByte)
Configure the buffering of a socket: number of buffered messages, data rate, and the receive or send buffer size depending on the socket type.

References pzmq_createClientSocket(), pzmq_setBufferSize(), and pzmq_setThreadAffinity().

+ Here is the call graph for this function:

◆ pzmq_createClientSocket() [2/2]

zmq::socket_t* pzmq_createClientSocket ( zmq::context_t &  context,
int  type,
const std::string &  address,
size_t  port 
)

Create a client socket to be used by the SocketManagerZMQ.

Parameters
context: zeromq context which defines the number of threads to be used in the data transfer
type: type of the socket (ZMQ_PULL, ZMQ_PUSH, etc)
address: address of the socket (example localhost or 127.0.0.1)
port: port to be used
Returns
created socket

Definition at line 18 of file phoenix_zmq.cpp.

18  {
19  std::stringstream socketAddressData;
20  socketAddressData << "tcp://" << address <<":" << port;
21  zmq::socket_t *socket = new zmq::socket_t(context, type);
22  socket->connect(socketAddressData.str());
23 
24 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
25  socket->set(zmq::sockopt::linger, 1); //1 ms to stop
26 #else
27  socket->setsockopt(ZMQ_LINGER, 1); //1 ms to stop
28 #endif
29  if(type == ZMQ_SUB){
30 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
31  socket->set(zmq::sockopt::subscribe, "");
32  socket->set(zmq::sockopt::conflate, 1);
33 #else
34  socket->setsockopt(ZMQ_SUBSCRIBE, "", 0);
35  int conflate(1);
36  socket->setsockopt(ZMQ_CONFLATE, &conflate, sizeof(int));
37 #endif
38  }
39  return socket;
40 }

Referenced by PZmqBackend::createClientSocket(), and pzmq_createClientSocket().

+ Here is the caller graph for this function:

◆ pzmq_createServerSocket() [1/2]

zmq::socket_t* pzmq_createServerSocket ( zmq::context_t &  context,
int  type,
size_t  port 
)

Create a server socket to be used by the SocketManagerZMQ.

Parameters
context: zeromq context which defines the number of threads to be used in the data transfer
type: type of the socket (ZMQ_PULL, ZMQ_PUSH, etc)
port: port to be used
Returns
created socket

Definition at line 48 of file phoenix_zmq.cpp.

48  {
49  std::stringstream socketAddressData;
50  socketAddressData << "tcp://*:" << port;
51  zmq::socket_t *socket = new zmq::socket_t(context, type);
52  socket->bind(socketAddressData.str());
53  return socket;
54 }

Referenced by PZmqBackend::createServerSocket(), and pzmq_createServerSocket().

+ Here is the caller graph for this function:

◆ pzmq_createServerSocket() [2/2]

zmq::socket_t* pzmq_createServerSocket ( zmq::context_t &  context,
size_t  port,
int  type,
int  nbBufferMessage,
int  bufferSizeByte,
size_t  threadAffinity,
ssize_t  dataRate 
)

Add a server socket to the manager.

Parameters
context: zmq context where to create socket
port: port to be used
type: type of the connection (ZMQ_PULL, ZMQ_PUSH, etc)
nbBufferMessage: number of messages to be buffered
bufferSizeByte: size of the zmq buffer in bytes
threadAffinity: bit mask which determines which threads from the 0MQ I/O thread pool associated with the socket's context shall handle newly created connections (1 : means first, 2 : means second, 3 : means first and second, etc)
dataRate: expected data rate (in kilobytes per second)
Returns
zmq socket

Definition at line 66 of file phoenix_zmq.cpp.

68 {
69  zmq::socket_t* socket = pzmq_createServerSocket(context, type, port);
70  bool b(socket != NULL);
71  if(b){
72  pzmq_setBufferSize(socket, type, nbBufferMessage, dataRate, bufferSizeByte);
73  pzmq_setThreadAffinity(socket, threadAffinity);
74  }
75  return socket;
76 }
zmq::socket_t * pzmq_createServerSocket(zmq::context_t &context, int type, size_t port)
Create a server socket to be used by the SocketManagerZMQ.
Definition: phoenix_zmq.cpp:48

References pzmq_createServerSocket(), pzmq_setBufferSize(), and pzmq_setThreadAffinity().

+ Here is the call graph for this function:

◆ pzmq_setBufferSize()

void pzmq_setBufferSize ( zmq::socket_t *  socket,
int  type,
int  nbBufferMessage,
int  dataRate,
size_t  bufferSizeByte 
)

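Configure the buffering of a socket: sets the number of buffered messages and the data rate, then sets the receive buffer size for ZMQ_PULL/ZMQ_SUB sockets or the send buffer size for ZMQ_PUSH/ZMQ_PUB sockets. Other socket types get no buffer size change.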

Parameters
[out]socket: socket to be modified
type: type of the socket to be used
nbBufferMessage: number of messages to be buffered
dataRate: expected data rate (in kilobytes per second)
bufferSizeByte: size of the zmq buffer in bytes

Definition at line 197 of file phoenix_zmq.cpp.

197  {
198  pzmq_setNbMessageBuffer(socket, nbBufferMessage);
199  pzmq_setDataRate(socket, type, dataRate);
200  if(type == ZMQ_PULL || type == ZMQ_SUB){
201  pzmq_setRecvBufferSize(socket, bufferSizeByte);
202  }else if(type == ZMQ_PUSH || type == ZMQ_PUB){
203  pzmq_setSendBufferSize(socket, bufferSizeByte);
204  }
205 }
void pzmq_setSendBufferSize(zmq::socket_t *socket, int bufferSizeByte)
Set the size of the buffer to send messages.
void pzmq_setRecvBufferSize(zmq::socket_t *socket, int bufferSizeByte)
Set the size of the buffer used for received messages.
void pzmq_setDataRate(zmq::socket_t *socket, int type, int dataRate)
Set the data rate of the socket.
void pzmq_setNbMessageBuffer(zmq::socket_t *socket, int nbBufferMessage)
Set the number of messages in the messages buffer.

References pzmq_setDataRate(), pzmq_setNbMessageBuffer(), pzmq_setRecvBufferSize(), and pzmq_setSendBufferSize().

Referenced by pzmq_createClientSocket(), and pzmq_createServerSocket().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ pzmq_setDataRate()

void pzmq_setDataRate ( zmq::socket_t *  socket,
int  type,
int  dataRate 
)

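Set the data rate of the socket. The rate is only applied to ZMQ_PUB and ZMQ_SUB sockets; the value given in kilobytes per second is multiplied by 8 to obtain the kilobits per second passed to ZeroMQ.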

Parameters
[out]socket: socket to be modified
type: type of the socket to be used
dataRate: expected data rate (in kilobytes per second)

Definition at line 133 of file phoenix_zmq.cpp.

133  {
134  if(socket != NULL && (type == ZMQ_PUB || type == ZMQ_SUB)){
135  int dataRateKbit(dataRate*8l);
136 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
137  socket->set(zmq::sockopt::rate, dataRateKbit);
138 #else
139  //See doc at http://api.zeromq.org/3-1:zmq-setsockopt
140  socket->setsockopt(ZMQ_RATE, dataRateKbit);
141 #endif
142  }
143 }

Referenced by pzmq_setBufferSize().

+ Here is the caller graph for this function:

◆ pzmq_setNbMessageBuffer()

void pzmq_setNbMessageBuffer ( zmq::socket_t *  socket,
int  nbBufferMessage 
)

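Set the number of messages in the messages buffer. Note: in the listing below, the branch for cppzmq >= 4.7.1 sets only the receive limit (rcvhwm), whereas the legacy branch sets both the receive and send limits (ZMQ_RCVHWM and ZMQ_SNDHWM).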

Parameters
[out]socket: socket to be modified
nbBufferMessage: number of messages to be buffered

Definition at line 116 of file phoenix_zmq.cpp.

116  {
117  if(socket != NULL){
118 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
119  socket->set(zmq::sockopt::rcvhwm, nbBufferMessage);
120 #else
121  //See doc at http://api.zeromq.org/3-1:zmq-setsockopt
122  socket->setsockopt(ZMQ_RCVHWM, &nbBufferMessage, sizeof(int));
123  socket->setsockopt(ZMQ_SNDHWM, &nbBufferMessage, sizeof(int));
124 #endif
125  }
126 }

Referenced by pzmq_setBufferSize().

+ Here is the caller graph for this function:

◆ pzmq_setRecvBufferSize()

void pzmq_setRecvBufferSize ( zmq::socket_t *  socket,
int  bufferSizeByte 
)

Set the size of the buffer used for received messages.

Parameters
[out]socket: socket to be modified
bufferSizeByte: size of the zmq buffer in bytes

Definition at line 149 of file phoenix_zmq.cpp.

149  {
150  if(socket != NULL){
151 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
152  socket->set(zmq::sockopt::rcvbuf, bufferSizeByte);
153 #else
154  //See doc at http://api.zeromq.org/3-1:zmq-setsockopt
155  socket->setsockopt(ZMQ_RCVBUF, bufferSizeByte);
156 #endif
157  }
158 }

Referenced by pzmq_setBufferSize().

+ Here is the caller graph for this function:

◆ pzmq_setSendBufferSize()

void pzmq_setSendBufferSize ( zmq::socket_t *  socket,
int  bufferSizeByte 
)

Set the size of the buffer to send messages.

Parameters
[out]socket: socket to be modified
bufferSizeByte: size of the zmq buffer in bytes

Definition at line 164 of file phoenix_zmq.cpp.

164  {
165  if(socket != NULL){
166 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
167  socket->set(zmq::sockopt::sndbuf, bufferSizeByte);
168 #else
169  //See doc at http://api.zeromq.org/3-1:zmq-setsockopt
170  socket->setsockopt(ZMQ_SNDBUF, bufferSizeByte);
171 #endif
172  }
173 }

Referenced by pzmq_setBufferSize().

+ Here is the caller graph for this function:

◆ pzmq_setThreadAffinity()

void pzmq_setThreadAffinity ( zmq::socket_t *  socket,
size_t  threadAffinity 
)

Set the thread affinity of zmq.

Parameters
socket: socket to be modified
threadAffinity: bit mask which determines which threads from the 0MQ I/O thread pool associated with the socket's context shall handle newly created connections (1 : means first, 2 : means second, 3 : means first and second, etc)

Definition at line 179 of file phoenix_zmq.cpp.

179  {
180  if(socket != NULL){
181 #if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471
182  socket->set(zmq::sockopt::affinity, threadAffinity);
183 #else
184  //See doc at http://api.zeromq.org/3-1:zmq-setsockopt
185  socket->setsockopt(ZMQ_AFFINITY, threadAffinity);
186 #endif
187  }
188 }

Referenced by pzmq_createClientSocket(), and pzmq_createServerSocket().

+ Here is the caller graph for this function: