11CHECKSUM_BLOCKSIZE=1024*1024*1024
12storage_dictionary = [
'ddc',
'ncx',
'cicc']
16 format=
'%(asctime)s %(levelname)s: %(message)s',
17 datefmt=
'%Y-%m-%d %H:%M:%S',
19 logging.FileHandler(
"file_inspector.log", mode=
'w'),
20 logging.StreamHandler(sys.stdout)
25 """ Class for colouring output messages"""
27 yellow =
"\x1b[33;93m"
29 bold_red =
"\x1b[31;1m"
31 format=
'%(asctime)s %(levelname)s: %(message)s'
32 datefmt=
'%Y-%m-%d %H:%M:%S',
34 logging.DEBUG: grey + format + reset,
36 logging.WARNING: yellow + format + reset,
37 logging.ERROR: red + format + reset,
38 logging.CRITICAL: bold_red + format + reset
41 log_fmt = self.
FORMATS.get(record.levelno)
42 formatter = logging.Formatter(log_fmt)
43 return formatter.format(record)
49 """ Common exception while trying to get file attributes or content"""
60 output_lines = subprocess.run([process_string], stdout=subprocess.PIPE).stdout.splitlines()
65 child = subprocess.Popen(process_string, stdout=subprocess.PIPE)
66 streamdata = child.communicate()[0]
67 return child.returncode
71 popen = subprocess.Popen(process_string, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=
True)
74 process_output = popen.stdout.read().decode(
"utf-8")
75 process_error = popen.stderr.read().decode(
"utf-8")
76 return process_output, process_error, popen.returncode
82 process_output, process_error, returncode =
get_process_full(f
"ssh {ssh_data} 'test -f {file_path} | echo $?'")
83 process_output = process_output.strip()
84 is_digits = bool(re.match(
'^[0-9]*$', process_output))
85 if (
not process_output)
or (
not is_digits)
or process_error:
86 logging.error(f
'{file_path}: verifying the existence of the file using SSH failed: {returncode} - {process_error}')
88 if int(process_output) == 0:
95 process_output, process_error, returncode =
get_process_full(f
'xrdfs {xrootd_url} ls {file_path} | grep "\[ERROR]"')
96 process_output = process_output.strip()
97 if process_output
or process_error:
109 """ Get file size in bytes via SSH (in case of ssh-key access)
111 :param file_path: file path
112 :param ssh_data: ssh login format: "user@host"
113 :returns: file size in bytes
114 :raises FileAccessException: if there is any error while getting file size (message + integer code)
116 process_output, process_error, returncode =
get_process_full(f
"ssh {ssh_data} 'du -b {file_path} | cut -f1'")
117 process_output = process_output.strip()
118 is_digits = bool(re.match(
'^[0-9]*$', process_output))
119 if (
not process_output)
or (
not is_digits)
or process_error:
120 logging.error(f
'{file_path}: obtaining the file size using SSH failed: {returncode} - {process_error}')
122 return int(process_output)
125 """ Get file size in bytes via XRootD
127 :param file_path: file path
128 :param xrootd_url: XRootD URL, e.g. 'root://eos.jinr.ru//eos/nica/bmn...'
129 :returns: file size in bytes
130 :raises FileAccessException: if there is any error while getting file size (message + integer code)
132 process_output, process_error, returncode =
get_process_full(f
"xrdfs {xrootd_url} stat {file_path} | grep Size | awk -F ' ' '{{print $2}}'")
133 process_output = process_output.strip()
134 is_digits = bool(re.match(
'^[0-9]*$', process_output))
135 if (
not process_output)
or (
not is_digits)
or process_error:
136 logging.error(f
"{file_path}: obtaining the file size using 'xrdfs' failed: {returncode} - {process_error}")
138 return int(process_output)
141 """ Get file size in bytes via EOS
143 :param file_path: file path
144 :param xrootd_url: XRootD/EOS URL, e.g. 'root://eos.jinr.ru//eos/nica/bmn...'
145 :returns: file size in bytes
146 :raises FileAccessException: if there is any error while getting file size (message + integer code)
148 process_output, process_error, returncode =
get_process_full(f
"eos {xrootd_url} fileinfo {file_path} --size | awk '{{print $2}}'")
149 process_output = process_output.strip()
150 is_digits = bool(re.match(
'^[0-9]*$', process_output))
151 if (
not process_output)
or (
not is_digits)
or process_error:
152 logging.error(f
"{file_path}: obtaining the file size using 'eos' failed: {returncode} - {process_error}")
154 return int(process_output)
157 """ Get file MD5 checksum (read splitted by chunks)
159 :param file_path: file path
160 :returns: MD5 file checksum
161 :raises FileAccessException: if there is any error while getting file checksum (message + integer code)
163 file_md5 = hashlib.md5()
165 f = open(file_path,
"rb")
166 for chunk
in iter(
lambda: f.read(CHECKSUM_BLOCKSIZE), b
""):
167 file_md5.update(chunk)
168 return file_md5.hexdigest()
169 except FileNotFoundError
as e:
170 logging.error(f
"{file_path}: the file is not accessible to calculate MD5 checksum (exception: {e})")
172 except Exception
as e:
173 logging.error(f
"{file_path}: the file is not accessible to calculate MD5 checksum (exception: {e})")
177 """ Get file MD5 checksum (read all at once)
179 :param file_path: file path
180 :returns: MD5 file checksum
181 :raises FileAccessException: if there is any error while getting file checksum (message + integer code)
184 a_file = open(file_path,
"rb")
185 file_content = a_file.read()
186 file_md5 = hashlib.md5(file_content).hexdigest()
188 except FileNotFoundError
as e:
189 logging.error(f
"{file_path}: the file is not accessible to calculate MD5 checksum (exception: {e})")
191 except Exception
as e:
192 logging.error(f
"{file_path}: the file is not accessible to calculate MD5 checksum (exception: {e})")
196 """ Get file Adler32 checksum (read splitted by chunks)
198 :param file_path: file path
199 :returns: Adler32 file checksum
200 :raises FileAccessException: if there is any error while getting file checksum (message + integer code)
204 with open(file_path,
"rb")
as f:
206 data = f.read(CHECKSUM_BLOCKSIZE)
209 adler_sum = zlib.adler32(data, adler_sum)
212 return hex(adler_sum)[2:10].zfill(8).lower()
213 except FileNotFoundError
as e:
214 logging.error(f
"{file_path}: the file is not accessible to calculate Adler32 checksum (exception: {e})")
216 except Exception
as e:
217 logging.error(f
"{file_path}: the file is not accessible to calculate Adler32 checksum (exception: {e})")
221 """ Get file Adler32 checksum (read whole file at once)
223 :param file_path: file path
224 :returns: Adler32 file checksum
225 :raises FileAccessException: if there is any error while getting file checksum (message + integer code)
227 BLOCKSIZE=256*1024*1024
230 with open(file_path,
"rb")
as f:
232 data = f.read(BLOCKSIZE)
235 file_md5 = zlib.adler32(data, file_md5)
238 return hex(file_md5)[2:10].zfill(8).lower()
239 except FileNotFoundError
as e:
240 logging.error(f
"{file_path}: the file is not found to calculate Adler32 checksum (exception: {e})")
242 except Exception
as e:
243 logging.error(f
"{file_path}: the file is not accessible to calculate Adler32 checksum (exception: {e})")
247 """ Get file checksum via XRootD
249 :param file_path: file path
250 :param xrootd_url: XRootD/EOS URL, e.g. 'root://eos.jinr.ru//eos/nica/bmn...'
251 :returns: file checksum (Adler32 at the NICA storages)
252 :raises FileAccessException: if there is any error while getting file checksum (message + integer code)
253 :raises ValueError: if returned checksum has incorrect format (non-HEX value)
255 process_output, process_error, returncode =
get_process_full(f
"xrdfs {xrootd_url} query checksum {file_path} | awk -F ' ' '{{print $2}}'")
256 process_output = process_output.strip()
257 is_hex = all(c
in string.hexdigits
for c
in process_output)
258 if (
not process_output)
or process_error:
259 logging.error(f
"{file_path}: query of the file checksum using 'xrdfs' failed: {returncode} - {process_error}")
262 logging.error(f
"{file_path}: query of the file checksum using 'xrdfs' returned non-HEX value: {process_output}")
263 raise ValueError(f
"{file_path}: query of the file checksum using 'xrdfs' returned non-HEX value: {process_output}")
264 return process_output
268def get_file_info(file_path, storage_url, protocol_type, ssh_con, start_tmp_dir, match_checksum = 0):
269 """ Get file information (size and checksum) using different protocols (in case of errors: file_size and/or file_checksum is None)
271 :param file_path: input file path to get its information
272 :param storage_url: storage URL, where the input file is located
273 :param protocol type: 0 - local file; 1 - SSH protocol; 2 - XRootD protocol; 3 - EOS protocol
274 :param ssh_con: SSH prefix as 'user@server'
275 :param start_tmp_dir: temporary directory to copy file for checking
276 :param match_checksum: 0 - do not match checksums, 1 - fast comparing checksum if it exists in the File System, 2- always manual recalculating checksum before comparing
278 - 1 - file size (integer, in bytes);
279 - 2 - file checksum (string);
280 - 3 - error code (0 - no errors, 1 - file not found, 2 - file read error, 3 - error while getting checksum, <0 - internal errors)
281 - 4 - error message (in case of any errors)
287 logging.debug(f
"Get file information locating at \"{file_path}\" (url: {storage_url}, match checksum: {match_sum.get(match_checksum, 'no')})")
290 logging.debug(f
'Checking the file size (protocol = {protocol_type})...')
292 if protocol_type == 0
or (protocol_type == 1
and ssh_con ==
''):
294 file_size = os.path.getsize(file_path)
295 except OSError
as error:
296 logging.error(f
"{file_path}: the file is not accessible to get its size")
297 return None,
None, 2, f
"{file_path}: the file is not accessible to get its size"
298 elif protocol_type == 1:
301 except Exception
as e:
302 return None,
None, 2, str(e)
303 elif protocol_type == 2:
306 except Exception
as e:
307 return None,
None, 2, str(e)
308 elif protocol_type == 3:
311 except Exception
as e:
312 return None,
None, 2, str(e)
314 logging.error(f
"{file_path}: transfer protocol (#{protocol_type}) is not supported")
315 return None,
None, 2, f
"{file_path}: transfer protocol (#{protocol_type}) is not supported"
317 if file_size ==
None:
318 logging.error(f
"{file_path}: the file size is not defined")
319 return None,
None, 2, f
"{file_path}: the file size is not defined"
321 if match_checksum == 0:
322 return file_size,
'', 0,
''
324 logging.debug(
'Checking the file checksum')
326 if match_checksum == 1
and storage_url
is not None:
330 except Exception
as e:
331 return file_size,
None, 2, str(e)
333 return file_size,
None, 1, f
"File not found using XRootD: {file_path}"
335 if protocol_type == 0
or (protocol_type == 1
and ssh_con ==
''):
338 except Exception
as e:
339 return file_size,
None, 2, str(e)
340 elif protocol_type == 1
or protocol_type == 2
or protocol_type == 3:
341 local_file_copy = start_tmp_dir +
'/' + os.path.basename(file_path)
342 if protocol_type == 1:
344 elif protocol_type == 2:
346 cp_file_xrd(file_path, storage_url, local_file_copy,
None)
349 cp_file_eos(file_path, storage_url, local_file_copy,
None)
352 except Exception
as e:
353 return file_size,
None, 2, str(e)
354 os.remove(local_file_copy)
356 logging.error(f
"{file_path}: the transfer protocol (#{protocol_type}) is not supported")
357 return file_size,
None, 2, f
"{file_path}: the transfer protocol (#{protocol_type}) is not supported"
359 if file_checksum ==
'':
360 logging.error(f
"{file_path}: the checksum of the file is not defined")
362 return file_size,
None, 3, f
"{file_path}: the checksum of the file is not defined"
364 return file_size, file_checksum, 0,
''
369 xrd_ls_lines = subprocess.run([f
'xrdfs',
'{xrootd_url} ls {file_path}'], stdout=subprocess.PIPE).stdout.splitlines()
374 eos_ls_lines = subprocess.run([
'eos', f
'{xrootd_url} ls -F {file_path}'], stdout=subprocess.PIPE).stdout.splitlines()
386 except FileNotFoundError
as err:
387 logging.error(f
'invalid directory ({item}) encountered while listing "{dir}" directory')
392 if not dir.endswith(
'/'): dir +=
'/'
398 if abspath.endswith(
'/'):
402 except FileNotFoundError
as err:
403 logging.error(f
'invalid directory ({item}) encountered while listing "{dir}" directory')
def cp_file_ssh(source_file_path, destination_file_path, source_ssh_prefix, destination_ssh_prefix):
    """ Copy a file with ``scp`` after making sure the destination directory exists

    :param source_file_path: path of the file to copy
    :param destination_file_path: path the file is copied to
    :param source_ssh_prefix: SSH prefix prepended to the source as '<prefix>:<path>' ('' - the source path is used as is)
    :param destination_ssh_prefix: SSH prefix prepended to the destination as '<prefix>:<path>' ('' - the destination path is used as is)
    :raises FileAccessException: if creating the destination directory or the scp command reports any output or error
    """
    full_source_file = source_file_path
    if source_ssh_prefix != '':
        full_source_file = source_ssh_prefix + ":" + source_file_path
    full_destination_file = destination_file_path
    if destination_ssh_prefix != '':
        full_destination_file = destination_ssh_prefix + ":" + destination_file_path

    # A bare file name has an empty dirname: there is nothing to create then, and
    # both "mkdir -p" without an operand and os.makedirs('') would fail spuriously.
    destination_dir = os.path.dirname(destination_file_path)
    if destination_dir:
        if destination_ssh_prefix != '':
            process_output, process_error, returncode = get_process_full(
                f"ssh {destination_ssh_prefix} 'mkdir -p {destination_dir}'")
            # "mkdir -p" is silent on success, so any output is treated as a failure
            if process_output or process_error:
                logging.error(f'{destination_dir}: creating the destination folder failed: {returncode} - {process_error} (output: {process_output})')
                # NOTE(review): assumes FileAccessException(message, error_code) as declared
                # elsewhere in this file; callers such as get_file_info catch Exception - confirm
                raise FileAccessException(
                    f'{destination_dir}: creating the destination folder failed: {returncode} - {process_error} (output: {process_output})',
                    returncode)
        else:
            # exist_ok avoids the race between an "exists" check and the creation
            os.makedirs(destination_dir, exist_ok=True)

    # -B: batch mode (never prompt for a password), -p: preserve times and modes
    process_output, process_error, returncode = get_process_full(
        f"scp -B -p {full_source_file} {full_destination_file}")
    if process_output or process_error:
        logging.error(f'scp command failed: {returncode} - {process_error} (output: {process_output})')
        # NOTE(review): same assumption about the exception signature as above - confirm
        raise FileAccessException(
            f'scp command failed: {returncode} - {process_error} (output: {process_output})',
            returncode)
436 copy_prefix, is_source_copy_prefix = False):
437 if connection_ssh_prefix ==
'':
438 logging.error(
'ssh prefix for the first connection is not defined')
441 copy_file_line = f
"{copy_prefix}:{source_file_path} {destination_file_path}" if is_source_copy_prefix \
442 else f
"{source_file_path} {copy_prefix}:{destination_file_path}"
444 command_line = f
'ssh {connection_ssh_prefix} "'
445 if transfer_protocol == 0:
446 destination_dir = os.path.dirname(destination_file_path)
447 command_line += f
'mkdir -p {destination_dir};\cp -f {source_file_path} {destination_file_path}"'
448 elif transfer_protocol == 1:
449 destination_dir = os.path.dirname(destination_file_path)
450 command_line += f
'ssh {connection_ssh_prefix} mkdir -p {destination_dir}; scp -B -p {copy_file_line}"'
451 elif transfer_protocol == 2:
452 command_line += f
'xrdcp --nopbar {copy_file_line}"'
453 elif transfer_protocol == 3:
454 command_line += f
'eos cp -p -n -P -S {copy_file_line}"'
457 if process_output
or process_error:
458 logging.error(f
'copy command via SSH failed: {returncode} - {process_error} (output: {process_output})')
def cp_file_xrd(source_file_path, destination_file_path, source_storage_url, destination_storage_url):
    """ Copy a file with ``xrdcp``

    :param source_file_path: path of the file to copy
    :param destination_file_path: path the file is copied to
    :param source_storage_url: storage URL prepended to the source as '<url>/<path>' (None - the source path is used as is)
    :param destination_storage_url: storage URL prepended to the destination as '<url>/<path>' (None - the destination path is used as is)
    :raises FileAccessException: if the xrdcp command reports any output or error
    """
    full_source_file = source_file_path
    if source_storage_url is not None:
        full_source_file = source_storage_url + "/" + source_file_path
    full_destination_file = destination_file_path
    if destination_storage_url is not None:
        full_destination_file = destination_storage_url + "/" + destination_file_path

    # --nopbar suppresses the progress bar, so any remaining output is treated as a failure
    process_output, process_error, returncode = get_process_full(
        f"xrdcp --nopbar {full_source_file} {full_destination_file}")
    if process_output or process_error:
        logging.error(f'xrdcp command failed: {returncode} - {process_error} (output: {process_output})')
        # NOTE(review): assumes FileAccessException(message, error_code) as declared
        # elsewhere in this file; callers such as get_file_info catch Exception - confirm
        raise FileAccessException(
            f'xrdcp command failed: {returncode} - {process_error} (output: {process_output})',
            returncode)
def cp_file_eos(source_file_path, destination_file_path, source_storage_url, destination_storage_url):
    """ Copy a file with ``eos cp``

    :param source_file_path: path of the file to copy
    :param destination_file_path: path the file is copied to
    :param source_storage_url: storage URL prepended to the source as '<url>/<path>' (None - the source path is used as is)
    :param destination_storage_url: storage URL prepended to the destination as '<url>/<path>' (None - the destination path is used as is)
    :raises FileAccessException: if the eos cp command reports any output or error
    """
    full_source_file = source_file_path
    if source_storage_url is not None:
        full_source_file = source_storage_url + "/" + source_file_path
    full_destination_file = destination_file_path
    if destination_storage_url is not None:
        full_destination_file = destination_storage_url + "/" + destination_file_path

    # Any output on stdout or stderr is treated as a failure of the copy
    process_output, process_error, returncode = get_process_full(
        f"eos cp -p -n -P -S {full_source_file} {full_destination_file}")
    if process_output or process_error:
        logging.error(f'eos cp command failed: {returncode} - {process_error} (output: {process_output})')
        # NOTE(review): assumes FileAccessException(message, error_code) as declared
        # elsewhere in this file; callers such as get_file_info catch Exception - confirm
        raise FileAccessException(
            f'eos cp command failed: {returncode} - {process_error} (output: {process_output})',
            returncode)
491 process_output, process_error, returncode =
get_process_full(f
"xrdfs {xrootd_url} rm {file_path} && xrdfs {xrootd_url} ls {file_path} | grep [ERROR]")
492 process_output = process_output.strip()
493 if (
not process_error)
and (
not process_output):
494 logging.error(f
"{file_path}: removing the file using 'xrdfs rm' failed': {returncode} - {process_error} (output: {process_output})")
__init__(self, message, error_code=None)
get_process_retcode(process_string)
check_file_exists_ssh(file_path, ssh_data)
GET FILE INFORMATION ### check whether file exists using SSH protocol.
get_file_xrdsum(file_path, xrootd_url)
get_file_info(file_path, storage_url, protocol_type, ssh_con, start_tmp_dir, match_checksum=0)
get_files_eos(dir, xrootd_url)
cp_file_eos(source_file_path, destination_file_path, source_storage_url, destination_storage_url)
get_file_adler32c(file_path)
get_file_eossize(file_path, xrootd_url)
get_files_xrd(dir, xrootd_url)
check_file_exists_xrd(file_path, xrootd_url)
ls_dir_eos(file_path, xrootd_url)
get_file_xrdsize(file_path, xrootd_url)
get_process_full(process_string)
get_file_sshsize(file_path, ssh_data)
get_process_output(process_string)
get_file_adler32a(file_path)
ls_dir_xrd(file_path, xrootd_url)
GET FILE LIST ### list XRootD directory.
cp_file_ssh(source_file_path, destination_file_path, source_ssh_prefix, destination_ssh_prefix)
COPY FILE ### copy file via SCP (in case of ssh-key access).
check_file_exists_eos(file_path, xrootd_url)
cp_file_xrd(source_file_path, destination_file_path, source_storage_url, destination_storage_url)
cp_file_ssh_protocol(source_file_path, destination_file_path, connection_ssh_prefix, transfer_protocol, copy_prefix, is_source_copy_prefix=False)
rm_file_xrd(file_path, xrootd_url)
REMOVE FILE ### remove remote file via XRootD.