-
Notifications
You must be signed in to change notification settings - Fork 5
/
corpus.py
336 lines (302 loc) · 11.3 KB
/
corpus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
# -*- coding: utf-8 -*-
# @创建时间 : 15/3/2019
# @作者 : worry1613([email protected])
# GitHub : https://github.com/worry1613
# @CSDN : http://blog.csdn.net/worryabout/
import copy
import random
from util import q_to_b
from optparse import OptionParser
import logging
logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)
class Corpus:
    """Pre-process a POS-tagged ('word/pos') corpus and emit character-level
    NER training data.

    POS tags are mapped to entity labels via ``_maps``; the ``tag_*`` methods
    render those labels in the BIO or BMEWO scheme, with or without a POS
    column, one character per output line and an empty line between sentences.
    """

    # POS tag -> named-entity label; any other POS becomes 'O'
    _maps = {u't': u'TIME',  # time expression
             u'nr': u'PER',  # person name
             u'nt': u'ORG',  # organisation
             u'ns': u'LOC'}  # location

    def __init__(self):
        # each entry is one sentence: a list of 'word/pos' tokens
        self.lines = []

    def pre_process(self, fin):
        """Load the raw corpus file *fin* and normalise each sentence into
        self.lines (time tokens, split person names and bracketed
        organisation names are merged into single tokens)."""
        lines = self.load_corpus(fin)
        self.lines = []
        for line in lines:
            # full-width -> half-width, then split on single spaces
            words = [word for word in q_to_b(line.strip()).split(' ') if word]
            if not words:
                continue
            new_words = self.process_time(words)        # merge time tokens
            new_words = self.process_person(new_words)  # merge surname + given name
            new_words = self.process_org(new_words)     # merge [xx xx]nt organisations
            self.lines.append(new_words)

    def save_corpus(self, file_path, data=None):
        """Write token lines to *file_path*, one sentence per line.

        :param data: iterable of token lists to write; defaults to
                     ``self.lines`` when None.  (BUG FIX: previously a
                     non-None *data* was silently ignored and an empty
                     file was written.)
        """
        source = self.lines if data is None else data
        with open(file_path, 'w') as f:
            f.write('\n'.join(' '.join(line) for line in source))

    def load_corpus(self, file_path):
        """Read the raw corpus file and return its lines."""
        with open(file_path, 'r') as f:
            return f.readlines()

    def load_corpus_processed(self, file_path):
        """Read an already pre-processed corpus into self.lines."""
        with open(file_path, 'r') as f:
            self.lines = [line.strip().split(' ') for line in f.readlines()]

    def process_time(self, words):
        """Merge consecutive '/t' (time) tokens into a single token,
        stripping the intermediate '/t' suffixes."""
        new_words = []
        temp = ''
        for word in words:
            if '/t' in word:
                # drop the accumulated token's trailing '/t' (2 chars)
                # before gluing on the next time fragment
                temp = temp[:-2] + word
            elif temp:
                new_words.append(temp)
                temp = ''
                new_words.append(word)
            else:
                new_words.append(word)
        if temp:  # sentence ended on a time token
            new_words.append(temp)
        return new_words

    def process_person(self, words):
        """Merge a surname/given-name pair — two adjacent '/nr' tokens —
        into one '/nr' token."""
        new_words = []
        total = len(words)
        index = 0
        while index < total:
            word = words[index]
            if '/nr' in word and index + 1 < total and '/nr' in words[index + 1]:
                # strip the first token's '/nr' (3 chars) and glue on the
                # second token, which keeps its own tag
                new_words.append(word[:-3] + words[index + 1])
                index += 1  # skip the consumed second token
            else:
                new_words.append(word)
            index += 1
        return new_words

    def process_org(self, words):
        """Merge a bracketed organisation '[w1/p1 w2/p2]nt' into a single
        'w1w2/nt' token (the tag after ']' wins)."""
        new_words = []
        temp = ''
        for word in words:
            if '[' in word:
                # opening token: keep the bare word, drop '[' and its tag
                temp = word.split('/')[0][1:]
            elif ']' in word and temp:
                # closing token: bare word + the tag that follows ']'
                w = word.split('/')[0]
                pos = word.split(']')[1]
                temp += w + '/' + pos
                new_words.append(temp)
                temp = ''
            elif temp:
                # interior token: accumulate the bare word
                temp += word.split('/')[0]
            else:
                new_words.append(word)
        return new_words

    def process_seq(self, words=None, keyfunc=None):
        """Split 'word/pos' tokens into parallel word/pos/tag sequences and
        hand them to *keyfunc* (one of the tag_* methods).

        :param words: token lines; defaults to ``self.lines``.
        :param keyfunc: rendering callback; required (a TypeError is raised
                        when left None).
        """
        if words is None:
            words = self.lines
        words_seq = [[word.split('/')[0] for word in line] for line in words]
        pos_seq = [[word.split('/')[1] for word in line] for line in words]
        # map ns/nt/t/nr to entity labels, everything else to 'O'
        tags_seq = [[self._maps.get(p, 'O') for p in pos] for pos in pos_seq]
        return keyfunc(words_seq, pos_seq, tags_seq)

    def split_train(self, ra=.7, tms=1):
        """Split self.lines into train/test partitions.

        :param ra: training fraction, default 0.7 (used when tms == 1).
        :param tms: number of splits; > 1 produces disjoint test folds of
                    size ~len/tms for cross-validation.
        :return: (train, test) — two lists of *tms* sentence-lists each.
        """
        l = len(self.lines)
        all_set = set(range(l))
        train = []
        test = []
        if tms == 1:
            # single split: sample (1 - ra) of the sentences as test set
            te_set = set(random.sample(range(l), int(l * (1 - ra))))
            tr_set = all_set - te_set
            train.append([self.lines[n] for n in tr_set])
            test.append([self.lines[n] for n in te_set])
        else:
            # tms disjoint test folds; train = everything else
            fold = int(l * (1 / tms))
            remaining = set(all_set)
            for _ in range(tms):
                if len(remaining) > fold:
                    # BUG FIX: random.sample rejects sets on Python 3.11+,
                    # so sample from a sorted sequence instead
                    te_set = set(random.sample(sorted(remaining), fold + 1))
                else:
                    te_set = set(remaining)
                tr_set = all_set - te_set
                test.append([self.lines[n] for n in te_set])
                train.append([self.lines[n] for n in tr_set])
                remaining -= te_set  # keep subsequent folds disjoint
        return train, test

    def _expand_pos(self, wordsq, posq):
        """Repeat each token's POS tag once per character of the token."""
        return [[[posq[s][i] for _ in wordsq[s][i]]
                 for i in range(len(posq[s]))]
                for s in range(len(posq))]

    def _char_tags(self, wordsq, tagsq, perform):
        """Expand token-level tags to character level via *perform*, which
        receives (tag, char_index, token_length)."""
        return [[[perform(tagsq[s][i], c, len(wordsq[s][i]))
                  for c in range(len(wordsq[s][i]))]
                 for i in range(len(tagsq[s]))]
                for s in range(len(tagsq))]

    def _flatten(self, seq):
        """Flatten per-token items into one flat list, appending an empty
        string after every sentence as a separator."""
        flat = []
        for sentence in seq:
            flat.extend([item for token in sentence for item in token])
            flat.append('')
        return flat

    def tag_BIO_pos(self, wordsq, posq, tagsq):
        """Render 'char pos BIO-tag' lines; empty line between sentences."""
        wq = self._flatten(wordsq)
        pq = self._flatten(self._expand_pos(wordsq, posq))
        tq = self._flatten(self._char_tags(
            wordsq, tagsq, lambda t, c, m: self.tag_perform_bio(t, c)))
        return ['' if w == p == t == '' else '%s %s %s' % (w, p, t)
                for w, p, t in zip(wq, pq, tq)]

    def tag_BMEWO_pos(self, wordsq, posq, tagsq):
        """Render 'char pos BMEWO-tag' lines; empty line between sentences.

        BUG FIX: this previously called ``tag_perform_bio`` despite its
        name, so it emitted BIO labels; it now really emits B/M/E labels.
        """
        wq = self._flatten(wordsq)
        pq = self._flatten(self._expand_pos(wordsq, posq))
        tq = self._flatten(self._char_tags(wordsq, tagsq, self.tag_perform_bmewo))
        return ['' if w == p == t == '' else '%s %s %s' % (w, p, t)
                for w, p, t in zip(wq, pq, tq)]

    def tag_BIO(self, wordsq, posq, tagsq):
        """Render 'char BIO-tag' lines; empty line between sentences."""
        wq = self._flatten(wordsq)
        tq = self._flatten(self._char_tags(
            wordsq, tagsq, lambda t, c, m: self.tag_perform_bio(t, c)))
        return ['' if w == t == '' else '%s %s' % (w, t) for w, t in zip(wq, tq)]

    def tag_BMEWO(self, wordsq, posq, tagsq):
        """Render 'char BMEWO-tag' lines; empty line between sentences."""
        wq = self._flatten(wordsq)
        tq = self._flatten(self._char_tags(wordsq, tagsq, self.tag_perform_bmewo))
        return ['' if w == t == '' else '%s %s' % (w, t) for w, t in zip(wq, tq)]

    def tag_perform_bio(self, tag, index):
        """BIO scheme: 'B-x' at char 0, 'I-x' elsewhere, 'O' unchanged."""
        if index == 0 and tag != u'O':
            return u'B-{}'.format(tag)
        elif tag != u'O':
            return u'I-{}'.format(tag)
        else:
            return tag

    def tag_perform_bmewo(self, tag, index, mmax):
        """BMEWO scheme: 'B-x' at char 0, 'E-x' at the last char,
        'M-x' in between, 'O' unchanged.

        Note: a single-character entity (mmax == 1) yields 'B-x', not
        'W-x' — preserved from the original behaviour.
        """
        if index == 0 and tag != u'O':
            return u'B-{}'.format(tag)
        elif tag != u'O' and 0 < index < mmax - 1:
            return u'M-{}'.format(tag)
        elif tag != u'O' and index == mmax - 1:
            return u'E-{}'.format(tag)
        else:
            return tag

    def build_test(self, wordsq, keyfunc):
        """Tag test sentences with the same pipeline as training data."""
        return self.process_seq(words=wordsq, keyfunc=keyfunc)
if __name__ == '__main__':
    # CLI: pre-process a tagged corpus and emit train/test tagging files
    # under model/.
    parser = OptionParser()
    parser.add_option('-i', '--input', type=str, help='原始已经标注语料库文件名', dest='input')
    # BUG FIX: '-o' was read below via options.output but never declared,
    # so the script always crashed with AttributeError after parsing.
    parser.add_option('-o', '--output', type=str, default=None,
                      help='预处理结果输出文件名', dest='output')
    parser.add_option('-c', '--tms', type=int, default=1, help='循环生成训练测试数据集次数,用于交叉验证,最大10', dest='tms')
    parser.add_option('-r', '--ratio', type=float, default=0.8, help='训练数据占比,默认0.8', dest='ratio')
    parser.add_option('-f', '--dataformat', type=str, default='bio', help='训练数据集成生格式,默认bio,bio,bio_pos,bmewo,bmewo_pos',
                      dest='dataformat')
    options, args = parser.parse_args()
    if not options.input:
        parser.print_help()
        exit()
    fin = options.input
    fout = options.output
    fload_formated = None  # set to a file path to reuse a pre-processed corpus
    tms = options.tms
    ratio = options.ratio
    dformat = options.dataformat
    corpus = Corpus()
    # map the --dataformat value to the matching tagging method
    fmap = {'bio': corpus.tag_BIO,
            'bio_pos': corpus.tag_BIO_pos,
            'bmewo': corpus.tag_BMEWO,
            'bmewo_pos': corpus.tag_BMEWO_pos
            }
    if fload_formated:
        corpus.load_corpus_processed(file_path=fload_formated)
    else:
        corpus.pre_process(fin)
    if fout:
        # persist the normalised corpus when -o was given
        corpus.save_corpus(file_path=fout)

    def save(f, d):
        """Write the tagged lines *d* to file *f*."""
        # renamed handle: the original shadowed the outer 'fin' and
        # never used a context manager
        with open(f, 'w') as fh:
            fh.write('\n'.join(d))

    ra = ratio
    tr, te = corpus.split_train(ra, tms)
    for i in range(tms):
        tr_text = corpus.process_seq(tr[i], fmap.get(dformat))
        te_text = corpus.build_test(te[i], fmap.get(dformat))
        if tms > 1:
            # in cross-validation mode the effective train ratio is 1 - 1/tms
            ra = 1 - 1 / tms
        tr_file = 'model/train_%s_%s_%s.txt' % (dformat, '%d_%.2f' % (tms, ra,), i)
        te_file = 'model/test_%s_%s_%s.txt' % (dformat, '%d_%.2f' % (tms, 1 - ra,), i)
        save(tr_file, tr_text)
        save(te_file, te_text)
        logging.info('生成训练数据集文件:%s' % (tr_file,))
        logging.info('生成测试数据集文件:%s' % (te_file,))