-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathindexer.py
108 lines (84 loc) · 3.53 KB
/
indexer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
from elasticsearch_dsl import Index, Document, Integer, Text, analyzer, Keyword, Double
from elasticsearch_dsl.connections import connections
from elasticsearch import Elasticsearch, helpers
from evaluation import get_relevance_label_df
from datetime import datetime
from tqdm import tqdm
import logging
import json
import os
class QA(Document):
    """Elasticsearch document describing one FAQ question/answer record."""

    # Numeric identifier of the record
    id = Integer()
    # Question text
    question = Text()
    # Answer text
    answer = Text()
    # Combined text; ingest_data fills it with question + " " + answer
    question_answer = Text()
def _qa_source(pair):
    """Build the source dict (no meta fields) of a QA document from one record."""
    doc = QA()
    if 'id' in pair:
        doc.id = pair['id']
    if 'question' in pair:
        doc.question = pair['question']
    if 'answer' in pair:
        doc.answer = pair['answer']
    # The combined field is only meaningful when both halves are present
    if 'question' in pair and 'answer' in pair:
        doc.question_answer = pair['question'] + " " + pair['answer']
    return doc.to_dict(include_meta=False)


def ingest_data(data, es, index):
    """Ingest data as a bulk of documents to ES index.

    Args:
        data: Iterable of mappings. The keys 'id', 'question' and 'answer'
            are copied into the document when present; other keys are ignored.
        es: Elasticsearch client handed to ``helpers.bulk``.
        index: Name of the index the documents are written to.

    Returns:
        The value returned by ``helpers.bulk`` (per the elasticsearch-py
        documentation, a ``(success_count, errors)`` tuple), or ``None`` when
        an exception was raised. Exceptions are logged with their traceback
        and not propagated.
    """
    try:
        # Build every document before sending anything, so a malformed record
        # aborts the run before the first bulk request is issued.
        docs = [_qa_source(pair) for pair in tqdm(data)]
        # bulk indexing; hand the outcome back instead of discarding it
        return helpers.bulk(es, actions=docs, index=index, doc_type='doc')
    except Exception:
        logging.error('exception occured', exc_info=True)
        return None
def get_faq_qa_pairs(query_answer_pairs_filepath):
    """Return the FAQ rows of a query/answer file as a list of dicts.

    The file is loaded through ``get_relevance_label_df``; only rows whose
    'query_type' column equals 'faq' are kept. Each returned dict maps
    column name to that row's value.
    """
    labels = get_relevance_label_df(query_answer_pairs_filepath)
    is_faq = labels['query_type'] == 'faq'
    faq_rows = labels[is_faq]
    # After transposing, every original row is a column, so to_dict() is
    # keyed by row label and each value is that row's {column: value} dict.
    rows_by_label = faq_rows.T.to_dict()
    return list(rows_by_label.values())
if __name__ == "__main__":
    try:
        # Client handed to ingest_data for the bulk requests
        es = connections.create_connection(hosts=['localhost'], http_auth=('elastic', 'elastic'))

        # Dataset directory -> prefix of the index it is loaded into
        index_prefixes = {
            "CovidFAQ": "covidfaq_",
            "FAQIR": "faqir_",
            "StackFAQ": "stackfaq_",
        }
        dirnames = ["CovidFAQ", "FAQIR", "StackFAQ"]
        # Today's date is appended to every index name
        date = datetime.now().strftime("%Y-%m-%d")

        for dirname in dirnames:
            prefix = index_prefixes.get(dirname)
            if prefix is None:
                raise ValueError("error, directory not exists")

            index_name = prefix + date
            filepath = 'data/' + dirname + '/query_answer_pairs.json'
            faq_qa_pairs = get_faq_qa_pairs(filepath)
            print("{} records: ".format(dirname), len(faq_qa_pairs))

            # Single shard, no replicas
            index = Index(index_name)
            index.settings(number_of_shards=1, number_of_replicas=0)

            # Start from a clean index: drop any existing one (a 404 for a
            # missing index is ignored), then create it again
            index.delete(ignore=404)
            index.create()

            # NOTE(review): QA is registered after create(); the
            # elasticsearch_dsl docs describe document() as affecting the
            # mappings used when the index is created, so the QA mappings
            # may not be applied here - confirm whether this order is intended.
            index.document(QA)

            # Bulk-load the records into the freshly created index
            ingest_data(faq_qa_pairs, es, index_name)
            print("Finished indexing {} records to {} index".format(len(faq_qa_pairs), index_name))
    except Exception:
        logging.error('exception occured', exc_info=True)