14from itertools
import chain
15from shutil
import copyfile
18except ImportError
as e:
19 if "telnet" not in e.msg:
22mail_address =
"gertsen@jinr.ru"
23logging.getLogger().setLevel(logging.INFO)
28function_file_path = os.path.dirname(os.path.realpath(__file__)) +
"/../function_set.py"
29with open(function_file_path,
"rb")
as source_file:
30 code = compile(source_file.read(), function_file_path,
"exec")
34config = json.load(open(
"file_inspector.json"))
36 conn = psycopg2.connect((
"dbname=%s user=%s host=%s password=%s") % (config[
"db_name"], config[
"db_user"], config[
"db_host"], config[
"db_pass"]))
38 logging.error(f
"Inspector Database connection failed: {e}")
42cursor.execute(
"select cluster_name "
# Flatten the fetched rows into a plain list of cluster names.
cursor_result = cursor.fetchall()
cluster_dictionary = list(chain.from_iterable(cursor_result))
# Values accepted by the CRON option: "*" plus "<cluster>:*" for every cluster.
storage_dictionary_expanded = ["*"]
for row in cluster_dictionary:
    storage_dictionary_expanded.append(":".join((row, "*")))
50cursor.execute(
"select cluster_name, storage_name "
# Build "<cluster>:<storage>" addresses from the fetched (cluster_name, storage_name) rows.
cursor_result = cursor.fetchall()
storage_dictionary = [row[0] + ":" + row[1] for row in cursor_result]
# The same addresses are also valid values for the CRON option list.
storage_dictionary_expanded.extend(storage_dictionary)
59cnt_all_files = cnt_found_catalogue = cnt_fileinfo_error = cnt_mismatched_files = cnt_update_info = cnt_add_catalogue = cnt_missed_files = cnt_deleted_files = 0
64 parser = argparse.ArgumentParser(
65 description=
'Script for checking integrity of experiment files on specified storages\
66 For more information execute it with --help option'
72 help=
'cluster name, where the script is started',
73 choices=cluster_dictionary,
80 help=
'storage to inspect as "cluster_name:storage_name"',
81 choices=storage_dictionary,
88 help='Rewrite existing destination file if exists but has another size or file checksum'
93 help='Remove destination file if there is no corresponding source file'
100 help=
'storage name, whose check is added to CRON',
101 choices=storage_dictionary_expanded,
105 args = parser.parse_args()
106 start_cluster, inspect_storage, storage_2cron_address = args.start, args.inspect, args.add_cron
107 if start_cluster
is not None:
108 start_cluster = start_cluster[0]
109 if inspect_storage
is not None:
110 inspect_storage = inspect_storage[0]
111 if storage_2cron_address
is not None:
112 storage_2cron_address = storage_2cron_address[0]
114 return start_cluster,inspect_storage,storage_2cron_address
def add_failed_check(inspection_check_id, file_guid, error_type, error_details, conn):
    """Insert one record describing a failed file check into ``failed_check``.

    Args:
        inspection_check_id: identifier of the inspection run (``check_id`` column).
        file_guid: GUID of the affected file (``file_guid`` column); callers in
            this script pass ``None`` for files without a catalogue record.
        error_type: value stored in the ``error_type_id`` column.
        error_details: free-text description stored in ``error_details``.
        conn: database connection providing ``cursor()``.
    """
    cursor = conn.cursor()
    logging.info(f'Inserting information on failed check #{inspection_check_id} for FGUID {file_guid}: error type = {error_type} (details: {error_details})\n')
    # Every value is bound as a query parameter instead of being spliced into
    # the SQL text: quotes or '%' characters inside error_details can neither
    # break the statement nor inject SQL, so the former quote-replacement
    # workaround is no longer needed and the details are stored verbatim.
    cursor.execute(
        "insert into failed_check(check_id, file_guid, error_type_id, error_details) "
        "values (%s, %s, %s, %s)",
        (inspection_check_id, file_guid, error_type, error_details),
    )
125def transfer_file(source_cluster, source_storage, source_file_path, source_xrootd_url, source_file_size, source_file_checksum,
126 destination_cluster, destination_storage, destination_file_path, destination_xrootd_url,
127 transfer_protocol_code, source_ssh_prefix, destination_ssh_prefix, start_tmp_dir, conn):
128 logging.info(f
'Copying source file "{source_file_path}" at "{source_cluster}:{source_storage}" to the destination path "{destination_file_path}" at "{destination_cluster}:{destination_storage}"')
138 if transfer_protocol_code == 0:
139 destination_dir = os.path.dirname(destination_file_path)
140 if not os.path.exists(destination_dir):
141 os.makedirs(destination_dir)
143 copyfile(source_file_path, destination_file_path)
144 except Exception
as e:
145 logging.error(f
"{source_file_path}: copying the local file to {destination_file_path} failed: {e}")
147 elif transfer_protocol_code == 1:
148 if cp_file_ssh(source_file_path, destination_file_path, source_ssh_prefix,
'') ==
False:
150 elif transfer_protocol_code == 2:
151 if cp_file_xrd(source_file_path, destination_file_path, source_xrootd_url,
'') ==
False:
153 elif transfer_protocol_code == 3:
154 if cp_file_eos(source_file_path, destination_file_path, source_xrootd_url,
'') ==
False:
156 elif transfer_protocol_code == 4:
157 if cp_file_ssh(source_file_path, destination_file_path,
'', destination_ssh_prefix) ==
False:
159 elif transfer_protocol_code == 5:
160 if cp_file_xrd(source_file_path, destination_file_path,
'', destination_xrootd_url) ==
False:
162 elif transfer_protocol_code == 6:
163 if cp_file_eos(source_file_path, destination_file_path,
'', destination_xrootd_url) ==
False:
165 elif transfer_protocol_code == 7:
166 if cp_file_ssh(source_file_path, destination_file_path, source_ssh_prefix, destination_ssh_prefix) ==
False:
168 elif transfer_protocol_code == 8:
169 if cp_file_xrd(source_file_path, destination_file_path, source_xrootd_url, destination_xrootd_url) ==
False:
171 elif transfer_protocol_code == 9:
172 if cp_file_eos(source_file_path, destination_file_path, source_xrootd_url, destination_xrootd_url) ==
False:
174 elif transfer_protocol_code == 10:
175 connection_ssh_prefix = destination_ssh_prefix
if destination_ssh_prefix !=
"" else source_ssh_prefix
176 cp_file_ssh_protocol(source_file_path, destination_file_path, connection_ssh_prefix, 0,
'')
177 elif transfer_protocol_code == 11:
178 if source_ssh_prefix ==
'' or destination_ssh_prefix ==
'':
179 logging.error(
"ssh copy to local through SSH has no SSH information")
181 cp_file_ssh_protocol(source_file_path, destination_file_path, destination_ssh_prefix, 1, source_ssh_prefix,
True)
182 elif transfer_protocol_code == 12:
183 if destination_ssh_prefix !=
'':
184 cp_file_ssh_protocol(source_file_path, destination_file_path, destination_ssh_prefix, 2, source_xrootd_url,
True)
185 elif source_ssh_prefix !=
'':
186 cp_file_ssh_protocol(source_file_path, destination_file_path, source_ssh_prefix, 2, destination_xrootd_url,
False)
188 logging.error(
"xrd copy to local through SSH has no SSH information")
190 elif transfer_protocol_code == 13:
191 if destination_ssh_prefix !=
'':
192 cp_file_ssh_protocol(source_file_path, destination_file_path, destination_ssh_prefix, 3, source_xrootd_url,
True)
193 elif source_ssh_prefix !=
'':
194 cp_file_ssh_protocol(source_file_path, destination_file_path, source_ssh_prefix, 3, destination_xrootd_url,
False)
196 logging.error(
"eos copy to local through SSH has no SSH information")
198 elif transfer_protocol_code == 20:
199 connection_ssh_prefix = source_ssh_prefix
if source_ssh_prefix !=
"" else destination_ssh_prefix
200 cp_file_ssh_protocol(source_file_path, destination_file_path, connection_ssh_prefix, 0,
'')
201 elif transfer_protocol_code == 21:
202 if source_ssh_prefix ==
'' or destination_ssh_prefix ==
'':
203 logging.error(
"ssh copy to local through SSH has no SSH information")
205 cp_file_ssh_protocol(source_file_path, destination_file_path, source_ssh_prefix, 1, destination_ssh_prefix,
False)
206 elif transfer_protocol_code == 22:
207 if source_ssh_prefix !=
'':
208 cp_file_ssh_protocol(source_file_path, destination_file_path, source_ssh_prefix, 2, destination_xrootd_url,
False)
209 elif destination_ssh_prefix !=
'':
210 cp_file_ssh_protocol(source_file_path, destination_file_path, destination_ssh_prefix, 2, source_xrootd_url,
True)
212 logging.error(
"xrd copy from local through SSH has no SSH information")
214 elif transfer_protocol_code == 23:
215 if source_ssh_prefix !=
'':
216 cp_file_ssh_protocol(source_file_path, destination_file_path, source_ssh_prefix, 3, destination_xrootd_url,
False)
217 elif destination_ssh_prefix !=
'':
218 cp_file_ssh_protocol(source_file_path, destination_file_path, destination_ssh_prefix, 3, source_xrootd_url,
True)
220 logging.error(
"eos copy from local through SSH has no SSH information")
223 logging.error(f
"the transfer protocol (#{transfer_protocol_code}) is not supported")
230 check_protocol_type = 0
231 if transfer_protocol_code == 4
or transfer_protocol_code == 7
or transfer_protocol_code == 10
or transfer_protocol_code == 11:
232 check_protocol_type = 1
233 if transfer_protocol_code == 5
or transfer_protocol_code == 8: check_protocol_type = 2
234 if transfer_protocol_code == 6
or transfer_protocol_code == 9: check_protocol_type = 3
236 destination_file_size, destination_file_checksum, _, _ = get_file_info(destination_file_path, destination_xrootd_url, check_protocol_type, destination_ssh_prefix, start_tmp_dir, 1)
237 if destination_file_size
is None or destination_file_checksum ==
'':
240 if destination_file_size != source_file_size:
241 logging.error(f
"{source_file_path}: transferring the file failed: destination and source file sizes are different: {destination_file_size} <> {source_file_size}")
243 if (destination_file_checksum != source_file_checksum):
244 logging.error(f
"{source_file_path}: transferring the file failed: destination and source file checksums are different: {destination_file_checksum} <> {source_file_checksum}")
247 logging.info(
'The source file has been successfully copied')
249 logging.debug(f
'Inserting a record for the transferred file with path = "{destination_file_path}", file size = {destination_file_checksum}, file_checksum = {destination_file_checksum}')
250 cursor = conn.cursor()
251 cursor.execute(
"insert into file_catalogue(cluster_name, storage_name, file_path, file_size, file_checksum) "
252 f
"values ('{destination_cluster}', '{destination_storage}', '{destination_file_path}', {destination_file_size}, '{destination_file_checksum}')")
254 logging.info(
"Information on the transferred file has been successfullly added to the database\n")
258def check_files(file_list, catalogue_files, cluster_name, storage_name, xrootd_url, access_protocol, ssh_prefix, full_check, catalogue_matches, start_tmp_dir, conn, inspection_check_id):
259 for cur_file
in file_list:
260 logging.debug(f
"Current check: '{cur_file}' file")
264 for i, file_record
in enumerate(catalogue_files):
265 catalogue_file_guid, catalogue_file_path, _, _, _, _ = file_record
266 if catalogue_file_path == cur_file:
267 catalogue_matches[i] = 1
268 cnt_found_catalogue += 1
275 if file_record
is not None:
276 logging.debug(
'The current file is present within the File Catalogue')
277 catalogue_file_guid, catalogue_file_path, catalogue_file_size, catalogue_file_checksum, catalogue_replica, catalogue_file_matched = file_record
280 if full_check
or (catalogue_file_checksum
is None):
282 elif xrootd_url
is not None:
284 file_size, file_checksum, retcode, error_message = get_file_info(cur_file, xrootd_url, access_protocol, ssh_prefix, start_tmp_dir, match_checksum)
286 if retcode != 0
or file_size
is None or file_checksum
is None:
287 cnt_fileinfo_error += 1
288 if retcode != 0: add_failed_check(inspection_check_id, catalogue_file_guid, retcode, error_message, conn)
290 logging.error(f
"{cur_file}: critical error occured while accessing the file (retcode = {retcode}, file_size = {file_size}, file_checksum = {file_checksum}\n")
293 cursor = conn.cursor()
295 if catalogue_file_size
is None:
296 cursor.execute(f
"update file_catalogue set file_size={file_size} where file_guid={catalogue_file_guid}")
298 logging.debug(f
'The size of the file has been updated in the catalogue: {cur_file}')
301 if file_size != catalogue_file_size:
302 cnt_mismatched_files += 1
303 logging.error(f
"{cur_file}: the file size is different from the catalogue value: {file_size} <> {catalogue_file_size}")
306 logging.debug(f
"The file size is equal to the catalogue value: {file_size}")
308 if match_checksum != 0:
309 if catalogue_file_checksum
is None:
310 add_failed_check(inspection_check_id, catalogue_file_guid, 3,
"", conn)
311 cursor.execute(f
"update file_catalogue set file_checksum='{file_checksum}' where file_guid={catalogue_file_guid}")
313 logging.debug(
'The checksum of the file has been updated in the catalogue\n')
316 if file_checksum != catalogue_file_checksum:
317 cnt_mismatched_files += 1
318 logging.error(f
"{cur_file}: the checksum of the file is different from the catalogue value ({file_checksum} <> {catalogue_file_checksum})")
319 add_failed_check(inspection_check_id, catalogue_file_guid, 4,
"", conn)
322 logging.debug(f
"The file checksum is equal to the catalogue value: {file_checksum}\n")
325 logging.info(f
"Adding file '{cur_file}' to the database")
327 file_size, file_checksum, retcode, error_message = get_file_info(cur_file, xrootd_url, access_protocol, ssh_prefix, start_tmp_dir, 2)
328 if retcode != 0
or file_size
is None or file_checksum
is None:
329 cnt_fileinfo_error += 1
330 if retcode != 0: add_failed_check(inspection_check_id,
None, retcode, error_message, conn)
332 logging.error(f
"{cur_file}: critical error occured while accessing the file (retcode = {retcode}, file_size = {file_size}, file_checksum = {file_checksum}\n")
336 xrd_file_checksum =
''
337 if xrootd_url
is not None:
339 xrd_file_checksum = get_file_xrdsum(cur_file, xrootd_url)
340 except Exception
as e:
341 cnt_fileinfo_error += 1
342 add_failed_check(inspection_check_id,
None, 2, str(e), conn)
344 if file_checksum != xrd_file_checksum:
345 cnt_mismatched_files += 1
346 logging.error(f
"{cur_file}: the сhecksum of the file is different from the XRootD storage value ({file_checksum} <> {xrd_file_checksum})")
347 add_failed_check(inspection_check_id,
None, 4, f
"{cur_file}: the checksum of the file is different ({file_checksum} <> {xrd_file_checksum})", conn)
350 logging.debug(f
'Inserting a new file record with file path = "{cur_file}", file size = {file_size}, file_checksum = {file_checksum}')
351 cursor = conn.cursor()
352 cursor.execute(
"insert into file_catalogue(cluster_name, storage_name, file_path, file_size, file_checksum) "
353 f
"values ('{cluster_name}', '{storage_name}', '{cur_file}', {file_size}, '{file_checksum}')")
355 cnt_add_catalogue += 1
356 logging.info(
"Information on the current file has been successfullly added to the database\n")
368def define_transfer_protocol(so_transfer_protocol, si_transfer_protocol, oi_transfer_protocol, io_transfer_protocol):
369 transfer_protocol_code = -1
371 if si_transfer_protocol == 0:
372 if io_transfer_protocol
is not None:
373 return io_transfer_protocol
374 elif so_transfer_protocol == 0:
375 if oi_transfer_protocol
is not None:
376 if oi_transfer_protocol == 0:
379 return 4 + oi_transfer_protocol
381 if si_transfer_protocol > 1
and so_transfer_protocol > 1:
382 if si_transfer_protocol == 3
and so_transfer_protocol == 3:
387 if si_transfer_protocol == 1:
388 if so_transfer_protocol == 1:
390 elif io_transfer_protocol
is not None:
391 return 10 + io_transfer_protocol
393 if so_transfer_protocol == 1:
394 if oi_transfer_protocol
is not None:
395 return 20 + oi_transfer_protocol
397 return transfer_protocol_code
400def compare_storages(inspect_cluster, inspect_storage, inspect_storage_path, inspect_xrootd_url,
401 origin_cluster, origin_storage, origin_storage_path, origin_xrootd_url, replica_flags,
402 start_cluster, start_tmp_dir, si_check_protocol, si_transfer_protocol, si_ssh_prefix, conn):
403 logging.debug(f
'Compare replica inspected storage "{inspect_cluster}:{inspect_storage}" and origin (source) storage "{replica_cluster}:{replica_storage}" for inspected path "{inspect_storage_path}" vs source path "{replica_storage_path}"')
404 cursor = conn.cursor()
406 add_file = (
'a' in replica_flags)
407 delete_file = (
'd' in replica_flags)
408 update_file = (
'u' in replica_flags)
410 transfer_protocol_code = -1
411 if add_file
or update_file:
413 cursor.execute(
"select check_protocol, transfer_protocol, ssh_prefix "
414 "from storage_communication "
415 f
"where source_cluster_name='{inspect_cluster}' and destination_cluster_name='{origin_cluster}' and destination_storage_name='{origin_storage}'")
417 io_check_protocol, io_transfer_protocol, io_ssh_prefix = cursor.fetchone()
419 cursor.execute(
"select check_protocol, transfer_protocol, ssh_prefix "
420 "from storage_communication "
421 f
"where source_cluster_name='{origin_cluster}' and destination_cluster_name='{inspect_cluster}' and destination_storage_name='{inspect_storage}'")
423 oi_check_protocol, oi_transfer_protocol, oi_ssh_prefix = cursor.fetchone()
425 cursor.execute(
"select check_protocol, transfer_protocol, ssh_prefix "
426 "from storage_communication "
427 f
"where source_cluster_name='{start_cluster}' and destination_cluster_name='{origin_cluster}' and destination_storage_name='{origin_storage}'")
429 so_check_protocol, so_transfer_protocol, so_ssh_prefix = cursor.fetchone()
430 transfer_protocol_code = define_transfer_protocol(so_transfer_protocol, si_transfer_protocol, oi_transfer_protocol, io_transfer_protocol)
431 if transfer_protocol_code == -1:
432 logging.error(f
"no communication protocol is defined to transfer files between '{origin_cluster}:{origin_storage}' and '{inspect_cluster}:{inspect_storage}' from '{start_cluster}'. "
433 "Please, specify it in the database")
437 cursor.execute(
"select file_guid, file_path, file_size, file_checksum, replica_validity, 0 "
438 "from file_catalogue "
439 f
"where cluster_name='{origin_cluster}' and storage_name='{origin_storage}'")
441 origin_files = cursor.fetchall()
444 cursor.execute(
"select file_guid, file_path, file_size, file_checksum, replica_validity, 0 "
445 "from file_catalogue "
446 f
"where cluster_name='{inspect_cluster}' and storage_name='{inspect_storage}'")
448 inspect_files = cursor.fetchall()
451 file_matches = [-1
for i
in range(len(inspect_files))]
453 for iter_origin, cur_origin_file
in enumerate(origin_files):
454 origin_file_guid, origin_file_path, origin_file_size, origin_file_checksum, origin_replica, origin_file_matched = cur_origin_file
456 logging.debug(f
'Current check for origin file: "{origin_file_path}"')
457 origin_file_end = origin_file_path.replace(origin_storage_path,
'')
460 for iter_inspect, cur_inspect_file
in enumerate(inspect_files):
461 inspect_file_guid, inspect_file_path, inspect_file_size, inspect_file_checksum, inspect_replica, inspect_file_matched = cur_inspect_file
462 if inspect_file_path.endswith(origin_file_end):
463 file_matches[iter_inspect] = iter_origin
468 new_inspect_file_path = inspect_storage_path + origin_file_end
469 logging.info(f
"'{new_inspect_file_path}' file is missing at the inspected storage")
471 transfer_file(origin_cluster, origin_storage, origin_file_path, origin_xrootd_url, origin_file_size, origin_file_checksum,
472 inspect_cluster, inspect_storage, new_inspect_file_path, inspect_xrootd_url,
473 transfer_protocol_code, so_ssh_prefix, si_ssh_prefix, start_tmp_dir, conn)
476 inspect_file_guid, inspect_file_path, inspect_file_size, inspect_file_checksum, inspect_replica, inspect_file_matched = cur_inspect_file
478 if inspect_file_size != origin_file_size:
479 if inspect_file_size
is None:
480 logging.error(f
"file '{inspect_file_size}' on '{inspect_storage}' storage has no information on the size")
482 logging.error(f
"the source ({origin_file_path}) and inspected ({inspect_file_size}) files have different sizes: {origin_file_size} <> {inspect_file_size}")
485 logging.info(f
"The inspected file at '{inspect_storage}' storage has an incorrect size and will be replaced")
487 transfer_file(origin_cluster, origin_storage, origin_file_path, origin_xrootd_url, origin_file_size, origin_file_checksum,
488 inspect_cluster, inspect_storage, new_inspect_file_path, inspect_xrootd_url,
489 transfer_protocol_code, so_ssh_prefix, si_ssh_prefix, start_tmp_dir, conn)
492 if inspect_file_checksum != origin_file_checksum:
493 if inspect_file_checksum
is None:
494 logging.error(f
"file '{inspect_file_path}' on '{inspect_storage}' storage has no information on the checksum")
496 logging.error(f
"the source ({origin_file_path}) and inspected ({inspect_file_path}) files have different checksum: {origin_file_checksum} <> {inspect_file_checksum}")
499 logging.info(f
"The inspected file at '{inspect_storage}' storage has an incorrect checksum and will be replaced")
501 transfer_file(origin_cluster, origin_storage, origin_file_path, origin_xrootd_url, origin_file_size, origin_file_checksum,
502 inspect_cluster, inspect_storage, new_inspect_file_path, inspect_xrootd_url,
503 transfer_protocol_code, so_ssh_prefix, si_ssh_prefix, start_tmp_dir, conn)
506 logging.debug(
"The source and destination file are fully matched")
508 logging.info(f
"Comparison of inspected files on the '{inspect_storage}' storage with the origin '{origin_storage}' storage finished")
512 for iter_inspect, cur_inspect_file
in enumerate(inspect_files):
513 inspect_file_guid, inspect_file_path, inspect_file_size, inspect_file_checksum, inspect_replica, inspect_file_matched = cur_inspect_file
514 if file_matches[iter_inspect] == -1:
515 logging.warning(f
"{inspect_file_path}: the inspected file has no corresponding source file")
518 cursor.execute(f
"delete from file_catalogue where file_guid = {inspect_file_guid}")
520 logging.info(
"The destination file with no corresponding source file has been deleted")
523def send_email(from_address, mail_address, subject, body):
526 command = f
"echo '{body}' | mail -s '{subject}' -r '{from_address}' '{mail_address}'"
527 result = subprocess.run(command, shell=
True, check=
True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
528 print(
"Email with summary results sent successfully")
529 except subprocess.CalledProcessError
as e:
530 print(f
"Failed to send email: {e.stderr.decode()}")
532 print(
"Trying to send via Telnet...")
533 with telnetlib.Telnet(
'mail.jinr.ru', 25)
as tn:
535 tn.write(b
"HELO idev.jinr.ru\n")
537 tn.write(f
"MAIL FROM: {from_address}\n".encode())
539 tn.write(f
"RCPT TO: <{mail_address}>\n".encode())
543 tn.write(body.encode())
545 tn.write(b
"\r\n.\r\n")
548 telnet_answer = tn.read_all()
549 if "Error" in telnet_answer:
550 print(
"Sending email with summary results failed")
552 print(
"Email with summary results sent via Telnet")
557start_cluster, inspect_storage_address, storage_2cron_address = argument_parser()
560if start_cluster
is None:
561 cluter_host = socket.gethostname()
562 cursor.execute(
"select cluster_name "
564 f
"where starts_with('{cluter_host}', node_prefix)")
566 if cursor.rowcount != 1:
567 logging.error(f
"the start hostname ({cluter_host}) does not match the prefix of any of the clusters")
569 (start_cluster,) = cursor.fetchone()
572cursor.execute(
"select temp_dir, ssh_prefix "
574 f
"where cluster_name='{start_cluster}'")
576start_tmp_dir, start_ssh_prefix = cursor.fetchone()
577if start_tmp_dir
is None:
578 logging.error(f
"no start cluster found in the database for name: {start_cluster}")
582if storage_2cron_address
is not None:
583 current_script_path = os.path.realpath(__file__)
585 if storage_2cron_address ==
"*":
586 storage_2cron_cluster =
"*"
587 storage_2cron_storage =
"*"
589 storage_2cron_array = storage_2cron_address.split(
":")
590 storage_2cron_cluster = storage_2cron_array[0]
591 storage_2cron_storage = storage_2cron_array[1]
594 storage_request =
"select s.check_interval,s.cluster_name,s.storage_name from storage_ s "
595 if storage_2cron_cluster !=
"*":
596 storage_request += f
"where s.cluster_name='{storage_2cron_cluster}'"
597 if storage_2cron_storage !=
"*":
598 storage_request += f
" and s.storage_name='{storage_2cron_storage}'"
600 cursor.execute(storage_request)
602 storage_2cron_list = cursor.fetchall()
603 if not storage_2cron_list:
604 logging.error(f
"{inspect_storage_address}: no storage found in the database to add to the CRON")
607 for storage_2cron_element
in storage_2cron_list:
608 process_output, process_error, process_returncode = get_process_full(f
'crontab -l | {{ cat; echo "* * */{storage_2cron_element[0]} * * python3 {current_script_path} '
609 f
'-s {start_cluster} -i {storage_2cron_element[1]}:{storage_2cron_element[2]}"; }} | crontab -' )
610 if process_returncode != 0:
611 print(f
'Error occured while adding job to UNIX CRON...\nOUTPUT: \n{process_output}\nERROR: \n{process_error}')
612 exit(process_returncode)
615inspect_storage_array = inspect_storage_address.split(
":")
616inspect_cluster = inspect_storage_array[0]
617inspect_storage = inspect_storage_array[1]
619inspection_starttime = datetime.datetime.now().strftime(
"%Y-%m-%d %H:%M:%S")
620logging.info(f
"Script for checking integrity of experiment files on the '{inspect_storage_address}' storage started")
623cursor.execute(
"select s.storage_path, s.storage_type, s.xrootd_url, s.check_interval, s.full_check, s.check_flags, s.replica_cluster, s.replica_storage, s.replica_flags, c.ssh_prefix "
624 "from storage_ s join cluster_ c on s.cluster_name = c.cluster_name "
625 f
"where s.cluster_name='{inspect_cluster}' and s.storage_name='{inspect_storage}'")
627inspect_storage_path, inspect_storage_type, inspect_xrootd_url, inspect_storage_interval, inspect_storage_full, check_flags, replica_cluster, replica_storage, replica_flags, inspect_ssh_prefix = cursor.fetchone()
628if inspect_storage_path
is None:
629 logging.error(f
"{inspect_storage_address}: no storage found in the database")
634cursor.execute(
"select check_protocol, transfer_protocol "
635 "from storage_communication "
636 f
"where source_cluster='{start_cluster}' and destination_cluster='{inspect_cluster}' and destination_storage='{inspect_storage}'")
638if cursor.rowcount < 1:
639 logging.error(f
"no communication protocol is defined between '{start_cluster}' and '{inspect_storage_address}'. Please, specify it in the database")
641si_check_protocol, si_transfer_protocol = cursor.fetchone()
645cursor.execute(
"select count(*) as check_count "
646 "from storage_check "
647 f
"where cluster_name='{inspect_cluster}' and storage_name='{inspect_storage}' and start_datetime > (select max(start_datetime) "
648 "from storage_check "
649 f
"where cluster_name='{inspect_cluster}' and storage_name='{inspect_storage}' and full_check)")
651(fast_check_count,) = cursor.fetchone()
652if fast_check_count >= (inspect_storage_full-1):
656cursor.execute(
"select file_guid, file_path, file_size, file_checksum, replica_validity, 0 "
657 "from file_catalogue "
658 f
"where cluster_name='{inspect_cluster}' and storage_name='{inspect_storage}'")
660catalogue_files = cursor.fetchall()
661catalogue_matches = []
662catalogue_matches = [0
for i
in range(len(catalogue_files))]
665catalogue_file_count = len(catalogue_files)
666cursor.execute(
"insert into storage_check(cluster_name, storage_name, start_datetime, file_count, full_check) "
667 f
"values ('{inspect_cluster}', '{inspect_storage}', '{inspection_starttime}', {catalogue_file_count}, '{full_check}') RETURNING check_id")
668inspection_check_id = cursor.fetchone()[0]
672if si_check_protocol == 0:
674 all_files = glob.glob(inspect_storage_path +
'/**/*', recursive=
True)
675 all_files = [f
for f
in all_files
if os.path.isfile(f)]
678 logging.error(f
"{inspect_storage_address}: there are no files on the storage")
680 check_files(all_files, catalogue_files, inspect_cluster, inspect_storage, inspect_xrootd_url, 0, inspect_ssh_prefix, full_check, catalogue_matches, start_tmp_dir, conn, inspection_check_id)
683elif si_check_protocol == 1:
686 process_output, process_error, returncode = get_process_full(f
"tar cf - . | ssh {inspect_ssh_prefix} 'D=`mktemp -d`; tar xf - -C $D; $D/ssh_run.sh \"-s {inspect_cluster}\" \"-i {inspect_cluster}:{inspect_storage}\"'")
687 process_output = process_output.strip()
689 logging.error(f
'{inspect_storage_address}: execution via SSH failed for the destination storage: {returncode} - {process_error}')
695elif si_check_protocol == 2:
696 xrd_file_list = get_files_xrd(inspect_storage_path, inspect_xrootd_url)
697 if not xrd_file_list:
698 logging.error(f
"{inspect_storage_address}: there are no files on the storage")
700 check_files(xrd_file_list, catalogue_files, inspect_cluster, inspect_storage, inspect_xrootd_url, 2, inspect_ssh_prefix, full_check, catalogue_matches, start_tmp_dir, conn, inspection_check_id)
703elif si_check_protocol == 3:
704 eos_file_list = get_files_eos(inspect_storage_path, inspect_xrootd_url)
705 if not eos_file_list:
706 logging.error(f
"{inspect_storage_address}: there are no files on the storage")
708 check_files(eos_file_list, catalogue_files, inspect_cluster, inspect_storage, inspect_xrootd_url, 3, inspect_ssh_prefix, full_check, catalogue_matches, start_tmp_dir, conn, inspection_check_id)
711delete_absent_files = (
"d" in check_flags)
712for i,file_record
in enumerate(catalogue_files):
713 if catalogue_matches[i] == 0:
714 catalogue_file_guid, catalogue_file_path, _, _, _, _ = file_record
715 logging.warning(f
"{catalogue_file_path}: the record in the File Catalogue has no corresponding file on the storage element")
716 cnt_missed_files += 1
717 add_failed_check(inspection_check_id, catalogue_file_guid, 1,
'', conn)
719 if delete_absent_files:
720 cursor.execute(f
"delete from file_catalogue where file_guid = {catalogue_file_guid}")
722 cnt_deleted_files += 1
723 logging.info(
"The record of the absent file in the File Catalogue has been deleted")
726if replica_cluster
is not None:
727 if "e" in replica_flags:
729 cursor.execute(
"select s.storage_path, s.xrootd_url "
731 f
"where s.cluster_name = '{replica_cluster}' and s.storage_name = '{replica_storage}'")
733 replica_storage_path, replica_xrootd_url = cursor.fetchone()
734 compare_storages(inspect_cluster, inspect_storage, inspect_storage_path, inspect_xrootd_url,
735 replica_cluster, replica_storage, replica_storage_path, replica_xrootd_url, replica_flags,
736 start_cluster, start_tmp_dir, si_check_protocol, si_transfer_protocol, inspect_ssh_prefix, conn)
738inspection_endtime = datetime.datetime.now().strftime(
"%Y-%m-%d %H:%M:%S")
739cursor.execute(f
"update storage_check set end_datetime='{inspection_endtime}' where check_id={inspection_check_id}")
742cursor.execute(
"select count(*) "
744 f
"where check_id={inspection_check_id}")
745found_errors = cursor.fetchone()[0]
751from_address =
"<inspector@jinr.ru>"
752subject =
"File Inspector - Integrity Results"
753body = (f
"File inspection on '{inspect_storage_address}' storage completed\n"
754 f
"Operation time: {inspection_starttime} - {inspection_endtime}\n"
755 f
"Number of errors: {found_errors}\n")
756send_email(from_address, mail_address, subject, body)
758logging.info(f
"Script for checking integrity of experiment files on the '{inspect_storage_address}' storage finished")
759logging.info(
"Statistics:")
760logging.info(f
" All checked files - {cnt_all_files}, Found files in the Catalogue - {cnt_found_catalogue}")
761logging.info(f
" Missed files - {cnt_missed_files}, Deleted files from the Catalogue - {cnt_deleted_files}")
762logging.info(f
" Get file info error - {cnt_fileinfo_error}\n Updated records (file info) - {cnt_update_info}")
763logging.info(f
" Mismatched records - {cnt_mismatched_files}\n Added files to the Catalogue - {cnt_add_catalogue}")