BmnRoot
Loading...
Searching...
No Matches
BmnMQSink.cxx
Go to the documentation of this file.
1#include "BmnMQSink.h"
2
3using namespace std;
4
5BmnMQSink::BmnMQSink(Int_t OutPort)
6 : // fSender(nullptr)
7 fCtx(nullptr)
8 , fOutSocket(nullptr)
9 , fOutPort(OutPort)
10{}
11
13
15{
16 // if (fSender)
17 // delete fSender;
18}
19
21{
22 printf("BmnMQSink::InitSink\n");
23 fOutChannel = "data-out";
24 // FairMQProgOptions *config = new FairMQProgOptions();
25 // string DstChanName = "chans." + fOutChannel + ".0.address";
26 // //// config->SetValue<string>("out-channel", DstChanName);
27 // //// boost::program_options::options_description options;
28 // //// options.add_options()
29 // //// ("in-channel", bpo::value<std::string>()->default_value("digi"), "Name of the input channel")
30 // //// ("out-channel", bpo::value<std::string>()->default_value("dst"), "Name of the output
31 // channel");
32 // //// // config.AddToCmdLineOptions()
33 // // config->SetValue<string>(DstChanName, "tcp://127.0.0.1:6666");
34 // // config->SetValue<string>("transport", "zeromq");
35 // string arg = "--channel-config
36 // 'name=data-out,type=push,method=connect,rateLogging=1,address=tcp://localhost:5220'"; vector<string> args;
37 // // args.push_back(arg);
38 // args.push_back("");
39 // args.push_back("--mq-config");
40 // args.push_back("../monitor/mq-reco.json");
41 // args.push_back("--id ");
42 // args.push_back("mq-sink ");
43 // args.push_back("--channel-config ");
44 // args.push_back("name=data-out,type=pub,method=connect,rateLogging=1,address=tcp://localhost:5220 ");
45 // config->ParseAll(args, kTRUE);
47 // config->SetProperty<string>(fOutChannel, DstChanName);
48 // config->PrintOptions();
50 // fSender = new FairMQDevice(*config,{0, 0, 0});
51 // // fSender = new FairMQDevice();
52 // // fSender->SetId("mq-sink");
53 // fSender->SetConfig(*config);
54 // // fSender->fChannels = config->GetVarMap();
55 // // fSender->RunStateMachine();
56 // // fSender->ChangeState("INIT_DEVICE");
57 // // fSender->RegisterChannelEndpoints();
58 // // FairMQProgOptions* config = fSender->GetConfig();
59 // // fSender->RegisterChannelEndpoint(fOutChannel);
60 // // if (fConfig.Count("print-channels")) {
62 // printf("BmnMQSink::InitSink PrintRegisteredChannels\n");
63 // // fSender->RunStateMachine();
64 // // printf("BmnMQSink::InitSink RunStateMachine\n");
65 // // fSender.ChangeState(fair::mq::Transition::End);
66 // // return 0;
67 // // }
68 // // fSender.SetConfig(fConfig);
69 // // fSender.RunStateMachine();
70 // // printf("BmnMQSink::InitSink RunStateMachine\n");
71 if (fCtx) {
72 zmq_close(fOutSocket);
73 if (fCtx)
74 zmq_ctx_destroy(fCtx);
75 } // stupid FairRunOnline executes InitSink twice
76
77 fCtx = zmq_ctx_new();
78 fOutSocket = zmq_socket(fCtx, ZMQ_PUB);
79 Int_t sndBuf = 0;
80 size_t vl = sizeof(sndBuf);
81 if (zmq_getsockopt(fOutSocket, ZMQ_SNDBUF, &sndBuf, &vl) == -1)
82 DBG("zmq_getsockopt of ZMQ_RCVBUF")
83 printf("sndbuf = %d\n", sndBuf);
84 sndBuf = 8192; // MAX_BUF_LEN;
85 if (zmq_setsockopt(fOutSocket, ZMQ_SNDBUF, &sndBuf, sizeof(sndBuf)) == -1)
86 DBGERR("zmq_setsockopt of ZMQ_SNDBUF");
87 sndBuf = 0;
88 if (zmq_getsockopt(fOutSocket, ZMQ_RCVBUF, &sndBuf, &vl) == -1)
89 DBGERR("zmq_getsockopt of ZMQ_RCVBUF")
90 printf("sndbuf = %d\n", sndBuf);
91 TString localDecoStr = Form("tcp://*:%d", fOutPort);
92 if (zmq_bind(fOutSocket, localDecoStr.Data()) != 0) {
93 DBGERR("zmq bind")
94 return kFALSE;
95 }
96 printf("Bind %s\n", localDecoStr.Data());
97
98 return kTRUE;
99}
100
102{
103 zmq_close(fOutSocket);
104 if (fCtx)
105 zmq_ctx_destroy(fCtx);
106}
107
108void BmnMQSink::RegisterImpl(const char* name, const char* folderName, void* ob)
109{
110 printf("RegisterImpl name %s foldername %s ob %08lX\n", name, folderName, reinterpret_cast<uintptr_t>(ob));
111 if (std::strstr(name, ".")) {
112 TNamed* obn = static_cast<TNamed*>(ob);
113 printf("casted: name %s class name %s classname %s class_name %s\n", obn->GetName(), obn->Class()->GetName(),
114 obn->ClassName(), obn->Class_Name());
115 TClass* cl = TClass::GetClass(obn->ClassName());
116 printf("clname %s\n", cl->GetName());
117 fObjMap.insert(pair<const char*, TObject*>(name, static_cast<TObject*>(ob)));
118 fParts.AddObject(obn);
119 } else {
120 TClonesArray* tca = static_cast<TClonesArray*>(ob);
121 printf("ar cl %s name %s title %s len %5d\n", tca->GetClass()->GetName(), tca->GetName(), tca->GetTitle(),
122 tca->GetEntriesFast());
123 fArrMap.insert(pair<const char*, TClonesArray*>(
124 name, static_cast<TClonesArray*>(ob))); // new TClonesArray(tca->GetClass())));
125 fParts.AddArray(tca);
126 }
127 // TFolder* folder = 0;
128 // TFolder* f = 0;
129 // f = static_cast<TFolder*>(fOutFolder->FindObjectAny(folderName));
130 // if (f == 0) {
131 // folder = fOutFolder->AddFolder(folderName, folderName);
132 // } else {
133 // folder = f;
134 // }
135 // // ((TNamed*)obj)->SetName(name);
136 // folder->Add((TNamed*)obj);
137
138 // auto& ot = typeid(T*);
139 // auto& pt = typeid(T);
140 // RegisterAny(name, ot, pt, &ob);
141
142 return;
143}
144
145void BmnMQSink::RegisterAny(const char* brname, const std::type_info& oi, const std::type_info& pi, void* obj)
146{
147 fPersistentBranchesMap[brname] = std::unique_ptr<TypeAddressPair const>(new TypeAddressPair(oi, pi, obj));
148 printf("\nRegisterAny brname %s %08lX\n", brname,
149 reinterpret_cast<uintptr_t>(fPersistentBranchesMap[brname].get()));
150}
151
153{
154 printf("IsPersistentBranchAny\n");
155 if (fPersistentBranchesMap.find(name) == fPersistentBranchesMap.end()) {
156 return false;
157 }
158 return true;
159}
160
162{
163 LOGF(info, "CreatePersistentBranchesAny");
164 return false;
165}
166
167void BmnMQSink::EmitPersistentBranchWrongTypeWarning(const char* brname, const char* type1, const char* type2) const
168{
169 // LOG(warn) << "Trying to read from persistent branch " << brname << " with wrong type " << type1
170 // << " (expexted: " << type2 << " )";
171}
172
174{
175 // printf("BmnMQSink::Fill\n");
176 // FairMQParts parts;
177 TBufferFile t(TBuffer::kWrite);
178
179 // for (auto &brEl : fPersistentBranchesMap) {
180 // TClonesArray** tcArray =
181 // GetPersistentBranchAny<TClonesArray**>(brEl.first.c_str());
182 //
183 // if (tcArray) {
184 // LOG(debug) << FairRootManager::Instance()->GetInstanceId() << "] "
185 // << " /// *Array->GetName() " << (*tcArray)->GetName();
186 // // TObject* objClone = (*tcArray)->Clone();
187 // // FairMQMessagePtr mess = unique_ptr<FairMQMessage>(new
188 // FairMQMessage());//make_unique<FairMQMessage>(); FairMQMessagePtr mess(fSender->NewMessage());
189 // fSender->Serialize<RootSerializer>(*mess, tcArray);
190 // // Serializer().Serialize<>
191 // // buf.WriteObject(tcArray);
192 // parts.AddPart(std::move(tcArray));
193 // }
194 //
195 // }
196 // for (auto & it : fObjMap) {
197 // TObject* ob = it.second;
198 // FairMQMessagePtr mess(fSender->NewMessage());
200 // fSender->Serialize<RootSerializer>(*mess, ob);
201 // parts.AddPart(std::move(mess));
202 //
203 // }
204 // for (auto & it : fArrMap) {
205 // TClonesArray* ar = it.second;
206 // FairMQMessagePtr mess = new FairMQMessage();//(fSender->NewMessage());
207 // fSender->Serialize<RootSerializer>(*mess, ar);
208 // parts.AddPart(std::move(mess));
209 //
210 // }
211 // for (auto & ar : fParts.GetArrays()) {
212 // printf("ar %s counts %d\n", ar->GetClass()->GetName(), ar->GetEntriesFast());
213 // }
214
215 t.WriteObject(&fParts);
216 Int_t sendRes = zmq_send(fOutSocket, t.Buffer(), t.Length(), ZMQ_NOBLOCK);
217 // printf("sendRes %d\n", sendRes);
218 t.Reset();
219 if (sendRes == -1) {
220 fprintf(stderr, "Send error # %d #%s\n", errno, zmq_strerror(errno));
221 }
222 // int64_t sent = 0;
223 // printf("fChannels size %lu\n", fSender->fChannels.size());
224 // for (const pair<string, vector < FairMQChannel>> &gr : fSender->fChannels) {
225 // printf("gr: %s \n", gr.first.c_str());
226 // for (const FairMQChannel & ch : gr.second) {
227 // printf("\taddr:%s\n", ch.GetAddress().c_str());
228 // }
229 // }
230 // fSender->Send(fPartsF, "data-out");
231 // printf("sent %ld\n", sent);
232 // sent = fSender->Send(fPartsF, "dst:1:1");
233 // printf("sent %ld\n", sent);
234 // // // sent = fSender->Send(parts, "dst");
235 // // // printf("sent %ld\n", sent);
236 // // sent = fSender->Send(parts, "dst:1");
237 // // printf("sent %ld\n", sent);
238 // // sent = fSender->Send(parts, "dst:0:0");
239 // // printf("sent %ld\n", sent);
240 // sent = fSender->Send(fPartsF, "out-channel");
241 // printf("sent %ld\n", sent);
242}
243
245{
246 return new BmnMQSink(*this);
247}
#define DBGERR(a)
Definition BmnMath.h:25
#define DBG(a)
Definition BmnMath.h:24
virtual void Fill()
virtual Bool_t InitSink()
Definition BmnMQSink.cxx:20
virtual ~BmnMQSink()
Definition BmnMQSink.cxx:14
virtual void RegisterAny(const char *brname, const std::type_info &oi, const std::type_info &pi, void *obj)
virtual bool CreatePersistentBranchesAny()
virtual void Close()
bool IsPersistentBranchAny(const char *name)
virtual void RegisterImpl(const char *, const char *, void *)
BmnMQSink(Int_t OutPort=6666)
Definition BmnMQSink.cxx:5
virtual FairSink * CloneSink()
void AddArray(TClonesArray *ar)
Definition BmnParts.h:46
void AddObject(TNamed *ob)
Definition BmnParts.h:49
STL namespace.