Skip to content

Commit

Permalink
aiomoto
Browse files Browse the repository at this point in the history
  • Loading branch information
Darren Weber committed Feb 20, 2020
1 parent fcf7cd4 commit ce6cb09
Show file tree
Hide file tree
Showing 13 changed files with 781 additions and 2 deletions.
10 changes: 9 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,12 @@ def read_version():
packages=find_packages(),
install_requires=install_requires,
extras_require=extras_require,
include_package_data=True)
include_package_data=True,

# the following makes a plugin available to pytest
entry_points = {
'pytest11': [
'aiomoto = tests.aws.aio.aiomoto_fixtures.py', # or something
]
},
)
Empty file added tests/aws/__init__.py
Empty file.
Empty file added tests/aws/aio/__init__.py
Empty file.
174 changes: 174 additions & 0 deletions tests/aws/aio/aiomoto_fixtures.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
"""
AWS asyncio test fixtures
"""

import aiobotocore.client
import aiobotocore.config
import pytest

from tests.aws.aio.aiomoto_services import MotoService
from tests.aws.utils import AWS_ACCESS_KEY_ID
from tests.aws.utils import AWS_SECRET_ACCESS_KEY


#
# Asyncio AWS Services
#


@pytest.fixture
async def aio_aws_batch_server():
async with MotoService("batch") as svc:
svc.reset()
yield svc.endpoint_url


@pytest.fixture
async def aio_aws_cloudformation_server():
async with MotoService("cloudformation") as svc:
svc.reset()
yield svc.endpoint_url


@pytest.fixture
async def aio_aws_ec2_server():
async with MotoService("ec2") as svc:
svc.reset()
yield svc.endpoint_url


@pytest.fixture
async def aio_aws_ecs_server():
async with MotoService("ecs") as svc:
svc.reset()
yield svc.endpoint_url


@pytest.fixture
async def aio_aws_iam_server():
async with MotoService("iam") as svc:
yield svc.endpoint_url


@pytest.fixture
async def aio_aws_dynamodb2_server():
async with MotoService("dynamodb2") as svc:
svc.reset()
yield svc.endpoint_url


@pytest.fixture
async def aio_aws_logs_server():
# cloud watch logs
async with MotoService("logs") as svc:
svc.reset()
yield svc.endpoint_url


@pytest.fixture
async def aio_aws_s3_server():
async with MotoService("s3") as svc:
svc.reset()
yield svc.endpoint_url


@pytest.fixture
async def aio_aws_sns_server():
async with MotoService("sns") as svc:
svc.reset()
yield svc.endpoint_url


@pytest.fixture
async def aio_aws_sqs_server():
async with MotoService("sqs") as svc:
svc.reset()
yield svc.endpoint_url


#
# Asyncio AWS Clients
#


@pytest.fixture
def aio_aws_session(aws_credentials, aws_region, event_loop):
# pytest-asyncio provides and manages the `event_loop`

session = aiobotocore.get_session(loop=event_loop)
session.user_agent_name = "aiomoto"

assert session.get_default_client_config() is None
aioconfig = aiobotocore.config.AioConfig(max_pool_connections=1, region_name=aws_region)

# Note: tried to use proxies for the aiobotocore.endpoint, to replace
# 'https://batch.us-west-2.amazonaws.com/v1/describejobqueues', but
# the moto.server does not behave as a proxy server. Leaving this
# here for the record to avoid trying to do it again sometime later.
# proxies = {
# 'http': os.getenv("HTTP_PROXY", "http://127.0.0.1:5000/moto-api/"),
# 'https': os.getenv("HTTPS_PROXY", "http://127.0.0.1:5000/moto-api/"),
# }
# assert aioconfig.proxies is None
# aioconfig.proxies = proxies

session.set_default_client_config(aioconfig)
assert session.get_default_client_config() == aioconfig

session.set_credentials(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
session.set_debug_logger(logger_name="aiomoto")

yield session


@pytest.fixture
async def aio_aws_client(aio_aws_session):
async def _get_client(service_name):
async with MotoService(service_name) as srv:
async with aio_aws_session.create_client(
service_name, endpoint_url=srv.endpoint_url
) as client:
yield client

return _get_client


@pytest.fixture
async def aio_aws_batch_client(aio_aws_session, aio_aws_batch_server):
async with aio_aws_session.create_client(
"batch", endpoint_url=aio_aws_batch_server
) as client:
yield client


@pytest.fixture
async def aio_aws_ec2_client(aio_aws_session, aio_aws_ec2_server):
async with aio_aws_session.create_client("ec2", endpoint_url=aio_aws_ec2_server) as client:
yield client


@pytest.fixture
async def aio_aws_ecs_client(aio_aws_session, aio_aws_ecs_server):
async with aio_aws_session.create_client("ecs", endpoint_url=aio_aws_ecs_server) as client:
yield client


@pytest.fixture
async def aio_aws_iam_client(aio_aws_session, aio_aws_iam_server):
async with aio_aws_session.create_client("iam", endpoint_url=aio_aws_iam_server) as client:
client.meta.config.region_name = "aws-global" # not AWS_REGION
yield client


@pytest.fixture
async def aio_aws_logs_client(aio_aws_session, aio_aws_logs_server):
async with aio_aws_session.create_client(
"logs", endpoint_url=aio_aws_logs_server
) as client:
yield client


@pytest.fixture
async def aio_aws_s3_client(aio_aws_session, aio_aws_s3_server):
async with aio_aws_session.create_client("s3", endpoint_url=aio_aws_s3_server) as client:
yield client
143 changes: 143 additions & 0 deletions tests/aws/aio/aiomoto_services.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import asyncio
import functools
import logging
import socket
import threading
import time
import os

# Third Party
import aiohttp
import moto.backends
import moto.server
import werkzeug.serving


HOST = "127.0.0.1"

_PYCHARM_HOSTED = os.environ.get("PYCHARM_HOSTED") == "1"
CONNECT_TIMEOUT = 90 if _PYCHARM_HOSTED else 10


def get_free_tcp_port(release_socket: bool = False):
sckt = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sckt.bind(("", 0))
addr, port = sckt.getsockname()
if release_socket:
sckt.close()
return port

return sckt, port


class MotoService:
""" Will Create MotoService.
Service is ref-counted so there will only be one per process. Real Service will
be returned by `__aenter__`."""

_services = dict() # {name: instance}

def __init__(self, service_name: str, port: int = None):
self._service_name = service_name

if port:
self._socket = None
self._port = port
else:
self._socket, self._port = get_free_tcp_port()

self._thread = None
self._logger = logging.getLogger("MotoService")
self._refcount = None
self._ip_address = HOST
self._server = None

@property
def endpoint_url(self):
return "http://{}:{}".format(self._ip_address, self._port)

def reset(self):
# each service can have multiple regional backends
service_backends = moto.backends.BACKENDS[self._service_name]
for region_name, backend in service_backends.items():
backend.reset()

def __call__(self, func):
async def wrapper(*args, **kwargs):
await self._start()
try:
result = await func(*args, **kwargs)
finally:
await self._stop()
return result

functools.update_wrapper(wrapper, func)
wrapper.__wrapped__ = func
return wrapper

async def __aenter__(self):
svc = self._services.get(self._service_name)
if svc is None:
self._services[self._service_name] = self
self._refcount = 1
await self._start()
return self
else:
svc._refcount += 1
return svc

async def __aexit__(self, exc_type, exc_val, exc_tb):
self._refcount -= 1

if self._socket:
self._socket.close()
self._socket = None

if self._refcount == 0:
del self._services[self._service_name]
await self._stop()

def _server_entry(self):
self._main_app = moto.server.DomainDispatcherApplication(
moto.server.create_backend_app, service=self._service_name
)
self._main_app.debug = True

if self._socket:
self._socket.close() # release right before we use it
self._socket = None

self._server = werkzeug.serving.make_server(
self._ip_address, self._port, self._main_app, True
)
self._server.serve_forever()

async def _start(self):
self._thread = threading.Thread(target=self._server_entry, daemon=True)
self._thread.start()

async with aiohttp.ClientSession() as session:
start = time.time()

while time.time() - start < 10:
if not self._thread.is_alive():
break

try:
# we need to bypass the proxies due to monkeypatches
async with session.get(
self.endpoint_url + "/static", timeout=CONNECT_TIMEOUT
):
pass
break
except (asyncio.TimeoutError, aiohttp.ClientConnectionError):
await asyncio.sleep(0.5)
else:
await self._stop() # pytest.fail doesn't call stop_process
raise Exception("Cannot start MotoService: {}".format(self._service_name))

async def _stop(self):
if self._server:
self._server.shutdown()

self._thread.join()
6 changes: 6 additions & 0 deletions tests/aws/aio/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""
AWS asyncio test fixtures
Test fixtures are loaded by ``pytest_plugins`` in tests/conftest.py
"""

Loading

0 comments on commit ce6cb09

Please sign in to comment.