15 fBoardIds(move(boardIds))
19 for (
auto &it : fBoardsMap)
24 printf(
"BmnProfilometerSource::Init()\n");
27 for (
string &addr : fAddrs) {
28 void * raw_socket = zmq_socket(_ctx, ZMQ_SUB);
29 fRawSockets.push_back(raw_socket);
30 if (zmq_connect(raw_socket, addr.c_str()) != 0) {
32 fprintf(stderr,
"Error connecting to ZMQ socket: %s\n", strerror(errno));
35 printf(
"connected to %s\n", addr.c_str());
36 if (zmq_setsockopt(raw_socket, ZMQ_SUBSCRIBE, NULL, 0) == -1) {
38 fprintf(stderr,
"Error subscribing to ZMQ socket: %s\n", strerror(errno));
44 FairRootManager* ioman = FairRootManager::Instance();
45 fArr =
new TClonesArray(BmnADCDigit::Class());
46 fArrTemp =
new TClonesArray(BmnADCDigit::Class());
47 ioman->RegisterInputObject(
"ADC192ASIC", fArr);
49 ioman->RegisterInputObject(
"BmnEventHeader.", fEventHead);
50 TPRegexp re(
"\\w+(\\d+)");
51 for (
string &
id : fBoardIds) {
54 re.Substitute(str,
"$1");
56 fBoardsMap.insert(make_pair(
id, b));
62 for (
void * rs : fRawSockets)
64 zmq_ctx_destroy(_ctx);
70 if (fArrTemp->GetEntriesFast() == 0) {
79 fArr->AbsorbObjects(fArrTemp, 0, 0);
93BmnStatus BmnProfilometerSource::ProcessBuffer(uint32_t *word,
size_t len,
ProfBoard * board) {
94 uint32_t holdb_temp = 0;
98 for (uint32_t
i = 0;
i < len;
i++) {
99 uint32_t data = word[
i];
104 if (holdb_temp == holdb) {
140BmnStatus BmnProfilometerSource::ReceiveSpill() {
141 const Int_t MaxStrLen = 100;
142 bool isIdFound = kFALSE;
143 bool isHeaderFound = kFALSE;
144 bool isTrailerFound = kFALSE;
153 for (
void *rs : fRawSockets) {
156 Int_t frame_size = zmq_msg_recv(&msg, rs, ZMQ_DONTWAIT);
158 if (frame_size == -1) {
168 isReceiving = kFALSE;
169 keepWorking = kFALSE;
177 isReceiving = kFALSE;
178 keepWorking = kFALSE;
184 if (frame_size < MaxStrLen) {
185 string str(
static_cast<char*
> (zmq_msg_data(&msg)), zmq_msg_size(&msg));
188 if (str == DataTrailer) {
189 isTrailerFound = kTRUE;
192 keepWorking = kFALSE;
194 if (str == DataHeader) {
195 isHeaderFound = kTRUE;
202 auto it = fBoardsMap.find(BoardName);
203 if (it != fBoardsMap.end())
207 printf(
"id %s\n", str.c_str());
211 if (isHeaderFound && Board) {
212 ProcessBuffer(
static_cast<uint32_t*
> (zmq_msg_data(&msg)), zmq_msg_size(&msg) / 4, Board);
216 size_t opt_size =
sizeof (recv_more);
217 if (zmq_getsockopt(rs, ZMQ_RCVMORE, &recv_more, &opt_size) == -1) {
218 printf(
"ZMQ socket options error #%s\n", zmq_strerror(errno));
224 }
while (recv_more && isReceiving && (!isTrailerFound));
225 }
while (keepWorking);
227 printf(
"FullReceive\n");
228 if (isTrailerFound) {
230 printf(
"SPILLEND for %s\n", BoardName.c_str());
231 isHeaderFound = kFALSE;
232 isTrailerFound = kFALSE;
Int_t ReadEvent(UInt_t i=0)
BmnProfilometerSource(vector< string > addr={"tcp://159.93.49.126:5601", "tcp://159.93.49.126:5602"}, vector< string > boardIds={"board1", "board2"})
virtual ~BmnProfilometerSource()
void FillEventHeader(FairEventHeader *feh)
vector< uint16_t > adc2_word
vector< uint16_t > adc1_word