BmnRoot
Loading...
Searching...
No Matches
catalog_raw_data.py
Go to the documentation of this file.
1# This python script is developed for matching raw file metadata
2# with the File Catalogue database for storages: DDC/DAQ, NCX or CICC/MICC
3import os
4import glob
5import json
6import logging
7import argparse
8import psycopg2
9import function_set
10
# Lower the root logger threshold to INFO so the progress messages emitted by
# this script are not filtered out by the default WARNING level.
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
12
def argument_parser(argv=None):
    """Parse the command-line options of the script.

    Args:
        argv: Optional list of argument strings to parse. When None
            (the default) argparse reads the arguments from ``sys.argv``.

    Returns:
        A ``(run_number, storage_name, force_delete)`` tuple: the run number
        as an int, the storage name as a str and the state of the
        ``--delete`` flag as a bool.

    Raises:
        SystemExit: raised by argparse when a required option is missing or
            a value is not among the allowed choices.
    """
    parser = argparse.ArgumentParser(
        # Adjacent literals instead of a backslash continuation inside the
        # string, which pulled the source indentation into the text.
        description='Script for matching raw file metadata with the File Catalogue '
                    'For more information run with --help option'
    )
    # The options are plain scalars: no nargs=1, so no one-element lists have
    # to be unwrapped afterwards. No default is given for --run because the
    # option is required and a default could never take effect.
    parser.add_argument(
        '--run', '-r',
        type=int,
        help='Run number',
        choices=[8],
        required=True
    )
    parser.add_argument(
        '--storage', '-s',
        type=str,
        choices=['ddc', 'ncx', 'cicc'],
        help='Name of the storage to write raw file metadata to the File Catalogue',
        required=True
    )
    parser.add_argument(
        '--delete', '-del',
        action='store_true',
        help='Remove file metadata in the File Catalogue if there is no corresponding file on the storage'
    )

    args = parser.parse_args(argv)
    return args.run, args.storage, args.delete
47
48
# Parse the command line; the rest of the script is driven by these values.
run_number, storage_name, force_delete = argument_parser()
logging.info("Matching raw file metadata with the File Catalogue started for '%s' storage", storage_name)

# load database configuration from JSON
# (the context manager guarantees that the file handle is closed)
with open("file_catalogue.json") as config_file:
    config = json.load(config_file)

# connect to the file database
# The connection parameters are passed as keyword arguments rather than being
# formatted into a DSN string, so values containing spaces or quotes (for
# example a password) reach the driver unchanged.
try:
    conn = psycopg2.connect(
        dbname=config["db_name"],
        user=config["db_user"],
        host=config["db_host"],
        password=config["db_pass"],
    )
except Exception as e:
    # Top-level boundary: report the failure and stop with exit status 1.
    logging.error("ERROR: Database connection failed: %s", e)
    raise SystemExit(1)
cursor = conn.cursor()
62
# get storage information
# Values are handed to the driver as query parameters instead of being
# interpolated into the SQL text.
cursor.execute(
    "select rs.run_storage_id, rs.storage_path, s.xrootd_url "
    "from run_storage rs join storage_ s on rs.storage_name = s.storage_name "
    "where rs.run_number=%s and rs.storage_name=%s",
    (run_number, storage_name),
)
conn.commit()
storage_row = cursor.fetchone()
# fetchone() returns None when the query matched no row; check that before
# unpacking so the missing-storage case ends in the error message below
# rather than in a TypeError.
if storage_row is None or storage_row[1] is None:
    logging.error("ERROR: No storage information (%s) found in the database for Run%s", storage_name, run_number)
    raise SystemExit(2)
run_storage_id, storage_path, xrootd_url = storage_row

# get a list of the raw files in the File Catalogue
# (the trailing constant 0 is selected as the fifth column of every record)
cursor.execute(
    "select fc.file_guid, fc.file_path, fc.file_size, fc.file_checksum, 0 "
    "from file_catalogue fc join run_storage rs on fc.run_storage_id = rs.run_storage_id "
    "where rs.run_storage_id=%s",
    (run_storage_id,),
)
conn.commit()
catalogue_files = cursor.fetchall()
# catalogue_matches[i] becomes 1 once catalogue_files[i] is found on the storage
catalogue_matches = [0] * len(catalogue_files)
81
def _storage_checksum(file_path, url):
    """Return the checksum of file_path using the helper that fits the storage.

    Calls function_set.get_file_xrdsum when an XRootD URL is configured for
    the storage, otherwise function_set.get_file_adler32c.
    """
    if url is None:
        return function_set.get_file_adler32c(file_path)
    return function_set.get_file_xrdsum(file_path, url)


# get full file list in the storage directory
all_files = glob.glob(storage_path + '/**/*', recursive=True)

# Index the catalogue records by file path once, so every storage file is
# looked up in constant time instead of scanning the whole catalogue list.
# setdefault keeps the first record for a duplicated path, which is the
# record a front-to-back scan of catalogue_files would find.
catalogue_index = {}
for position, record in enumerate(catalogue_files):
    catalogue_index.setdefault(record[1], position)

# cycle for all files in the input directory
for cur_file in all_files:
    logging.debug("Current file '%s'", cur_file)

    # searching for a file in the File Catalogue with the same path
    position = catalogue_index.get(cur_file)

    if position is not None:
        catalogue_matches[position] = 1
        logging.debug('The current file exists')
        catalogue_file_guid, _, catalogue_file_size, catalogue_file_checksum, _ = catalogue_files[position]

        # fill in the file size if the catalogue record does not have it yet
        if catalogue_file_size is None:
            try:
                file_size = os.path.getsize(cur_file)
            except OSError:
                logging.error("ERROR: File is not accessible to get its size: %s", cur_file)
            else:
                cursor.execute(
                    "update file_catalogue set file_size=%s where file_guid=%s",
                    (file_size, catalogue_file_guid),
                )
                conn.commit()
                logging.debug('The size of the file has been updated in the File Catalogue')

        # fill in the checksum if the catalogue record does not have it yet
        if catalogue_file_checksum is None:
            file_checksum = _storage_checksum(cur_file, xrootd_url)
            # an empty checksum is treated as "could not be computed" and is not stored
            if file_checksum:
                # Passed as a query parameter: the driver quotes the value.
                cursor.execute(
                    "update file_catalogue set file_checksum=%s where file_guid=%s",
                    (file_checksum, catalogue_file_guid),
                )
                conn.commit()
                logging.debug('The checksum of the file has been updated in the File Catalogue')
        continue

    # if no corresponding file in the File Catalogue
    logging.info("Adding file '%s' to the File Catalogue", cur_file)
    try:
        file_size = os.path.getsize(cur_file)
    except OSError:
        logging.error("ERROR: File is not accessible to get its size: %s", cur_file)
        continue
    file_checksum = _storage_checksum(cur_file, xrootd_url)
    if not file_checksum:
        # no checksum available: the file is skipped and not catalogued
        continue

    logging.debug('Inserting a new file record with file path = "%s", file size = %d, file_checksum = %s',
                  cur_file, file_size, file_checksum)
    # Query parameters keep file paths containing quotes from breaking the statement.
    cursor.execute(
        "insert into file_catalogue(run_storage_id, file_path, file_size, file_checksum) "
        "values (%s, %s, %s, %s)",
        (run_storage_id, cur_file, file_size, file_checksum),
    )
    conn.commit()
    logging.info("Information on the current file has been successfullly added\n")
148
# searching for records in the File Catalogue that have no pair on the file storage
# catalogue_matches runs parallel to catalogue_files: a 0 flag means that no
# file with the record's path was found on the storage.
for file_record, matched in zip(catalogue_files, catalogue_matches):
    if matched:
        continue
    catalogue_file_path = file_record[1]
    logging.warning("WARNING: The record in the File Catalogue ('%s') has no corresponding file", catalogue_file_path)
    if force_delete:
        # The path is passed as a query parameter so quotes inside a file
        # name cannot break or alter the DELETE statement.
        cursor.execute(
            "delete from file_catalogue where run_storage_id = %s and file_path = %s",
            (run_storage_id, catalogue_file_path),
        )
        conn.commit()
        logging.info("The record on the absent file in the File Catalogue has been deleted")

# release the database resources before reporting completion
cursor.close()
conn.close()
logging.info("Matching raw file metadata with the File Catalogue finished for '%s' storage", storage_name)
get_file_xrdsum(file_path, xrootd_url)
get_file_adler32c(file_path)