-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrecord_generator.py
104 lines (95 loc) · 3.44 KB
/
record_generator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
from xml.sax import make_parser
from xml.sax.handler import feature_namespaces
from pymarc import XmlHandler, MARCReader, JSONReader
from queue import Queue
from threading import Thread
import tarfile
import sys
#
# Creates a record generator from a marc file in
# binary MARC, MARC-XML, or MARC-JSON format
#
class PymarcRecordGenerator(XmlHandler):
    """Generate pymarc records from a binary MARC, MARC-XML, or MARC-JSON file.

    The input format is chosen from ``file_name``:

    * names ending in ``xml`` or ``gz`` are parsed as MARC-XML; a ``gz`` file
      is opened as a gzip-compressed tar archive and its last regular-file
      member is parsed;
    * names ending in ``json`` are read with pymarc's ``JSONReader``;
    * anything else is read as binary MARC with ``MARCReader``.

    MARC-XML is parsed by a SAX parser running on a daemon thread, with this
    object as its content handler.  Each completed record reaches
    ``process_record()`` (the ``XmlHandler`` hook overridden here) and is
    handed to the consuming generator through ``self.q``.

    Typical use::

        with PymarcRecordGenerator(path) as gen:
            for record in gen.records():
                ...

    Args:
        file_name: Path of the file to read.
        on_error: Optional callable that receives an ``Exception`` for each
            read/parse failure.  On the XML path iteration stops after the
            first error; on the binary/JSON path it continues.  When not set,
            errors are written to ``sys.stderr``.
        timeout: Seconds the consumer waits for the next item from the XML
            parser thread before ``queue.Empty`` is raised.
    """

    def __init__(self, file_name, on_error=None, timeout=1000):
        self.on_error = on_error
        self.call_back = None
        # Channel from the XML parser thread (producer) to records() (consumer).
        self.q = Queue()
        # Sentinel the parser thread enqueues once parsing has finished.
        self.job_done = object()
        self.timeout = timeout
        # Parser state normally initialised by XmlHandler.__init__, which is
        # deliberately not called.  NOTE(review): presumably because it
        # assigns ``self.records``, which would shadow the records() method --
        # confirm against the pymarc version in use.
        self._record = None
        self._field = None
        self._subfield_code = None
        self._text = []
        self._strict = False
        self.normalize_form = None
        self.file_name = file_name
        self.xml_source = False
        # Handles populated by open(); None until then so close() is safe.
        self.fh = None
        self.read_fh = None
        self._tar = None

    def __enter__(self):
        self.open()
        return self

    def open(self):
        """Open ``file_name`` and select the stream records() will read from.

        Sets ``fh`` (the raw file), ``read_fh`` (the stream to parse) and
        ``xml_source``.  If opening or scanning a ``gz`` archive fails, every
        handle opened here is closed before the exception propagates.  This
        matters because ``__exit__`` never runs when ``__enter__`` raises.

        Raises:
            OSError: if the file cannot be opened.
            tarfile.TarError: if a ``gz`` file is not a readable gzip'd tar.
        """
        self.fh = open(self.file_name, 'rb')
        self.read_fh = self.fh
        self._tar = None
        self.xml_source = self.file_name.endswith(('xml', 'gz'))
        if not self.file_name.endswith('gz'):
            return
        try:
            self._tar = tarfile.open(fileobj=self.fh, mode='r:gz')
            # extractfile() returns None for members that are not regular
            # files or links; the last readable member wins.
            for member in self._tar.getmembers():
                member_fh = self._tar.extractfile(member)
                if member_fh is not None:
                    self.read_fh = member_fh
        except BaseException:
            self.close()
            raise

    def __exit__(self, exception_type, exception_value, exception_traceback):
        self.close()

    def close(self):
        """Close the tar archive (if any) and the underlying file.

        Safe to call more than once, or before open() has been called.
        """
        if self._tar is not None:
            self._tar.close()
        if self.fh is not None:
            self.fh.close()

    def process_record(self, record):
        """Hand a parsed XML record to the consumer; runs on the parser thread.

        Enqueues the record followed by a handshake item, then blocks in
        ``q.join()`` until the consumer has acknowledged both.  The consumer
        acknowledges the handshake only after the caller resumes the
        generator, so parsing advances one record at a time.
        """
        # self.q is unbounded, so put() never blocks; the join() below is
        # what actually paces the parser.
        self.q.put(record)
        self.q.put('dummy')
        # NOTE: if the caller stops iterating early, this thread stays blocked
        # here; it is a daemon thread, so it does not prevent interpreter exit.
        self.q.join()

    def parse_handler(self):
        """Parse ``read_fh`` as MARC-XML; target of the producer thread.

        A failure is forwarded to the consumer as an exception item, followed
        by the handshake item the consumer acknowledges.  ``job_done`` is
        always enqueued last so the consumer loop terminates.
        """
        try:
            # Parser setup is inside the try as well: a setup failure must
            # still reach the consumer rather than leave it waiting for
            # ``timeout`` seconds.
            parser = make_parser()
            parser.setContentHandler(self)
            parser.setFeature(feature_namespaces, 1)
            parser.parse(self.read_fh)
        except Exception as e:
            self.q.put(e)
            self.q.put('dummy')
        finally:
            self.q.put(self.job_done)

    def records(self):
        """Yield every record in the opened file.

        Call open() (or enter the context manager) first.  Read errors are
        passed to ``on_error`` if set, otherwise written to ``sys.stderr``.

        Raises:
            queue.Empty: on the XML path, if the parser thread produces
                nothing for ``timeout`` seconds.
        """
        if self.xml_source:
            yield from self._xml_records()
        else:
            yield from self._marc_records()

    def _xml_records(self):
        """Yield records from the background MARC-XML parser thread."""
        Thread(target=self.parse_handler, daemon=True).start()
        while True:
            next_item = self.q.get(True, self.timeout)
            self.q.task_done()
            if next_item is self.job_done:
                break
            if isinstance(next_item, Exception):
                if self.on_error:
                    self.on_error(next_item)
                    break
                sys.stderr.write(f'{next_item}\n')
            else:
                yield next_item
            # Consume and acknowledge the handshake item that follows every
            # record or error; for a record this releases the parser thread
            # from q.join() in process_record().
            self.q.get()
            self.q.task_done()

    def _marc_records(self):
        """Yield records from a binary MARC or MARC-JSON file."""
        if self.file_name.endswith('json'):
            marc_reader = JSONReader(self.read_fh)
        else:
            marc_reader = MARCReader(self.read_fh, utf8_handling='ignore')
        for record in marc_reader:
            if record:
                yield record
            # NOTE(review): assumes the reader yields a falsy value for a
            # chunk it could not decode and stores the cause in
            # ``current_exception`` -- true of MARCReader; confirm for
            # JSONReader.
            elif self.on_error:
                self.on_error(marc_reader.current_exception)
            else:
                sys.stderr.write(
                    f'Chunk was ignored because the following exception raised: {marc_reader.current_exception}\n'
                )