-
Notifications
You must be signed in to change notification settings - Fork 22
/
Copy pathtornado_elasticsearch.py
971 lines (874 loc) · 45.3 KB
/
tornado_elasticsearch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
"""tornado_elasticsearch extends the official elasticsearch library, adding
asynchronous support for the Tornado stack.

See http://elasticsearch-py.readthedocs.org/en/latest/ for information on how
to use the API beyond the following introduction to using it with Tornado::

    from tornado import gen
    from tornado import web
    from tornado_elasticsearch import AsyncElasticsearch


    class Info(web.RequestHandler):

        @web.asynchronous
        @gen.engine
        def get(self, *args, **kwargs):
            es = AsyncElasticsearch()
            info = yield es.info()
            self.finish(info)

"""
from elasticsearch.connection.base import Connection
from elasticsearch import exceptions
from elasticsearch.client import Elasticsearch
from elasticsearch.transport import Transport, TransportError
from elasticsearch.client.utils import query_params, _make_path
from tornado import concurrent
from tornado import gen
from tornado import httpclient
import logging
import time
try:
from urllib import urlencode
except ImportError:
from urllib.parse import urlencode
from tornado import version
# Package version; it is also embedded in the User-Agent header that
# AsyncHttpConnection sends with every request.
__version__ = '0.5.0'
# Module-level logger used for request/response debug and info output.
LOGGER = logging.getLogger(__name__)
class AsyncHttpConnection(Connection):
    """Add Tornado Asynchronous support to ElasticSearch.

    :param str host: The host for the connection
    :param int port: The port for the connection
    :param str|tuple http_auth: optional http auth information as either a
        colon delimited string ``("username:password")`` or
        tuple ``(username, password)``
    :param bool use_ssl: use ssl for the connection if ``True``
    :param int request_timeout: optional default timeout in seconds
    :param int max_clients: value handed to
        ``tornado.httpclient.AsyncHTTPClient.configure``

    """
    _auth_user = None
    _auth_password = None
    _user_agent = 'tornado_elasticsearch %s/Tornado %s' % (__version__, version)
    ssl_transport_schema = 'https'

    def __init__(self, host='localhost', port=9200, http_auth=None,
                 use_ssl=False, request_timeout=None, max_clients=10,
                 **kwargs):
        super(AsyncHttpConnection, self).__init__(host=host, port=port,
                                                  **kwargs)
        self._assign_auth_values(http_auth)
        schema = (self.ssl_transport_schema if use_ssl
                  else self.transport_schema)
        # base_url already ends with url_prefix; _request_uri appends the
        # request path directly to it.
        self.base_url = '%s://%s:%s%s' % (schema, host, port, self.url_prefix)
        # NOTE(review): configure() is class-level Tornado state, so this
        # likely affects every AsyncHTTPClient in the process; passing None
        # selects Tornado's default implementation.
        httpclient.AsyncHTTPClient.configure(None, max_clients=max_clients)
        self._client = httpclient.AsyncHTTPClient()
        self._headers = {'Content-Type': 'application/json; charset=UTF-8'}
        # Start time of the most recently issued request. Informational only:
        # perform_request measures each request's duration independently.
        self._start_time = None
        self.request_timeout = request_timeout

    @concurrent.return_future
    def perform_request(self, method, url, params=None, body=None,
                        timeout=None, ignore=(), callback=None):
        """Issue an HTTP request; the returned Future resolves to a
        ``(status, headers, data)`` tuple where ``data`` is the decoded body.

        :param str method: HTTP method
        :param str url: request path, appended to ``base_url``
        :param dict params: optional query string parameters
        :param body: optional request body
        :param timeout: optional per-request timeout; overrides
            ``request_timeout`` when truthy
        :param ignore: status codes that must not raise
        :param callback: supplied by ``tornado.concurrent.return_future``
        :raises: ``elasticsearch.exceptions.ConnectionTimeout`` /
            ``ConnectionError`` when Tornado reports a network failure
            (code 599); otherwise the ``HTTP_EXCEPTIONS`` entry for the status
            code, falling back to ``TransportError``

        """
        request_uri = self._request_uri(url, params)
        LOGGER.debug('%s, %r, %r', url, body, params)
        kwargs = self._request_kwargs(method, body, timeout)
        # Keep the start time local to this request: several requests can be
        # in flight on one connection, and a shared attribute would be
        # overwritten before earlier responses arrive.
        start_time = time.time()
        self._start_time = start_time

        def on_response(response):
            duration = time.time() - start_time
            raw_data = (response.body.decode('utf-8')
                        if response.body is not None else None)
            LOGGER.info('Response from %s: %s', url, response.code)
            if not (200 <= response.code < 300) and \
                    response.code not in ignore:
                LOGGER.debug('Error: %r', raw_data)
                self.log_request_fail(method, request_uri, url, body,
                                      duration, response.code)
                if response.code == 599:
                    raise self._connection_error(response)
                error = exceptions.HTTP_EXCEPTIONS.get(response.code,
                                                       TransportError)
                raise error(response.code, raw_data)
            self.log_request_success(method, request_uri, url, body,
                                     response.code, raw_data, duration)
            callback((response.code, response.headers, raw_data))

        LOGGER.debug('Fetching [%s] %s', kwargs['method'], request_uri)
        LOGGER.debug('kwargs: %r', kwargs)
        self._client.fetch(httpclient.HTTPRequest(request_uri, **kwargs),
                           callback=on_response)

    @staticmethod
    def _connection_error(response):
        """Build the elasticsearch exception for a Tornado code-599 response.

        Tornado uses the pseudo status 599 for failures where no HTTP response
        was received, keeping the cause on ``response.error``. Mapping these
        to ``ConnectionTimeout`` / ``ConnectionError`` (both subclasses of
        ``TransportError``) lets ``AsyncTransport`` apply its retry policy.
        The status code stays 599 so existing ``status_code`` checks still
        work. Timeout detection is heuristic, based on the error text.

        """
        cause = response.error
        message = str(cause) if cause is not None else 'Connection failed'
        lowered = message.lower()
        if 'timeout' in lowered or 'timed out' in lowered:
            return exceptions.ConnectionTimeout(599, message, cause)
        return exceptions.ConnectionError(599, message, cause)

    def _assign_auth_values(self, http_auth):
        """Take the http_auth value and split it into the attributes that
        carry the http auth username and password

        :param str|tuple http_auth: The http auth value
        :raises: ValueError for an unsupported type or a string without ``:``

        """
        if not http_auth:
            return
        if isinstance(http_auth, (tuple, list)):
            self._auth_user, self._auth_password = http_auth
        elif isinstance(http_auth, str):
            # Split on the first colon only so passwords may contain ':'.
            self._auth_user, self._auth_password = http_auth.split(':', 1)
        else:
            raise ValueError('HTTP Auth Credentials should be str or '
                             'tuple, not %s' % type(http_auth))

    def _request_kwargs(self, method, body, timeout):
        """Build the keyword arguments for ``tornado.httpclient.HTTPRequest``.

        A GET carrying a body is sent as POST. The per-call ``timeout`` (when
        truthy) wins over the connection-wide ``request_timeout``.

        """
        if body and method == 'GET':
            method = 'POST'
        kwargs = {'method': method, 'user_agent': self._user_agent,
                  'headers': self._headers}
        if self.request_timeout is not None:
            kwargs['request_timeout'] = self.request_timeout
        if self._auth_user and self._auth_password:
            kwargs['auth_username'] = self._auth_user
            kwargs['auth_password'] = self._auth_password
        if body:
            kwargs['body'] = body
        if timeout:
            kwargs['request_timeout'] = timeout
        # Permit bodies on methods Tornado does not normally expect them on
        # (e.g. DELETE) and bodiless POST/PUT.
        kwargs['allow_nonstandard_methods'] = True
        return kwargs

    def _request_uri(self, url, params):
        """Return the absolute URI for ``url`` with ``params`` url-encoded.

        ``base_url`` already ends with ``url_prefix``, so the prefix must not
        be prepended again here.

        """
        uri = '%s%s' % (self.base_url, url)
        if params:
            uri = '%s?%s' % (uri, urlencode(params))
        return uri
class AsyncTransport(Transport):
    """Transport whose ``perform_request`` is a Tornado coroutine."""

    @gen.coroutine
    def perform_request(self, method, url, params=None, body=None):
        """Perform the actual request. Retrieve a connection from the
        connection pool, pass all the information to its perform_request
        method and return the data.

        If an exception was raised, mark the connection as failed and retry (up
        to `max_retries` times).

        If the operation was successful and the connection used was previously
        marked as dead, mark it as live, resetting its failure count.

        ``ignore`` and ``request_timeout`` entries in ``params`` are transport
        options: they are removed from the query string and handed to the
        connection instead.

        :param method: HTTP method to use
        :param url: absolute url (without host) to target
        :param params: dictionary of query parameters, will be handed over to
            the underlying :class:`~tornado_elasticsearch.AsyncHttpConnection`
            class for serialization
        :param body: body of the request, will be serialized using serializer
            and passed to the connection
        :returns: a ``(status, deserialized_body)`` tuple; the body is
            ``None`` when the response had no data

        """
        if body is not None:
            body = self.serializer.dumps(body)

            # some clients or environments don't support sending GET with body
            if method in ('HEAD', 'GET') and self.send_get_body_as != 'GET':
                # send it as post instead
                if self.send_get_body_as == 'POST':
                    method = 'POST'

                # or as source parameter
                elif self.send_get_body_as == 'source':
                    if params is None:
                        params = {}
                    params['source'] = body
                    body = None

        if body is not None:
            try:
                body = body.encode('utf-8')
            except (UnicodeDecodeError, AttributeError):
                # bytes/str - no need to re-encode
                pass

        ignore = ()
        timeout = None
        if params:
            # Without this pop, request_timeout would be sent to Elasticsearch
            # as a URL parameter and never applied to the HTTP request.
            timeout = params.pop('request_timeout', None)
            if 'ignore' in params:
                ignore = params.pop('ignore')
        if isinstance(ignore, int):
            ignore = (ignore, )

        for attempt in range(self.max_retries + 1):
            connection = self.get_connection()
            try:
                result = yield connection.perform_request(method, url,
                                                          params, body,
                                                          timeout=timeout,
                                                          ignore=ignore)
                (status, headers, data) = result
            except TransportError as e:
                retry = False
                if isinstance(e, exceptions.ConnectionTimeout):
                    retry = self.retry_on_timeout
                elif isinstance(e, exceptions.ConnectionError):
                    retry = True
                elif e.status_code in self.retry_on_status:
                    retry = True

                if retry:
                    # only mark as dead if we are retrying
                    self.mark_dead(connection)
                    # raise exception on last retry
                    if attempt == self.max_retries:
                        raise
                else:
                    raise

            else:
                # connection didn't fail, confirm its live status
                self.connection_pool.mark_live(connection)
                response = self.deserializer.loads(
                    data, headers.get('content-type')) if data else None
                raise gen.Return((status, response))
class AsyncElasticsearch(Elasticsearch):
"""Extends the official elasticsearch.Elasticsearch object to make the
client invoked methods coroutines.
"""
def __init__(self, hosts=None, **kwargs):
    """Create a new AsyncElasticsearch instance.

    Any ``connection_class`` or ``transport_class`` supplied by the caller is
    overridden so that requests go through :class:`AsyncHttpConnection` and
    :class:`AsyncTransport`.

    :param hosts: host specification, passed unchanged to
        ``elasticsearch.Elasticsearch``
    :param kwargs: remaining options, forwarded to the parent constructor

    """
    kwargs.update(connection_class=AsyncHttpConnection,
                  transport_class=AsyncTransport)
    super(AsyncElasticsearch, self).__init__(hosts, **kwargs)
@gen.coroutine
@query_params()
def ping(self, params=None):
    """Return True if the cluster is up, False otherwise.

    The HEAD request must be yielded: ``transport.perform_request`` is a
    coroutine, so a ``TransportError`` only surfaces when its Future is
    resolved. Without the ``yield`` the error was silently dropped and this
    always reported True.

    """
    try:
        yield self.transport.perform_request('HEAD', '/', params=params)
    except TransportError:
        raise gen.Return(False)
    raise gen.Return(True)
@gen.coroutine
@query_params()
def info(self, params=None):
    """Coroutine returning the basic cluster information (``GET /``).

    :rtype: dict

    """
    _status, cluster_info = yield self.transport.perform_request(
        'GET', '/', params=params)
    raise gen.Return(cluster_info)
@gen.coroutine
def health(self, params=None):
    """Coroutine. Query the cluster Health API (``GET /_cluster/health``).

    :param params: dictionary of query parameters, will be handed over to
        the underlying :class:`~tornado_elasticsearch.AsyncHttpConnection`
        class for serialization
    :returns: a 2-tuple of the request status and a dictionary with the
        response data

    """
    outcome = yield self.transport.perform_request(
        'GET', '/_cluster/health', params=params)
    status, data = outcome
    raise gen.Return((status, data))
@gen.coroutine
@query_params('consistency', 'id', 'parent', 'percolate', 'refresh',
              'replication', 'routing', 'timeout', 'timestamp', 'ttl',
              'version', 'version_type')
def create(self, index, doc_type, body, id=None, params=None):
    """
    Adds a typed JSON document in a specific index, making it searchable.
    Delegates to :meth:`index` with ``op_type='create'``.
    `<http://elasticsearch.org/guide/reference/api/index_/>`_

    :arg index: The name of the index
    :arg doc_type: The type of the document
    :arg body: The document
    :arg id: Document ID
    :arg consistency: Explicit write consistency setting for the operation
    :arg parent: ID of the parent document
    :arg percolate: Percolator queries to execute while indexing the doc
    :arg refresh: Refresh the index after performing the operation
    :arg replication: Specific replication type (default: sync)
    :arg routing: Specific routing value
    :arg timeout: Explicit operation timeout
    :arg timestamp: Explicit timestamp for the document
    :arg ttl: Expiration time for the document
    :arg version: Explicit version number for concurrency control
    :arg version_type: Specific version type

    NOTE(review): ``id`` is also listed in ``query_params`` above, so an
    ``id=`` keyword may be moved into the query string by the decorator
    rather than reaching this argument -- confirm against the installed
    elasticsearch-py version.
    """
    created = yield self.index(index, doc_type, body, id=id,
                               params=params, op_type='create')
    raise gen.Return(created)
@gen.coroutine
@query_params('consistency', 'op_type', 'parent', 'percolate', 'refresh',
              'replication', 'routing', 'timeout', 'timestamp', 'ttl',
              'version', 'version_type')
def index(self, index, doc_type, body, id=None, params=None):
    """
    Adds or updates a typed JSON document in a specific index, making it
    searchable. Uses PUT when an ``id`` is given and POST otherwise.
    `<http://elasticsearch.org/guide/reference/api/index_/>`_

    :arg index: The name of the index
    :arg doc_type: The type of the document
    :arg body: The document
    :arg id: Document ID
    :arg consistency: Explicit write consistency setting for the operation
    :arg op_type: Explicit operation type (default: index)
    :arg parent: ID of the parent document
    :arg percolate: Percolator queries to execute while indexing the doc
    :arg refresh: Refresh the index after performing the operation
    :arg replication: Specific replication type (default: sync)
    :arg routing: Specific routing value
    :arg timeout: Explicit operation timeout
    :arg timestamp: Explicit timestamp for the document
    :arg ttl: Expiration time for the document
    :arg version: Explicit version number for concurrency control
    :arg version_type: Specific version type
    """
    http_method = 'PUT' if id else 'POST'
    path = _make_path(index, doc_type, id)
    _status, result = yield self.transport.perform_request(
        http_method, path, params=params, body=body)
    raise gen.Return(result)
@gen.coroutine
@query_params('parent', 'preference', 'realtime', 'refresh', 'routing')
def exists(self, index, id, doc_type='_all', params=None):
    """
    Returns a boolean indicating whether or not given document exists in
    Elasticsearch. `<http://elasticsearch.org/guide/reference/api/get/>`_

    The HEAD request is yielded so that a 404 actually surfaces as
    ``NotFoundError``, and the False result is *raised* via ``gen.Return``
    (returning the ``gen.Return`` object made the result always truthy).

    :arg index: The name of the index
    :arg id: The document ID
    :arg doc_type: The type of the document (uses `_all` by default to
        fetch the first document matching the ID across all types)
    :arg parent: The ID of the parent document
    :arg preference: Specify the node or shard the operation should be
        performed on (default: random)
    :arg realtime: Specify whether to perform the operation in realtime or
        search mode
    :arg refresh: Refresh the shard containing the document before
        performing the operation
    :arg routing: Specific routing value
    """
    try:
        yield self.transport.perform_request(
            'HEAD', _make_path(index, doc_type, id), params=params)
    except exceptions.NotFoundError:
        raise gen.Return(False)
    raise gen.Return(True)
@gen.coroutine
@query_params('_source', '_source_exclude', '_source_include', 'fields',
              'parent', 'preference', 'realtime', 'refresh', 'routing')
def get(self, index, id, doc_type='_all', params=None):
    """
    Fetch a typed JSON document from the index by its id.
    `<http://elasticsearch.org/guide/reference/api/get/>`_

    :arg index: The name of the index
    :arg id: The document ID
    :arg doc_type: The type of the document (uses `_all` by default to
        fetch the first document matching the ID across all types)
    :arg _source: True or false to return the _source field or not, or a
        list of fields to return
    :arg _source_exclude: A list of fields to exclude from the returned
        _source field
    :arg _source_include: A list of fields to extract and return from the
        _source field
    :arg fields: A comma-separated list of fields to return in the response
    :arg parent: The ID of the parent document
    :arg preference: Specify the node or shard the operation should be
        performed on (default: random)
    :arg realtime: Specify whether to perform the operation in realtime or
        search mode
    :arg refresh: Refresh the shard containing the document before
        performing the operation
    :arg routing: Specific routing value
    """
    path = _make_path(index, doc_type, id)
    _status, document = yield self.transport.perform_request(
        'GET', path, params=params)
    raise gen.Return(document)
@gen.coroutine
@query_params('allow_no_indices', 'expand_wildcards', 'ignore_unavailable',
              'local')
def get_alias(self, index=None, name=None, params=None):
    """
    Retrieve a specified alias.
    `<http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-aliases.html>`_

    :arg index: A comma-separated list of index names to filter aliases
    :arg name: A comma-separated list of alias names to return
    :arg allow_no_indices: Whether to ignore if a wildcard indices
        expression resolves into no concrete indices. (This includes `_all`
        string or when no indices have been specified)
    :arg expand_wildcards: Whether to expand wildcard expression to
        concrete indices that are open, closed or both., default 'all',
        valid choices are: 'open', 'closed', 'none', 'all'
    :arg ignore_unavailable: Whether specified concrete indices should be
        ignored when unavailable (missing or closed)
    :arg local: Return local information, do not retrieve the state from
        master node (default: false)
    """
    path = _make_path(index, '_alias', name)
    _status, aliases = yield self.transport.perform_request(
        'GET', path, params=params)
    raise gen.Return(aliases)
@gen.coroutine
@query_params('_source_exclude', '_source_include', 'parent', 'preference',
              'realtime', 'refresh', 'routing')
def get_source(self, index, id, doc_type='_all', params=None):
    """
    Get the source of a document by its index, type and id.
    `<http://elasticsearch.org/guide/reference/api/get/>`_

    :arg index: The name of the index
    :arg id: The document ID
    :arg doc_type: The type of the document (uses `_all` by default to
        fetch the first document matching the ID across all types)
    :arg _source_exclude: A list of fields to exclude from the returned
        _source field
    :arg _source_include: A list of fields to extract and return from the
        _source field
    :arg parent: The ID of the parent document
    :arg preference: Specify the node or shard the operation should be
        performed on (default: random)
    :arg realtime: Specify whether to perform the operation in realtime or
        search mode
    :arg refresh: Refresh the shard containing the document before
        performing the operation
    :arg routing: Specific routing value
    """
    path = _make_path(index, doc_type, id, '_source')
    _status, source = yield self.transport.perform_request(
        'GET', path, params=params)
    raise gen.Return(source)
@gen.coroutine
@query_params('_source', '_source_exclude', '_source_include', 'fields',
              'parent', 'preference', 'realtime', 'refresh', 'routing')
def mget(self, body, index=None, doc_type=None, params=None):
    """
    Get multiple documents based on an index, type (optional) and ids.
    `<http://elasticsearch.org/guide/reference/api/multi-get/>`_

    :arg body: Document identifiers; can be either `docs` (containing full
        document information) or `ids` (when index and type is provided
        in the URL).
    :arg index: The name of the index
    :arg doc_type: The type of the document
    :arg _source: True or false to return the _source field or not, or a
        list of fields to return
    :arg _source_exclude: A list of fields to exclude from the returned
        _source field
    :arg _source_include: A list of fields to extract and return from the
        _source field
    :arg fields: A comma-separated list of fields to return in the response
    :arg parent: The ID of the parent document
    :arg preference: Specify the node or shard the operation should be
        performed on (default: random)
    :arg realtime: Specify whether to perform the operation in realtime or
        search mode
    :arg refresh: Refresh the shard containing the document before
        performing the operation
    :arg routing: Specific routing value
    """
    path = _make_path(index, doc_type, '_mget')
    _status, documents = yield self.transport.perform_request(
        'GET', path, params=params, body=body)
    raise gen.Return(documents)
@gen.coroutine
@query_params('consistency', 'fields', 'lang', 'parent', 'percolate',
              'refresh', 'replication', 'retry_on_conflict', 'routing',
              'script', 'timeout', 'timestamp', 'ttl', 'version',
              'version_type')
def update(self, index, doc_type, id, body=None, params=None):
    """
    Update a document based on a script or partial data provided.
    `<http://elasticsearch.org/guide/reference/api/update/>`_

    :arg index: The name of the index
    :arg doc_type: The type of the document
    :arg id: Document ID
    :arg body: The request definition using either `script` or partial `doc`
    :arg consistency: Explicit write consistency setting for the operation
    :arg fields: A comma-separated list of fields to return in the response
    :arg lang: The script language (default: mvel)
    :arg parent: ID of the parent document
    :arg percolate: Perform percolation during the operation; use specific
        registered query name, attribute, or wildcard
    :arg refresh: Refresh the index after performing the operation
    :arg replication: Specific replication type (default: sync)
    :arg retry_on_conflict: Specify how many times should the operation be
        retried when a conflict occurs (default: 0)
    :arg routing: Specific routing value
    :arg script: The URL-encoded script definition (instead of using
        request body)
    :arg timeout: Explicit operation timeout
    :arg timestamp: Explicit timestamp for the document
    :arg ttl: Expiration time for the document
    :arg version: Explicit version number for concurrency control
    :arg version_type: Specific version type
    """
    path = _make_path(index, doc_type, id, '_update')
    _status, outcome = yield self.transport.perform_request(
        'POST', path, params=params, body=body)
    raise gen.Return(outcome)
@gen.coroutine
@query_params('_source', '_source_exclude', '_source_include',
              'analyze_wildcard', 'analyzer', 'default_operator', 'df',
              'explain', 'fields', 'ignore_indices', 'indices_boost',
              'lenient', 'lowercase_expanded_terms', 'from_', 'preference',
              'q', 'routing', 'scroll', 'search_type', 'size', 'sort',
              'source', 'stats', 'suggest_field', 'suggest_mode',
              'suggest_size', 'suggest_text', 'timeout', 'version')
def search(self, index=None, doc_type=None, body=None, params=None):
    """
    Execute a search query and get back search hits that match the query.
    `<http://www.elasticsearch.org/guide/reference/api/search/>`_

    :arg index: A comma-separated list of index names to search; use `_all`
        or empty string to perform the operation on all indices
    :arg doc_type: A comma-separated list of document types to search;
        leave empty to perform the operation on all types
    :arg body: The search definition using the Query DSL
    :arg _source: True or false to return the _source field or not, or a
        list of fields to return
    :arg _source_exclude: A list of fields to exclude from the returned
        _source field
    :arg _source_include: A list of fields to extract and return from the
        _source field
    :arg analyze_wildcard: Specify whether wildcard and prefix queries
        should be analyzed (default: false)
    :arg analyzer: The analyzer to use for the query string
    :arg default_operator: The default operator for query string query (AND
        or OR) (default: OR)
    :arg df: The field to use as default where no field prefix is given in
        the query string
    :arg explain: Specify whether to return detailed information about
        score computation as part of a hit
    :arg fields: A comma-separated list of fields to return as part of a hit
    :arg ignore_indices: When performed on multiple indices, allows to
        ignore `missing` ones (default: none)
    :arg indices_boost: Comma-separated list of index boosts
    :arg lenient: Specify whether format-based query failures (such as
        providing text to a numeric field) should be ignored
    :arg lowercase_expanded_terms: Specify whether query terms should be
        lowercased
    :arg from_: Starting offset (default: 0)
    :arg preference: Specify the node or shard the operation should be
        performed on (default: random)
    :arg q: Query in the Lucene query string syntax
    :arg routing: A comma-separated list of specific routing values
    :arg scroll: Specify how long a consistent view of the index should be
        maintained for scrolled search
    :arg search_type: Search operation type
    :arg size: Number of hits to return (default: 10)
    :arg sort: A comma-separated list of <field>:<direction> pairs
    :arg source: The URL-encoded request definition using the Query DSL
        (instead of using request body)
    :arg stats: Specific 'tag' of the request for logging and statistical
        purposes
    :arg suggest_field: Specify which field to use for suggestions
    :arg suggest_mode: Specify suggest mode (default: missing)
    :arg suggest_size: How many suggestions to return in response
    :arg suggest_text: The source text for which the suggestions should be
        returned
    :arg timeout: Explicit operation timeout
    :arg version: Specify whether to return document version as part of a
        hit
    """
    # ``from`` is a Python keyword, so callers pass ``from_``; rename it back
    # to the parameter name Elasticsearch expects.
    if 'from_' in params:
        params['from'] = params.pop('from_')
    # A type filter needs an index path segment; default it to all indices.
    if not index and doc_type:
        index = '_all'
    path = _make_path(index, doc_type, '_search')
    _status, hits = yield self.transport.perform_request(
        'GET', path, params=params, body=body)
    raise gen.Return(hits)
@gen.coroutine
@query_params('_source', '_source_exclude', '_source_include',
              'analyze_wildcard', 'analyzer', 'default_operator',
              'df', 'fields', 'lenient', 'lowercase_expanded_terms',
              'parent', 'preference', 'q', 'routing', 'source')
def explain(self, index, doc_type, id, body=None, params=None):
    """
    Compute a score explanation for a query and a specific document. This
    shows whether (and why) a document matches or does not match a query.
    `<http://elasticsearch.org/guide/reference/api/explain/>`_

    :arg index: The name of the index
    :arg doc_type: The type of the document
    :arg id: The document ID
    :arg body: The query definition using the Query DSL
    :arg _source: True or false to return the _source field or not, or a
        list of fields to return
    :arg _source_exclude: A list of fields to exclude from the returned
        _source field
    :arg _source_include: A list of fields to extract and return from the
        _source field
    :arg analyze_wildcard: Specify whether wildcards and prefix queries in
        the query string query should be analyzed (default: false)
    :arg analyzer: The analyzer for the query string query
    :arg default_operator: The default operator for query string query (AND
        or OR), (default: OR)
    :arg df: The default field for query string query (default: _all)
    :arg fields: A comma-separated list of fields to return in the response
    :arg lenient: Specify whether format-based query failures (such as
        providing text to a numeric field) should be ignored
    :arg lowercase_expanded_terms: Specify whether query terms should be
        lowercased
    :arg parent: The ID of the parent document
    :arg preference: Specify the node or shard the operation should be
        performed on (default: random)
    :arg q: Query in the Lucene query string syntax
    :arg routing: Specific routing value
    :arg source: The URL-encoded query definition (instead of using the
        request body)
    """
    path = _make_path(index, doc_type, id, '_explain')
    _status, explanation = yield self.transport.perform_request(
        'GET', path, params=params, body=body)
    raise gen.Return(explanation)
@gen.coroutine
@query_params()
def scroll(self, scroll_id, scroll, params=None):
    """
    Scroll a search request created by specifying the scroll parameter.
    `<http://www.elasticsearch.org/guide/reference/api/search/scroll/>`_

    Both values are sent in the request body; any copies in ``params`` are
    dropped so they do not also appear in the query string.

    :arg scroll_id: The scroll ID
    :arg scroll: Specify how long a consistent view of the index should be
        maintained for scrolled search
    """
    body = {"scroll": scroll, "scroll_id": scroll_id}
    if params:
        for duplicate in ("scroll", "scroll_id"):
            params.pop(duplicate, None)
    path = _make_path('_search', 'scroll')
    _status, page = yield self.transport.perform_request(
        'POST', path, body=body, params=params)
    raise gen.Return(page)
@gen.coroutine
@query_params()
def clear_scroll(self, scroll_id, params=None):
    """
    Clear the scroll request created by specifying the scroll parameter to
    search.
    `<http://www.elasticsearch.org/guide/reference/api/search/scroll/>`_

    :arg scroll_id: The scroll ID or a list of scroll IDs; a single ID is
        wrapped in a list before being sent in the request body
    """
    ids = scroll_id if isinstance(scroll_id, list) else [scroll_id]
    body = {"scroll_id": ids}
    if params:
        params.pop("scroll_id", None)
    path = _make_path('_search', 'scroll')
    _status, cleared = yield self.transport.perform_request(
        'DELETE', path, body=body, params=params)
    raise gen.Return(cleared)
@gen.coroutine
@query_params('consistency', 'parent', 'refresh', 'replication', 'routing',
              'timeout', 'version', 'version_type')
def delete(self, index, doc_type, id, params=None):
    """
    Delete a typed JSON document from a specific index based on its id.
    `<http://elasticsearch.org/guide/reference/api/delete/>`_

    :arg index: The name of the index
    :arg doc_type: The type of the document
    :arg id: The document ID
    :arg consistency: Specific write consistency setting for the operation
    :arg parent: ID of parent document
    :arg refresh: Refresh the index after performing the operation
    :arg replication: Specific replication type (default: sync)
    :arg routing: Specific routing value
    :arg timeout: Explicit operation timeout
    :arg version: Explicit version number for concurrency control
    :arg version_type: Specific version type
    """
    path = _make_path(index, doc_type, id)
    _status, deleted = yield self.transport.perform_request(
        'DELETE', path, params=params)
    raise gen.Return(deleted)
@gen.coroutine
@query_params('ignore_indices', 'min_score', 'preference', 'routing',
              'source')
def count(self, index=None, doc_type=None, body=None, params=None):
    """
    Execute a query and get the number of matches for that query.
    `<http://elasticsearch.org/guide/reference/api/count/>`_

    :arg index: A comma-separated list of indices to restrict the results
    :arg doc_type: A comma-separated list of types to restrict the results
    :arg body: A query to restrict the results (optional)
    :arg ignore_indices: When performed on multiple indices, allows to
        ignore `missing` ones (default: none)
    :arg min_score: Include only documents with a specific `_score` value
        in the result
    :arg preference: Specify the node or shard the operation should be
        performed on (default: random)
    :arg routing: Specific routing value
    :arg source: The URL-encoded query definition (instead of using the
        request body)
    """
    path = _make_path(index, doc_type, '_count')
    _status, counted = yield self.transport.perform_request(
        'POST', path, params=params, body=body)
    raise gen.Return(counted)
@gen.coroutine
@query_params('consistency', 'refresh', 'replication')
def bulk(self, body, index=None, doc_type=None, params=None):
    """
    Perform many index/delete operations in a single API call.
    `<http://elasticsearch.org/guide/reference/api/bulk/>`_

    See the :func:`~elasticsearch.helpers.bulk_index` for a more friendly
    API.

    :arg body: The operation definition and data (action-data pairs),
        converted with ``self._bulk_body`` before sending
    :arg index: Default index for items which don't provide one
    :arg doc_type: Default document type for items which don't provide one
    :arg consistency: Explicit write consistency setting for the operation
    :arg refresh: Refresh the index after performing the operation
    :arg replication: Explicitly set the replication type (default: sync)
    """
    path = _make_path(index, doc_type, '_bulk')
    payload = self._bulk_body(body)
    _status, report = yield self.transport.perform_request(
        'POST', path, params=params, body=payload)
    raise gen.Return(report)
@gen.coroutine
@query_params('search_type')
def msearch(self, body, index=None, doc_type=None, params=None):
    """
    Run several search requests through one API call.
    `<http://www.elasticsearch.org/guide/reference/api/multi-search/>`_

    ``body`` is serialized with ``self._bulk_body`` and sent as a ``GET``
    request to the path built by ``_make_path(index, doc_type, '_msearch')``.
    The coroutine resolves to the data element of the transport's response.

    :arg body: The request definitions (metadata-search request definition
        pairs), separated by newlines
    :arg index: A comma-separated list of index names to use as default
    :arg doc_type: A comma-separated list of document types to use as default
    :arg search_type: Search operation type
    """
    transport = self.transport
    msearch_path = _make_path(index, doc_type, '_msearch')
    payload = self._bulk_body(body)
    # NOTE(review): GET with a request body relies on the transport
    # accepting bodies on GET requests; confirm against the transport.
    _ignored, data = yield transport.perform_request(
        'GET', msearch_path, params=params, body=payload)
    raise gen.Return(data)
@gen.coroutine
@query_params('consistency', 'ignore_indices', 'replication', 'routing',
              'source', 'timeout', 'q')
def delete_by_query(self, index, doc_type=None, body=None, params=None):
    """
    Remove the documents that match a query from one or more indices and
    types.
    `<http://www.elasticsearch.org/guide/reference/api/delete-by-query/>`_

    Issues a ``DELETE`` request to the path built by
    ``_make_path(index, doc_type, '_query')`` and resolves to the data
    element of the transport's response.

    :arg index: A comma-separated list of indices to restrict the operation
    :arg doc_type: A comma-separated list of types to restrict the operation
    :arg body: A query to restrict the operation
    :arg consistency: Specific write consistency setting for the operation
    :arg ignore_indices: When performed on multiple indices, allows to
        ignore `missing` ones (default: none)
    :arg replication: Specific replication type (default: sync)
    :arg routing: Specific routing value
    :arg source: The URL-encoded query definition (instead of using the
        request body)
    :arg timeout: Explicit operation timeout
    :arg q: Query in the Lucene query string syntax
    """
    transport = self.transport
    query_path = _make_path(index, doc_type, '_query')
    # The transport yields a 2-tuple; only the second element is returned.
    _ignored, data = yield transport.perform_request(
        'DELETE', query_path, params=params, body=body)
    raise gen.Return(data)
@gen.coroutine
@query_params('allow_no_indices', 'expand_wildcards', 'ignore_unavailable',
              'local')
def get_mapping(self, index=None, doc_type=None, params=None):
    """
    Fetch the mapping definition for an index or an index/type pair.
    `<http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-mapping.html>`_

    Issues a ``GET`` request to the path built by
    ``_make_path(index, '_mapping', doc_type)``; note that the type comes
    after the ``_mapping`` segment. Resolves to the data element of the
    transport's response.

    :arg index: A comma-separated list of index names
    :arg doc_type: A comma-separated list of document types
    :arg allow_no_indices: Whether to ignore if a wildcard indices
        expression resolves into no concrete indices. (This includes `_all`
        string or when no indices have been specified)
    :arg expand_wildcards: Whether to expand wildcard expression to concrete
        indices that are open, closed or both., default 'open', valid
        choices are: 'open', 'closed', 'none', 'all'
    :arg ignore_unavailable: Whether specified concrete indices should be
        ignored when unavailable (missing or closed)
    :arg local: Return local information, do not retrieve the state from
        master node (default: false)
    """
    transport = self.transport
    mapping_path = _make_path(index, '_mapping', doc_type)
    # The transport yields a 2-tuple; only the second element is returned.
    _ignored, data = yield transport.perform_request(
        'GET', mapping_path, params=params)
    raise gen.Return(data)
@gen.coroutine
@query_params('ignore_indices', 'preference', 'routing', 'source')
def suggest(self, index=None, body=None, params=None):
    """
    Ask a suggester for terms similar to the supplied text.
    `<http://elasticsearch.org/guide/reference/api/search/suggest/>`_

    Issues a ``POST`` request to the path built by
    ``_make_path(index, '_suggest')`` and resolves to the data element of
    the transport's response.

    :arg index: A comma-separated list of index names to restrict the
        operation; use `_all` or empty string to perform the operation on
        all indices
    :arg body: The request definition
    :arg ignore_indices: When performed on multiple indices, allows to
        ignore `missing` ones (default: none)
    :arg preference: Specify the node or shard the operation should be
        performed on (default: random)
    :arg routing: Specific routing value
    :arg source: The URL-encoded request definition (instead of using
        request body)
    """
    transport = self.transport
    suggest_path = _make_path(index, '_suggest')
    # The transport yields a 2-tuple; only the second element is returned.
    _ignored, data = yield transport.perform_request(
        'POST', suggest_path, params=params, body=body)
    raise gen.Return(data)
@gen.coroutine
@query_params('prefer_local')
def percolate(self, index, doc_type, body, params=None):
    """
    Percolate a document: find which registered queries match it.
    `<http://elasticsearch.org/guide/reference/api/percolate/>`_

    Issues a ``GET`` request, with ``body`` attached, to the path built by
    ``_make_path(index, doc_type, '_percolate')``. Resolves to the data
    element of the transport's response.

    :arg index: The name of the index with a registered percolator query
    :arg doc_type: The document type
    :arg body: The document (`doc`) to percolate against registered queries;
        optionally also a `query` to limit the percolation to specific
        registered queries
    :arg prefer_local: With `true`, specify that a local shard should be
        used if available, with `false`, use a random shard (default: true)
    """
    transport = self.transport
    percolate_path = _make_path(index, doc_type, '_percolate')
    # The transport yields a 2-tuple; only the second element is returned.
    _ignored, data = yield transport.perform_request(
        'GET', percolate_path, params=params, body=body)
    raise gen.Return(data)
@gen.coroutine
@query_params('boost_terms', 'max_doc_freq', 'max_query_terms',
              'max_word_len', 'min_doc_freq', 'min_term_freq',
              'min_word_len', 'mlt_fields', 'percent_terms_to_match',
              'routing', 'search_from', 'search_indices',
              'search_query_hint', 'search_scroll', 'search_size',
              'search_source', 'search_type', 'search_types', 'stop_words')
def mlt(self, index, doc_type, id, body=None, params=None):
    """
    Get documents that are "like" a specified document.
    `<http://elasticsearch.org/guide/reference/api/more-like-this/>`_

    Issues a ``GET`` request to the path built by
    ``_make_path(index, doc_type, id, '_mlt')`` and resolves to the data
    element of the transport's response.

    :arg index: The name of the index
    :arg doc_type: The type of the document (use `_all` to fetch the first
        document matching the ID across all types)
    :arg id: The document ID
    :arg body: A specific search request definition
    :arg boost_terms: The boost factor
    :arg max_doc_freq: The word occurrence frequency as count: words with
        higher occurrence in the corpus will be ignored
    :arg max_query_terms: The maximum query terms to be included in the
        generated query
    :arg max_word_len: The maximum length of the word: longer words will
        be ignored
    :arg min_doc_freq: The word occurrence frequency as count: words with
        lower occurrence in the corpus will be ignored
    :arg min_term_freq: The term frequency as percent: terms with lower
        occurrence in the source document will be ignored
    :arg min_word_len: The minimum length of the word: shorter words will
        be ignored
    :arg mlt_fields: Specific fields to perform the query against
    :arg percent_terms_to_match: How many terms have to match in order to
        consider the document a match (default: 0.3)
    :arg routing: Specific routing value
    :arg search_from: The offset from which to return results
    :arg search_indices: A comma-separated list of indices to perform the
        query against (default: the index containing the document)
    :arg search_query_hint: The search query hint
    :arg search_scroll: A scroll search request definition
    :arg search_size: The number of documents to return (default: 10)
    :arg search_source: A specific search request definition (instead of
        using the request body)
    :arg search_type: Specific search type (eg. `dfs_query_then_fetch`,
        `count`, etc)
    :arg search_types: A comma-separated list of types to perform the query
        against (default: the same type as the document)
    :arg stop_words: A list of stop words to be ignored
    """
    transport = self.transport
    mlt_path = _make_path(index, doc_type, id, '_mlt')
    # The transport yields a 2-tuple; only the second element is returned.
    _ignored, data = yield transport.perform_request(
        'GET', mlt_path, params=params, body=body)
    raise gen.Return(data)