BmnRoot
Loading...
Searching...
No Matches
BmnDataReceiver.cxx
Go to the documentation of this file.
1/*
2 * To change this license header, choose License Headers in Project Properties.
3 * To change this template file, choose Tools | Templates
4 * and open the template in the editor.
5 */
6
7/*
8 * File: BmnDataReceiver.cxx
9 * Author: ilnur
10 *
11 * Created on October 18, 2016, 5:22 PM
12 */
13
14
15#include "BmnDataReceiver.h"
16#include <zmq.h>
17#include <signal.h>
18#include <netinet/in.h>
19#include <arpa/inet.h>
20#include <sys/types.h>
21#include <sys/socket.h>
22#include </usr/include/netdb.h>
23#include <vector>
24#include <thread>
25#include <string>
26#include "libxml/tree.h"
27//for old fairsoft
28#define ZMQ_XSUB 10
29#define ZMQ_STREAM 11
30#define ZMQ_ROUTER_RAW 41
31
33 //gSystem->Load("libxml2");
34 // gSystem->Load("libzmq");
35 InitSocks();
36}
37
39 DeinitSocks();
40}
41
42void BmnDataReceiver::InitSocks() //:
43//_ctx(1),
44//_socket_mcast(_ctx, ZMQ_SUB)
45{
46 _ctx = zmq_ctx_new();
47 _socket_mcast = zmq_socket(_ctx, ZMQ_XSUB);
48 _socket_data = zmq_socket(_ctx, ZMQ_STREAM);
49}
50
51void BmnDataReceiver::DeinitSocks() {
52 //_socket_mcast.close();
53 zmq_close(_socket_mcast);
54 zmq_close(_socket_data);
55 zmq_ctx_destroy(_ctx);
56}
57
58/*void BmnDataReceiver::HandleSignal(int signal){
59 switch (signal)
60 {
61 case SIGINT:
62 {
63 isListening = kFALSE;
64 printf("SIGINT received\n");
65 break;
66 }
67 default:
68 {
69
70 }
71 }
72}*/
73
74int BmnDataReceiver::ParsePNPMsg(Char_t* msgStr, serverInfo* sInfo) {
75 xmlDocPtr doc = xmlParseDoc((xmlChar*) msgStr);
76 xmlNodePtr root = xmlDocGetRootElement(doc);
77 xmlNodePtr cur_node = root;
78 while (cur_node) {
79 if ((cur_node->type == XML_ELEMENT_NODE) || (cur_node->type == XML_TEXT_NODE)) {
80 if (strcmp((Char_t*) cur_node->name, "program") == 0) {
81 strcpy(sInfo->hostName, (Char_t*) xmlGetProp(cur_node, (xmlChar*) "hostName"));
82 strcpy(sInfo->index, (Char_t*) xmlGetProp(cur_node, (xmlChar*) "index"));
83 strcpy(sInfo->name, (Char_t*) xmlGetProp(cur_node, (xmlChar*) "name"));
84 strcpy(sInfo->type, (Char_t*) xmlGetProp(cur_node, (xmlChar*) "type"));
85 cur_node = cur_node->children;
86 continue;
87 }
88 if (strcmp((Char_t*) cur_node->name, "interfaces") == 0) {
89 sInfo->interfaces.clear();
90 cur_node = cur_node->children;
91 continue;
92 }
93 if (strcmp((Char_t*) cur_node->name, "interface") == 0) {
94 serverIface newPort;
95 Char_t* portStr = (Char_t*) xmlGetProp(cur_node, (xmlChar*) "port");
96 if (portStr != NULL)
97 newPort.port = atoi(portStr);
98 newPort.isFree = strcmp(((Char_t*) xmlGetProp(cur_node, (xmlChar*) "isFree")), "1") == 0 ? 1 : 0;
99 newPort.enabled = strcmp(((Char_t*) xmlGetProp(cur_node, (xmlChar*) "enabled")), "1") == 0 ? 1 : 0;
100 strcpy(newPort.type, (Char_t*) xmlGetProp(cur_node, (xmlChar*) "type"));
101 //if (newPort.type == "data flow")
102 sInfo->interfaces.push_back(newPort);
103 if (cur_node->next == NULL)
104 cur_node = cur_node->parent->next;
105 else
106 cur_node = cur_node->next;
107 continue;
108 }
109 cur_node = cur_node->next;
110 }
111 }
112 xmlFreeDoc(doc);
113 return 0;
114
115}
116
118 DBG("started")
119 socklen_t addrlen = 0;
120 struct ip_mreq mreq;
121 mreq.imr_interface.s_addr = htons(INADDR_ANY);
122 mreq.imr_multiaddr.s_addr = inet_addr(PNP_DISCOVER_IP_ADDR);
123 struct sockaddr_in mcast_addr;
124 mcast_addr.sin_family = AF_INET;
125 mcast_addr.sin_port = htons(PNP_DISCOVER_PORT);
126 mcast_addr.sin_addr.s_addr = htonl(INADDR_ANY);
127 memset(&mcast_addr.sin_zero, 0, sizeof (mcast_addr.sin_zero));
128
129 _sfd = socket(AF_INET, SOCK_DGRAM, 0);
130 if (_sfd == -1) {
131 fprintf(stderr, "Error: %s\n", strerror(errno));
132 return -1;
133 }
134 UInt_t reusable = 1;
135 if (setsockopt(_sfd, SOL_SOCKET, SO_REUSEADDR, &reusable, sizeof (reusable))) {
136 close(_sfd);
137 fprintf(stderr, "Setting reusable error: %s\n", strerror(errno));
138 return -1;
139 }
140 addrlen = sizeof (mcast_addr);
141 if (bind(_sfd, (sockaddr*) & mcast_addr, addrlen) == -1) {
142 close(_sfd);
143 fprintf(stderr, "Bind error: %s\n", strerror(errno));
144 return -1;
145 }
146 if (setsockopt(_sfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof (mreq))) {
147 close(_sfd);
148 fprintf(stderr, "Adding multicast group error: %s\n", strerror(errno));
149 return -1;
150 }
151 Int_t nbytes;
152 Char_t buf[MAX_BUF_LEN];
153 //signal(SIGINT, HandleSignal);
154 isListening = kTRUE;
155 while (isListening) {
156 if ((nbytes = recvfrom(_sfd, buf, MAX_BUF_LEN, 0, (sockaddr*) & mcast_addr, &addrlen)) == -1) {
157 close(_sfd);
158 fprintf(stderr, "Receive error: %s\n", strerror(errno));
159 return -1;
160 }
161 buf[nbytes] = '\0';
162 // printf("%s\n", buf);
163 ParsePNPMsg(buf, &_dataServer);
164 printf("index = %s\n", _dataServer.index);
165 printf("type = %s\n", _dataServer.type);
166 if ((strcmp(_dataServer.index, "Glob") == 0) && (_dataServer.interfaces.size() > 0)) {
167 isAddr = true;
168 DBG("Glob EvB found")
169 break;
170 }
171 }
172 RecvData();
173
174 close(_sfd);
175 return 0;
176}
177
179 // Int_t isRaw = 1;
180 // if (zmq_setsockopt(_socket_data, ZMQ_ROUTER_RAW, &isRaw, sizeof(isRaw)) == -1)
181 // DBGERR("zmq_setsockopt of ZMQ_ROUTER_RAW")
182 Char_t endpoint_addr[MAX_ADDR_LEN];
183 struct addrinfo hints;
184 struct addrinfo *info, *p;
185 memset(&hints, 0, sizeof (hints));
186 hints.ai_family = AF_UNSPEC;
187 hints.ai_socktype = SOCK_STREAM;
188 Int_t g = 0;
189 if ((g = getaddrinfo(_dataServer.hostName, NULL, &hints, &info)) != 0)
190 {
191 printf("getaddrinfo error: %s\n", strerror(errno));
192 return -1;
193 }
194 Char_t *ip = NULL;
195 for (p = info; p != NULL; p = p->ai_next) {
196 ip = inet_ntoa(((sockaddr_in*) (p->ai_addr))->sin_addr);
197 printf("found family %d %s addr: %s\n", p->ai_family, p->ai_canonname, ip);
198 Int_t port = 0;
199 for (auto iface : _dataServer.interfaces)
200 if (strcmp(iface.type, "Monitor output data flow") == 0) {
201 port = iface.port;
202 break;
203 }
204 snprintf(endpoint_addr, MAX_ADDR_LEN, "tcp://%s:%d", ip, port);
205 if (zmq_connect(_socket_data, endpoint_addr) != 0) {
206 DBGERR("zmq connect")
207 continue;
208 } else {
209 printf("%s\n", endpoint_addr);
210 break;
211 }
212 }
213 if (p == NULL) {
214 printf("Valid address not found\n");
215 return -1;
216 }
217 Int_t rcvBuf = MAX_BUF_LEN;
218 size_t vl = sizeof (rcvBuf);
219 if (zmq_setsockopt(_socket_data, ZMQ_RCVBUF, &rcvBuf, sizeof (rcvBuf)) == -1)
220 DBGERR("zmq_setsockopt of ZMQ_RCVBUF")
221 if (zmq_setsockopt(_socket_data, ZMQ_SNDBUF, &rcvBuf, sizeof (rcvBuf)) == -1)
222 DBGERR("zmq_setsockopt of ZMQ_SNDBUF")
223 rcvBuf = 0;
224 if (zmq_getsockopt(_socket_data, ZMQ_RCVBUF, &rcvBuf, &vl) == -1)
225 DBGERR("zmq_getsockopt of ZMQ_RCVBUF")
226 printf("rcvbuf = %d\n", rcvBuf);
227 //Char_t id[MAX_ADDR_LEN];
228
229 return 0;
230}
231
233 freeaddrinfo(dataAddrInfo);
234}
235
237 // while ((isListening) && (msg_len < MAX_BUF_LEN)) {
238 // id_size = zmq_recv(_socket_data, &id, sizeof (id), 0);
239 // if (id_size == -1) {
240 // printf("Receive error #%s\n", zmq_strerror(errno));
241 // if (errno == EAGAIN)
242 // usleep(MSG_TIMEOUT);
243 // else
244 // return -1;
245 // } else {
246 // printf("ID size = %d\n Id:%x\n", id_size, id);
247 // }
248 // zmq_msg_t msg;
249 // zmq_msg_init(&msg);
250 // Int_t recv_more = 0;
251 // UInt_t *msgPtr;
252 // do {
253 // frame_size = zmq_msg_recv(&msg, _socket_data, 0); // ZMQ_DONTWAIT
254 // //frame_size = zmq_recv(_socket_data, buf, MAX_BUF_LEN, 0);
255 // if (frame_size == -1) {
256 // printf("Receive error № %d #%s\n", errno, zmq_strerror(errno));
257 // if (errno == EAGAIN)
258 // usleep(MSG_TIMEOUT);
259 // else
260 // return -1;
261 // } else {
262 // // UChar_t *str = (UChar_t*) malloc((frame_size + 1) * sizeof (UChar_t));
263 // msgPtr = (UInt_t*) zmq_msg_data(&msg);
264 // // memcpy(buf, zmq_msg_data(&msg), frame_size);
265 // // ((mutex*)_deque_mutex)->lock();
266 // for (Int_t offset = 0; offset < frame_size; offset++)
267 // data_queue.push_back(*(msgPtr + offset));
268 // // ((mutex*)_deque_mutex)->unlock();
269 // // memcpy(str, zmq_msg_data(&msg), frame_size);
270 // // str[frame_size] = '\0';
271 // msg_len += frame_size;
272 // // printf("Frame size = %d\n Msg:%x\n", frame_size, str);
273 // // free(str);
274 // }
275 // size_t opt_size = sizeof (recv_more);
276 // if (zmq_getsockopt(_socket_data, ZMQ_RCVMORE, &recv_more, &opt_size) == -1) {
277 // printf("ZMQ socket options error #%s\n", zmq_strerror(errno));
278 // return -1;
279 // }
280 // printf("ZMQ rcvmore = %d\n", recv_more);
281 //
282 // zmq_msg_close(&msg);
283 // } while (recv_more);
284 // }
285 return 0;
286}
287
289 // Int_t isRaw = 1;
290 // if (zmq_setsockopt(_socket_data, ZMQ_ROUTER_RAW, &isRaw, sizeof(isRaw)) == -1)
291 // DBGERR("zmq_setsockopt of ZMQ_ROUTER_RAW")
292 Char_t endpoint_addr[MAX_ADDR_LEN];
293 struct addrinfo hints;
294 struct addrinfo *p;
295 memset(&hints, 0, sizeof (hints));
296 hints.ai_family = AF_UNSPEC;
297 hints.ai_socktype = SOCK_STREAM;
298 Int_t g = 0;
299 if ((g = getaddrinfo(_dataServer.hostName, NULL, &hints, &dataAddrInfo)) != 0)
300 {
301 printf("getaddrinfo error: %s\n", strerror(errno));
302 return -1;
303 }
304 Char_t *ip = NULL;
305 for (p = dataAddrInfo; p != NULL; p = p->ai_next) {
306 ip = inet_ntoa(((sockaddr_in*) (p->ai_addr))->sin_addr);
307 printf("found family %d %s addr: %s\n", p->ai_family, p->ai_canonname, ip);
308 Int_t port = 0;
309 for (auto iface : _dataServer.interfaces)
310 if (strcmp(iface.type, "Monitor output data flow") == 0) {
311 port = iface.port;
312 break;
313 }
314 snprintf(endpoint_addr, MAX_ADDR_LEN, "tcp://%s:%d", ip, port);
315 if (zmq_connect(_socket_data, endpoint_addr) != 0) {
316 DBGERR("zmq")
317 continue;
318 } else {
319 printf("%s\n", endpoint_addr);
320 break;
321 }
322 }
323 if (p == NULL) {
324 printf("Valid address not found\n");
325 return -1;
326 }
327 Int_t rcvBufLen = MAX_BUF_LEN;
328 size_t vl = sizeof (rcvBufLen);
329 if (zmq_setsockopt(_socket_data, ZMQ_RCVBUF, &rcvBufLen, sizeof (rcvBufLen)) == -1)
330 DBGERR("zmq_setsockopt of ZMQ_RCVBUF")
331 if (zmq_setsockopt(_socket_data, ZMQ_SNDBUF, &rcvBufLen, sizeof (rcvBufLen)) == -1)
332 DBGERR("zmq_setsockopt of ZMQ_SNDBUF")
333 rcvBufLen = 0;
334 if (zmq_getsockopt(_socket_data, ZMQ_RCVBUF, &rcvBufLen, &vl) == -1)
335 DBGERR("zmq_getsockopt of ZMQ_RCVBUF")
336 printf("rcvbuf = %d\n", rcvBufLen);
337 // UInt_t *buf = (UInt_t*) malloc(MAX_BUF_LEN);
338 Char_t conID[MAX_ADDR_LEN];
339 Int_t conID_size;
340 Int_t msg_len = 0;
341 Int_t frame_size = 0;
342 isListening = kTRUE;
343 while ((isListening) && (msg_len < MAX_BUF_LEN)) {
344 conID_size = zmq_recv(_socket_data, &conID, sizeof (conID), 0);
345 if (conID_size == -1) {
346 printf("Receive error #%s\n", zmq_strerror(errno));
347 if (errno == EAGAIN)
348 usleep(MSG_TIMEOUT);
349 else
350 return -1;
351 } else {
352 printf("ID size = %d\n Id:%s\n", conID_size, conID);
353 }
354 zmq_msg_t msg;
355 zmq_msg_init(&msg);
356 Int_t recv_more = 0;
357 UInt_t *msgPtr;
358 do {
359 frame_size = zmq_msg_recv(&msg, _socket_data, 0); // ZMQ_DONTWAIT
360 //frame_size = zmq_recv(_socket_data, buf, MAX_BUF_LEN, 0);
361 if (frame_size == -1) {
362 printf("Receive error № %d #%s\n", errno, zmq_strerror(errno));
363 if (errno == EAGAIN)
364 usleep(MSG_TIMEOUT);
365 else
366 return -1;
367 } else {
368 // UChar_t *str = (UChar_t*) malloc((frame_size + 1) * sizeof (UChar_t));
369 msgPtr = (UInt_t*) zmq_msg_data(&msg);
370 // memcpy(buf, zmq_msg_data(&msg), frame_size);
371 // ((mutex*)_deque_mutex)->lock();
372 for (Int_t offset = 0; offset < frame_size; offset++)
373 data_queue.push_back(*(msgPtr + offset));
374 // ((mutex*)_deque_mutex)->unlock();
375 // memcpy(str, zmq_msg_data(&msg), frame_size);
376 // str[frame_size] = '\0';
377 msg_len += frame_size;
378 // printf("Frame size = %d\n Msg:%x\n", frame_size, str);
379 // free(str);
380 }
381 size_t opt_size = sizeof (recv_more);
382 if (zmq_getsockopt(_socket_data, ZMQ_RCVMORE, &recv_more, &opt_size) == -1) {
383 printf("ZMQ socket options error #%s\n", zmq_strerror(errno));
384 return -1;
385 }
386 printf("ZMQ rcvmore = %d\n", recv_more);
387
388 zmq_msg_close(&msg);
389 } while (recv_more);
390 }
391 // free(buf);
392 freeaddrinfo(dataAddrInfo);
393 return 0;
394}
395
397 Char_t endpoint_addr[MAX_ADDR_LEN];
398 snprintf(endpoint_addr, MAX_ADDR_LEN, "epgm://%s;%s:%d", INPUT_IFACE, PNP_DISCOVER_IP_ADDR, PNP_DISCOVER_PORT);
399 Int_t rc = 0;
400 rc = zmq_connect(_socket_mcast, endpoint_addr);
401 if (rc != 0)
402 printf("Error: %s\n", zmq_strerror(errno));
403 else
404 printf("%s\n", endpoint_addr);
405 //Char_t * buf = (Char_t*) malloc(255);
406 Int_t frame_size = 0;
407 for (Int_t i = 0; i < 10; i++) {
408 zmq_msg_t msg;
409 zmq_msg_init(&msg);
410 Int_t recv_more = 0;
411 do {
412 frame_size = zmq_msg_recv(&msg, _socket_mcast, 0);
413 //frame_size = zmq_recv(_socket_mcast, buf, 255, 0);
414 if (frame_size == -1) {
415 printf("Receive error #%s\n", zmq_strerror(errno));
416 break;
417 } else {
418 Char_t *str = (Char_t*) malloc((frame_size + 1) * sizeof (Char_t));
419 memcpy(str, zmq_msg_data(&msg), frame_size);
420 str[frame_size] = '\0';
421 printf("Frame size = %d\n Msg:%s\n", frame_size, str);
422 }
423 size_t opt_size = sizeof (Int_t);
424 if (zmq_getsockopt(_socket_mcast, ZMQ_RCVMORE, &recv_more, &opt_size) == -1) {
425 printf("ZMQ socket options error #%s\n", zmq_strerror(errno));
426 break;
427 }
428
429
430
431 } while (recv_more);
432 printf("Received msg # %d\n", i);
433 zmq_msg_close(&msg);
434 }
435 return 0;
436}
437
439 void * sender = zmq_socket(_ctx, ZMQ_XPUB);
440 const int maxlen = 255;
441 Char_t s[maxlen];
442 snprintf(s, maxlen, "epgm://%s;%s:%d", INPUT_IFACE, PNP_DISCOVER_IP_ADDR, PNP_DISCOVER_PORT); //enp3s0
443 Int_t rc = 0;
444 //_socket_mcast.connect(s);
445 rc = zmq_bind(sender, s);
446 printf("%s\n", s);
447 if (rc != 0)
448 printf("Error: %s\n", zmq_strerror(errno));
449
450 Char_t text[11] = "Hello port";
451 int len = strlen(text);
452 text[len] = '\0';
453 for (int i = 0; i < 5; i++) {
454 zmq_msg_t msg;
455 zmq_msg_init_size(&msg, len);
456 memcpy(zmq_msg_data(&msg), text, len);
457 rc = zmq_msg_send(&msg, sender, 0);
458 if (rc == -1)
459 printf("Send error: %s\n", zmq_strerror(errno));
460 else
461 printf("Sended bytes: %d\n", rc);
462 zmq_msg_close(&msg);
463 usleep(1000000);
464 }
465 zmq_close(sender);
466
467 return 0;
468}
469
#define ZMQ_STREAM
#define ZMQ_XSUB
void memset(T *dest, T i, size_t num)
uses binary expansion of copied volume for speed up
Definition L1Grid.h:25
int i
Definition P4_F32vec4.h:22
#define DBGERR(a)
Definition BmnMath.h:25
#define DBG(a)
Definition BmnMath.h:24
virtual ~BmnDataReceiver()
#define PNP_DISCOVER_PORT
#define MAX_ADDR_LEN
#define INPUT_IFACE
#define MAX_BUF_LEN
#define MSG_TIMEOUT
#define PNP_DISCOVER_IP_ADDR