"""Implementation of parse.py that supports multiprocess
Main differences are 1) using Pool.starmap in process_largefile and 2) attach to local CoreNLP server in process_largefile.process_document
"""
import datetime
import itertools
import os
from multiprocessing import Pool
from pathlib import Path

from stanfordnlp.server import CoreNLPClient

import global_options
from culture import file_util, preprocess_parallel
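

# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original pipeline): process_largefile
# expects `function_name` to take one (document, document_ID) pair and return
# a (processed_text, processed_IDs) pair. The real implementation is
# culture/preprocess_parallel.process_document; the function below is only a
# simplified assumption of its behavior. It attaches to the already-running
# CoreNLP server started in the __main__ block (start_server=False, same
# endpoint) and keeps just the tokenized text, whereas the real function
# applies the full annotator pipeline configured below.
def _example_process_document(document, document_id):
    """Minimal example of a function_name-compatible callable (illustration only)."""
    with CoreNLPClient(
        endpoint="http://localhost:9002", start_server=False
    ) as client:
        annotation = client.annotate(document)
    # Rebuild each sentence from its tokens.
    sentences = [
        " ".join(token.word for token in sentence.token)
        for sentence in annotation.sentence
    ]
    # ID scheme is illustrative; the real pipeline defines its own sentence IDs.
    sentence_ids = [f"{document_id}_{i}" for i in range(len(sentences))]
    return "\n".join(sentences), "\n".join(sentence_ids)
# ---------------------------------------------------------------------------

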
def process_largefile(
    input_file,
    output_file,
    input_file_ids,
    output_index_file,
    function_name,
    chunk_size=100,
    start_index=None,
):
""" A helper function that transforms an input file + a list of IDs of each line (documents + document_IDs) to two output files (processed documents + processed document IDs) by calling function_name on chunks of the input files. Each document can be decomposed into multiple processed documents (e.g. sentences).
Supports parallel with Pool.
Arguments:
input_file {str or Path} -- path to a text file, each line is a document
ouput_file {str or Path} -- processed linesentence file (remove if exists)
input_file_ids {str]} -- a list of input line ids
output_index_file {str or Path} -- path to the index file of the output
function_name {callable} -- A function that processes a list of strings, list of ids and return a list of processed strings and ids.
chunk_size {int} -- number of lines to process each time, increasing the default may increase performance
start_index {int} -- line number to start from (index starts with 0)
Writes:
Write the ouput_file and output_index_file
"""
    try:
        if start_index is None:
            # if starting from the first line, remove existing output files;
            # otherwise, append to the existing output files
            os.remove(str(output_file))
            os.remove(str(output_index_file))
    except OSError:
        pass
    assert file_util.line_counter(input_file) == len(
        input_file_ids
    ), "Make sure the input file has the same number of rows as the input ID file. "
    with open(input_file, newline="\n", encoding="utf-8", errors="ignore") as f_in:
        line_i = 0
        # jump to index
        if start_index is not None:
            # start at start_index line
            for _ in range(start_index):
                next(f_in)
            input_file_ids = input_file_ids[start_index:]
            line_i = start_index
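        # Read chunk_size lines (and their IDs) at a time; zip_longest over a
        # repeated iterator is the standard "grouper" idiom and pads the last,
        # shorter chunk with None, which is filtered out below.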
        for next_n_lines, next_n_line_ids in zip(
            itertools.zip_longest(*[f_in] * chunk_size),
            itertools.zip_longest(*[iter(input_file_ids)] * chunk_size),
        ):
            line_i += chunk_size
            print(datetime.datetime.now())
            print(f"Processing line: {line_i}.")
            next_n_lines = list(filter(None.__ne__, next_n_lines))
            next_n_line_ids = list(filter(None.__ne__, next_n_line_ids))
            output_lines = []
            output_line_ids = []
            with Pool(global_options.N_CORES) as pool:
                for output_line, output_line_id in pool.starmap(
                    function_name, zip(next_n_lines, next_n_line_ids)
                ):
                    output_lines.append(output_line)
                    output_line_ids.append(output_line_id)
            output_lines = "\n".join(output_lines) + "\n"
            output_line_ids = "\n".join(output_line_ids) + "\n"
            with open(output_file, "a", newline="\n") as f_out:
                f_out.write(output_lines)
            if output_index_file is not None:
                with open(output_index_file, "a", newline="\n") as f_out:
                    f_out.write(output_line_ids)


if __name__ == "__main__":
    with CoreNLPClient(
        properties={
            "ner.applyFineGrained": "false",
            "annotators": "tokenize, ssplit, pos, lemma, ner, depparse",
        },
        memory=global_options.RAM_CORENLP,
        threads=global_options.N_CORES,
        timeout=12000000,
        endpoint="http://localhost:9002",  # change port here and in preprocess_parallel.py if 9002 is occupied
        max_char_length=1000000,
    ) as client:
        in_file = Path(global_options.DATA_FOLDER, "input", "documents.txt")
        in_file_index = file_util.file_to_list(
            Path(global_options.DATA_FOLDER, "input", "document_ids.txt")
        )
        out_file = Path(
            global_options.DATA_FOLDER, "processed", "parsed", "documents.txt"
        )
        output_index_file = Path(
            global_options.DATA_FOLDER, "processed", "parsed", "document_sent_ids.txt"
        )
        process_largefile(
            input_file=in_file,
            output_file=out_file,
            input_file_ids=in_file_index,
            output_index_file=output_index_file,
            function_name=preprocess_parallel.process_document,
            chunk_size=global_options.PARSE_CHUNK_SIZE,
        )
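        # Usage note: process_largefile also accepts start_index (not passed
        # above). Supplying the number of already-processed lines makes the
        # function skip those input lines and append to the existing output
        # files instead of deleting them, so an interrupted run can resume.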