BmnRoot
Loading...
Searching...
No Matches
BmnOnlineSource.cxx
Go to the documentation of this file.
1#include "BmnOnlineSource.h"
2
3#include <FairRootManager.h>
4#include <TBranchElement.h>
5
6BmnOnlineSource::BmnOnlineSource(TString inputTransportAddress)
7 : fEndpoint(inputTransportAddress),
8 fZmqContext(nullptr),
9 fZmqSubSocket(nullptr),
10 fAreBranchesRegistered(kFALSE) {}
11
13
15 LOG(info) << "BmnOnlineSource::Init()";
16
17 fZmqContext.reset(zmq_ctx_new(), zmq_ctx_term);
18 fZmqSubSocket.reset(zmq_socket(fZmqContext.get(), ZMQ_SUB), zmq_close);
19
20 if (zmq_connect(fZmqSubSocket.get(), fEndpoint.Data()) != 0) {
21 LOG(info) << "BmnOnlineSource::Init() | An error occurred while "
22 "connecting the socket to the address '"
23 << fEndpoint << "': " << zmq_strerror(errno);
24 return kFALSE;
25 }
26
27 if (zmq_setsockopt(fZmqSubSocket.get(), ZMQ_SUBSCRIBE, "", 0) != 0) {
28 LOG(error) << "BmnOnlineSource::Init() | An error occurred while "
29 "subscribing the socket to the address '"
30 << fEndpoint << "': " << zmq_strerror(errno);
31 return kFALSE;
32 }
33
34 return kTRUE;
35}
36
38 zmq_msg_t _message;
39 zmq_msg_init(&_message);
40
41 const auto _messageSize = zmq_msg_recv(&_message, fZmqSubSocket.get(), 0);
42
43 if (_messageSize == -1) {
44 LOG(warn) << "BmnOnlineSource::ReadEvent() | An error occurred while "
45 "receiving a message: "
46 << zmq_strerror(errno);
47 zmq_msg_close(&_message);
48 return kFALSE;
49 } else {
50 LOG(info) << "BmnOnlineSource::ReadEvent() | Message received: "
51 << _messageSize << " B";
52 }
53
54 std::unique_ptr<TTree> tree;
55
56 DeserializeZmqMessage(&_message, tree);
57 RegisterBranches(tree);
59
60 zmq_msg_close(&_message);
61
62 return 0;
63}
64
66 fZmqSubSocket.reset();
67 fZmqContext.reset();
68
69 fRegisteredBranches.clear();
70}
71
72void BmnOnlineSource::RegisterBranches(std::unique_ptr<TTree>& tree) {
73 if (fAreBranchesRegistered) {
74 return;
75 }
76
77 const auto _branches = tree->GetListOfBranches();
78 for (Long64_t branchIndex = 0; branchIndex < _branches->GetEntriesFast();
79 branchIndex++) {
80 const auto _branch =
81 static_cast<TBranchElement*>(_branches->At(branchIndex));
82 const auto _branchName = _branch->GetName();
83
84 if (fRegisteredBranches.find(_branchName) ==
85 fRegisteredBranches.end() &&
86 strcmp(_branch->GetClassName(), "TClonesArray") == 0) {
87 auto _branch_data =
88 std::make_unique<TClonesArray>(_branch->GetClonesName());
89
90 FairRootManager::Instance()->RegisterInputObject(
91 _branchName, _branch_data.get());
92
93 fRegisteredBranches[_branchName] = std::move(_branch_data);
94
95 LOG(info) << "BmnOnlineSource::RegisterBranches() | Register "
96 << _branchName;
97 }
98 }
99
100 fAreBranchesRegistered = kTRUE;
101}
102
104 std::unique_ptr<TTree>& tree) {
105 for (auto& _branch : fRegisteredBranches) {
106 _branch.second->Delete();
107 auto tmp = _branch.second.get();
108 if (tree->SetBranchAddress(_branch.first.c_str(), &tmp)) {
109 LOG(error) << "BmnOnlineSource::UploadData(): Branch '"
110 << _branch.first << "' not found!";
111 }
112 }
113
114 tree->GetEntry(0);
115}
std::shared_ptr< void > fZmqContext
static void DeserializeZmqMessage(zmq_msg_t *zmq_message, std::unique_ptr< T > &output)
std::shared_ptr< void > fZmqSubSocket
virtual Int_t ReadEvent(UInt_t=0)
std::unordered_map< std::string, std::unique_ptr< TClonesArray > > fRegisteredBranches
void RegisterBranches(std::unique_ptr< TTree > &tree)
void UploadDataToRegisteredBranches(std::unique_ptr< TTree > &tree)
virtual ~BmnOnlineSource()
virtual Bool_t Init()
BmnOnlineSource(TString endpoint="tcp://localhost:5555")
virtual void Close()