BmnRoot
Loading...
Searching...
No Matches
nica-scheduler.cpp
Go to the documentation of this file.
1//========================================================================================================
2// Name : nica-scheduler.cpp
3// Author : Konstantin Gertsenberger
4// Description : NICA-Scheduler to run processing tasks of the ROOT-based frameworks through batch systems
5//========================================================================================================
6
7// own headers
8#include "batch_command.h"
10#include "function_set.h"
11
12// ROOT headers for work with the Unified Condition Database (UniConDa)
13#include "TSQLResult.h"
14#include "TSQLRow.h"
15#include "TSQLServer.h"
16#include "TString.h"
17
18// C++ headers
19#include <algorithm>
20#include <array>
21#include <condition_variable>
22#include <fstream>
23#include <getopt.h>
24#include <glob.h>
25#include <libxml/parser.h>
26#include <libxml/tree.h>
27#include <map>
28#include <mutex>
29#include <thread>
30#include <unistd.h>
31
32/* Predefined settings for the Unified Condition Database*/
34{
35 string db_host = "";
36 string db_name = "";
37 string db_username = "";
38 string db_password = "";
39
42 {
43 db_host = arg_db_settings.db_host;
44 db_name = arg_db_settings.db_name;
45 db_username = arg_db_settings.db_username;
46 db_password = arg_db_settings.db_password;
47 }
48 struct_database_settings(string arg_db_host, string arg_db_name, string arg_db_username, string arg_db_password)
49 {
50 db_host = arg_db_host;
51 db_name = arg_db_name;
52 db_username = arg_db_username;
53 db_password = arg_db_password;
54 }
56 {
57 if (this != &arg_db_settings) {
58 db_host = arg_db_settings.db_host;
59 db_name = arg_db_settings.db_name;
60 db_username = arg_db_settings.db_username;
61 db_password = arg_db_settings.db_password;
62 }
63 return *this;
64 }
65};
66#include "../settings.h"
67
68// debug flag for extended output
69bool isDebugMode = false;
70
71// C++11 does not include semaphore implementation (only in C++20)
72// therefore there is a manual implementation
74{
75 public:
76 Semaphore(unsigned int resource_count = 0)
77 : count_(resource_count)
78 {}
79
87 void acquire()
88 {
89 unique_lock<mutex> lock(mutex_);
90 condition_.wait(lock, [this] { return count_ > 0; });
91 count_--;
92 }
93
100 void release()
101 {
102 unique_lock<mutex> lock(mutex_);
103 count_++;
104 condition_.notify_one();
105 }
106
107 private:
108 mutex mutex_;
109 condition_variable condition_;
110 unsigned int count_;
111};
112
113// string structure to support null (no) value
115{
116 protected:
117 // string value
118 string strValue;
119 // whether the string is null (no value)
121
122 public:
124 : strValue("")
125 , isNullString(true)
126 {}
127 nullString(nullptr_t)
128 : strValue("")
129 , isNullString(true)
130 {}
131 nullString(char* pchar) { SetValue(pchar); }
132 nullString(string str)
133 : strValue(str)
134 , isNullString(false)
135 {}
137 : strValue(str.strValue)
139 {}
140
142 {
143 strValue = str.strValue;
145 return *this;
146 }
147
148 bool operator==(nullptr_t) const { return isNullString; }
149 bool operator!=(nullptr_t) const { return !isNullString; }
150
151 void SetNull()
152 {
153 strValue = "";
154 isNullString = true;
155 }
156 void SetValue(string str)
157 {
158 strValue = str;
159 isNullString = false;
160 }
161 void SetValue(nullptr_t) { SetNull(); }
162 void SetValue(char* pchar)
163 {
164 if (pchar == nullptr)
165 SetNull();
166 else
167 SetValue((string)pchar);
168 }
169
170 string GetValue() { return strValue; }
171 bool isNull() { return isNullString; }
172};
173
174// structure with information on files to be processed
175// iParallelMode = processor count to parallel event processing of the input file ('-1' - was not set)
176// iMerge: 0 (chain) - save only result TChain with the data tree to the first file name (pointing to other files)
177// 1 (true) - merge to the first file name and delete partial files
178// 2 (preserve) - merge to the first file name and not delete partial files
// -1 (false) - not to merge (default if the 'merge' attribute is not set)
181{
182 // input file path
183 string strFileIn;
184 // output file path or null if no output file set
186 // start event and event number to be processed (higher priority than for <macro>)
187 unique_ptr<int> ptrStartEvent, ptrEventCount;
188 // ???
190 // described upper, before the structure
192 // command and path for copying to a temporary storage
194 // command and path for copying from a temporary storage
196 // paths of temporary files to be deleted
197 vector<string> vecCleanPath;
198
200 : strFileIn("")
201 , nstrFileOut(nullptr)
202 , ptrStartEvent(nullptr)
203 , ptrEventCount(nullptr)
204 , strParallelMode("")
205 , iParallelMode(-1)
206 , iMerge(-1)
207 , strPutCommand("")
208 , strPutPath("")
209 , strGetCommand("")
210 , strGetPath("")
211 {}
213};
214
215// structure with information on macro to be executed
216// macro must have the following sequence of parameters: input_file, output_file, start event, event count...
218{
219 // ROOT macro name
221 // ROOT macro path
223 // start event and event number to be processed
224 unique_ptr<int> ptrStartEvent, ptrEventCount;
225 // additional arguments of the macro after 'event count'
227 // file list to be processed
228 vector<unique_ptr<structFilePar>> vecFiles;
230 : nstrMacroName(nullptr)
231 , nstrMacroPath(nullptr)
232 , ptrStartEvent(nullptr)
233 , ptrEventCount(nullptr)
234 , strAddArgs("")
235 {}
237};
238
239// structure with information on job to be run
241{
242 // job name
244 // job name of the parent job, from which the current one is dependent
246 // command line in case of a manual command
248 // run mode: 0 - local (on a multi-core machine), 1 - global (on a cluster), 2 - in container on a cluster
250 // container type
252 // string with processor count to be used
254 // configuration file path
256 // priority of the job in a batch system
258 // string with RAM size requested for one process
260 // whether write a core dump if the task is crashed
262 // log file path
264 // queue name of a batch system
266 // Quality of Service name of a batch system
268 // a list of hosts, on which the job is performed
270 // working directory path
272 // setting of the Unified Condition Database
274 vector<shared_ptr<structMacroPar>> vecMacros;
276 : strJobName("scheduler_job")
278 , nstrCommandline(nullptr)
279 , iRunMode(0)
280 , strContainerType("")
281 , nstrProcCount(nullptr)
282 , strConfigPath("")
283 , nstrPriority(nullptr)
285 , isCoreDump(false)
286 , nstrLogs(nullptr)
287 , nstrQueue(nullptr)
288 , nstrQOS(nullptr)
289 , nstrHosts(nullptr)
290 , nstrWorkDir(nullptr)
291 {}
293};
294
295// structure for parameters to transfer to the main thread in case of the LOCAL mode
297{
298 // job parameters to use in the main thread
299 shared_ptr<structJobPar> sptrJobParameter;
300 // semaphore to synchronize all the threads
301 shared_ptr<Semaphore> semJobSemaphore;
302 // mutex to synchronize all the threads
303 shared_ptr<mutex> mutJobMutex;
304 // thread number starting from 1
306};
307// structure for parameters to transfer from the main thread to sub-threads
308// sub-threads are used to parallelize by events, where threads process individual files
310{
311 // move parameters from the main thread to child ones
312 shared_ptr<structThreadPar> sptrThreadParameter;
313 // macro parameters to use in the child threads
314 shared_ptr<structMacroPar> sptrMacroParameter;
315 // start event for the current sub-thread
317 // event count to be processed by the current sub-thread
319 // real input file path for the sub-thread
320 string strFileIn;
321 // real output file path for the sub-thread
323 // sub-thread number
325};
326
327// declaration of function to parse criteria to select data from the Unified Condition Database
328void ParseDatabaseParameters(string input, TString& sql, bool is_simulation);
329// declaration of function to parse file list for the macro of the job
330bool ParseMacroFiles(xmlNodePtr sub_node,
331 shared_ptr<structJobPar> job_par,
332 shared_ptr<structMacroPar> macro_par,
333 multimap<string, unique_ptr<vector<nullString>>>& map_job_output,
334 batch_commands batch_com);
335
// Print a message line to stdout, colored when stdout is a terminal.
//   msg      - text to print (a line break is appended)
//   is_error - true selects the light red (error) color, false selects cyan (info)
// When stdout is not a terminal (e.g. redirected to a file), the text is printed
// without any ANSI escape sequences.
void info_message(const std::string msg, bool is_error = 0)
{
    // plain output if stdout is redirected
    if (!isatty(fileno(stdout))) {
        std::cout << msg << std::endl;
        return;
    }

    // light red for errors, cyan for info messages
    const char* const color_on = is_error ? "\033[0;91m" : "\033[0;36m";
    const char* const color_reset = "\033[0m";
    std::cout << color_on << msg << color_reset << std::endl;
}
353
// Replace every occurrence of a variable (e.g. "${file_name}") in the text with a new value.
// The variable may carry one option written after ':' and before the closing '}':
//   '~N' - drop the first N symbols of new_substring, e.g. ${file_name_with_ext:~3}
//   '-N' - drop the last N symbols of new_substring,  e.g. ${file_name:-4}
// N must be less than the length of new_substring, otherwise a warning is printed
// and that occurrence is left unchanged.
//   text          - text to be modified in place
//   old_substring - full variable name including its last (closing) character
//   new_substring - value to be inserted
// The search is done by the variable name without its last character, so that the ':' form can
// be detected. The plain form is replaced only if the closing character really follows the name,
// so a longer variable sharing the same prefix (e.g. ${file_name_with_ext}) is not affected.
void replace_string_in_text_ext(std::string& text, std::string old_substring, std::string new_substring)
{
    // nothing can be matched against an empty variable name
    if (old_substring.empty())
        return;

    const std::size_t last_char = old_substring.length() - 1;
    const std::string prefix = old_substring.substr(0, last_char);
    const char closing_char = old_substring[last_char];

    std::size_t start = text.find(prefix);
    while (start != std::string::npos) {
        // position of the symbol right after the matched prefix (':' or the closing character)
        const std::size_t option_pos = start + last_char;
        // by default the search continues from the next symbol
        std::size_t resume = start + 1;

        if (option_pos < text.length()) {
            if (text[option_pos] == ':') {
                const char command = (option_pos + 1 < text.length()) ? text[option_pos + 1] : '\0';
                // only one option is applied per occurrence
                if ((command == '~') || (command == '-')) {
                    const std::size_t end = text.find("}", option_pos + 2);
                    if (end != std::string::npos) {
                        const int count = std::atoi(text.substr(option_pos + 2, end - option_pos - 2).c_str());
                        if (static_cast<unsigned int>(count) < new_substring.length()) {
                            const std::string trimmed = (command == '~')
                                                            ? new_substring.substr(count)
                                                            : new_substring.substr(0, new_substring.length() - count);
                            text.replace(start, end + 1 - start, trimmed);
                            // skip the inserted value so that it is never scanned again
                            resume = start + trimmed.length();
                        } else if (command == '~')
                            std::cout << "Warning: deletion of the first chars ('~' command symbol) was failed."
                                      << std::endl;
                        else
                            std::cout << "Warning: deletion of the last chars ('-' command symbol) was failed."
                                      << std::endl;
                    }
                }
            } else if (text[option_pos] == closing_char) {
                text.replace(start, last_char + 1, new_substring);
                // skip the inserted value so that it is never scanned again
                resume = start + new_substring.length();
            }
        }

        start = text.find(prefix, resume);
    }
}
392
393// form output file name containing special (${}) variables
394string form_file_name(string out_name, string in_name, int counter, string batch_temp_dir)
395{
396 replace_string_in_text(out_name, "${counter}", counter);
397 replace_string_in_text(out_name, "${batch_temp_dir}", batch_temp_dir);
398
399 replace_string_in_text_ext(out_name, "${input}", in_name);
400 replace_string_in_text_ext(out_name, "${file_dir_name}", get_directory_name(in_name));
401 // ${file_name_with_ext} must be BEFORE ${file_name} because of searching without the last '}'
402 replace_string_in_text_ext(out_name, "${file_name_with_ext}", get_file_name_with_ext(in_name));
403 replace_string_in_text_ext(out_name, "${file_name}", get_file_name(in_name));
404
405 // ${first_number_fn} must be BEFORE ${first_number} because of searching without the last '}'
406 replace_string_in_text_ext(out_name, "${first_number_fn}", find_first_number(get_file_name(in_name)));
407 replace_string_in_text_ext(out_name, "${first_number}", find_first_number(in_name));
408 replace_string_in_text_ext(out_name, "${last_number}", find_last_number(in_name));
409
410 if (out_name.find("${user}") != string::npos) {
411 char* login_name = getlogin();
412 if (login_name != nullptr)
413 replace_string_in_text(out_name, "${user}", login_name);
414 }
415
416 return out_name;
417}
418
// Expand a shell-style wildcard pattern (with '~' expansion) into the list of matching paths
// using the POSIX glob() function.
//   path - pattern to be expanded
// Returns the matched paths in the order reported by glob(); the return code of glob()
// itself is not examined.
inline std::vector<std::string> glob(const std::string& path)
{
    glob_t matches;
    ::glob(path.c_str(), GLOB_TILDE, nullptr, &matches);

    // copy the C strings before the glob buffer is released
    std::vector<std::string> found_paths;
    found_paths.reserve(matches.gl_pathc);
    for (std::size_t idx = 0; idx != matches.gl_pathc; ++idx)
        found_paths.emplace_back(matches.gl_pathv[idx]);

    globfree(&matches);
    return found_paths;
}
432
// Generate an output file path for a partial result by adding "_<counter>" to the file name.
//   path    - original output file path
//   counter - number of the partial result
// The suffix is inserted before the extension (the last '.' of the file name) or appended to
// the end if the file name has no extension. Only the part after the last '/' is examined,
// so a dot inside a directory name is never treated as the extension separator.
std::string GenerateOutputFilePath(std::string path, int counter)
{
    const std::string suffix = "_" + std::to_string(counter);

    const std::size_t last_slash_idx = path.find_last_of('/');
    const std::size_t last_point_idx = path.find_last_of('.');
    // the point must belong to the file name, not to one of the parent directories
    const bool has_extension = (last_point_idx != std::string::npos)
                               && ((last_slash_idx == std::string::npos) || (last_point_idx > last_slash_idx));

    if (has_extension)
        path.insert(last_point_idx, suffix);
    else
        path.append(suffix);

    return path;
}
446
447// sub-thread in working thread to process single file
448void SubThreadProcessFile(shared_ptr<structSubThreadPar> subthread_par)
449{
450 shared_ptr<structThreadPar> thread_par = subthread_par->sptrThreadParameter;
451
452 // get necessary parameters transferred to the sub-thread
453 string config_file = thread_par->sptrJobParameter->strConfigPath;
454 string macro_path = subthread_par->sptrMacroParameter->nstrMacroPath.GetValue();
455 string in_file = subthread_par->strFileIn;
456 string add_args = subthread_par->sptrMacroParameter->strAddArgs;
457 string logs = thread_par->sptrJobParameter->nstrLogs.GetValue();
458
459 int start_event = subthread_par->iStartEvent;
460 int event_count = subthread_par->iEventCount;
461 nullString out_file = subthread_par->nstrFileOut;
462
463 int thread_counter = thread_par->iThreadCounter;
464 int subthread_counter = subthread_par->iSubthreadCounter;
465
466 // using the parameters to generate job string to be executed
467 stringstream ssROOTCommand;
468 ssROOTCommand << config_file << "root -b -q '" << macro_path << "(";
469
470 // first macro argument - input file
471 bool is_first = true;
472 if (in_file != "") {
473 ssROOTCommand << "\"" << in_file << "\"";
474 is_first = false;
475 }
476 // second macro argument - output file
477 if (!out_file.isNull()) {
478 if (!is_first)
479 ssROOTCommand << ", ";
480 else
481 is_first = false;
482 ssROOTCommand << "\"" << out_file.GetValue() << "\"";
483 }
484 // third macro argument - start event number
485 if (!is_first)
486 ssROOTCommand << ", ";
487 else
488 is_first = false;
489 ssROOTCommand << to_string(start_event);
490 // fourth macro argument - event count
491 if (!is_first)
492 ssROOTCommand << ", ";
493 else
494 is_first = false;
495 ssROOTCommand << to_string(event_count);
496 // then add other arguments to the macro
497 if (add_args != "") {
498 if (!is_first)
499 ssROOTCommand << ", ";
500 else
501 is_first = false;
502 ssROOTCommand << add_args;
503 }
504
505 // end of the command with writing to the log or not
506 if (logs != "")
507 ssROOTCommand << ")' > " << logs << "_" << to_string(thread_counter) << "_" << to_string(subthread_counter)
508 << " 2>&1";
509 else
510 ssROOTCommand << ")'";
511
512 // write and execute temporary bash file
513 ofstream myfile;
514 long int t = time(nullptr);
515 // generate output name for temporary bash file
516 stringstream ssBashFile;
517 ssBashFile << "temp_" << to_string(thread_counter) << "_" << to_string(subthread_counter) << "_" << to_string(t)
518 << ".sh";
519 // open bash file and run the ROOT macro command
520 myfile.open(ssBashFile.str().c_str());
521 myfile << "export ROOT_HIST=0 " << endl;
522 myfile << ssROOTCommand.str();
523 myfile.close();
524 stringstream ssRunCommand;
525 ssRunCommand << "bash " << ssBashFile.str();
526
527 // lock thread for sequential output
528 thread_par->mutJobMutex->lock();
529 cout << "nica-scheduler$ Subtask " << thread_counter << ":" << subthread_counter << " is running:" << endl
530 << "input - " << in_file << endl
531 << "output - " << (out_file.isNull() ? "<no output>" : out_file.GetValue()) << endl
532 << "start event - " << start_event << endl
533 << "event count - " << event_count << endl
534 << endl;
535 thread_par->mutJobMutex->unlock();
536
537 // run bash file with the job in the sub-thread
538 int system_return = system(ssRunCommand.str().c_str());
539 if (system_return != 0)
540 cout << "nica-scheduler$ WARNING: System call (in SubThreadProcessFile) returned non-zero code: "
541 << system_return << endl;
542
543 // delete temporary bash file
544 if (!isDebugMode) {
545 stringstream ssDeleteCommand;
546 ssDeleteCommand << "rm -rf " << ssBashFile.str();
547 system_return = system(ssDeleteCommand.str().c_str());
548 if (system_return != 0)
549 cout << "nica-scheduler$ WARNING: System call (in SubThreadProcessFile) returned non-zero code: "
550 << system_return << endl;
551 } else
552 cout << "DEBUG nica-scheduler$ '" << ssBashFile.str() << "' bash file was used " << endl;
553
554 thread_par->semJobSemaphore->release();
555}
556
557/* MAIN THREAD for LOCAL execution */
558void ThreadLocalProcess(shared_ptr<structThreadPar> thread_par)
559{
560 // initialization of variables
561 shared_ptr<structJobPar> job_par = thread_par->sptrJobParameter;
562 int thread_counter = thread_par->iThreadCounter;
563
564 // cycle for all macros in the job
565 for (size_t ind_macro = 0; ind_macro < job_par->vecMacros.size(); ind_macro++) {
566 shared_ptr<structMacroPar> cur_macro = job_par->vecMacros[ind_macro];
567
568 // get current file for the current thread
569 structFilePar* cur_file = nullptr;
570 if (cur_macro->vecFiles.size() > 0)
571 cur_file = cur_macro->vecFiles[thread_counter - 1].get();
572
573 // if macro use no input files, then add only additional arguments if exist
574 if (cur_file == nullptr) {
575 // generate ROOT command from the job parameters without input and output files
576 stringstream ssROOTCommand;
577 ssROOTCommand << job_par->strConfigPath << "root -b -q '" << cur_macro->nstrMacroPath.GetValue() << "(";
578 if (cur_macro->strAddArgs != "")
579 ssROOTCommand << cur_macro->strAddArgs;
580 if (job_par->nstrLogs != nullptr)
581 ssROOTCommand << ")' > " << job_par->nstrLogs.GetValue() << "_" << to_string(thread_counter) << " 2>&1";
582 else
583 ssROOTCommand << ")'";
584
585 // display local command in DEBUG mode for processing without input files
586 if (isDebugMode)
587 cout << "DEBUG nica-scheduler$ Local ROOT command = " << ssROOTCommand.str() << endl;
588
589 // write and execute temporary bash file
590 ofstream myfile;
591 long int t = time(nullptr);
592 // generate output name for temporary bash file
593 stringstream ssBashFile;
594 ssBashFile << "temp_" << to_string(thread_counter) << "_" << to_string(t) << ".sh";
595 // open bash file and run the ROOT macro command
596 myfile.open(ssBashFile.str().c_str());
597 myfile << "export ROOT_HIST=0 " << endl;
598 myfile << ssROOTCommand.str();
599 myfile.close();
600 stringstream ssRunCommand;
601 ssRunCommand << "bash " << ssBashFile.str();
602
603 // lock thread for sequential output
604 thread_par->mutJobMutex->lock();
605 cout << "nica-scheduler$ Task " << ind_macro + 1 << " is running..." << endl << endl;
606 thread_par->mutJobMutex->unlock();
607
608 // run bash file with the job in the sub-thread
609 int system_return = system(ssRunCommand.str().c_str());
610 if (system_return != 0)
611 cout << "nica-scheduler$ WARNING: System call (in ThreadLocalProcess) returned non-zero code: "
612 << system_return << endl;
613
614 // delete temporary bash file
615 if (!isDebugMode) {
616 stringstream ssDeleteCommand;
617 ssDeleteCommand << "rm -rf " << ssBashFile.str();
618 system_return = system(ssDeleteCommand.str().c_str());
619 if (system_return != 0)
620 cout << "nica-scheduler$ WARNING: System call (in ThreadLocalProcess) returned non-zero code: "
621 << system_return << endl;
622 } else
623 cout << "DEBUG nica-scheduler$ '" << ssBashFile.str() << "' bash file was used " << endl;
624
625 } // if (cur_file == nullptr)
626 // if there are input files for the macro
627 else
628 {
629 // strRealInputFile - input file path for ROOT macro (for intermediate copy case)
630 string strRealInputFile = cur_file->strFileIn;
631 // strRealOutputFile - output file path for ROOT macro (for intermediate copy case)
632 nullString nstrRealOutputFile = cur_file->nstrFileOut;
633
634 // copy file to the temporary location if it is set
635 if (cur_file->strPutPath != "") {
636 stringstream ssCopyCommand;
637 ssCopyCommand << cur_file->strPutCommand << " " << cur_file->strFileIn << " " << cur_file->strPutPath;
638 int system_return = system(ssCopyCommand.str().c_str());
639 if (system_return != 0)
640 cout << "nica-scheduler$ WARNING: System call (in ThreadLocalProcess) returned non-zero code: "
641 << system_return << endl;
642
643 strRealInputFile = cur_file->strPutPath;
644 }
645 // assign real output file on the intermediate storage
646 if (cur_file->strGetPath != "")
647 nstrRealOutputFile.SetValue(cur_file->strGetPath);
648
649 // get start event for the current file
650 int* cur_start_event = cur_macro->ptrStartEvent.get();
651 if (cur_file->ptrStartEvent != nullptr)
652 cur_start_event = cur_file->ptrStartEvent.get();
653
654 // get event number for the current file
655 int* cur_event_count = cur_macro->ptrEventCount.get();
656 if (cur_file->ptrEventCount != nullptr)
657 cur_event_count = cur_file->ptrEventCount.get();
658
659 // if parallel mode and start event is skipped then assign it to 0
660 if ((cur_file->iParallelMode > 1) && (cur_start_event == nullptr)) {
661 cur_file->ptrStartEvent = make_unique<int>(0);
662 cur_start_event = cur_file->ptrStartEvent.get();
663 cout << "nica-scheduler$ WARNING: parallel_mode is used, but start event is not set, so it is assigned "
664 "to 0"
665 << endl;
666 }
667 // if parallel mode and event coun is skipped then assign it to 0
668 if ((cur_file->iParallelMode > 1) && (cur_event_count == nullptr)) {
669 cur_file->ptrEventCount = make_unique<int>(0);
670 cur_event_count = cur_file->ptrEventCount.get();
671 cout << "nica-scheduler$ WARNING: parallel_mode is used, but event count is not set, so it is assigned "
672 "to 0"
673 << endl;
674 }
675
676 // reset parallel mode if start event or event count is not set
677 if ((cur_file->iParallelMode > 1) && ((cur_start_event == nullptr) || (cur_event_count == nullptr))) {
678 cur_file->iParallelMode = 1;
679 cout << "nica-scheduler$ WARNING: parallel_mode must be used with 'start_event' and 'event_count' "
680 "parameters, parallel_mode is ignored!"
681 << endl;
682 }
683
684 int real_thread_count = 1;
685 // concatenation of output file paths to wait for
686 string strOutputUnion = nstrRealOutputFile.GetValue();
687
688 // variables to combine parallel mode > 1 amd = 1 in one cycle
689 int start_event = -1, event_count = -1;
690 if (cur_start_event != nullptr)
691 start_event = *cur_start_event;
692 if (cur_event_count != nullptr)
693 event_count = *cur_event_count;
694
695 // if parallel mode and event count is 0, then trying to get event number from the input file
696 if ((cur_file->iParallelMode > 1) && (event_count == 0)) {
697 string config_paths = job_par->strConfigPath;
698 // send output of the configuration files to /dev/null if no log file is set
699 if (job_par->nstrLogs == nullptr)
700 replace_string_in_text(config_paths, ";", " > /dev/null 2>&1;");
701 stringstream ssEventNumberCommand;
702 ssEventNumberCommand << config_paths << "show_event_count auto \"" << strRealInputFile
703 << "\" 2> /dev/null";
704 if (isDebugMode)
705 cout << "DEBUG nica-scheduler$ trying to get event count from the input file: "
706 << ssEventNumberCommand.str() << endl;
707
708 array<char, 128> bufferCommand;
709 string str_event_number = "";
710 FILE* stream = popen(ssEventNumberCommand.str().c_str(), "r");
711 while (fgets(bufferCommand.data(), static_cast<int>(bufferCommand.size()), stream) != nullptr)
712 str_event_number += bufferCommand.data();
713 pclose(stream);
714 str_event_number = find_first_number(str_event_number);
715 if (str_event_number == "") {
716 info_message(TString::Format("ERROR nica-scheduler$ parallel_mode is used, but event_count is not "
717 "set and cannot be automatically defined. "
718 "'%s' file will be skipped!",
719 strRealInputFile.c_str())
720 .Data(),
721 1);
722 break;
723 }
724 event_count = atoi(str_event_number.c_str());
725 if (isDebugMode)
726 cout << "DEBUG nica-scheduler$ " << event_count << " event(s) found in the input file" << endl;
727 }
728
729 // if parallel mode and start event is 0, then trying to get start event from the input file
730 vector<nullString> vecPartialOutputFiles;
731 /***********************************************************************/
732 /* if parallel mode > 1 - parallel local processing for one input file */
733 /***********************************************************************/
734 if (cur_file->iParallelMode > 1) {
735 // create sub-threads to additionally parallelize by events in case of 'parallel_mode'
736 vector<thread> vecSubThreads;
737
738 // fill sub-threads parameters and run them to parallelize by events
739 for (int i = 0; i < cur_file->iParallelMode; i++) {
740 int event_per_thread = (event_count + i) / cur_file->iParallelMode;
741 if (event_per_thread != 0) {
742 nullString nstr_output_file = nstrRealOutputFile;
743 // generate output file name for particular processing by events
744 if (nstr_output_file != nullptr)
745 nstr_output_file.SetValue(
746 GenerateOutputFilePath(nstr_output_file.GetValue(), real_thread_count));
747
748 // fill parameters for the current sub-thread (with the current event portion)
749 shared_ptr<structSubThreadPar> subthread_par = make_shared<structSubThreadPar>();
750 subthread_par->sptrThreadParameter = thread_par;
751 subthread_par->sptrMacroParameter = cur_macro;
752 subthread_par->iStartEvent = start_event;
753 subthread_par->iEventCount = event_per_thread;
754 subthread_par->strFileIn = strRealInputFile;
755 subthread_par->nstrFileOut = nstr_output_file;
756 subthread_par->iSubthreadCounter = i + 1;
757
758 // release the semaphore for the current thread to organize parallelization in sub-threads
759 if (i == 0)
760 thread_par->semJobSemaphore->release();
761
762 thread_par->semJobSemaphore->acquire();
763
764 // create sub-threads to parallelize one file processing by events
765 try {
766 vecSubThreads.emplace_back(SubThreadProcessFile, subthread_par);
767 } catch (const system_error& e) {
768 info_message((TString::Format("ERROR nica-scheduler$ error while creating sub-thread with "
769 "code = %d and message = %s",
770 e.code().value(), e.what()))
771 .Data(),
772 1);
773 exit(-1);
774 }
775
776 if (nstr_output_file != nullptr) {
777 strOutputUnion += " " + nstr_output_file.GetValue();
778 vecPartialOutputFiles.push_back(nstr_output_file);
779 }
780
781 real_thread_count++;
782 } // if (event_per_thread != 0){
783
784 start_event = start_event + event_per_thread;
785 } // for (int i = 0; i < cur_file->iParallelMode; i++)
786
787 // waiting for finishing of the child threads
788 if (isDebugMode)
789 cout << "nica-scheduler$ Waiting for " << real_thread_count << " subtask(s) to finish..." << endl;
790 for (auto& sub_thread : vecSubThreads) {
791 try {
792 sub_thread.join();
793 } catch (const system_error& e) {
795 (TString::Format("ERROR nica-scheduler$ Sub-thread failed with code = %d and message = %s",
796 e.code().value(), e.what()))
797 .Data(),
798 1);
799 }
800 }
801 cout << "nica-scheduler$ Subtasks " << ind_macro + 1 << ":[] has been finished" << endl;
802 thread_par->semJobSemaphore->acquire();
803
804 // merge result files or write to single TChain object
805 // iMerge: 0 - save result TChain; 1 - merge to the origin file name and delete partial files;
806 // 2 - merge to the origin file name and NOT delete partial files
807 if ((cur_file->iMerge >= 0) && (strOutputUnion != "")) {
808 string UNIONc_path = get_app_dir_linux() + "merge_result.C";
809
810 // form command for merging result files
811 stringstream ssMergeCommand;
812 ssMergeCommand << job_par->strConfigPath << "root -b -q '" << UNIONc_path << "(\"" << strOutputUnion
813 << "\", " << to_string(cur_file->iMerge) << ")'";
814
815 // save merging log to file if it is set
816 if (job_par->nstrLogs != nullptr)
817 ssMergeCommand << " >> " << job_par->nstrLogs.GetValue() << " 2>&1";
818
819 // run merge_result.C macro in bash
820 ofstream myfile;
821 long int t = time(nullptr);
822 stringstream ssBashFile;
823 ssBashFile << "temp_union_" << to_string(t) << ".sh";
824 myfile.open(ssBashFile.str().c_str());
825 myfile << ssMergeCommand.str();
826 myfile.close();
827 stringstream ssBashCommand;
828 ssBashCommand << "bash " << ssBashFile.str().c_str();
829
830 cout << endl
831 << "nica-scheduler$ Merging the result of subtasks " << ind_macro + 1 << ":[]..." << endl
832 << endl;
833 int system_return = system(ssBashCommand.str().c_str());
834 if (system_return != 0)
835 cout << "nica-scheduler$ WARNING: System call (in ThreadLocalProcess) returned non-zero code: "
836 << system_return << endl;
837
838 // delete temporary bash file
839 if (!isDebugMode) {
840 stringstream ssRemoveCommand;
841 ssRemoveCommand << "rm -rf " << ssBashFile.str();
842 system_return = system(ssRemoveCommand.str().c_str());
843 if (system_return != 0)
844 cout << "nica-scheduler$ WARNING: System call (in ThreadLocalProcess) returned non-zero "
845 "code: "
846 << system_return << endl;
847 } else
848 cout << "DEBUG nica-scheduler$ '" << ssBashFile.str() << "' bash file was used " << endl;
849 } // if - merge result files
850 } // if (parallel_mode > 1)
851 /************************/
852 /* if parallel mode = 1 */
853 /************************/
854 else
855 {
856 // using the parameters to generate job string to be executed
857 stringstream ssROOTCommand;
858 ssROOTCommand << job_par->strConfigPath << "root -b -q '" << cur_macro->nstrMacroPath.GetValue() << "(";
859
860 // first macro argument - input file
861 bool is_first = true;
862 if (strRealInputFile != "") {
863 ssROOTCommand << "\"" << strRealInputFile << "\"";
864 is_first = false;
865 }
866 // second macro argument - output file
867 if (nstrRealOutputFile != nullptr) {
868 if (!is_first)
869 ssROOTCommand << ", ";
870 else
871 is_first = false;
872 ssROOTCommand << "\"" << nstrRealOutputFile.GetValue() << "\"";
873 }
874 // third macro argument - start event number
875 if (cur_start_event != nullptr) {
876 if (!is_first)
877 ssROOTCommand << ", ";
878 else
879 is_first = false;
880 ssROOTCommand << to_string(*cur_start_event);
881 }
882 // fourth macro argument - event count
883 if (cur_event_count != nullptr) {
884 if (!is_first)
885 ssROOTCommand << ", ";
886 else
887 is_first = false;
888 ssROOTCommand << to_string(*cur_event_count);
889 }
890 // then add other arguments to the macro
891 if (cur_macro->strAddArgs != "") {
892 if (!is_first)
893 ssROOTCommand << ", ";
894 else
895 is_first = false;
896 ssROOTCommand << cur_macro->strAddArgs;
897 }
898
899 // end of the command with writing to the log or not
900 if (job_par->nstrLogs != nullptr)
901 ssROOTCommand << ")' > " << job_par->nstrLogs.GetValue() << "_" << to_string(thread_counter)
902 << " 2>&1";
903 else
904 ssROOTCommand << ")'";
905
906 // display local ROOT command in debug mode for processing with input files
907 if (isDebugMode)
908 cout << "DEBUG nica-scheduler$ Local command = " << ssROOTCommand.str() << endl;
909
910 // write and execute temporary bash file
911 ofstream myfile;
912 long int t = time(nullptr);
913 // generate output name for temporary bash file
914 stringstream ssBashFile;
915 ssBashFile << "temp_" << to_string(thread_counter) << "_" << "_" << to_string(t) << ".sh";
916 // open bash file and run the ROOT macro command
917 myfile.open(ssBashFile.str().c_str());
918 myfile << "export ROOT_HIST=0 " << endl;
919 myfile << ssROOTCommand.str();
920 myfile.close();
921 stringstream ssRunCommand;
922 ssRunCommand << "bash " << ssBashFile.str();
923
924 // lock thread for sequential output
925 thread_par->mutJobMutex->lock();
926 cout << "nica-scheduler$ Task " << thread_counter << " is running..." << endl;
927 if (strRealInputFile != "")
928 cout << "input data - " << strRealInputFile << endl;
929 if (nstrRealOutputFile != nullptr)
930 cout << "output data - " << nstrRealOutputFile.GetValue() << endl;
931 cout << endl;
932 thread_par->mutJobMutex->unlock();
933
934 // run bash file with the job in the sub-thread
935 int system_return = system(ssRunCommand.str().c_str());
936 if (system_return != 0)
937 cout << "nica-scheduler$ WARNING: System call (in ThreadLocalProcess) returned non-zero code: "
938 << system_return << endl;
939
940 // delete temporary bash file
941 if (!isDebugMode) {
942 stringstream ssRemoveCommand;
943 ssRemoveCommand << "rm -rf " << ssBashFile.str();
944 system_return = system(ssRemoveCommand.str().c_str());
945 if (system_return != 0)
946 cout << "nica-scheduler$ WARNING: System call (in ThreadLocalProcess) returned non-zero code: "
947 << system_return << endl;
948 } else
949 cout << "DEBUG nica-scheduler$ '" << ssBashFile.str() << "' bash file was used " << endl;
950 } // else (parallel_mode = 1)
951
952 // copy file (or files in case of parallel_mode > 1) from the temporary location to the final output path
953 if ((cur_file->strGetPath != "") && (cur_file->nstrFileOut != nullptr)) {
954 stringstream ssCopyCommand;
955 ssCopyCommand << cur_file->strGetCommand << " " << cur_file->strGetPath << " "
956 << cur_file->nstrFileOut.GetValue();
957 int system_return = system(ssCopyCommand.str().c_str());
958 if (system_return != 0)
959 cout << "nica-scheduler$ WARNING: System call (in ThreadLocalProcess) returned non-zero code: "
960 << system_return << endl;
961
962 // also copy partial result files for parallel_mode > 1
963 if ((cur_file->iParallelMode > 1) && ((cur_file->iMerge == 0) || (cur_file->iMerge == 2))) {
964 for (nullString& cur_partial_file : vecPartialOutputFiles) {
965 stringstream ssCopyPartialsCommand;
966 ssCopyPartialsCommand << cur_file->strGetCommand << " " << cur_partial_file.GetValue() << " "
967 << get_directory_path(cur_file->nstrFileOut.GetValue()) + string("/")
968 + get_file_name_with_ext(cur_partial_file.GetValue());
969 system_return = system(ssCopyPartialsCommand.str().c_str());
970 if (system_return != 0)
971 cout << "nica-scheduler$ WARNING: System call (in ThreadLocalProcess) returned non-zero "
972 "code: "
973 << system_return << endl;
974 }
975 } // if iParallelMode > 1 && iMerge = 0,2 then copy all partial result files
976 } // copy file from the temporary location to the output path
977
978 // remove the temporary input files from <put>
979 if (cur_file->strPutPath != "") {
980 stringstream ssRemoveCommand;
981 ssRemoveCommand << "rm -rf " << cur_file->strPutPath;
982 int system_return = system(ssRemoveCommand.str().c_str());
983 if (system_return != 0)
984 cout << "nica-scheduler$ WARNING: System call (in ThreadLocalProcess) returned non-zero code: "
985 << system_return << endl;
986 }
987 // remove the temporary output files from <get>
988 if (cur_file->strGetPath != "") {
989 stringstream ssRemoveCommand;
990 ssRemoveCommand << "rm -rf " << cur_file->strGetPath;
991 int system_return = system(ssRemoveCommand.str().c_str());
992 if (system_return != 0)
993 cout << "nica-scheduler$ WARNING: System call (in ThreadLocalProcess) returned non-zero code: "
994 << system_return << endl;
995
996 // remove partial result files in case of the parallel mode for by-event processing
997 if ((cur_file->iParallelMode > 1) && ((cur_file->iMerge == 0) || (cur_file->iMerge == 2))) {
998 for (nullString& cur_partial_file : vecPartialOutputFiles) {
999 stringstream ssRemovePartialCommand;
1000 ssRemovePartialCommand << "rm -rf " << cur_partial_file.GetValue();
1001 system_return = system(ssRemovePartialCommand.str().c_str());
1002 if (system_return != 0)
1003 cout << "nica-scheduler$ WARNING: System call (in ThreadLocalProcess) returned non-zero "
1004 "code: "
1005 << system_return << endl;
1006 }
1007 } // if iParallelMode > 1 && iMerge = 0,2 then delete all partial result files
1008 } // if (cur_file->strGetPath != "")
1009
1010 // remove the temporary files from <clean>
1011 if (!cur_file->vecCleanPath.empty()) {
1012 stringstream ssRemoveCommand;
1013 for (const string& cur_clean_path : cur_file->vecCleanPath)
1014 ssRemoveCommand << "rm -rf " << cur_clean_path << "; ";
1015 int system_return = system(ssRemoveCommand.str().c_str());
1016 if (system_return != 0)
1017 cout << "nica-scheduler$ WARNING: System call (in ThreadLocalProcess) returned non-zero code: "
1018 << system_return << endl;
1019 }
1020
1021 if (cur_file->iParallelMode == 1)
1022 cout << "nica-scheduler$ Task " << thread_counter << " has been finished" << endl;
1023 } // if (cur_file != nullptr)
1024 } // for (size_t ind_macro = 0; ind_macro < job_par->vecMacros.size(); ind_macro++)
1025
1026 thread_par->semJobSemaphore->release();
1027}
1028
// Long-form command line options passed to getopt_long() in main().
// Options with a short alias make getopt_long() return that character;
// "db" and "database" have no short form (val = 0, flag = nullptr), so they
// are reported as 0 and are told apart by name via the option index.
static struct option long_options[] = {
    {"help", no_argument, nullptr, 'h'},
    {"file", required_argument, nullptr, 'f'},
    {"debug", no_argument, nullptr, 'd'},
    {"db", required_argument, nullptr, 0},
    {"database", required_argument, nullptr, 0},
    // all-zero sentinel that terminates the table
    {nullptr, 0, nullptr, 0},
};
1033
1034// help message for the NICA-Scheduler
1036{
1037 cout << "Usage: nica-scheduler job_description.xml [options]" << endl
1038 << endl
1039 << "Options:" << endl
1040 << " -h/--help : produce help message" << endl
1041 << " -f/--file arg : set XML file path with a job description" << endl
1042 << " -d/--debug : debug mode with detailed output" << endl
1043 << " --db/--database arg : predefined settings for the Condition Database (now 'bmn' supported only)"
1044 << endl
1045 << endl;
1046}
1047
1048/* Main Function of the NICA Scheduler*/
1049int main(int argc, char** argv)
1050{
1051 // if no necessary arguments are given for the NICA-Scheduler
1052 if (argc < 2) {
1054 info_message("ERROR nica-scheduler$ XML file path (containing job description) must be set", 1);
1055 return 1;
1056 }
1057
1058 // parsing command line arguments
1059 string xml_file_path = "", exp_database = "";
1060 int option_index = 0, c;
1061 while ((c = getopt_long(argc, argv, "hf:d", long_options, &option_index)) != -1) {
1062 switch (c) {
1063 case 0: {
1064 if (long_options[option_index].flag != 0)
1065 break;
1066 printf("option %s", long_options[option_index].name);
1067 if (optarg)
1068 printf("with arg %s", optarg);
1069 printf("\n");
1070
1071 if (strcmp(long_options[option_index].name, "db") == 0)
1072 exp_database = optarg;
1073 if (strcmp(long_options[option_index].name, "database") == 0)
1074 exp_database = optarg;
1075 break;
1076 }
1077 case 'h':
1079 return 0;
1080 case 'f':
1081 xml_file_path = optarg;
1082 break;
1083 case 'd':
1084 isDebugMode = true;
1085 break;
1086 case '?':
1087 /* getopt_long already printed an error message. */
1088 break;
1089 }
1090 }
1091 // remaining command line arguments (not options)
1092 if (optind < argc) {
1093 if (((argc - optind) == 1) && (xml_file_path == ""))
1094 xml_file_path = argv[optind];
1095 else {
1096 info_message("ERROR nica-scheduler$ Argument list is incorrect.\nPlease, check information on command line "
1097 "arguments in the 'README.md' file",
1098 1);
1099 return 1;
1100 }
1101 }
1102
1103 // check that XML job file is not empty
1104 if (xml_file_path == "") {
1105 info_message("ERROR nica-scheduler$ XML file path (containing job description) must be set", 1);
1106 return 1;
1107 }
1108
1109 // high means the high priority value of the parameter (a console parameter has the highest priority for the
1110 // settings)
1111 bool isDBSettingsHigh = false;
1112 struct_database_settings dbSettingsPre;
1113 if (exp_database != "") {
1114 auto iter_db_settings = predefined_database_settings.find(exp_database);
1115 if (iter_db_settings == predefined_database_settings.end()) {
1116 info_message((TString::Format("ERROR nica-scheduler$ Predefined database settings were not found for '%s'",
1117 exp_database.c_str()))
1118 .Data(),
1119 1);
1120 return 1;
1121 } else {
1122 dbSettingsPre = iter_db_settings->second;
1123 isDBSettingsHigh = true;
1124 }
1125 }
1126
1127 // obtain pointer to XML document for a given input file path
1128 xmlDocPtr doc = xmlReadFile(xml_file_path.c_str(), nullptr, 0);
1129 if (doc == nullptr) {
1131 (TString::Format("ERROR nica-scheduler$ Failed to open input XML file: '%s'", xml_file_path.c_str()))
1132 .Data(),
1133 1);
1134 return 2;
1135 }
1136
1137 // get root element of the XML job description
1138 xmlNodePtr root = xmlDocGetRootElement(doc);
1139
1140 // map with Job Name -> ID
1141 multimap<string, string> mapJobName2ID;
1142 // map with Job Name -> Output List
1143 multimap<string, unique_ptr<vector<nullString>>> mapJobName2FileOut;
1144
1145 // path to executable's directory ('/' - last char)
1146 string exe_dir = get_app_dir_linux();
1147
1148 // initialization before job cycle
1149 srand(time(nullptr));
1150 BATCH_SYSTEM_NAME batch_system = SLURM_BATCH_SYSTEM; // default value: SLURM batch system
1151
1152 // cycle for all jobs in the XML description
1153 xmlNodePtr cur_node = root;
1154 while (cur_node) {
1155 // if tag is XML_ELEMENT_NODE then continue
1156 if (cur_node->type == XML_ELEMENT_NODE) {
1157 char* pc_temp;
1158 int i_temp;
1159 // JOBS TAG (array of jobs) then step down and continue
1160 if (strcmp((char*)cur_node->name, "jobs") == 0) {
1161 // read BATCH system name in the XML description
1162 pc_temp = (char*)xmlGetProp(cur_node, (unsigned char*)"batch");
1163 if (pc_temp != nullptr) {
1164 char* pc_batch_system = convert_pchar_to_lowercase_new(pc_temp);
1165 if (strcmp(pc_batch_system, "sge") == 0)
1166 batch_system = SGE_BATCH_SYSTEM;
1167 else {
1168 if (strcmp(pc_batch_system, "torque") == 0)
1169 batch_system = TORQUE_BATCH_SYSTEM;
1170 }
1171 free(pc_temp);
1172 free(pc_batch_system);
1173 }
1174 // read UniConDa database settings in the XML description
1175 pc_temp = (char*)xmlGetProp(cur_node, (unsigned char*)"database");
1176 if ((pc_temp != nullptr) && (!isDBSettingsHigh)) {
1177 auto iter_db_settings = predefined_database_settings.find(pc_temp);
1178 if (iter_db_settings == predefined_database_settings.end())
1179 info_message((TString::Format("WARNING nica-scheduler$ Predefined database settings were not "
1180 "found for '%s'. The parameter will be ignored.",
1181 pc_temp))
1182 .Data(),
1183 1);
1184 else
1185 dbSettingsPre = iter_db_settings->second;
1186 free(pc_temp);
1187 }
1188
1189 cur_node = cur_node->children;
1190 }
1191
1192 // JOB TAG
1193 if (strcmp((char*)cur_node->name, "job") == 0) {
1194 bool isError = false;
1195 // bool isPuts = false, isGets = false, isCleans = false;
1196 shared_ptr<structJobPar> job_par = make_shared<structJobPar>();
1197
1198 // read Job Name in the XML description
1199 pc_temp = (char*)xmlGetProp(cur_node, (unsigned char*)"name");
1200 if (pc_temp != nullptr) {
1201 job_par->strJobName = pc_temp;
1202 free(pc_temp);
1203 }
1204
1205 struct_database_settings dbSettings(dbSettingsPre);
1206 // read UniConDa database settings in the XML description
1207 pc_temp = (char*)xmlGetProp(cur_node, (unsigned char*)"database");
1208 if ((pc_temp != nullptr) && (!isDBSettingsHigh)) {
1209 auto iter_db_settings = predefined_database_settings.find(pc_temp);
1210 if (iter_db_settings == predefined_database_settings.end())
1211 info_message((TString::Format("WARNING nica-scheduler$ Predefined database settings were not "
1212 "found for '%s'. The parameter will be ignored.",
1213 pc_temp))
1214 .Data(),
1215 1);
1216 else
1217 dbSettings = iter_db_settings->second;
1218 free(pc_temp);
1219 }
1220 if (dbSettings.db_name == "") {
1221 dbSettings.db_host = UNI_DB_HOST;
1222 dbSettings.db_name = UNI_DB_NAME;
1223 dbSettings.db_username = UNI_DB_USERNAME;
1224 dbSettings.db_password = UNI_DB_PASSWORD;
1225 }
1226 job_par->dbSettings = dbSettings;
1227
1228 // read BATCH system name in the XML description
1229 pc_temp = (char*)xmlGetProp(cur_node, (unsigned char*)"batch");
1230 if (pc_temp != nullptr) {
1231 char* pc_batch_system = convert_pchar_to_lowercase_new(pc_temp);
1232 if (strcmp(pc_batch_system, "slurm") == 0)
1233 batch_system = SLURM_BATCH_SYSTEM;
1234 else {
1235 if (strcmp(pc_batch_system, "torque") == 0)
1236 batch_system = TORQUE_BATCH_SYSTEM;
1237 }
1238 free(pc_temp);
1239 free(pc_batch_system);
1240 }
1241 batch_commands batch_com = mapBS[batch_system];
1242
1243 // fill BASH file content to be executed in the BATCH system
1244 // transferring individual parameters to the JOB ARRAY using DECLARE bash variable
1245 // DECLARE method can lead to the overflow of the content of the variables (mb better "CAT << EOF" files
1246 // method)
1247 string strBatchFile = batch_com.job_begin, strBatchInitInputFile = "declare -a InputFileArray=(",
1248 strBatchInitOutputFile = "declare -a OutputFileArray=(",
1249 strBatchInitStartEvent = "declare -a StartEventArray=(",
1250 strBatchInitEventCount = "declare -a EventCountArray=(",
1251 strBatchInitAddArgs = "declare -a AddArgsArray=(", strBatchInitPut = "declare -a PutArray=(",
1252 strBatchInitGet = "declare -a GetArray=(", strBatchInitClean = "declare -a CleanArray=(";
1253
1254 // read Job Dependency in the XML description
1255 pc_temp = (char*)xmlGetProp(cur_node, (unsigned char*)"dependency");
1256 if (pc_temp != nullptr) {
1257 multimap<string, string>::iterator it = mapJobName2ID.find(pc_temp);
1258 if (it == mapJobName2ID.end())
1259 cout << "nica-scheduler$ WARNING: the following dependency for '" << job_par->strJobName
1260 << "' job was not found: " << pc_temp << endl;
1261 else
1262 job_par->strDdependencyName = batch_com.array_dependency_option + it->second;
1263 free(pc_temp);
1264 }
1265
1266 /* PARSING TAGS of the JOB */
1267 xmlNodePtr sub_node = cur_node->children;
1268 while (sub_node) {
1269 if (sub_node->type == XML_ELEMENT_NODE) {
1270 // COMMAND LINE TAG
1271 if (strcmp((char*)sub_node->name, "command") == 0) {
1272 if (job_par->nstrCommandline != nullptr)
1273 cout << "nica-scheduler$ WARNING: job command line is reassigned" << endl;
1274
1275 job_par->nstrCommandline.SetValue((char*)xmlGetProp(sub_node, (unsigned char*)"line"));
1276 if (job_par->nstrCommandline == nullptr) {
1277 isError = true;
1278 info_message("ERROR nica-scheduler$ 'line' attribute was not set for the command!", 1);
1279 break;
1280 }
1281
1282 sub_node = sub_node->next;
1283 continue;
1284 }
1285
1286 // MACRO NAME TAG
1287 if (strcmp((char*)sub_node->name, "macro") == 0) {
1288 shared_ptr<structMacroPar> macro_par = make_shared<structMacroPar>();
1289
1290 // read Macro Name in the XML description (for dependency from macro)
1291 // macro_par->nstrMacroName.SetValue((char*) xmlGetProp(sub_node, (unsigned char*)"name"));
1292 // read Macro Path in the XML description
1293 macro_par->nstrMacroPath.SetValue((char*)xmlGetProp(sub_node, (unsigned char*)"path"));
1294 if (macro_par->nstrMacroPath == nullptr) {
1295 isError = true;
1297 "ERROR nica-scheduler$ Macro path was not set (please, add 'path' attribute)!", 1);
1298 break;
1299 }
1300
1301 // read Global Start Event value in the XML description
1302 pc_temp = (char*)xmlGetProp(sub_node, (unsigned char*)"start_event");
1303 if (pc_temp != nullptr) {
1304 macro_par->ptrStartEvent = make_unique<int>(atoi(pc_temp));
1305 free(pc_temp);
1306 }
1307
1308 // read Global Event Count value in the XML description
1309 pc_temp = (char*)xmlGetProp(sub_node, (unsigned char*)"event_count");
1310 if (pc_temp != nullptr) {
1311 if (!is_string_number(pc_temp)) {
1312 isError = true;
1313 info_message("ERROR nica-scheduler$ Event count must be a number!", 1);
1314 free(pc_temp);
1315 break;
1316 }
1317 macro_par->ptrEventCount = make_unique<int>(atoi(pc_temp));
1318 if (*macro_par->ptrEventCount < 0) {
1319 isError = true;
1320 info_message("ERROR nica-scheduler$ Event count must be a positive number or 0 "
1321 "(for all events)!",
1322 1);
1323 free(pc_temp);
1324 break;
1325 }
1326 free(pc_temp);
1327 }
1328
1329 // read Additional Arguments of the macro in the XML description
1330 pc_temp = (char*)xmlGetProp(sub_node, (unsigned char*)"add_args");
1331 if (pc_temp != nullptr) {
1332 macro_par->strAddArgs = pc_temp;
1333 free(pc_temp);
1334 // replace ''' symbols for '"'
1335 replace_string_in_text(macro_par->strAddArgs, "'", "\"");
1336 }
1337
1338 // read File List to process by the macro in the XML description
1339 if (ParseMacroFiles(sub_node->children, job_par, macro_par, mapJobName2FileOut, batch_com)
1340 == true)
1341 isError = true;
1342
1343 job_par->vecMacros.push_back(macro_par);
1344
1345 sub_node = sub_node->next;
1346 continue;
1347 }
1348
1349 // RUN TYPE TAG
1350 if (strcmp((char*)sub_node->name, "run") == 0) {
1351 // read Run Mode in the XML description: local, global or container
1352 pc_temp = (char*)xmlGetProp(sub_node, (unsigned char*)"mode");
1353 if (pc_temp != nullptr) {
1354 if (strcmp(pc_temp, "global") == 0)
1355 job_par->iRunMode = 1;
1356 else if (strcmp(pc_temp, "container") == 0)
1357 job_par->iRunMode = 2;
1358
1359 free(pc_temp);
1360 }
1361
1362 // read Processor Count to use in the XML description
1363 // it will be corrected according to count of subtasks
1364 job_par->nstrProcCount.SetValue((char*)xmlGetProp(sub_node, (unsigned char*)"count"));
1365 // read Operative Memory required for 1 task
1366 // the RAM size is converted to MegaBytes
1367 pc_temp = (char*)xmlGetProp(sub_node, (unsigned char*)"memory1");
1368 if (pc_temp != nullptr) {
1369 i_temp = floor(byte_size_to_double(pc_temp, 'M') + 0.5);
1370 if (i_temp > 0)
1371 job_par->strOperativeMemory1 = int_to_string(i_temp);
1372 else
1373 cout << "nica-scheduler$ WARNING: 'memory1' field contains incorrect format, then "
1374 "it is ignored"
1375 << endl;
1376
1377 free(pc_temp);
1378 }
1379 // read Configuration File in the XML description
1380 pc_temp = (char*)xmlGetProp(sub_node, (unsigned char*)"config");
1381 if (pc_temp != nullptr) {
1382 job_par->strConfigPath = pc_temp;
1383 free(pc_temp);
1384 }
1385 // read Job Priority in the XML description
1386 job_par->nstrPriority.SetValue((char*)xmlGetProp(sub_node, (unsigned char*)"priority"));
1387 // read Log File Path in the XML description
1388 job_par->nstrLogs.SetValue((char*)xmlGetProp(sub_node, (unsigned char*)"logs"));
1389 // read Queue Name of the batch system in the XML description
1390 job_par->nstrQueue.SetValue((char*)xmlGetProp(sub_node, (unsigned char*)"queue"));
1391 // read Quality-Of-Service Name of the batch system in the XML description
1392 job_par->nstrQOS.SetValue((char*)xmlGetProp(sub_node, (unsigned char*)"qos"));
1393 // read Working Directory of the batch system in the XML description
1394 job_par->nstrWorkDir.SetValue((char*)xmlGetProp(sub_node, (unsigned char*)"work_dir"));
1395 // read Container OS in the XML description
1396 pc_temp = (char*)xmlGetProp(sub_node, (unsigned char*)"container_os");
1397 if (pc_temp != nullptr) {
1398 job_par->strContainerType = pc_temp;
1399 free(pc_temp);
1400 }
1401
1402 // read Host List, on which the job will be executed
1403 job_par->nstrHosts.SetValue((char*)xmlGetProp(sub_node, (unsigned char*)"hosts"));
1404 // change comma separator with a specific symbol corresponding to the batch system
1405 if ((job_par->nstrHosts != nullptr) && (batch_com.sheduler_hosts_separation != ','))
1406 replace(job_par->nstrHosts.GetValue().begin(), job_par->nstrHosts.GetValue().end(), ',',
1407 batch_com.sheduler_hosts_separation);
1408
1409 // read Core Dump Flag (whether write a core dump in case of errors)
1410 pc_temp = (char*)xmlGetProp(sub_node, (unsigned char*)"core_dump");
1411 if (pc_temp != nullptr) {
1412 if (strcmp(pc_temp, "true") == 0)
1413 job_par->isCoreDump = true;
1414 free(pc_temp);
1415 }
1416
1417 sub_node = sub_node->next;
1418 continue;
1419 }
1420 }
1421
1422 sub_node = sub_node->next;
1423 } // while PARSING TAGS
1424
1425 /*****************************************************/
1426 /* START MAIN EXECUTION AFTER PARSING XML PARAMETERS */
1427 /*****************************************************/
1428 // check that there are macros to execute and it is not a command line
1429 if ((job_par->nstrCommandline == nullptr) && (job_par->vecMacros.size() == 0)) {
1430 info_message("ERROR nica-scheduler$ No macros in the job!", 1);
1431 isError = true;
1432 }
1433 // if errors are present while parsing parameters then continue
1434 if (isError) {
1435 info_message("ERROR nica-scheduler$ This job will be skipped because of errors above!", 1);
1436 cur_node = cur_node->next;
1437 continue;
1438 }
1439
1440 // store list with output file paths for the whole job (the last macro)
1441 unique_ptr<vector<nullString>> vecJobFileOut = make_unique<vector<nullString>>();
1442 // 1. define total processor count and fill 'parallel_mode' parameter for each file of the job
1443 // 2. save list of output files of the whole job (output of the last macro) to mapJobName2FileOut
1444 int globalParallelCount = 0;
1445 if (job_par->nstrCommandline != nullptr)
1446 globalParallelCount = 1;
1447 // cycle by macros of the job
1448 for (size_t ind_macro = 0; ind_macro < job_par->vecMacros.size(); ind_macro++) {
1449 int iTotalParallelCount;
1450 // if the job contain a command line then no parallelization
1451 if (job_par->nstrCommandline != nullptr)
1452 iTotalParallelCount = 1;
1453 else
1454 iTotalParallelCount = 0;
1455
1456 shared_ptr<structMacroPar> cur_macro = job_par->vecMacros[ind_macro];
1457 // cycle by files of the current macro
1458 for (const auto& iter : cur_macro->vecFiles) {
1459 structFilePar* filePar = iter.get();
1460 if (filePar->strParallelMode != "") {
1461 // if parallel mode is set to 0, then all cores are used
1462 if (filePar->strParallelMode == "0") {
1463 // in the local mode, all CPU cores are used
1464 if (job_par->iRunMode == 0)
1466 else {
1467 // in the global mode, all queue slots are used
1468 if (job_par->nstrQueue == nullptr)
1469 filePar->iParallelMode = get_batch_processor_count(batch_system);
1470 else
1471 filePar->iParallelMode =
1472 get_batch_processor_count(batch_system, job_par->nstrQueue.GetValue());
1473 }
1474
1475 // parallel mode is 1 by default (if incorrect values set)
1476 if (filePar->iParallelMode < 1)
1477 filePar->iParallelMode = 1;
1478 } else {
1479 filePar->iParallelMode = atoi(filePar->strParallelMode.c_str());
1480 // parallel mode is 1 by default (if incorrect values set)
1481 if (filePar->iParallelMode < 1)
1482 filePar->iParallelMode = 1;
1483 }
1484 } else
1485 filePar->iParallelMode = 1;
1486
1487 // total processor count for all files of the macro
1488 iTotalParallelCount += filePar->iParallelMode;
1489
1490 // fill list with output file paths for the whole job (the last macro)
1491 if (ind_macro == (job_par->vecMacros.size() - 1))
1492 vecJobFileOut->push_back(filePar->nstrFileOut);
1493 } // for all files of the current macro
1494
1495 if ((ind_macro > 0) && (globalParallelCount != iTotalParallelCount)) {
1496 info_message("ERROR nica-scheduler$ The file count in macros of the job is not the same. This "
1497 "job will be skipped!",
1498 1);
1499 cur_node = cur_node->next;
1500 continue;
1501 }
1502 globalParallelCount = iTotalParallelCount;
1503 } // for job macros
1504
1505 // deleting vecJobFileOut -> do not use later
1506 mapJobName2FileOut.insert(
1507 pair<string, unique_ptr<vector<nullString>>>(job_par->strJobName, std::move(vecJobFileOut)));
1508
1509 // form command to source ('.' is alias) all configuration files separated by ',' or ';'
1510 // the result form: ". config1; . config2; "
1511 job_par->strConfigPath = trim(job_par->strConfigPath);
1512 if (job_par->strConfigPath != "") {
1513 replace_string_in_text(job_par->strConfigPath, ",", ";");
1514 if (job_par->strConfigPath[job_par->strConfigPath.length() - 1] == ';')
1515 job_par->strConfigPath = job_par->strConfigPath.substr(0, job_par->strConfigPath.length() - 1);
1516 replace_string_in_text(job_par->strConfigPath, ";", ";. ");
1517 job_par->strConfigPath = ". " + job_par->strConfigPath + "; ";
1518 // if log file is set then add redirection of the config output to the log file
1519 if (job_par->nstrLogs != nullptr)
1520 replace_string_in_text(job_par->strConfigPath, ";",
1521 (string) " > " + job_par->nstrLogs.GetValue() + (string) " 2>&1;");
1522 }
1523
1524 /*****************************************/
1525 /* GLOBAL SCHEDULING W/WO CONTAINER MODE */
1526 /*****************************************/
1527 FILE* stream = nullptr;
1528 // if not local run mode
1529 if (job_par->iRunMode > 0) {
1530 // if container mode, define container batch file
1531 string strApptainerRunFile = "#!/bin/bash\n\n";
1532 if (job_par->iRunMode == 2) {
1533 strApptainerRunFile += "echo \"Starting Apptainer Job\"\n";
1534 if (job_par->strConfigPath != "")
1535 strApptainerRunFile += job_par->strConfigPath + "\n\n";
1536 strApptainerRunFile += "RUN_CMD=\"root -q -b \"\n";
1537 strApptainerRunFile += "RUN_CMD+=\"${@}\"\n\n";
1538 if (isDebugMode)
1539 strApptainerRunFile +=
1540 "echo \"DEBUG nica-scheduler: ROOT command in container = $RUN_CMD\"\n";
1541 strApptainerRunFile += "eval $RUN_CMD\n";
1542 strApptainerRunFile += "exit 0\n";
1543 }
1544
1545 // define process count for the job
1546 int proc_count = 1;
1547 if (job_par->nstrProcCount != nullptr) {
1548 // if processor count was set to 0 then use all the available processors
1549 if ((job_par->nstrProcCount.GetValue()[0] == '0')
1550 && (job_par->nstrProcCount.GetValue().length() == 1))
1551 {
1552 if (job_par->nstrQueue == nullptr)
1553 proc_count = get_batch_processor_count(batch_system);
1554 else
1555 proc_count = get_batch_processor_count(batch_system, job_par->nstrQueue.GetValue());
1556
1557 if (proc_count == 0) {
1558 info_message("ERROR nica-scheduler$ Batch processors were not defined. This job will "
1559 "be skipped!",
1560 1);
1561 cur_node = cur_node->next;
1562 continue;
1563 }
1564
1565 cout << "nica-scheduler$ The batch queue has " << proc_count
1566 << " processor cores being available for user jobs." << endl;
1567 } else
1568 proc_count = atoi(job_par->nstrProcCount.GetValue().c_str());
1569 } else
1570 proc_count = globalParallelCount;
1571
1572 if (proc_count == 0) {
1574 "ERROR nica-scheduler$ Processor count can not be equal 0. This job will be skipped!", 1);
1575 cur_node = cur_node->next;
1576 continue;
1577 }
1578
1579 // disable dumping core in case of failures
1580 if (job_par->isCoreDump)
1581 strBatchFile += "ulimit -c 0\n\n"; //"set ulimit -c 0\n\n";
1582
1583 // load configuration file for environment
1584 if ((job_par->iRunMode != 2) && (job_par->strConfigPath != ""))
1585 strBatchFile += job_par->strConfigPath + "\n\n";
1586
1587 // define priority of the job (default: 0)
1588 int iPriority = 0;
1589 if (job_par->nstrPriority != nullptr)
1590 iPriority = atoi(job_par->nstrPriority.GetValue().c_str());
1591
1592 // process count is assigned to input file count (taking parallel_mode in account) if greater
1593 if ((globalParallelCount > 0) && (proc_count > globalParallelCount))
1594 proc_count = globalParallelCount;
1595
1596 // store merge modes and partial outputs to organize merging for result files
1597 vector<int> vecMergeMode;
1598 vector<nullString> vecPartialOutputFiles;
1599
1600 // if there are files to process then write processing info to the special variables
1601 if (globalParallelCount > 0) {
1602 // cycle by macros of the job
1603 for (const auto& cur_macro : job_par->vecMacros) {
1604 // cycle by files of the current macro
1605 for (const auto& cur_file : cur_macro->vecFiles) {
1606 // input file path for the ROOT macro
1607 string strRealInputFile = cur_file->strFileIn;
1608 // output file path for the ROOT macro
1609 nullString nstrRealOutputFile = cur_file->nstrFileOut;
1610
1611 // in case of using a fast storage
1612 // set real input file path for the ROOT macro
1613 if (cur_file->strPutPath != "")
1614 strRealInputFile = cur_file->strPutPath;
1615 // in case of using a fast storage
1616 // set real output file path for the ROOT macro
1617 if (cur_file->strGetPath != "")
1618 nstrRealOutputFile.SetValue(cur_file->strGetPath);
1619
1620 // get start event for the current file
1621 int* cur_start_event = cur_macro->ptrStartEvent.get();
1622 if (cur_file->ptrStartEvent != nullptr)
1623 cur_start_event = cur_file->ptrStartEvent.get();
1624
1625 // get event count for the current file
1626 int* cur_event_count = cur_macro->ptrEventCount.get();
1627 if (cur_file->ptrEventCount != nullptr)
1628 cur_event_count = cur_file->ptrEventCount.get();
1629
1630 // if parallel mode and start event is skipped then assign it to 0
1631 if ((cur_file->iParallelMode > 1) && (cur_start_event == nullptr)) {
1632 cur_file->ptrStartEvent = make_unique<int>(0);
1633 cur_start_event = cur_file->ptrStartEvent.get();
1634 cout << "nica-scheduler$ WARNING: parallel_mode is used, but start event is not "
1635 "set, so it is assigned to 0"
1636 << endl;
1637 }
1638 // if parallel mode and event count is skipped then assign it to 0
1639 if ((cur_file->iParallelMode > 1) && (cur_event_count == nullptr)) {
1640 cur_file->ptrEventCount = make_unique<int>(0);
1641 cur_event_count = cur_file->ptrEventCount.get();
1642 cout << "nica-scheduler$ WARNING: parallel_mode is used, but event count is not "
1643 "set, so it is assigned to 0"
1644 << endl;
1645 }
1646
1647 // generate output name for output paths
1648 string strOutputUnion = nstrRealOutputFile.GetValue();
1649
1650 // variables to combine parallel mode > 1 and = 1 in one cycle
1651 int start_event = -1, event_count = -1;
1652 if (cur_start_event != nullptr)
1653 start_event = *cur_start_event;
1654 if (cur_event_count != nullptr)
1655 event_count = *cur_event_count;
1656
1657 // if parallel mode and event count is 0
1658 // then trying to get event number from the input file
1659 if ((cur_file->iParallelMode > 1) && (event_count == 0)) {
1660 string config_paths = job_par->strConfigPath;
1661 // send output of the configuration files to /dev/null if no log file is set
1662 if (job_par->nstrLogs == nullptr)
1663 replace_string_in_text(config_paths, ";", " > /dev/null 2>&1;");
1664 stringstream ssEventNumberCommand;
1665 ssEventNumberCommand << config_paths << "show_event_count auto \""
1666 << strRealInputFile << "\" 2> /dev/null";
1667 if (isDebugMode)
1668 cout << "DEBUG nica-scheduler$ trying to get event count from the input file: "
1669 << ssEventNumberCommand.str() << endl;
1670
1671 array<char, 128> bufferCommand;
1672 string str_event_number = "";
1673 stream = popen(ssEventNumberCommand.str().c_str(), "r");
1674 while (fgets(bufferCommand.data(), static_cast<int>(bufferCommand.size()), stream)
1675 != nullptr)
1676 str_event_number += bufferCommand.data();
1677 pclose(stream);
1678 str_event_number = find_first_number(str_event_number);
1679 if (str_event_number == "") {
1680 info_message(TString::Format(
1681 "ERROR nica-scheduler$ parallel_mode is used, but event_count "
1682 "is not set and cannot be automatically defined. "
1683 "'%s' file will be skipped!",
1684 strRealInputFile.c_str())
1685 .Data(),
1686 1);
1687 continue;
1688 }
1689 event_count = atoi(str_event_number.c_str());
1690 if (isDebugMode)
1691 cout << "DEBUG nica-scheduler$ " << event_count
1692 << " event(s) found in the input file" << endl;
1693 }
1694
1695 int merge_proc_count = 0;
1696 // fill parameters and parallelize the current file by events (if parallel_mode)
1697 for (int j = 0; j < cur_file->iParallelMode; j++) {
1698 int event_per_proc = event_count;
1699 if (cur_file->iParallelMode > 1) {
1700 event_per_proc = (event_count + j) / proc_count;
1701 if (event_per_proc == 0) {
1702 start_event = start_event + event_per_proc;
1703 continue;
1704 }
1705 }
1706
1707 merge_proc_count++;
1708
1709 nullString nstr_output_file = nstrRealOutputFile;
1710
1711 // generate variable with input file array
1712 strBatchInitInputFile += "\"";
1713 strBatchInitInputFile += strRealInputFile;
1714 strBatchInitInputFile += "\" ";
1715
1716 // generate variable with output file array
1717 if (nstr_output_file != nullptr) {
1718 // in case of parallel_mode - for particular processing by events
1719 if (cur_file->iParallelMode > 1)
1720 nstr_output_file.SetValue(
1721 GenerateOutputFilePath(nstr_output_file.GetValue(), merge_proc_count));
1722 strBatchInitOutputFile += "\"";
1723 strBatchInitOutputFile += nstr_output_file.GetValue();
1724 strBatchInitOutputFile += "\" ";
1725 } else
1726 strBatchInitOutputFile += "\">\" ";
1727
1728 // generate variable with start event array
1729 if (cur_start_event != nullptr) {
1730 string strInt = int_to_string(start_event);
1731 strBatchInitStartEvent += "\"";
1732 strBatchInitStartEvent += strInt;
1733 strBatchInitStartEvent += "\" ";
1734 } else
1735 strBatchInitStartEvent += "\">\" ";
1736
1737 // generate variable with event number array
1738 if (cur_event_count != nullptr) {
1739 string strInt = int_to_string(event_per_proc);
1740 strBatchInitEventCount += "\"";
1741 strBatchInitEventCount += strInt;
1742 strBatchInitEventCount += "\" ";
1743 } else
1744 strBatchInitEventCount += "\">\" ";
1745
1746 // generate variable with additional_arguments array
1747 if (cur_macro->strAddArgs != "") {
1748 strBatchInitAddArgs += "\"";
1749 string mask_string = cur_macro->strAddArgs;
1750 replace_string_in_text(mask_string, "\"", "\\\"");
1751 strBatchInitAddArgs += mask_string;
1752 strBatchInitAddArgs += "\" ";
1753 } else
1754 strBatchInitAddArgs += "\">\" ";
1755
1756 bool is_clean = false; // flag if there are objects to clean
1757 // in case of using a fast storage
1758 // add commands to put files to the intermediate storage
1759 if (cur_file->strPutPath != "") {
1760 if (j == 0) {
1761 strBatchInitPut += "\"";
1762 strBatchInitPut += cur_file->strPutCommand;
1763 strBatchInitPut += " ";
1764 strBatchInitPut += cur_file->strFileIn;
1765 strBatchInitPut += " ";
1766 strBatchInitPut += cur_file->strPutPath;
1767 // create input_file_name.lock to synchronize with other tasks
1768 if (cur_file->iParallelMode > 1) {
1769 strBatchInitPut += "; touch ";
1770 strBatchInitPut += cur_file->strPutPath + (string) ".lock\" ";
1771
1772 if (is_clean == false)
1773 strBatchInitClean += "\"";
1774 else
1775 strBatchInitClean += " ";
1776 strBatchInitClean += cur_file->strPutPath + (string) ".lock\" ";
1777 is_clean = true;
1778 }
1779 strBatchInitPut += "\" ";
1780
1781 }
1782 // wait until the first task has put the input file to the fast storage
1783 else
1784 {
1785 strBatchInitPut += "\"";
1786 strBatchInitPut += "wait ";
1787 strBatchInitPut += cur_file->strPutPath;
1788 strBatchInitPut += ".lock\" ";
1789 }
1790 } else
1791 strBatchInitPut += "\">\" ";
1792
1793 // generate variable to copy file from the temporary location to the output path
1794 if (cur_file->strGetPath != "") {
1795 strBatchInitGet += "\"";
1796 strBatchInitGet += cur_file->strGetCommand;
1797 strBatchInitGet += " ";
1798 if (cur_file->iParallelMode > 1) {
1799 strBatchInitGet += nstr_output_file.GetValue();
1800 strBatchInitGet += " ";
1801 strBatchInitGet += get_directory_path(cur_file->nstrFileOut.GetValue())
1802 + string("/")
1803 + get_file_name_with_ext(nstr_output_file.GetValue());
1804 } else {
1805 strBatchInitGet += cur_file->strGetPath;
1806 strBatchInitGet += " ";
1807 strBatchInitGet += cur_file->nstrFileOut.GetValue();
1808 }
1809 strBatchInitGet += "\" ";
1810 } else
1811 strBatchInitGet += "\">\" ";
1812
1813 // fill variable to clean the temporary files
1814 // from <put>, <get> (always now) and from <clean>
1815 if (cur_file->strPutPath != "") {
1816 if (is_clean == false)
1817 strBatchInitClean += "\"";
1818 else
1819 strBatchInitClean += " ";
1820 strBatchInitClean += cur_file->strPutPath;
1821
1822 is_clean = true;
1823 }
1824 if (cur_file->strGetPath != "") {
1825 if (is_clean == false)
1826 strBatchInitClean += "\"";
1827 else
1828 strBatchInitClean += " ";
1829 strBatchInitClean += cur_file->strGetPath;
1830
1831 is_clean = true;
1832 }
1833 for (const auto& str_clean_path : cur_file->vecCleanPath) {
1834 if (is_clean == false)
1835 strBatchInitClean += "\"";
1836 else
1837 strBatchInitClean += " ";
1838 strBatchInitClean += str_clean_path;
1839
1840 is_clean = true;
1841 }
1842 if (is_clean)
1843 strBatchInitClean += "\" ";
1844
1845 // form output file list (in one string) to merge if necessary
1846 if (nstr_output_file != nullptr)
1847 strOutputUnion += " " + nstr_output_file.GetValue();
1848
1849 start_event = start_event + event_per_proc;
1850 } // for (int j = 0; j < i_parallel_mode; j++)
1851
1852 if ((merge_proc_count > 1) && (cur_file->iMerge >= 0)) {
1853 // if (isDebugMode)
1854 // cout<<"DEBUG nica-scheduler$ Adding "<< strOutputUnion << " to the merge
1855 // list" <<endl;
1856 vecPartialOutputFiles.push_back(strOutputUnion);
1857 vecMergeMode.push_back(cur_file->iMerge);
1858 }
1859 } // for files of the current macro
1860 } // for macros of the current job
1861 } // if there are files to process then write processing info to the special variables
1862
1863 // GENERATE JOB: *.sh file for a batch system
1864 if (job_par->iRunMode == 2) // global container mode
1865 {
1866 strBatchFile += "export CONTAINER_PATH=";
1867 auto it = mapContainerType.find(job_par->strContainerType);
1868 if (it != mapContainerType.end())
1869 strBatchFile += it->second;
1870 else {
1871 info_message((TString::Format("ERROR nica-scheduler$ Undefined container type: '%s'",
1872 job_par->strContainerType.c_str()))
1873 .Data(),
1874 1);
1875 exit(-1);
1876 }
1877 strBatchFile += "\n\n";
1878 }
1879
1880 // if just a command line
1881 if (job_par->nstrCommandline != nullptr) {
1882 strBatchFile += job_par->nstrCommandline.GetValue();
1883 strBatchFile += "\n";
1884 }
1885 // if a job with ROOT macros
1886 else
1887 {
1888 // add all generated variables to the bash file
1889 strBatchFile += strBatchInitInputFile;
1890 strBatchFile += ")\n\n";
1891 strBatchFile += strBatchInitOutputFile;
1892 strBatchFile += ")\n\n";
1893 strBatchFile += strBatchInitStartEvent;
1894 strBatchFile += ")\n\n";
1895 strBatchFile += strBatchInitEventCount;
1896 strBatchFile += ")\n\n";
1897 strBatchFile += strBatchInitAddArgs;
1898 strBatchFile += ")\n\n";
1899 strBatchFile += strBatchInitPut, strBatchFile += ")\n\n";
1900 strBatchFile += strBatchInitGet, strBatchFile += ")\n\n";
1901 strBatchFile += strBatchInitClean, strBatchFile += ")\n\n";
1902 strBatchFile += "\n";
1903
1904 // cycle by all macros of the job
1905 for (size_t ind_macro = 0; ind_macro < job_par->vecMacros.size(); ind_macro++) {
1906 structMacroPar* cur_macro = job_par->vecMacros[ind_macro].get();
1907
1908 // get line number in the job script to obtain variables corresponding to the task
1909 int start_line_number = ind_macro * cur_macro->vecFiles.size();
1910 strBatchFile += "let \"array_index = \"";
1911 strBatchFile += batch_com.batch_task_id;
1912 strBatchFile += "\" - 1 + ";
1913 strBatchFile += to_string(start_line_number);
1914 strBatchFile += "\"\n\n";
1915
1916 // copy file to the temporary storage if the current element is not equal to '>'
1917 // in case of parallelizing by events - the other tasks wait until the first one has put the input file
1918 strBatchFile += "element=${PutArray[${array_index}]}\n"
1919 "if [ \"${element}\" != \">\" ]\nthen\n"
1920 " # Check if PutElement contains \"wait <file_path>\"\n"
1921 " if [[ \"${element}\" == *\"wait \"* ]]; then\n"
1922 " # Extract file path after \"wait \"\n"
1923 " lock_file_path=\"${1#wait }\"\n"
1924 " # Wait until file appears\n"
1925 " while [ ! -f \"$lock_file_path\" ]; do\n";
1926 if (isDebugMode)
1927 strBatchFile += " echo \"DEBUG nica-scheduler: wait while putting file\"\n";
1928 strBatchFile += " sleep 5\n"
1929 " done\n"
1930 " else\n"
1931 " set -f # avoid globbing (expansion of *)\n"
1932 " # arr=(${element// / })\n"
1933 " # comm=\"${arr[0]} ${arr[1]} ${arr[2]}\"\n"
1934 " # eval $comm\n";
1935 if (isDebugMode)
1936 strBatchFile += " echo \"DEBUG nica-scheduler: PUT command = ${element}\"\n";
1937 strBatchFile += " eval ${element}\n"
1938 " fi\n"
1939 "fi\n\n";
1940
1941 // run macro by ROOT
1942 strBatchFile += "element=${InputFileArray[${array_index}]}\n";
1943 // global run mode on a cluster
1944 if (job_par->iRunMode == 1) {
1945 strBatchFile += "export ROOT_HIST=0\n";
1946 strBatchFile += "comm=\"root -b -q '";
1947 strBatchFile += cur_macro->nstrMacroPath.GetValue();
1948 strBatchFile += "(\\\"${element}\\\"\"\n"
1949 "element=${OutputFileArray[${array_index}]}\n"
1950 "if [ \"${element}\" != \">\" ]\nthen\n"
1951 " comm=\"${comm},\\\"${element}\\\"\"\n";
1952 }
1953 // run mode 2 - global container/apptainer mode
1954 else
1955 {
1956 strBatchFile += "comm=\"'";
1957 strBatchFile += cur_macro->nstrMacroPath.GetValue();
1958 // using triple escaped quotes to pass it to apptainer_run.sh file
1959 strBatchFile += "(\\\\\\\"${element}\\\\\\\"\"\n"
1960 "element=${OutputFileArray[${array_index}]}\n"
1961 "if [ \"${element}\" != \">\" ]\nthen\n"
1962 " comm=\"${comm},\\\\\\\"${element}\\\\\\\"\"\n";
1963 }
1964 // add start event, event count and additional arguments to ROOT macro
1965 strBatchFile += "fi\n"
1966 "element=${StartEventArray[${array_index}]}\n"
1967 "if [ \"${element}\" != \">\" ]\nthen\n"
1968 " comm=\"${comm},${element}\"\n"
1969 "fi\n"
1970 "element=${EventCountArray[${array_index}]}\n"
1971 "if [ \"${element}\" != \">\" ]\nthen\n"
1972 " comm=\"${comm},${element}\"\n"
1973 "fi\n"
1974 "element=${AddArgsArray[${array_index}]}\n"
1975 "if [ \"${element}\" != \">\" ]\nthen\n"
1976 " comm=\"${comm},${element}\"\n"
1977 "fi\n"
1978 "comm=\"$comm)'\"\n\n";
1979 if (isDebugMode)
1980 strBatchFile += " echo \"DEBUG nica-scheduler: Run Command = $comm\"\n";
1981 if (job_par->iRunMode == 1) // global mode, not container
1982 strBatchFile += "eval $comm\n\n";
1983
1984 // copy file from the temporary location to the output path
1985 strBatchFile += "element=${GetArray[${array_index}]}\n"
1986 "if [ \"${element}\" != \">\" ]\nthen\n"
1987 " set -f # avoid globbing (expansion of *)\n"
1988 "# arr=(${element// / })\n"
1989 "# comm=\"${arr[0]} ${arr[1]} ${arr[2]}\"\n"
1990 "# eval $comm\n";
1991 if (isDebugMode)
1992 strBatchFile += " echo \"DEBUG nica-scheduler: GET command = ${element}\"\n";
1993 strBatchFile += " eval ${element}\n"
1994 "fi\n\n";
1995
1996 // clean the temporary files from <put>, <get> and from <clean>
1997 strBatchFile += "element=${CleanArray[${array_index}]}\n"
1998 "if [ \"${element}\" != \">\" ]\nthen\n"
1999 " set -f # avoid globbing (expansion of *)\n"
2000 " arr=(${element// / })\n"
2001 " comm=\"rm -rf \"\n"
2002 " for i in \"${arr[@]}\"\n"
2003 " do\n"
2004 " comm=\"${comm}${i} \"\n"
2005 " done\n";
2006 if (isDebugMode)
2007 strBatchFile += " echo \"DEBUG nica-scheduler: CLEAN command = $comm\"\n";
2008 strBatchFile += " eval $comm\n"
2009 "fi\n\n";
2010 } // for macros in the job
2011 }
2012
2013 // if global mode, echo the current datetime
2014 if (job_par->iRunMode == 1)
2015 // if (isDebugMode) strBatchFile += "echo \" Current DIR ls: `ls -alh`\"\n";
2016 strBatchFile += "echo \" End date: `date`\"\nexit 0\n";
2017
2018 // write the job to a temporary file
2019 if (job_par->iRunMode == 2) // for container/apptainer mode
2020 {
2021 strBatchFile += batch_com.container_run_command;
2022 strBatchFile += " apptainer run -B \"/eos,/cvmfs\" --fakeroot \"$CONTAINER_PATH\" ";
2023 strBatchFile += exe_dir + "apptainer_run.sh " + "\"$@\" \"$comm\" \n\n";
2024 strBatchFile += "echo \" End date: `date`\"\nexit 0\n";
2025
2026 string apptainer_run_file_path = exe_dir + (string) "apptainer_run" + (string) ".sh";
2027 FILE* pJobContFile = fopen(apptainer_run_file_path.c_str(), "wt");
2028 if (pJobContFile == nullptr) {
2029 info_message(TString::Format("ERROR nica-scheduler$ Container file was not created: %s. "
2030 "This job will be skipped!",
2031 apptainer_run_file_path.c_str())
2032 .Data(),
2033 1);
2034 cur_node = cur_node->next;
2035 continue;
2036 }
2037
2038 fwrite(strApptainerRunFile.c_str(), strApptainerRunFile.length(), sizeof(char), pJobContFile);
2039 fclose(pJobContFile);
2040 // Set executable permission
2041 chmod(apptainer_run_file_path.c_str(), S_IRWXU); // 0700
2042 }
2043
2044 // string input_file_path = exe_dir + (string)"input_" + buffer + (string)".txt";
2045 // string output_file_path = exe_dir + (string)"output" + buffer + (string)".txt";
2046
2047 // create main shell file with all the commands to be executed (default path: 'nica-scheduler'
2048 // location)
2049 stringstream ssMainScriptFile;
2050 ssMainScriptFile << "job_" << rand() << rand() << ".sh";
2051 stringstream ssMainScriptPath;
2052 ssMainScriptPath << exe_dir << ssMainScriptFile.str();
2053 FILE* pJobFile = fopen(ssMainScriptPath.str().c_str(), "wt");
2054 if (pJobFile == nullptr) {
2055 string error_path = ssMainScriptPath.str();
2056 // if not created, change to the current location
2057 ssMainScriptPath.str(""); // Clear the content
2058 ssMainScriptPath.clear(); // Clear the state flags
2059 ssMainScriptPath << ssMainScriptFile.str();
2060
2061 pJobFile = fopen(ssMainScriptPath.str().c_str(), "wt");
2062 if (pJobFile == nullptr) {
2063 error_path += (string) " || " + ssMainScriptPath.str();
2064 // if not created, change to the XML-file location
2065 ssMainScriptPath.str(""); // Clear the content
2066 ssMainScriptPath.clear(); // Clear the state flags
2067 ssMainScriptPath << get_directory_path(xml_file_path) << ssMainScriptFile.str();
2068 pJobFile = fopen(ssMainScriptPath.str().c_str(), "wt");
2069 if (pJobFile == nullptr) {
2070 error_path += (string) " || " + ssMainScriptPath.str();
2071 info_message(TString::Format("ERROR nica-scheduler$ Job file was not created at: %s. "
2072 "This job will be skipped!",
2073 error_path.c_str())
2074 .Data(),
2075 1);
2076 cur_node = cur_node->next;
2077 continue;
2078 } // if (pJobFile == nullptr)
2079 } // if (pJobFile == nullptr)
2080 } // if (pJobFile == nullptr)
2081 fwrite(strBatchFile.c_str(), strBatchFile.length(), sizeof(char), pJobFile);
2082 fclose(pJobFile);
2083
2084 // output qsub file content in debug mode
2085 // if (isDebugMode) cout<<"DEBUG nica-scheduler$ QSUB file content:"<<endl<<strBatchFile<<endl;
2086
2087 // form batch command to run our jobs
2088 stringstream ssBatchCommand;
2089 // fill batch command, job name, dependency, task count, batch slot count
2090 ssBatchCommand << TString::Format(batch_com.scheduler_run_job.c_str(), job_par->strJobName.c_str(),
2091 job_par->strDdependencyName.c_str(), globalParallelCount,
2092 proc_count)
2093 .Data();
2094 // add priority of the job if given
2095 if (iPriority != 0)
2096 ssBatchCommand << TString::Format(batch_com.sheduler_priority_option.c_str(), iPriority).Data();
2097 // add queue name if given
2098 if (job_par->nstrQueue != nullptr)
2099 ssBatchCommand << TString::Format(batch_com.sheduler_queue_option.c_str(),
2100 job_par->nstrQueue.GetValue().c_str())
2101 .Data();
2102 // add Quality-Of-Service if given
2103 if (job_par->nstrQOS != nullptr)
2104 ssBatchCommand << TString::Format(batch_com.sheduler_qos_option.c_str(),
2105 job_par->nstrQOS.GetValue().c_str())
2106 .Data();
2107 // add host list if given
2108 if (job_par->nstrHosts != nullptr) {
2109 // host list to exclude from processing
2110 if (job_par->nstrHosts.GetValue()[0] == '~')
2111 ssBatchCommand << TString::Format(batch_com.sheduler_exclude_hosts_option.c_str(),
2112 job_par->nstrHosts.GetValue().substr(1).c_str())
2113 .Data();
2114 // host list for processing
2115 else
2116 ssBatchCommand << TString::Format(batch_com.sheduler_hosts_option.c_str(),
2117 job_par->nstrHosts.GetValue().c_str())
2118 .Data();
2119 }
2120 // add working directory if given
2121 if (job_par->nstrWorkDir != nullptr)
2122 ssBatchCommand << TString::Format(batch_com.sheduler_workdir_option.c_str(),
2123 job_par->nstrWorkDir.GetValue().c_str())
2124 .Data();
2125 // add maximum operative memory size per one process
2126 if (job_par->strOperativeMemory1 != "")
2127 ssBatchCommand << TString::Format(batch_com.operative_memory1_option.c_str(),
2128 job_par->strOperativeMemory1.c_str())
2129 .Data();
2130
2131 // add script path to be executed
2132 ssBatchCommand << " " << ssMainScriptPath.str();
2133 if (isDebugMode)
2134 // main command
2135 cout << "DEBUG nica-scheduler$ Batch command: " << ssBatchCommand.str() << endl;
2136
2137 array<char, 128> bufferBatch;
2138 string ID = "";
2139 stream = popen(ssBatchCommand.str().c_str(), "r");
2140 while (fgets(bufferBatch.data(), static_cast<int>(bufferBatch.size()), stream) != nullptr)
2141 ID += bufferBatch.data();
2142 pclose(stream);
2143
2144 if (isDebugMode)
2145 // batch command result
2146 cout << "DEBUG nica-scheduler$ Batch command output = " << ID << endl;
2147
2148 ID = find_first_number(ID);
2149 if (ID == "") {
2150 info_message("ERROR nica-scheduler$ Job ID was not found: "
2151 "an error occurred or there is no batch system. This job will be skipped!",
2152 1);
2153 cur_node = cur_node->next;
2154 continue;
2155 }
2156
2157 // current time
2158 // time_t t = time(nullptr);
2159 // struct tm tm = *localtime(&t);
2160 // printf("now: %d-%d-%d %d:%d:%d\n", tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour,
2161 // tm.tm_min, tm.tm_sec);
2162
2163 info_message(TString::Format("nica-scheduler$ The job '%s' has been submitted with ID: %s. "
2164 "Enter '%s' command to check status.",
2165 job_par->strJobName.c_str(), ID.c_str(),
2166 batch_com.check_status_command.c_str())
2167 .Data());
2168 mapJobName2ID.insert(pair<string, string>(job_par->strJobName, ID));
2169
2170 // add jobs to merge partial files if vecPartialOutputFiles is not empty, i.e. parallel_mode is set
2171 if (!vecPartialOutputFiles.empty()) {
2172 // cycle: one vector record contains output file list separated by space
2173 // where first element of the list is the result output file
2174 for (size_t i = 0; i < vecPartialOutputFiles.size(); i++) {
2175 // merge.sh - special batch job to merge files
2176 // !!! need to rewrite - create merge.sh dynamically
2177 string MERGEjob_path = exe_dir + "merge.sh";
2178 stringstream ssMergeCommand;
2179 ssMergeCommand << batch_com.run_command << " " << batch_com.job_dependency_option << ID
2180 << batch_com.set_job_variables << "\"MergeFiles"
2181 << vecPartialOutputFiles.at(i).GetValue() << "\",MergeMode=\""
2182 << vecMergeMode.at(i) << "\",ConfigFiles=\"" << job_par->strConfigPath
2183 << "\" " << MERGEjob_path;
2184
2185 array<char, 128> bufferMerge;
2186 string localID = "";
2187 stream = popen(ssMergeCommand.str().c_str(), "r");
2188 while (fgets(bufferMerge.data(), static_cast<int>(bufferMerge.size()), stream) != nullptr)
2189 localID += bufferMerge.data();
2190 pclose(stream);
2191
2192 localID = find_first_number(localID);
2193 if (localID == "") {
2194 info_message("ERROR nica-scheduler$ Union job ID was not found: an error occurred or "
2195 "there is no batch system.",
2196 1);
2197 continue;
2198 }
2199
2200 cout << "nica-scheduler$ Task for merging parallel files has ID: " << localID << endl;
2201 } // for vecPartialOutputFiles elements
2202 } // if (!vecPartialOutputFiles.empty())
2203 }
2204 /********************/
2205 /* LOCAL SCHEDULING */
2206 /********************/
2207 else
2208 {
2209 // dummy - no dependency property for local jobs because it waits every time for all child threads
2210 mapJobName2ID.insert(pair<string, string>(job_par->strJobName, "dummy"));
2211
2212 // define process count for the job
2213 int proc_count = 1;
2214 if (job_par->nstrProcCount != nullptr) {
2215 if ((job_par->nstrProcCount.GetValue()[0] == '0')
2216 && (job_par->nstrProcCount.GetValue().length()) == 1)
2217 {
2218 proc_count = get_linux_processor_count();
2219
2220 if (proc_count == 0) {
2222 "ERROR nica-scheduler$ Core number was not defined. This job will be skipped!", 1);
2223 cur_node = cur_node->next;
2224 continue;
2225 }
2226 } else
2227 proc_count = atoi(job_par->nstrProcCount.GetValue().c_str());
2228 } else
2229 proc_count = globalParallelCount == 0 ? 1 : globalParallelCount;
2230
2231 if (proc_count == 0) {
2233 "ERROR nica-scheduler$ Process count can not be equal 0. This job will be skipped!", 1);
2234 cur_node = cur_node->next;
2235 continue;
2236 }
2237
2238 // process count is equal count of input files if more
2239 if (proc_count > globalParallelCount)
2240 proc_count = globalParallelCount;
2241
2242 vector<thread> vecThreads;
2243 // semaphore and mutex to synchronize all parallel threads and sub-threads
2244 shared_ptr<Semaphore> job_semaphore = make_shared<Semaphore>(proc_count);
2245 shared_ptr<mutex> job_mutex = make_shared<mutex>(); // PTHREAD_MUTEX_RECURSIVE_NP
2246
2247 // console command - not ROOT macro execution
2248 if (job_par->nstrCommandline != nullptr) {
2249 // generate bash command string
2250 stringstream ssBashCommand;
2251 ssBashCommand << job_par->strConfigPath << job_par->nstrCommandline.GetValue();
2252 if (job_par->nstrLogs != nullptr)
2253 ssBashCommand << " > " << job_par->nstrLogs.GetValue() << " 2>&1";
2254
2255 // generate output name for temporary bash file
2256 stringstream ssBashFile;
2257 long int t = time(nullptr);
2258 ssBashFile << "temp_" << t << ".sh";
2259 // create temporary bash file and write the command into it
2260 ofstream bash_file;
2261 bash_file.open(ssBashFile.str().c_str());
2262 bash_file << ssBashCommand.str();
2263 bash_file.close();
2264
2265 cout << "nica-scheduler$ Task 1 is running: command line - "
2266 << job_par->nstrCommandline.GetValue() << endl;
2267
2268 // run bash command
2269 stringstream ssBashRun;
2270 ssBashRun << "bash " << ssBashFile.str();
2271 int system_return = system(ssBashRun.str().c_str());
2272 if (system_return != 0)
2273 cout << "nica-scheduler$ WARNING: System call (in main) returned non-zero code: "
2274 << system_return << endl;
2275
2276 // delete temporary bash file
2277 if (!isDebugMode) {
2278 stringstream ssRemoveCommand;
2279 ssRemoveCommand << "rm -rf " << ssBashFile.str();
2280 system_return = system(ssRemoveCommand.str().c_str());
2281 if (system_return != 0)
2282 cout << "nica-scheduler$ WARNING: System call (in ThreadLocalProcess) returned "
2283 "non-zero code: "
2284 << system_return << endl;
2285 } else
2286 cout << "DEBUG nica-scheduler$ '" << ssBashFile.str() << "' bash file was used " << endl;
2287 }
2288 // parallelizing of file processing in ROOT macro on one multi-core machine
2289 else
2290 {
2291 unsigned int thread_count = 1;
2292 if (job_par->vecMacros[0]->vecFiles.size() > 1)
2293 thread_count = job_par->vecMacros[0]->vecFiles.size();
2294
2295 // fill parameters for processing threads
2296 for (unsigned int i = 0; i < thread_count; i++) {
2297 shared_ptr<structThreadPar> thread_par = make_shared<structThreadPar>();
2298 thread_par->sptrJobParameter = job_par;
2299 thread_par->semJobSemaphore = job_semaphore;
2300 thread_par->mutJobMutex = job_mutex;
2301 thread_par->iThreadCounter = i + 1;
2302
2303 job_semaphore->acquire();
2304
2305 // create and run threads to local processing - function ThreadLocalProcess
2306 try {
2307 vecThreads.emplace_back(ThreadLocalProcess, thread_par);
2308 } catch (const system_error& e) {
2309 info_message((TString::Format("ERROR nica-scheduler$ error while creating thread with "
2310 "code = %d and message = %s",
2311 e.code().value(), e.what()))
2312 .Data(),
2313 1);
2314 exit(-1);
2315 }
2316 }
2317
2318 // waiting for finishing threads
2319 // cout<<"Waiting for finishing threads "<<vecFiles.size()<<endl;
2320 for (auto& cur_thread : vecThreads) {
2321 try {
2322 cur_thread.join();
2323 } catch (const system_error& e) {
2324 info_message((TString::Format(
2325 "ERROR nica-scheduler$ Thread failed with code = %d and message = %s",
2326 e.code().value(), e.what()))
2327 .Data(),
2328 1);
2329 }
2330 }
2331 } // else job_par->nstrCommandline != nullptr
2332
2333 cout << "nica-scheduler$ Local job has been finished" << endl;
2334 } // else local scheduling
2335 } // JOB tag was processed
2336 } // if tag means JOB
2337
2338 cur_node = cur_node->next;
2339 } // while - cycle for all jobs
2340
2341 mapJobName2ID.clear();
2342 mapJobName2FileOut.clear();
2343 xmlFreeDoc(doc);
2344 return 0;
2345}
2346
2347// PARSE DATABASE PARAMETERS
2348void ParseDatabaseParameters(string input, TString& sql, bool isSimulation)
2349{
2350 istringstream ss(input);
2351 string token;
2352
2353 if (isSimulation) {
2354 // variables for DB
2355 bool isGen = false, isEnergy = false, isMinEnergy = false, isMaxEnergy = false, isBeam = false,
2356 isTarget = false, isDesc = false, isPath = false;
2357 string strGen, strBeam, strTarget, strDesc, strPath;
2358 double fEnergy, fMaxEnergy;
2359 // parse comma-separated tokens
2360 while (getline(ss, token, ',')) {
2361 // to lowercase
2362 transform(token.begin(), token.end(), token.begin(), ::tolower);
2363
2364 // generator name parsing
2365 if ((token.length() > 4) && (token.substr(0, 4) == "gen=")) {
2366 isGen = true;
2367 strGen = token.substr(4);
2368 } else {
2369 // energy parsing
2370 if ((token.length() > 7) && (token.substr(0, 7) == "energy=")) {
2371 token = token.substr(7);
2372
2373 size_t indDash = token.find_first_of('-');
2374 if (indDash != string::npos) {
2375 stringstream stream;
2376 stream << token.substr(0, indDash);
2377 double dVal;
2378 if (stream >> dVal) {
2379 isEnergy = true;
2380 isMinEnergy = true;
2381 fEnergy = dVal;
2382 }
2383 if (token.length() > indDash) {
2384 stringstream stream2;
2385 stream2 << token.substr(indDash + 1);
2386 if (stream2 >> dVal) {
2387 isEnergy = true;
2388 isMaxEnergy = true;
2389 fMaxEnergy = dVal;
2390 }
2391 }
2392 } // if (indDash > -1)
2393 // if exact energy value
2394 else
2395 {
2396 stringstream stream;
2397 stream << token;
2398 double dVal;
2399 if (stream >> dVal) {
2400 isEnergy = true;
2401 fEnergy = dVal;
2402 }
2403 } // else
2404 } // if ((token.length() > 7) && (token.substr(0,7) == "energy="))
2405 // particles' names in collision parsing
2406 else
2407 {
2408 if ((token.length() > 5) && (token.substr(0, 5) == "beam=")) {
2409 isBeam = true;
2410 strBeam = token.substr(5);
2411 } else {
2412 // search text in description string
2413 if ((token.length() > 5) && (token.substr(0, 5) == "desc=")) {
2414 isDesc = true;
2415 strDesc = token.substr(5);
2416 } else {
2417 // target particle's name parsing
2418 if ((token.length() > 7) && (token.substr(0, 7) == "target=")) {
2419 isTarget = true;
2420 strTarget = token.substr(7);
2421 } else {
2422 // path parsing
2423 if ((token.length() > 5) && (token.substr(0, 5) == "path=")) {
2424 isPath = true;
2425 strPath = token.substr(5);
2426 }
2427 } // else TYPE of data
2428 } // else DESC
2429 } // else PARTICLE
2430 } // else ENERGY
2431 } // else GEN
2432 } // while(getline(ss, token, ','))
2433
2434 // READ PATH FROM DATABASE
2435 sql = "select file_path "
2436 "from simulation_file";
2437
2438 bool isWhere = false;
2439 // if event generator selection
2440 if (isGen == true) {
2441 if (isWhere)
2442 sql += TString::Format(" AND lower(generator_name) = '%s'", strGen.data());
2443 else {
2444 isWhere = true;
2445 sql += TString::Format(" "
2446 "where lower(generator_name) = '%s'",
2447 strGen.data());
2448 }
2449 }
2450 // if energy selection
2451 if (isEnergy == true) {
2452 if (isWhere)
2453 sql += " AND ";
2454 else {
2455 isWhere = true;
2456 sql += " "
2457 "where ";
2458 }
2459
2460 if (isMinEnergy) {
2461 sql += TString::Format("energy >= %f", fEnergy);
2462 if (isMaxEnergy)
2463 sql += TString::Format(" AND energy <= %f", fMaxEnergy);
2464 } else {
2465 if (isMaxEnergy)
2466 sql += TString::Format("energy <= %f", fMaxEnergy);
2467 else
2468 sql += TString::Format("energy = %f", fEnergy);
2469 }
2470 }
2471 // if beam particle was selected
2472 if (isBeam == true) {
2473 if (isWhere)
2474 sql += TString::Format(" AND lower(beam_particle) = '%s'", strBeam.data());
2475 else {
2476 isWhere = true;
2477 sql += TString::Format(" "
2478 "where lower(beam_particle) = '%s'",
2479 strBeam.data());
2480 }
2481 }
2482 // if target particle was selected
2483 if (isTarget == true) {
2484 if (isWhere)
2485 sql += TString::Format(" AND lower(target_particle) = '%s'", strTarget.data());
2486 else {
2487 isWhere = true;
2488 sql += TString::Format(" "
2489 "where lower(target_particle) = '%s'",
2490 strTarget.data());
2491 }
2492 }
2493 if (isDesc == true) {
2494 if (isWhere)
2495 sql += TString::Format(" AND lower(file_desc) like '%%%s%%'", strDesc.data());
2496 else {
2497 isWhere = true;
2498 sql += TString::Format(" "
2499 "where lower(file_desc) like '%%%s%%'",
2500 strDesc.data());
2501 }
2502 }
2503 if (isPath == true) {
2504 if (isWhere)
2505 sql += TString::Format(" AND lower(file_path) like '%%%s%%'", strPath.data());
2506 else {
2507 isWhere = true;
2508 sql += TString::Format(" "
2509 "where lower(file_path) like '%%%s%%'",
2510 strPath.data());
2511 }
2512 }
2513 sql += " order by generator_name,energy";
2514 }
2515 // else if experimental data
2516 else
2517 {
2518 bool isPeriod = false, isMinPeriod = false, isMaxPeriod = false, isRun = false, isMinRun = false,
2519 isMaxRun = false, isBeam = false, isTarget = false, isEnergy = false, isMinEnergy = false,
2520 isMaxEnergy = false, isEvents = false, isMinEvents = false, isMaxEvents = false, isTime = false,
2521 isMinTime = false, isMaxTime = false, isField = false, isMinField = false, isMaxField = false,
2522 isSize = false, isMinSize = false, isMaxSize = false, isPath = false;
2523 string strBeam, strTarget, strTime, strMaxTime, strPath;
2524 int iPeriod, iMaxPeriod, iRun, iMaxRun, iEvents, iMaxEvents, iField, iMaxField;
2525 double fEnergy, fMaxEnergy, fSize, fMaxSize;
2526 // parse comma-separated tokens
2527 while (getline(ss, token, ',')) {
2528 // to lowercase
2529 transform(token.begin(), token.end(), token.begin(), ::tolower);
2530
2531 // period number parsing
2532 if ((token.length() > 7) && (token.substr(0, 7) == "period=")) {
2533 token = token.substr(7);
2534
2535 size_t indDash = token.find_first_of('-');
2536 if (indDash != string::npos) {
2537 stringstream stream;
2538 stream << token.substr(0, indDash);
2539 int iVal;
2540 if (stream >> iVal) {
2541 isPeriod = true;
2542 isMinPeriod = true;
2543 iPeriod = iVal;
2544 }
2545 if (token.length() > indDash) {
2546 stringstream stream2;
2547 stream2 << token.substr(indDash + 1);
2548 if (stream2 >> iVal) {
2549 isPeriod = true;
2550 isMaxPeriod = true;
2551 iMaxPeriod = iVal;
2552 }
2553 }
2554 } // if (indDash > -1)
2555 // if exact period number
2556 else
2557 {
2558 stringstream stream;
2559 stream << token;
2560 int iVal;
2561 if (stream >> iVal) {
2562 isPeriod = true;
2563 iPeriod = iVal;
2564 }
2565 } // else
2566 } // if ((token.length() > 7) && (token.substr(0,7) == "period="))
2567 else
2568 {
2569 // run number parsing
2570 if ((token.length() > 4) && (token.substr(0, 4) == "run=")) {
2571 token = token.substr(4);
2572
2573 size_t indDash = token.find_first_of('-');
2574 if (indDash != string::npos) {
2575 stringstream stream;
2576 stream << token.substr(0, indDash);
2577 int iVal;
2578 if (stream >> iVal) {
2579 isRun = true;
2580 isMinRun = true;
2581 iRun = iVal;
2582 }
2583 if (token.length() > indDash) {
2584 stringstream stream2;
2585 stream2 << token.substr(indDash + 1);
2586 if (stream2 >> iVal) {
2587 isRun = true;
2588 isMaxRun = true;
2589 iMaxRun = iVal;
2590 }
2591 }
2592 } // if (indDash > -1)
2593 // if exact run number
2594 else
2595 {
2596 stringstream stream;
2597 stream << token;
2598 int iVal;
2599 if (stream >> iVal) {
2600 isRun = true;
2601 iRun = iVal;
2602 }
2603 } // else
2604 } // if ((token.length() > 4) && (token.substr(0,4) == "run="))
2605 else
2606 {
2607 // beam particle's name in collision parsing
2608 if ((token.length() > 5) && (token.substr(0, 5) == "beam=")) {
2609 isBeam = true;
2610 strBeam = token.substr(5);
2611 } else {
2612 // target particle's name in collision parsing
2613 if ((token.length() > 7) && (token.substr(0, 7) == "target=")) {
2614 isTarget = true;
2615 strTarget = token.substr(7);
2616 } else {
2617 // energy parsing
2618 if ((token.length() > 7) && (token.substr(0, 7) == "energy=")) {
2619 token = token.substr(7);
2620
2621 size_t indDash = token.find_first_of('-');
2622 if (indDash != string::npos) {
2623 stringstream stream;
2624 stream << token.substr(0, indDash);
2625 double dVal;
2626 if (stream >> dVal) {
2627 isEnergy = true;
2628 isMinEnergy = true;
2629 fEnergy = dVal;
2630 }
2631 if (token.length() > indDash) {
2632 stringstream stream2;
2633 stream2 << token.substr(indDash + 1);
2634 if (stream2 >> dVal) {
2635 isEnergy = true;
2636 isMaxEnergy = true;
2637 fMaxEnergy = dVal;
2638 }
2639 }
2640 } // if (indDash > -1)
2641 // if exact energy value
2642 else
2643 {
2644 stringstream stream;
2645 stream << token;
2646 double dVal;
2647 if (stream >> dVal) {
2648 isEnergy = true;
2649 fEnergy = dVal;
2650 }
2651 } // else
2652 } // if ((token.length() > 7) && (token.substr(0,7) == "energy="))
2653 else
2654 {
2655 // event count parsing
2656 if ((token.length() > 6) && (token.substr(0, 6) == "events=")) {
2657 token = token.substr(6);
2658
2659 size_t indDash = token.find_first_of('-');
2660 if (indDash != string::npos) {
2661 stringstream stream;
2662 stream << token.substr(0, indDash);
2663 int iVal;
2664 if (stream >> iVal) {
2665 isEvents = true;
2666 isMinEvents = true;
2667 iEvents = iVal;
2668 }
2669 if (token.length() > indDash) {
2670 stringstream stream2;
2671 stream2 << token.substr(indDash + 1);
2672 if (stream2 >> iVal) {
2673 isEvents = true;
2674 isMaxEvents = true;
2675 iMaxEvents = iVal;
2676 }
2677 }
2678 } // if (indDash > -1)
2679 // if exact event count
2680 else
2681 {
2682 stringstream stream;
2683 stream << token;
2684 int iVal;
2685 if (stream >> iVal) {
2686 isEvents = true;
2687 iEvents = iVal;
2688 }
2689 } // else
2690 } // if ((token.length() > 6) && (token.substr(0,6) == "events="))
2691 else
2692 {
2693 // time parsing
2694 if ((token.length() > 5) && (token.substr(0, 5) == "time=")) {
2695 token = token.substr(5);
2696
2697 size_t indDash = token.find_first_of('-');
2698 if (indDash != string::npos) {
2699 if (indDash > 0) {
2700 isTime = true;
2701 isMinTime = true;
2702 strTime = token.substr(0, indDash);
2703 }
2704 if (token.length() > indDash) {
2705 isTime = true;
2706 isMaxTime = true;
2707 strMaxTime = token.substr(indDash + 1);
2708 }
2709 } // if (indDash > -1)
2710 // if exact time
2711 else
2712 {
2713 isTime = true;
2714 strTime = token;
2715 } // else
2716 } // if ((token.length() > 5) && (token.substr(0,5) == "time="))
2717 else
2718 {
2719 // field current parsing
2720 if ((token.length() > 6) && (token.substr(0, 6) == "field=")) {
2721 token = token.substr(6);
2722
2723 size_t indDash = token.find_first_of('-');
2724 if (indDash != string::npos) {
2725 stringstream stream;
2726 stream << token.substr(0, indDash);
2727 int iVal;
2728 if (stream >> iVal) {
2729 isField = true;
2730 isMinField = true;
2731 iField = iVal;
2732 }
2733 if (token.length() > indDash) {
2734 stringstream stream2;
2735 stream2 << token.substr(indDash + 1);
2736 if (stream2 >> iVal) {
2737 isField = true;
2738 isMaxField = true;
2739 iMaxField = iVal;
2740 }
2741 }
2742 } // if (indDash > -1)
2743 // if exact field current
2744 else
2745 {
2746 stringstream stream;
2747 stream << token;
2748 int iVal;
2749 if (stream >> iVal) {
2750 isField = true;
2751 iField = iVal;
2752 }
2753 } // else
2754 } // if ((token.length() > 6) && (token.substr(0,6) == "field="))
2755 else
2756 {
2757 // file size parsing
2758 if ((token.length() > 5) && (token.substr(0, 5) == "size=")) {
2759 token = token.substr(5);
2760
2761 size_t indDash = token.find_first_of('-');
2762 if (indDash != string::npos) {
2763 stringstream stream;
2764 stream << token.substr(0, indDash);
2765 double dVal;
2766 if (stream >> dVal) {
2767 isSize = true;
2768 isMinSize = true;
2769 fSize = dVal;
2770 }
2771 if (token.length() > indDash) {
2772 stringstream stream2;
2773 stream2 << token.substr(indDash + 1);
2774 if (stream2 >> dVal) {
2775 isSize = true;
2776 isMaxSize = true;
2777 fMaxSize = dVal;
2778 }
2779 }
2780 } // if (indDash > -1)
2781 // if exact file size
2782 else
2783 {
2784 stringstream stream;
2785 stream << token;
2786 double dVal;
2787 if (stream >> dVal) {
2788 isSize = true;
2789 fSize = dVal;
2790 }
2791 } // else
2792 } // if ((token.length() > 5) && (token.substr(0,5) == "size="))
2793 else
2794 {
2795 // path parsing
2796 if ((token.length() > 5) && (token.substr(0, 5) == "path=")) {
2797 isPath = true;
2798 strPath = token.substr(5);
2799 }
2800 } // else SIZE
2801 } // else FIELD
2802 } // else TIME
2803 } // else EVENTS
2804 } // else ENERGY
2805 } // else TARGER
2806 } // else BEAM
2807 } // else RUN
2808 } // else PERIOD
2809 } // while(getline(ss, token, ','))
2810
2811 // generate query
2812 sql = "select file_path "
2813 "from run_";
2814
2815 bool isWhere = false;
2816 // if period selection
2817 if (isPeriod == true) {
2818 if (isWhere)
2819 sql += " AND ";
2820 else {
2821 isWhere = true;
2822 sql += " "
2823 "where ";
2824 }
2825
2826 if (isMinPeriod) {
2827 sql += TString::Format("period_number >= %d", iPeriod);
2828 if (isMaxPeriod)
2829 sql += TString::Format(" AND period_number <= %d", iMaxPeriod);
2830 } else {
2831 if (isMaxPeriod)
2832 sql += TString::Format("period_number <= %d", iMaxPeriod);
2833 else
2834 sql += TString::Format("period_number = %d", iPeriod);
2835 }
2836 }
2837 // if run selection
2838 if (isRun == true) {
2839 if (isWhere)
2840 sql += " AND ";
2841 else {
2842 isWhere = true;
2843 sql += " "
2844 "where ";
2845 }
2846
2847 if (isMinRun) {
2848 sql += TString::Format("run_number >= %d", iRun);
2849 if (isMaxRun)
2850 sql += TString::Format(" AND run_number <= %d", iMaxRun);
2851 } else {
2852 if (isMaxRun)
2853 sql += TString::Format("run_number <= %d", iMaxRun);
2854 else
2855 sql += TString::Format("run_number = %d", iRun);
2856 }
2857 }
2858 // if beam particle was selected
2859 if (isBeam == true) {
2860 if (isWhere)
2861 sql += TString::Format(" AND lower(beam_particle) = '%s'", strBeam.data());
2862 else {
2863 isWhere = true;
2864 sql += TString::Format(" "
2865 "where lower(beam_particle) = '%s'",
2866 strBeam.data());
2867 }
2868 }
2869 // if target particle was selected
2870 if (isTarget == true) {
2871 if (isWhere)
2872 sql += TString::Format(" AND lower(target_particle) = '%s'", strTarget.data());
2873 else {
2874 isWhere = true;
2875 sql += TString::Format(" "
2876 "where lower(target_particle) = '%s'",
2877 strTarget.data());
2878 }
2879 }
2880 // if energy selection
2881 if (isEnergy == true) {
2882 if (isWhere)
2883 sql += " AND ";
2884 else {
2885 isWhere = true;
2886 sql += " "
2887 "where ";
2888 }
2889
2890 if (isMinEnergy) {
2891 sql += TString::Format("energy >= %f", fEnergy);
2892 if (isMaxEnergy)
2893 sql += TString::Format(" AND energy <= %f", fMaxEnergy);
2894 } else {
2895 if (isMaxEnergy)
2896 sql += TString::Format("energy <= %f", fMaxEnergy);
2897 else
2898 sql += TString::Format("energy = %f", fEnergy);
2899 }
2900 }
2901 // if event count selection
2902 if (isEvents == true) {
2903 if (isWhere)
2904 sql += " AND ";
2905 else {
2906 isWhere = true;
2907 sql += " "
2908 "where ";
2909 }
2910
2911 if (isMinEvents) {
2912 sql += TString::Format("event_count >= %d", iEvents);
2913 if (isMaxEvents)
2914 sql += TString::Format(" AND event_count <= %d", iMaxEvents);
2915 } else {
2916 if (isMaxEvents)
2917 sql += TString::Format("event_count <= %d", iMaxEvents);
2918 else
2919 sql += TString::Format("event_count = %d", iEvents);
2920 }
2921 }
2922 // if time selection
2923 if (isTime == true) {
2924 if (isWhere)
2925 sql += " AND ";
2926 else {
2927 isWhere = true;
2928 sql += " "
2929 "where ";
2930 }
2931
2932 // check correct format
2933 struct tm tm;
2934 if (!strptime(strMaxTime.c_str(), "%Y-%m-%d %H:%M:%S", &tm)) {
2935 isMaxTime = false;
2936 cout << "Error: " << strMaxTime
2937 << " is not correct datetime. DateTime format should be yyyy-mm-dd 24hh:mm:ss." << endl;
2938 }
2939 if (!strptime(strTime.c_str(), "%Y-%m-%d %H:%M:%S", &tm)) {
2940 isMinTime = false;
2941 cout << "Error: " << strTime
2942 << " is not correct datetime. DateTime format should be yyyy-mm-dd 24hh:mm:ss." << endl;
2943 if (!isMaxTime)
2944 isTime = false;
2945 }
2946
2947 if (isMinTime) {
2948 sql += TString::Format("end_datetime >= '%s'", strTime.c_str());
2949 if (isMaxTime)
2950 sql += TString::Format(" AND start_datetime <= '%s'", strMaxTime.c_str());
2951 } else {
2952 if (isMaxTime)
2953 sql += TString::Format("start_datetime <= '%s'", strMaxTime.c_str());
2954 else {
2955 if (isTime)
2956 sql += TString::Format("start_datetime <= '%s' AND end_datetime >= '%s'", strTime.c_str(),
2957 strTime.c_str());
2958 }
2959 }
2960 }
2961 // if field current selection
2962 if (isField == true) {
2963 if (isWhere)
2964 sql += " AND ";
2965 else {
2966 isWhere = true;
2967 sql += " "
2968 "where ";
2969 }
2970
2971 if (isMinField) {
2972 sql += TString::Format("field_voltage >= %d", iField);
2973 if (isMaxField)
2974 sql += TString::Format(" AND field_voltage <= %d", iMaxField);
2975 } else {
2976 if (isMaxField)
2977 sql += TString::Format("field_voltage <= %d", iMaxField);
2978 else
2979 sql += TString::Format("field_voltage = %d", iField);
2980 }
2981 }
2982 // if file size selection
2983 if (isSize == true) {
2984 if (isWhere)
2985 sql += " AND ";
2986 else {
2987 isWhere = true;
2988 sql += " "
2989 "where ";
2990 }
2991
2992 if (isMinSize) {
2993 sql += TString::Format("file_size >= %f", fSize);
2994 if (isMaxSize)
2995 sql += TString::Format(" AND file_size <= %f", fMaxSize);
2996 } else {
2997 if (isMaxSize)
2998 sql += TString::Format("file_size <= %f", fMaxSize);
2999 else
3000 sql += TString::Format("file_size = %f", fSize);
3001 }
3002 }
3003 if (isPath == true) {
3004 if (isWhere)
3005 sql += TString::Format(" AND lower(file_path) like '%%%s%%'", strPath.data());
3006 else {
3007 isWhere = true;
3008 sql += TString::Format(" "
3009 "where lower(file_path) like '%%%s%%'",
3010 strPath.data());
3011 }
3012 }
3013
3014 sql += " order by period_number,run_number";
3015 }
3016
3017 return;
3018}
3019
3020// function to parse file list and parameters for the current macro of the job
3021bool ParseMacroFiles(xmlNodePtr sub_node,
3022 shared_ptr<structJobPar> job_par,
3023 shared_ptr<structMacroPar> macro_par,
3024 multimap<string, unique_ptr<vector<nullString>>>& mapJobName2FileOut,
3025 batch_commands batch_com)
3026{
3027 char* pc_temp = nullptr;
3028 while (sub_node) {
3029 // FILE TAG
3030 if (strcmp((char*)sub_node->name, "file") == 0) {
3031 bool isStartEvent = false, isEventCount = false;
3032 int start_event, event_count;
3033 string sim_db = "", exp_db = "", job_input = "", macro_input = "", file_input = "", put_command = "cp",
3034 put_path = "", get_command = "cp", get_path = "", get_output = "", clean_path = "",
3035 parallel_mode = "";
3036 nullString nstr_file_in, nstr_file_out;
3037
3038 // read Input File Path(s) to process by the macro in the XML description
3039 pc_temp = (char*)xmlGetProp(sub_node, (unsigned char*)"input");
3040 if (pc_temp != nullptr) {
3041 nstr_file_in.SetValue((string)pc_temp);
3042 free(pc_temp);
3043 }
3044 // read Output File Path(s) to save processing results by the macro
3045 pc_temp = (char*)xmlGetProp(sub_node, (unsigned char*)"output");
3046 if (pc_temp != nullptr) {
3047 nstr_file_out.SetValue((string)pc_temp);
3048 free(pc_temp);
3049 }
3050
3051 // read Start Event value in the XML description
3052 pc_temp = (char*)xmlGetProp(sub_node, (unsigned char*)"start_event");
3053 if (pc_temp != nullptr) {
3054 start_event = atoi(pc_temp);
3055 free(pc_temp);
3056 isStartEvent = true;
3057 }
3058
3059 // read Event Count value in the XML description
3060 pc_temp = (char*)xmlGetProp(sub_node, (unsigned char*)"event_count");
3061 if (pc_temp != nullptr) {
3062 if (!is_string_number(pc_temp)) {
3063 info_message("ERROR nica-scheduler$ Event count must be a number!", 1);
3064 free(pc_temp);
3065 return true;
3066 }
3067 event_count = atoi(pc_temp);
3068 free(pc_temp);
3069 if (event_count < 0) {
3070 info_message("ERROR nica-scheduler$ Event count must be a positive number or 0 (for all events)!",
3071 1);
3072 return true;
3073 }
3074 isEventCount = true;
3075 }
3076
3077 // read Merge Flag: whether merge files containing partial results
3078 // possible values: false = -1 (default), merge to the origin file name and delete partial files = 1
3079 // chain = 0 - store an additional TChain file containing links to all the partial files,
3080 // preserve = 2 - as' true' value, but not delete partial result files
3081 pc_temp = (char*)xmlGetProp(sub_node, (unsigned char*)"merge");
3082 int iMerge = -1; // false - not to merge
3083 if (pc_temp != nullptr) {
3084 char* lower_merge = convert_pchar_to_lowercase_new(pc_temp);
3085 if (strcmp(lower_merge, "true") == 0)
3086 iMerge = 1;
3087 if (strcmp(lower_merge, "chain") == 0)
3088 iMerge = 0;
3089 if (strcmp(lower_merge, "preserve") == 0)
3090 iMerge = 2;
3091
3092 free(pc_temp);
3093 delete lower_merge;
3094 }
3095 // read Parallel Mode value: whether parallelize processing of the file by events
3096 pc_temp = (char*)xmlGetProp(sub_node, (unsigned char*)"parallel_mode");
3097 if (pc_temp != nullptr) {
3098 parallel_mode = pc_temp;
3099 free(pc_temp);
3100 }
3101
3102 // form Input File List from the Simulation Catalogue of the UniConDa database
3103 pc_temp = (char*)xmlGetProp(sub_node, (unsigned char*)"sim_input");
3104 if (pc_temp != nullptr) {
3105 sim_db = pc_temp;
3106 free(pc_temp);
3107 }
3108 // form Input File List from the Experimental File Catalogue of the UniConDa database
3109 pc_temp = (char*)xmlGetProp(sub_node, (unsigned char*)"exp_input");
3110 if (pc_temp != nullptr) {
3111 exp_db = pc_temp;
3112 free(pc_temp);
3113 }
3114 // get Input File List from a previously executed job with the given name
3115 pc_temp = (char*)xmlGetProp(sub_node, (unsigned char*)"job_input");
3116 if (pc_temp != nullptr) {
3117 job_input = pc_temp;
3118 free(pc_temp);
3119 }
3120 // get Input File List from a previously executed macro with the given name
3121 pc_temp = (char*)xmlGetProp(sub_node, (unsigned char*)"macro_input");
3122 if (pc_temp != nullptr) {
3123 macro_input = pc_temp;
3124 free(pc_temp);
3125 }
3126 // form Input File List from the text file containing input file paths by lines
3127 pc_temp = (char*)xmlGetProp(sub_node, (unsigned char*)"file_input");
3128 if (pc_temp != nullptr) {
3129 file_input = pc_temp;
3130 free(pc_temp);
3131 }
3132
3133 // parse information containing preparation of the files to be processed
3134 // operations: '<put>', '<get>', '<clean>'
3135 xmlNodePtr sub_file_node = sub_node->children;
3136 while (sub_file_node) {
3137 // if TAG is XML_ELEMENT_NODE then continue
3138 if (sub_file_node->type == XML_ELEMENT_NODE) {
3139 // if TAG means <put>: put the files to the special place before the processing
3140 if (strcmp((char*)sub_file_node->name, "put") == 0) {
3141 // read Copy Command to put the files on the intermediate storage
3142 pc_temp = (char*)xmlGetProp(sub_file_node, (unsigned char*)"command");
3143 if (pc_temp != nullptr) {
3144 put_command = pc_temp;
3145 free(pc_temp);
3146 }
3147 // read Directory Path, to which put the files on the intermediate storage
3148 pc_temp = (char*)xmlGetProp(sub_file_node, (unsigned char*)"directory");
3149 if (pc_temp == nullptr) {
3150 info_message("ERROR nica-scheduler$ There is no 'directory' property for the <put/> tag!",
3151 1);
3152 sub_file_node = sub_file_node->next;
3153 continue;
3154 }
3155 put_path = pc_temp;
3156 free(pc_temp);
3157 } // if <put/> attribute
3158
3159 // if tag means <get>: get the files from the temporary place after the processing
3160 if (strcmp((char*)sub_file_node->name, "get") == 0) {
3161 // read Copy Command to get result files from the intermediate storage
3162 pc_temp = (char*)xmlGetProp(sub_file_node, (unsigned char*)"command");
3163 if (pc_temp != nullptr) {
3164 get_command = pc_temp;
3165 free(pc_temp);
3166 }
3167 // read Directory Path, to which save the intermediate results and from where copy to the
3168 // destination storage
3169 pc_temp = (char*)xmlGetProp(sub_file_node, (unsigned char*)"directory");
3170 if (pc_temp == nullptr) {
3171 info_message("ERROR nica-scheduler$ There is no 'directory' property for the <get/> tag!",
3172 1);
3173 sub_file_node = sub_file_node->next;
3174 continue;
3175 }
3176 get_path = pc_temp;
3177 free(pc_temp);
3178 } // if <get/> attribute
3179
3180 // if tag means <clean>: clean temporary files after the processing
3181 if (strcmp((char*)sub_file_node->name, "clean") == 0) {
3182 pc_temp = (char*)xmlGetProp(sub_file_node, (unsigned char*)"path");
3183 if (pc_temp != nullptr) {
3184 clean_path = pc_temp;
3185 free(pc_temp);
3186 }
3187 } // if <clean> attribute
3188 } // if sub_file_node->type == XML_ELEMENT_NODE
3189
3190 sub_file_node = sub_file_node->next;
3191 } // if file should be pre/post copied
3192
3193 vector<string> vecInputFiles;
3194 // PATH TO <INPUT> FILE (one or with a regular expression)
3195 if (nstr_file_in != nullptr) {
3196 string str_file_in = nstr_file_in.GetValue();
3197 vecInputFiles.push_back(str_file_in);
3198
3199 // check whether file contains wildcards (?,*)
3200 if ((str_file_in.find('?') != std::string::npos) || (str_file_in.find('*') != std::string::npos)
3201 || (str_file_in.find('+') != std::string::npos))
3202 {
3203 vecInputFiles = glob(str_file_in.c_str());
3204 if (isDebugMode)
3205 cout << "DEBUG nica-scheduler$ File count in the regular expression: " << vecInputFiles.size()
3206 << endl;
3207 }
3208 } // if there is <input> file
3209
3210 // LIST OF INPUT FILES FROM THE UNIFIED CONDITION DATABASE
3211 if ((sim_db != "") || (exp_db != "")) {
3212 TString sql, strConnection = "pgsql://" + (TString)job_par->dbSettings.db_host + "/"
3213 + (TString)job_par->dbSettings.db_name;
3214 if (sim_db == "")
3215 ParseDatabaseParameters(exp_db, sql, false);
3216 else
3217 ParseDatabaseParameters(sim_db, sql, true);
3218 TSQLServer* pSQLServer = TSQLServer::Connect(strConnection, job_par->dbSettings.db_username.c_str(),
3219 job_par->dbSettings.db_password.c_str());
3220 if (pSQLServer == 0x00) {
3221 info_message("ERROR nica-scheduler$ Connection to the database was not established!", 1);
3222 return true;
3223 }
3224
3225 if (isDebugMode)
3226 cout << "DEBUG nica-scheduler$ SQL query: " << sql << endl;
3227
3228 TSQLResult* res = pSQLServer->Query(sql);
3229 if (res == 0x00) {
3230 info_message("ERROR nica-scheduler$ An error was occurred during the SQL query!", 1);
3231 return true;
3232 }
3233 int nrows = res->GetRowCount();
3234 if (nrows == 0)
3235 info_message("nica-scheduler$ WARNING: there are no records for these parameters", 1);
3236 else {
3237 TSQLRow* row;
3238 while ((row = res->Next()) != nullptr) {
3239 string cur_file_path((char*)row->GetField(0));
3240#ifdef MICC_NO_MANAGEMENT
3241 if ((simDB != "") && (batch_com.run_command == "sbatch"))
3242 replace_string_in_text(cur_file_path, "eos", "eos/eos.jinr.ru");
3243#endif
3244 vecInputFiles.push_back(cur_file_path);
3245 delete row;
3246 } // while (row = res->Next())
3247 } // else nrows > 0
3248 delete res;
3249 if (pSQLServer)
3250 delete pSQLServer;
3251 } // if ((sim_db != nullptr) || (exp_db != nullptr))
3252
3253 // LIST OF INPUT FILES FROM A PREVIOUS JOB WITH THE GIVEN NAME
3254 if (job_input != "") {
3255 // search for output list in the map
3256 auto it = mapJobName2FileOut.find(job_input);
3257 if (it == mapJobName2FileOut.end())
3258 cout << "nica-scheduler$ WARNING: job (output list) was not found: " << job_input << endl;
3259 else {
3260 vector<nullString>* vec_string = it->second.get();
3261 for (vector<nullString>::iterator iter_output = vec_string->begin();
3262 iter_output != vec_string->end(); iter_output++)
3263 vecInputFiles.push_back((*iter_output).GetValue());
3264 }
3265 } // if (job_input != nullptr)
3266
3267 // LIST OF INPUT FILES FROM A PREVIOUS MACRO WITH THE GIVEN NAME
3268 if (macro_input != "") {
3269 // search for the macro with a given name
3270 bool is_found = false;
3271 for (const auto& iter : job_par->vecMacros) {
3272 structMacroPar* prev_macro = iter.get();
3273 if (strcmp(prev_macro->nstrMacroName.GetValue().c_str(), macro_input.c_str()) == 0) {
3274 for (const auto& iter_output : prev_macro->vecFiles) {
3275 if (iter_output.get()->nstrFileOut.isNull())
3276 continue;
3277 vecInputFiles.push_back(iter_output.get()->nstrFileOut.GetValue());
3278 } // for
3279
3280 is_found = true;
3281 break;
3282 } // if (strcmp(prev_macro->macro_name, macro_input) == 0)
3283 } // search for the macro with a given name
3284
3285 if (!is_found)
3286 cout << "nica-scheduler$ WARNING: macro (output list) was not found: " << macro_input << endl;
3287 } // if (macro_input != nullptr)
3288
3289 // LIST OF INPUT FILES (separated by newline symbols) FROM A TEXT FILE WITH THE GIVEN PATH
3290 if (file_input != "") {
3291 // open text file with input list
3292 string str_file_input = expand_path(file_input);
3293 ifstream listFile(str_file_input.c_str());
3294 if (listFile) {
3295 string list_line;
3296 while (getline(listFile, list_line))
3297 vecInputFiles.push_back(list_line);
3298 // cout<<"Current file: "<<list_line<<endl;
3299 } else
3300 info_message("ERROR nica-scheduler$ Can not open text file with file input list!", 1);
3301 }
3302
3303 // FORM INFORMATION ON INPUT, OUTPUT and INTERMEDIATE FILES of the macro
3304 bool isFileFound = false;
3305 for (unsigned int i = 0; i < vecInputFiles.size(); i++) {
3306 unique_ptr<structFilePar> filePar = make_unique<structFilePar>();
3307 // add input file
3308 filePar->strFileIn = trim(vecInputFiles[i]);
3309 // add output file
3310 if (nstr_file_out != nullptr)
3311 filePar->nstrFileOut.SetValue(form_file_name(nstr_file_out.GetValue(), filePar->strFileIn, i + 1,
3312 batch_com.batch_temp_directory));
3313 // add start event number
3314 if (isStartEvent)
3315 filePar->ptrStartEvent = make_unique<int>(start_event);
3316 // add event count
3317 if (isEventCount)
3318 filePar->ptrEventCount = make_unique<int>(event_count);
3319 // set parallel mode and the merge option
3320 if (parallel_mode != "")
3321 filePar->strParallelMode = parallel_mode;
3322
3323 filePar->iMerge = iMerge;
3324 // form put path
3325 if (put_path != "") {
3326 filePar->strPutCommand = put_command;
3327 filePar->strPutPath =
3328 form_file_name(put_path, filePar->strFileIn, i + 1, batch_com.batch_temp_directory);
3329 filePar->strPutPath =
3330 filePar->strPutPath + string("/") + get_file_name_with_ext(filePar->strFileIn);
3331 }
3332 // form get path
3333 if (get_path != "") {
3334 filePar->strGetCommand = get_command;
3335 filePar->strGetPath =
3336 form_file_name(get_path, filePar->strFileIn, i + 1, batch_com.batch_temp_directory);
3337 if (filePar->nstrFileOut == nullptr) {
3338 info_message("ERROR nica-scheduler$ <get> must employ output file path, but it is not set!", 1);
3339 return true;
3340 }
3341 filePar->strGetPath =
3342 filePar->strGetPath + string("/") + get_file_name_with_ext(filePar->nstrFileOut.GetValue());
3343 }
3344 // form clean path
3345 if (clean_path != "") {
3346 vector<string> vec_clean_path = string_to_vector(clean_path);
3347 for (const string& cur_clean_path : vec_clean_path)
3348 filePar->vecCleanPath.push_back(
3349 form_file_name(cur_clean_path, filePar->strFileIn, i + 1, batch_com.batch_temp_directory));
3350 }
3351
3352 macro_par->vecFiles.push_back(std::move(filePar));
3353 isFileFound = true;
3354 }
3355
3356 if (!isFileFound) {
3357 info_message("ERROR nica-scheduler$ No input files were found (possible tags: "
3358 "<input>, <file_input>, <sim_input>, <exp_input>, <job_input>, <macro_input>!",
3359 1);
3360 return true;
3361 }
3362 } // FILE TAG
3363
3364 sub_node = sub_node->next;
3365 } // while (sub_node)
3366
3367 return false;
3368}
int i
Definition P4_F32vec4.h:22
map< BATCH_SYSTEM_NAME, batch_commands > mapBS
void release()
Releases a resource from the semaphore.
Semaphore(unsigned int resource_count=0)
void acquire()
Acquires a resource from the semaphore.
string GetValue()
nullString & operator=(const nullString &str)
nullString(const nullString &str)
void SetValue(string str)
void SetValue(nullptr_t)
bool operator==(nullptr_t) const
void SetValue(char *pchar)
nullString(string str)
nullString(char *pchar)
bool operator!=(nullptr_t) const
nullString(nullptr_t)
map< string, string_view > mapContainerType
const char *const UNI_DB_HOST
const char *const UNI_DB_NAME
const char *const UNI_DB_PASSWORD
const char *const UNI_DB_USERNAME
string int_to_string(int number)
char * convert_pchar_to_lowercase_new(char *input_char_array)
BATCH_SYSTEM_NAME
@ TORQUE_BATCH_SYSTEM
@ SGE_BATCH_SYSTEM
@ SLURM_BATCH_SYSTEM
string find_first_number(string const &str, bool isOnlyPositive=true)
string expand_path(string path)
string get_file_name(string path)
string find_last_number(string const &str, bool isOnlyPositive=true)
string get_directory_path(string file_path)
string get_file_name_with_ext(string path)
bool is_string_number(const string &s)
int get_linux_processor_count()
string get_app_dir_linux()
string trim(const string &str, const string &whitespace=" \t\r")
double byte_size_to_double(string byte_size_in_string, char convert_to='B')
void replace_string_in_text(string &text, string old_substring, string new_substring)
int get_batch_processor_count(BATCH_SYSTEM_NAME batch_system, string queue_name="")
string get_directory_name(string file_path)
void ThreadLocalProcess(shared_ptr< structThreadPar > thread_par)
void replace_string_in_text_ext(string &text, string old_substring, string new_substring)
void ShowHelpMessage()
void info_message(const string msg, bool is_error=0)
vector< string > glob(const string &path)
string form_file_name(string out_name, string in_name, int counter, string batch_temp_dir)
bool ParseMacroFiles(xmlNodePtr sub_node, shared_ptr< structJobPar > job_par, shared_ptr< structMacroPar > macro_par, multimap< string, unique_ptr< vector< nullString > > > &map_job_output, batch_commands batch_com)
string GenerateOutputFilePath(string path, int counter)
void SubThreadProcessFile(shared_ptr< structSubThreadPar > subthread_par)
bool isDebugMode
void ParseDatabaseParameters(string input, TString &sql, bool is_simulation)
int main()
vector< string > string_to_vector(string str, const string &delimiters=",;")
std::map< string, struct_database_settings > predefined_database_settings
Definition settings.h:16
string sheduler_exclude_hosts_option
string check_status_command
string job_dependency_option
string operative_memory1_option
string sheduler_hosts_option
string array_dependency_option
string container_run_command
char sheduler_hosts_separation
string sheduler_queue_option
string set_job_variables
string sheduler_priority_option
string batch_temp_directory
string sheduler_qos_option
string scheduler_run_job
string sheduler_workdir_option
unique_ptr< int > ptrEventCount
nullString nstrFileOut
unique_ptr< int > ptrStartEvent
vector< string > vecCleanPath
nullString nstrQOS
nullString nstrProcCount
nullString nstrLogs
nullString nstrWorkDir
nullString nstrHosts
struct_database_settings dbSettings
nullString nstrQueue
nullString nstrCommandline
string strOperativeMemory1
nullString nstrPriority
vector< shared_ptr< structMacroPar > > vecMacros
string strDdependencyName
unique_ptr< int > ptrEventCount
unique_ptr< int > ptrStartEvent
nullString nstrMacroName
vector< unique_ptr< structFilePar > > vecFiles
nullString nstrMacroPath
shared_ptr< structMacroPar > sptrMacroParameter
shared_ptr< structThreadPar > sptrThreadParameter
shared_ptr< Semaphore > semJobSemaphore
shared_ptr< structJobPar > sptrJobParameter
shared_ptr< mutex > mutJobMutex
struct_database_settings & operator=(const struct_database_settings &arg_db_settings)
struct_database_settings(string arg_db_host, string arg_db_name, string arg_db_username, string arg_db_password)
struct_database_settings(const struct_database_settings &arg_db_settings)