Line |
Branch |
Exec |
Source |
1 |
|
|
/*************************************** |
2 |
|
|
Auteur : Pierre Aubert |
3 |
|
|
Mail : pierre.aubert@lapp.in2p3.fr |
4 |
|
|
Licence : CeCILL-C |
5 |
|
|
****************************************/ |
6 |
|
|
|
7 |
|
|
#include <sstream> |
8 |
|
|
#include "phoenix_zmq.h" |
9 |
|
|
|
10 |
|
|
|
11 |
|
|
///Create a client socket to be used by the SocketManagerZMQ |
12 |
|
|
/** @param context : zeromq context which defines the number of thread to be used in the data transfert |
13 |
|
|
* @param type : type of the socket (ZMQ_PULL, ZMQ_PUSH, etc) |
14 |
|
|
* @param address : address of the socket (example localhost or 127.0.0.1) |
15 |
|
|
* @param port : port to be used |
16 |
|
|
* @return create socket |
17 |
|
|
*/ |
18 |
|
✗ |
zmq::socket_t* pzmq_createClientSocket(zmq::context_t & context, int type, const std::string & address, size_t port){ |
19 |
|
✗ |
std::stringstream socketAddressData; |
20 |
|
✗ |
socketAddressData << "tcp://" << address <<":" << port; |
21 |
|
✗ |
zmq::socket_t *socket = new zmq::socket_t(context, type); |
22 |
|
✗ |
socket->connect(socketAddressData.str()); |
23 |
|
|
|
24 |
|
|
#if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471 |
25 |
|
✗ |
socket->set(zmq::sockopt::linger, 1); //1 ms to stop |
26 |
|
|
#else |
27 |
|
|
socket->setsockopt(ZMQ_LINGER, 1); //1 ms to stop |
28 |
|
|
#endif |
29 |
|
✗ |
if(type == ZMQ_SUB){ |
30 |
|
|
#if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471 |
31 |
|
✗ |
socket->set(zmq::sockopt::subscribe, ""); |
32 |
|
✗ |
socket->set(zmq::sockopt::conflate, 1); |
33 |
|
|
#else |
34 |
|
|
socket->setsockopt(ZMQ_SUBSCRIBE, "", 0); |
35 |
|
|
int conflate(1); |
36 |
|
|
socket->setsockopt(ZMQ_CONFLATE, &conflate, sizeof(int)); |
37 |
|
|
#endif |
38 |
|
|
} |
39 |
|
✗ |
return socket; |
40 |
|
|
} |
41 |
|
|
|
42 |
|
|
///Create a server socket to be used by the SocketManagerZMQ |
43 |
|
|
/** @param context : zeromq context which defines the number of thread to be used in the data transfert |
44 |
|
|
* @param type : type of the socket (ZMQ_PULL, ZMQ_PUSH, etc) |
45 |
|
|
* @param port : port to be used |
46 |
|
|
* @return create socket |
47 |
|
|
*/ |
48 |
|
✗ |
zmq::socket_t* pzmq_createServerSocket(zmq::context_t & context, int type, size_t port){ |
49 |
|
✗ |
std::stringstream socketAddressData; |
50 |
|
✗ |
socketAddressData << "tcp://*:" << port; |
51 |
|
✗ |
zmq::socket_t *socket = new zmq::socket_t(context, type); |
52 |
|
✗ |
socket->bind(socketAddressData.str()); |
53 |
|
✗ |
return socket; |
54 |
|
|
} |
55 |
|
|
|
56 |
|
|
///Add a server socket to the manager |
57 |
|
|
/** @param context : zmq context where to create socket |
58 |
|
|
* @param port : port to be used |
59 |
|
|
* @param type : type of the connection (ZMQ_PULL, ZMQ_PUSH, etc) |
60 |
|
|
* @param nbBufferMessage : number of messages to be buffered |
61 |
|
|
* @param bufferSizeByte : size of the zmq buffer in bytes |
62 |
|
|
* @param threadAffinity : bit mask which determines which threads from the 0MQ I/O thread pool associated with the socket's context shall handle newly created connections (1 : means first, 2 : means second, 3 : means first and second, etc) |
63 |
|
|
* @param dataRate : expected data rate (in kilobytes per second) |
64 |
|
|
* @return zmq socket |
65 |
|
|
*/ |
66 |
|
✗ |
zmq::socket_t* pzmq_createServerSocket(zmq::context_t & context, size_t port, int type, int nbBufferMessage, int bufferSizeByte, |
67 |
|
|
size_t threadAffinity, ssize_t dataRate) |
68 |
|
|
{ |
69 |
|
✗ |
zmq::socket_t* socket = pzmq_createServerSocket(context, type, port); |
70 |
|
✗ |
bool b(socket != NULL); |
71 |
|
✗ |
if(b){ |
72 |
|
✗ |
pzmq_setBufferSize(socket, type, nbBufferMessage, dataRate, bufferSizeByte); |
73 |
|
✗ |
pzmq_setThreadAffinity(socket, threadAffinity); |
74 |
|
|
} |
75 |
|
✗ |
return socket; |
76 |
|
|
} |
77 |
|
|
|
78 |
|
|
|
79 |
|
|
///Add a client socket to the manager |
80 |
|
|
/** @param context : zmq context where to create socket |
81 |
|
|
* @param address : address of the server to be connected to |
82 |
|
|
* @param port : port to be used |
83 |
|
|
* @param type : type of the connection (ZMQ_PULL, ZMQ_PUSH, etc) |
84 |
|
|
* @param nbBufferMessage : number of messages to be buffered |
85 |
|
|
* @param bufferSizeByte : size of the zmq buffer in bytes |
86 |
|
|
* @param threadAffinity : bit mask which determines which threads from the 0MQ I/O thread pool associated with the socket's context shall handle newly created connections (1 : means first, 2 : means second, 3 : means first and second, etc) |
87 |
|
|
* @param dataRate : expected data rate (in kilobytes per second) |
88 |
|
|
* @return zmq socket |
89 |
|
|
*/ |
90 |
|
✗ |
zmq::socket_t* pzmq_createClientSocket(zmq::context_t & context, const std::string & address, size_t port, int type, int nbBufferMessage, |
91 |
|
|
int bufferSizeByte, size_t threadAffinity, ssize_t dataRate) |
92 |
|
|
{ |
93 |
|
✗ |
zmq::socket_t* socket = pzmq_createClientSocket(context, type, address, port); |
94 |
|
✗ |
bool b(socket != NULL); |
95 |
|
✗ |
if(b){ |
96 |
|
✗ |
pzmq_setBufferSize(socket, type, nbBufferMessage, dataRate, bufferSizeByte); |
97 |
|
✗ |
pzmq_setThreadAffinity(socket, threadAffinity); |
98 |
|
|
} |
99 |
|
✗ |
return socket; |
100 |
|
|
} |
101 |
|
|
|
102 |
|
|
///Close the given server socket |
103 |
|
|
/** @param[out] socket : pointer to the server socket to be closed (will be set to NULL at then end of the function) |
104 |
|
|
*/ |
105 |
|
✗ |
void pzmq_closeServerSocket(zmq::socket_t *& socket){ |
106 |
|
✗ |
if(socket != NULL){ |
107 |
|
✗ |
delete socket; |
108 |
|
✗ |
socket = NULL; |
109 |
|
|
} |
110 |
|
|
} |
111 |
|
|
|
112 |
|
|
///Set the number of messages in the messages buffer |
113 |
|
|
/** @param[out] socket : socket to be modified |
114 |
|
|
* @param nbBufferMessage : number of messages to be buffered |
115 |
|
|
*/ |
116 |
|
✗ |
void pzmq_setNbMessageBuffer(zmq::socket_t* socket, int nbBufferMessage){ |
117 |
|
✗ |
if(socket != NULL){ |
118 |
|
|
#if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471 |
119 |
|
✗ |
socket->set(zmq::sockopt::rcvhwm, nbBufferMessage); |
120 |
|
|
#else |
121 |
|
|
//See doc at http://api.zeromq.org/3-1:zmq-setsockopt |
122 |
|
|
socket->setsockopt(ZMQ_RCVHWM, &nbBufferMessage, sizeof(int)); |
123 |
|
|
socket->setsockopt(ZMQ_SNDHWM, &nbBufferMessage, sizeof(int)); |
124 |
|
|
#endif |
125 |
|
|
} |
126 |
|
|
} |
127 |
|
|
|
128 |
|
|
///Set the data rate of the socket |
129 |
|
|
/** @param[out] socket : socket to be modified |
130 |
|
|
* @param type : type of the socket to be used |
131 |
|
|
* @param dataRate : expected data rate (in kilobytes per second) |
132 |
|
|
*/ |
133 |
|
✗ |
void pzmq_setDataRate(zmq::socket_t* socket, int type, int dataRate){ |
134 |
|
✗ |
if(socket != NULL && (type == ZMQ_PUB || type == ZMQ_SUB)){ |
135 |
|
✗ |
int dataRateKbit(dataRate*8l); |
136 |
|
|
#if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471 |
137 |
|
✗ |
socket->set(zmq::sockopt::rate, dataRateKbit); |
138 |
|
|
#else |
139 |
|
|
//See doc at http://api.zeromq.org/3-1:zmq-setsockopt |
140 |
|
|
socket->setsockopt(ZMQ_RATE, dataRateKbit); |
141 |
|
|
#endif |
142 |
|
|
} |
143 |
|
|
} |
144 |
|
|
|
145 |
|
|
///Set the size of the buffer to recieved messages |
146 |
|
|
/** @param[out] socket : socket to be modified |
147 |
|
|
* @param bufferSizeByte : size of the zmq buffer in bytes |
148 |
|
|
*/ |
149 |
|
✗ |
void pzmq_setRecvBufferSize(zmq::socket_t* socket, int bufferSizeByte){ |
150 |
|
✗ |
if(socket != NULL){ |
151 |
|
|
#if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471 |
152 |
|
✗ |
socket->set(zmq::sockopt::rcvbuf, bufferSizeByte); |
153 |
|
|
#else |
154 |
|
|
//See doc at http://api.zeromq.org/3-1:zmq-setsockopt |
155 |
|
|
socket->setsockopt(ZMQ_RCVBUF, bufferSizeByte); |
156 |
|
|
#endif |
157 |
|
|
} |
158 |
|
|
} |
159 |
|
|
|
160 |
|
|
///Set the size of the buffer to send messages |
161 |
|
|
/** @param[out] socket : socket to be modified |
162 |
|
|
* @param bufferSizeByte : size of the zmq buffer in bytes |
163 |
|
|
*/ |
164 |
|
✗ |
void pzmq_setSendBufferSize(zmq::socket_t* socket, int bufferSizeByte){ |
165 |
|
✗ |
if(socket != NULL){ |
166 |
|
|
#if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471 |
167 |
|
✗ |
socket->set(zmq::sockopt::sndbuf, bufferSizeByte); |
168 |
|
|
#else |
169 |
|
|
//See doc at http://api.zeromq.org/3-1:zmq-setsockopt |
170 |
|
|
socket->setsockopt(ZMQ_SNDBUF, bufferSizeByte); |
171 |
|
|
#endif |
172 |
|
|
} |
173 |
|
|
} |
174 |
|
|
|
175 |
|
|
///Set the thread affinity of zmq |
176 |
|
|
/** @param socket : socket to be modified |
177 |
|
|
* @param threadAffinity : bit mask which determines which threads from the 0MQ I/O thread pool associated with the socket's context shall handle newly created connections (1 : means first, 2 : means second, 3 : means first and second, etc) |
178 |
|
|
*/ |
179 |
|
✗ |
void pzmq_setThreadAffinity(zmq::socket_t* socket, size_t threadAffinity){ |
180 |
|
✗ |
if(socket != NULL){ |
181 |
|
|
#if (CPPZMQ_VERSION_MAJOR*100 + CPPZMQ_VERSION_MINOR*10 + CPPZMQ_VERSION_PATCH) >= 471 |
182 |
|
✗ |
socket->set(zmq::sockopt::affinity, threadAffinity); |
183 |
|
|
#else |
184 |
|
|
//See doc at http://api.zeromq.org/3-1:zmq-setsockopt |
185 |
|
|
socket->setsockopt(ZMQ_AFFINITY, threadAffinity); |
186 |
|
|
#endif |
187 |
|
|
} |
188 |
|
|
} |
189 |
|
|
|
190 |
|
|
///Set the size of the buffer to send messages |
191 |
|
|
/** @param[out] socket : socket to be modified |
192 |
|
|
* @param type : type of the socket to be used |
193 |
|
|
* @param nbBufferMessage : number of messages to be buffered |
194 |
|
|
* @param dataRate : expected data rate (in kilobytes per second) |
195 |
|
|
* @param bufferSizeByte : size of the zmq buffer in bytes |
196 |
|
|
*/ |
197 |
|
✗ |
void pzmq_setBufferSize(zmq::socket_t* socket, int type, int nbBufferMessage, int dataRate, size_t bufferSizeByte){ |
198 |
|
✗ |
pzmq_setNbMessageBuffer(socket, nbBufferMessage); |
199 |
|
✗ |
pzmq_setDataRate(socket, type, dataRate); |
200 |
|
✗ |
if(type == ZMQ_PULL || type == ZMQ_SUB){ |
201 |
|
✗ |
pzmq_setRecvBufferSize(socket, bufferSizeByte); |
202 |
|
✗ |
}else if(type == ZMQ_PUSH || type == ZMQ_PUB){ |
203 |
|
✗ |
pzmq_setSendBufferSize(socket, bufferSizeByte); |
204 |
|
|
} |
205 |
|
|
} |
206 |
|
|
|
207 |
|
|
|