BmnRoot
Loading...
Searching...
No Matches
file_inspector.py
Go to the documentation of this file.
1# This python script is developed for checking integrity of files with experimental and simulated data
2# on all storages: DDC@DAQ - NCX@LHEP - CICC@LIT
3import datetime
4import os
5import sys
6import glob
7import time
8import json
9import logging
10import argparse
11import psycopg2
12import socket
13import subprocess
14from itertools import chain
15from shutil import copyfile
16try:
17 import telnetlib
18except ImportError as e:
19 if "telnet" not in e.msg:
20 raise
21
# destination address for the summary e-mail sent at the end of an inspection run
mail_address = "gertsen@jinr.ru"
# root logger at INFO: debug-level trace messages below are suppressed by default
logging.getLogger().setLevel(logging.INFO)


# load script with function set
#function_file_path = os.environ['VMCWORKDIR'] + "/services/inspection/function_set.py"
# the shared helpers (get_file_info, cp_file_ssh, get_process_full, ...) live in the
# sibling function_set.py and are pulled into this module's namespace via exec()
function_file_path = os.path.dirname(os.path.realpath(__file__)) + "/../function_set.py"
with open(function_file_path, "rb") as source_file:
    code = compile(source_file.read(), function_file_path, "exec")
# NOTE(review): exec() runs function_set.py unconditionally — only execute this
# script from a trusted checkout
exec(code) #exec(code, globals, locals)
32
# connect to the File Inspector database
# load File Inspector database configuration from JSON; the context manager closes
# the handle (the previous bare json.load(open(...)) leaked the file descriptor)
with open("file_inspector.json") as config_file:
    config = json.load(config_file)
try:
    conn = psycopg2.connect("dbname=%s user=%s host=%s password=%s" % (config["db_name"], config["db_user"], config["db_host"], config["db_pass"]))
except Exception as e:
    logging.error(f"Inspector Database connection failed: {e}")
    exit(1)
cursor = conn.cursor()
# get cluster and storage list for the parameters
cursor.execute("select cluster_name "
               "from cluster_")
conn.commit()
cursor_result = cursor.fetchall()
# flat list of known cluster names, used as 'choices' for the --start option
cluster_dictionary = list(chain(*cursor_result))
# '--add_cron' additionally accepts the global '*' and '<cluster>:*' wildcards
storage_dictionary_expanded = ["*"]
for row in cluster_dictionary:
    storage_dictionary_expanded.append(row + ":" + "*")
cursor.execute("select cluster_name, storage_name "
               "from storage_")
conn.commit()
cursor_result = cursor.fetchall()
# '--inspect' accepts only concrete '<cluster>:<storage>' pairs known to the database
storage_dictionary = []
for row in cursor_result:
    storage_dictionary.append(row[0] + ":" + row[1])
    storage_dictionary_expanded.append(row[0] + ":" + row[1])
# define counters for final statistics
cnt_all_files = cnt_found_catalogue = cnt_fileinfo_error = cnt_mismatched_files = cnt_update_info = cnt_add_catalogue = cnt_missed_files = cnt_deleted_files = 0
60
61
# command-line interface of the inspector
def argument_parser():
    """Parse the command line and return (start_cluster, inspect_storage, storage_2cron_address).

    Each option is declared with nargs=1, so present values arrive as one-element
    lists and are unwrapped; absent optional values stay None.
    """
    parser = argparse.ArgumentParser(
        description='Script for checking integrity of experiment files on specified storages\
        For more information execute it with --help option'
    )
    parser.add_argument('--start', '-s', nargs=1, type=str, required=False,
                        choices=cluster_dictionary,
                        help='cluster name, where the script is started')
    parser.add_argument('--inspect', '-i', nargs=1, type=str, required=True,
                        choices=storage_dictionary,
                        help='storage to inspect as "cluster_name:storage_name"')
    # options kept for future use (disabled in the original source):
    # parser.add_argument('--force', '-f', action='store_true',
    #                     help='Rewrite existing destination file if exists but has another size or file checksum')
    # parser.add_argument('--delete', '-del', action='store_true',
    #                     help='Remove destination file if there is no corresponding source file')
    parser.add_argument('--add_cron', '-a', nargs=1, type=str, required=False,
                        choices=storage_dictionary_expanded,
                        help='storage name, whose check is added to CRON')

    parsed = parser.parse_args()
    # unwrap the single-element lists produced by nargs=1, keeping None for absent options
    unwrap = lambda value: value[0] if value is not None else None
    return unwrap(parsed.start), unwrap(parsed.inspect), unwrap(parsed.add_cron)
115
def add_failed_check(inspection_check_id, file_guid, error_type, error_details, conn):
    """Insert a record into the failed_check table for one failed file check.

    inspection_check_id -- id of the current storage_check run
    file_guid           -- catalogue GUID of the affected file (None if not catalogued)
    error_type          -- numeric error type id (see error_type table)
    error_details       -- free-text details of the failure
    conn                -- open psycopg2 connection (committed here)
    """
    cursor = conn.cursor()
    logging.info(f'Inserting information on failed check #{inspection_check_id} for FGUID {file_guid}: error type = {error_type} (details: {error_details})\n')
    error_details = error_details.replace('\'', '"') # replace inner ' (single quotes) sybmol with " (double quotes)
    # fully parameterized insert: the original interpolated check id, error type and
    # details into the SQL text, which broke on special characters (injection-prone)
    cursor.execute("insert into failed_check(check_id, file_guid, error_type_id, error_details) "
                   "values (%s, %s, %s, %s)",
                   (inspection_check_id, file_guid, error_type, error_details))
    conn.commit()
123
# copy source file to a destination storage element
def transfer_file(source_cluster, source_storage, source_file_path, source_xrootd_url, source_file_size, source_file_checksum,
                  destination_cluster, destination_storage, destination_file_path, destination_xrootd_url,
                  transfer_protocol_code, source_ssh_prefix, destination_ssh_prefix, start_tmp_dir, conn):
    """Copy one file between storage elements and register the copy in the File Catalogue.

    The copy mechanism is selected by 'transfer_protocol_code' (see the table below).
    After the transfer, the destination file's size and checksum are re-read and
    compared against the source values; on success a new row is inserted into
    'file_catalogue'. Returns True on success, False on any failure.
    """
    logging.info(f'Copying source file "{source_file_path}" at "{source_cluster}:{source_storage}" to the destination path "{destination_file_path}" at "{destination_cluster}:{destination_storage}"')
    # transfer_protocol_code:
    # 0 - local copy,
    # 1 - ssh copy to local, 2 - xrd copy to local, 3 - eos copy to local
    # 4 - ssh copy from local, 5 - xrd copy from local, 6 - eos copy from local
    # 7 - full ssh copy (scp), 8 - full xrootd copy, 9 - full eos copy
    # 10 - local to local copy through SSH connection, 11 - ssh copy to local through SSH conection
    # 12 - xrd copy to local through SSH connection, 13 - eos copy to local through SSH connection
    # 20 - local from local copy through SSH connection, 21 - ssh copy from local through SSH connection
    # 22 - xrd copy from local through SSH connection, 23 - eos copy from local through SSH connection
    if transfer_protocol_code == 0:
        # plain local copy: make sure the destination directory exists first
        destination_dir = os.path.dirname(destination_file_path)
        if not os.path.exists(destination_dir):
            os.makedirs(destination_dir)
        try:
            copyfile(source_file_path, destination_file_path)
        except Exception as e:
            logging.error(f"{source_file_path}: copying the local file to {destination_file_path} failed: {e}")
            return False
    elif transfer_protocol_code == 1:
        if cp_file_ssh(source_file_path, destination_file_path, source_ssh_prefix, '') == False:
            return False
    elif transfer_protocol_code == 2:
        if cp_file_xrd(source_file_path, destination_file_path, source_xrootd_url, '') == False:
            return False
    elif transfer_protocol_code == 3:
        if cp_file_eos(source_file_path, destination_file_path, source_xrootd_url, '') == False:
            return False
    elif transfer_protocol_code == 4:
        if cp_file_ssh(source_file_path, destination_file_path, '', destination_ssh_prefix) == False:
            return False
    elif transfer_protocol_code == 5:
        if cp_file_xrd(source_file_path, destination_file_path, '', destination_xrootd_url) == False:
            return False
    elif transfer_protocol_code == 6:
        if cp_file_eos(source_file_path, destination_file_path, '', destination_xrootd_url) == False:
            return False
    elif transfer_protocol_code == 7:
        if cp_file_ssh(source_file_path, destination_file_path, source_ssh_prefix, destination_ssh_prefix) == False:
            return False
    elif transfer_protocol_code == 8:
        if cp_file_xrd(source_file_path, destination_file_path, source_xrootd_url, destination_xrootd_url) == False:
            return False
    elif transfer_protocol_code == 9:
        if cp_file_eos(source_file_path, destination_file_path, source_xrootd_url, destination_xrootd_url) == False:
            return False
    # NOTE(review): the cp_file_ssh_protocol() results below are not checked, so a
    # failed SSH-tunnelled copy is only detected by the size/checksum comparison
    # afterwards — confirm against function_set.py whether it reports failure
    elif transfer_protocol_code == 10:
        connection_ssh_prefix = destination_ssh_prefix if destination_ssh_prefix != "" else source_ssh_prefix
        cp_file_ssh_protocol(source_file_path, destination_file_path, connection_ssh_prefix, 0, '')
    elif transfer_protocol_code == 11:
        if source_ssh_prefix == '' or destination_ssh_prefix == '':
            logging.error("ssh copy to local through SSH has no SSH information")
            return False
        cp_file_ssh_protocol(source_file_path, destination_file_path, destination_ssh_prefix, 1, source_ssh_prefix, True)
    elif transfer_protocol_code == 12:
        if destination_ssh_prefix != '':
            cp_file_ssh_protocol(source_file_path, destination_file_path, destination_ssh_prefix, 2, source_xrootd_url, True)
        elif source_ssh_prefix != '':
            cp_file_ssh_protocol(source_file_path, destination_file_path, source_ssh_prefix, 2, destination_xrootd_url, False)
        else:
            logging.error("xrd copy to local through SSH has no SSH information")
            return False
    elif transfer_protocol_code == 13:
        if destination_ssh_prefix != '':
            cp_file_ssh_protocol(source_file_path, destination_file_path, destination_ssh_prefix, 3, source_xrootd_url, True)
        elif source_ssh_prefix != '':
            cp_file_ssh_protocol(source_file_path, destination_file_path, source_ssh_prefix, 3, destination_xrootd_url, False)
        else:
            logging.error("eos copy to local through SSH has no SSH information")
            return False
    elif transfer_protocol_code == 20:
        connection_ssh_prefix = source_ssh_prefix if source_ssh_prefix != "" else destination_ssh_prefix
        cp_file_ssh_protocol(source_file_path, destination_file_path, connection_ssh_prefix, 0, '')
    elif transfer_protocol_code == 21:
        if source_ssh_prefix == '' or destination_ssh_prefix == '':
            logging.error("ssh copy to local through SSH has no SSH information")
            return False
        cp_file_ssh_protocol(source_file_path, destination_file_path, source_ssh_prefix, 1, destination_ssh_prefix, False)
    elif transfer_protocol_code == 22:
        if source_ssh_prefix != '':
            cp_file_ssh_protocol(source_file_path, destination_file_path, source_ssh_prefix, 2, destination_xrootd_url, False)
        elif destination_ssh_prefix != '':
            cp_file_ssh_protocol(source_file_path, destination_file_path, destination_ssh_prefix, 2, source_xrootd_url, True)
        else:
            logging.error("xrd copy from local through SSH has no SSH information")
            return False
    elif transfer_protocol_code == 23:
        if source_ssh_prefix != '':
            cp_file_ssh_protocol(source_file_path, destination_file_path, source_ssh_prefix, 3, destination_xrootd_url, False)
        elif destination_ssh_prefix != '':
            cp_file_ssh_protocol(source_file_path, destination_file_path, destination_ssh_prefix, 3, source_xrootd_url, True)
        else:
            logging.error("eos copy from local through SSH has no SSH information")
            return False
    else:
        logging.error(f"the transfer protocol (#{transfer_protocol_code}) is not supported")
        return False

    # choose how the destination file must be accessed for the post-transfer check
    # protocol type: 0 - local file; 1 - SSH protocol; 2 - XRootD protocol; 3 - EOS protocol
    check_protocol_type = 0
    if transfer_protocol_code == 4 or transfer_protocol_code == 7 or transfer_protocol_code == 10 or transfer_protocol_code == 11:
        check_protocol_type = 1
    if transfer_protocol_code == 5 or transfer_protocol_code == 8: check_protocol_type = 2
    if transfer_protocol_code == 6 or transfer_protocol_code == 9: check_protocol_type = 3

    destination_file_size, destination_file_checksum, _, _ = get_file_info(destination_file_path, destination_xrootd_url, check_protocol_type, destination_ssh_prefix, start_tmp_dir, 1) # fast checksum after transfer
    if destination_file_size is None or destination_file_checksum == '':
        return False

    if destination_file_size != source_file_size:
        logging.error(f"{source_file_path}: transferring the file failed: destination and source file sizes are different: {destination_file_size} <> {source_file_size}")
        return False
    if (destination_file_checksum != source_file_checksum):
        logging.error(f"{source_file_path}: transferring the file failed: destination and source file checksums are different: {destination_file_checksum} <> {source_file_checksum}")
        return False

    logging.info('The source file has been successfully copied')

    # fixed: the debug message previously printed the checksum in place of the file size
    logging.debug(f'Inserting a record for the transferred file with path = "{destination_file_path}", file size = {destination_file_size}, file_checksum = {destination_file_checksum}')
    cursor = conn.cursor()
    # parameterized insert: file paths/checksums must not be interpolated into SQL text
    cursor.execute("insert into file_catalogue(cluster_name, storage_name, file_path, file_size, file_checksum) "
                   "values (%s, %s, %s, %s, %s)",
                   (destination_cluster, destination_storage, destination_file_path, destination_file_size, destination_file_checksum))
    conn.commit()
    logging.info("Information on the transferred file has been successfullly added to the database\n")
    return True
256
# checking integrity of files in the file list (comparing with the file catalogue in the database)
def check_files(file_list, catalogue_files, cluster_name, storage_name, xrootd_url, access_protocol, ssh_prefix, full_check, catalogue_matches, start_tmp_dir, conn, inspection_check_id):
    """Verify each storage file from 'file_list' against the File Catalogue.

    For files present in 'catalogue_files', the actual size (and, when enabled, the
    checksum) is compared with the catalogue values and missing catalogue values are
    filled in; files absent from the catalogue are added to it. 'catalogue_matches'
    is updated in place: element i becomes 1 when catalogue record i was found on
    the storage. Statistics accumulate in the module-level counters.
    """
    # fixed: the statistics counters are module-level globals; without this
    # declaration every '+=' below raised UnboundLocalError on the first file
    global cnt_all_files, cnt_found_catalogue, cnt_fileinfo_error, cnt_mismatched_files, cnt_update_info, cnt_add_catalogue
    for cur_file in file_list:
        logging.debug(f"Current check: '{cur_file}' file")
        cnt_all_files += 1

        # searching for a file in the file catalogue with the same path
        for i, file_record in enumerate(catalogue_files):
            catalogue_file_guid, catalogue_file_path, _, _, _, _ = file_record
            if catalogue_file_path == cur_file:
                catalogue_matches[i] = 1
                cnt_found_catalogue += 1
                break
        # if no corresponding file in the File Catalogue
        else:
            file_record = None

        # current file at the storage found in the database (File Catalogue)
        if file_record is not None:
            logging.debug('The current file is present within the File Catalogue')
            catalogue_file_guid, catalogue_file_path, catalogue_file_size, catalogue_file_checksum, catalogue_replica, catalogue_file_matched = file_record

            # checksum mode: 0 - skip, 1 - storage-provided (fast), 2 - manual calculation
            match_checksum = 0
            if full_check or (catalogue_file_checksum is None):
                match_checksum = 2
            elif xrootd_url is not None:
                match_checksum = 1
            file_size, file_checksum, retcode, error_message = get_file_info(cur_file, xrootd_url, access_protocol, ssh_prefix, start_tmp_dir, match_checksum)
            # error while getting file information, file size or checksum
            if retcode != 0 or file_size is None or file_checksum is None:
                cnt_fileinfo_error += 1
                if retcode != 0: add_failed_check(inspection_check_id, catalogue_file_guid, retcode, error_message, conn)
                else:
                    logging.error(f"{cur_file}: critical error occured while accessing the file (retcode = {retcode}, file_size = {file_size}, file_checksum = {file_checksum}\n")
                continue

            cursor = conn.cursor()
            # compare file size with the catalogue value
            if catalogue_file_size is None:
                # parameterized query: never interpolate values into the SQL text
                cursor.execute("update file_catalogue set file_size=%s where file_guid=%s", (file_size, catalogue_file_guid))
                conn.commit()
                logging.debug(f'The size of the file has been updated in the catalogue: {cur_file}')
                cnt_update_info += 1
            else:
                if file_size != catalogue_file_size:
                    cnt_mismatched_files += 1
                    logging.error(f"{cur_file}: the file size is different from the catalogue value: {file_size} <> {catalogue_file_size}")
                    continue
                else:
                    logging.debug(f"The file size is equal to the catalogue value: {file_size}")
            # compare file checksum with the catalogue value if 'match_checksum' enabled
            if match_checksum != 0:
                if catalogue_file_checksum is None:
                    add_failed_check(inspection_check_id, catalogue_file_guid, 3, "", conn)
                    cursor.execute("update file_catalogue set file_checksum=%s where file_guid=%s", (file_checksum, catalogue_file_guid))
                    conn.commit()
                    logging.debug('The checksum of the file has been updated in the catalogue\n')
                    cnt_update_info += 1
                else:
                    if file_checksum != catalogue_file_checksum:
                        cnt_mismatched_files += 1
                        logging.error(f"{cur_file}: the checksum of the file is different from the catalogue value ({file_checksum} <> {catalogue_file_checksum})")
                        add_failed_check(inspection_check_id, catalogue_file_guid, 4, "", conn)
                        continue
                    else:
                        logging.debug(f"The file checksum is equal to the catalogue value: {file_checksum}\n")
        # current file at the storage not found in the database (File Catalogue)
        else:
            logging.info(f"Adding file '{cur_file}' to the database")

            file_size, file_checksum, retcode, error_message = get_file_info(cur_file, xrootd_url, access_protocol, ssh_prefix, start_tmp_dir, 2) # manual checksum calculation when adding to the database
            if retcode != 0 or file_size is None or file_checksum is None:
                cnt_fileinfo_error += 1
                if retcode != 0: add_failed_check(inspection_check_id, None, retcode, error_message, conn)
                else:
                    logging.error(f"{cur_file}: critical error occured while accessing the file (retcode = {retcode}, file_size = {file_size}, file_checksum = {file_checksum}\n")
                continue

            # compare the obtained checksum with XRootD checksum for the file (if exists)
            xrd_file_checksum = ''
            if xrootd_url is not None:
                try:
                    xrd_file_checksum = get_file_xrdsum(cur_file, xrootd_url)
                except Exception as e:
                    cnt_fileinfo_error += 1
                    add_failed_check(inspection_check_id, None, 2, str(e), conn)
                    continue
                if file_checksum != xrd_file_checksum:
                    cnt_mismatched_files += 1
                    # fixed: 'checksum' contained a Cyrillic homoglyph that broke log grepping
                    logging.error(f"{cur_file}: the checksum of the file is different from the XRootD storage value ({file_checksum} <> {xrd_file_checksum})")
                    add_failed_check(inspection_check_id, None, 4, f"{cur_file}: the checksum of the file is different ({file_checksum} <> {xrd_file_checksum})", conn)
                    continue

            logging.debug(f'Inserting a new file record with file path = "{cur_file}", file size = {file_size}, file_checksum = {file_checksum}')
            cursor = conn.cursor()
            # parameterized insert protects against quoting/injection issues in file paths
            cursor.execute("insert into file_catalogue(cluster_name, storage_name, file_path, file_size, file_checksum) "
                           "values (%s, %s, %s, %s, %s)",
                           (cluster_name, storage_name, cur_file, file_size, file_checksum))
            conn.commit()
            cnt_add_catalogue += 1
            logging.info("Information on the current file has been successfullly added to the database\n")
357
# define transfer protocol to update files between source storage and replica one
# return transfer_protocol_code:
# 0 - local copy,
# 1 - ssh copy to local, 2 - xrd copy to local, 3 - eos copy to local
# 4 - ssh copy from local, 5 - xrd copy from local, 6 - eos copy from local
# 7 - full ssh copy (scp), 8 - full xrootd copy, 9 - full eos copy
# 10 - local copy to local through SSH, 11 - ssh copy to local through SSH,
# 12 - xrd copy to local through SSH, 13 - eos copy to local through SSH
# 20 - local copy from local through SSH, 21 - ssh copy from local through SSH,
# 22 - xrd copy from local through SSH, 23 - eos copy from local through SSH
def define_transfer_protocol(so_transfer_protocol, si_transfer_protocol, oi_transfer_protocol, io_transfer_protocol):
    """Select the transfer protocol code for copying files from the origin to the inspected storage.

    Arguments are pairwise communication protocols (0 = cp/local, 1 = scp/ssh,
    2 = xrootd, 3 = eos; None = no route defined in the database):
      so - start cluster     -> origin storage
      si - start cluster     -> inspected storage
      oi - origin cluster    -> inspected storage
      io - inspected cluster -> origin storage
    Returns one of the codes listed above, or -1 when no route exists.
    """
    # the inspected storage is local to the start cluster: pull from the origin;
    # 'X copy to local' codes 0-3 coincide with the protocol numbers
    if si_transfer_protocol == 0:
        if io_transfer_protocol is not None:
            return io_transfer_protocol
        elif so_transfer_protocol == 0:
            if oi_transfer_protocol is not None:
                if oi_transfer_protocol == 0: # local copy
                    return 0
                else:
                    # fixed off-by-one: 'X copy from local' codes are 4/5/6 for
                    # protocols 1/2/3, matching the '10 + io' and '20 + oi' maps
                    # below (the original '4 + oi' yielded ssh->5, xrd->6, eos->7)
                    return 3 + oi_transfer_protocol

    # both legs use XRootD-family protocols -> third-party xrootd/eos copy
    if si_transfer_protocol > 1 and so_transfer_protocol > 1:
        if si_transfer_protocol == 3 and so_transfer_protocol == 3: # both eos copy
            return 9
        else:
            return 8

    # the inspected storage is reachable over SSH from the start cluster
    if si_transfer_protocol == 1:
        if so_transfer_protocol == 1:
            return 7
        elif io_transfer_protocol is not None:
            return 10 + io_transfer_protocol

    # the origin storage is reachable over SSH from the start cluster
    if so_transfer_protocol == 1:
        if oi_transfer_protocol is not None:
            return 20 + oi_transfer_protocol

    return -1
398
# synchronize storages marked as replicas
def compare_storages(inspect_cluster, inspect_storage, inspect_storage_path, inspect_xrootd_url,
                     origin_cluster, origin_storage, origin_storage_path, origin_xrootd_url, replica_flags,
                     start_cluster, start_tmp_dir, si_check_protocol, si_transfer_protocol, si_ssh_prefix, conn):
    """Compare File Catalogue records of a replica (inspected) storage with its origin storage.

    replica_flags characters enable the actions: 'a' - transfer files missing at
    the inspected storage, 'u' - re-transfer files whose size/checksum differ,
    'd' - delete catalogue records of inspected files without an origin pair.
    Matching is done by the file-path suffix relative to each storage root.
    """
    # fixed: the original debug message referenced the undefined names
    # replica_cluster/replica_storage/replica_storage_path (NameError at runtime)
    logging.debug(f'Compare replica inspected storage "{inspect_cluster}:{inspect_storage}" and origin (source) storage "{origin_cluster}:{origin_storage}" for inspected path "{inspect_storage_path}" vs source path "{origin_storage_path}"')
    cursor = conn.cursor()

    add_file = ('a' in replica_flags)
    delete_file = ('d' in replica_flags)
    update_file = ('u' in replica_flags)

    transfer_protocol_code = -1
    if add_file or update_file:
        # get check and transfer protocol between inspected cluster and origin (source of the replica) storage
        cursor.execute("select check_protocol, transfer_protocol, ssh_prefix "
                       "from storage_communication "
                       f"where source_cluster_name='{inspect_cluster}' and destination_cluster_name='{origin_cluster}' and destination_storage_name='{origin_storage}'")
        conn.commit()
        io_check_protocol, io_transfer_protocol, io_ssh_prefix = cursor.fetchone()
        # get check and transfer protocol between origin cluster and inspected storage
        cursor.execute("select check_protocol, transfer_protocol, ssh_prefix "
                       "from storage_communication "
                       f"where source_cluster_name='{origin_cluster}' and destination_cluster_name='{inspect_cluster}' and destination_storage_name='{inspect_storage}'")
        conn.commit()
        oi_check_protocol, oi_transfer_protocol, oi_ssh_prefix = cursor.fetchone()
        # get check and transfer protocol between start cluster and origin storage
        cursor.execute("select check_protocol, transfer_protocol, ssh_prefix "
                       "from storage_communication "
                       f"where source_cluster_name='{start_cluster}' and destination_cluster_name='{origin_cluster}' and destination_storage_name='{origin_storage}'")
        conn.commit()
        so_check_protocol, so_transfer_protocol, so_ssh_prefix = cursor.fetchone()
        transfer_protocol_code = define_transfer_protocol(so_transfer_protocol, si_transfer_protocol, oi_transfer_protocol, io_transfer_protocol)
        if transfer_protocol_code == -1:
            logging.error(f"no communication protocol is defined to transfer files between '{origin_cluster}:{origin_storage}' and '{inspect_cluster}:{inspect_storage}' from '{start_cluster}'. "
                          "Please, specify it in the database")
            return

    # get a list of the files on the source of the replica storage from the File Catalogue
    cursor.execute("select file_guid, file_path, file_size, file_checksum, replica_validity, 0 "
                   "from file_catalogue "
                   f"where cluster_name='{origin_cluster}' and storage_name='{origin_storage}'")
    conn.commit()
    origin_files = cursor.fetchall()

    # get a list of the files in the File Fatalogue for the inspected storage
    cursor.execute("select file_guid, file_path, file_size, file_checksum, replica_validity, 0 "
                   "from file_catalogue "
                   f"where cluster_name='{inspect_cluster}' and storage_name='{inspect_storage}'")
    conn.commit()
    inspect_files = cursor.fetchall()

    # file_matches[i] holds the origin index matched to inspected file i (-1 = no pair)
    file_matches = [-1 for i in range(len(inspect_files))]
    # cycle matching files between the storage elements using database records
    for iter_origin, cur_origin_file in enumerate(origin_files):
        origin_file_guid, origin_file_path, origin_file_size, origin_file_checksum, origin_replica, origin_file_matched = cur_origin_file

        logging.debug(f'Current check for origin file: "{origin_file_path}"')
        # path of the file relative to the origin storage root (the matching key)
        origin_file_end = origin_file_path.replace(origin_storage_path, '')

        # searching for a database record at the inspected storage with the same end of the file path
        for iter_inspect, cur_inspect_file in enumerate(inspect_files):
            inspect_file_guid, inspect_file_path, inspect_file_size, inspect_file_checksum, inspect_replica, inspect_file_matched = cur_inspect_file
            if inspect_file_path.endswith(origin_file_end):
                file_matches[iter_inspect] = iter_origin
                break
        # if no corresponding file on the inspected storage found, then transfer the missing file from the source of the replica
        else:
            if add_file:
                new_inspect_file_path = inspect_storage_path + origin_file_end
                logging.info(f"'{new_inspect_file_path}' file is missing at the inspected storage")
                #print(new_inspect_file_path, file=file_list)
                transfer_file(origin_cluster, origin_storage, origin_file_path, origin_xrootd_url, origin_file_size, origin_file_checksum,
                              inspect_cluster, inspect_storage, new_inspect_file_path, inspect_xrootd_url,
                              transfer_protocol_code, so_ssh_prefix, si_ssh_prefix, start_tmp_dir, conn)
            # fixed: continue unconditionally — with no match there is nothing to
            # compare below (the original fell through to comparisons against a
            # stale loop variable whenever the 'a' flag was not set)
            continue

        # re-unpack the matched inspected record (loop variables survive the break)
        inspect_file_guid, inspect_file_path, inspect_file_size, inspect_file_checksum, inspect_replica, inspect_file_matched = cur_inspect_file
        # if there is a corresponding file found then check size
        if inspect_file_size != origin_file_size:
            if inspect_file_size is None:
                # fixed: the message printed the size variable where the path belongs
                logging.error(f"file '{inspect_file_path}' on '{inspect_storage}' storage has no information on the size")
                continue
            logging.error(f"the source ({origin_file_path}) and inspected ({inspect_file_path}) files have different sizes: {origin_file_size} <> {inspect_file_size}")
            # if inspected file is different from the origin, then copy/replace the first inspected file if 'update' flag is active
            if update_file:
                logging.info(f"The inspected file at '{inspect_storage}' storage has an incorrect size and will be replaced")
                #print(inspect_file_path, file=file_list)
                # fixed: replace the matched inspected path (the original passed
                # 'new_inspect_file_path', unbound or stale in this branch)
                transfer_file(origin_cluster, origin_storage, origin_file_path, origin_xrootd_url, origin_file_size, origin_file_checksum,
                              inspect_cluster, inspect_storage, inspect_file_path, inspect_xrootd_url,
                              transfer_protocol_code, so_ssh_prefix, si_ssh_prefix, start_tmp_dir, conn)
            continue
        # if there is a corresponding file found then check checksum
        if inspect_file_checksum != origin_file_checksum:
            if inspect_file_checksum is None:
                logging.error(f"file '{inspect_file_path}' on '{inspect_storage}' storage has no information on the checksum")
                continue
            logging.error(f"the source ({origin_file_path}) and inspected ({inspect_file_path}) files have different checksum: {origin_file_checksum} <> {inspect_file_checksum}")
            # if inspected file is different from the origin, then copy/replace the first inspected file if 'update' flag is active
            if update_file:
                logging.info(f"The inspected file at '{inspect_storage}' storage has an incorrect checksum and will be replaced")
                #print(inspect_file_path, file=file_list)
                transfer_file(origin_cluster, origin_storage, origin_file_path, origin_xrootd_url, origin_file_size, origin_file_checksum,
                              inspect_cluster, inspect_storage, inspect_file_path, inspect_xrootd_url,
                              transfer_protocol_code, so_ssh_prefix, si_ssh_prefix, start_tmp_dir, conn)
            continue

        logging.debug("The source and destination file are fully matched")

    logging.info(f"Comparison of inspected files on the '{inspect_storage}' storage with the origin '{origin_storage}' storage finished")

    # searching for inspected files that have no pair with the source replica
    #print("", file=file_list)
    for iter_inspect, cur_inspect_file in enumerate(inspect_files):
        inspect_file_guid, inspect_file_path, inspect_file_size, inspect_file_checksum, inspect_replica, inspect_file_matched = cur_inspect_file
        if file_matches[iter_inspect] == -1:
            logging.warning(f"{inspect_file_path}: the inspected file has no corresponding source file")
            #print(dest_file_path, file=file_list)
            if delete_file:
                # parameterized delete of the orphaned catalogue record
                cursor.execute("delete from file_catalogue where file_guid = %s", (inspect_file_guid,))
                conn.commit()
                logging.info("The destination file with no corresponding source file has been deleted")
521
# send email with results
def send_email(from_address, mail_address, subject, body):
    """Send a plain-text summary e-mail.

    First tries the local 'mail' utility; on failure falls back to a raw SMTP
    dialogue over Telnet to mail.jinr.ru:25. Returns True on success, False when
    the Telnet fallback also fails.
    """
    # try send mail via 'mail' command
    try:
        # NOTE(review): subject/body/addresses are interpolated into a shell command
        # line — callers must pass trusted strings only (shell injection otherwise)
        command = f"echo '{body}' | mail -s '{subject}' -r '{from_address}' '{mail_address}'"
        result = subprocess.run(command, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        print("Email with summary results sent successfully")
        # fixed: report success explicitly (the function previously returned None here)
        return True
    except subprocess.CalledProcessError as e:
        print(f"Failed to send email: {e.stderr.decode()}")
        # try send mail via 'Telnet' command
        print("Trying to send via Telnet...")
        with telnetlib.Telnet('mail.jinr.ru', 25) as tn:
            # short pauses let the SMTP server answer between commands
            time.sleep(0.3)
            tn.write(b"HELO idev.jinr.ru\n")
            time.sleep(0.3)
            tn.write(f"MAIL FROM: {from_address}\n".encode())
            time.sleep(0.3)
            tn.write(f"RCPT TO: <{mail_address}>\n".encode())
            time.sleep(0.3)
            tn.write(b"DATA\n")
            time.sleep(0.3)
            tn.write(body.encode())
            time.sleep(0.3)
            tn.write(b"\r\n.\r\n")
            time.sleep(0.3)
            tn.write(b"quit\n")
            # fixed: read_all() returns bytes — decode before the substring test
            # (the original compared str in bytes and raised TypeError)
            telnet_answer = tn.read_all().decode(errors="ignore")
        if "Error" in telnet_answer:
            print("Sending email with summary results failed")
            return False
        print("Email with summary results sent via Telnet")
        return True
554
555
# MAIN PYTHON FUNCTION
start_cluster, inspect_storage_address, storage_2cron_address = argument_parser()

# if 'start cluster' optional argument is absent then automatically define the current cluster by node prefix (the database column)
if start_cluster is None:
    # NOTE(review): 'cluter_host' is a typo for 'cluster_host', kept as-is
    cluter_host = socket.gethostname()
    # exactly one cluster prefix must match the local hostname, otherwise the
    # launch point is ambiguous and the script aborts
    cursor.execute("select cluster_name "
                   "from cluster_ "
                   f"where starts_with('{cluter_host}', node_prefix)")
    conn.commit()
    if cursor.rowcount != 1:
        logging.error(f"the start hostname ({cluter_host}) does not match the prefix of any of the clusters")
        exit(3)
    (start_cluster,) = cursor.fetchone()

# get information on the cluster, where the script has run
cursor.execute("select temp_dir, ssh_prefix "
               "from cluster_ "
               f"where cluster_name='{start_cluster}'")
conn.commit()
# NOTE(review): fetchone() returns None when the cluster is unknown, so the unpack
# below would raise TypeError before the None check — confirm intended behaviour
start_tmp_dir, start_ssh_prefix = cursor.fetchone()
if start_tmp_dir is None:
    logging.error(f"no start cluster found in the database for name: {start_cluster}")
    exit(2)
580
# if just create CRON task for the given storage
if storage_2cron_address is not None:
    current_script_path = os.path.realpath(__file__)

    # '--add_cron' accepts '*', '<cluster>:*' or '<cluster>:<storage>'
    if storage_2cron_address == "*":
        storage_2cron_cluster = "*"
        storage_2cron_storage = "*"
    else:
        storage_2cron_array = storage_2cron_address.split(":")
        storage_2cron_cluster = storage_2cron_array[0]
        storage_2cron_storage = storage_2cron_array[1]

    # get information on the storage (adding to CRON) from the File Inspector database
    storage_request = "select s.check_interval,s.cluster_name,s.storage_name from storage_ s "
    if storage_2cron_cluster != "*":
        storage_request += f"where s.cluster_name='{storage_2cron_cluster}'"
    if storage_2cron_storage != "*":
        storage_request += f" and s.storage_name='{storage_2cron_storage}'"

    cursor.execute(storage_request)
    conn.commit()
    storage_2cron_list = cursor.fetchall()
    if not storage_2cron_list:
        logging.error(f"{inspect_storage_address}: no storage found in the database to add to the CRON")
        exit(3)

    # append one crontab entry per selected storage via 'crontab -l | { ...; } | crontab -'
    # NOTE(review): the schedule '* * */N * *' fires every minute on every Nth
    # day-of-month — presumably a daily interval was intended; confirm the cron fields
    for storage_2cron_element in storage_2cron_list:
        process_output, process_error, process_returncode = get_process_full(f'crontab -l | {{ cat; echo "* * */{storage_2cron_element[0]} * * python3 {current_script_path} '
                                                                             f'-s {start_cluster} -i {storage_2cron_element[1]}:{storage_2cron_element[2]}"; }} | crontab -' )
        if process_returncode != 0:
            print(f'Error occured while adding job to UNIX CRON...\nOUTPUT: \n{process_output}\nERROR: \n{process_error}')
            exit(process_returncode)
613
# check inspect storage (required argument, "cluster:storage" form)
inspect_storage_array = inspect_storage_address.split(":")
inspect_cluster = inspect_storage_array[0]
inspect_storage = inspect_storage_array[1]

inspection_starttime = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
logging.info(f"Script for checking integrity of experiment files on the '{inspect_storage_address}' storage started")

# get information on the inspected storage from the File Inspector database (parameterized query)
cursor.execute("select s.storage_path, s.storage_type, s.xrootd_url, s.check_interval, s.full_check, s.check_flags, s.replica_cluster, s.replica_storage, s.replica_flags, c.ssh_prefix "
               "from storage_ s join cluster_ c on s.cluster_name = c.cluster_name "
               "where s.cluster_name=%s and s.storage_name=%s",
               (inspect_cluster, inspect_storage))
conn.commit()
# BUGFIX: fetchone() returns None when no row matches; the original unpacked the tuple first,
# raising TypeError before the "no storage found" check could run
inspect_storage_row = cursor.fetchone()
if inspect_storage_row is None:
    logging.error(f"{inspect_storage_address}: no storage found in the database")
    exit(3)
inspect_storage_path, inspect_storage_type, inspect_xrootd_url, inspect_storage_interval, inspect_storage_full, check_flags, replica_cluster, replica_storage, replica_flags, inspect_ssh_prefix = inspect_storage_row
631
# get access protocol between the start cluster and inspection storage from the File Inspector database
# check_protocol: 0 = local, 1 = ssh, 2 = xrootd, 3 = eos && transfer_protocol: 0 = cp, 1 = scp, 2 = xrootd, 3 = eos
# (parameterized: cluster/storage names come from command-line arguments)
cursor.execute("select check_protocol, transfer_protocol "
               "from storage_communication "
               "where source_cluster=%s and destination_cluster=%s and destination_storage=%s",
               (start_cluster, inspect_cluster, inspect_storage))
conn.commit()
if cursor.rowcount < 1:
    logging.error(f"no communication protocol is defined between '{start_cluster}' and '{inspect_storage_address}'. Please, specify it in the database")
    exit(4)
si_check_protocol, si_transfer_protocol = cursor.fetchone()
642
# define if the current inspection is fast (get checksum only if it provides by the storage) or full:
# count fast checks performed since the last full check; once enough have accumulated
# (inspect_storage_full - 1), the next inspection is promoted to a full one
full_check = False
cursor.execute("select count(*) as check_count "
               "from storage_check "
               "where cluster_name=%s and storage_name=%s and start_datetime > (select max(start_datetime) "
               "from storage_check "
               "where cluster_name=%s and storage_name=%s and full_check)",
               (inspect_cluster, inspect_storage, inspect_cluster, inspect_storage))
conn.commit()
(fast_check_count,) = cursor.fetchone()
if fast_check_count >= (inspect_storage_full-1):
    full_check = True
654
# get a list of the files in the file catalogue for the storage element
# (each row: guid, path, size, checksum, replica_validity, plus a constant 0 placeholder column)
cursor.execute("select file_guid, file_path, file_size, file_checksum, replica_validity, 0 "
               "from file_catalogue "
               "where cluster_name=%s and storage_name=%s",
               (inspect_cluster, inspect_storage))
conn.commit()
catalogue_files = cursor.fetchall()
# per-record match flags, filled in by check_files(); 0 = not yet matched to a physical file
catalogue_matches = [0] * len(catalogue_files)
663
# insert information on the integrity check and remember its generated check_id
# (parameterized; full_check is passed as a native boolean, which psycopg2 adapts to SQL)
catalogue_file_count = len(catalogue_files)
cursor.execute("insert into storage_check(cluster_name, storage_name, start_datetime, file_count, full_check) "
               "values (%s, %s, %s, %s, %s) RETURNING check_id",
               (inspect_cluster, inspect_storage, inspection_starttime, catalogue_file_count, full_check))
inspection_check_id = cursor.fetchone()[0]
conn.commit()
670
# Dispatch the actual file listing + integrity check by the access protocol between
# the start cluster and the inspected storage (0 = local, 1 = ssh, 2 = xrootd, 3 = eos).
# All branches except SSH delegate the per-file comparison to check_files(), which is
# expected to update catalogue_matches in place and record failures under inspection_check_id.

# get and check full file list in the local directory
if si_check_protocol == 0:
    #result = [os.path.join(dp, f) for dp, dn, filenames in os.walk(watch_dir) for f in filenames if os.path.splitext(f)[1] == '.data']
    # recursive glob of every entry under the storage root
    all_files = glob.glob(inspect_storage_path + '/**/*', recursive=True)
    all_files = [f for f in all_files if os.path.isfile(f)] # remove directories from file list
    #print(list(all_files))
    if not all_files:
        logging.error(f"{inspect_storage_address}: there are no files on the storage")
        exit(5)
    check_files(all_files, catalogue_files, inspect_cluster, inspect_storage, inspect_xrootd_url, 0, inspect_ssh_prefix, full_check, catalogue_matches, start_tmp_dir, conn, inspection_check_id)

# get and check full file list using SSH connection to the inspected storage machine:
# ship the current directory as a tar stream to a temp dir on the remote host and
# run ssh_run.sh there so the inspection executes locally on the storage machine
elif si_check_protocol == 1:
    ssh_failed=True
    # NOTE(review): this loop runs at most once — on error it exits, otherwise it clears
    # ssh_failed; the commented-out 'continue' suggests a retry loop was intended — confirm
    while ssh_failed:
        process_output, process_error, returncode = get_process_full(f"tar cf - . | ssh {inspect_ssh_prefix} 'D=`mktemp -d`; tar xf - -C $D; $D/ssh_run.sh \"-s {inspect_cluster}\" \"-i {inspect_cluster}:{inspect_storage}\"'")
        process_output = process_output.strip()
        if process_error:
            logging.error(f'{inspect_storage_address}: execution via SSH failed for the destination storage: {returncode} - {process_error}')
            #continue
            exit(6)
        ssh_failed=False

# get and check full file list using XRootD commands to the inspected storage machine
elif si_check_protocol == 2:
    xrd_file_list = get_files_xrd(inspect_storage_path, inspect_xrootd_url)
    if not xrd_file_list:
        logging.error(f"{inspect_storage_address}: there are no files on the storage")
        exit(7)
    check_files(xrd_file_list, catalogue_files, inspect_cluster, inspect_storage, inspect_xrootd_url, 2, inspect_ssh_prefix, full_check, catalogue_matches, start_tmp_dir, conn, inspection_check_id)

# get and check full file list using EOS commands to the inspected storage machine
elif si_check_protocol == 3:
    eos_file_list = get_files_eos(inspect_storage_path, inspect_xrootd_url)
    if not eos_file_list:
        logging.error(f"{inspect_storage_address}: there are no files on the storage")
        exit(8)
    check_files(eos_file_list, catalogue_files, inspect_cluster, inspect_storage, inspect_xrootd_url, 3, inspect_ssh_prefix, full_check, catalogue_matches, start_tmp_dir, conn, inspection_check_id)
709
# searching for files in the File Catalogue that have no pair on the inspected storage element
delete_absent_files = ("d" in check_flags) # check_flags - 'd' means "delete absent files"
for i,file_record in enumerate(catalogue_files):
    if catalogue_matches[i] == 0:
        catalogue_file_guid, catalogue_file_path, _, _, _, _ = file_record
        logging.warning(f"{catalogue_file_path}: the record in the File Catalogue has no corresponding file on the storage element")
        cnt_missed_files += 1
        # failure reason 1 = missing file (record exists but no physical file found)
        add_failed_check(inspection_check_id, catalogue_file_guid, 1, '', conn)
        # if File Catalogue record has no pair on the inspected storage and 'd' flag is set then delete the record.
        # Parameterized: the original interpolated the guid unquoted, which breaks (or injects)
        # when file_guid is not a plain number (e.g. a UUID string).
        if delete_absent_files:
            cursor.execute("delete from file_catalogue where file_guid = %s", (catalogue_file_guid,))
            conn.commit()
            cnt_deleted_files += 1
            logging.info("The record of the absent file in the File Catalogue has been deleted")
724
# if the inspected storage element is replica, then compare it with the source (transfer_protocol: 0 = cp, 1 = scp, 2 = xrootd, 3 = eos)
if replica_cluster is not None:
    if "e" in replica_flags:    # 'e' flag: replica must be an exact copy of the parent - compare them
        # get information on a parent storage, for which the current one is replica (parameterized)
        cursor.execute("select s.storage_path, s.xrootd_url "
                       "from storage_ s "
                       "where s.cluster_name = %s and s.storage_name = %s",
                       (replica_cluster, replica_storage))
        conn.commit()
        parent_storage_row = cursor.fetchone()
        # ROBUSTNESS: guard against a misconfigured replica pointing at a non-existent parent
        # (unpacking None would raise TypeError and abort without a diagnostic)
        if parent_storage_row is None:
            logging.error(f"{replica_cluster}:{replica_storage}: parent storage of the replica was not found in the database")
        else:
            replica_storage_path, replica_xrootd_url = parent_storage_row
            compare_storages(inspect_cluster, inspect_storage, inspect_storage_path, inspect_xrootd_url,
                             replica_cluster, replica_storage, replica_storage_path, replica_xrootd_url, replica_flags,
                             start_cluster, start_tmp_dir, si_check_protocol, si_transfer_protocol, inspect_ssh_prefix, conn)
737
# record the inspection end time and collect the number of failed checks (parameterized queries)
inspection_endtime = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
cursor.execute("update storage_check set end_datetime=%s where check_id=%s",
               (inspection_endtime, inspection_check_id))
conn.commit()

cursor.execute("select count(*) "
               "from failed_check "
               "where check_id=%s",
               (inspection_check_id,))
found_errors = cursor.fetchone()[0]
conn.commit()
cursor.close()
conn.close()
749
# send email with results of the integrity check to the configured recipient
from_address = "<inspector@jinr.ru>"
subject = "File Inspector - Integrity Results"
body = "".join([
    f"File inspection on '{inspect_storage_address}' storage completed\n",
    f"Operation time: {inspection_starttime} - {inspection_endtime}\n",
    f"Number of errors: {found_errors}\n",
])
send_email(from_address, mail_address, subject, body)
757
# final summary: completion notice followed by the accumulated per-category counters
logging.info(f"Script for checking integrity of experiment files on the '{inspect_storage_address}' storage finished")
logging.info("Statistics:")
for statistics_line in (
        f"  All checked files - {cnt_all_files}, Found files in the Catalogue - {cnt_found_catalogue}",
        f"  Missed files - {cnt_missed_files}, Deleted files from the Catalogue - {cnt_deleted_files}",
        f"  Get file info error - {cnt_fileinfo_error}\n Updated records (file info) - {cnt_update_info}",
        f"  Mismatched records - {cnt_mismatched_files}\n Added files to the Catalogue - {cnt_add_catalogue}",
):
    logging.info(statistics_line)