-
Notifications
You must be signed in to change notification settings - Fork 2
/
wikidata_static_formula_extraction.py
168 lines (121 loc) · 4.91 KB
/
wikidata_static_formula_extraction.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
#from annomathtex.annomathtex.recommendation.math_sparql import MathSparql
#f = MathSparql().all_formulae_search()
#print(f)
from SPARQLWrapper import SPARQLWrapper, JSON
import re
import os
import json
# SPARQL query sent to the Wikidata query service by Sparql.process_formulae.
# It selects every ?item that has a wdt:P2534 statement (bound to ?formula) and,
# optionally, each ?identifier linked to the item through wdt:P527; the label
# service supplies the English *Label / *Description variables.
# (P2534 is presumably Wikidata's "defining formula" and P527 "has part" - confirm.)
# NOTE(review): "LIMIT 5" caps the result at five rows, which looks like a
# debugging leftover - confirm before using the output as a full extraction.
formula_query = """
SELECT ?item ?itemLabel ?formula ?itemDescription ?identifier ?identifierLabel ?identifierDescription WHERE {
?item wdt:P2534 ?formula.
#OPTIONAL{?identifier wdt:416 ?symbol .}
OPTIONAL{?item wdt:P527 ?identifier .}
SERVICE wikibase:label { bd:serviceParam wikibase:language "en". }
}
LIMIT 5
"""
# SPARQL query sent to the Wikidata query service by Sparql.process_identifiers.
# It selects every ?item that has a wdt:P416 statement and binds that statement's
# value to ?identifier, together with the item's English label and description.
# (P416 is presumably Wikidata's "quantity symbol" property - confirm.)
# The trailing "#LIMIT 3" is a SPARQL comment, so no limit is applied.
identifier_query = """
SELECT ?item ?itemLabel ?itemDescription ?identifier WHERE {
?item wdt:P416 ?identifier.
SERVICE wikibase:label { bd:serviceParam wikibase:language "en". }
}
#LIMIT 3
"""
class Sparql(object):
    """
    Client for the Wikidata query service that collects formula concepts
    together with the identifiers (symbols) that are part of them.

    Result rows are handled in the SPARQL JSON shape: a list of bindings in
    which every selected variable maps to a dictionary with a 'value' key.
    """

    # Wikidata id (Q... or P...) at the very end of an entity URI.
    _ENTITY_ID_RE = re.compile(r'[QP][0-9]+$')
    # TeX source stored in the alttext attribute of the returned MathML.
    _TEX_RE = re.compile(r'(?<=alttext=\"{\\displaystyle ).*?(?=}\">)')

    def __init__(self):
        # Used to access the wikidata query service API
        self.sparql = SPARQLWrapper("https://query.wikidata.org/sparql")
        self.q = formula_query

    def query(self, q):
        """
        Run a SPARQL query against the configured endpoint.

        This is deliberately best-effort: any failure (network, malformed
        response, ...) is printed and reported as an empty result.
        :param q: The SPARQL query string.
        :return: The list found under results/bindings of the JSON response, or [] on error.
        """
        try:
            self.sparql.setQuery(q)
            self.sparql.setReturnFormat(JSON)
            query_results = self.sparql.query().convert()
            results = query_results['results']['bindings']
        except Exception as e:
            print(e)
            results = []
        return results

    @classmethod
    def _extract_entity_id(cls, uri):
        """Return the trailing Q/P id of a Wikidata URI, or None if there is none."""
        match = cls._ENTITY_ID_RE.search(uri)
        return match.group(0) if match else None

    @classmethod
    def _build_identifier_dict(cls, results):
        """Turn the rows of the identifier query into {qid: {'name': ..., 'string': ...}}."""
        identifier_dict = {}
        for row in results:
            uri = row['item']['value']
            qid = cls._extract_entity_id(uri)
            if qid is None:
                # No recognisable id: report the row and keep it keyed by the
                # raw URI so that no result is silently dropped.
                print(row)
                qid = uri
            identifier_dict[qid] = {
                'name': row['itemLabel']['value'],
                'string': row['identifier']['value'],
            }
        return identifier_dict

    @classmethod
    def _build_formula_concepts(cls, formulae, identifiers):
        """Merge the rows of the formula query into one entry per item label."""
        formula_concepts = {}
        for row in formulae:
            qid = row['item']['value'].split('/')[-1]
            item_label = row['itemLabel']['value']
            print(item_label)

            # The formula arrives as MathML; the TeX source sits in its alttext.
            tex_match = cls._TEX_RE.search(row['formula']['value'])
            tex = tex_match.group(0) if tex_match else ''

            identifier = None
            if 'identifier' in row:
                identifier_value = row['identifier']['value']
                identifier_qid = cls._extract_entity_id(identifier_value)
                print(identifier_value)
                print(identifier_qid)
                identifier = identifiers.get(identifier_qid)
                if identifier is None:
                    # Identifier without a known symbol (or without a parsable
                    # id): the whole row is skipped.
                    # NOTE(review): a formula whose identifiers are all unknown
                    # therefore never appears in the result - confirm intended.
                    continue
                print(identifier)

            # The first row seen for a label fixes its qid and formula; later
            # rows only contribute additional identifiers.
            concept = formula_concepts.setdefault(item_label, {
                'qid': qid,
                'formula': tex,
                'identifiers': {'names': [], 'strings': []},
            })
            if identifier:
                known = concept['identifiers']
                if identifier['name'] not in known['names']:
                    known['names'].append(identifier['name'])
                if identifier['string'] not in known['strings']:
                    known['strings'].append(identifier['string'])
        return formula_concepts

    def process_identifiers(self):
        """
        Query all items that carry an identifier string and index them by their id.

        Rows whose item URI has no trailing Q/P id are printed and stored under the raw URI instead.
        :return: A dictionary mapping the item id to {'name': item label, 'string': identifier string}.
        """
        return self._build_identifier_dict(self.query(identifier_query))

    def process_formulae(self):
        """
        Query all formulae and the identifiers known to the query service and combine them.

        A row whose identifier cannot be resolved to a known identifier is skipped.
        :return: A dictionary mapping each item label to
                 {'qid': item id, 'formula': TeX string ('' if none was found),
                  'identifiers': {'names': [...], 'strings': [...]}}.
        """
        formulae = self.query(formula_query)
        identifiers = self.process_identifiers()
        return self._build_formula_concepts(formulae, identifiers)
# Script entry: create the client and run the formula extraction. These
# statements sit at module level, so they also run when the module is imported.
# NOTE(review): consider an `if __name__ == "__main__":` guard if this module
# is ever imported from elsewhere.
s = Sparql()
f = s.process_formulae()
# Disabled: print every extracted formula concept.
#for k in f:
# print(k, f[k])
# Disabled: write the extracted formula concepts to wikidata_formulae.json.
#path = os.path.join(os.getcwd(), 'annomathtex', 'annomathtex', 'recommendation', 'evaluation_files', 'wikidata_formulae.json')
#with open(path, 'w') as outfile:
# json.dump(f, outfile)