BmnRoot
Loading...
Searching...
No Matches
transfer_raw_data.py
Go to the documentation of this file.
1# This python script is developed for matching raw data files
2# between storages: DDC/DAQ -> NCX -> CICC/MICC
3import sys
4import json
5import logging
6import argparse
7import psycopg2
8#from concurrent.futures import ThreadPoolExecutor
9import function_set
10
11logging.getLogger().setLevel(logging.INFO)
12
13# input argument parser
def argument_parser(argv=None):
    """Parse and validate the command-line options of the transfer script.

    Args:
        argv: Optional list of argument strings to parse. When None (the
            default) argparse reads the process command line, so existing
            calls without arguments keep working.

    Returns:
        A tuple (run_number, source_storage, destination_storage,
        file_force, file_delete): the run number as int, the two storage
        names as str and the two flags as bool.

    The process exits with code 7 when the source and destination storages
    are the same; argparse itself exits on missing or invalid options.
    """
    parser = argparse.ArgumentParser(
        description='Script for transferring missing raw files to a destination storage. '
                    'For more information run with --help option'
    )
    # '--run' is required, so no default is declared: it could never be used
    parser.add_argument(
        '--run', '-r',
        type=int,
        help='Run number',
        choices=[8],
        required=True
    )
    parser.add_argument(
        '--source', '-s',
        type=str,
        choices=function_set.storage_dictionary,
        help='Source storage to match and transfer missing raw files',
        required=True
    )
    parser.add_argument(
        '--destination', '-d',
        type=str,
        choices=function_set.storage_dictionary,
        help='Destination storage to obtain missing raw files',
        required=True
    )
    parser.add_argument(
        '--force', '-f',
        action='store_true',
        help='Rewrite existing destination file if exists but has another size or checksum'
    )
    parser.add_argument(
        '--delete', '-del',
        action='store_true',
        help='Remove destination file if there is no corresponding source file'
    )

    args = parser.parse_args(argv)

    # matching a storage against itself is meaningless
    if args.source == args.destination:
        logging.error("ERROR: source (%s) and destination (%s) storages must be different",
                      args.source, args.destination)
        sys.exit(7)

    return args.run, args.source, args.destination, args.force, args.delete
64
65# copy source file to the destination if it is absent on the destination storage
def transfer_file(source_file_path, source_storage_url,
                  destination_file_path, destination_storage_url, destination_run_storage_id,
                  source_file_size, source_file_checksum):
    """Copy one raw file to the destination storage and register it in the File Catalogue.

    Args:
        source_file_path: Path of the file on the source storage.
        source_storage_url: XRootD URL of the source storage.
        destination_file_path: Path the file must get on the destination storage.
        destination_storage_url: XRootD URL of the destination storage, or None
            when the destination has no XRootD access (then the copy is not verified).
        destination_run_storage_id: 'run_storage_id' value for the new catalogue record.
        source_file_size: Size of the source file as stored in the File Catalogue.
        source_file_checksum: Checksum of the source file as stored in the File Catalogue.

    Returns:
        True when the file has been copied and its record inserted into the
        'file_catalogue' table; False when the copy, the size/checksum
        verification, the database connection or the insert failed.

    Reads the database settings from the module-level 'config' dictionary.
    """
    logging.info('Copying source file "%s" to the destination path "%s"', source_file_path, destination_file_path)
    # Arguments are passed by keyword: the helper's documented parameter order is
    # (source_file_path, destination_file_path, source_storage_url, destination_storage_url),
    # which differs from the positional order used here before.
    # NOTE(review): confirm the parameter names against function_set.cp_file_xrd
    copied = function_set.cp_file_xrd(source_file_path=source_file_path,
                                      destination_file_path=destination_file_path,
                                      source_storage_url=source_storage_url,
                                      destination_storage_url=destination_storage_url)
    # equality test kept on purpose: the helper's return contract is not visible here
    if copied == False:
        return False
    logging.info('The source file has been copied')

    if destination_storage_url is None:
        # if no XRootD access then do not check final size and checksum
        logging.debug('Inserting a destination file record with file path = "%s", no file size, no file_checksum', destination_file_path)
        insert_query = "insert into file_catalogue(run_storage_id, file_path) values (%s, %s)"
        insert_values = (destination_run_storage_id, destination_file_path)
        success_message = "Adding information on the destination file has been successfull, but without the size and checksum"
    else:
        # check the size and checksum of the destination file via XRootD
        destination_file_size = function_set.get_file_xrdsize(destination_file_path, destination_storage_url)
        if destination_file_size != str(source_file_size):
            # '%s' for the source size: the catalogue value may be None
            logging.error("ERROR: Transferring file has been failed: destination (%s) and source (%s) file sizes are different",
                          destination_file_size, source_file_size)
            return False
        destination_file_checksum = function_set.get_file_xrdsum(destination_file_path, destination_storage_url)
        if destination_file_checksum != source_file_checksum:
            logging.error("ERROR: Transferring file has been failed: destination (%s) and source (%s) file checksums are different",
                          destination_file_checksum, source_file_checksum)
            return False

        logging.debug('Inserting a destination file record with file path = "%s", file size = %s, file_checksum = %s',
                      destination_file_path, source_file_size, destination_file_checksum)
        insert_query = ("insert into file_catalogue(run_storage_id, file_path, file_size, file_checksum) "
                        "values (%s, %s, %s, %s)")
        insert_values = (destination_run_storage_id, destination_file_path, source_file_size, destination_file_checksum)
        success_message = "Adding information on the destination file has been successfull"

    # connect to the database only once the record is known to be needed
    try:
        conn = psycopg2.connect(dbname=config["db_name"], user=config["db_user"],
                                host=config["db_host"], password=config["db_pass"])
    except psycopg2.Error as e:
        logging.error("Database Connection Error: %s", e)
        return False

    try:
        cursor = conn.cursor()
        # values are bound by the driver instead of being formatted into the SQL text
        cursor.execute(insert_query, insert_values)
        conn.commit()
        cursor.close()
    except psycopg2.Error as e:
        logging.error("ERROR: Adding information on the destination file has failed: %s", e)
        return False
    finally:
        # closing also discards a transaction that was not committed
        conn.close()

    logging.info(success_message)
    return True
118
119
run_number, source_storage, destination_storage, file_force, file_delete = argument_parser()
logging.info("Transferring of missing raw files from '%s' to '%s' started", source_storage, destination_storage)

# load File Catalogue database configuration from JSON
with open("file_catalogue.json") as config_file:
    config = json.load(config_file)

# connect to the File Catalogue database
try:
    conn = psycopg2.connect(dbname=config["db_name"], user=config["db_user"],
                            host=config["db_host"], password=config["db_pass"])
except Exception as e:
    logging.error("ERROR: Database connection failed: %s", e)
    sys.exit(1)
cursor = conn.cursor()

# both queries are run twice (source and destination); values are bound by the driver
run_storage_query = ("select rs.run_storage_id, rs.storage_path, s.xrootd_url "
                     "from run_storage rs join storage_ s on rs.storage_name = s.storage_name "
                     "where rs.run_number=%s and rs.storage_name=%s")
file_list_query = ("select file_path, file_size, file_checksum "
                   "from file_catalogue fc join run_storage rs on fc.run_storage_id = rs.run_storage_id "
                   "where run_number=%s and storage_name=%s")

try:
    # get source storage information from the File Catalogue
    cursor.execute(run_storage_query, (run_number, source_storage))
    source_storage_row = cursor.fetchone()
    # fetchone() gives None when the query matched no row
    if source_storage_row is None or source_storage_row[1] is None:
        logging.error("ERROR: No storage information (%s) found in the database for Run%s", source_storage, run_number)
        sys.exit(2)
    src_run_storage_id, source_storage_path, source_storage_url = source_storage_row

    # get destination storage information from the File Catalogue
    cursor.execute(run_storage_query, (run_number, destination_storage))
    destination_storage_row = cursor.fetchone()
    if destination_storage_row is None or destination_storage_row[1] is None:
        # report the storage name: the storage path is exactly what is missing here
        logging.error("ERROR: No storage information (%s) found in the database for Run%s", destination_storage, run_number)
        sys.exit(3)
    dest_run_storage_id, destination_storage_path, destination_storage_url = destination_storage_row

    # get a list of the raw files on the source storage from the File Catalogue
    cursor.execute(file_list_query, (run_number, source_storage))
    source_files = cursor.fetchall()

    # get a list of the raw files on the destination storage from the File Catalogue
    cursor.execute(file_list_query, (run_number, destination_storage))
    destination_files = cursor.fetchall()
finally:
    # also runs on the sys.exit() calls above, so the connection is never left open
    cursor.close()
    conn.close()

# destination_matches[i] holds the index of the source file matched to
# destination_files[i], or -1 while no source file has been matched
destination_matches = [-1] * len(destination_files)
170
# create text file to write missing files as a list
# ('with' closes the file even if a transfer raises)
with open("transfer_raw_data.txt", "w") as file_list:
    # main loop to match source and destination raw files
    for iter_src, cur_src_file in enumerate(source_files):
        src_file_path, src_file_size, src_file_checksum = cur_src_file
        # part of the path that must be the same on both storages
        src_file_end = src_file_path.replace(source_storage_path, '')
        logging.debug('Current check for source file: "%s"', src_file_path)

        # searching for a File Catalogue record at the destination storage with the same end of the file path
        for iter_dest, cur_dest_file in enumerate(destination_files):
            if cur_dest_file[0].endswith(src_file_end):
                destination_matches[iter_dest] = iter_src
                break
        else:
            # if no corresponding file on the destination storage then transfer
            dest_file_path = destination_storage_path + src_file_end
            logging.info("The destination file is missing at '%s'", dest_file_path)
            print(dest_file_path, file=file_list)
            transferred = transfer_file(src_file_path, source_storage_url,
                                        dest_file_path, destination_storage_url, dest_run_storage_id,
                                        src_file_size, src_file_checksum)
            if not transferred:
                logging.error("ERROR: the source file (%s) has not been transferred to '%s'", src_file_path, dest_file_path)
            continue

        # cur_dest_file is the record matched by the search loop above
        dest_file_path, dest_file_size, dest_file_checksum = cur_dest_file

        # if there is a corresponding file found then check size and checksum
        if dest_file_size != src_file_size:
            if dest_file_size is None:
                logging.error("ERROR: the destination file (%s) have no information on the size.\nPlease, run 'match_raw_data.py' script at the destination storage", dest_file_path)
                continue
            # '%s' placeholders: the source size read from the catalogue may be None
            logging.error("ERROR: the source (%s) and destination (%s) files have different sizes: %s <> %s",
                          src_file_path, dest_file_path, src_file_size, dest_file_size)
            if file_force:
                logging.info("The destination file at '%s' has an incorrect size and must be replaced (%s <> %s)",
                             dest_file_path, dest_file_size, src_file_size)
                # TODO: the file is only listed for now, it is not re-transferred yet
                print(dest_file_path, file=file_list)
            continue

        if dest_file_checksum != src_file_checksum:
            if dest_file_checksum is None:
                logging.error("ERROR: the destination file (%s) have no information on the checksum.\nPlease, run 'match_raw_data.py' script at the destination storage", dest_file_path)
                continue
            logging.error("ERROR: the source (%s) and destination (%s) files have different checksum: %s <> %s",
                          src_file_path, dest_file_path, src_file_checksum, dest_file_checksum)
            if file_force:
                logging.info("The destination file at '%s' has an incorrect checksum and must be replaced (%s <> %s)",
                             dest_file_path, dest_file_checksum, src_file_checksum)
                # TODO: the file is only listed for now, it is not re-transferred yet
                print(dest_file_path, file=file_list)
            continue

        logging.debug("The source and destination file are fully matched")

    logging.info("Transferring of missing raw files from '%s' to '%s' finished", source_storage, destination_storage)

    # searching for destination files that have no source pair
    # (an empty line separates them from the files listed above)
    print("", file=file_list)
    for cur_dest_file, matched_source in zip(destination_files, destination_matches):
        if matched_source != -1:
            continue
        dest_file_path = cur_dest_file[0]
        logging.warning("WARNING: The destination file at '%s' has no corresponding source file", dest_file_path)
        print(dest_file_path, file=file_list)
        if file_delete:
            # TODO: removing the record from 'file_catalogue' is not implemented yet
            logging.info("NOT IMPLEMENTED YET: The destination file with no corresponding source file has been deleted")
get_file_xrdsum(file_path, xrootd_url)
get_file_xrdsize(file_path, xrootd_url)
cp_file_xrd(source_file_path, destination_file_path, source_storage_url, destination_storage_url)