18#include <netinet/in.h>
21#include <sys/socket.h>
22#include </usr/include/netdb.h>
26#include "libxml/tree.h"
30#define ZMQ_ROUTER_RAW 41
// Sets up the ZeroMQ sockets of the receiver. Only part of the body is visible
// in this listing: _socket_mcast is created on _ctx as a ZMQ_XSUB socket.
// NOTE(review): the value returned by zmq_socket() is not checked in the visible
// lines; libzmq returns NULL on failure — confirm the missing lines handle that.
42void BmnDataReceiver::InitSocks()
47 _socket_mcast = zmq_socket(_ctx,
ZMQ_XSUB);
// Releases the ZeroMQ resources: closes the multicast and the data socket,
// then destroys the context they were created on.
// NOTE(review): libzmq documents zmq_ctx_destroy() as deprecated in favour of
// zmq_ctx_term(); consider switching once the minimum libzmq version is confirmed.
// NOTE(review): the return codes of zmq_close()/zmq_ctx_destroy() are ignored here.
51void BmnDataReceiver::DeinitSocks() {
53 zmq_close(_socket_mcast);
54 zmq_close(_socket_data);
55 zmq_ctx_destroy(_ctx);
// Parses an XML discovery message and fills *sInfo from it.
//
// msgStr - XML text handed to xmlParseDoc().
// sInfo  - receives hostName/index/name/type from the "program" element and one
//          entry in sInfo->interfaces per "interface" element found below an
//          "interfaces" element.
//
// NOTE(review): several lines of this function (loop headers, closing braces,
// return statements) are missing from this listing, so the comments below only
// describe the statements that are visible.
74int BmnDataReceiver::ParsePNPMsg(Char_t* msgStr, serverInfo* sInfo) {
// NOTE(review): no NULL check on doc/root is visible before cur_node is
// dereferenced; xmlParseDoc() returns NULL on malformed input — confirm.
// NOTE(review): no xmlFreeDoc(doc) is visible in this listing — confirm the
// document is released on every exit path.
75 xmlDocPtr doc = xmlParseDoc((xmlChar*) msgStr);
76 xmlNodePtr root = xmlDocGetRootElement(doc);
77 xmlNodePtr cur_node = root;
79 if ((cur_node->type == XML_ELEMENT_NODE) || (cur_node->type == XML_TEXT_NODE)) {
80 if (strcmp((Char_t*) cur_node->name,
"program") == 0) {
// Copy the attributes of <program> into the caller's structure.
// NOTE(review): xmlGetProp() returns a freshly allocated string (or NULL when
// the attribute is absent) that the caller must release with xmlFree(). Here
// the pointer is passed straight to strcpy(), so it is never freed, a missing
// attribute dereferences NULL, and the copy is not bounded by the size of the
// destination field.
81 strcpy(sInfo->hostName, (Char_t*) xmlGetProp(cur_node, (xmlChar*)
"hostName"));
82 strcpy(sInfo->index, (Char_t*) xmlGetProp(cur_node, (xmlChar*)
"index"));
83 strcpy(sInfo->name, (Char_t*) xmlGetProp(cur_node, (xmlChar*)
"name"));
84 strcpy(sInfo->type, (Char_t*) xmlGetProp(cur_node, (xmlChar*)
"type"));
// Descend into the children of <program>.
85 cur_node = cur_node->children;
88 if (strcmp((Char_t*) cur_node->name,
"interfaces") == 0) {
// Start from an empty interface list, then walk the children of <interfaces>.
89 sInfo->interfaces.clear();
90 cur_node = cur_node->children;
93 if (strcmp((Char_t*) cur_node->name,
"interface") == 0) {
// Build one interface entry from the attributes of <interface>.
// NOTE(review): portStr and the "isFree"/"enabled"/"type" values come from
// xmlGetProp() too — same leak / NULL / unbounded-copy concerns as above;
// atoi(NULL) and strcmp(NULL, ...) are undefined behaviour.
95 Char_t* portStr = (Char_t*) xmlGetProp(cur_node, (xmlChar*)
"port");
97 newPort.port = atoi(portStr);
98 newPort.isFree = strcmp(((Char_t*) xmlGetProp(cur_node, (xmlChar*)
"isFree")),
"1") == 0 ? 1 : 0;
99 newPort.enabled = strcmp(((Char_t*) xmlGetProp(cur_node, (xmlChar*)
"enabled")),
"1") == 0 ? 1 : 0;
100 strcpy(newPort.type, (Char_t*) xmlGetProp(cur_node, (xmlChar*)
"type"));
102 sInfo->interfaces.push_back(newPort);
// When the last sibling has been handled, continue with the sibling of the
// parent element; otherwise advance to the next sibling.
// NOTE(review): the branch structure around the next three assignments is not
// visible in this listing — confirm which of them are in an else branch.
103 if (cur_node->next == NULL)
104 cur_node = cur_node->parent->next;
106 cur_node = cur_node->next;
109 cur_node = cur_node->next;
// Fragment of a routine whose signature lies outside the visible listing.
// The visible statements open an AF_INET datagram socket, bind it, join a
// multicast group (IP_ADD_MEMBERSHIP), receive datagrams while isListening is
// set, parse each one with ParsePNPMsg(), and, when the parsed index is "Glob",
// resolve _dataServer.hostName and connect _socket_data to the resulting address.
// NOTE(review): many lines (declarations, braces, error exits) are missing
// here, so the notes below are based on the visible statements only.
119 socklen_t addrlen = 0;
// NOTE(review): s_addr is a 32-bit field, so htonl() (as used for mcast_addr
// below) is the matching conversion; htons() is only harmless here if
// INADDR_ANY is zero.
121 mreq.imr_interface.s_addr = htons(INADDR_ANY);
123 struct sockaddr_in mcast_addr;
124 mcast_addr.sin_family = AF_INET;
126 mcast_addr.sin_addr.s_addr = htonl(INADDR_ANY);
127 memset(&mcast_addr.sin_zero, 0, sizeof (mcast_addr.sin_zero));
129 _sfd = socket(AF_INET, SOCK_DGRAM, 0);
131 fprintf(stderr,
"Error: %s\n", strerror(errno));
// Allow several receivers to bind the same address/port.
135 if (setsockopt(_sfd, SOL_SOCKET, SO_REUSEADDR, &reusable,
sizeof (reusable))) {
137 fprintf(stderr,
"Setting reusable error: %s\n", strerror(errno));
140 addrlen =
sizeof (mcast_addr);
141 if (bind(_sfd, (sockaddr*) & mcast_addr, addrlen) == -1) {
143 fprintf(stderr,
"Bind error: %s\n", strerror(errno));
// Join the multicast group described by mreq.
146 if (setsockopt(_sfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq,
sizeof (mreq))) {
148 fprintf(stderr,
"Adding multicast group error: %s\n", strerror(errno));
// Receive discovery datagrams until isListening is cleared.
// NOTE(review): buf is later handed to ParsePNPMsg(), which passes it to
// xmlParseDoc() as a C string; recvfrom() does not NUL-terminate the data and
// may fill all MAX_BUF_LEN bytes — confirm the missing lines terminate buf.
155 while (isListening) {
156 if ((nbytes = recvfrom(_sfd, buf,
MAX_BUF_LEN, 0, (sockaddr*) & mcast_addr, &addrlen)) == -1) {
158 fprintf(stderr,
"Receive error: %s\n", strerror(errno));
163 ParsePNPMsg(buf, &_dataServer);
164 printf(
"index = %s\n", _dataServer.index);
165 printf(
"type = %s\n", _dataServer.type);
// Only a server announcing index "Glob" with at least one interface is used.
166 if ((strcmp(_dataServer.index,
"Glob") == 0) && (_dataServer.interfaces.size() > 0)) {
168 DBG(
"Glob EvB found")
// Resolve the announced host name.
183 struct addrinfo hints;
184 struct addrinfo *info, *p;
185 memset(&hints, 0,
sizeof (hints));
186 hints.ai_family = AF_UNSPEC;
187 hints.ai_socktype = SOCK_STREAM;
// NOTE(review): getaddrinfo() reports failure through its return value (g),
// which is decoded with gai_strerror(g); errno is only meaningful for
// EAI_SYSTEM, so strerror(errno) can print an unrelated message.
189 if ((g = getaddrinfo(_dataServer.hostName, NULL, &hints, &info)) != 0)
191 printf(
"getaddrinfo error: %s\n", strerror(errno));
// NOTE(review): with ai_family = AF_UNSPEC the list may contain AF_INET6
// entries; casting their ai_addr to sockaddr_in and calling inet_ntoa() yields
// a wrong address for those. ai_canonname is only filled when AI_CANONNAME is
// requested, which is not visible above — it may be NULL when printed with %s.
195 for (p = info; p != NULL; p = p->ai_next) {
196 ip = inet_ntoa(((sockaddr_in*) (p->ai_addr))->sin_addr);
197 printf(
"found family %d %s addr: %s\n", p->ai_family, p->ai_canonname, ip);
// Pick the interface of type "Monitor output data flow" and connect to it.
// NOTE(review): `auto iface` copies each element; the assignment of `port` is
// not visible in this listing — presumably taken from iface.port, confirm.
199 for (
auto iface : _dataServer.interfaces)
200 if (strcmp(iface.type,
"Monitor output data flow") == 0) {
204 snprintf(endpoint_addr,
MAX_ADDR_LEN,
"tcp://%s:%d", ip, port);
205 if (zmq_connect(_socket_data, endpoint_addr) != 0) {
209 printf(
"%s\n", endpoint_addr);
214 printf(
"Valid address not found\n");
// Apply the buffer size to both directions and read the receive size back.
// NOTE(review): in the visible order these options are set after
// zmq_connect(); libzmq documents that most socket options only take effect
// for subsequent bind/connect calls — confirm the intended ordering.
// NOTE(review): ZMQ_SNDBUF is set from rcvBuf as well — presumably intentional.
218 size_t vl =
sizeof (rcvBuf);
219 if (zmq_setsockopt(_socket_data, ZMQ_RCVBUF, &rcvBuf,
sizeof (rcvBuf)) == -1)
220 DBGERR(
"zmq_setsockopt of ZMQ_RCVBUF")
221 if (zmq_setsockopt(_socket_data, ZMQ_SNDBUF, &rcvBuf,
sizeof (rcvBuf)) == -1)
222 DBGERR(
"zmq_setsockopt of ZMQ_SNDBUF")
224 if (zmq_getsockopt(_socket_data, ZMQ_RCVBUF, &rcvBuf, &vl) == -1)
225 DBGERR(
"zmq_getsockopt of ZMQ_RCVBUF")
226 printf(
"rcvbuf = %d\n", rcvBuf);
// NOTE(review): the list obtained above is stored in `info`, yet the visible
// code frees dataAddrInfo; unless the missing lines free `info`, it leaks and
// dataAddrInfo may be freed without having been set in this routine — confirm.
233 freeaddrinfo(dataAddrInfo);
// Fragment of a routine whose signature lies outside the visible listing.
// The visible statements resolve _dataServer.hostName, connect _socket_data to
// the resulting address, configure the socket buffers, and then receive an
// identity frame followed by data frames whose contents are appended to
// data_queue.
// NOTE(review): many lines (declarations, loop headers, braces, error exits)
// are missing here, so the notes below are based on the visible statements only.
293 struct addrinfo hints;
295 memset(&hints, 0,
sizeof (hints));
296 hints.ai_family = AF_UNSPEC;
297 hints.ai_socktype = SOCK_STREAM;
// NOTE(review): getaddrinfo() reports failure through its return value (g),
// which is decoded with gai_strerror(g); strerror(errno) can print an
// unrelated message.
299 if ((g = getaddrinfo(_dataServer.hostName, NULL, &hints, &dataAddrInfo)) != 0)
301 printf(
"getaddrinfo error: %s\n", strerror(errno));
// NOTE(review): with ai_family = AF_UNSPEC the list may contain AF_INET6
// entries, for which the sockaddr_in cast / inet_ntoa() gives a wrong address;
// ai_canonname may be NULL because AI_CANONNAME is not visibly requested.
305 for (p = dataAddrInfo; p != NULL; p = p->ai_next) {
306 ip = inet_ntoa(((sockaddr_in*) (p->ai_addr))->sin_addr);
307 printf(
"found family %d %s addr: %s\n", p->ai_family, p->ai_canonname, ip);
// Pick the interface of type "Monitor output data flow" and connect to it.
// NOTE(review): the assignment of `port` is not visible in this listing —
// presumably taken from iface.port, confirm.
309 for (
auto iface : _dataServer.interfaces)
310 if (strcmp(iface.type,
"Monitor output data flow") == 0) {
314 snprintf(endpoint_addr,
MAX_ADDR_LEN,
"tcp://%s:%d", ip, port);
315 if (zmq_connect(_socket_data, endpoint_addr) != 0) {
319 printf(
"%s\n", endpoint_addr);
324 printf(
"Valid address not found\n");
// Apply rcvBufLen to both directions and read the receive size back.
// NOTE(review): in the visible order these options are set after
// zmq_connect(); libzmq documents that most socket options only take effect
// for subsequent bind/connect calls — confirm the intended ordering.
328 size_t vl =
sizeof (rcvBufLen);
329 if (zmq_setsockopt(_socket_data, ZMQ_RCVBUF, &rcvBufLen,
sizeof (rcvBufLen)) == -1)
330 DBGERR(
"zmq_setsockopt of ZMQ_RCVBUF")
331 if (zmq_setsockopt(_socket_data, ZMQ_SNDBUF, &rcvBufLen,
sizeof (rcvBufLen)) == -1)
332 DBGERR(
"zmq_setsockopt of ZMQ_SNDBUF")
334 if (zmq_getsockopt(_socket_data, ZMQ_RCVBUF, &rcvBufLen, &vl) == -1)
335 DBGERR(
"zmq_getsockopt of ZMQ_RCVBUF")
336 printf(
"rcvbuf = %d\n", rcvBufLen);
341 Int_t frame_size = 0;
// First frame: the connection identity.
// NOTE(review): zmq_recv() does not NUL-terminate the buffer and returns the
// full message size even when it was truncated to sizeof(conID); printing
// conID with %s below may therefore read past the received bytes — confirm
// how conID is declared and initialised.
344 conID_size = zmq_recv(_socket_data, &conID,
sizeof (conID), 0);
345 if (conID_size == -1) {
346 printf(
"Receive error #%s\n", zmq_strerror(errno));
352 printf(
"ID size = %d\n Id:%s\n", conID_size, conID);
// Following frame(s): payload.
359 frame_size = zmq_msg_recv(&msg, _socket_data, 0);
361 if (frame_size == -1) {
362 printf(
"Receive error № %d #%s\n", errno, zmq_strerror(errno));
369 msgPtr = (UInt_t*) zmq_msg_data(&msg);
// NOTE(review): zmq_msg_recv() returns the message size in bytes, but the loop
// below uses that count as the number of UInt_t elements read through msgPtr.
// Unless the missing lines rescale frame_size, this reads past the end of the
// frame — confirm (expected bound would be frame_size / sizeof(UInt_t)).
372 for (Int_t offset = 0; offset < frame_size; offset++)
373 data_queue.push_back(*(msgPtr + offset));
377 msg_len += frame_size;
// Ask the socket whether more frames of the current message are pending.
381 size_t opt_size =
sizeof (recv_more);
382 if (zmq_getsockopt(_socket_data, ZMQ_RCVMORE, &recv_more, &opt_size) == -1) {
383 printf(
"ZMQ socket options error #%s\n", zmq_strerror(errno));
386 printf(
"ZMQ rcvmore = %d\n", recv_more);
// Release the address list obtained from getaddrinfo() above.
392 freeaddrinfo(dataAddrInfo);
// Fragment of a routine whose signature lies outside the visible listing.
// The visible statements connect _socket_mcast to endpoint_addr and then, for
// ten iterations, receive a message, copy it into a NUL-terminated buffer and
// print it together with the ZMQ_RCVMORE state.
// NOTE(review): several lines (message init/close, braces, error exits) are
// missing here, so the notes below are based on the visible statements only.
400 rc = zmq_connect(_socket_mcast, endpoint_addr);
402 printf(
"Error: %s\n", zmq_strerror(errno));
404 printf(
"%s\n", endpoint_addr);
406 Int_t frame_size = 0;
407 for (Int_t
i = 0;
i < 10;
i++) {
412 frame_size = zmq_msg_recv(&msg, _socket_mcast, 0);
414 if (frame_size == -1) {
415 printf(
"Receive error #%s\n", zmq_strerror(errno));
// Copy the frame into a heap buffer with one extra byte for the terminator so
// it can be printed with %s.
// NOTE(review): the malloc() result is not checked, and neither free(str) nor
// zmq_msg_close(&msg) is visible in this listing — confirm they exist in the
// missing lines, otherwise each iteration leaks.
// NOTE(review): if the error branch above does not leave the iteration,
// frame_size is -1 here and memcpy() is called with a huge size — confirm.
418 Char_t *str = (Char_t*) malloc((frame_size + 1) *
sizeof (Char_t));
419 memcpy(str, zmq_msg_data(&msg), frame_size);
420 str[frame_size] =
'\0';
421 printf(
"Frame size = %d\n Msg:%s\n", frame_size, str);
// Ask the socket whether more frames of the current message are pending.
423 size_t opt_size =
sizeof (Int_t);
424 if (zmq_getsockopt(_socket_mcast, ZMQ_RCVMORE, &recv_more, &opt_size) == -1) {
425 printf(
"ZMQ socket options error #%s\n", zmq_strerror(errno));
432 printf(
"Received msg # %d\n",
i);
// Fragment of a routine whose signature lies outside the visible listing.
// The visible statements create a ZMQ_XPUB socket on _ctx, bind it to the
// endpoint string s, and send the text "Hello port" five times, printing the
// result of each send.
// NOTE(review): no zmq_close(sender) is visible in this listing — confirm the
// socket is closed, since the context cannot be terminated while it is open.
// NOTE(review): the zmq_socket() result is not checked for NULL in the visible
// lines.
439 void * sender = zmq_socket(_ctx, ZMQ_XPUB);
440 const int maxlen = 255;
445 rc = zmq_bind(sender, s);
448 printf(
"Error: %s\n", zmq_strerror(errno));
// 10 characters plus the terminating NUL; only the 10 characters are sent.
450 Char_t text[11] =
"Hello port";
451 int len = strlen(text);
453 for (
int i = 0;
i < 5;
i++) {
// A successful zmq_msg_send() takes ownership of msg, so no zmq_msg_close()
// is needed on that path.
// NOTE(review): the zmq_msg_init_size() return value is ignored; on a failed
// send the message still has to be closed by the caller — confirm the missing
// lines do so.
455 zmq_msg_init_size(&msg, len);
456 memcpy(zmq_msg_data(&msg), text, len);
457 rc = zmq_msg_send(&msg, sender, 0);
459 printf(
"Send error: %s\n", zmq_strerror(errno));
461 printf(
"Sended bytes: %d\n", rc);
void memset(T *dest, T i, size_t num)
uses binary expansion of copied volume for speed up
virtual ~BmnDataReceiver()
#define PNP_DISCOVER_PORT
#define PNP_DISCOVER_IP_ADDR