# Let INFO-level (and more severe) records through the root logger.
logging.getLogger().setLevel(logging.INFO)
# Body of the command-line parsing routine (invoked later in the file as
# `argument_parser()`).  Its `def` line and most of the `add_argument(...)`
# calls are not visible in this listing.
# NOTE(review): the numbers fused to the start of many lines (15, 16, 32, ...)
# look like listing artefacts, not Python code - confirm against the real file.
15 parser = argparse.ArgumentParser(
16 description=
'Script for transferring missing raw files to a destination storage\
17 For more information run with --help option'
32 choices=function_set.storage_dictionary,
33 help=
'Source storage to match and transfer missing raw files',
37 '--destination',
'-d',
40 choices=function_set.storage_dictionary,
41 help=
'Destination storage to obtain missing raw files',
47 help=
'Rewrite existing destination file if exists but has another size or checksum'
52 help=
'Remove destination file if there is no corresponding source file'
# Parse the command line and pull out the five options used by the script.
55 args = parser.parse_args()
57 run_number, source_storage, destination_storage, file_force, file_delete = args.run, args.source, args.destination, args.force, args.delete
# Source and destination must name different storages.  Only element [0] of
# each option is used (presumably they are declared with nargs=1 - TODO confirm).
# NOTE(review): what happens after the error is logged is not visible here.
59 if (source_storage[0] == destination_storage[0]):
60 logging.error(
"ERROR: source (%s) and destination (%s) storages must be different" % (source_storage[0], destination_storage[0]))
# Returns a 5-tuple: run number, source storage, destination storage,
# the force flag and the delete flag.
63 return run_number[0],source_storage[0],destination_storage[0],file_force,file_delete
def transfer_file(source_file_path, source_storage_url,
                  destination_file_path, destination_storage_url, destination_run_storage_id,
                  source_file_size, source_file_checksum):
    """Copy one source file to the destination and record it in ``file_catalogue``.

    The file is copied with ``function_set.cp_file_xrd``.  When
    ``destination_storage_url`` is ``None`` the record is inserted with the
    run-storage id and the path only.  Otherwise the size and checksum of the
    copy are read back, compared with the source values, and the record is
    inserted only when both agree.

    NOTE(review): this block was restored from a listing in which several
    statements were not visible (the bodies of the failure branches, the
    ``try:`` line, the read-back calls and whatever follows the inserts).
    The early returns, ``conn.commit()``, ``conn.close()`` and the boolean
    return value are assumptions - confirm them against the real file.

    Args:
        source_file_path: Path of the file on the source storage.
        source_storage_url: URL of the source storage passed to the copy helper.
        destination_file_path: Path the file is copied to.
        destination_storage_url: URL of the destination storage, or ``None``
            (then no size/checksum verification is done).
        destination_run_storage_id: ``run_storage_id`` stored with the new record.
        source_file_size: Expected size (formatted with ``%d``, so an integer).
        source_file_checksum: Expected checksum.

    Returns:
        ``True`` when a catalogue record was inserted, ``False`` otherwise.
        The caller visible in this file ignores the value.
    """
    logging.info('Copying source file "%s" to the destination path "%s"',
                 source_file_path, destination_file_path)

    # Keyword arguments on purpose: the previous positional call passed
    # (path, url, path, url) while the helper is listed in this file as
    # cp_file_xrd(source_file_path, destination_file_path,
    #             source_storage_url, destination_storage_url).
    # NOTE(review): confirm the parameter names in function_set.
    copied = function_set.cp_file_xrd(
        source_file_path=source_file_path,
        destination_file_path=destination_file_path,
        source_storage_url=source_storage_url,
        destination_storage_url=destination_storage_url)
    # The helper's return type is not visible here, so the original
    # `== False` comparison is kept instead of a truthiness test.
    if copied == False:  # noqa: E712
        return False
    logging.info('The source file has been copied')

    # Keyword arguments avoid hand-building a DSN string, which breaks when a
    # value (e.g. the password) contains spaces or quotes.
    try:
        conn = psycopg2.connect(dbname=config["db_name"], user=config["db_user"],
                                host=config["db_host"], password=config["db_pass"])
    except Exception as e:
        logging.error("Database Connection Error: %s", e)
        return False

    try:
        cursor = conn.cursor()

        if destination_storage_url is None:
            logging.debug('Inserting a destination file record with file path = "%s", no file size, no file_checksum',
                          destination_file_path)
            # Two columns, two values: the previous format string had four
            # placeholders for two values and raised TypeError.
            cursor.execute(
                "insert into file_catalogue(run_storage_id, file_path) "
                "values (%s, %s)",
                (destination_run_storage_id, destination_file_path))
            conn.commit()
            logging.info("Adding information on the destination file has been successfull, but without the size and checksum")
            return True

        # Read the size back from the destination; it is compared as a string,
        # matching the original `!= str(source_file_size)` check.
        destination_file_size = function_set.get_file_xrdsize(destination_file_path, destination_storage_url)
        if destination_file_size != str(source_file_size):
            logging.error("ERROR: Transferring file has been failed: destination (%s) and source (%d) file sizes are different",
                          destination_file_size, source_file_size)
            return False

        destination_file_checksum = function_set.get_file_xrdsum(destination_file_path, destination_storage_url)
        if destination_file_checksum != source_file_checksum:
            logging.error("ERROR: Transferring file has been failed: destination (%s) and source (%s) file checksums are different",
                          destination_file_checksum, source_file_checksum)
            return False

        logging.debug('Inserting a destination file record with file path = "%s", file size = %d, file_checksum = %s',
                      destination_file_path, source_file_size, destination_file_checksum)
        # Parameters are passed separately so the driver quotes them; a path
        # containing a single quote no longer corrupts the statement.
        cursor.execute(
            "insert into file_catalogue(run_storage_id, file_path, file_size, file_checksum) "
            "values (%s, %s, %s, %s)",
            (destination_run_storage_id, destination_file_path, source_file_size, destination_file_checksum))
        conn.commit()
        logging.info("Adding information on the destination file has been successfull")
        return True
    finally:
        # One connection is opened per transferred file; release it on every path.
        conn.close()
# Read the command-line options and announce the start of the transfer.
run_number, source_storage, destination_storage, file_force, file_delete = argument_parser()
logging.info("Transferring of missing raw files from '%s' to '%s' started", source_storage, destination_storage)

# Database credentials; the context manager closes the file handle, which the
# previous `json.load(open(...))` left open.
with open("file_catalogue.json") as config_file:
    config = json.load(config_file)

# Connect to the catalogue database.  Keyword arguments avoid hand-building a
# DSN string, which breaks when a value contains spaces or quotes.
try:
    conn = psycopg2.connect(dbname=config["db_name"], user=config["db_user"],
                            host=config["db_host"], password=config["db_pass"])
except Exception as e:
    logging.error("ERROR: Database connection failed: %s", e)
    # Nothing below can work without a connection (`conn` would be unbound).
    # NOTE(review): the original statement on this line was not visible in the
    # listing; terminating with exit status 1 is an assumption.
    raise SystemExit(1)

cursor = conn.cursor()
def _fetch_run_storage(db_cursor, run, storage_name):
    """Return ``(run_storage_id, storage_path, xrootd_url)`` for a run on a storage.

    Logs an error and terminates the script when the database has no row for
    the pair or the row carries no storage path.

    NOTE(review): the statements that followed the error message were not
    visible in the listing; terminating with exit status 1 is an assumption.
    """
    # Values are passed separately so the driver does the quoting.
    db_cursor.execute(
        "select rs.run_storage_id, rs.storage_path, s.xrootd_url "
        "from run_storage rs join storage_ s on rs.storage_name = s.storage_name "
        "where rs.run_number=%s and rs.storage_name=%s",
        (run, storage_name))
    row = db_cursor.fetchone()
    # fetchone() returns None when nothing matched; check before unpacking,
    # otherwise the tuple assignment itself raises TypeError.
    if row is None or row[1] is None:
        # Report the storage *name*: the destination branch used to print the
        # storage path, which is always None at this point.
        logging.error("ERROR: No storage information (%s) found in the database for Run%s", storage_name, run)
        raise SystemExit(1)
    return row


# Storage record of the run on the source side ...
src_run_storage_id, source_storage_path, source_storage_url = _fetch_run_storage(
    cursor, run_number, source_storage)

# ... and on the destination side.
dest_run_storage_id, destination_storage_path, destination_storage_url = _fetch_run_storage(
    cursor, run_number, destination_storage)
# The same query serves both storages; values are passed separately so the
# driver does the quoting instead of string interpolation.
files_query = (
    "select file_path, file_size, file_checksum "
    "from file_catalogue fc join run_storage rs on fc.run_storage_id = rs.run_storage_id "
    "where run_number=%s and storage_name=%s")

# Catalogue records of the run on the source storage.
cursor.execute(files_query, (run_number, source_storage))
source_files = cursor.fetchall()

# Catalogue records of the run on the destination storage.
cursor.execute(files_query, (run_number, destination_storage))
destination_files = cursor.fetchall()

# One slot per destination record; -1 means no source file has been matched to
# it.  (The earlier throw-away `destination_matches = []` is dropped.)
destination_matches = [-1] * len(destination_files)

# Text report that later code fills through print(..., file=file_list).
# NOTE(review): no close() is visible in this file - consider closing it once
# the report is complete.
file_list = open("transfer_raw_data.txt", "w")
# Main pass: every source catalogue record is compared with the destination
# records; missing or mismatching destination files are logged and written to
# file_list.
# NOTE(review): several lines of this loop (counters, branch conditions) are
# not visible in this listing; the numbers fused to the start of the lines look
# like listing artefacts, not Python code.
176for cur_src_file
in source_files:
178 src_file_path, src_file_size, src_file_checksum = cur_src_file
# Path of the file with the source storage path removed.
# NOTE(review): str.replace removes every occurrence of source_storage_path,
# not only a leading one - confirm that is intended.
179 src_file_end = src_file_path.replace(source_storage_path,
'')
180 logging.debug(
'Current check for source file: "%s"' % (src_file_path))
# A destination record matches when its path ends with that relative part; the
# slot in destination_matches then records the source index.
# NOTE(review): iter_dest and iter_src are not assigned anywhere in the visible
# lines - confirm where they are initialised and incremented.
184 for cur_dest_file
in destination_files:
185 dest_file_path, dest_file_size, dest_file_checksum = cur_dest_file
186 if dest_file_path.endswith(src_file_end):
187 destination_matches[iter_dest] = iter_src
# Missing destination file: build its path, log it, add it to the report and
# copy it with transfer_file().  The condition guarding this part is not visible.
192 dest_file_path = destination_storage_path + src_file_end
193 logging.info(
"The destination file is missing at '%s'" % (dest_file_path))
194 print(dest_file_path, file=file_list)
195 transfer_file(src_file_path, source_storage_url,
196 dest_file_path, destination_storage_url, dest_run_storage_id,
197 src_file_size, src_file_checksum)
# A destination record exists: compare the sizes first.
205 dest_file_path, dest_file_size, dest_file_checksum = cur_dest_file
207 if dest_file_size != src_file_size:
# No size stored for the destination record at all.
208 if (dest_file_size
is None):
209 logging.error(
"ERROR: the destination file (%s) have no information on the size.\nPlease, run 'match_raw_data.py' script at the destination storage" % (dest_file_path))
# Sizes differ.  Which of the next two messages is emitted depends on a
# condition that is not visible here (presumably file_force - TODO confirm).
# NOTE(review): the first message prints source then destination size, the
# second prints destination then source.
211 logging.error(
"ERROR: the source (%s) and destination (%s) files have different sizes: %d <> %d" % (src_file_path, dest_file_path, src_file_size, dest_file_size))
213 logging.info(
"The destination file at '%s' has an incorrect size and must be replaced (%d <> %d)" % (dest_file_path, dest_file_size, src_file_size))
214 print(dest_file_path, file=file_list)
# Same checks for the checksum.
219 if dest_file_checksum != src_file_checksum:
220 if (dest_file_checksum
is None):
221 logging.error(
"ERROR: the destination file (%s) have no information on the checksum.\nPlease, run 'match_raw_data.py' script at the destination storage" % (dest_file_path))
223 logging.error(
"ERROR: the source (%s) and destination (%s) files have different checksum: %s <> %s" % (src_file_path, dest_file_path, src_file_checksum, dest_file_checksum))
225 logging.info(
"The destination file at '%s' has an incorrect checksum and must be replaced (%s <> %s)" % (dest_file_path, dest_file_checksum, src_file_checksum))
226 print(dest_file_path, file=file_list)
# Size and checksum both agree.
232 logging.debug(
"The source and destination file are fully matched")
# Logged once the pass over the source files is complete.
234logging.info(
"Transferring of missing raw files from '%s' to '%s' finished" % (source_storage, destination_storage))
# Second part of the report: an empty line, then every destination record whose
# slot in destination_matches is still -1, i.e. no source file was matched to it.
238print(
"", file=file_list)
# NOTE(review): iter_dest is not assigned in the visible lines of this loop -
# confirm where it is initialised and incremented.
239for cur_dest_file
in destination_files:
240 dest_file_path, dest_file_size, dest_file_checksum = cur_dest_file
241 if destination_matches[iter_dest] == -1:
242 logging.warning(
"WARNING: The destination file at '%s' has no corresponding source file" % (dest_file_path))
243 print(dest_file_path, file=file_list)
# Only a message is logged here; no deletion call is visible in this listing
# (the condition for this branch is not visible either - presumably
# file_delete, TODO confirm).
247 logging.info(
"NOT IMPLEMENTED YET: The destination file with no corresponding source file has been deleted")
get_file_xrdsum(file_path, xrootd_url)
get_file_xrdsize(file_path, xrootd_url)
cp_file_xrd(source_file_path, destination_file_path, source_storage_url, destination_storage_url)