BmnRoot
Loading...
Searching...
No Matches
function_set.py
Go to the documentation of this file.
# Common functions used in BM@N Python scripts
2import os
3import sys
4import string
5import logging
6import subprocess
7import hashlib
8import re
9import zlib # for adler32
10
# chunk size (1 GiB) used when reading files to compute checksums
CHECKSUM_BLOCKSIZE = 1 << 30
# storage identifiers; not referenced by the functions of this module itself
storage_dictionary = ['ddc', 'ncx', 'cicc']

# Root logger setup: messages go to "file_inspector.log" (truncated on import
# because of mode='w') and to stdout, in this handler order. No level is given,
# so the root logger keeps Python's default level (WARNING).
logging.basicConfig(
    handlers=[
        logging.FileHandler("file_inspector.log", mode='w'),
        logging.StreamHandler(sys.stdout),
    ],
    format='%(asctime)s %(levelname)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)
23
class CustomFormatter(logging.Formatter):
    """ Class for colouring output messages

    Console formatter that wraps every record in an ANSI colour chosen by its
    level (DEBUG grey, WARNING yellow, ERROR red, CRITICAL bold red, INFO plain).
    Layout and timestamp format are the same as the ones given to
    logging.basicConfig for the log file.
    """
    grey = "\x1b[38;20m"
    yellow = "\x1b[33;93m"
    red = "\x1b[33;91m"
    bold_red = "\x1b[31;1m"
    reset = "\x1b[0m"
    # not called 'format': such a class attribute is shadowed by the format() method
    log_format = '%(asctime)s %(levelname)s: %(message)s'
    # a plain string (a trailing comma used to make it a tuple) that is actually
    # passed to the per-level formatters, so console and file timestamps agree
    date_format = '%Y-%m-%d %H:%M:%S'
    FORMATS = {
        logging.DEBUG: grey + log_format + reset,
        logging.INFO: log_format,
        logging.WARNING: yellow + log_format + reset,
        logging.ERROR: red + log_format + reset,
        logging.CRITICAL: bold_red + log_format + reset
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # build the per-level formatters once instead of one per record
        self._level_formatters = {level: logging.Formatter(fmt, datefmt=self.date_format)
                                  for level, fmt in self.FORMATS.items()}
        # levels missing from FORMATS (custom levels) keep the full uncoloured layout
        self._default_formatter = logging.Formatter(self.log_format, datefmt=self.date_format)

    def format(self, record):
        """Format *record* with the layout/colour registered for its level."""
        formatter = self._level_formatters.get(record.levelno, self._default_formatter)
        return formatter.format(record)
# colouring messages only for output stream, not file
def _colour_stdout_handlers():
    """Attach CustomFormatter to the root logger's handlers that write to stdout.

    Handlers are selected by type and stream instead of by list position, so the
    log-file handler (a StreamHandler subclass) keeps its plain formatter, and
    nothing breaks when logging.basicConfig() did not install its handlers
    because the root logger was already configured.
    """
    for handler in logging.getLogger().handlers:
        if isinstance(handler, logging.FileHandler):
            continue
        if isinstance(handler, logging.StreamHandler) and handler.stream is sys.stdout:
            handler.setFormatter(CustomFormatter())


_colour_stdout_handlers()
46
47
class FileAccessException(Exception):
    """Raised when the attributes or the content of a file cannot be obtained.

    :ivar message: description of the failure, also used as ``str(exc)``
    :ivar error_code: optional integer code (e.g. a command exit status), None by default
    """

    def __init__(self, message, error_code=None):
        self.error_code = error_code
        self.message = message

    def __str__(self):
        text = str(self.message)
        return text
55
56
57
def get_process_output(process_string):
    """Run a command and return its standard output split into lines.

    :param process_string: command line string, run through the shell like in
        get_process_full (pipes are allowed; callers must quote untrusted parts),
        or an argument sequence that is executed directly without a shell
    :returns: list of output lines as ``bytes`` without line endings
    """
    # A string wrapped in a one-element list without shell=True was looked up as a
    # single program name, so any command with arguments ("ls -l") failed.
    use_shell = isinstance(process_string, str)
    completed = subprocess.run(process_string, stdout=subprocess.PIPE, shell=use_shell)
    return completed.stdout.splitlines()
62
def get_process_retcode(process_string):
    """Run a command, discard its standard output and return its exit code.

    :param process_string: command line string, run through the shell (/bin/sh)
        consistently with get_process_full, or an argument sequence executed
        directly without a shell
    :returns: the process return code
    """
    # Without shell=True a command string is treated as the program name, so
    # "eos <url> stat -f <path>" could never be started.
    use_shell = isinstance(process_string, str)
    completed = subprocess.run(process_string, stdout=subprocess.DEVNULL, shell=use_shell)
    return completed.returncode
68
def get_process_full(process_string):
    """Run a shell command and collect its output, error output and return code.

    :param process_string: command line executed through the shell (shell=True),
        so pipes and redirections work; callers must quote untrusted parts
    :returns: tuple (stdout, stderr, returncode); both outputs are decoded as
        UTF-8, undecodable bytes are replaced instead of raising
    """
    # communicate() drains stdout and stderr together; calling wait() before reading
    # the pipes deadlocks as soon as the child fills a pipe buffer.
    with subprocess.Popen(process_string, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) as popen:
        raw_output, raw_error = popen.communicate()
    process_output = raw_output.decode("utf-8", errors="replace")
    process_error = raw_error.decode("utf-8", errors="replace")
    return process_output, process_error, popen.returncode
77
78
79
def check_file_exists_ssh(file_path, ssh_data):
    """Check whether a regular file exists on a remote host via SSH (ssh-key access).

    :param file_path: path of the file on the remote host
    :param ssh_data: ssh login in the form "user@host"
    :returns: True if the file exists, False if it does not, None if the check
        itself failed (empty or non-numeric output, or anything on stderr)
    """
    # ';' instead of '|': in "test -f X | echo $?" both commands form one pipeline,
    # so $? holds the status of the command before the pipeline (0 in a fresh ssh
    # session) and every path was reported as existing.
    process_output, process_error, returncode = get_process_full(f"ssh {ssh_data} 'test -f {file_path}; echo $?'")
    process_output = process_output.strip()
    if process_error or not re.fullmatch('[0-9]+', process_output):
        logging.error(f'{file_path}: verifying the existence of the file using SSH failed: {returncode} - {process_error}')
        return None
    return int(process_output) == 0
92
def check_file_exists_xrd(file_path, xrootd_url):
    """Check whether a path exists on an XRootD storage using "xrdfs ls".

    Absence is detected by an "[ERROR]" line on stdout (filtered by grep) or by
    any text on stderr (xrdfs stderr is not piped through grep).

    :param file_path: path on the storage
    :param xrootd_url: XRootD server URL
    :returns: True if no error was reported, False otherwise
    """
    # '\\[' gives grep the same '\[' as before; the single backslash was an invalid
    # Python escape sequence (SyntaxWarning since Python 3.12).
    process_output, process_error, returncode = get_process_full(f'xrdfs {xrootd_url} ls {file_path} | grep "\\[ERROR]"')
    if process_output.strip() or process_error:
        return False
    return True
100
def check_file_exists_eos(file_path, xrootd_url):
    """Check whether a file exists on an EOS storage using "eos stat -f".

    :param file_path: path of the file on the storage
    :param xrootd_url: EOS/XRootD URL passed to the eos command
    :returns: True if the stat command exits with 0, False otherwise
    """
    # get_process_full runs the command through the shell like the other eos calls in
    # this module; a plain command string given to Popen without shell=True is taken
    # as a program name and can never be started.
    process_output, process_error, returncode = get_process_full(f"eos {xrootd_url} stat -f {file_path}")
    return returncode == 0
107
def get_file_sshsize(file_path, ssh_data):
    """Return the size in bytes of a remote file, measured with "du -b" over SSH.

    :param file_path: path of the file on the remote host
    :param ssh_data: ssh login in the form "user@host" (ssh-key access)
    :returns: file size in bytes
    :raises FileAccessException: if anything was written to stderr or the output is
        not a plain decimal number (message = stderr text, error_code = exit status)
    """
    command = f"ssh {ssh_data} 'du -b {file_path} | cut -f1'"
    size_text, error_text, exit_code = get_process_full(command)
    size_text = size_text.strip()
    if not error_text and re.fullmatch('[0-9]+', size_text):
        return int(size_text)
    logging.error(f'{file_path}: obtaining the file size using SSH failed: {exit_code} - {error_text}')
    raise FileAccessException(error_text, exit_code)
123
def get_file_xrdsize(file_path, xrootd_url):
    """Return the size in bytes of a file on an XRootD storage.

    The value is the second field of the "Size" line printed by "xrdfs stat".

    :param file_path: path of the file on the storage
    :param xrootd_url: XRootD URL, e.g. 'root://eos.jinr.ru//eos/nica/bmn...'
    :returns: file size in bytes
    :raises FileAccessException: if anything was written to stderr or the extracted
        value is not a plain decimal number (message = stderr text,
        error_code = exit status of the shell pipeline)
    """
    command = f"xrdfs {xrootd_url} stat {file_path} | grep Size | awk -F ' ' '{{print $2}}'"
    size_text, error_text, exit_code = get_process_full(command)
    size_text = size_text.strip()
    if not error_text and re.fullmatch('[0-9]+', size_text):
        return int(size_text)
    logging.error(f"{file_path}: obtaining the file size using 'xrdfs' failed: {exit_code} - {error_text}")
    raise FileAccessException(error_text, exit_code)
139
def get_file_eossize(file_path, xrootd_url):
    """Return the size in bytes of a file on an EOS storage ("eos fileinfo --size").

    :param file_path: path of the file on the storage
    :param xrootd_url: XRootD/EOS URL, e.g. 'root://eos.jinr.ru//eos/nica/bmn...'
    :returns: file size in bytes
    :raises FileAccessException: if anything was written to stderr or the extracted
        value is not a plain decimal number (message = stderr text,
        error_code = exit status of the shell pipeline)
    """
    command = f"eos {xrootd_url} fileinfo {file_path} --size | awk '{{print $2}}'"
    size_text, error_text, exit_code = get_process_full(command)
    size_text = size_text.strip()
    if not error_text and re.fullmatch('[0-9]+', size_text):
        return int(size_text)
    logging.error(f"{file_path}: obtaining the file size using 'eos' failed: {exit_code} - {error_text}")
    raise FileAccessException(error_text, exit_code)
155
def get_file_MD5c(file_path, blocksize=None):
    """ Get file MD5 checksum (read splitted by chunks)

    :param file_path: file path
    :param blocksize: chunk size in bytes (optional, defaults to CHECKSUM_BLOCKSIZE)
    :returns: MD5 file checksum (lowercase hex string)
    :raises FileAccessException: if there is any error while getting file checksum
        (code 1 - file not found, 2 - any other error)
    :raises ValueError: if blocksize is not positive
    """
    if blocksize is None:
        blocksize = CHECKSUM_BLOCKSIZE
    if blocksize <= 0:
        # read(0) returns b"" and would silently end the loop on the first chunk
        raise ValueError(f"blocksize must be positive, got {blocksize}")
    file_md5 = hashlib.md5()
    try:
        # 'with' closes the file; the descriptor used to be left open
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(blocksize), b""):
                file_md5.update(chunk)
        return file_md5.hexdigest()
    except FileNotFoundError as e:
        logging.error(f"{file_path}: the file is not accessible to calculate MD5 checksum (exception: {e})")
        raise FileAccessException(str(e), 1) from e
    except Exception as e:
        logging.error(f"{file_path}: the file is not accessible to calculate MD5 checksum (exception: {e})")
        raise FileAccessException(str(e), 2) from e
175
def get_file_MD5a(file_path):
    """ Get file MD5 checksum (read all at once)

    The whole file is loaded into memory; get_file_MD5c reads it in chunks instead.

    :param file_path: file path
    :returns: MD5 file checksum (lowercase hex string)
    :raises FileAccessException: if there is any error while getting file checksum
        (code 1 - file not found, 2 - any other error)
    """
    try:
        # 'with' closes the file; the descriptor used to be left open
        with open(file_path, "rb") as a_file:
            file_content = a_file.read()
        return hashlib.md5(file_content).hexdigest()
    except FileNotFoundError as e:
        logging.error(f"{file_path}: the file is not accessible to calculate MD5 checksum (exception: {e})")
        raise FileAccessException(str(e), 1) from e
    except Exception as e:
        logging.error(f"{file_path}: the file is not accessible to calculate MD5 checksum (exception: {e})")
        raise FileAccessException(str(e), 2) from e
194
def get_file_adler32c(file_path, blocksize=None):
    """ Get file Adler32 checksum (read splitted by chunks)

    :param file_path: file path
    :param blocksize: chunk size in bytes (optional, defaults to CHECKSUM_BLOCKSIZE)
    :returns: Adler32 file checksum as 8 lowercase hex digits
    :raises FileAccessException: if there is any error while getting file checksum
        (code 1 - file not found, 2 - any other error)
    :raises ValueError: if blocksize is not positive
    """
    if blocksize is None:
        blocksize = CHECKSUM_BLOCKSIZE
    if blocksize <= 0:
        # read(0) returns b"" and would silently end the loop on the first chunk
        raise ValueError(f"blocksize must be positive, got {blocksize}")
    adler_sum = 1  # Adler-32 starting value
    try:
        with open(file_path, "rb") as f:
            for data in iter(lambda: f.read(blocksize), b""):
                adler_sum = zlib.adler32(data, adler_sum)
    except FileNotFoundError as e:
        logging.error(f"{file_path}: the file is not accessible to calculate Adler32 checksum (exception: {e})")
        raise FileAccessException(str(e), 1) from e
    except Exception as e:
        logging.error(f"{file_path}: the file is not accessible to calculate Adler32 checksum (exception: {e})")
        raise FileAccessException(str(e), 2) from e
    # zlib.adler32 returns an unsigned 32-bit value in Python 3; the mask only guards the width
    return format(adler_sum & 0xFFFFFFFF, '08x')
219
def get_file_adler32a(file_path, blocksize=256 * 1024 * 1024):
    """ Get file Adler32 checksum (read in large chunks, 256 MiB by default)

    Despite the historical "a" (all at once) suffix, the file is read in chunks of
    *blocksize* bytes, so memory use stays bounded for large files.

    :param file_path: file path
    :param blocksize: chunk size in bytes (default 256 MiB)
    :returns: Adler32 file checksum as 8 lowercase hex digits
    :raises FileAccessException: if there is any error while getting file checksum
        (code 1 - file not found, 2 - any other error)
    :raises ValueError: if blocksize is not positive
    """
    if blocksize <= 0:
        # read(0) returns b"" and would silently end the loop on the first chunk
        raise ValueError(f"blocksize must be positive, got {blocksize}")
    adler_sum = 1  # Adler-32 starting value
    try:
        with open(file_path, "rb") as f:
            for data in iter(lambda: f.read(blocksize), b""):
                adler_sum = zlib.adler32(data, adler_sum)
    except FileNotFoundError as e:
        logging.error(f"{file_path}: the file is not found to calculate Adler32 checksum (exception: {e})")
        raise FileAccessException(str(e), 1) from e
    except Exception as e:
        logging.error(f"{file_path}: the file is not accessible to calculate Adler32 checksum (exception: {e})")
        raise FileAccessException(str(e), 2) from e
    # zlib.adler32 returns an unsigned 32-bit value in Python 3; the mask only guards the width
    return format(adler_sum & 0xFFFFFFFF, '08x')
245
def get_file_xrdsum(file_path, xrootd_url):
    """Return the checksum stored for a file on an XRootD storage.

    The value is the second field printed by "xrdfs query checksum" (Adler32 at the
    NICA storages).

    :param file_path: file path
    :param xrootd_url: XRootD/EOS URL, e.g. 'root://eos.jinr.ru//eos/nica/bmn...'
    :returns: file checksum as printed by xrdfs
    :raises FileAccessException: if nothing was printed or anything reached stderr
        (message = stderr text, error_code = exit status of the shell pipeline)
    :raises ValueError: if the returned checksum contains non-hexadecimal characters
    """
    command = f"xrdfs {xrootd_url} query checksum {file_path} | awk -F ' ' '{{print $2}}'"
    checksum, error_text, exit_code = get_process_full(command)
    checksum = checksum.strip()
    if error_text or not checksum:
        logging.error(f"{file_path}: query of the file checksum using 'xrdfs' failed: {exit_code} - {error_text}")
        raise FileAccessException(error_text, exit_code)
    if any(symbol not in string.hexdigits for symbol in checksum):
        message = f"{file_path}: query of the file checksum using 'xrdfs' returned non-HEX value: {checksum}"
        logging.error(message)
        raise ValueError(message)
    return checksum
265
266
def get_file_info(file_path, storage_url, protocol_type, ssh_con, start_tmp_dir, match_checksum = 0):
    """ Get file information (size and checksum) using different protocols (in case of errors: file_size and/or file_checksum is None)

    :param file_path: input file path to get its information
    :param storage_url: storage URL, where the input file is located
    :param protocol_type: 0 - local file; 1 - SSH protocol (treated as local when ssh_con is ''); 2 - XRootD protocol; 3 - EOS protocol
    :param ssh_con: SSH prefix as 'user@server'
    :param start_tmp_dir: temporary directory to copy a remote file into for checksum calculation (the copy is removed afterwards)
    :param match_checksum: 0 - do not get the checksum; 1 - fast: query the checksum stored at the XRootD storage (when storage_url is set);
        2 (or any other value) - always recalculate the Adler32 checksum from the file content
    :returns:
        - 1 - file size (integer, in bytes);
        - 2 - file checksum (string, '' when match_checksum is 0);
        - 3 - error code (0 - no errors, 1 - file not found via XRootD, 2 - error while getting the size or the checksum
          (including an unsupported protocol), 3 - the checksum could not be determined)
        - 4 - error message (in case of any errors)
    """
    match_sum = {
        0: 'no',
        1: 'fast',
        2: 'full'}
    logging.debug(f"Get file information locating at \"{file_path}\" (url: {storage_url}, match checksum: {match_sum.get(match_checksum, 'no')})")

    # check the file size
    logging.debug(f'Checking the file size (protocol = {protocol_type})...')
    file_size, size_error = _get_file_size(file_path, storage_url, protocol_type, ssh_con)
    if size_error is not None:
        return None, None, 2, size_error
    if file_size is None:
        logging.error(f"{file_path}: the file size is not defined")
        return None, None, 2, f"{file_path}: the file size is not defined"

    if match_checksum == 0:
        return file_size, '', 0, ''

    # check the file checksum
    logging.debug('Checking the file checksum')
    file_checksum, error_code, error_message = _get_file_checksum(
        file_path, storage_url, protocol_type, ssh_con, start_tmp_dir, match_checksum)
    if error_code != 0:
        return file_size, None, error_code, error_message
    if not file_checksum:
        logging.error(f"{file_path}: the checksum of the file is not defined")
        return file_size, None, 3, f"{file_path}: the checksum of the file is not defined"

    return file_size, file_checksum, 0, ''


def _get_file_size(file_path, storage_url, protocol_type, ssh_con):
    """Return (file_size, error_message) for get_file_info; error_message is None on success."""
    if protocol_type == 0 or (protocol_type == 1 and ssh_con == ''):
        try:
            return os.path.getsize(file_path), None
        except OSError:
            message = f"{file_path}: the file is not accessible to get its size"
            logging.error(message)
            return None, message

    if protocol_type == 1:
        size_getter = lambda: get_file_sshsize(file_path, ssh_con)
    elif protocol_type == 2:
        size_getter = lambda: get_file_xrdsize(file_path, storage_url)
    elif protocol_type == 3:
        size_getter = lambda: get_file_eossize(file_path, storage_url)
    else:
        message = f"{file_path}: transfer protocol (#{protocol_type}) is not supported"
        logging.error(message)
        return None, message

    try:
        return size_getter(), None
    except Exception as e:
        return None, str(e)


def _get_file_checksum(file_path, storage_url, protocol_type, ssh_con, start_tmp_dir, match_checksum):
    """Return (checksum, error_code, error_message) for get_file_info; error_code 0 means success."""
    if match_checksum == 1 and storage_url is not None:
        # fast mode: ask the XRootD storage for the checksum it already has
        if not check_file_exists_xrd(file_path, storage_url):
            return None, 1, f"File not found using XRootD: {file_path}"
        try:
            return get_file_xrdsum(file_path, storage_url), 0, ''
        except Exception as e:
            return None, 2, str(e)

    if protocol_type == 0 or (protocol_type == 1 and ssh_con == ''):
        try:
            return get_file_adler32c(file_path), 0, ''
        except Exception as e:
            return None, 2, str(e)

    if protocol_type not in (1, 2, 3):
        message = f"{file_path}: the transfer protocol (#{protocol_type}) is not supported"
        logging.error(message)
        return None, 2, message

    # remote file: copy it into the temporary directory and checksum the local copy
    local_file_copy = os.path.join(start_tmp_dir, os.path.basename(file_path))
    try:
        # a stale copy from an earlier run must not be checksummed if this copy fails
        if os.path.exists(local_file_copy):
            os.remove(local_file_copy)
        # the copy helpers take (source, destination, source_prefix/url, destination_prefix/url);
        # their result is not checked: a missing copy makes the checksum step fail with code 2
        if protocol_type == 1:
            cp_file_ssh(file_path, local_file_copy, ssh_con, '')
        elif protocol_type == 2:
            cp_file_xrd(file_path, local_file_copy, storage_url, None)
        else:
            cp_file_eos(file_path, local_file_copy, storage_url, None)
        try:
            return get_file_adler32c(local_file_copy), 0, ''
        except Exception as e:
            return None, 2, str(e)
    finally:
        # remove the temporary copy on success and on failure
        if os.path.exists(local_file_copy):
            os.remove(local_file_copy)
365
366
def ls_dir_xrd(file_path, xrootd_url):
    """List a directory on an XRootD storage with "xrdfs <url> ls <path>".

    :param file_path: directory path on the storage
    :param xrootd_url: XRootD server URL
    :returns: list of entries (str), one per stdout line of xrdfs; stderr is not captured
    :raises FileNotFoundError: if the xrdfs executable is not available
    """
    # every token is its own argv element; the former second element was not an
    # f-string and passed '{xrootd_url} ls {file_path}' literally as one argument
    completed = subprocess.run(['xrdfs', xrootd_url, 'ls', file_path], stdout=subprocess.PIPE)
    # os.fsdecode keeps undecodable bytes (surrogateescape), so paths round-trip to later commands
    return [os.fsdecode(line) for line in completed.stdout.splitlines()]
371
def ls_dir_eos(file_path, xrootd_url):
    """List an EOS directory with "eos <url> ls -F <path>" ('/' appended to directories).

    :param file_path: directory path on the storage
    :param xrootd_url: EOS/XRootD URL passed to the eos command
    :returns: list of entries (str), one per stdout line of eos; stderr is not captured
    :raises FileNotFoundError: if the eos executable is not available
    """
    # every token is its own argv element; they used to be bundled into one argument
    completed = subprocess.run(['eos', xrootd_url, 'ls', '-F', file_path], stdout=subprocess.PIPE)
    # os.fsdecode keeps undecodable bytes (surrogateescape), so names round-trip to later commands
    return [os.fsdecode(line) for line in completed.stdout.splitlines()]
376
def get_files_xrd(dir, xrootd_url):
    """Recursively collect the files below an XRootD directory.

    :param dir: directory path on the storage
    :param xrootd_url: XRootD server URL
    :returns: list of entries as printed by "xrdfs ls" (used directly as paths for
        "xrdfs stat" and for recursion)
    """
    files = []
    for entry in ls_dir_xrd(dir, xrootd_url):
        # ls_dir_xrd may yield bytes; the entry is interpolated into a shell command
        item = os.fsdecode(entry) if isinstance(entry, bytes) else entry
        if not item.strip():
            continue
        try:
            # a zero exit status of "stat -q IsDir" is taken to mean the entry is a directory;
            # get_process_full captures both output streams, so no "&> /dev/null" is needed
            _, _, returncode = get_process_full(f"xrdfs {xrootd_url} stat -q IsDir {item}")
            if returncode == 0:
                # the URL has to be passed on (the recursive call used to omit it)
                files.extend(get_files_xrd(item, xrootd_url))
            else:
                files.append(item)
        except FileNotFoundError:
            logging.error(f'invalid directory ({item}) encountered while listing "{dir}" directory')
    return files
389
def get_files_eos(dir, xrootd_url):
    """Recursively collect the files below an EOS directory.

    Directories are recognised by the trailing '/' that "eos ls -F" appends.

    :param dir: EOS directory path ('/' is appended if missing)
    :param xrootd_url: EOS/XRootD URL passed to the eos command
    :returns: list of file paths relative to ``dir`` (e.g. 'a.root', 'sub/b.root')
    """
    if not dir.endswith('/'):
        dir += '/'

    files = []
    for entry in ls_dir_eos(dir, xrootd_url):
        # ls_dir_eos may yield bytes, which cannot be concatenated with the str path
        item = os.fsdecode(entry) if isinstance(entry, bytes) else entry
        if not item.strip():
            continue
        abspath = dir + item
        try:
            if item.endswith('/'):
                # entries of a subdirectory come back relative to it: prefix them with its name
                files.extend(item + name for name in get_files_eos(abspath, xrootd_url))
            else:
                files.append(item)
        except FileNotFoundError:
            logging.error(f'invalid directory ({item}) encountered while listing "{dir}" directory')
    return files
402 except FileNotFoundError as err:
403 logging.error(f'invalid directory ({item}) encountered while listing "{dir}" directory')
404 return files
405
406
def cp_file_ssh(source_file_path, destination_file_path, source_ssh_prefix, destination_ssh_prefix=''):
    """Copy a file with scp in batch mode (ssh-key access, no password prompts).

    The destination directory is created first (via "ssh ... mkdir -p" for a remote
    destination, os.makedirs for a local one).

    :param source_file_path: path of the file to copy
    :param destination_file_path: target path
    :param source_ssh_prefix: "user@host" of the source machine, '' for a local source
    :param destination_ssh_prefix: "user@host" of the destination machine, '' (default) for a local destination
    :returns: True on success, False if creating the directory or scp printed anything (logged)
    """
    full_source_file = f"{source_ssh_prefix}:{source_file_path}" if source_ssh_prefix != '' else source_file_path
    full_destination_file = (f"{destination_ssh_prefix}:{destination_file_path}"
                             if destination_ssh_prefix != '' else destination_file_path)

    # create destination directory (if not exist); an empty dirname means the current
    # directory, for which os.makedirs('') raised and "mkdir -p" lacked an operand
    destination_dir = os.path.dirname(destination_file_path)
    if destination_dir:
        if destination_ssh_prefix != '':
            process_output, process_error, returncode = get_process_full(f"ssh {destination_ssh_prefix} 'mkdir -p {destination_dir}'")
            if process_output or process_error:
                logging.error(f'{destination_dir}: creating the destination folder failed: {returncode} - {process_error} (output: {process_output})')
                return False
        elif not os.path.exists(destination_dir):
            os.makedirs(destination_dir, exist_ok=True)

    # -B batch mode (not asking for passwords), -p preserves times from origin
    process_output, process_error, returncode = get_process_full(f"scp -B -p {full_source_file} {full_destination_file}")
    if process_output or process_error:
        logging.error(f'scp command failed: {returncode} - {process_error} (output: {process_output})')
        return False
    return True
430
def cp_file_ssh_protocol(source_file_path, destination_file_path, connection_ssh_prefix, transfer_protocol,
                         copy_prefix, is_source_copy_prefix = False):
    """Run a copy command on a host reached via SSH (ssh-key access).

    :param source_file_path: source file path
    :param destination_file_path: destination file path
    :param connection_ssh_prefix: "user@host" of the machine on which the copy command runs
    :param transfer_protocol: 0 - local copy on that machine, 1 - scp, 2 - xrdcp, 3 - eos cp
    :param copy_prefix: prefix for the chosen transfer protocol, joined to one path as "prefix:path"
    :param is_source_copy_prefix: True - "prefix:source", False - "prefix:destination"
    :returns: True if the command printed nothing, False otherwise (the error is logged)
    """
    if connection_ssh_prefix == '':
        logging.error('ssh prefix for the first connection is not defined')
        return False

    copy_file_line = f"{copy_prefix}:{source_file_path} {destination_file_path}" if is_source_copy_prefix \
        else f"{source_file_path} {copy_prefix}:{destination_file_path}"

    # the remote command is enclosed in double quotes and executed by the remote shell
    command_line = f'ssh {connection_ssh_prefix} "'
    if transfer_protocol == 0:
        destination_dir = os.path.dirname(destination_file_path)
        # '\\cp' yields '\cp' (presumably to bypass a 'cp' alias); a single backslash
        # was an invalid Python escape sequence
        command_line += f'mkdir -p {destination_dir};\\cp -f {source_file_path} {destination_file_path}"'
    elif transfer_protocol == 1:
        destination_dir = os.path.dirname(destination_file_path)
        # NOTE(review): the directory is created through a nested ssh to connection_ssh_prefix
        # itself; when the destination is on copy_prefix (is_source_copy_prefix=False) it
        # probably has to be created there instead - confirm the intended host.
        command_line += f'ssh {connection_ssh_prefix} mkdir -p {destination_dir}; scp -B -p {copy_file_line}"'
    elif transfer_protocol == 2:
        command_line += f'xrdcp --nopbar {copy_file_line}"'
    elif transfer_protocol == 3:
        command_line += f'eos cp -p -n -P -S {copy_file_line}"'
    else:
        # otherwise an unterminated 'ssh host "' command would be executed
        logging.error(f'transfer protocol (#{transfer_protocol}) is not supported')
        return False

    process_output, process_error, returncode = get_process_full(command_line)
    if process_output or process_error:
        logging.error(f'copy command via SSH failed: {returncode} - {process_error} (output: {process_output})')
        return False
    return True
461
def cp_file_xrd(source_file_path, destination_file_path, source_storage_url, destination_storage_url):
    """Copy a file with xrdcp; either side may be local or on an XRootD storage.

    :param source_file_path: source path
    :param destination_file_path: destination path
    :param source_storage_url: URL prepended as "<url>/<path>" to the source, None for a local source
    :param destination_storage_url: URL prepended as "<url>/<path>" to the destination, None for a local destination
    :returns: True if xrdcp printed nothing, False otherwise (the error is logged)
    """
    def with_url(url, path):
        return path if url is None else url + "/" + path

    source = with_url(source_storage_url, source_file_path)
    destination = with_url(destination_storage_url, destination_file_path)

    stdout_text, stderr_text, exit_code = get_process_full(f"xrdcp --nopbar {source} {destination}")
    if not (stdout_text or stderr_text):
        return True
    logging.error(f'xrdcp command failed: {exit_code} - {stderr_text} (output: {stdout_text})')
    return False
474
def cp_file_eos(source_file_path, destination_file_path, source_storage_url, destination_storage_url):
    """Copy a file with "eos cp -p -n -P -S"; either side may be local or on EOS.

    :param source_file_path: source path
    :param destination_file_path: destination path
    :param source_storage_url: URL prepended as "<url>/<path>" to the source, None for a local source
    :param destination_storage_url: URL prepended as "<url>/<path>" to the destination, None for a local destination
    :returns: True if the command printed nothing, False otherwise (the error is logged)
    """
    def with_url(url, path):
        return path if url is None else url + "/" + path

    source = with_url(source_storage_url, source_file_path)
    destination = with_url(destination_storage_url, destination_file_path)

    stdout_text, stderr_text, exit_code = get_process_full(f"eos cp -p -n -P -S {source} {destination}")
    if not (stdout_text or stderr_text):
        return True
    logging.error(f'eos cp command failed: {exit_code} - {stderr_text} (output: {stdout_text})')
    return False
487
488
def rm_file_xrd(file_path, xrootd_url):
    """Remove a file from an XRootD storage and verify that it is gone.

    :param file_path: path of the file on the storage
    :param xrootd_url: XRootD server URL
    :returns: True if "xrdfs rm" exited with 0 and a following "xrdfs stat" no longer
        succeeds for the path, False otherwise (the error is logged)
    """
    # The former single command ("rm && ls | grep [ERROR]") reported success whenever
    # anything reached stderr - including a failed rm - and the unquoted [ERROR] is a
    # shell glob / grep character class (any 'E', 'R' or 'O'), not the literal tag.
    process_output, process_error, returncode = get_process_full(f"xrdfs {xrootd_url} rm {file_path}")
    if returncode != 0:
        logging.error(f"{file_path}: removing the file using 'xrdfs rm' failed: {returncode} - {process_error.strip()} (output: {process_output.strip()})")
        return False
    # confirm the removal: stat has to fail for a path that no longer exists
    _, _, stat_returncode = get_process_full(f"xrdfs {xrootd_url} stat {file_path}")
    if stat_returncode == 0:
        logging.error(f"{file_path}: the file still exists after 'xrdfs rm'")
        return False
    return True
__init__(self, message, error_code=None)
get_process_retcode(process_string)
check_file_exists_ssh(file_path, ssh_data)
GET FILE INFORMATION ### check whether file exists using SSH protocol.
get_file_xrdsum(file_path, xrootd_url)
get_file_info(file_path, storage_url, protocol_type, ssh_con, start_tmp_dir, match_checksum=0)
get_file_MD5c(file_path)
get_files_eos(dir, xrootd_url)
cp_file_eos(source_file_path, destination_file_path, source_storage_url, destination_storage_url)
get_file_adler32c(file_path)
get_file_eossize(file_path, xrootd_url)
get_files_xrd(dir, xrootd_url)
check_file_exists_xrd(file_path, xrootd_url)
ls_dir_eos(file_path, xrootd_url)
get_file_xrdsize(file_path, xrootd_url)
get_process_full(process_string)
get_file_sshsize(file_path, ssh_data)
get_process_output(process_string)
get_file_adler32a(file_path)
get_file_MD5a(file_path)
ls_dir_xrd(file_path, xrootd_url)
GET FILE LIST ### list XRootD directory.
cp_file_ssh(source_file_path, destination_file_path, source_ssh_prefix, destination_ssh_prefix)
COPY FILE ### copy file via SCP (in case of ssh-key access).
check_file_exists_eos(file_path, xrootd_url)
cp_file_xrd(source_file_path, destination_file_path, source_storage_url, destination_storage_url)
cp_file_ssh_protocol(source_file_path, destination_file_path, connection_ssh_prefix, transfer_protocol, copy_prefix, is_source_copy_prefix=False)
rm_file_xrd(file_path, xrootd_url)
REMOVE FILE ### remove remote file via XRootD.