36 printf(
"BmnMQSource::Init()\n");
44 gSystem->ProcessEvents();
45 frame_size = zmq_msg_recv(&_msg, _decoSocket, ZMQ_DONTWAIT);
46 if (frame_size == -1) {
58 fprintf(stderr,
"Receive error № %d #%s\n", errno, zmq_strerror(errno));
64 gSystem->ProcessEvents();
66 }
while ((frame_size <= 0));
70 _tBuf->SetBuffer(zmq_msg_data(&_msg), zmq_msg_size(&_msg));
71 parts =
static_cast<BmnParts*
>(_tBuf->ReadObjectAny(BmnParts::Class()));
72 cout <<
"TCA len : " << parts->
GetArrays().size() << endl;
73 cout <<
"Obj len : " << parts->
GetObjects().size() << endl;
75 FairRootManager* ioman = FairRootManager::Instance();
76 for (TClonesArray* ar : parts->
GetArrays()) {
77 TClonesArray* newAr =
new TClonesArray(ar->GetClass());
78 newAr->SetName(ar->GetName());
80 ioman->RegisterInputObject(newAr->GetName(), newAr);
82 fArrVec.push_back(newAr);
89 TNamed* ob =
static_cast<TNamed*
>(tn->Clone());
90 ioman->RegisterInputObject(tn->GetName(), ob);
91 fNamVec.push_back(ob);
93 if (!strcmp(ob->ClassName(),
"BmnEventHeader")) {
95 fRunId = eh->GetRunId();
97 LOG(debug) <<
"Set Digi RunID: " << fRunId << endl;
99 if (!strcmp(ob->ClassName(),
"DstEventHeader")) {
101 fRunId = eh->GetRunId();
103 LOG(debug) <<
"Set DST RunID: " << fRunId << endl;
107 _tBuf->DetachBuffer();
108 zmq_msg_close(&_msg);
131 gSystem->IgnoreInterrupt();
132 signal(SIGINT, signal_handler);
133 signal(SIGTERM, signal_handler);
135 Int_t frame_size = 0;
139 gSystem->ProcessEvents();
140 frame_size = zmq_msg_recv(&_msg, _decoSocket, ZMQ_DONTWAIT);
141 if (frame_size == -1) {
143 Int_t err_num = zmq_errno();
155 fprintf(stderr,
"Receive error # %d #%s\n", err_num, zmq_strerror(err_num));
159 fprintf(stderr,
"Receive error zmq# %d #%s\n", err_num, zmq_strerror(err_num));
163 }
while (frame_size <= 0);
166 _tBuf->SetBuffer(zmq_msg_data(&_msg), zmq_msg_size(&_msg));
167 BmnParts* parts =
static_cast<BmnParts*
>(_tBuf->ReadObject(BmnParts::Class()));
174 for (UInt_t iAr = 0; iAr < fArrVec.size(); iAr++) {
175 fArrVec[iAr]->Delete();
176 fArrVec[iAr]->AbsorbObjects(parts->
GetArrays()[iAr]);
180 for (UInt_t iAr = 0; iAr < fNamVec.size(); iAr++) {
183 if (!strcmp(fNamVec[iAr]->ClassName(),
"BmnEventHeader")) {
185 fEventHeader =
static_cast<FairEventHeader*
>(fNamVec[iAr]);
186 LOG(debug) <<
"Digi EventID: " <<
static_cast<BmnEventHeader*
>(fNamVec[iAr])->GetEventId() << endl;
188 if (!strcmp(fNamVec[iAr]->ClassName(),
"DstEventHeader")) {
192 fEventHeader =
static_cast<FairEventHeader*
>(fNamVec[iAr]);
194 LOG(debug) <<
"DST EventID: " <<
static_cast<DstEventHeader*
>(fNamVec[iAr])->GetEventId() << endl;
196 if (!strcmp(fNamVec[iAr]->ClassName(),
"CbmVertex")) {
203 fprintf(stderr,
"Failed to deserialize the incoming object!\n");
204 _tBuf->DetachBuffer();
205 zmq_msg_close(&_msg);
226 if (zmq_getsockopt(_decoSocket, ZMQ_RCVBUF, &rcvBuf, &vl) == -1)
230 if (zmq_setsockopt(_decoSocket, ZMQ_SUBSCRIBE, NULL, 0) == -1) {