BmnRoot
Loading...
Searching...
No Matches
BmnMQSource.cxx
Go to the documentation of this file.
1#include "BmnMQSource.h"
2
3#include <signal.h>
4// BmnRoot
5#include "BmnMath.h"
6
7using namespace std;
8
9namespace
10{
11int ReceivedSignal = 0;
12
13void signal_handler(int sig)
14{
15 ReceivedSignal = sig;
16 // fKeepWorking = false;
17 printf("Received signal %d Exiting\n", sig);
18}
19} // namespace
20
21BmnMQSource::BmnMQSource(std::string addr, Bool_t toFile)
22 : _tBuf(nullptr)
23 , _addrString(addr)
24 , fFirstEvent(kTRUE)
25 , fToFile(toFile)
26 // , fRunId(0)
27 , fPeriodId(9)
28 , iEventNumber(0)
29 , fEventHeader(nullptr)
30{}
31
33
35{
36 printf("BmnMQSource::Init()\n");
37 if (!InitZMQ())
38 return kFALSE;
39 zmq_msg_init(&_msg);
40 Int_t frame_size = 0;
41 do {
42 if (ReceivedSignal)
43 return kFALSE;
44 gSystem->ProcessEvents();
45 frame_size = zmq_msg_recv(&_msg, _decoSocket, ZMQ_DONTWAIT);
46 if (frame_size == -1) {
47 // printf("Receive error # %d #%s\n", errno, zmq_strerror(errno));
48 switch (errno) {
49 case EAGAIN:
50 usleep(TimeDelta);
51 break;
52 case EINTR:
53 printf("EINTR\n");
54 printf("Exit!\n");
55 return kFALSE;
56 break;
57 case EFAULT:
58 fprintf(stderr, "Receive error № %d #%s\n", errno, zmq_strerror(errno));
59 return kFALSE;
60 break;
61 default:
62 break;
63 }
64 gSystem->ProcessEvents();
65 }
66 } while ((frame_size <= 0));
67 // printf("Received frame_size = %d\n", frame_size);
68 BmnParts* parts = nullptr;
69 _tBuf->Reset();
70 _tBuf->SetBuffer(zmq_msg_data(&_msg), zmq_msg_size(&_msg));
71 parts = static_cast<BmnParts*>(_tBuf->ReadObjectAny(BmnParts::Class()));
72 cout << "TCA len : " << parts->GetArrays().size() << endl;
73 cout << "Obj len : " << parts->GetObjects().size() << endl;
74
75 FairRootManager* ioman = FairRootManager::Instance();
76 for (TClonesArray* ar : parts->GetArrays()) {
77 TClonesArray* newAr = new TClonesArray(ar->GetClass());
78 newAr->SetName(ar->GetName());
79 // printf("Register %30s %30s\n", ar->GetName(), newAr->GetName());
80 ioman->RegisterInputObject(newAr->GetName(), newAr);
81 // ioman->Register(newAr->GetName(), ".", newAr, fToFile);
82 fArrVec.push_back(newAr);
83 }
84 for (TNamed* tn : parts->GetObjects()) {
85 // printf("Register %20s\n", tn->GetName());
86 // printf("ClassName %20s Class_Name %20s GetName %20s\n", tn->ClassName(), tn->Class_Name(),
87 // tn->GetName());
88 // TClass * cl = tn->Class();
89 TNamed* ob = static_cast<TNamed*>(tn->Clone()); // cl->New());
90 ioman->RegisterInputObject(tn->GetName(), ob);
91 fNamVec.push_back(ob);
92
93 if (!strcmp(ob->ClassName(), "BmnEventHeader")) {
94 BmnEventHeader* eh = static_cast<BmnEventHeader*>(ob);
95 fRunId = eh->GetRunId();
96 fPeriodId = eh->GetPeriodId();
97 LOG(debug) << "Set Digi RunID: " << fRunId << endl;
98 }
99 if (!strcmp(ob->ClassName(), "DstEventHeader")) {
100 DstEventHeader* eh = static_cast<DstEventHeader*>(ob);
101 fRunId = eh->GetRunId();
102 fPeriodId = eh->GetPeriodId();
103 LOG(debug) << "Set DST RunID: " << fRunId << endl;
104 }
105 }
106 delete parts;
107 _tBuf->DetachBuffer();
108 zmq_msg_close(&_msg);
109 return kTRUE;
110}
111
113{
114 for (TClonesArray*& ar : fArrVec)
115 if (ar) {
116 ar->Delete();
117 delete ar;
118 }
119 for (TNamed*& ar : fNamVec)
120 if (ar)
121 delete ar;
122 zmq_close(_decoSocket);
123 zmq_ctx_destroy(_ctx);
124 _ctx = NULL;
125 delete _tBuf;
126}
127
129{
130 // printf("ReadEvent(%4u)\n", i);
131 gSystem->IgnoreInterrupt();
132 signal(SIGINT, signal_handler);
133 signal(SIGTERM, signal_handler);
134 zmq_msg_init(&_msg);
135 Int_t frame_size = 0;
136 do {
137 if (ReceivedSignal)
138 return 1;
139 gSystem->ProcessEvents();
140 frame_size = zmq_msg_recv(&_msg, _decoSocket, ZMQ_DONTWAIT);
141 if (frame_size == -1) {
142 // fprintf(stderr, "Receive error zmq# %d #%s\n", zmq_errno(), zmq_strerror(zmq_errno()));
143 Int_t err_num = zmq_errno();
144 switch (err_num) {
145 case EAGAIN:
146 usleep(TimeDelta);
147 return 2;
148 break;
149 case EINTR:
150 printf("EINTR\n");
151 printf("Exit!\n");
152 return 1;
153 break;
154 case EFAULT:
155 fprintf(stderr, "Receive error # %d #%s\n", err_num, zmq_strerror(err_num));
156 return 1;
157 break;
158 default:
159 fprintf(stderr, "Receive error zmq# %d #%s\n", err_num, zmq_strerror(err_num));
160 break;
161 }
162 }
163 } while (frame_size <= 0);
164
165 _tBuf->Reset();
166 _tBuf->SetBuffer(zmq_msg_data(&_msg), zmq_msg_size(&_msg));
167 BmnParts* parts = static_cast<BmnParts*>(_tBuf->ReadObject(BmnParts::Class()));
168 if (parts) {
169 // cout << "TCA len : " << parts->GetArrays().size() << endl;
170 // cout << "Obj len : " << parts->GetObjects().size() << endl;
171 // cout << "Accepted: " << fDigiArrays->header->GetTrigInfo()->GetTrigAccepted() << endl;
172
173 // move result TClonesArray to registered TClonesArray
174 for (UInt_t iAr = 0; iAr < fArrVec.size(); iAr++) {
175 fArrVec[iAr]->Delete();
176 fArrVec[iAr]->AbsorbObjects(parts->GetArrays()[iAr]);
177 // cout << "Count of " << fArrVec[iAr]->GetName() << " digits: " <<
178 // fArrVec[iAr]->GetEntriesFast() << endl;
179 }
180 for (UInt_t iAr = 0; iAr < fNamVec.size(); iAr++) {
181 // printf("ClassName %20s Class_Name %20s GetName %20s\n", fNamVec[iAr]->ClassName(),
182 // fNamVec[iAr]->Class_Name(), fNamVec[iAr]->GetName());
183 if (!strcmp(fNamVec[iAr]->ClassName(), "BmnEventHeader")) {
184 *static_cast<BmnEventHeader*>(fNamVec[iAr]) = *static_cast<BmnEventHeader*>(parts->GetObjects()[iAr]);
185 fEventHeader = static_cast<FairEventHeader*>(fNamVec[iAr]);
186 LOG(debug) << "Digi EventID: " << static_cast<BmnEventHeader*>(fNamVec[iAr])->GetEventId() << endl;
187 }
188 if (!strcmp(fNamVec[iAr]->ClassName(), "DstEventHeader")) {
189 // static_cast<DstEventHeader*>(fNamVec[iAr])
190 // ->CopyFrom(static_cast<DstEventHeader*>(parts->GetObjects()[iAr]));
191 *static_cast<DstEventHeader*>(fNamVec[iAr]) = *static_cast<DstEventHeader*>(parts->GetObjects()[iAr]);
192 fEventHeader = static_cast<FairEventHeader*>(fNamVec[iAr]);
193 // cout << "Object " << fNamVec[iAr]->GetName() << endl;
194 LOG(debug) << "DST EventID: " << static_cast<DstEventHeader*>(fNamVec[iAr])->GetEventId() << endl;
195 }
196 if (!strcmp(fNamVec[iAr]->ClassName(), "CbmVertex")) {
197 static_cast<CbmVertex*>(fNamVec[iAr])->CopyFrom(static_cast<CbmVertex*>(parts->GetObjects()[iAr]));
198 // cout << "VZ " << static_cast<CbmVertex*> (fNamVec[iAr])->GetZ() << endl;
199 }
200 }
201 delete parts;
202 } else
203 fprintf(stderr, "Failed to deserialize the incoming object!\n");
204 _tBuf->DetachBuffer();
205 zmq_msg_close(&_msg);
206 return 0;
207}
208
209void BmnMQSource::FillEventHeader(FairEventHeader* feh)
210{
211 if (fEventHeader) {
212 feh->SetEventTime(fEventHeader->GetEventTime());
213 feh->SetRunId(fEventHeader->GetRunId());
214 feh->SetMCEntryNumber(fEventHeader->GetMCEntryNumber());
215 }
216 LOGF(debug, "feh run id = %d clname %s\n", feh->GetRunId(), feh->ClassName());
217 return;
218}
219
220Bool_t BmnMQSource::InitZMQ()
221{
222 _ctx = zmq_ctx_new();
223 _decoSocket = zmq_socket(_ctx, ZMQ_SUB);
224 uint32_t rcvBuf = 0;
225 size_t vl = sizeof(rcvBuf);
226 if (zmq_getsockopt(_decoSocket, ZMQ_RCVBUF, &rcvBuf, &vl) == -1)
227 DBGERR("zmq_getsockopt of ZMQ_RCVBUF")
228 printf("ZMQ_RCVBUF %u\n", rcvBuf);
229
230 if (zmq_setsockopt(_decoSocket, ZMQ_SUBSCRIBE, NULL, 0) == -1) {
231 // DBGERR("zmq subscribe")
232 fprintf(stderr, "Error subscribing to ZMQ socket: %s\n", strerror(errno));
233 return kFALSE;
234 }
235 if (zmq_connect(_decoSocket, _addrString.c_str()) != 0) {
236 // DBGERR("zmq connect")
237 fprintf(stderr, "Error connecting to ZMQ socket: %s\n", strerror(errno));
238 return kFALSE;
239 } else {
240 printf("Listening to %s\n", _addrString.c_str());
241 }
242 _tBuf = new TBufferFile(TBuffer::kRead);
243 return kTRUE;
244}
int i
Definition P4_F32vec4.h:22
#define DBGERR(a)
Definition BmnMath.h:25
UInt_t GetPeriodId()
virtual ~BmnMQSource()
Int_t ReadEvent(UInt_t i=0)
void FillEventHeader(FairEventHeader *feh)
Bool_t Init()
BmnMQSource(std::string addr="tcp://localhost:6666", Bool_t toFile=kFALSE)
vector< TNamed * > & GetObjects()
Definition BmnParts.h:44
vector< TClonesArray * > & GetArrays()
Definition BmnParts.h:45
UInt_t GetPeriodId()
STL namespace.