41 printf(
"BmnDecoSource::Init()\n");
46 gSystem->IgnoreInterrupt();
47 signal(SIGINT, signal_handler);
48 signal(SIGTERM, signal_handler);
50 FairRootManager* ioman = FairRootManager::Instance();
52 _decoSocket = zmq_socket(_ctx, ZMQ_SUB);
66 if (zmq_setsockopt(_decoSocket, ZMQ_RCVHWM, &hwm,
sizeof(hwm)) == -1)
67 fprintf(stderr,
"Error in zmq_setsockopt of ZMQ_RCVHWM: %s\n", strerror(errno));
68 size_t vl =
sizeof(hwm);
69 if (zmq_getsockopt(_decoSocket, ZMQ_RCVHWM, &hwm, &vl) == -1)
70 fprintf(stderr,
"Error getting ZMQ socket RCVHWM size: %s\n", strerror(errno));
71 printf(
"ZMQ_RCV HWM %d\n", hwm);
73 if (zmq_setsockopt(_decoSocket, ZMQ_SUBSCRIBE,
nullptr, 0) == -1) {
75 fprintf(stderr,
"Error subscribing to ZMQ socket: %s\n", strerror(errno));
78 if (zmq_connect(_decoSocket, _addrString.Data()) != 0) {
80 fprintf(stderr,
"Error connecting to ZMQ socket: %s\n", strerror(errno));
83 printf(
"connected to %s\n", _addrString.Data());
86 _tBuf =
new TBufferFile(TBuffer::kRead);
88 Int_t frame_size = -1;
92 gSystem->ProcessEvents();
93 frame_size = zmq_msg_recv(&_msg, _decoSocket, ZMQ_DONTWAIT);
94 if (frame_size == -1) {
106 fprintf(stderr,
"Receive error № %d #%s\n", errno, zmq_strerror(errno));
112 gSystem->ProcessEvents();
114 }
while ((frame_size <= 0));
117 fDigiArrays->
Clear();
119 fDigiArrays =
nullptr;
122 _tBuf->SetBuffer(zmq_msg_data(&_msg), zmq_msg_size(&_msg));
123 fDigiArrays = (
DigiArrays*)(_tBuf->ReadObject(DigiArrays::Class()));
124 _tBuf->DetachBuffer();
125 zmq_msg_close(&_msg);
127 for (TClonesArray* ar : *(fDigiArrays->
trigAr)) {
128 TClonesArray* newAr =
new TClonesArray(ar->GetClass());
129 newAr->SetName(ar->GetName());
130 ioman->RegisterInputObject(newAr->GetName(), newAr);
131 fT0Digits.push_back(newAr);
135 ioman->RegisterInputObject(
"BmnEventHeader.", fEventHeader);
136 fRunId = fEventHeader->GetRunId();
139 fGemDigits =
new TClonesArray(
"BmnGemStripDigit");
140 ioman->RegisterInputObject(
"GEM", fGemDigits);
142 fVspDigits =
new TClonesArray(
"BmnVSPDigit");
143 ioman->RegisterInputObject(
"VSP", fVspDigits);
145 fSilDigits =
new TClonesArray(
"BmnSiliconDigit");
146 ioman->RegisterInputObject(
"SILICON", fSilDigits);
148 fSiBTDigits =
new TClonesArray(
"BmnSiBTDigit");
149 ioman->RegisterInputObject(
"SiBT", fSiBTDigits);
151 fCscDigits =
new TClonesArray(
"BmnCSCDigit");
152 ioman->RegisterInputObject(
"CSC", fCscDigits);
154 fTof400Digits =
new TClonesArray(
"BmnTof1Digit");
155 ioman->RegisterInputObject(
"TOF400", fTof400Digits);
157 fTof700Digits =
new TClonesArray(
"BmnTof701Digit");
158 ioman->RegisterInputObject(
"TOF701", fTof700Digits);
208 Int_t frame_size = zmq_msg_recv(&_msg, _decoSocket, 0);
209 if (frame_size == -1) {
210 fprintf(stderr,
"Receive error № %d #%s\n", errno, zmq_strerror(errno));
214 fDigiArrays->
Clear();
216 fDigiArrays =
nullptr;
219 _tBuf->SetBuffer(zmq_msg_data(&_msg), zmq_msg_size(&_msg));
220 fDigiArrays = (
DigiArrays*)(_tBuf->ReadObject(DigiArrays::Class()));
223 for (TClonesArray* ar : fT0Digits)
225 fGemDigits->Delete();
226 fVspDigits->Delete();
227 fSilDigits->Delete();
228 fSiBTDigits->Delete();
229 fCscDigits->Delete();
230 fTof400Digits->Delete();
231 fTof700Digits->Delete();
233 fEventHeader->SetRunId(header->GetRunId());
234 fEventHeader->
SetEventId(header->GetEventId());
236 fEventHeader->SetEventTime(header->GetEventTime());
244 fEventHeader->
SetStartSignalInfo(header->GetStartSignalTime(), header->GetStartSignalWidth());
245 for (
size_t iTrig = 0; iTrig < fDigiArrays->
trigAr->size(); iTrig++)
246 fT0Digits[iTrig]->AbsorbObjects((*fDigiArrays->
trigAr)[iTrig]);
247 fGemDigits->AbsorbObjects(fDigiArrays->
gem);
248 fSilDigits->AbsorbObjects(fDigiArrays->
silicon);
249 fSiBTDigits->AbsorbObjects(fDigiArrays->
sibt);
250 fCscDigits->AbsorbObjects(fDigiArrays->
csc);
251 fVspDigits->AbsorbObjects(fDigiArrays->
vsp);
253 fTof400Digits->AbsorbObjects(fDigiArrays->
tof400);
254 fTof700Digits->AbsorbObjects(fDigiArrays->
tof701);
256 _tBuf->DetachBuffer();
257 zmq_msg_close(&_msg);