forked from kobotoolbox/kpi
-
Notifications
You must be signed in to change notification settings - Fork 0
/
query_parser.py
316 lines (254 loc) · 10.8 KB
/
query_parser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
# coding: utf-8
import operator
import re
from collections import defaultdict
from distutils import util
from functools import reduce
from django.conf import settings
from django.db.models import Q
from django_request_cache import cache_for_request
from kpi.exceptions import (
SearchQueryTooShortException,
QueryParserBadSyntax,
QueryParserNotSupportedFieldLookup,
)
from .canopy_autogenerated_parser import parse as grammar_parse
"""
This is a utility for parsing a Boolean, Whoosh-like query string and
translating it into a Django Q object, which can then be used to filter a
queryset in the ORM.
Syntax examples:
* `name:term` returns any object whose `name` field exactly matches
`term` (case sensitive)
* `owner__username:meg` traverses the `owner` relationship, returning
any object whose `owner` field matches an object whose `username` field
exactly matches `meg` (case sensitive)
* `color:orange NOT (type:fruit OR type:chair)` returns anything
whose color is orange so long as it is not a fruit or a chair. AND, OR,
and NOT operators are supported. They must be in ALL CAPS.
Special notes:
* The value `null` in a query is translated to `None`, e.g. `parent:null`
effectively becomes the ORM `filter(parent=None)`
"""
class QueryParseActions:
    """
    Actions for the parser to take when it encounters certain identifiers
    (see the file grammar.peg)

    Every action is called as `action(text, a, b, elements)`. The leaf
    actions (`word`, `string`, `name`) slice `text` with `a` and `b` to
    obtain the matched characters; the other actions only look at
    `elements`, the already-processed children of the matched node.
    """

    # Lookups that may directly follow `[]__` in a field name. Anything else
    # found in that position is treated as a dictionary key (see `term()`).
    RESERVED_KEYWORDS = [
        'contains',
        'icontains',
        'exact',
        'iexact',
    ]

    def __init__(self, default_field_lookups: list, min_search_characters: int):
        """
        `default_field_lookups` are the ORM lookups applied to a term that
        does not name a field; `min_search_characters` is the length such a
        term must reach to satisfy the minimum-length requirement.
        """
        self.default_field_lookups = default_field_lookups
        self.min_search_characters = min_search_characters
        # Set to `True` by `term()` as soon as one term either names a field
        # or is long enough
        self.has_term_with_sufficient_length = False

    @staticmethod
    def process_value(field, value):
        """
        Translate the text of a query value into the Python value handed to
        the ORM: `'null'` becomes `None`, `'true'`/`'false'` (any case) become
        booleans, anything else is returned unchanged.

        `field` is accepted for API compatibility but is not used.
        """
        # If all we're doing when we have a type mismatch with a field
        # is returning an empty set, then we don't need to do type validation.
        # Django compares between field values and string versions just fine.
        # But there's no magic string for null, so we're adding one.
        # TODO: Use Django or DRF machinery (or JSON parsing?) to handle types
        # that need special treatment, like dates

        # Handle None value
        if value == 'null':
            return None

        # Handle booleans - necessary when querying inside JSONBFields, and
        # also some other contexts: see `get_parsed_parameters()`
        try:
            lower_value = value.lower()
        except AttributeError:
            # Not a string: nothing to translate
            return value

        if lower_value in ('true', 'false'):
            # Plain comparison instead of `distutils.util.strtobool()`:
            # `distutils` is no longer part of the standard library as of
            # Python 3.12
            return lower_value == 'true'

        return value

    @staticmethod
    def query(text, a, b, elements):
        """
        Return the expression of the whole query, or an empty `Q` object
        (which matches everything) when the query is empty.
        """
        exp = elements[1]
        if hasattr(exp, 'text') and exp.text == '':
            # Handle the empty query case with an empty Q object, returning all
            return Q()
        # fallthrough
        return exp

    @staticmethod
    def orexp(text, a, b, elements):
        """
        Combine the first expression with every repeated clause using OR.
        """
        first, repeated = elements[0], elements[1]

        # fallthrough if singular
        if repeated.text == '':
            return first

        # else, combine full sequence of ORs into flattened expression,
        # starting with the first Q object
        orgroup = first
        for clause in repeated.elements:
            orgroup |= clause.expr
        return orgroup

    @staticmethod
    def andexp(text, a, b, elements):
        """
        Combine the first expression with every repeated clause using AND.
        """
        first, repeated = elements[0], elements[1]

        # fallthrough if singular
        if repeated.text == '':
            return first

        # else, combine full sequence of ANDs into flattened expression,
        # starting with the first Q object
        andgroup = first
        for clause in repeated.elements:
            andgroup &= clause.expr
        return andgroup

    @staticmethod
    def parenexp(text, a, b, elements):
        """
        Return the subexpression found between parentheses, unchanged.
        """
        return elements[2]

    @staticmethod
    def notexp(text, a, b, elements):
        """
        Return the negation of the subexpression (a Q object).
        """
        return ~elements[2]

    def term(self, text, a, b, elements):
        """
        Build the Q object for a single term, either `value` alone or
        `field:value`.

        Raises `QueryParserBadSyntax` when a list field (`field[]...`) is
        malformed and `QueryParserNotSupportedFieldLookup` (through
        `get_q_for_list()`) when its lookup is not supported.
        """
        raw_value = elements[1]

        if elements[0].text == '':
            # A search term by itself without a specified field.
            # The raw text is used here, not the result of `process_value()`:
            # a bare `null` or `true` would otherwise become `None`/`True`
            # and make `len()` below raise a `TypeError`. For every other
            # term both are identical.
            if len(raw_value) >= self.min_search_characters:
                # Note that at least one term meets the minimum length
                # requirement
                self.has_term_with_sufficient_length = True

            # Since no field was specified, apply the search term to all
            # default fields
            q_list = [
                Q(**{field: raw_value}) for field in self.default_field_lookups
            ]
            # Join all the default field queries together with boolean OR
            return reduce(operator.or_, q_list)

        # Terms with named fields are exempt from the minimum length
        # requirement
        self.has_term_with_sufficient_length = True

        # A field+colon, and a value [[field,':'],value]
        field = elements[0].elements[0]

        # Bypass `status` field because it does not really exist.
        # It's only a property of Asset model.
        if field == 'status':
            return Q()

        # the `field` value is not used in `process_value()`
        value = self.process_value(field, raw_value)

        # Trivial case, `field` is not a list inside a jsonb field.
        if not re.search(r'(\[\]$|\[\]__.+)', field):
            return Q(**{field: value})

        # Only one list marker is supported; report anything else as a syntax
        # error instead of failing on the unpacking below
        if field.count('[]') != 1:
            raise QueryParserBadSyntax

        field, parts = field.split('[]')

        if not value:  # search for an empty list
            return Q(**{field: []})

        if not parts:  # search within a list for an exact match
            return Q(**{field: [value]})

        # `parts` looks like `__<lookup>` or `__<dict_key>[__<lookup>]`
        dict_key, *field_lookup = parts.strip('__').split('__')

        if dict_key in self.RESERVED_KEYWORDS:
            # What follows `[]__` is a lookup, not a dictionary key; nothing
            # else is allowed after it
            if field_lookup:
                raise QueryParserBadSyntax
            return self.get_q_for_list(
                field, field_lookup=dict_key, value=value
            )

        return self.get_q_for_list(
            field, field_lookup=field_lookup, value=value, dict_key=dict_key
        )

    def get_q_for_list(self, field, field_lookup, value, dict_key=''):
        """
        Build the Q object for a search inside a list stored in `field`.

        With `dict_key`, the list items are dictionaries and `field_lookup`
        is a (possibly empty) list of which only the first item is used;
        without it, `field_lookup` is the lookup name itself.

        Raises `QueryParserNotSupportedFieldLookup` for any other lookup.
        """
        if dict_key:
            field_lookup = field_lookup[0] if field_lookup else None

            if field_lookup == 'exact':
                # The list must contain a dict matching `value` and,
                # presumably, nothing else: there must be no item at index 1
                q_list = [
                    Q(**{f'{field}__contains': [{dict_key: value}]}),
                    Q(**{f'{field}__1__isnull': True}),
                ]
                return reduce(operator.and_, q_list)

            if not field_lookup or field_lookup == 'contains':
                if not value:
                    # search an empty value, same behaviour as an empty list
                    return Q(**{field: []})
                # contains: List should contain at least one dict which
                # equals `value`, e.g. '{"value": "CAN"}' should match one of
                # the items inside:
                # [
                #   {"value": "CAN", "label": "Canada"},
                #   {"value": "CAN", "label": "USA"}
                # ]
                return Q(**{f'{field}__contains': [{dict_key: value}]})
        else:
            if field_lookup == 'iexact':
                # List should contain only one item which equals `value`
                # but it is case-insensitive.
                q_list = [
                    Q(**{f'{field}__icontains': value}),
                    Q(**{f'{field}__1__isnull': True}),
                ]
                return reduce(operator.and_, q_list)

            if field_lookup in ['icontains', 'contains']:
                # icontains: List should contain at least one item which
                #   matches part of `value`, e.g. "english" value will
                #   work with "English (en)" in DB
                # contains: List should contain at least one item which
                #   equals `value`, e.g. "English" value will not work
                #   with "English (en)" in DB. `value` must be "English (en)"
                return Q(**{f'{field}__{field_lookup}': value})

        raise QueryParserNotSupportedFieldLookup

    @staticmethod
    def word(text, a, b, elements):
        """
        Return the matched word.
        """
        return text[a:b]

    @staticmethod
    def string(text, a, b, elements):
        """
        Return the matched string without its first and last characters
        (the surrounding quotes).
        """
        return text[a+1:b-1]

    @staticmethod
    def name(text, a, b, elements):
        """
        Return the matched field name.
        """
        return text[a:b]
def get_parsed_parameters(parsed_query: Q) -> dict:
    """
    NOTE: this is a hack that does not respect boolean logic.

    Returns a dictionary of all parameters detected in the query and their
    values. Values are always returned as list even if there is only one value
    found.

    For example:
        `q=parent_uid:foo AND asset_type:survey OR asset_type:block` returns
        {'parent_uid':['foo'], 'asset_type': ['survey', 'block']}
    """
    parameters = defaultdict(list)

    for child in parsed_query.children:
        if isinstance(child, Q):
            # Nested expression: merge its values into the lists collected so
            # far. Using `dict.update()` here would replace the values already
            # gathered for a parameter that also appears inside `child`.
            for name, values in get_parsed_parameters(child).items():
                parameters[name].extend(values)
            continue
        # Leaf: a `(lookup, value)` pair
        parameters[child[0]].append(child[1])

    # Cast to `dict` to be able to raise KeyError when accessing a non-existing
    # member of returned value
    return dict(parameters)
@cache_for_request
def parse(
    query: str,
    default_field_lookups: list,
    min_search_characters: int = None,
) -> Q:
    """
    Translate a Boolean query string into a Django Q object.

    A term that does not name a field is searched in every lookup of
    `default_field_lookups`. For instance, with `summary__icontains` and
    `name__icontains` as default lookups, the query `term` matches any object
    whose `summary` or `name` contains `term` (case insensitive).

    When `min_search_characters` is not provided (or is falsy),
    `settings.MINIMUM_DEFAULT_SEARCH_CHARACTERS` is used instead.

    Raises `SearchQueryTooShortException` if *no* search term is at least
    `min_search_characters` long and no search term specifies a field
    explicitly. The search is aborted to avoid placing an undue strain on
    the server. See https://github.com/kobotoolbox/kpi/pull/2830 and
    https://github.com/kobotoolbox/kpi/issues/3483
    """
    minimum_length = (
        min_search_characters or settings.MINIMUM_DEFAULT_SEARCH_CHARACTERS
    )
    actions = QueryParseActions(default_field_lookups, minimum_length)
    q_object = grammar_parse(query, actions)

    if actions.has_term_with_sufficient_length:
        return q_object

    raise SearchQueryTooShortException()