BmnRoot
Loading...
Searching...
No Matches
BmnDecoSource.cxx
Go to the documentation of this file.
1#include "BmnDecoSource.h"
2
3#include <signal.h>
4
5using namespace std;
6
7namespace
8{
9int ReceivedSignal = 0;
10
11void signal_handler(int sig)
12{
13 ReceivedSignal = sig;
14 // fKeepWorking = false;
15 printf("Received signal %d Exiting\n", sig);
16}
17} // namespace
18
19BmnDecoSource::BmnDecoSource(TString addr, Int_t periodId)
20 : _addrString(addr)
21 , fDigiArrays(nullptr)
22 , fFirstEvent(kTRUE)
23 // , fRunId(0)
24 , fPeriodId(periodId)
25 , iEventNumber(0)
26 , iT0BranchIndex(-1)
27 , fGemDigits(nullptr)
28 , fVspDigits(nullptr)
29 , fSilDigits(nullptr)
30 , fSiBTDigits(nullptr)
31 , fCscDigits(nullptr)
32 , fTof400Digits(nullptr)
33 , fTof700Digits(nullptr)
34 , fInitDone{false}
35{}
36
38
40{
41 printf("BmnDecoSource::Init()\n");
42 if (fInitDone) {
43 printf("Twice\n");
44 return true;
45 }
46 gSystem->IgnoreInterrupt();
47 signal(SIGINT, signal_handler);
48 signal(SIGTERM, signal_handler);
49
50 FairRootManager* ioman = FairRootManager::Instance();
51 _ctx = zmq_ctx_new();
52 _decoSocket = zmq_socket(_ctx, ZMQ_SUB);
53
54 // int32_t rcvBufLen = 8 * 1024 * 1024;
55 // int32_t ret = 0;
56 // if (ret=zmq_setsockopt(_decoSocket, ZMQ_RCVBUF, &rcvBufLen, sizeof(rcvBufLen)) == -1)
57 // fprintf(stderr, "Error in zmq_setsockopt of ZMQ_RCVBUF: %s\n", strerror(errno));
58 //
59 // int32_t rcvBuf = 0;
60 // size_t vl = sizeof(rcvBuf);
61 // if (zmq_getsockopt(_decoSocket, ZMQ_RCVBUF, &rcvBuf, &vl) == -1)
62 // fprintf(stderr, "Error getting ZMQ socket RCVBUF size: %s\n", strerror(errno));
63 // printf("ZMQ_RCVBUF %d\n", rcvBuf);
64
65 int32_t hwm = 5;
66 if (zmq_setsockopt(_decoSocket, ZMQ_RCVHWM, &hwm, sizeof(hwm)) == -1)
67 fprintf(stderr, "Error in zmq_setsockopt of ZMQ_RCVHWM: %s\n", strerror(errno));
68 size_t vl = sizeof(hwm);
69 if (zmq_getsockopt(_decoSocket, ZMQ_RCVHWM, &hwm, &vl) == -1)
70 fprintf(stderr, "Error getting ZMQ socket RCVHWM size: %s\n", strerror(errno));
71 printf("ZMQ_RCV HWM %d\n", hwm);
72
73 if (zmq_setsockopt(_decoSocket, ZMQ_SUBSCRIBE, nullptr, 0) == -1) {
74 // DBGERR("zmq subscribe")
75 fprintf(stderr, "Error subscribing to ZMQ socket: %s\n", strerror(errno));
76 return kFALSE;
77 }
78 if (zmq_connect(_decoSocket, _addrString.Data()) != 0) {
79 // DBGERR("zmq connect")
80 fprintf(stderr, "Error connecting to ZMQ socket: %s\n", strerror(errno));
81 return kFALSE;
82 } else {
83 printf("connected to %s\n", _addrString.Data());
84 }
85
86 _tBuf = new TBufferFile(TBuffer::kRead);
87 zmq_msg_init(&_msg);
88 Int_t frame_size = -1;
89 do {
90 if (ReceivedSignal)
91 return false;
92 gSystem->ProcessEvents();
93 frame_size = zmq_msg_recv(&_msg, _decoSocket, ZMQ_DONTWAIT);
94 if (frame_size == -1) {
95 // printf("Receive error # %d #%s\n", errno, zmq_strerror(errno));
96 switch (errno) {
97 case EAGAIN:
98 usleep(TimeDelta);
99 break;
100 case EINTR:
101 printf("EINTR\n");
102 printf("Exit!\n");
103 // return kFALSE;
104 break;
105 case EFAULT:
106 fprintf(stderr, "Receive error № %d #%s\n", errno, zmq_strerror(errno));
107 // return kFALSE;
108 break;
109 default:
110 break;
111 }
112 gSystem->ProcessEvents();
113 }
114 } while ((frame_size <= 0));
115
116 if (fDigiArrays) {
117 fDigiArrays->Clear();
118 delete fDigiArrays;
119 fDigiArrays = nullptr;
120 }
121 _tBuf->Reset();
122 _tBuf->SetBuffer(zmq_msg_data(&_msg), zmq_msg_size(&_msg));
123 fDigiArrays = (DigiArrays*)(_tBuf->ReadObject(DigiArrays::Class()));
124 _tBuf->DetachBuffer();
125 zmq_msg_close(&_msg);
126
127 for (TClonesArray* ar : *(fDigiArrays->trigAr)) {
128 TClonesArray* newAr = new TClonesArray(ar->GetClass());
129 newAr->SetName(ar->GetName());
130 ioman->RegisterInputObject(newAr->GetName(), newAr);
131 fT0Digits.push_back(newAr);
132 }
133
134 fEventHeader = new BmnEventHeader();
135 ioman->RegisterInputObject("BmnEventHeader.", fEventHeader);
136 fRunId = fEventHeader->GetRunId();
137 fPeriodId = fEventHeader->GetPeriodId();
138
139 fGemDigits = new TClonesArray("BmnGemStripDigit"); //::Class());
140 ioman->RegisterInputObject("GEM", fGemDigits);
141
142 fVspDigits = new TClonesArray("BmnVSPDigit");
143 ioman->RegisterInputObject("VSP", fVspDigits);
144
145 fSilDigits = new TClonesArray("BmnSiliconDigit"); //::Class());
146 ioman->RegisterInputObject("SILICON", fSilDigits);
147
148 fSiBTDigits = new TClonesArray("BmnSiBTDigit");
149 ioman->RegisterInputObject("SiBT", fSiBTDigits);
150
151 fCscDigits = new TClonesArray("BmnCSCDigit"); //::Class());
152 ioman->RegisterInputObject("CSC", fCscDigits);
153
154 fTof400Digits = new TClonesArray("BmnTof1Digit"); //::Class());
155 ioman->RegisterInputObject("TOF400", fTof400Digits);
156
157 fTof700Digits = new TClonesArray("BmnTof701Digit"); //::Class());
158 ioman->RegisterInputObject("TOF701", fTof700Digits);
159 fInitDone = true;
160 return true;
161}
162
164{
165 printf("BmnDecoSource::Close\n");
166 signal(SIGINT, SIG_DFL);
167 signal(SIGTERM, SIG_DFL);
168 if (fEventHeader) {
169 // fEventHeader->Delete();
170 delete fEventHeader;
171 }
172 if (fGemDigits) {
173 fGemDigits->Delete();
174 delete fGemDigits;
175 }
176 if (fVspDigits) {
177 fVspDigits->Delete();
178 delete fVspDigits;
179 }
180 if (fSilDigits) {
181 fSilDigits->Delete();
182 delete fSilDigits;
183 }
184 if (fSiBTDigits) {
185 fSiBTDigits->Delete();
186 delete fSiBTDigits;
187 }
188 if (fTof400Digits) {
189 fTof400Digits->Delete();
190 delete fTof400Digits;
191 }
192 if (fTof700Digits) {
193 fTof700Digits->Delete();
194 delete fTof700Digits;
195 }
196 for (TClonesArray* ar : fT0Digits)
197 ar->Delete();
198
199 zmq_close(_decoSocket);
200 zmq_ctx_destroy(_ctx);
201 _ctx = nullptr;
202 delete _tBuf;
203}
204
206{
207 zmq_msg_init(&_msg);
208 Int_t frame_size = zmq_msg_recv(&_msg, _decoSocket, 0); // ZMQ_DONTWAIT
209 if (frame_size == -1) {
210 fprintf(stderr, "Receive error № %d #%s\n", errno, zmq_strerror(errno));
211 } else {
212 // printf("Received frame_size = %d\n", frame_size);
213 if (fDigiArrays) {
214 fDigiArrays->Clear();
215 delete fDigiArrays;
216 fDigiArrays = nullptr;
217 }
218 _tBuf->Reset();
219 _tBuf->SetBuffer(zmq_msg_data(&_msg), zmq_msg_size(&_msg));
220 fDigiArrays = (DigiArrays*)(_tBuf->ReadObject(DigiArrays::Class()));
221
222 // move result TClonesArray to registered TClonesArray
223 for (TClonesArray* ar : fT0Digits)
224 ar->Delete();
225 fGemDigits->Delete();
226 fVspDigits->Delete();
227 fSilDigits->Delete();
228 fSiBTDigits->Delete();
229 fCscDigits->Delete();
230 fTof400Digits->Delete();
231 fTof700Digits->Delete();
232 BmnEventHeader* header = fDigiArrays->header;
233 fEventHeader->SetRunId(header->GetRunId());
234 fEventHeader->SetEventId(header->GetEventId());
235 fEventHeader->SetEventTimeTS(header->GetEventTimeTS());
236 fEventHeader->SetEventTime(header->GetEventTime());
237 fEventHeader->SetEventType(header->GetEventType());
238 fEventHeader->SetTripWord(kFALSE);
239 fEventHeader->SetTrigInfo(header->GetTrigInfo());
240 fEventHeader->SetTimeShift(header->GetTimeShift());
241 fEventHeader->SetInputSignalsAR(header->GetInputSignalsAR());
242 fEventHeader->SetInputSignalsBR(header->GetInputSignalsBR());
243 // fEventHeader->SetInputSignalsVector(move(header->GetInputSignalsVector()));
244 fEventHeader->SetStartSignalInfo(header->GetStartSignalTime(), header->GetStartSignalWidth());
245 for (size_t iTrig = 0; iTrig < fDigiArrays->trigAr->size(); iTrig++)
246 fT0Digits[iTrig]->AbsorbObjects((*fDigiArrays->trigAr)[iTrig]);
247 fGemDigits->AbsorbObjects(fDigiArrays->gem);
248 fSilDigits->AbsorbObjects(fDigiArrays->silicon);
249 fSiBTDigits->AbsorbObjects(fDigiArrays->sibt);
250 fCscDigits->AbsorbObjects(fDigiArrays->csc);
251 fVspDigits->AbsorbObjects(fDigiArrays->vsp);
252 // if (iT0BranchIndex > -1)
253 fTof400Digits->AbsorbObjects(fDigiArrays->tof400);
254 fTof700Digits->AbsorbObjects(fDigiArrays->tof701);
255 // fTof700Digits->AbsorbObjects(fDigiArrays->tof700);
256 _tBuf->DetachBuffer();
257 zmq_msg_close(&_msg);
258 }
259 return 0;
260}
int i
Definition P4_F32vec4.h:22
Int_t ReadEvent(UInt_t i=0)
BmnDecoSource(TString addr="tcp://localhost:5555", Int_t periodId=9)
virtual ~BmnDecoSource()
void SetStartSignalInfo(Double_t time, Double_t width)
UInt_t GetPeriodId()
void SetEventType(BmnEventType event_type)
void SetInputSignalsAR(UInt_t v)
void SetTimeShift(unordered_map< UInt_t, Long64_t > time_shift)
void SetEventTimeTS(TTimeStamp event_time)
void SetTripWord(Bool_t flag)
void SetTrigInfo(BmnTrigInfo &trig_info)
void SetInputSignalsBR(UInt_t v)
void SetEventId(UInt_t event_id)
std::vector< TClonesArray * > * trigAr
Definition DigiArrays.h:144
TClonesArray * gem
Definition DigiArrays.h:126
BmnEventHeader * header
Definition DigiArrays.h:146
TClonesArray * vsp
Definition DigiArrays.h:128
void Clear()
Definition DigiArrays.h:38
TClonesArray * silicon
Definition DigiArrays.h:125
TClonesArray * sibt
Definition DigiArrays.h:141
TClonesArray * tof400
Definition DigiArrays.h:129
TClonesArray * tof701
Definition DigiArrays.h:131
TClonesArray * csc
Definition DigiArrays.h:127
STL namespace.