561 shared_ptr<structJobPar> job_par = thread_par->sptrJobParameter;
562 int thread_counter = thread_par->iThreadCounter;
565 for (
size_t ind_macro = 0; ind_macro < job_par->vecMacros.size(); ind_macro++) {
566 shared_ptr<structMacroPar> cur_macro = job_par->vecMacros[ind_macro];
570 if (cur_macro->vecFiles.size() > 0)
571 cur_file = cur_macro->vecFiles[thread_counter - 1].get();
574 if (cur_file ==
nullptr) {
576 stringstream ssROOTCommand;
577 ssROOTCommand << job_par->strConfigPath <<
"root -b -q '" << cur_macro->nstrMacroPath.GetValue() <<
"(";
578 if (cur_macro->strAddArgs !=
"")
579 ssROOTCommand << cur_macro->strAddArgs;
580 if (job_par->nstrLogs !=
nullptr)
581 ssROOTCommand <<
")' > " << job_par->nstrLogs.GetValue() <<
"_" << to_string(thread_counter) <<
" 2>&1";
583 ssROOTCommand <<
")'";
587 cout <<
"DEBUG nica-scheduler$ Local ROOT command = " << ssROOTCommand.str() << endl;
591 long int t = time(
nullptr);
593 stringstream ssBashFile;
594 ssBashFile <<
"temp_" << to_string(thread_counter) <<
"_" << to_string(t) <<
".sh";
596 myfile.open(ssBashFile.str().c_str());
597 myfile <<
"export ROOT_HIST=0 " << endl;
598 myfile << ssROOTCommand.str();
600 stringstream ssRunCommand;
601 ssRunCommand <<
"bash " << ssBashFile.str();
604 thread_par->mutJobMutex->lock();
605 cout <<
"nica-scheduler$ Task " << ind_macro + 1 <<
" is running..." << endl << endl;
606 thread_par->mutJobMutex->unlock();
609 int system_return = system(ssRunCommand.str().c_str());
610 if (system_return != 0)
611 cout <<
"nica-scheduler$ WARNING: System call (in ThreadLocalProcess) returned non-zero code: "
612 << system_return << endl;
616 stringstream ssDeleteCommand;
617 ssDeleteCommand <<
"rm -rf " << ssBashFile.str();
618 system_return = system(ssDeleteCommand.str().c_str());
619 if (system_return != 0)
620 cout <<
"nica-scheduler$ WARNING: System call (in ThreadLocalProcess) returned non-zero code: "
621 << system_return << endl;
623 cout <<
"DEBUG nica-scheduler$ '" << ssBashFile.str() <<
"' bash file was used " << endl;
630 string strRealInputFile = cur_file->
strFileIn;
636 stringstream ssCopyCommand;
638 int system_return = system(ssCopyCommand.str().c_str());
639 if (system_return != 0)
640 cout <<
"nica-scheduler$ WARNING: System call (in ThreadLocalProcess) returned non-zero code: "
641 << system_return << endl;
650 int* cur_start_event = cur_macro->ptrStartEvent.get();
655 int* cur_event_count = cur_macro->ptrEventCount.get();
660 if ((cur_file->
iParallelMode > 1) && (cur_start_event ==
nullptr)) {
663 cout <<
"nica-scheduler$ WARNING: parallel_mode is used, but start event is not set, so it is assigned "
668 if ((cur_file->
iParallelMode > 1) && (cur_event_count ==
nullptr)) {
671 cout <<
"nica-scheduler$ WARNING: parallel_mode is used, but event count is not set, so it is assigned "
677 if ((cur_file->
iParallelMode > 1) && ((cur_start_event ==
nullptr) || (cur_event_count ==
nullptr))) {
679 cout <<
"nica-scheduler$ WARNING: parallel_mode must be used with 'start_event' and 'event_count' "
680 "parameters, parallel_mode is ignored!"
684 int real_thread_count = 1;
686 string strOutputUnion = nstrRealOutputFile.
GetValue();
689 int start_event = -1, event_count = -1;
690 if (cur_start_event !=
nullptr)
691 start_event = *cur_start_event;
692 if (cur_event_count !=
nullptr)
693 event_count = *cur_event_count;
697 string config_paths = job_par->strConfigPath;
699 if (job_par->nstrLogs ==
nullptr)
701 stringstream ssEventNumberCommand;
702 ssEventNumberCommand << config_paths <<
"show_event_count auto \"" << strRealInputFile
703 <<
"\" 2> /dev/null";
705 cout <<
"DEBUG nica-scheduler$ trying to get event count from the input file: "
706 << ssEventNumberCommand.str() << endl;
708 array<char, 128> bufferCommand;
709 string str_event_number =
"";
710 FILE* stream = popen(ssEventNumberCommand.str().c_str(),
"r");
711 while (fgets(bufferCommand.data(),
static_cast<int>(bufferCommand.size()), stream) !=
nullptr)
712 str_event_number += bufferCommand.data();
715 if (str_event_number ==
"") {
716 info_message(TString::Format(
"ERROR nica-scheduler$ parallel_mode is used, but event_count is not "
717 "set and cannot be automatically defined. "
718 "'%s' file will be skipped!",
719 strRealInputFile.c_str())
724 event_count = atoi(str_event_number.c_str());
726 cout <<
"DEBUG nica-scheduler$ " << event_count <<
" event(s) found in the input file" << endl;
730 vector<nullString> vecPartialOutputFiles;
736 vector<thread> vecSubThreads;
740 int event_per_thread = (event_count +
i) / cur_file->
iParallelMode;
741 if (event_per_thread != 0) {
742 nullString nstr_output_file = nstrRealOutputFile;
744 if (nstr_output_file !=
nullptr)
749 shared_ptr<structSubThreadPar> subthread_par = make_shared<structSubThreadPar>();
750 subthread_par->sptrThreadParameter = thread_par;
751 subthread_par->sptrMacroParameter = cur_macro;
752 subthread_par->iStartEvent = start_event;
753 subthread_par->iEventCount = event_per_thread;
754 subthread_par->strFileIn = strRealInputFile;
755 subthread_par->nstrFileOut = nstr_output_file;
756 subthread_par->iSubthreadCounter =
i + 1;
760 thread_par->semJobSemaphore->release();
762 thread_par->semJobSemaphore->acquire();
767 }
catch (
const system_error& e) {
768 info_message((TString::Format(
"ERROR nica-scheduler$ error while creating sub-thread with "
769 "code = %d and message = %s",
770 e.code().value(), e.what()))
776 if (nstr_output_file !=
nullptr) {
777 strOutputUnion +=
" " + nstr_output_file.
GetValue();
778 vecPartialOutputFiles.push_back(nstr_output_file);
784 start_event = start_event + event_per_thread;
789 cout <<
"nica-scheduler$ Waiting for " << real_thread_count <<
" subtask(s) to finish..." << endl;
790 for (
auto& sub_thread : vecSubThreads) {
793 }
catch (
const system_error& e) {
795 (TString::Format(
"ERROR nica-scheduler$ Sub-thread failed with code = %d and message = %s",
796 e.code().value(), e.what()))
801 cout <<
"nica-scheduler$ Subtasks " << ind_macro + 1 <<
":[] has been finished" << endl;
802 thread_par->semJobSemaphore->acquire();
807 if ((cur_file->
iMerge >= 0) && (strOutputUnion !=
"")) {
811 stringstream ssMergeCommand;
812 ssMergeCommand << job_par->strConfigPath <<
"root -b -q '" << UNIONc_path <<
"(\"" << strOutputUnion
813 <<
"\", " << to_string(cur_file->
iMerge) <<
")'";
816 if (job_par->nstrLogs !=
nullptr)
817 ssMergeCommand <<
" >> " << job_par->nstrLogs.GetValue() <<
" 2>&1";
821 long int t = time(
nullptr);
822 stringstream ssBashFile;
823 ssBashFile <<
"temp_union_" << to_string(t) <<
".sh";
824 myfile.open(ssBashFile.str().c_str());
825 myfile << ssMergeCommand.str();
827 stringstream ssBashCommand;
828 ssBashCommand <<
"bash " << ssBashFile.str().c_str();
831 <<
"nica-scheduler$ Merging the result of subtasks " << ind_macro + 1 <<
":[]..." << endl
833 int system_return = system(ssBashCommand.str().c_str());
834 if (system_return != 0)
835 cout <<
"nica-scheduler$ WARNING: System call (in ThreadLocalProcess) returned non-zero code: "
836 << system_return << endl;
840 stringstream ssRemoveCommand;
841 ssRemoveCommand <<
"rm -rf " << ssBashFile.str();
842 system_return = system(ssRemoveCommand.str().c_str());
843 if (system_return != 0)
844 cout <<
"nica-scheduler$ WARNING: System call (in ThreadLocalProcess) returned non-zero "
846 << system_return << endl;
848 cout <<
"DEBUG nica-scheduler$ '" << ssBashFile.str() <<
"' bash file was used " << endl;
857 stringstream ssROOTCommand;
858 ssROOTCommand << job_par->strConfigPath <<
"root -b -q '" << cur_macro->nstrMacroPath.GetValue() <<
"(";
861 bool is_first =
true;
862 if (strRealInputFile !=
"") {
863 ssROOTCommand <<
"\"" << strRealInputFile <<
"\"";
867 if (nstrRealOutputFile !=
nullptr) {
869 ssROOTCommand <<
", ";
872 ssROOTCommand <<
"\"" << nstrRealOutputFile.
GetValue() <<
"\"";
875 if (cur_start_event !=
nullptr) {
877 ssROOTCommand <<
", ";
880 ssROOTCommand << to_string(*cur_start_event);
883 if (cur_event_count !=
nullptr) {
885 ssROOTCommand <<
", ";
888 ssROOTCommand << to_string(*cur_event_count);
891 if (cur_macro->strAddArgs !=
"") {
893 ssROOTCommand <<
", ";
896 ssROOTCommand << cur_macro->strAddArgs;
900 if (job_par->nstrLogs !=
nullptr)
901 ssROOTCommand <<
")' > " << job_par->nstrLogs.GetValue() <<
"_" << to_string(thread_counter)
904 ssROOTCommand <<
")'";
908 cout <<
"DEBUG nica-scheduler$ Local command = " << ssROOTCommand.str() << endl;
912 long int t = time(
nullptr);
914 stringstream ssBashFile;
915 ssBashFile <<
"temp_" << to_string(thread_counter) <<
"_" <<
"_" << to_string(t) <<
".sh";
917 myfile.open(ssBashFile.str().c_str());
918 myfile <<
"export ROOT_HIST=0 " << endl;
919 myfile << ssROOTCommand.str();
921 stringstream ssRunCommand;
922 ssRunCommand <<
"bash " << ssBashFile.str();
925 thread_par->mutJobMutex->lock();
926 cout <<
"nica-scheduler$ Task " << thread_counter <<
" is running..." << endl;
927 if (strRealInputFile !=
"")
928 cout <<
"input data - " << strRealInputFile << endl;
929 if (nstrRealOutputFile !=
nullptr)
930 cout <<
"output data - " << nstrRealOutputFile.
GetValue() << endl;
932 thread_par->mutJobMutex->unlock();
935 int system_return = system(ssRunCommand.str().c_str());
936 if (system_return != 0)
937 cout <<
"nica-scheduler$ WARNING: System call (in ThreadLocalProcess) returned non-zero code: "
938 << system_return << endl;
942 stringstream ssRemoveCommand;
943 ssRemoveCommand <<
"rm -rf " << ssBashFile.str();
944 system_return = system(ssRemoveCommand.str().c_str());
945 if (system_return != 0)
946 cout <<
"nica-scheduler$ WARNING: System call (in ThreadLocalProcess) returned non-zero code: "
947 << system_return << endl;
949 cout <<
"DEBUG nica-scheduler$ '" << ssBashFile.str() <<
"' bash file was used " << endl;
954 stringstream ssCopyCommand;
957 int system_return = system(ssCopyCommand.str().c_str());
958 if (system_return != 0)
959 cout <<
"nica-scheduler$ WARNING: System call (in ThreadLocalProcess) returned non-zero code: "
960 << system_return << endl;
964 for (
nullString& cur_partial_file : vecPartialOutputFiles) {
965 stringstream ssCopyPartialsCommand;
966 ssCopyPartialsCommand << cur_file->
strGetCommand <<
" " << cur_partial_file.GetValue() <<
" "
969 system_return = system(ssCopyPartialsCommand.str().c_str());
970 if (system_return != 0)
971 cout <<
"nica-scheduler$ WARNING: System call (in ThreadLocalProcess) returned non-zero "
973 << system_return << endl;
980 stringstream ssRemoveCommand;
981 ssRemoveCommand <<
"rm -rf " << cur_file->
strPutPath;
982 int system_return = system(ssRemoveCommand.str().c_str());
983 if (system_return != 0)
984 cout <<
"nica-scheduler$ WARNING: System call (in ThreadLocalProcess) returned non-zero code: "
985 << system_return << endl;
989 stringstream ssRemoveCommand;
990 ssRemoveCommand <<
"rm -rf " << cur_file->
strGetPath;
991 int system_return = system(ssRemoveCommand.str().c_str());
992 if (system_return != 0)
993 cout <<
"nica-scheduler$ WARNING: System call (in ThreadLocalProcess) returned non-zero code: "
994 << system_return << endl;
998 for (
nullString& cur_partial_file : vecPartialOutputFiles) {
999 stringstream ssRemovePartialCommand;
1000 ssRemovePartialCommand <<
"rm -rf " << cur_partial_file.GetValue();
1001 system_return = system(ssRemovePartialCommand.str().c_str());
1002 if (system_return != 0)
1003 cout <<
"nica-scheduler$ WARNING: System call (in ThreadLocalProcess) returned non-zero "
1005 << system_return << endl;
1012 stringstream ssRemoveCommand;
1013 for (
const string& cur_clean_path : cur_file->
vecCleanPath)
1014 ssRemoveCommand <<
"rm -rf " << cur_clean_path <<
"; ";
1015 int system_return = system(ssRemoveCommand.str().c_str());
1016 if (system_return != 0)
1017 cout <<
"nica-scheduler$ WARNING: System call (in ThreadLocalProcess) returned non-zero code: "
1018 << system_return << endl;
1022 cout <<
"nica-scheduler$ Task " << thread_counter <<
" has been finished" << endl;
1026 thread_par->semJobSemaphore->release();
1054 info_message(
"ERROR nica-scheduler$ XML file path (containing job description) must be set", 1);
1059 string xml_file_path =
"", exp_database =
"";
1060 int option_index = 0, c;
1061 while ((c = getopt_long(argc, argv,
"hf:d", long_options, &option_index)) != -1) {
1064 if (long_options[option_index].flag != 0)
1066 printf(
"option %s", long_options[option_index].name);
1068 printf(
"with arg %s", optarg);
1071 if (strcmp(long_options[option_index].name,
"db") == 0)
1072 exp_database = optarg;
1073 if (strcmp(long_options[option_index].name,
"database") == 0)
1074 exp_database = optarg;
1081 xml_file_path = optarg;
1092 if (optind < argc) {
1093 if (((argc - optind) == 1) && (xml_file_path ==
""))
1094 xml_file_path = argv[optind];
1096 info_message(
"ERROR nica-scheduler$ Argument list is incorrect.\nPlease, check information on command line "
1097 "arguments in the 'README.md' file",
1104 if (xml_file_path ==
"") {
1105 info_message(
"ERROR nica-scheduler$ XML file path (containing job description) must be set", 1);
1111 bool isDBSettingsHigh =
false;
1113 if (exp_database !=
"") {
1116 info_message((TString::Format(
"ERROR nica-scheduler$ Predefined database settings were not found for '%s'",
1117 exp_database.c_str()))
1122 dbSettingsPre = iter_db_settings->second;
1123 isDBSettingsHigh =
true;
1128 xmlDocPtr doc = xmlReadFile(xml_file_path.c_str(),
nullptr, 0);
1129 if (doc ==
nullptr) {
1131 (TString::Format(
"ERROR nica-scheduler$ Failed to open input XML file: '%s'", xml_file_path.c_str()))
1138 xmlNodePtr root = xmlDocGetRootElement(doc);
1141 multimap<string, string> mapJobName2ID;
1143 multimap<string, unique_ptr<vector<nullString>>> mapJobName2FileOut;
1149 srand(time(
nullptr));
1153 xmlNodePtr cur_node = root;
1156 if (cur_node->type == XML_ELEMENT_NODE) {
1160 if (strcmp((
char*)cur_node->name,
"jobs") == 0) {
1162 pc_temp = (
char*)xmlGetProp(cur_node, (
unsigned char*)
"batch");
1163 if (pc_temp !=
nullptr) {
1165 if (strcmp(pc_batch_system,
"sge") == 0)
1168 if (strcmp(pc_batch_system,
"torque") == 0)
1172 free(pc_batch_system);
1175 pc_temp = (
char*)xmlGetProp(cur_node, (
unsigned char*)
"database");
1176 if ((pc_temp !=
nullptr) && (!isDBSettingsHigh)) {
1179 info_message((TString::Format(
"WARNING nica-scheduler$ Predefined database settings were not "
1180 "found for '%s'. The parameter will be ignored.",
1185 dbSettingsPre = iter_db_settings->second;
1189 cur_node = cur_node->children;
1193 if (strcmp((
char*)cur_node->name,
"job") == 0) {
1194 bool isError =
false;
1196 shared_ptr<structJobPar> job_par = make_shared<structJobPar>();
1199 pc_temp = (
char*)xmlGetProp(cur_node, (
unsigned char*)
"name");
1200 if (pc_temp !=
nullptr) {
1201 job_par->strJobName = pc_temp;
1207 pc_temp = (
char*)xmlGetProp(cur_node, (
unsigned char*)
"database");
1208 if ((pc_temp !=
nullptr) && (!isDBSettingsHigh)) {
1211 info_message((TString::Format(
"WARNING nica-scheduler$ Predefined database settings were not "
1212 "found for '%s'. The parameter will be ignored.",
1217 dbSettings = iter_db_settings->second;
1220 if (dbSettings.
db_name ==
"") {
1226 job_par->dbSettings = dbSettings;
1229 pc_temp = (
char*)xmlGetProp(cur_node, (
unsigned char*)
"batch");
1230 if (pc_temp !=
nullptr) {
1232 if (strcmp(pc_batch_system,
"slurm") == 0)
1235 if (strcmp(pc_batch_system,
"torque") == 0)
1239 free(pc_batch_system);
1247 string strBatchFile = batch_com.
job_begin, strBatchInitInputFile =
"declare -a InputFileArray=(",
1248 strBatchInitOutputFile =
"declare -a OutputFileArray=(",
1249 strBatchInitStartEvent =
"declare -a StartEventArray=(",
1250 strBatchInitEventCount =
"declare -a EventCountArray=(",
1251 strBatchInitAddArgs =
"declare -a AddArgsArray=(", strBatchInitPut =
"declare -a PutArray=(",
1252 strBatchInitGet =
"declare -a GetArray=(", strBatchInitClean =
"declare -a CleanArray=(";
1255 pc_temp = (
char*)xmlGetProp(cur_node, (
unsigned char*)
"dependency");
1256 if (pc_temp !=
nullptr) {
1257 multimap<string, string>::iterator it = mapJobName2ID.find(pc_temp);
1258 if (it == mapJobName2ID.end())
1259 cout <<
"nica-scheduler$ WARNING: the following dependency for '" << job_par->strJobName
1260 <<
"' job was not found: " << pc_temp << endl;
1267 xmlNodePtr sub_node = cur_node->children;
1269 if (sub_node->type == XML_ELEMENT_NODE) {
1271 if (strcmp((
char*)sub_node->name,
"command") == 0) {
1272 if (job_par->nstrCommandline !=
nullptr)
1273 cout <<
"nica-scheduler$ WARNING: job command line is reassigned" << endl;
1275 job_par->nstrCommandline.SetValue((
char*)xmlGetProp(sub_node, (
unsigned char*)
"line"));
1276 if (job_par->nstrCommandline ==
nullptr) {
1278 info_message(
"ERROR nica-scheduler$ 'line' attribute was not set for the command!", 1);
1282 sub_node = sub_node->next;
1287 if (strcmp((
char*)sub_node->name,
"macro") == 0) {
1288 shared_ptr<structMacroPar> macro_par = make_shared<structMacroPar>();
1293 macro_par->nstrMacroPath.SetValue((
char*)xmlGetProp(sub_node, (
unsigned char*)
"path"));
1294 if (macro_par->nstrMacroPath ==
nullptr) {
1297 "ERROR nica-scheduler$ Macro path was not set (please, add 'path' attribute)!", 1);
1302 pc_temp = (
char*)xmlGetProp(sub_node, (
unsigned char*)
"start_event");
1303 if (pc_temp !=
nullptr) {
1304 macro_par->ptrStartEvent = make_unique<int>(atoi(pc_temp));
1309 pc_temp = (
char*)xmlGetProp(sub_node, (
unsigned char*)
"event_count");
1310 if (pc_temp !=
nullptr) {
1313 info_message(
"ERROR nica-scheduler$ Event count must be a number!", 1);
1317 macro_par->ptrEventCount = make_unique<int>(atoi(pc_temp));
1318 if (*macro_par->ptrEventCount < 0) {
1320 info_message(
"ERROR nica-scheduler$ Event count must be a positive number or 0 "
1321 "(for all events)!",
1330 pc_temp = (
char*)xmlGetProp(sub_node, (
unsigned char*)
"add_args");
1331 if (pc_temp !=
nullptr) {
1332 macro_par->strAddArgs = pc_temp;
1339 if (
ParseMacroFiles(sub_node->children, job_par, macro_par, mapJobName2FileOut, batch_com)
1343 job_par->vecMacros.push_back(macro_par);
1345 sub_node = sub_node->next;
1350 if (strcmp((
char*)sub_node->name,
"run") == 0) {
1352 pc_temp = (
char*)xmlGetProp(sub_node, (
unsigned char*)
"mode");
1353 if (pc_temp !=
nullptr) {
1354 if (strcmp(pc_temp,
"global") == 0)
1355 job_par->iRunMode = 1;
1356 else if (strcmp(pc_temp,
"container") == 0)
1357 job_par->iRunMode = 2;
1364 job_par->nstrProcCount.SetValue((
char*)xmlGetProp(sub_node, (
unsigned char*)
"count"));
1367 pc_temp = (
char*)xmlGetProp(sub_node, (
unsigned char*)
"memory1");
1368 if (pc_temp !=
nullptr) {
1373 cout <<
"nica-scheduler$ WARNING: 'memory1' field contains incorrect format, then "
1380 pc_temp = (
char*)xmlGetProp(sub_node, (
unsigned char*)
"config");
1381 if (pc_temp !=
nullptr) {
1382 job_par->strConfigPath = pc_temp;
1386 job_par->nstrPriority.SetValue((
char*)xmlGetProp(sub_node, (
unsigned char*)
"priority"));
1388 job_par->nstrLogs.SetValue((
char*)xmlGetProp(sub_node, (
unsigned char*)
"logs"));
1390 job_par->nstrQueue.SetValue((
char*)xmlGetProp(sub_node, (
unsigned char*)
"queue"));
1392 job_par->nstrQOS.SetValue((
char*)xmlGetProp(sub_node, (
unsigned char*)
"qos"));
1394 job_par->nstrWorkDir.SetValue((
char*)xmlGetProp(sub_node, (
unsigned char*)
"work_dir"));
1396 pc_temp = (
char*)xmlGetProp(sub_node, (
unsigned char*)
"container_os");
1397 if (pc_temp !=
nullptr) {
1398 job_par->strContainerType = pc_temp;
1403 job_par->nstrHosts.SetValue((
char*)xmlGetProp(sub_node, (
unsigned char*)
"hosts"));
1406 replace(job_par->nstrHosts.GetValue().begin(), job_par->nstrHosts.GetValue().end(),
',',
1410 pc_temp = (
char*)xmlGetProp(sub_node, (
unsigned char*)
"core_dump");
1411 if (pc_temp !=
nullptr) {
1412 if (strcmp(pc_temp,
"true") == 0)
1413 job_par->isCoreDump =
true;
1417 sub_node = sub_node->next;
1422 sub_node = sub_node->next;
1429 if ((job_par->nstrCommandline ==
nullptr) && (job_par->vecMacros.size() == 0)) {
1430 info_message(
"ERROR nica-scheduler$ No macros in the job!", 1);
1435 info_message(
"ERROR nica-scheduler$ This job will be skipped because of errors above!", 1);
1436 cur_node = cur_node->next;
1441 unique_ptr<vector<nullString>> vecJobFileOut = make_unique<vector<nullString>>();
1444 int globalParallelCount = 0;
1445 if (job_par->nstrCommandline !=
nullptr)
1446 globalParallelCount = 1;
1448 for (
size_t ind_macro = 0; ind_macro < job_par->vecMacros.size(); ind_macro++) {
1449 int iTotalParallelCount;
1451 if (job_par->nstrCommandline !=
nullptr)
1452 iTotalParallelCount = 1;
1454 iTotalParallelCount = 0;
1456 shared_ptr<structMacroPar> cur_macro = job_par->vecMacros[ind_macro];
1458 for (
const auto& iter : cur_macro->vecFiles) {
1464 if (job_par->iRunMode == 0)
1468 if (job_par->nstrQueue ==
nullptr)
1491 if (ind_macro == (job_par->vecMacros.size() - 1))
1495 if ((ind_macro > 0) && (globalParallelCount != iTotalParallelCount)) {
1496 info_message(
"ERROR nica-scheduler$ The file count in macros of the job is not the same. This "
1497 "job will be skipped!",
1499 cur_node = cur_node->next;
1502 globalParallelCount = iTotalParallelCount;
1506 mapJobName2FileOut.insert(
1507 pair<
string, unique_ptr<vector<nullString>>>(job_par->strJobName, std::move(vecJobFileOut)));
1511 job_par->strConfigPath =
trim(job_par->strConfigPath);
1512 if (job_par->strConfigPath !=
"") {
1514 if (job_par->strConfigPath[job_par->strConfigPath.length() - 1] ==
';')
1515 job_par->strConfigPath = job_par->strConfigPath.substr(0, job_par->strConfigPath.length() - 1);
1517 job_par->strConfigPath =
". " + job_par->strConfigPath +
"; ";
1519 if (job_par->nstrLogs !=
nullptr)
1521 (
string)
" > " + job_par->nstrLogs.GetValue() + (
string)
" 2>&1;");
1527 FILE* stream =
nullptr;
1529 if (job_par->iRunMode > 0) {
1531 string strApptainerRunFile =
"#!/bin/bash\n\n";
1532 if (job_par->iRunMode == 2) {
1533 strApptainerRunFile +=
"echo \"Starting Apptainer Job\"\n";
1534 if (job_par->strConfigPath !=
"")
1535 strApptainerRunFile += job_par->strConfigPath +
"\n\n";
1536 strApptainerRunFile +=
"RUN_CMD=\"root -q -b \"\n";
1537 strApptainerRunFile +=
"RUN_CMD+=\"${@}\"\n\n";
1539 strApptainerRunFile +=
1540 "echo \"DEBUG nica-scheduler: ROOT command in container = $RUN_CMD\"\n";
1541 strApptainerRunFile +=
"eval $RUN_CMD\n";
1542 strApptainerRunFile +=
"exit 0\n";
1547 if (job_par->nstrProcCount !=
nullptr) {
1549 if ((job_par->nstrProcCount.GetValue()[0] ==
'0')
1550 && (job_par->nstrProcCount.GetValue().length() == 1))
1552 if (job_par->nstrQueue ==
nullptr)
1557 if (proc_count == 0) {
1558 info_message(
"ERROR nica-scheduler$ Batch processors were not defined. This job will "
1561 cur_node = cur_node->next;
1565 cout <<
"nica-scheduler$ The batch queue has " << proc_count
1566 <<
" processor cores being available for user jobs." << endl;
1568 proc_count = atoi(job_par->nstrProcCount.GetValue().c_str());
1570 proc_count = globalParallelCount;
1572 if (proc_count == 0) {
1574 "ERROR nica-scheduler$ Processor count can not be equal 0. This job will be skipped!", 1);
1575 cur_node = cur_node->next;
1580 if (job_par->isCoreDump)
1581 strBatchFile +=
"ulimit -c 0\n\n";
1584 if ((job_par->iRunMode != 2) && (job_par->strConfigPath !=
""))
1585 strBatchFile += job_par->strConfigPath +
"\n\n";
1589 if (job_par->nstrPriority !=
nullptr)
1590 iPriority = atoi(job_par->nstrPriority.GetValue().c_str());
1593 if ((globalParallelCount > 0) && (proc_count > globalParallelCount))
1594 proc_count = globalParallelCount;
1597 vector<int> vecMergeMode;
1598 vector<nullString> vecPartialOutputFiles;
1601 if (globalParallelCount > 0) {
1603 for (
const auto& cur_macro : job_par->vecMacros) {
1605 for (
const auto& cur_file : cur_macro->vecFiles) {
1607 string strRealInputFile = cur_file->strFileIn;
1609 nullString nstrRealOutputFile = cur_file->nstrFileOut;
1613 if (cur_file->strPutPath !=
"")
1614 strRealInputFile = cur_file->strPutPath;
1617 if (cur_file->strGetPath !=
"")
1618 nstrRealOutputFile.
SetValue(cur_file->strGetPath);
1621 int* cur_start_event = cur_macro->ptrStartEvent.get();
1622 if (cur_file->ptrStartEvent !=
nullptr)
1623 cur_start_event = cur_file->ptrStartEvent.get();
1626 int* cur_event_count = cur_macro->ptrEventCount.get();
1627 if (cur_file->ptrEventCount !=
nullptr)
1628 cur_event_count = cur_file->ptrEventCount.get();
1631 if ((cur_file->iParallelMode > 1) && (cur_start_event ==
nullptr)) {
1632 cur_file->ptrStartEvent = make_unique<int>(0);
1633 cur_start_event = cur_file->ptrStartEvent.get();
1634 cout <<
"nica-scheduler$ WARNING: parallel_mode is used, but start event is not "
1635 "set, so it is assigned to 0"
1639 if ((cur_file->iParallelMode > 1) && (cur_event_count ==
nullptr)) {
1640 cur_file->ptrEventCount = make_unique<int>(0);
1641 cur_event_count = cur_file->ptrEventCount.get();
1642 cout <<
"nica-scheduler$ WARNING: parallel_mode is used, but event count is not "
1643 "set, so it is assigned to 0"
1648 string strOutputUnion = nstrRealOutputFile.
GetValue();
1651 int start_event = -1, event_count = -1;
1652 if (cur_start_event !=
nullptr)
1653 start_event = *cur_start_event;
1654 if (cur_event_count !=
nullptr)
1655 event_count = *cur_event_count;
1659 if ((cur_file->iParallelMode > 1) && (event_count == 0)) {
1660 string config_paths = job_par->strConfigPath;
1662 if (job_par->nstrLogs ==
nullptr)
1664 stringstream ssEventNumberCommand;
1665 ssEventNumberCommand << config_paths <<
"show_event_count auto \""
1666 << strRealInputFile <<
"\" 2> /dev/null";
1668 cout <<
"DEBUG nica-scheduler$ trying to get event count from the input file: "
1669 << ssEventNumberCommand.str() << endl;
1671 array<char, 128> bufferCommand;
1672 string str_event_number =
"";
1673 stream = popen(ssEventNumberCommand.str().c_str(),
"r");
1674 while (fgets(bufferCommand.data(),
static_cast<int>(bufferCommand.size()), stream)
1676 str_event_number += bufferCommand.data();
1679 if (str_event_number ==
"") {
1681 "ERROR nica-scheduler$ parallel_mode is used, but event_count "
1682 "is not set and cannot be automatically defined. "
1683 "'%s' file will be skipped!",
1684 strRealInputFile.c_str())
1689 event_count = atoi(str_event_number.c_str());
1691 cout <<
"DEBUG nica-scheduler$ " << event_count
1692 <<
" event(s) found in the input file" << endl;
1695 int merge_proc_count = 0;
1697 for (
int j = 0; j < cur_file->iParallelMode; j++) {
1698 int event_per_proc = event_count;
1699 if (cur_file->iParallelMode > 1) {
1700 event_per_proc = (event_count + j) / proc_count;
1701 if (event_per_proc == 0) {
1702 start_event = start_event + event_per_proc;
1709 nullString nstr_output_file = nstrRealOutputFile;
1712 strBatchInitInputFile +=
"\"";
1713 strBatchInitInputFile += strRealInputFile;
1714 strBatchInitInputFile +=
"\" ";
1717 if (nstr_output_file !=
nullptr) {
1719 if (cur_file->iParallelMode > 1)
1722 strBatchInitOutputFile +=
"\"";
1723 strBatchInitOutputFile += nstr_output_file.
GetValue();
1724 strBatchInitOutputFile +=
"\" ";
1726 strBatchInitOutputFile +=
"\">\" ";
1729 if (cur_start_event !=
nullptr) {
1731 strBatchInitStartEvent +=
"\"";
1732 strBatchInitStartEvent += strInt;
1733 strBatchInitStartEvent +=
"\" ";
1735 strBatchInitStartEvent +=
"\">\" ";
1738 if (cur_event_count !=
nullptr) {
1740 strBatchInitEventCount +=
"\"";
1741 strBatchInitEventCount += strInt;
1742 strBatchInitEventCount +=
"\" ";
1744 strBatchInitEventCount +=
"\">\" ";
1747 if (cur_macro->strAddArgs !=
"") {
1748 strBatchInitAddArgs +=
"\"";
1749 string mask_string = cur_macro->strAddArgs;
1751 strBatchInitAddArgs += mask_string;
1752 strBatchInitAddArgs +=
"\" ";
1754 strBatchInitAddArgs +=
"\">\" ";
1756 bool is_clean =
false;
1759 if (cur_file->strPutPath !=
"") {
1761 strBatchInitPut +=
"\"";
1762 strBatchInitPut += cur_file->strPutCommand;
1763 strBatchInitPut +=
" ";
1764 strBatchInitPut += cur_file->strFileIn;
1765 strBatchInitPut +=
" ";
1766 strBatchInitPut += cur_file->strPutPath;
1768 if (cur_file->iParallelMode > 1) {
1769 strBatchInitPut +=
"; touch ";
1770 strBatchInitPut += cur_file->strPutPath + (string)
".lock\" ";
1772 if (is_clean ==
false)
1773 strBatchInitClean +=
"\"";
1775 strBatchInitClean +=
" ";
1776 strBatchInitClean += cur_file->strPutPath + (string)
".lock\" ";
1779 strBatchInitPut +=
"\" ";
1785 strBatchInitPut +=
"\"";
1786 strBatchInitPut +=
"wait ";
1787 strBatchInitPut += cur_file->strPutPath;
1788 strBatchInitPut +=
".lock\" ";
1791 strBatchInitPut +=
"\">\" ";
1794 if (cur_file->strGetPath !=
"") {
1795 strBatchInitGet +=
"\"";
1796 strBatchInitGet += cur_file->strGetCommand;
1797 strBatchInitGet +=
" ";
1798 if (cur_file->iParallelMode > 1) {
1799 strBatchInitGet += nstr_output_file.
GetValue();
1800 strBatchInitGet +=
" ";
1805 strBatchInitGet += cur_file->strGetPath;
1806 strBatchInitGet +=
" ";
1807 strBatchInitGet += cur_file->nstrFileOut.GetValue();
1809 strBatchInitGet +=
"\" ";
1811 strBatchInitGet +=
"\">\" ";
1815 if (cur_file->strPutPath !=
"") {
1816 if (is_clean ==
false)
1817 strBatchInitClean +=
"\"";
1819 strBatchInitClean +=
" ";
1820 strBatchInitClean += cur_file->strPutPath;
1824 if (cur_file->strGetPath !=
"") {
1825 if (is_clean ==
false)
1826 strBatchInitClean +=
"\"";
1828 strBatchInitClean +=
" ";
1829 strBatchInitClean += cur_file->strGetPath;
1833 for (
const auto& str_clean_path : cur_file->vecCleanPath) {
1834 if (is_clean ==
false)
1835 strBatchInitClean +=
"\"";
1837 strBatchInitClean +=
" ";
1838 strBatchInitClean += str_clean_path;
1843 strBatchInitClean +=
"\" ";
1846 if (nstr_output_file !=
nullptr)
1847 strOutputUnion +=
" " + nstr_output_file.
GetValue();
1849 start_event = start_event + event_per_proc;
1852 if ((merge_proc_count > 1) && (cur_file->iMerge >= 0)) {
1856 vecPartialOutputFiles.push_back(strOutputUnion);
1857 vecMergeMode.push_back(cur_file->iMerge);
1864 if (job_par->iRunMode == 2)
1866 strBatchFile +=
"export CONTAINER_PATH=";
1869 strBatchFile += it->second;
1871 info_message((TString::Format(
"ERROR nica-scheduler$ Undefined container type: '%s'",
1872 job_par->strContainerType.c_str()))
1877 strBatchFile +=
"\n\n";
1881 if (job_par->nstrCommandline !=
nullptr) {
1882 strBatchFile += job_par->nstrCommandline.GetValue();
1883 strBatchFile +=
"\n";
1889 strBatchFile += strBatchInitInputFile;
1890 strBatchFile +=
")\n\n";
1891 strBatchFile += strBatchInitOutputFile;
1892 strBatchFile +=
")\n\n";
1893 strBatchFile += strBatchInitStartEvent;
1894 strBatchFile +=
")\n\n";
1895 strBatchFile += strBatchInitEventCount;
1896 strBatchFile +=
")\n\n";
1897 strBatchFile += strBatchInitAddArgs;
1898 strBatchFile +=
")\n\n";
1899 strBatchFile += strBatchInitPut, strBatchFile +=
")\n\n";
1900 strBatchFile += strBatchInitGet, strBatchFile +=
")\n\n";
1901 strBatchFile += strBatchInitClean, strBatchFile +=
")\n\n";
1902 strBatchFile +=
"\n";
1905 for (
size_t ind_macro = 0; ind_macro < job_par->vecMacros.size(); ind_macro++) {
1909 int start_line_number = ind_macro * cur_macro->
vecFiles.size();
1910 strBatchFile +=
"let \"array_index = \"";
1912 strBatchFile +=
"\" - 1 + ";
1913 strBatchFile += to_string(start_line_number);
1914 strBatchFile +=
"\"\n\n";
// Emit the PUT section header of the generated batch script.
// A PUT array element of the form "wait <path>.lock" means another task uploads the file,
// so the script must strip the "wait " prefix from ${element} itself (not from the
// positional argument $1) to obtain the lock file path it polls for.
1918 strBatchFile +=
"element=${PutArray[${array_index}]}\n"
1919 "if [ \"${element}\" != \">\" ]\nthen\n"
1920 " # Check if PutElement contains \"wait <file_path>\"\n"
1921 " if [[ \"${element}\" == *\"wait \"* ]]; then\n"
1922 " # Extract file path after \"wait \"\n"
1923 " lock_file_path=\"${element#wait }\"\n"
1924 " # Wait until file appears\n"
1925 " while [ ! -f \"$lock_file_path\" ]; do\n";
1927 strBatchFile +=
" echo \"DEBUG nica-scheduler: wait while putting file\"\n";
1928 strBatchFile +=
" sleep 5\n"
1931 " set -f # avoid globbing (expansion of *)\n"
1932 " # arr=(${element// / })\n"
1933 " # comm=\"${arr[0]} ${arr[1]} ${arr[2]}\"\n"
1936 strBatchFile +=
" echo \"DEBUG nica-scheduler: PUT command = ${element}\"\n";
1937 strBatchFile +=
" eval ${element}\n"
1942 strBatchFile +=
"element=${InputFileArray[${array_index}]}\n";
1944 if (job_par->iRunMode == 1) {
1945 strBatchFile +=
"export ROOT_HIST=0\n";
1946 strBatchFile +=
"comm=\"root -b -q '";
1948 strBatchFile +=
"(\\\"${element}\\\"\"\n"
1949 "element=${OutputFileArray[${array_index}]}\n"
1950 "if [ \"${element}\" != \">\" ]\nthen\n"
1951 " comm=\"${comm},\\\"${element}\\\"\"\n";
1956 strBatchFile +=
"comm=\"'";
1959 strBatchFile +=
"(\\\\\\\"${element}\\\\\\\"\"\n"
1960 "element=${OutputFileArray[${array_index}]}\n"
1961 "if [ \"${element}\" != \">\" ]\nthen\n"
1962 " comm=\"${comm},\\\\\\\"${element}\\\\\\\"\"\n";
1965 strBatchFile +=
"fi\n"
1966 "element=${StartEventArray[${array_index}]}\n"
1967 "if [ \"${element}\" != \">\" ]\nthen\n"
1968 " comm=\"${comm},${element}\"\n"
1970 "element=${EventCountArray[${array_index}]}\n"
1971 "if [ \"${element}\" != \">\" ]\nthen\n"
1972 " comm=\"${comm},${element}\"\n"
1974 "element=${AddArgsArray[${array_index}]}\n"
1975 "if [ \"${element}\" != \">\" ]\nthen\n"
1976 " comm=\"${comm},${element}\"\n"
1978 "comm=\"$comm)'\"\n\n";
1980 strBatchFile +=
" echo \"DEBUG nica-scheduler: Run Command = $comm\"\n";
1981 if (job_par->iRunMode == 1)
1982 strBatchFile +=
"eval $comm\n\n";
1985 strBatchFile +=
"element=${GetArray[${array_index}]}\n"
1986 "if [ \"${element}\" != \">\" ]\nthen\n"
1987 " set -f # avoid globbing (expansion of *)\n"
1988 "# arr=(${element// / })\n"
1989 "# comm=\"${arr[0]} ${arr[1]} ${arr[2]}\"\n"
1992 strBatchFile +=
" echo \"DEBUG nica-scheduler: GET command = ${element}\"\n";
1993 strBatchFile +=
" eval ${element}\n"
1997 strBatchFile +=
"element=${CleanArray[${array_index}]}\n"
1998 "if [ \"${element}\" != \">\" ]\nthen\n"
1999 " set -f # avoid globbing (expansion of *)\n"
2000 " arr=(${element// / })\n"
2001 " comm=\"rm -rf \"\n"
2002 " for i in \"${arr[@]}\"\n"
2004 " comm=\"${comm}${i} \"\n"
2007 strBatchFile +=
" echo \"DEBUG nica-scheduler: CLEAN command = $comm\"\n";
2008 strBatchFile +=
" eval $comm\n"
2014 if (job_par->iRunMode == 1)
2016 strBatchFile +=
"echo \" End date: `date`\"\nexit 0\n";
2019 if (job_par->iRunMode == 2)
2022 strBatchFile +=
" apptainer run -B \"/eos,/cvmfs\" --fakeroot \"$CONTAINER_PATH\" ";
2023 strBatchFile += exe_dir +
"apptainer_run.sh " +
"\"$@\" \"$comm\" \n\n";
2024 strBatchFile +=
"echo \" End date: `date`\"\nexit 0\n";
2026 string apptainer_run_file_path = exe_dir + (string)
"apptainer_run" + (
string)
".sh";
2027 FILE* pJobContFile = fopen(apptainer_run_file_path.c_str(),
"wt");
2028 if (pJobContFile ==
nullptr) {
2029 info_message(TString::Format(
"ERROR nica-scheduler$ Container file was not created: %s. "
2030 "This job will be skipped!",
2031 apptainer_run_file_path.c_str())
2034 cur_node = cur_node->next;
2038 fwrite(strApptainerRunFile.c_str(), strApptainerRunFile.length(),
sizeof(
char), pJobContFile);
2039 fclose(pJobContFile);
2041 chmod(apptainer_run_file_path.c_str(), S_IRWXU);
2049 stringstream ssMainScriptFile;
2050 ssMainScriptFile <<
"job_" << rand() << rand() <<
".sh";
2051 stringstream ssMainScriptPath;
2052 ssMainScriptPath << exe_dir << ssMainScriptFile.str();
2053 FILE* pJobFile = fopen(ssMainScriptPath.str().c_str(),
"wt");
2054 if (pJobFile ==
nullptr) {
2055 string error_path = ssMainScriptPath.str();
2057 ssMainScriptPath.str(
"");
2058 ssMainScriptPath.clear();
2059 ssMainScriptPath << ssMainScriptFile.str();
2061 pJobFile = fopen(ssMainScriptPath.str().c_str(),
"wt");
2062 if (pJobFile ==
nullptr) {
2063 error_path += (string)
" || " + ssMainScriptPath.str();
2065 ssMainScriptPath.str(
"");
2066 ssMainScriptPath.clear();
2068 pJobFile = fopen(ssMainScriptPath.str().c_str(),
"wt");
2069 if (pJobFile ==
nullptr) {
2070 error_path += (string)
" || " + ssMainScriptPath.str();
2071 info_message(TString::Format(
"ERROR nica-scheduler$ Job file was not created at: %s. "
2072 "This job will be skipped!",
2076 cur_node = cur_node->next;
2081 fwrite(strBatchFile.c_str(), strBatchFile.length(),
sizeof(
char), pJobFile);
2088 stringstream ssBatchCommand;
2090 ssBatchCommand << TString::Format(batch_com.
scheduler_run_job.c_str(), job_par->strJobName.c_str(),
2091 job_par->strDdependencyName.c_str(), globalParallelCount,
2098 if (job_par->nstrQueue !=
nullptr)
2100 job_par->nstrQueue.GetValue().c_str())
2103 if (job_par->nstrQOS !=
nullptr)
2105 job_par->nstrQOS.GetValue().c_str())
2108 if (job_par->nstrHosts !=
nullptr) {
2110 if (job_par->nstrHosts.GetValue()[0] ==
'~')
2112 job_par->nstrHosts.GetValue().substr(1).c_str())
2117 job_par->nstrHosts.GetValue().c_str())
2121 if (job_par->nstrWorkDir !=
nullptr)
2123 job_par->nstrWorkDir.GetValue().c_str())
2126 if (job_par->strOperativeMemory1 !=
"")
2128 job_par->strOperativeMemory1.c_str())
2132 ssBatchCommand <<
" " << ssMainScriptPath.str();
2135 cout <<
"DEBUG nica-scheduler$ Batch command: " << ssBatchCommand.str() << endl;
2137 array<char, 128> bufferBatch;
2139 stream = popen(ssBatchCommand.str().c_str(),
"r");
2140 while (fgets(bufferBatch.data(),
static_cast<int>(bufferBatch.size()), stream) !=
nullptr)
2141 ID += bufferBatch.data();
2146 cout <<
"DEBUG nica-scheduler$ Batch command output = " << ID << endl;
2150 info_message(
"ERROR nica-scheduler$ Job ID was not found: "
2151 "an error occurred or there is no batch system. This job will be skipped!",
2153 cur_node = cur_node->next;
2163 info_message(TString::Format(
"nica-scheduler$ The job '%s' has been submitted with ID: %s. "
2164 "Enter '%s' command to check status.",
2165 job_par->strJobName.c_str(), ID.c_str(),
2168 mapJobName2ID.insert(pair<string, string>(job_par->strJobName, ID));
2171 if (!vecPartialOutputFiles.empty()) {
2174 for (
size_t i = 0;
i < vecPartialOutputFiles.size();
i++) {
2177 string MERGEjob_path = exe_dir +
"merge.sh";
2178 stringstream ssMergeCommand;
2181 << vecPartialOutputFiles.at(
i).GetValue() <<
"\",MergeMode=\""
2182 << vecMergeMode.at(
i) <<
"\",ConfigFiles=\"" << job_par->strConfigPath
2183 <<
"\" " << MERGEjob_path;
2185 array<char, 128> bufferMerge;
2186 string localID =
"";
2187 stream = popen(ssMergeCommand.str().c_str(),
"r");
2188 while (fgets(bufferMerge.data(),
static_cast<int>(bufferMerge.size()), stream) !=
nullptr)
2189 localID += bufferMerge.data();
2193 if (localID ==
"") {
2194 info_message(
"ERROR nica-scheduler$ Union job ID was not found: an error occurred or "
2195 "there is no batch system.",
2200 cout <<
"nica-scheduler$ Task for merging parallel files has ID: " << localID << endl;
2210 mapJobName2ID.insert(pair<string, string>(job_par->strJobName,
"dummy"));
2214 if (job_par->nstrProcCount !=
nullptr) {
2215 if ((job_par->nstrProcCount.GetValue()[0] ==
'0')
2216 && (job_par->nstrProcCount.GetValue().length()) == 1)
2220 if (proc_count == 0) {
2222 "ERROR nica-scheduler$ Core number was not defined. This job will be skipped!", 1);
2223 cur_node = cur_node->next;
2227 proc_count = atoi(job_par->nstrProcCount.GetValue().c_str());
2229 proc_count = globalParallelCount == 0 ? 1 : globalParallelCount;
2231 if (proc_count == 0) {
2233 "ERROR nica-scheduler$ Process count can not be equal 0. This job will be skipped!", 1);
2234 cur_node = cur_node->next;
2239 if (proc_count > globalParallelCount)
2240 proc_count = globalParallelCount;
2242 vector<thread> vecThreads;
2244 shared_ptr<Semaphore> job_semaphore = make_shared<Semaphore>(proc_count);
2245 shared_ptr<mutex> job_mutex = make_shared<mutex>();
2248 if (job_par->nstrCommandline !=
nullptr) {
2250 stringstream ssBashCommand;
2251 ssBashCommand << job_par->strConfigPath << job_par->nstrCommandline.GetValue();
2252 if (job_par->nstrLogs !=
nullptr)
2253 ssBashCommand <<
" > " << job_par->nstrLogs.GetValue() <<
" 2>&1";
2256 stringstream ssBashFile;
2257 long int t = time(
nullptr);
2258 ssBashFile <<
"temp_" << t <<
".sh";
2261 bash_file.open(ssBashFile.str().c_str());
2262 bash_file << ssBashCommand.str();
2265 cout <<
"nica-scheduler$ Task 1 is running: command line - "
2266 << job_par->nstrCommandline.GetValue() << endl;
2269 stringstream ssBashRun;
2270 ssBashRun <<
"bash " << ssBashFile.str();
2271 int system_return = system(ssBashRun.str().c_str());
2272 if (system_return != 0)
2273 cout <<
"nica-scheduler$ WARNING: System call (in main) returned non-zero code: "
2274 << system_return << endl;
2278 stringstream ssRemoveCommand;
2279 ssRemoveCommand <<
"rm -rf " << ssBashFile.str();
2280 system_return = system(ssRemoveCommand.str().c_str());
2281 if (system_return != 0)
2282 cout <<
"nica-scheduler$ WARNING: System call (in ThreadLocalProcess) returned "
2284 << system_return << endl;
2286 cout <<
"DEBUG nica-scheduler$ '" << ssBashFile.str() <<
"' bash file was used " << endl;
2291 unsigned int thread_count = 1;
2292 if (job_par->vecMacros[0]->vecFiles.size() > 1)
2293 thread_count = job_par->vecMacros[0]->vecFiles.size();
2296 for (
unsigned int i = 0;
i < thread_count;
i++) {
2297 shared_ptr<structThreadPar> thread_par = make_shared<structThreadPar>();
2298 thread_par->sptrJobParameter = job_par;
2299 thread_par->semJobSemaphore = job_semaphore;
2300 thread_par->mutJobMutex = job_mutex;
2301 thread_par->iThreadCounter =
i + 1;
2303 job_semaphore->acquire();
2308 }
catch (
const system_error& e) {
2309 info_message((TString::Format(
"ERROR nica-scheduler$ error while creating thread with "
2310 "code = %d and message = %s",
2311 e.code().value(), e.what()))
2320 for (
auto& cur_thread : vecThreads) {
2323 }
catch (
const system_error& e) {
2325 "ERROR nica-scheduler$ Thread failed with code = %d and message = %s",
2326 e.code().value(), e.what()))
2333 cout <<
"nica-scheduler$ Local job has been finished" << endl;
2338 cur_node = cur_node->next;
2341 mapJobName2ID.clear();
2342 mapJobName2FileOut.clear();
2350 istringstream ss(input);
2355 bool isGen =
false, isEnergy =
false, isMinEnergy =
false, isMaxEnergy =
false, isBeam =
false,
2356 isTarget =
false, isDesc =
false, isPath =
false;
2357 string strGen, strBeam, strTarget, strDesc, strPath;
2358 double fEnergy, fMaxEnergy;
2360 while (getline(ss, token,
',')) {
2362 transform(token.begin(), token.end(), token.begin(), ::tolower);
2365 if ((token.length() > 4) && (token.substr(0, 4) ==
"gen=")) {
2367 strGen = token.substr(4);
2370 if ((token.length() > 7) && (token.substr(0, 7) ==
"energy=")) {
2371 token = token.substr(7);
2373 size_t indDash = token.find_first_of(
'-');
2374 if (indDash != string::npos) {
2375 stringstream stream;
2376 stream << token.substr(0, indDash);
2378 if (stream >> dVal) {
2383 if (token.length() > indDash) {
2384 stringstream stream2;
2385 stream2 << token.substr(indDash + 1);
2386 if (stream2 >> dVal) {
2396 stringstream stream;
2399 if (stream >> dVal) {
2408 if ((token.length() > 5) && (token.substr(0, 5) ==
"beam=")) {
2410 strBeam = token.substr(5);
2413 if ((token.length() > 5) && (token.substr(0, 5) ==
"desc=")) {
2415 strDesc = token.substr(5);
2418 if ((token.length() > 7) && (token.substr(0, 7) ==
"target=")) {
2420 strTarget = token.substr(7);
2423 if ((token.length() > 5) && (token.substr(0, 5) ==
"path=")) {
2425 strPath = token.substr(5);
2435 sql =
"select file_path "
2436 "from simulation_file";
2438 bool isWhere =
false;
2440 if (isGen ==
true) {
2442 sql += TString::Format(
" AND lower(generator_name) = '%s'", strGen.data());
2445 sql += TString::Format(
" "
2446 "where lower(generator_name) = '%s'",
2451 if (isEnergy ==
true) {
2461 sql += TString::Format(
"energy >= %f", fEnergy);
2463 sql += TString::Format(
" AND energy <= %f", fMaxEnergy);
2466 sql += TString::Format(
"energy <= %f", fMaxEnergy);
2468 sql += TString::Format(
"energy = %f", fEnergy);
2472 if (isBeam ==
true) {
2474 sql += TString::Format(
" AND lower(beam_particle) = '%s'", strBeam.data());
2477 sql += TString::Format(
" "
2478 "where lower(beam_particle) = '%s'",
2483 if (isTarget ==
true) {
2485 sql += TString::Format(
" AND lower(target_particle) = '%s'", strTarget.data());
2488 sql += TString::Format(
" "
2489 "where lower(target_particle) = '%s'",
2493 if (isDesc ==
true) {
2495 sql += TString::Format(
" AND lower(file_desc) like '%%%s%%'", strDesc.data());
2498 sql += TString::Format(
" "
2499 "where lower(file_desc) like '%%%s%%'",
2503 if (isPath ==
true) {
2505 sql += TString::Format(
" AND lower(file_path) like '%%%s%%'", strPath.data());
2508 sql += TString::Format(
" "
2509 "where lower(file_path) like '%%%s%%'",
2513 sql +=
" order by generator_name,energy";
2518 bool isPeriod =
false, isMinPeriod =
false, isMaxPeriod =
false, isRun =
false, isMinRun =
false,
2519 isMaxRun =
false, isBeam =
false, isTarget =
false, isEnergy =
false, isMinEnergy =
false,
2520 isMaxEnergy =
false, isEvents =
false, isMinEvents =
false, isMaxEvents =
false, isTime =
false,
2521 isMinTime =
false, isMaxTime =
false, isField =
false, isMinField =
false, isMaxField =
false,
2522 isSize =
false, isMinSize =
false, isMaxSize =
false, isPath =
false;
2523 string strBeam, strTarget, strTime, strMaxTime, strPath;
2524 int iPeriod, iMaxPeriod, iRun, iMaxRun, iEvents, iMaxEvents, iField, iMaxField;
2525 double fEnergy, fMaxEnergy, fSize, fMaxSize;
2527 while (getline(ss, token,
',')) {
2529 transform(token.begin(), token.end(), token.begin(), ::tolower);
2532 if ((token.length() > 7) && (token.substr(0, 7) ==
"period=")) {
2533 token = token.substr(7);
2535 size_t indDash = token.find_first_of(
'-');
2536 if (indDash != string::npos) {
2537 stringstream stream;
2538 stream << token.substr(0, indDash);
2540 if (stream >> iVal) {
2545 if (token.length() > indDash) {
2546 stringstream stream2;
2547 stream2 << token.substr(indDash + 1);
2548 if (stream2 >> iVal) {
2558 stringstream stream;
2561 if (stream >> iVal) {
2570 if ((token.length() > 4) && (token.substr(0, 4) ==
"run=")) {
2571 token = token.substr(4);
2573 size_t indDash = token.find_first_of(
'-');
2574 if (indDash != string::npos) {
2575 stringstream stream;
2576 stream << token.substr(0, indDash);
2578 if (stream >> iVal) {
2583 if (token.length() > indDash) {
2584 stringstream stream2;
2585 stream2 << token.substr(indDash + 1);
2586 if (stream2 >> iVal) {
2596 stringstream stream;
2599 if (stream >> iVal) {
2608 if ((token.length() > 5) && (token.substr(0, 5) ==
"beam=")) {
2610 strBeam = token.substr(5);
2613 if ((token.length() > 7) && (token.substr(0, 7) ==
"target=")) {
2615 strTarget = token.substr(7);
2618 if ((token.length() > 7) && (token.substr(0, 7) ==
"energy=")) {
2619 token = token.substr(7);
2621 size_t indDash = token.find_first_of(
'-');
2622 if (indDash != string::npos) {
2623 stringstream stream;
2624 stream << token.substr(0, indDash);
2626 if (stream >> dVal) {
2631 if (token.length() > indDash) {
2632 stringstream stream2;
2633 stream2 << token.substr(indDash + 1);
2634 if (stream2 >> dVal) {
2644 stringstream stream;
2647 if (stream >> dVal) {
// "events=" is 7 characters long: compare the full 7-character prefix and skip all of it,
// otherwise the comparison can never succeed and the '=' would stay in the parsed value.
2656 if ((token.length() > 7) && (token.substr(0, 7) ==
"events=")) {
2657 token = token.substr(7);
2659 size_t indDash = token.find_first_of(
'-');
2660 if (indDash != string::npos) {
2661 stringstream stream;
2662 stream << token.substr(0, indDash);
2664 if (stream >> iVal) {
2669 if (token.length() > indDash) {
2670 stringstream stream2;
2671 stream2 << token.substr(indDash + 1);
2672 if (stream2 >> iVal) {
2682 stringstream stream;
2685 if (stream >> iVal) {
2694 if ((token.length() > 5) && (token.substr(0, 5) ==
"time=")) {
2695 token = token.substr(5);
2697 size_t indDash = token.find_first_of(
'-');
2698 if (indDash != string::npos) {
2702 strTime = token.substr(0, indDash);
2704 if (token.length() > indDash) {
2707 strMaxTime = token.substr(indDash + 1);
2720 if ((token.length() > 6) && (token.substr(0, 6) ==
"field=")) {
2721 token = token.substr(6);
2723 size_t indDash = token.find_first_of(
'-');
2724 if (indDash != string::npos) {
2725 stringstream stream;
2726 stream << token.substr(0, indDash);
2728 if (stream >> iVal) {
2733 if (token.length() > indDash) {
2734 stringstream stream2;
2735 stream2 << token.substr(indDash + 1);
2736 if (stream2 >> iVal) {
2746 stringstream stream;
2749 if (stream >> iVal) {
2758 if ((token.length() > 5) && (token.substr(0, 5) ==
"size=")) {
2759 token = token.substr(5);
2761 size_t indDash = token.find_first_of(
'-');
2762 if (indDash != string::npos) {
2763 stringstream stream;
2764 stream << token.substr(0, indDash);
2766 if (stream >> dVal) {
2771 if (token.length() > indDash) {
2772 stringstream stream2;
2773 stream2 << token.substr(indDash + 1);
2774 if (stream2 >> dVal) {
2784 stringstream stream;
2787 if (stream >> dVal) {
2796 if ((token.length() > 5) && (token.substr(0, 5) ==
"path=")) {
2798 strPath = token.substr(5);
2812 sql =
"select file_path "
2815 bool isWhere =
false;
2817 if (isPeriod ==
true) {
2827 sql += TString::Format(
"period_number >= %d", iPeriod);
2829 sql += TString::Format(
" AND period_number <= %d", iMaxPeriod);
2832 sql += TString::Format(
"period_number <= %d", iMaxPeriod);
2834 sql += TString::Format(
"period_number = %d", iPeriod);
2838 if (isRun ==
true) {
2848 sql += TString::Format(
"run_number >= %d", iRun);
2850 sql += TString::Format(
" AND run_number <= %d", iMaxRun);
2853 sql += TString::Format(
"run_number <= %d", iMaxRun);
2855 sql += TString::Format(
"run_number = %d", iRun);
2859 if (isBeam ==
true) {
2861 sql += TString::Format(
" AND lower(beam_particle) = '%s'", strBeam.data());
2864 sql += TString::Format(
" "
2865 "where lower(beam_particle) = '%s'",
2870 if (isTarget ==
true) {
2872 sql += TString::Format(
" AND lower(target_particle) = '%s'", strTarget.data());
2875 sql += TString::Format(
" "
2876 "where lower(target_particle) = '%s'",
2881 if (isEnergy ==
true) {
2891 sql += TString::Format(
"energy >= %f", fEnergy);
2893 sql += TString::Format(
" AND energy <= %f", fMaxEnergy);
2896 sql += TString::Format(
"energy <= %f", fMaxEnergy);
2898 sql += TString::Format(
"energy = %f", fEnergy);
2902 if (isEvents ==
true) {
2912 sql += TString::Format(
"event_count >= %d", iEvents);
2914 sql += TString::Format(
" AND event_count <= %d", iMaxEvents);
2917 sql += TString::Format(
"event_count <= %d", iMaxEvents);
2919 sql += TString::Format(
"event_count = %d", iEvents);
2923 if (isTime ==
true) {
2934 if (!strptime(strMaxTime.c_str(),
"%Y-%m-%d %H:%M:%S", &tm)) {
2936 cout <<
"Error: " << strMaxTime
2937 <<
" is not correct datetime. DateTime format should be yyyy-mm-dd 24hh:mm:ss." << endl;
2939 if (!strptime(strTime.c_str(),
"%Y-%m-%d %H:%M:%S", &tm)) {
2941 cout <<
"Error: " << strTime
2942 <<
" is not correct datetime. DateTime format should be yyyy-mm-dd 24hh:mm:ss." << endl;
2948 sql += TString::Format(
"end_datetime >= '%s'", strTime.c_str());
2950 sql += TString::Format(
" AND start_datetime <= '%s'", strMaxTime.c_str());
2953 sql += TString::Format(
"start_datetime <= '%s'", strMaxTime.c_str());
2956 sql += TString::Format(
"start_datetime <= '%s' AND end_datetime >= '%s'", strTime.c_str(),
2962 if (isField ==
true) {
2972 sql += TString::Format(
"field_voltage >= %d", iField);
2974 sql += TString::Format(
" AND field_voltage <= %d", iMaxField);
2977 sql += TString::Format(
"field_voltage <= %d", iMaxField);
2979 sql += TString::Format(
"field_voltage = %d", iField);
2983 if (isSize ==
true) {
2993 sql += TString::Format(
"file_size >= %f", fSize);
2995 sql += TString::Format(
" AND file_size <= %f", fMaxSize);
2998 sql += TString::Format(
"file_size <= %f", fMaxSize);
3000 sql += TString::Format(
"file_size = %f", fSize);
3003 if (isPath ==
true) {
3005 sql += TString::Format(
" AND lower(file_path) like '%%%s%%'", strPath.data());
3008 sql += TString::Format(
" "
3009 "where lower(file_path) like '%%%s%%'",
3014 sql +=
" order by period_number,run_number";
3022 shared_ptr<structJobPar> job_par,
3023 shared_ptr<structMacroPar> macro_par,
3024 multimap<
string, unique_ptr<vector<nullString>>>& mapJobName2FileOut,
3027 char* pc_temp =
nullptr;
3030 if (strcmp((
char*)sub_node->name,
"file") == 0) {
3031 bool isStartEvent =
false, isEventCount =
false;
3032 int start_event, event_count;
3033 string sim_db =
"", exp_db =
"", job_input =
"", macro_input =
"", file_input =
"", put_command =
"cp",
3034 put_path =
"", get_command =
"cp", get_path =
"", get_output =
"", clean_path =
"",
3039 pc_temp = (
char*)xmlGetProp(sub_node, (
unsigned char*)
"input");
3040 if (pc_temp !=
nullptr) {
3041 nstr_file_in.
SetValue((
string)pc_temp);
3045 pc_temp = (
char*)xmlGetProp(sub_node, (
unsigned char*)
"output");
3046 if (pc_temp !=
nullptr) {
3047 nstr_file_out.
SetValue((
string)pc_temp);
3052 pc_temp = (
char*)xmlGetProp(sub_node, (
unsigned char*)
"start_event");
3053 if (pc_temp !=
nullptr) {
3054 start_event = atoi(pc_temp);
3056 isStartEvent =
true;
3060 pc_temp = (
char*)xmlGetProp(sub_node, (
unsigned char*)
"event_count");
3061 if (pc_temp !=
nullptr) {
3063 info_message(
"ERROR nica-scheduler$ Event count must be a number!", 1);
3067 event_count = atoi(pc_temp);
3069 if (event_count < 0) {
3070 info_message(
"ERROR nica-scheduler$ Event count must be a positive number or 0 (for all events)!",
3074 isEventCount =
true;
3081 pc_temp = (
char*)xmlGetProp(sub_node, (
unsigned char*)
"merge");
3083 if (pc_temp !=
nullptr) {
3085 if (strcmp(lower_merge,
"true") == 0)
3087 if (strcmp(lower_merge,
"chain") == 0)
3089 if (strcmp(lower_merge,
"preserve") == 0)
3096 pc_temp = (
char*)xmlGetProp(sub_node, (
unsigned char*)
"parallel_mode");
3097 if (pc_temp !=
nullptr) {
3098 parallel_mode = pc_temp;
3103 pc_temp = (
char*)xmlGetProp(sub_node, (
unsigned char*)
"sim_input");
3104 if (pc_temp !=
nullptr) {
3109 pc_temp = (
char*)xmlGetProp(sub_node, (
unsigned char*)
"exp_input");
3110 if (pc_temp !=
nullptr) {
3115 pc_temp = (
char*)xmlGetProp(sub_node, (
unsigned char*)
"job_input");
3116 if (pc_temp !=
nullptr) {
3117 job_input = pc_temp;
3121 pc_temp = (
char*)xmlGetProp(sub_node, (
unsigned char*)
"macro_input");
3122 if (pc_temp !=
nullptr) {
3123 macro_input = pc_temp;
3127 pc_temp = (
char*)xmlGetProp(sub_node, (
unsigned char*)
"file_input");
3128 if (pc_temp !=
nullptr) {
3129 file_input = pc_temp;
3135 xmlNodePtr sub_file_node = sub_node->children;
3136 while (sub_file_node) {
3138 if (sub_file_node->type == XML_ELEMENT_NODE) {
3140 if (strcmp((
char*)sub_file_node->name,
"put") == 0) {
3142 pc_temp = (
char*)xmlGetProp(sub_file_node, (
unsigned char*)
"command");
3143 if (pc_temp !=
nullptr) {
3144 put_command = pc_temp;
3148 pc_temp = (
char*)xmlGetProp(sub_file_node, (
unsigned char*)
"directory");
3149 if (pc_temp ==
nullptr) {
3150 info_message(
"ERROR nica-scheduler$ There is no 'directory' property for the <put/> tag!",
3152 sub_file_node = sub_file_node->next;
3160 if (strcmp((
char*)sub_file_node->name,
"get") == 0) {
3162 pc_temp = (
char*)xmlGetProp(sub_file_node, (
unsigned char*)
"command");
3163 if (pc_temp !=
nullptr) {
3164 get_command = pc_temp;
3169 pc_temp = (
char*)xmlGetProp(sub_file_node, (
unsigned char*)
"directory");
3170 if (pc_temp ==
nullptr) {
3171 info_message(
"ERROR nica-scheduler$ There is no 'directory' property for the <get/> tag!",
3173 sub_file_node = sub_file_node->next;
3181 if (strcmp((
char*)sub_file_node->name,
"clean") == 0) {
3182 pc_temp = (
char*)xmlGetProp(sub_file_node, (
unsigned char*)
"path");
3183 if (pc_temp !=
nullptr) {
3184 clean_path = pc_temp;
3190 sub_file_node = sub_file_node->next;
3193 vector<string> vecInputFiles;
3195 if (nstr_file_in !=
nullptr) {
3196 string str_file_in = nstr_file_in.
GetValue();
3197 vecInputFiles.push_back(str_file_in);
3200 if ((str_file_in.find(
'?') != std::string::npos) || (str_file_in.find(
'*') != std::string::npos)
3201 || (str_file_in.find(
'+') != std::string::npos))
3203 vecInputFiles =
glob(str_file_in.c_str());
3205 cout <<
"DEBUG nica-scheduler$ File count in the regular expression: " << vecInputFiles.size()
3211 if ((sim_db !=
"") || (exp_db !=
"")) {
3212 TString sql, strConnection =
"pgsql://" + (TString)job_par->dbSettings.db_host +
"/"
3213 + (TString)job_par->dbSettings.db_name;
3218 TSQLServer* pSQLServer = TSQLServer::Connect(strConnection, job_par->dbSettings.db_username.c_str(),
3219 job_par->dbSettings.db_password.c_str());
3220 if (pSQLServer == 0x00) {
3221 info_message(
"ERROR nica-scheduler$ Connection to the database was not established!", 1);
3226 cout <<
"DEBUG nica-scheduler$ SQL query: " << sql << endl;
3228 TSQLResult* res = pSQLServer->Query(sql);
3230 info_message(
"ERROR nica-scheduler$ An error was occurred during the SQL query!", 1);
3233 int nrows = res->GetRowCount();
3235 info_message(
"nica-scheduler$ WARNING: there are no records for these parameters", 1);
3238 while ((row = res->Next()) !=
nullptr) {
3239 string cur_file_path((
char*)row->GetField(0));
3240#ifdef MICC_NO_MANAGEMENT
3241 if ((simDB !=
"") && (batch_com.
run_command ==
"sbatch"))
3244 vecInputFiles.push_back(cur_file_path);
3254 if (job_input !=
"") {
3256 auto it = mapJobName2FileOut.find(job_input);
3257 if (it == mapJobName2FileOut.end())
3258 cout <<
"nica-scheduler$ WARNING: job (output list) was not found: " << job_input << endl;
3260 vector<nullString>* vec_string = it->second.get();
3261 for (vector<nullString>::iterator iter_output = vec_string->begin();
3262 iter_output != vec_string->end(); iter_output++)
3263 vecInputFiles.push_back((*iter_output).GetValue());
3268 if (macro_input !=
"") {
3270 bool is_found =
false;
3271 for (
const auto& iter : job_par->vecMacros) {
3274 for (
const auto& iter_output : prev_macro->
vecFiles) {
3275 if (iter_output.get()->nstrFileOut.isNull())
3277 vecInputFiles.push_back(iter_output.get()->nstrFileOut.GetValue());
3286 cout <<
"nica-scheduler$ WARNING: macro (output list) was not found: " << macro_input << endl;
3290 if (file_input !=
"") {
3293 ifstream listFile(str_file_input.c_str());
3296 while (getline(listFile, list_line))
3297 vecInputFiles.push_back(list_line);
3300 info_message(
"ERROR nica-scheduler$ Can not open text file with file input list!", 1);
3304 bool isFileFound =
false;
3305 for (
unsigned int i = 0;
i < vecInputFiles.size();
i++) {
3306 unique_ptr<structFilePar> filePar = make_unique<structFilePar>();
3308 filePar->strFileIn =
trim(vecInputFiles[
i]);
3310 if (nstr_file_out !=
nullptr)
3315 filePar->ptrStartEvent = make_unique<int>(start_event);
3318 filePar->ptrEventCount = make_unique<int>(event_count);
3320 if (parallel_mode !=
"")
3321 filePar->strParallelMode = parallel_mode;
3323 filePar->iMerge = iMerge;
3325 if (put_path !=
"") {
3326 filePar->strPutCommand = put_command;
3327 filePar->strPutPath =
3329 filePar->strPutPath =
3333 if (get_path !=
"") {
3334 filePar->strGetCommand = get_command;
3335 filePar->strGetPath =
3337 if (filePar->nstrFileOut ==
nullptr) {
3338 info_message(
"ERROR nica-scheduler$ <get> must employ output file path, but it is not set!", 1);
3341 filePar->strGetPath =
3345 if (clean_path !=
"") {
3347 for (
const string& cur_clean_path : vec_clean_path)
3348 filePar->vecCleanPath.push_back(
3352 macro_par->vecFiles.push_back(std::move(filePar));
3357 info_message(
"ERROR nica-scheduler$ No input files were found (possible tags: "
3358 "<input>, <file_input>, <sim_input>, <exp_input>, <job_input>, <macro_input>!",
3364 sub_node = sub_node->next;