12#include <ROOT/RDF/RInterface.hxx>
13#include <ROOT/RDataFrame.hxx>
14#include <ROOT/RSnapshotOptions.hxx>
18#include "FairLogger.h"
19#include "TStopwatch.h"
22#include "UniDetectorParameter.h"
23#include "UniRawFile.h"
26namespace fs = std::filesystem;
29 : temp_file_on_disk(tempFileOnDisk)
31 , fRawRunHdrName(
"RawRunHeader")
32 , fMetadataName(
"RawRunMetadata")
33 , fRawTreeName(
"BMN_RAW")
34 , fRawTreeSpillName(
"BMN_RAW_SPILLS")
35 , fRootDirPath(outDir)
37 , spillHeader(nullptr)
41 , fExportJsonBlocks(false)
43 , fCurWaitingThread(nullptr)
45 TTree::SetMaxTreeSize(1'000'000'000'000);
46 ROOT::EnableThreadSafety();
49 LOGF(info,
"concurrency: %u", std::thread::hardware_concurrency());
50 if (fRootDirPath.Length() == 0)
51 fRootDirPath = fs::current_path();
52 bool dc = fs::create_directories(fRootDirPath.Data());
53 LOGF(info,
"Directory: %s was %s", fRootDirPath.Data(), dc ?
"created" :
"already present");
54 fRootFileName = TString::Format(
"%s/raw_temp_%d.root", fRootDirPath.Data(), getpid());
55 if (temp_file_on_disk)
58 if (threadCount == 0) {
59 if (std::thread::hardware_concurrency() <= 1)
62 fThreadCnt = std::thread::hardware_concurrency() - 1;
66 LOGF(info,
"Creating %u%s",
fThreadCnt, (
fThreadCnt == 1 ?
" converter thread" :
" converter threads"));
70 if (temp_file_on_disk)
88 LOGF(info,
"Created %u%s",
fThreadCnt, (
fThreadCnt == 1 ?
" converter thread" :
" converter threads"));
94 if (fDecoderConfigFileName.length() == 0)
95 fDecoderConfigFileName = string(getenv(
"VMCWORKDIR")) +
"/config/bmnconf.json";
96 pt::read_json(fDecoderConfigFileName, conf);
97 fExportJsonBlocks = conf.get<
bool>(
"Decoder.ExportJson",
false);
98 bool ExportExternalSpillStat = conf.get<
bool>(
"Decoder.ExportExternalSpillStat",
false);
108 LOGF(info,
"Creating temporary output root file");
109 fRootFileName = TString::Format(
"%s/raw_temp_%u.root", fRootDirPath.Data(), fRunId);
110 fMerger = make_unique<ROOT::TBufferMerger>(fRootFileName.Data());
115 LOGF(error,
"Couldn't access the output file!!!!!\n");
118 LOGF(info,
"Created temporary output root file");
// Finds which spill a time point belongs to.
//   spillEnds   - spill stop times; std::upper_bound requires them in ascending order
//   spillStarts - spill start times, addressed with the same index as spillEnds
//   time        - time point to look up
// On the normal path the result is the index of the first spill that ends
// after `time`.
// NOTE(review): the values returned from the two guarded branches below are
// not visible in this listing - confirm them before relying on them.
122Int_t BmnConverter::GetSpillNumber(vector<SysPoint>* spillEnds, vector<SysPoint>* spillStarts,
SysPoint time)
// First element of spillEnds that is strictly greater than `time`.
124 auto t = std::upper_bound(spillEnds->begin(), spillEnds->end(), time);
// No spill ends after `time`.
125 if (t == spillEnds->end()) {
// `time` is earlier than the start stored at the same index as that end,
// so it lies before the candidate spill begins.
128 if (time < (*spillStarts)[t - spillEnds->begin()]) {
// The iterator's offset from begin() is the spill index.
131 return t - spillEnds->begin();
// Overload for a ROOT TTimeStamp: forwards to the SysPoint overload using `p`.
// NOTE(review): the line that derives `p` from `time` is not visible in this
// listing - presumably a TTimeStamp-to-SysPoint conversion; confirm.
135Int_t BmnConverter::GetSpillNumber(vector<SysPoint>* spillEnds, vector<SysPoint>* spillStarts, TTimeStamp time)
138 return GetSpillNumber(spillEnds, spillStarts, p);
146 LOGF(info,
"WriteTree for thread %u",
i);
153 auto file = fMerger->GetFile();
154 file->WriteObject(runHeader.get(), fRawRunHdrName.Data());
155 file->WriteObject(metadata.get(), fMetadataName.Data());
160 LOGF(info,
"main thread objects written");
178 TStopwatch* timer =
new TStopwatch();
180 ROOT::EnableImplicitMT(2);
181 LOGF(info,
"Preparing temporary file for read access");
183 LOGF(info,
"Trees written");
184 TFile* fRootFileOut =
new TFile(fRootFileName,
"read");
185 if (!fRootFileOut->IsOpen()) {
187 LOGF(error,
"Couldn't access the temp file!!!!!\n");
192 (TTree*)(fRootFileOut->Get(fRawTreeSpillName + to_string(
i))));
196 spillCheck.insert(
vecTree[
i].first);
199 LOGF(info,
"Starting to write %d spills", spillCheck.size());
200 for (
auto t : spillCheck) {
202 ROOT::TBufferMerger merger(to_string(fRunId) + (TString)
"_s" + to_string(t) +
".root",
"recreate");
212 TFile file(to_string(fRunId) + (TString)
"_s" + to_string(t) +
".root",
"update");
213 file.WriteObject(runHeader.get(), fRawRunHdrName.Data());
214 file.WriteObject(metadata.get(), fMetadataName.Data());
217 LOGF(info,
"Finished writing weird events (spill %d)", t);
219 LOGF(info,
"Finished writing unfinished spill (spill %d)", t);
221 LOGF(info,
"Finished writing spill %d", t);
223 ROOT::DisableImplicitMT();
224 fRootFileOut->Close();
234 TStopwatch* timer =
new TStopwatch();
236 vector<SysPoint> spillEnds;
237 vector<SysPoint> spillStarts;
238 for (
auto t : fSpillMap) {
239 spillStarts.push_back(t.second.start_ts);
240 spillEnds.push_back(t.second.stop_ts);
244 std::sort(spillEnds.begin(), spillEnds.end());
255 for (
size_t i = 0;
i < tmp.size();
i += 2) {
256 int n = tmp[
i]->GetEntriesFast();
257 TBranch* branch = tmp[
i]->GetBranch(
"BmnEventHeader.");
258 for (
int j = 0; j < n; j++) {
262 make_pair(
i / 2, j)));
265 for (
size_t i = 1;
i < tmp.size();
i += 2) {
266 int n = tmp[
i]->GetEntriesFast();
267 TBranch* branch = tmp[
i]->GetBranch(
"MSC");
268 TBranch* branchT0 = tmp[
i]->GetBranch(
"T0Raw");
269 for (
int j = 0; j < n; j++) {
273 if (th->
msc->GetEntriesFast() > 0) {
277 branchT0->GetEntry(j);
278 if (th->
t0raw->GetEntriesFast() > 0) {
285 vecTreeSpills.push_back(make_pair(GetSpillNumber(&spillEnds, &spillStarts, ts), make_pair(
i / 2, j)));
286 LOGF(debug2,
"GetSpillNumber %d", GetSpillNumber(&spillEnds, &spillStarts, ts));
295 runHeader->SetSpillMap(fSpillMap);
303 LOGF(info,
"Starting separating events by spill");
304 TStopwatch* timer =
new TStopwatch();
310 vector<SysPoint> spillEnds;
311 vector<SysPoint> spillStarts;
312 for (
auto t : fSpillMap) {
313 spillStarts.push_back(t.second.start_ts);
314 spillEnds.push_back(t.second.stop_ts);
318 std::sort(spillEnds.begin(), spillEnds.end());
319 for (
size_t i = 0;
i < spillEnds.size();
i++) {
326 int64_t n = tree->GetEntriesFast();
327 TBranch* branch = tree->GetBranch(
"BmnEventHeader.");
328 for (int64_t iEntry = 0; iEntry < n; iEntry++) {
329 branch->GetEntry(iEntry);
332 int32_t sn = GetSpillNumber(&spillEnds, &spillStarts, tp);
339 LOGF(info,
"t %d", spill_id);
340 TFile file(to_string(fRunId) + (TString)
"_s" + to_string(spill_id) +
".root",
"recreate");
341 TTree* sortedTree = mm.begin()->second.first->CloneTree(0);
342 sortedTree->SetAutoSave(0);
343 sortedTree->SetAutoFlush(0);
344 sortedTree->SetBasketSize(
"*", 1073741824);
345 if (!file.IsOpen()) {
346 LOGF(error,
"Couldn't access the output file!!!!!\n");
349 LOGF(info,
"TFile open");
350 for (
const pair<const SysPoint, TreePE>& el : mm) {
351 const TreePE& pe = el.second;
352 pe.first->GetEntry(pe.second);
355 file.WriteObject(runHeader.get(), fRawRunHdrName.Data());
356 file.WriteObject(metadata.get(), fMetadataName.Data());
359 LOGF(info,
"Finished writing weird events (spill %d)", spill_id);
360 else if (spill_id == -1)
361 LOGF(info,
"Finished writing unfinished spill (spill %d)", spill_id);
363 LOGF(info,
"Finished writing spill %d", spill_id);
373 TObjArray* brList = inTree->GetListOfBranches();
374 if (inTree->GetEntry(0) == 0) {
375 LOGF(error,
"Input tree is empty! Recreating empty output...");
400 for (Int_t
i = 0;
i < brList->GetEntries();
i++) {
401 TBranch* br =
static_cast<TBranch*
>(brList->At(
i));
402 TString str(br->GetName());
403 TObject* obj = *(
reinterpret_cast<TObject**
>(br->GetAddress()));
404 LOGF(debug,
"obj->ClassName() %s", obj->ClassName());
405 TClass* cl = TClass::GetClass(obj->ClassName());
406 LOGF(debug,
"ClassName: %s", br->ClassName());
407 if (cl == TClonesArray::Class()) {
408 TClonesArray* inTCA =
static_cast<TClonesArray*
>(obj);
409 LOGF(debug,
"TCA->GetClass()->GetName() %s", inTCA->GetClass()->GetName());
410 outTree->Branch(str.Data(), &inTCA);
411 fArrVec.push_back(inTCA);
414 TNamed* inObj =
static_cast<TNamed*
>(obj);
415 outTree->Branch(str.Data(),
"TObject", inObj);
416 fNamVec.push_back(inObj);
427 QuickProcessEvent(buf, len);
429 }
else if (type == 2) {
431 QuickConvertStatEvent(buf, len);
443 fRawFileIn = fopen(name,
"rb");
444 if (fRawFileIn ==
nullptr) {
445 LOGF(error,
"Couldn't open file %s!!!!!\n", name.Data());
449 LOGF(info,
"Period ID: %u, Run ID: %u, Subname: %s", fPeriodId, fRunId, fSubName.Data());
455 TStopwatch* timer =
new TStopwatch();
457 while ((ConvertRawToRootIterateFileRead() !=
kBMNERROR)) {
465 if (fCurWaitingThread) {
467 fCurWaitingThread =
nullptr;
473 std::fclose(fRawFileIn);
// Reads the next record from fRawFileIn. The file-reading loop keeps calling
// this function until it returns kBMNERROR.
// Visible pattern: one kWORDSIZE-sized item is read into fDat, then (for
// records that carry a payload) fDat items are read into `data`; every fread
// result is compared with the requested item count.
// NOTE(review): the statements executed on a short read, the dispatch on the
// record type and the return statements are not visible in this listing.
477BmnStatus BmnConverter::ConvertRawToRootIterateFileRead()
// Leading word of the record - presumably the sync word, since the final log
// below reports fDat as an "unrecognized sync".
484 if (fread(&fDat,
kWORDSIZE, 1, fRawFileIn) != 1)
// Event record: the next word goes to fDat and is used below as the payload
// item count.
494 if (fread(&fDat,
kWORDSIZE, 1, fRawFileIn) != 1)
// Error log for the event length; its guarding condition is not visible here.
497 LOGF(error,
"WTF?? in the main event: fDat == %u", fDat);
// Event payload: fDat items of kWORDSIZE bytes each.
502 if (fread(data,
kWORDSIZE, fDat, fRawFileIn) != fDat)
// Forward the event payload together with its length.
509 QuickProcessEvent(data, fDat);
// STAT record: length word first, same as for an event.
528 if (fread(&fDat,
kWORDSIZE, 1, fRawFileIn) != 1)
// Error log for the STAT length; its guarding condition is not visible here.
531 LOGF(error,
"WTF?? in the STAT: fDat == %u", fDat);
// NOTE(review): this message labels fDat as bytes, while the read below uses
// fDat as a count of kWORDSIZE-sized items - confirm whether fDat is rescaled
// on the lines not shown in between.
533 LOGF(debug3,
"STAT ev length bytes %d", fDat);
536 if (fread(data,
kWORDSIZE, fDat, fRawFileIn) != fDat)
// Forward the STAT payload together with its length.
539 QuickConvertStatEvent(data, fDat);
542 LOGF(debug,
"RUN START");
// RUN STOP: a length word followed by a payload read with item size 1,
// i.e. fDat bytes rather than fDat words.
545 LOGF(debug,
"RUN STOP");
546 if (fread(&fDat,
kWORDSIZE, 1, fRawFileIn) != 1)
548 if (fread(data, 1, fDat, fRawFileIn) != fDat)
553 LOGF(debug,
"FILE BEGIN");
// FILE END: same layout as RUN STOP (length word, then fDat bytes).
556 LOGF(debug,
"FILE END");
557 if (fread(&fDat,
kWORDSIZE, 1, fRawFileIn) != 1)
559 if (fread(data, 1, fDat, fRawFileIn) != fDat)
// JSON block: length word, then fDat bytes of payload.
564 if (fread(&fDat,
kWORDSIZE, 1, fRawFileIn) != 1)
566 LOGF(debug,
"SYNC JSON len %u", fDat);
567 if (fread(data, 1, fDat, fRawFileIn) != fDat)
// Reached when the leading word matched none of the handled record types
// (the branch structure itself is not visible in this listing).
572 LOGF(error,
"unrecognized sync %08X", fDat);
// Passes a STAT event buffer to `tmp` through SetData with task id 2
// (QuickProcessEvent uses task id 1 for regular events).
//   d   - buffer of UInt_t words holding the event
//   len - length, handed to SetData unchanged
// The current event type, run id and event id are forwarded from the members
// fCurEvType, fRunId and fEventId.
// NOTE(review): how `tmp` is obtained and what happens after SetData are not
// visible in this listing.
640void BmnConverter::QuickConvertStatEvent(UInt_t*
d, UInt_t& len)
643 tmp->
SetData(2, len,
d, fCurEvType, fRunId, fEventId);
// Passes a regular event buffer to `tmp` through SetData with task id 1
// (QuickConvertStatEvent uses task id 2 for STAT events).
//   d   - buffer of UInt_t words holding the event
//   len - length, handed to SetData unchanged
// The current event type, run id and event id are forwarded from the members
// fCurEvType, fRunId and fEventId.
// NOTE(review): how `tmp` is obtained and what happens after SetData are not
// visible in this listing.
647void BmnConverter::QuickProcessEvent(UInt_t*
d, UInt_t& len)
650 tmp->
SetData(1, len,
d, fCurEvType, fRunId, fEventId);
657 tmp->
SetData(2, len,
nullptr, fCurEvType, fRunId, fEventId);
666 tmp->
SetData(1, len,
nullptr, fCurEvType, fRunId, fEventId);
677 DeviceHeader* dh =
reinterpret_cast<DeviceHeader*
>(
d + idx);
681 string str(
reinterpret_cast<const char*
>(
d + idx), str_len);
687 string jkey(it.key());
689 if (!jkey.compare(
"status")) {
690 auto& j_runtime = it.value()[
"runTime"];
691 auto j_spill = j_runtime[
"spill"];
692 Int_t phase =
static_cast<Int_t
>(j_spill[
"phase"]);
694 auto wr = j_runtime[
"time"][
"WR"];
695 Int_t ns =
static_cast<Int_t
>(wr[
"ns"]);
696 Int_t sec =
static_cast<Int_t
>(wr[
"sec"]);
697 if (tai_utc_dif == 0)
698 tai_utc_dif = GetUTCShift(TTimeStamp(time_t(sec), ns));
709 LOGF(info,
"JSON status phase %d", phase);
713 auto spill_it = fSpillMap.find(ss.
start_ts);
714 if (spill_it == fSpillMap.end()) {
716 std::tie(spill_it, inserted) = fSpillMap.insert(
719 .start_ts = ss.start_ts,
720 .stop_ts = ss.start_ts}));
721 LOGF(debug,
"spill record inserted %d", inserted);
727 if (fExportJsonBlocks) {
728 std::ofstream outfile(Form(
"j_%s_%d_%9d_%09d_%02X_%08X_phase_%d.json", jkey.c_str(), fRunId, sec,
729 ns, dh->DeviceId, dh->Serial, phase),
730 std::ofstream::binary);
731 outfile << std::setw(4) << j << std::endl;
734 metadata->SpillStatusVec().push_back(move(ss));
736 LOGF(info,
"JSON config");
737 if (!jkey.compare(
"config")) {
738 string start_DT = it.value()[
"meta"][
"startDateTime"];
739 if (fExportJsonBlocks) {
740 std::ofstream outfile(Form(
"j_%s_%d_%s_%02X_%08X.json", jkey.c_str(), fRunId, start_DT.data(),
741 dh->DeviceId, dh->Serial),
742 std::ofstream::binary);
743 outfile << std::setw(4) << j << std::endl;
749 LOGF(error,
"Parsing JSON config failed!");
751 LOGF(info,
"Unknown json key %s", jkey.data());
754 }
catch (std::exception& ex) {
755 LOGF(error,
"Exception for JSON block: %s", ex.what());
// Fills the `leaps` lookup table. Each entry maps a TTimeStamp (1 January or
// 1 July of the given year) to an integer that grows by one per entry, from
// 10 up to 37. The seconds field of every key equals the mapped value minus one.
// NOTE(review): the result of GetUTCShift is stored in tai_utc_dif, so the
// values are presumably the TAI-UTC offset in seconds valid from each date,
// with the keys expressed on the shifted clock - confirm against the official
// leap-second list before adding entries.
762 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1972, 1, 1, 0, 0, 9), 10));
763 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1972, 7, 1, 0, 0, 10), 11));
764 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1973, 1, 1, 0, 0, 11), 12));
765 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1974, 1, 1, 0, 0, 12), 13));
766 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1975, 1, 1, 0, 0, 13), 14));
767 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1976, 1, 1, 0, 0, 14), 15));
768 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1977, 1, 1, 0, 0, 15), 16));
769 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1978, 1, 1, 0, 0, 16), 17));
770 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1979, 1, 1, 0, 0, 17), 18));
771 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1980, 1, 1, 0, 0, 18), 19));
772 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1981, 7, 1, 0, 0, 19), 20));
773 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1982, 7, 1, 0, 0, 20), 21));
774 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1983, 7, 1, 0, 0, 21), 22));
775 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1985, 7, 1, 0, 0, 22), 23));
776 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1988, 1, 1, 0, 0, 23), 24));
777 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1990, 1, 1, 0, 0, 24), 25));
778 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1991, 1, 1, 0, 0, 25), 26));
779 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1992, 7, 1, 0, 0, 26), 27));
780 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1993, 7, 1, 0, 0, 27), 28));
781 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1994, 7, 1, 0, 0, 28), 29));
782 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1996, 1, 1, 0, 0, 29), 30));
783 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1997, 7, 1, 0, 0, 30), 31));
784 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(1999, 1, 1, 0, 0, 31), 32));
785 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(2006, 1, 1, 0, 0, 32), 33));
786 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(2009, 1, 1, 0, 0, 33), 34));
787 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(2012, 7, 1, 0, 0, 34), 35));
788 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(2015, 7, 1, 0, 0, 35), 36));
789 leaps.insert(pair<TTimeStamp, Int_t>(TTimeStamp(2017, 1, 1, 0, 0, 36), 37));
// NOTE(review): presumably the time after which GetUTCShift reports the table
// as expired (it logs "Leap seconds table expired!"); the comparison that uses
// utc_valid is not visible in this listing. Revisit this date whenever the
// table above is extended.
790 utc_valid = TTimeStamp(2026, 1, 1, 0, 0, 1);
// Looks up the time stamp `t` in the `leaps` table; the JSON status handler
// stores the result in tai_utc_dif.
// NOTE(review): none of the return statements are visible in this listing, so
// the value produced by each branch below still has to be confirmed.
794Int_t BmnConverter::GetUTCShift(TTimeStamp t)
// `t` is earlier than the first table entry: warn and print the time.
796 if (t < leaps.begin()->first) {
797 LOGF(warning,
"Wrong time! %s", t.AsString());
// Expiry warning. NOTE(review): its guarding condition is not visible here -
// presumably a comparison of `t` with utc_valid; confirm.
801 LOGF(warning,
"Warning! Leap seconds table expired!");
// First entry whose key is not less than `t` (assuming `leaps` is a std::map;
// its declaration is not visible here).
803 auto it = leaps.lower_bound(t);
// Every key in the table is earlier than `t`.
804 if ((it == leaps.end()))
// The entry found is strictly later than `t`, i.e. there was no exact match.
806 else if (it->first > t)
const Float_t d
Z-coordinate of the first GEM-station.
std::chrono::time_point< SysClock > SysPoint
const uint32_t SYNC_RUN_START
const uint32_t SYNC_EVENT
const uint32_t SYNC_RUN_STOP
const uint32_t kNBYTESINWORD
const uint32_t SYNC_FILE_BEGIN
const uint32_t SYNC_FILE_END
const uint32_t SYNC_EVENT_OLD
void SetMerger(TBufferMerger *m)
void SetInitData(vector< pair< int, pair< int, int > > > *vec_tree, vector< pair< int, pair< int, int > > > *vec_tree_spills)
void SetTrees(TTree *tree, TTree *treeSpills)
void SetExportExternalSpillStat(bool v)
BmnEventHeader * eventHeaderDAQ
void SetData(Int_t taskId, Int_t len=0, UInt_t *data=nullptr, BmnEventType fCurEvType=kBMNEMPTY, UInt_t fRunId=7444, UInt_t fEventId=-1)
void RegisterSpillBranches()
map< int32_t, MapTP2TreePE > treeTimeMapDiv
BmnStatus ParseJsonTLV(UInt_t *buf, UInt_t &len)
BmnStatus FeedEvent(UInt_t *buf, UInt_t len, Int_t type)
BmnStatus FeedFile(TString name, Bool_t getRunId=kTRUE, Bool_t getSubName=kTRUE)
multimap< SysPoint, TreePE > treeTimeMap
void ReproduceBranches(TTree *inTree, TTree *outTree)
BmnThreadManager< BmnConverterThread > * fThreads
void SeparateEventsBySpills()
BmnConverter(uint32_t threadCount=0, uint32_t period=8, TString outDir=".", bool tempFileOnDisk=false)
void SeparateEventsBySpillsTM()
vector< pair< int, pair< int, int > > > vecTree
vector< pair< int, pair< int, int > > > vecTreeSpills
static std::string TimePoint2String(SysPoint p)
static SysPoint TimeStamp2TP(TTimeStamp p)
static Int_t GetRunIdFromFile(TString name)
T * Add(Int_t threadType=0)
a class to store JSON values
static JSON_HEDLEY_WARN_UNUSED_RESULT basic_json parse(InputType &&i, const parser_callback_t cb=nullptr, const bool allow_exceptions=true, const bool ignore_comments=false)
deserialize from a compatible input
iterator begin() noexcept
returns an iterator to the first element
iterator end() noexcept
returns an iterator to one past the last element
iter_impl< basic_json > iterator
an iterator for a basic_json container
pair< TTree *, uint64_t > TreePE