Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Elasticsearch 7 support #165

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ python:
- "3.6"
- "3.7"
install:
- pip install --upgrade pip
- pip install --upgrade setuptools
- pip install -U tox
script:
- tox -- --es_tag=${ES_TAG}
Expand Down
12 changes: 10 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,17 @@ aioelasticsearch
Installation
------------

For elasticsearch < 7:

.. code-block:: shell

pip install aioelasticsearch[6]

For elasticsearch >= 7:

.. code-block:: shell

pip install aioelasticsearch
pip install aioelasticsearch[7]

Usage
-----
Expand Down Expand Up @@ -56,7 +64,7 @@ Asynchronous `scroll <https://www.elastic.co/guide/en/elasticsearch/reference/cu
async with Scan(
es,
index='index',
doc_type='doc_type',
doc_type='doc_type', # omit this for Elasticsearch 7
query={},
) as scan:
print(scan.total)
Expand Down
15 changes: 11 additions & 4 deletions aioelasticsearch/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
import asyncio

from elasticsearch import Elasticsearch as _Elasticsearch # noqa # isort:skip
from elasticsearch.connection_pool import (ConnectionSelector, # noqa # isort:skip
RoundRobinSelector)
from elasticsearch.serializer import JSONSerializer # noqa # isort:skip
try:
from elasticsearch import Elasticsearch as _Elasticsearch # noqa # isort:skip
from elasticsearch.connection_pool import (ConnectionSelector, # noqa # isort:skip
RoundRobinSelector)
from elasticsearch.serializer import JSONSerializer # noqa # isort:skip
except ImportError: # pragma: no cover
raise RuntimeError(
'Please reinstall the library specifying ES version -- '
'pip install aioelasticsearch[6] OR pip install aioelasticsearch[7]\n'
'\t\t or install elasticsearch-py manually -- https://github.com/elastic/elasticsearch-py.')


from .exceptions import * # noqa # isort:skip
from .pool import AIOHttpConnectionPool # noqa # isort:skip
Expand Down
5 changes: 4 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@ def read(*parts):
description='elasticsearch-py wrapper for asyncio',
long_description=read('README.rst'),
install_requires=[
'elasticsearch>=6.0.0,<7.0.0',
'aiohttp>=3.5.0,<4.0.0',
],
extras_require={
'6': ['elasticsearch>=6.0.0,<7.0.0'],
'7': ['elasticsearch>=7.0.0,<8.0.0'],
},
python_requires='>=3.5.3',
packages=['aioelasticsearch'],
include_package_data=True,
Expand Down
32 changes: 29 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pytest
from aiohttp.test_utils import unused_port
from docker import from_env as docker_from_env
from elasticsearch import __version__ as elasticsearch_lib_version

import aioelasticsearch

Expand Down Expand Up @@ -55,7 +56,10 @@ def pytest_generate_tests(metafunc):
if 'es_tag' in metafunc.fixturenames:
tags = set(metafunc.config.option.es_tag)
if not tags:
tags = ['6.0.0']
if elasticsearch_lib_version[0] == 6:
tags = ['6.0.0']
else:
tags = ['7.0.0']
else:
tags = list(tags)
metafunc.parametrize("es_tag", tags, scope='session')
Expand Down Expand Up @@ -213,22 +217,44 @@ def pytest_pyfunc_call(pyfuncitem):


@pytest.fixture
def populate(es, loop):
def populate(es, loop, es_major_version):
async def do(index, doc_type, n, body):
coros = []

await es.indices.create(index)

kwargs = {}

if es_major_version < 7:
kwargs['doc_type'] = doc_type

for i in range(n):
coros.append(
es.index(
index=index,
doc_type=doc_type,
id=str(i),
body=body,
**kwargs,
),
)

await asyncio.gather(*coros, loop=loop)
await es.indices.refresh()
return do


@pytest.fixture
def es_major_version():
return elasticsearch_lib_version[0]


@pytest.fixture(autouse=True)
def skip_before_es7_marker(request, es_major_version):
if request.node.get_closest_marker('skip_before_es7') and es_major_version < 7:
pytest.skip('skipping this test for elasticsearch lib versions < 7')


@pytest.fixture(autouse=True)
def skip_after_es7_marker(request, es_major_version):
if request.node.get_closest_marker('skip_after_es7') and es_major_version >= 7:
pytest.skip('skipping this test for elasticsearch lib versions >= 7')
116 changes: 108 additions & 8 deletions tests/test_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ def test_scan_scroll_id_without_context_manager(es):
scan.scroll_id


@pytest.mark.skip_after_es7
@pytest.mark.run_loop
async def test_scan_simple(es, populate):
async def test_scan_simple_before_es7(es, populate):
index = 'test_aioes'
doc_type = 'type_2'
scroll_size = 3
Expand Down Expand Up @@ -63,8 +64,74 @@ async def test_scan_simple(es, populate):
assert ids == {str(i) for i in range(10)}


@pytest.mark.skip_before_es7
@pytest.mark.run_loop
async def test_scan_equal_chunks_for_loop(es, es_clean, populate):
async def test_scan_simple_after_es7(es, populate):
index = 'test_aioes'
scroll_size = 3
n = 10

body = {'foo': 1}
await populate(index, None, n, body)
ids = set()

async with Scan(
es,
index=index,
size=scroll_size,
) as scan:
assert isinstance(scan.scroll_id, str)
assert scan.total == {'relation': 'eq', 'value': 10}
async for doc in scan:
ids.add(doc['_id'])
assert doc == {'_id': mock.ANY,
'_index': 'test_aioes',
'_score': None,
'_source': {'foo': 1},
'_type': '_doc',
'sort': mock.ANY}

assert ids == {str(i) for i in range(10)}


@pytest.mark.skip_before_es7
@pytest.mark.run_loop
async def test_scan_equal_chunks_for_loop_after_es7(es, es_clean, populate):
for n, scroll_size in [
(0, 1), # no results
(6, 6), # 1 scroll
(6, 8), # 1 scroll
(6, 3), # 2 scrolls
(6, 4), # 2 scrolls
(6, 2), # 3 scrolls
(6, 1), # 6 scrolls
]:
es_clean()

index = 'test_aioes'
doc_type = 'type_1'
body = {'foo': 1}

await populate(index, doc_type, n, body)

ids = set()

async with Scan(
es,
index=index,
size=scroll_size,
) as scan:

async for doc in scan:
ids.add(doc['_id'])

# check number of unique doc ids
assert len(ids) == n == scan.total['value']


@pytest.mark.skip_after_es7
@pytest.mark.run_loop
async def test_scan_equal_chunks_for_loop_before_es7(es, es_clean, populate):
for n, scroll_size in [
(0, 1), # no results
(6, 6), # 1 scroll
Expand Down Expand Up @@ -98,6 +165,27 @@ async def test_scan_equal_chunks_for_loop(es, es_clean, populate):
assert len(ids) == n == scan.total


@pytest.mark.skip_before_es7
@pytest.mark.run_loop
async def test_scan_no_mask_index(es):
index = 'undefined-*'
doc_type = 'any'
scroll_size = 3

async with Scan(
es,
index=index,
size=scroll_size,
) as scan:
assert scan.scroll_id is None
assert scan.total == {'relation': 'eq', 'value': 0}
cnt = 0
async for doc in scan: # noqa
cnt += 1
assert cnt == 0


@pytest.mark.skip_after_es7
@pytest.mark.run_loop
async def test_scan_no_mask_index(es):
index = 'undefined-*'
Expand Down Expand Up @@ -141,16 +229,20 @@ async def test_scan_no_scroll(es, loop, populate):


@pytest.mark.run_loop
async def test_scan_no_index(es):
async def test_scan_no_index(es, es_major_version):
index = 'undefined'
doc_type = 'any'
scroll_size = 3

scan_kwargs = {}
if es_major_version < 7:
scan_kwargs['doc_type'] = doc_type

async with Scan(
es,
index=index,
doc_type=doc_type,
size=scroll_size,
**scan_kwargs,
) as scan:
assert scan.scroll_id is None
assert scan.total == 0
Expand All @@ -161,12 +253,16 @@ async def test_scan_no_index(es):


@pytest.mark.run_loop
async def test_scan_warning_on_failed_shards(es, populate, mocker):
async def test_scan_warning_on_failed_shards(es, populate, mocker, es_major_version):
index = 'test_aioes'
doc_type = 'type_2'
scroll_size = 3
n = 10

scan_kwargs = {}
if es_major_version < 7:
scan_kwargs['doc_type'] = doc_type

body = {'foo': 1}
await populate(index, doc_type, n, body)

Expand All @@ -175,9 +271,9 @@ async def test_scan_warning_on_failed_shards(es, populate, mocker):
async with Scan(
es,
index=index,
doc_type=doc_type,
size=scroll_size,
raise_on_error=False,
**scan_kwargs,
) as scan:
i = 0
async for doc in scan: # noqa
Expand All @@ -192,12 +288,16 @@ async def test_scan_warning_on_failed_shards(es, populate, mocker):


@pytest.mark.run_loop
async def test_scan_exception_on_failed_shards(es, populate, mocker):
async def test_scan_exception_on_failed_shards(es, populate, mocker, es_major_version):
index = 'test_aioes'
doc_type = 'type_2'
scroll_size = 3
n = 10

scan_kwargs = {}
if es_major_version < 7:
scan_kwargs['doc_type'] = doc_type

body = {'foo': 1}
await populate(index, doc_type, n, body)

Expand All @@ -207,8 +307,8 @@ async def test_scan_exception_on_failed_shards(es, populate, mocker):
async with Scan(
es,
index=index,
doc_type=doc_type,
size=scroll_size,
**scan_kwargs,
) as scan:
with pytest.raises(ScanError) as cm:
async for doc in scan: # noqa
Expand Down
Loading