BmnRoot
Loading...
Searching...
No Matches
BmnConverter.cxx
Go to the documentation of this file.
1#include "BmnConverter.h"
2// STL
3#include <algorithm>
4#include <arpa/inet.h> /* For ntohl for Big Endian LAND. */
5#include <bitset>
6#include <chrono>
7#include <filesystem>
8#include <iostream>
9#include <mutex>
10#include <unistd.h>
11// ROOT
12#include <ROOT/RDF/RInterface.hxx>
13#include <ROOT/RDataFrame.hxx>
14#include <ROOT/RSnapshotOptions.hxx>
15// BmnRoot
16#include "BmnConverterTools.h"
17#include "BmnRawDataDecoder.h"
18#include "FairLogger.h"
19#include "TStopwatch.h"
20#include "TSystem.h"
21#include "TTree.h"
22#include "UniDetectorParameter.h"
23#include "UniRawFile.h"
24#include "UniRun.h"
25
26namespace fs = std::filesystem;
27
28BmnConverter::BmnConverter(uint32_t threadCount, uint32_t period, TString outDir, bool tempFileOnDisk)
29 : temp_file_on_disk(tempFileOnDisk)
30 , fPeriodId(period)
31 , fRawRunHdrName("RawRunHeader")
32 , fMetadataName("RawRunMetadata")
33 , fRawTreeName("BMN_RAW")
34 , fRawTreeSpillName("BMN_RAW_SPILLS")
35 , fRootDirPath(outDir)
36 // , fRootFileOut(nullptr)
37 , spillHeader(nullptr)
38 , runHeader(make_unique<DigiRunHeader>())
39 , metadata(make_unique<BmnMetadataRaw>())
40 // , fMerger(fMerger)
41 , fExportJsonBlocks(false)
42 , tai_utc_dif(0)
43 , fCurWaitingThread(nullptr)
44{
45 TTree::SetMaxTreeSize(1'000'000'000'000); // 932 GB
46 ROOT::EnableThreadSafety();
47 LOGF(info, ANSI_COLOR_BLUE "Starting converter's constructor" ANSI_COLOR_RESET);
48 InitUTCShift();
49 LOGF(info, "concurrency: %u", std::thread::hardware_concurrency());
50 if (fRootDirPath.Length() == 0)
51 fRootDirPath = fs::current_path();
52 bool dc = fs::create_directories(fRootDirPath.Data());
53 LOGF(info, "Directory: %s was %s", fRootDirPath.Data(), dc ? "created" : "already present");
54 fRootFileName = TString::Format("%s/raw_temp_%d.root", fRootDirPath.Data(), getpid());
55 if (temp_file_on_disk)
56 CreateTempFile();
57
58 if (threadCount == 0) {
59 if (std::thread::hardware_concurrency() <= 1)
60 fThreadCnt = 1; // either 1 core or no info
61 else
62 fThreadCnt = std::thread::hardware_concurrency() - 1;
63 } else
64 fThreadCnt = threadCount;
66 LOGF(info, "Creating %u%s", fThreadCnt, (fThreadCnt == 1 ? " converter thread" : " converter threads"));
67 for (uint32_t i = 0; i < fThreadCnt; i++) {
69 tmp->SetPeriod(fPeriodId);
70 if (temp_file_on_disk)
71 tmp->SetMerger(fMerger.get());
72 tmp->CreateTrees();
73 tmp->RegisterBranches();
75 }
76 LoadConfig();
77 // bool convertMSC = conf.get<bool>("Decoder.ProcessMSC", false);
78 // if (convertMSC) {
79 /*fRawTreeSpills = new TTree("BMN_RAW_SPILLS", "BMN_RAW_SPILLS");
80 auto spill_reg_fun = [&] (TString name, TObject * ar) -> void {
81 TBranch *b = fRawTreeSpills->Branch(name, &ar);
82 LOGF(debug1, "Register branch %p : %s", (void*) b, name.Data());
83 return;
84 };*/
85 /*for(int i = 0; i < fThreadCnt; i++)
86 fThreads->GetThread(i)->RegisterSpillBranches();
87}*/
88 LOGF(info, "Created %u%s", fThreadCnt, (fThreadCnt == 1 ? " converter thread" : " converter threads"));
89 LOGF(info, ANSI_COLOR_BLUE "Finished creating converter\n" ANSI_COLOR_RESET);
90}
91
92BmnStatus BmnConverter::LoadConfig()
93{
94 if (fDecoderConfigFileName.length() == 0)
95 fDecoderConfigFileName = string(getenv("VMCWORKDIR")) + "/config/bmnconf.json";
96 pt::read_json(fDecoderConfigFileName, conf);
97 fExportJsonBlocks = conf.get<bool>("Decoder.ExportJson", false);
98 bool ExportExternalSpillStat = conf.get<bool>("Decoder.ExportExternalSpillStat", false);
99 for (uint32_t i = 0; i < fThreadCnt; i++) {
101 tmp->SetExportExternalSpillStat(ExportExternalSpillStat);
102 }
103 return kBMNSUCCESS;
104}
105
106BmnStatus BmnConverter::CreateTempFile()
107{
108 LOGF(info, "Creating temporary output root file");
109 fRootFileName = TString::Format("%s/raw_temp_%u.root", fRootDirPath.Data(), fRunId); //, getpid());
110 fMerger = make_unique<ROOT::TBufferMerger>(fRootFileName.Data());
111 // fRootFileOut = new TFile(fRootFileName, "recreate");
112 if (!fMerger) {
113 // if (!fRootFileOut->IsOpen()) {
114 // delete fRootFileOut;
115 LOGF(error, "Couldn't access the output file!!!!!\n");
116 return kBMNERROR;
117 }
118 LOGF(info, "Created temporary output root file");
119 return kBMNSUCCESS;
120}
121
122Int_t BmnConverter::GetSpillNumber(vector<SysPoint>* spillEnds, vector<SysPoint>* spillStarts, SysPoint time)
123{
124 auto t = std::upper_bound(spillEnds->begin(), spillEnds->end(), time);
125 if (t == spillEnds->end()) {
126 return -1; // currently no spill end;
127 } else {
128 if (time < (*spillStarts)[t - spillEnds->begin()]) {
129 return -2;
130 }
131 return t - spillEnds->begin();
132 }
133}
134
135Int_t BmnConverter::GetSpillNumber(vector<SysPoint>* spillEnds, vector<SysPoint>* spillStarts, TTimeStamp time)
136{
138 return GetSpillNumber(spillEnds, spillStarts, p);
139}
140
142{
143 fThreads->Finish();
145 for (UInt_t i = 0; i < fThreadCnt; i++) {
146 LOGF(info, "WriteTree for thread %u", i);
147 tmp = fThreads->GetThread(i);
148 // tmp->WriteTree();
149 // tmp->WriteTreeSpills();
150 tmp->FinalizeFile();
151 }
152 {
153 auto file = fMerger->GetFile();
154 file->WriteObject(runHeader.get(), fRawRunHdrName.Data());
155 file->WriteObject(metadata.get(), fMetadataName.Data());
156 file->Write();
157 file->Close();
158 file.reset();
159 }
160 LOGF(info, "main thread objects written");
161 // LOGF(info, "main thread file closed");
163 fMerger.reset();
164 // LOGF(info, "fMerger reset");
165 // {
166 // TFile file(fRootFileName, "update");
167 // TTree* ev_tree = static_cast<TTree*>(file.Get(fRawTreeName));
168 // Int_t build_status = ev_tree->BuildIndex("BmnEventHeader.fEventId");
169 // LOGF(info, "Building tree index: %d", build_status);
170 // ev_tree->Write();//"", TObject::kOverwrite);
171 // file.Close();
172 // }
173}
174
176{
177 LOGF(info, ANSI_COLOR_BLUE "Starting to write spills" ANSI_COLOR_RESET);
178 TStopwatch* timer = new TStopwatch();
179 timer->Start();
180 ROOT::EnableImplicitMT(2);
181 LOGF(info, "Preparing temporary file for read access");
182 OutputTrees();
183 LOGF(info, "Trees written");
184 TFile* fRootFileOut = new TFile(fRootFileName, "read");
185 if (!fRootFileOut->IsOpen()) {
186 delete fRootFileOut;
187 LOGF(error, "Couldn't access the temp file!!!!!\n");
188 return;
189 }
190 for (UInt_t i = 0; i < fThreadCnt; i++) {
191 fThreads->GetThread(i)->SetTrees((TTree*)(fRootFileOut->Get(fRawTreeName + to_string(i))),
192 (TTree*)(fRootFileOut->Get(fRawTreeSpillName + to_string(i))));
193 }
194 set<int> spillCheck;
195 for (size_t i = 0; i < vecTree.size(); i++)
196 spillCheck.insert(vecTree[i].first);
197 for (size_t i = 0; i < vecTreeSpills.size(); i++)
198 spillCheck.insert(vecTreeSpills[i].first);
199 LOGF(info, "Starting to write %d spills", spillCheck.size());
200 for (auto t : spillCheck) {
201 {
202 ROOT::TBufferMerger merger(to_string(fRunId) + (TString) "_s" + to_string(t) + ".root", "recreate");
204 for (UInt_t i = 0; i < fThreadCnt; i++) {
205 tmp = fThreads->GetWaitingThread();
206 tmp->SetMerger(&merger);
207 tmp->SetData(3, t);
208 tmp->Execute();
209 }
210 fThreads->Finish();
211 }
212 TFile file(to_string(fRunId) + (TString) "_s" + to_string(t) + ".root", "update");
213 file.WriteObject(runHeader.get(), fRawRunHdrName.Data());
214 file.WriteObject(metadata.get(), fMetadataName.Data());
215 file.Close();
216 if (t == -2)
217 LOGF(info, "Finished writing weird events (spill %d)", t);
218 else if (t == -1)
219 LOGF(info, "Finished writing unfinished spill (spill %d)", t);
220 else
221 LOGF(info, "Finished writing spill %d", t);
222 }
223 ROOT::DisableImplicitMT();
224 fRootFileOut->Close();
225 Close();
226 timer->Stop();
227 LOGF(info, ANSI_COLOR_BLUE "Finished writing spills in %.3fs\n" ANSI_COLOR_RESET, timer->RealTime());
228}
229
231{
232 fThreads->Finish();
233 LOGF(info, ANSI_COLOR_BLUE "Starting to split events by spills" ANSI_COLOR_RESET);
234 TStopwatch* timer = new TStopwatch();
235 timer->Start();
236 vector<SysPoint> spillEnds;
237 vector<SysPoint> spillStarts;
238 for (auto t : fSpillMap) {
239 spillStarts.push_back(t.second.start_ts);
240 spillEnds.push_back(t.second.stop_ts);
241 LOGF(info, "Spill:\n\tstart ts:%s\n\tstop ts:%s", BmnFunctionSet::TimePoint2String(t.second.start_ts),
242 BmnFunctionSet::TimePoint2String(t.second.stop_ts));
243 }
244 std::sort(spillEnds.begin(), spillEnds.end());
245 // for (size_t i = 0; i < spillEnds.size(); i++) {
246 // LOGF(info, "Spill end detected at: %s", spillEnds[i].AsString());
247 // }
248 vector<TTree*> tmp;
249 for (UInt_t i = 0; i < fThreadCnt; i++) {
250 tmp.push_back(fThreads->GetThread(i)->fRawTree);
251 tmp.push_back(fThreads->GetThread(i)->fRawTreeSpills);
252 }
253 vecTree.clear();
254 vecTreeSpills.clear();
255 for (size_t i = 0; i < tmp.size(); i += 2) {
256 int n = tmp[i]->GetEntriesFast();
257 TBranch* branch = tmp[i]->GetBranch("BmnEventHeader.");
258 for (int j = 0; j < n; j++) {
259 branch->GetEntry(j);
260 vecTree.push_back(make_pair(
261 GetSpillNumber(&spillEnds, &spillStarts, fThreads->GetThread(i / 2)->eventHeaderDAQ->GetEventTimeTS()),
262 make_pair(i / 2, j)));
263 }
264 }
265 for (size_t i = 1; i < tmp.size(); i += 2) {
266 int n = tmp[i]->GetEntriesFast();
267 TBranch* branch = tmp[i]->GetBranch("MSC");
268 TBranch* branchT0 = tmp[i]->GetBranch("T0Raw");
269 for (int j = 0; j < n; j++) {
271 SysPoint ts{SysPoint::min()};
272 branch->GetEntry(j);
273 if (th->msc->GetEntriesFast() > 0) {
274 ts = ((BmnMSCDigit<UChar_t>*)(th->msc->First()))->GetTime();
275 LOGF(debug2, "branchMSC ts\t%s", BmnFunctionSet::TimePoint2String(ts));
276 } else {
277 branchT0->GetEntry(j);
278 if (th->t0raw->GetEntriesFast() > 0) {
279 ts = ((BmnT0Raw<kT0_BIN_BLOCK_WORDS>*)(th->t0raw->First()))->GetTime();
280 LOGF(debug2, "branchT0 ts\t%s", BmnFunctionSet::TimePoint2String(ts));
281 ts -= 1s; // temp crutches to fit into the spill time
282 } else
283 continue;
284 }
285 vecTreeSpills.push_back(make_pair(GetSpillNumber(&spillEnds, &spillStarts, ts), make_pair(i / 2, j)));
286 LOGF(debug2, "GetSpillNumber %d", GetSpillNumber(&spillEnds, &spillStarts, ts));
287 }
288 }
289 for (UInt_t i = 0; i < fThreadCnt; i++) {
292 }
293 std::sort(vecTree.begin(), vecTree.end());
294 std::sort(vecTreeSpills.begin(), vecTreeSpills.end());
295 runHeader->SetSpillMap(fSpillMap);
296 timer->Stop();
297 LOGF(info, ANSI_COLOR_BLUE "Finished splitting events by spills in %.3fs\n" ANSI_COLOR_RESET, timer->RealTime());
298}
299
301{
302 fThreads->Finish();
303 LOGF(info, "Starting separating events by spill");
304 TStopwatch* timer = new TStopwatch();
305 timer->Start();
306
307 treeTimeMap.clear();
308 treeTimeMapDiv.clear();
309
310 vector<SysPoint> spillEnds;
311 vector<SysPoint> spillStarts;
312 for (auto t : fSpillMap) {
313 spillStarts.push_back(t.second.start_ts);
314 spillEnds.push_back(t.second.stop_ts);
315 LOGF(info, "Spill:\n\tstart ts: %s\n\tstop ts: %s", BmnFunctionSet::TimePoint2String(t.second.start_ts),
316 BmnFunctionSet::TimePoint2String(t.second.stop_ts));
317 }
318 std::sort(spillEnds.begin(), spillEnds.end());
319 for (size_t i = 0; i < spillEnds.size(); i++) {
320 LOGF(info, "Spill end detected at: %s", BmnFunctionSet::TimePoint2String(spillEnds[i]));
321 }
322
323 for (uint32_t i = 0; i < fThreadCnt; i++) {
325 TTree* tree = th->fRawTree;
326 int64_t n = tree->GetEntriesFast();
327 TBranch* branch = tree->GetBranch("BmnEventHeader.");
328 for (int64_t iEntry = 0; iEntry < n; iEntry++) {
329 branch->GetEntry(iEntry);
330 // treeTimeMap.emplace(th->eventHeaderDAQ->GetEventTimeTP(), make_pair(tree, iEntry));
332 int32_t sn = GetSpillNumber(&spillEnds, &spillStarts, tp);
333 treeTimeMapDiv[sn].emplace(tp, make_pair(tree, iEntry));
334 }
335 }
336 for (const auto& [spill_id, mm] : treeTimeMapDiv) {
337 // const int32_t& spill_id = pmm.first;
338 // const auto& mm = pmm.second;
339 LOGF(info, "t %d", spill_id);
340 TFile file(to_string(fRunId) + (TString) "_s" + to_string(spill_id) + ".root", "recreate");
341 TTree* sortedTree = mm.begin()->second.first->CloneTree(0);
342 sortedTree->SetAutoSave(0);
343 sortedTree->SetAutoFlush(0);
344 sortedTree->SetBasketSize("*", 1073741824);
345 if (!file.IsOpen()) {
346 LOGF(error, "Couldn't access the output file!!!!!\n");
347 break; // return kBMNERROR;
348 }
349 LOGF(info, "TFile open");
350 for (const pair<const SysPoint, TreePE>& el : mm) {
351 const TreePE& pe = el.second;
352 pe.first->GetEntry(pe.second);
353 sortedTree->Fill();
354 }
355 file.WriteObject(runHeader.get(), fRawRunHdrName.Data());
356 file.WriteObject(metadata.get(), fMetadataName.Data());
357 file.Close();
358 if (spill_id == -2)
359 LOGF(info, "Finished writing weird events (spill %d)", spill_id);
360 else if (spill_id == -1)
361 LOGF(info, "Finished writing unfinished spill (spill %d)", spill_id);
362 else
363 LOGF(info, "Finished writing spill %d", spill_id);
364 }
365
366 Close();
367 timer->Stop();
368 LOGF(info, ANSI_COLOR_BLUE "Finished splitting events by spills in %.3fs\n" ANSI_COLOR_RESET, timer->RealTime());
369}
370
371void BmnConverter::ReproduceBranches(TTree* inTree, TTree* outTree)
372{
373 TObjArray* brList = inTree->GetListOfBranches(); // ioman->GetBranchNameList();
374 if (inTree->GetEntry(0) == 0) {
375 LOGF(error, "Input tree is empty! Recreating empty output...");
376 // for (Int_t i = 0; i < brList->GetEntries(); i++) {
377 // TString str = static_cast<TObjString*> (brList->At(i))->GetString();
378 // TObject *obj = ioman->GetObject(str);
379 // TClass *cl = TClass::GetClass(obj->ClassName());
381 // if (cl == TClonesArray::Class()) {
382 // TClonesArray * inTCA = static_cast<TClonesArray*> (obj);
383 // TClonesArray * newTCA = new TClonesArray(inTCA->GetClass());
384 // fTreeTemp->Branch(str.Data(), &newTCA);
385 // fArrVecIn.push_back(inTCA);
386 // fArrVec.push_back(newTCA);
387 //
388 // } else {
389 // TNamed* inObj = static_cast<TNamed *> (obj);
391 // TObject* workObj = inObj->Clone(/*TString(inObj->GetName()) + "_clone." + str*/)/*funcNew(0)*/;
393 // /*TBranch *b = */fTreeTemp->Branch(str.Data(), "TObject", workObj);
395 // fNamVecIn.push_back(inObj);
396 // fNamVec.push_back(workObj);
397 // }
398 // return;
399 }
400 for (Int_t i = 0; i < brList->GetEntries(); i++) {
401 TBranch* br = static_cast<TBranch*>(brList->At(i));
402 TString str(br->GetName());
403 TObject* obj = *(reinterpret_cast<TObject**>(br->GetAddress()));
404 LOGF(debug, "obj->ClassName() %s", obj->ClassName());
405 TClass* cl = TClass::GetClass(obj->ClassName());
406 LOGF(debug, "ClassName: %s", br->ClassName());
407 if (cl == TClonesArray::Class()) {
408 TClonesArray* inTCA = static_cast<TClonesArray*>(obj);
409 LOGF(debug, "TCA->GetClass()->GetName() %s", inTCA->GetClass()->GetName());
410 outTree->Branch(str.Data(), &inTCA);
411 fArrVec.push_back(inTCA);
412
413 } else {
414 TNamed* inObj = static_cast<TNamed*>(obj);
415 outTree->Branch(str.Data(), "TObject", inObj);
416 fNamVec.push_back(inObj);
417 }
418 }
419 return;
420}
421
422BmnStatus BmnConverter::FeedEvent(UInt_t* buf, UInt_t len, Int_t type)
423{
424 fEventId = buf[0];
425 if (type == 1) {
426 fCurEvType = kBMNPAYLOAD;
427 QuickProcessEvent(buf, len);
428 return kBMNSUCCESS;
429 } else if (type == 2) {
430 fCurEvType = kBMNSTAT;
431 QuickConvertStatEvent(buf, len);
432 return kBMNSUCCESS;
433 } else
434 return kBMNERROR;
435}
436
437BmnStatus BmnConverter::FeedFile(TString name, Bool_t getRunId, Bool_t getSubName)
438{
439 if (getRunId)
441 if (getSubName)
443 fRawFileIn = fopen(name, "rb");
444 if (fRawFileIn == nullptr) {
445 LOGF(error, "Couldn't open file %s!!!!!\n", name.Data());
446 return kBMNERROR;
447 }
448 LOGF(info, ANSI_COLOR_BLUE "Starting to convert file %s" ANSI_COLOR_RESET, name.Data());
449 LOGF(info, "Period ID: %u, Run ID: %u, Subname: %s", fPeriodId, fRunId, fSubName.Data());
450 // if (!fRootFileOut)
451 // if (CreateTempFile() == kBMNERROR)
452 // return kBMNERROR;
453 for (UInt_t i = 0; i < fThreadCnt; i++)
455 TStopwatch* timer = new TStopwatch();
456 timer->Start();
457 while ((ConvertRawToRootIterateFileRead() != kBMNERROR)) {
458 // while ((ConvertRawToRootIterateFileReadFaster() != kBMNERROR)) {
459 // if ((fStartEventId == 0) && (fEventId))
460 // fStartEventId = fEventId;
461 // if (cc++>10000)
462 // break;
463 }
464 // LOGF(info,"tq size %ld ",fThreads->threadQueue.size());
465 if (fCurWaitingThread) { // crutch
466 fCurWaitingThread->CancelReady();
467 fCurWaitingThread = nullptr;
468 }
469 // LOGF(info,"tq size %ld ",fThreads->threadQueue.size());
470 fThreads->Finish();
471 timer->Stop();
472 LOGF(info, ANSI_COLOR_BLUE "Converted file %s in %.3fs\n" ANSI_COLOR_RESET, name.Data(), timer->RealTime());
473 std::fclose(fRawFileIn);
474 return kBMNSUCCESS;
475}
476
477BmnStatus BmnConverter::ConvertRawToRootIterateFileRead()
478{
479 // TStopwatch sw;
480 // sw.Start();
481 // printf("fMaxEvent %lu, fNSignalEvents %u, fMaxEvent %lu\n", fMaxEvent, fNSignalEvents, fMaxEvent);
482 fCurEvType = kBMNEMPTY;
483 // if (fMaxEvent > 0 && fNSignalEvents >= fMaxEvent) return kBMNERROR;
484 if (fread(&fDat, kWORDSIZE, 1, fRawFileIn) != 1)
485 return kBMNERROR;
486 // fCurentPositionRawFile = ftello64(fRawFileIn);
487 // if (fCurentPositionRawFile >= fLengthRawFile) return kBMNERROR;
488 switch (fDat) {
489 case SYNC_EVENT:
490 case SYNC_EVENT_OLD:
491 fCurEvType = kBMNPAYLOAD;
492 LOGF(debug2, ANSI_COLOR_BLUE "SYNC_EVENT %08X" ANSI_COLOR_RESET, fDat);
493 // read number of bytes in event
494 if (fread(&fDat, kWORDSIZE, 1, fRawFileIn) != 1)
495 return kBMNERROR;
496 if (fDat % kNBYTESINWORD) {
497 LOGF(error, "WTF?? in the main event: fDat == %u", fDat);
498 }
499 fDat = fDat / kNBYTESINWORD + (fPeriodId <= 7 ? 1 : 0); // bytes --> words
500 // printf("ev length %d\n", fDat);
501 // read array of current event data and process them
502 if (fread(data, kWORDSIZE, fDat, fRawFileIn) != fDat)
503 return kBMNERROR;
504 fEventId = data[0];
505 LOGF(debug, ANSI_COLOR_BLUE "iEv = %u" ANSI_COLOR_RESET, fEventId);
506 if (fEventId <= 0) {
507 return kBMNCONTINUE; // skip bad events
508 }
509 QuickProcessEvent(data, fDat);
510 // ProcessEvent(data, fDat);
511
512 // LOGF(trace, "evType %d", fCurEvType);
513 // LOGF(debug, "eventHeaderDAQ %p", (void*) eventHeaderDAQ);
514 // LOGF(debug1, "isSpillStart %d", isSpillStart);
515 // if (isSpillStart == kTRUE)
516 // isSpillStart = kFALSE;
517 // fNTotalEvents++;
518 // fNSignalEvents++;
519 // nSpillEvents++;
520 break;
521 case SYNC_EOS:
522 fCurEvType = kBMNEOS;
523 case SYNC_STAT:
524 LOGF(debug2, ANSI_COLOR_BLUE "SYNC_STAT %08X" ANSI_COLOR_RESET, fDat);
525 if (fDat == SYNC_STAT)
526 fCurEvType = kBMNSTAT;
527 // read number of bytes in event
528 if (fread(&fDat, kWORDSIZE, 1, fRawFileIn) != 1)
529 return kBMNERROR;
530 if (fDat % kNBYTESINWORD) {
531 LOGF(error, "WTF?? in the STAT: fDat == %u", fDat);
532 }
533 LOGF(debug3, "STAT ev length bytes %d", fDat);
534 fDat = fDat / kNBYTESINWORD + (fPeriodId <= 7 ? 1 : 0); // bytes --> words
535 // read array of current event data and process them
536 if (fread(data, kWORDSIZE, fDat, fRawFileIn) != fDat)
537 return kBMNERROR;
538 // ConvertStatEvent(data, fDat);
539 QuickConvertStatEvent(data, fDat);
540 break;
541 case SYNC_RUN_START:
542 LOGF(debug, "RUN START");
543 case SYNC_RUN_STOP:
544 if (fDat == SYNC_RUN_STOP)
545 LOGF(debug, "RUN STOP");
546 if (fread(&fDat, kWORDSIZE, 1, fRawFileIn) != 1)
547 return kBMNERROR;
548 if (fread(data, 1, fDat, fRawFileIn) != fDat)
549 return kBMNERROR;
550 BmnConverterTools::ParseComplexTLV(data, fDat, fRunId);
551 break;
552 case SYNC_FILE_BEGIN:
553 LOGF(debug, "FILE BEGIN");
554 case SYNC_FILE_END:
555 if (fDat == SYNC_FILE_END)
556 LOGF(debug, "FILE END");
557 if (fread(&fDat, kWORDSIZE, 1, fRawFileIn) != 1)
558 return kBMNERROR;
559 if (fread(data, 1, fDat, fRawFileIn) != fDat)
560 return kBMNERROR;
561 BmnConverterTools::ParseComplexTLV(data, fDat, fRunId);
562 break;
563 case SYNC_JSON:
564 if (fread(&fDat, kWORDSIZE, 1, fRawFileIn) != 1)
565 return kBMNERROR;
566 LOGF(debug, "SYNC JSON len %u", fDat);
567 if (fread(data, 1, fDat, fRawFileIn) != fDat)
568 return kBMNERROR;
569 ParseJsonTLV(data, fDat);
570 break;
571 default:
572 LOGF(error, "unrecognized sync %08X", fDat);
573 break;
574 }
575 return kBMNSUCCESS;
576}
577
578// BmnStatus BmnConverter::FeedSpill(TString name, Bool_t getRunId, Bool_t getSubName)
579//{ // Can be rewritten to work in multiple threads
580// if (getRunId)
581// fRunId = BmnRawDataDecoder::GetRunIdFromFile(name);
582// if (getSubName)
583// fSubName = BmnRawDataDecoder::GetSubNameFromFile(name);
584// TFile* fileIn = new TFile(name, "read");
585// if (fileIn == nullptr) {
586// LOGF(error, "Couldn't open file %s!!!!!\n", name.Data());
587// return kBMNERROR;
588// }
589// LOGF(info, ANSI_COLOR_BLUE "Starting to recover spill file %s" ANSI_COLOR_RESET, name.Data());
590// LOGF(info, "Period ID: %u, Run ID: %u, Subname: %s", fPeriodId, fRunId, fSubName.Data());
591// if (!fRootFileOut->IsOpen()) {
592// delete fRootFileOut;
593// LOGF(error, "Couldn't access the output file!!!!!\n");
594// return kBMNERROR;
595// }
596// TTree* tree = (TTree*)(fileIn->Get("BMN_RAW"));
597// TTree* treeSpills = (TTree*)(fileIn->Get("BMN_RAW_SPILLS"));
598// if (!tree || !treeSpills) {
599// LOGF(error, "Couldn't find needed trees in the spill file\n");
600// return kBMNERROR;
601// }
602// TStopwatch* timer = new TStopwatch();
603// timer->Start();
604// fThreads->Finish();
605// Int_t sigN = tree->GetEntriesFast(), statN = treeSpills->GetEntriesFast();
606// Int_t totalSigN = 0, totalStatN = 0;
607// for (UInt_t i = 0; i < fThreadCnt; i++) {
608// BmnConverterThread* tmp = fThreads->GetThread(i);
609// Int_t curSigN = sigN / fThreadCnt + (sigN % fThreadCnt > i ? 1 : 0);
610// Int_t curStatN = statN / fThreadCnt + (statN % fThreadCnt > i ? 1 : 0);
611// tree->SetBranchAddress("BmnEventHeader.", &(tmp->eventHeaderDAQ));
612// tree->SetBranchAddress("SYNC", &(tmp->sync));
613// tree->SetBranchAddress("ADC32", &(tmp->adc32));
614// tree->SetBranchAddress("ADC64", &(tmp->adc64));
615// tree->SetBranchAddress("ADC128", &(tmp->adc128));
616// tree->SetBranchAddress("ADC", &(tmp->adc));
617// tree->SetBranchAddress("TDC", &(tmp->tdc));
618// tree->SetBranchAddress("TQDC_ADC", &(tmp->tqdc_adc));
619// tree->SetBranchAddress("TQDC_TDC", &(tmp->tqdc_tdc));
620// tree->SetBranchAddress("HRB", &(tmp->hrb));
621// treeSpills->SetBranchAddress("MSC", &(tmp->msc));
622// treeSpills->SetBranchAddress("T0Raw", &(tmp->t0raw));
623// for (int j = 0; j < curSigN; j++) {
624// tree->GetEntry(totalSigN++);
625// tmp->fRawTree->Fill();
626// }
627// for (int j = 0; j < curStatN; j++) {
628// treeSpills->GetEntry(totalStatN++);
629// tmp->fRawTreeSpills->Fill();
630// }
631// LOGF(info, "Finished copying events to thread %d", i);
632// }
633// fileIn->Close();
634// fRootFileOut->cd();
635// timer->Stop();
636// LOGF(info, ANSI_COLOR_BLUE "Recovered spill file %s in %.3fs\n" ANSI_COLOR_RESET, name.Data(),
637// timer->RealTime()); return kBMNSUCCESS;
638// }
639
640void BmnConverter::QuickConvertStatEvent(UInt_t* d, UInt_t& len)
641{
643 tmp->SetData(2, len, d, fCurEvType, fRunId, fEventId);
644 tmp->Execute();
645}
646
647void BmnConverter::QuickProcessEvent(UInt_t* d, UInt_t& len)
648{
650 tmp->SetData(1, len, d, fCurEvType, fRunId, fEventId);
651 tmp->Execute();
652}
653
654void BmnConverter::QuickConvertStatEvent(BmnConverterThread*& tmp, UInt_t& len)
655{
656 // LOGF(info, "QuickConvertStatEvent thread %d", tmp->GetId());
657 tmp->SetData(2, len, nullptr, fCurEvType, fRunId, fEventId);
658 tmp->Execute();
659 tmp = nullptr;
660 // LOGF(info, "QuickConvertStatEvent thread %d done", tmp->GetId());
661}
662
663void BmnConverter::QuickProcessEvent(BmnConverterThread*& tmp, UInt_t& len)
664{
665 // LOGF(info, "QuickProcessEvent thread %d", tmp->GetId());
666 tmp->SetData(1, len, nullptr, fCurEvType, fRunId, fEventId);
667 tmp->Execute();
668 // LOGF(info, "QuickProcessEvent thread %d done", tmp->GetId());
669 tmp = nullptr;
670}
671
673{
674 try {
675 UInt_t idx = 0;
676 idx++; // skip evId (it is 0)
677 DeviceHeader* dh = reinterpret_cast<DeviceHeader*>(d + idx);
678 idx += sizeof(DeviceHeader) / kNBYTESINWORD;
679 // dh->Print();
680 UInt_t str_len = len - idx * kNBYTESINWORD;
681 string str(reinterpret_cast<const char*>(d + idx), str_len);
682 json j = json::parse(str);
683 // cout << "json type : " << j.type_name() << endl;
684 // Int_t key_cnt(0);
685 for (json::iterator it = j.begin(); it != j.end(); ++it) {
686 // std::cout << "["<< key_cnt <<"] : "<< it.key() << '\n';
687 string jkey(it.key());
688 // printf("j[%d] : %s\n", key_cnt++, jkey.c_str());
689 if (!jkey.compare("status")) {
690 auto& j_runtime = it.value()["runTime"];
691 auto j_spill = j_runtime["spill"];
692 Int_t phase = static_cast<Int_t>(j_spill["phase"]);
693 // UInt_t curSpillTimeMs = j_spill["curSpillTimeMs"];
694 auto wr = j_runtime["time"]["WR"];
695 Int_t ns = static_cast<Int_t>(wr["ns"]);
696 Int_t sec = static_cast<Int_t>(wr["sec"]);
697 if (tai_utc_dif == 0)
698 tai_utc_dif = GetUTCShift(TTimeStamp(time_t(sec), ns)); // crutch
699 sec -= tai_utc_dif;
700 SpillStatus ss = BmnConverterTools::ParseJsonStatus(it.value(), tai_utc_dif);
701 // skip invalid timestamps
702 if (!ss.times_valid)
703 continue;
704 // if (ts_spill_start > eventHeaderDAQ->GetSpillStartTS()) {
705 // eventHeaderDAQ->SetSpillStartTS(ts_spill_start); // @TODO: rewrite with basing on
706 // spill map eventHeaderDAQ->GetSpillId()++; LOGF(debug, "spill_id %d",
707 // eventHeaderDAQ->GetSpillId());
708 // }
709 LOGF(info, "JSON status phase %d", phase);
710 LOGF(info, "ts :\t%s start", BmnFunctionSet::TimePoint2String(ss.start_ts));
711 if (phase == 0)
712 LOGF(info, "ts :\t%s stop", BmnFunctionSet::TimePoint2String(ss.stop_ts));
713 auto spill_it = fSpillMap.find(ss.start_ts);
714 if (spill_it == fSpillMap.end()) {
715 bool inserted;
716 std::tie(spill_it, inserted) = fSpillMap.insert(
717 make_pair(ss.start_ts,
718 BmnSpillInfo{// .jsons = vector<json>(1, it.value()),
719 .start_ts = ss.start_ts,
720 .stop_ts = ss.start_ts}));
721 LOGF(debug, "spill record inserted %d", inserted);
722 }
723 BmnSpillInfo& info = spill_it->second;
724 // info.jsons.push_back(it.value());
725 if (info.stop_ts < ss.stop_ts)
726 info.stop_ts = ss.stop_ts;
727 if (fExportJsonBlocks) {
728 std::ofstream outfile(Form("j_%s_%d_%9d_%09d_%02X_%08X_phase_%d.json", jkey.c_str(), fRunId, sec,
729 ns, dh->DeviceId, dh->Serial, phase),
730 std::ofstream::binary);
731 outfile << std::setw(4) << j << std::endl;
732 outfile.close();
733 }
734 metadata->SpillStatusVec().push_back(move(ss));
735 } else {
736 LOGF(info, "JSON config");
737 if (!jkey.compare("config")) {
738 string start_DT = it.value()["meta"]["startDateTime"];
739 if (fExportJsonBlocks) {
740 std::ofstream outfile(Form("j_%s_%d_%s_%02X_%08X.json", jkey.c_str(), fRunId, start_DT.data(),
741 dh->DeviceId, dh->Serial),
742 std::ofstream::binary);
743 outfile << std::setw(4) << j << std::endl;
744 outfile.close();
745 }
746 BmnStatus jconfigStatus =
747 BmnConverterTools::ParseJsonConfig(it.value(), runHeader->GetTrigConfig());
748 if (jconfigStatus == kBMNERROR)
749 LOGF(error, "Parsing JSON config failed!");
750 } else
751 LOGF(info, "Unknown json key %s", jkey.data());
752 }
753 }
754 } catch (std::exception& ex) {
755 LOGF(error, "Exception for JSON block: %s", ex.what());
756 }
757 return kBMNSUCCESS;
758}
759
760BmnStatus BmnConverter::InitUTCShift()
761{
762 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1972, 1, 1, 0, 0, 9), 10));
763 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1972, 7, 1, 0, 0, 10), 11));
764 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1973, 1, 1, 0, 0, 11), 12));
765 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1974, 1, 1, 0, 0, 12), 13));
766 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1975, 1, 1, 0, 0, 13), 14));
767 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1976, 1, 1, 0, 0, 14), 15));
768 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1977, 1, 1, 0, 0, 15), 16));
769 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1978, 1, 1, 0, 0, 16), 17));
770 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1979, 1, 1, 0, 0, 17), 18));
771 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1980, 1, 1, 0, 0, 18), 19));
772 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1981, 7, 1, 0, 0, 19), 20));
773 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1982, 7, 1, 0, 0, 20), 21));
774 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1983, 7, 1, 0, 0, 21), 22));
775 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1985, 7, 1, 0, 0, 22), 23));
776 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1988, 1, 1, 0, 0, 23), 24));
777 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1990, 1, 1, 0, 0, 24), 25));
778 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1991, 1, 1, 0, 0, 25), 26));
779 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1992, 7, 1, 0, 0, 26), 27));
780 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1993, 7, 1, 0, 0, 27), 28));
781 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1994, 7, 1, 0, 0, 28), 29));
782 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1996, 1, 1, 0, 0, 29), 30));
783 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1997, 7, 1, 0, 0, 30), 31));
784 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1999, 1, 1, 0, 0, 31), 32));
785 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(2006, 1, 1, 0, 0, 32), 33));
786 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(2009, 1, 1, 0, 0, 33), 34));
787 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(2012, 7, 1, 0, 0, 34), 35));
788 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(2015, 7, 1, 0, 0, 35), 36));
789 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(2017, 1, 1, 0, 0, 36), 37));
790 utc_valid = TTimeStamp(2026, 1, 1, 0, 0, 1);
791 return kBMNSUCCESS;
792}
793
794Int_t BmnConverter::GetUTCShift(TTimeStamp t)
795{
796 if (t < leaps.begin()->first) {
797 LOGF(warning, "Wrong time! %s", t.AsString());
798 return 0;
799 }
800 if (t > utc_valid)
801 LOGF(warning, "Warning! Leap seconds table expired!");
802 Int_t shift = 0;
803 auto it = leaps.lower_bound(t);
804 if ((it == leaps.end()))
805 it--;
806 else if (it->first > t)
807 it--;
808 shift = it->second;
809 return shift;
810}
const Float_t d
Z-coordinate of the first GEM-station.
Definition BmnMwpcHit.cxx:7
int i
Definition P4_F32vec4.h:22
BmnStatus
Definition BmnEnums.h:24
@ kBMNERROR
Definition BmnEnums.h:26
@ kBMNSUCCESS
Definition BmnEnums.h:25
@ kBMNCONTINUE
Definition BmnEnums.h:29
@ kBMNSTAT
Definition BmnEnums.h:73
@ kBMNEOS
Definition BmnEnums.h:72
@ kBMNPAYLOAD
Definition BmnEnums.h:71
@ kBMNEMPTY
Definition BmnEnums.h:74
std::chrono::time_point< SysClock > SysPoint
#define ANSI_COLOR_RESET
Definition BmnMath.h:18
#define ANSI_COLOR_BLUE
Definition BmnMath.h:17
const uint32_t SYNC_RUN_START
Definition RawTypes.h:10
const uint32_t SYNC_EVENT
Definition RawTypes.h:5
const uint32_t SYNC_EOS
Definition RawTypes.h:8
const uint32_t SYNC_STAT
Definition RawTypes.h:7
const uint32_t SYNC_JSON
Definition RawTypes.h:9
const uint32_t SYNC_RUN_STOP
Definition RawTypes.h:11
const uint32_t kNBYTESINWORD
Definition RawTypes.h:19
const uint32_t SYNC_FILE_BEGIN
Definition RawTypes.h:12
const uint32_t SYNC_FILE_END
Definition RawTypes.h:13
const uint32_t SYNC_EVENT_OLD
Definition RawTypes.h:6
const size_t kWORDSIZE
Definition RawTypes.h:18
void SetMerger(TBufferMerger *m)
void SetInitData(vector< pair< int, pair< int, int > > > *vec_tree, vector< pair< int, pair< int, int > > > *vec_tree_spills)
void SetTrees(TTree *tree, TTree *treeSpills)
void SetExportExternalSpillStat(bool v)
BmnEventHeader * eventHeaderDAQ
void SetData(Int_t taskId, Int_t len=0, UInt_t *data=nullptr, BmnEventType fCurEvType=kBMNEMPTY, UInt_t fRunId=7444, UInt_t fEventId=-1)
static TString GetSubNameAfterRunId(TString name)
static SpillStatus ParseJsonStatus(json &j, Int_t tai_utc_dif=0)
static BmnStatus ParseComplexTLV(UInt_t *buf, UInt_t &len, UInt_t &runId)
static BmnStatus ParseJsonConfig(json &j, BmnTrigConfig &trig_conf)
map< int32_t, MapTP2TreePE > treeTimeMapDiv
BmnStatus ParseJsonTLV(UInt_t *buf, UInt_t &len)
BmnStatus FeedEvent(UInt_t *buf, UInt_t len, Int_t type)
BmnStatus FeedFile(TString name, Bool_t getRunId=kTRUE, Bool_t getSubName=kTRUE)
multimap< SysPoint, TreePE > treeTimeMap
void ReproduceBranches(TTree *inTree, TTree *outTree)
BmnThreadManager< BmnConverterThread > * fThreads
uint32_t fThreadCnt
void SeparateEventsBySpills()
BmnConverter(uint32_t threadCount=0, uint32_t period=8, TString outDir=".", bool tempFileOnDisk=false)
void SeparateEventsBySpillsTM()
vector< pair< int, pair< int, int > > > vecTree
vector< pair< int, pair< int, int > > > vecTreeSpills
SysPoint GetEventTimeTP()
TTimeStamp GetEventTimeTS()
static std::string TimePoint2String(SysPoint p)
static SysPoint TimeStamp2TP(TTimeStamp p)
static Int_t GetRunIdFromFile(TString name)
T * GetThread(int index)
T * Add(Int_t threadType=0)
a class to store JSON values
Definition json.hpp:17282
static JSON_HEDLEY_WARN_UNUSED_RESULT basic_json parse(InputType &&i, const parser_callback_t cb=nullptr, const bool allow_exceptions=true, const bool ignore_comments=false)
deserialize from a compatible input
Definition json.hpp:20814
iterator begin() noexcept
returns an iterator to the first element
Definition json.hpp:19664
iterator end() noexcept
returns an iterator to one past the last element
Definition json.hpp:19689
iter_impl< basic_json > iterator
an iterator for a basic_json container
Definition json.hpp:17407
pair< TTree *, uint64_t > TreePE
SysPoint stop_ts
SysPoint start_ts
Definition SpillStatus.h:39
bool times_valid
Definition SpillStatus.h:46
SysPoint stop_ts
Definition SpillStatus.h:40