-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathextract_record_identifiers.py
295 lines (246 loc) · 12.5 KB
/
extract_record_identifiers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
import argparse
import libraries.handle_file
import libraries.record
import logging
import logging.config
import os
import xml.etree.ElementTree as ET
from csv import writer
from datetime import datetime
# Module-level logger named after this module; main() loads the logging
# configuration from 'logging.conf' before any message is emitted.
logger = logging.getLogger(__name__)
def init_argparse() -> argparse.ArgumentParser:
    """Build the command-line parser for this script.

    The parser accepts:
    - a -v/--version flag,
    - a required positional argument: the directory containing XML files,
    - an optional positional argument: the CSV file listing the MMS IDs of
      Alma records that have a current OCLC number.

    Returns:
        The configured argparse.ArgumentParser.
    """
    usage_text = (
        '%(prog)s [option] directory_with_xml_files '
        '[alma_records_with_current_oclc_num]'
    )
    description_text = (
        'For each XML file in the directory, extract the MMS ID '
        'and OCLC Number(s) from each Alma record and append them to '
        'the appropriate outputs/extract_record_identifiers/'
        'master_list_records CSV file.'
    )
    directory_help = (
        'the path to the directory containing the XML files to process'
    )
    current_oclc_num_help = (
        'the name and path of the CSV file containing the MMS IDs of '
        'all Alma records with a current OCLC number (e.g. inputs/extract_'
        'record_identifiers/alma_records_with_current_oclc_num.csv)'
    )

    cli = argparse.ArgumentParser(
        usage=usage_text,
        description=description_text,
    )

    # The version string is built from the program name argparse resolved.
    version_text = f'{cli.prog} version 1.0.0'
    cli.add_argument('-v', '--version', action='version',
                     version=version_text)

    # Required: where the XML files live.
    cli.add_argument('directory_with_xml_files', type=str,
                     help=directory_help)

    # Optional (nargs='?'): parses to None when omitted.
    cli.add_argument('alma_records_with_current_oclc_num', nargs='?',
                     type=str, help=current_oclc_num_help)

    return cli
def _append_csv_row(csv_file, csv_writer, header_row, data_row) -> None:
    """Append data_row to a CSV file, writing header_row first if it is empty.

    Args:
        csv_file: The open file object that csv_writer writes to. Its current
            position (tell()) is used to detect a still-empty file.
        csv_writer: The csv writer wrapping csv_file.
        header_row: The column headings to write when the file is empty.
        data_row: The values to append as a new row.
    """
    if csv_file.tell() == 0:
        csv_writer.writerow(header_row)
    csv_writer.writerow(data_row)


def main() -> None:
    """Extracts the MMS ID and OCLC Number(s) from each record of each XML file.

    For each XML file in the specified directory, the MMS ID and OCLC Number(s)
    from each Alma record are extracted and appended to the appropriate
    outputs/extract_record_identifiers/master_list_records CSV file:
    - If an error is encountered (including a record with no OCLC number or
      with more than one unique OCLC number), then the record is added to:
      outputs/extract_record_identifiers/master_list_records_with_errors.csv
    - If the record's MMS ID appears in the optional
      alma_records_with_current_oclc_num input file, then the record is added
      to: outputs/extract_record_identifiers/
      master_list_records_with_current_oclc_num.csv
    - Otherwise, the record is added to: outputs/extract_record_identifiers/
      master_list_records_with_potentially_old_oclc_num.csv
    - If any of the above output files already exists in the directory, then it
      is appended to (not overwritten).

    Additional behavior:
    - XML files are processed in sorted filename order, so that the result of
      de-duplicating MMS IDs across files is reproducible.
    - An MMS ID is processed only once per run; later occurrences are skipped.
    - A record with no 001 control field (or an empty one) has no MMS ID; it
      is logged as an error and skipped instead of aborting the whole run.
    - When a record has multiple unique OCLC numbers, they are written in
      sorted order so that the output is the same on every run.
    """

    start_time = datetime.now()

    # Initialize parser and parse command-line args
    parser = init_argparse()
    args = parser.parse_args()

    # Configure logging (one timestamped log file per run)
    log_filename = (f'logs/extract_record_identifiers_'
        f'{start_time.strftime("%Y-%m-%d_%H-%M-%S")}.log')
    logging.config.fileConfig(
        'logging.conf',
        defaults={'log_filename': log_filename},
        disable_existing_loggers=False)

    # Drop trailing slashes because the file paths below are built with '/'
    directory_with_xml_files = args.directory_with_xml_files.rstrip('/')

    command_line_args_str = ('command-line args:\n'
        f'directory_with_xml_files = {directory_with_xml_files}\n'
        'alma_records_with_current_oclc_num = '
        f'{args.alma_records_with_current_oclc_num}')

    logger.info(f'Started {parser.prog} script with {command_line_args_str}\n')

    # Create sets
    mms_ids_already_processed = set()

    # Populate alma_records_with_current_oclc_num set from input file.
    # If there is no input file, then this set will remain empty.
    alma_records_with_current_oclc_num = set()
    libraries.handle_file.csv_column_to_set(
        args.alma_records_with_current_oclc_num,
        alma_records_with_current_oclc_num,
        0,
        True)
    logger.debug(f'{len(alma_records_with_current_oclc_num)=}\n')

    with open('outputs/extract_record_identifiers/master_list_records_with_'
            'current_oclc_num.csv', mode='a',
            newline='') as records_with_current_oclc_num, \
        open('outputs/extract_record_identifiers/master_list_records_with_'
            'potentially_old_oclc_num.csv', mode='a',
            newline='') as records_with_potentially_old_oclc_num, \
        open('outputs/extract_record_identifiers/master_list_records_with_'
            'errors.csv', mode='a',
            newline='') as records_with_errors:

        records_with_current_oclc_num_writer = \
            writer(records_with_current_oclc_num)
        records_with_potentially_old_oclc_num_writer = \
            writer(records_with_potentially_old_oclc_num)
        records_with_errors_writer = writer(records_with_errors)

        all_oclc_nums_col_heading = (f"All OCLC Numbers from Alma Record's "
            f"035 $a [for this column and the previous one, "
            f"{libraries.record.subfield_a_disclaimer}]")

        # Header rows, one per output spreadsheet
        errors_header = [
            'MMS ID',
            "Unique OCLC Number(s) from Alma Record's 035 $a",
            all_oclc_nums_col_heading,
            'Error'
        ]
        current_oclc_num_header = [
            'MMS ID',
            ("Current OCLC Number (Unique OCLC Number from "
                "Alma Record's 035 $a)"),
            all_oclc_nums_col_heading,
            'Warning'
        ]
        potentially_old_oclc_num_header = [
            'MMS ID',
            "Unique OCLC Number from Alma Record's 035 $a",
            all_oclc_nums_col_heading,
            'Warning'
        ]

        # Check every XML file in directory. os.listdir returns entries in
        # arbitrary order, so sort them: which duplicate of an MMS ID gets
        # processed depends on the order in which files are read.
        logger.debug(f'Started checking directory: {directory_with_xml_files}'
            f'\n')

        for xml_filename in sorted(os.listdir(directory_with_xml_files)):
            if not xml_filename.endswith('.xml'):
                logger.warning(f'Not an XML file: {xml_filename}\n')
                continue

            logger.debug(f'Started processing file: {xml_filename}\n')

            # Get root element of XML file
            root = ET.parse(
                f'{directory_with_xml_files}/{xml_filename}').getroot()

            # Iterate over each record element
            for record_element in root.findall('record'):
                # Extract MMS ID from 001 field. find() returns None when the
                # record has no 001 field, so guard before reading .text.
                mms_id_element = record_element.find(
                    './controlfield[@tag="001"]')
                if mms_id_element is None or not mms_id_element.text:
                    logger.error(f'Skipping record with no MMS ID (001 field) '
                        f'in file: {xml_filename}\n')
                    continue
                mms_id = mms_id_element.text

                # Check if MMS ID is a member of mms_ids_already_processed set
                if mms_id in mms_ids_already_processed:
                    logger.debug(f'{mms_id} has already been processed\n')
                    continue

                logger.debug(f'Started processing MMS ID {mms_id}')

                # Add MMS ID to mms_ids_already_processed set
                mms_ids_already_processed.add(mms_id)

                # Iterate over each 035 $a field and add OCLC numbers to list
                # and set
                all_oclc_nums_from_record = []
                unique_oclc_nums_from_record = set()
                found_error_in_record = False
                error_msg = None

                for field_035_element_index, field_035_element in enumerate(
                        record_element.findall('./datafield[@tag="035"]')):
                    # Extract subfield a (which would contain the OCLC number
                    # if present)
                    subfield_a_data = \
                        libraries.record.get_subfield_a_with_oclc_num(
                            field_035_element,
                            field_035_element_index)

                    # Add or append to error message
                    if subfield_a_data.error_msg is not None:
                        if error_msg is None:
                            error_msg = subfield_a_data.error_msg
                        else:
                            error_msg += '. ' + subfield_a_data.error_msg

                    if subfield_a_data.string_with_oclc_num is None:
                        # This 035 field either has no subfield $a or its first
                        # subfield $a does not contain an OCLC number. So skip
                        # it.
                        continue

                    # The two validity flags are returned by the helper but
                    # are not needed here; the error state is carried by
                    # found_error_in_record, which is passed in and returned.
                    (subfield_a_without_oclc_org_code_prefix,
                            extracted_oclc_num,
                            _found_valid_oclc_prefix,
                            _found_valid_oclc_num,
                            found_error_in_record) = \
                        libraries.record.extract_oclc_num_from_subfield_a(
                            subfield_a_data.string_with_oclc_num,
                            field_035_element_index,
                            found_error_in_record)

                    all_oclc_nums_from_record.append(
                        subfield_a_without_oclc_org_code_prefix)
                    unique_oclc_nums_from_record.add(extracted_oclc_num)

                unique_oclc_nums_from_record_len = \
                    len(unique_oclc_nums_from_record)
                unique_oclc_nums_from_record_str = None
                all_oclc_nums_from_record_str = (
                    ', '.join(all_oclc_nums_from_record)
                    if all_oclc_nums_from_record
                    else '<none>')

                logger.debug(f'{unique_oclc_nums_from_record=}')
                logger.debug(f'{all_oclc_nums_from_record=}')

                if unique_oclc_nums_from_record_len == 0:
                    unique_oclc_nums_from_record_str = '<none>'
                    logger.debug(f'{mms_id} has no OCLC number in an 035 '
                        f'$a field')
                    found_error_in_record = True
                elif unique_oclc_nums_from_record_len == 1:
                    unique_oclc_nums_from_record_str = \
                        next(iter(unique_oclc_nums_from_record))
                    if found_error_in_record:
                        logger.debug(f'{mms_id} has at least one invalid '
                            f'OCLC number: '
                            f'{unique_oclc_nums_from_record_str}')
                else:
                    # unique_oclc_nums_from_record_len > 1
                    # Sort so the cell content does not depend on the set's
                    # (run-to-run varying) iteration order.
                    unique_oclc_nums_from_record_str = \
                        ', '.join(sorted(unique_oclc_nums_from_record))
                    logger.debug(f'{mms_id} has multiple OCLC numbers: '
                        f'{unique_oclc_nums_from_record_str}')
                    found_error_in_record = True

                record_row = [
                    mms_id,
                    unique_oclc_nums_from_record_str,
                    all_oclc_nums_from_record_str,
                    error_msg
                ]

                if found_error_in_record:
                    # Add record to records_with_errors spreadsheet
                    _append_csv_row(
                        records_with_errors,
                        records_with_errors_writer,
                        errors_header,
                        record_row)
                elif mms_id in alma_records_with_current_oclc_num:
                    logger.debug(f'{mms_id} has current OCLC number')

                    # Add record to records_with_current_oclc_num spreadsheet
                    _append_csv_row(
                        records_with_current_oclc_num,
                        records_with_current_oclc_num_writer,
                        current_oclc_num_header,
                        record_row)
                else:
                    logger.debug(f'{mms_id} has a potentially-old OCLC number')

                    # Add record to records_with_potentially_old_oclc_num
                    # spreadsheet
                    _append_csv_row(
                        records_with_potentially_old_oclc_num,
                        records_with_potentially_old_oclc_num_writer,
                        potentially_old_oclc_num_header,
                        record_row)

                logger.debug(f'Finished processing MMS ID {mms_id}\n')

            logger.debug(f'Finished processing file: {xml_filename}\n')

        logger.debug(f'Finished checking directory: {directory_with_xml_files}'
            f'\n')

    logger.debug(f'{len(mms_ids_already_processed)=}\n')

    logger.info(f'Finished {parser.prog} script with {command_line_args_str}\n')
    logger.info(f'Script completed in: {datetime.now() - start_time} '
        f'(hours:minutes:seconds.microseconds)')
# Run the extraction only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()