-
-
Notifications
You must be signed in to change notification settings - Fork 183
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Darren Weber
committed
Feb 23, 2020
1 parent
904332b
commit 1c24619
Showing
15 changed files
with
830 additions
and
3 deletions.
There are no files selected for viewing
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,184 @@ | ||
""" | ||
AWS asyncio test fixtures | ||
""" | ||
|
||
import aiobotocore.client | ||
import aiobotocore.config | ||
import pytest | ||
|
||
from aiobotocore.aiomoto.aiomoto_services import MotoService | ||
from aiobotocore.aiomoto.utils import AWS_ACCESS_KEY_ID | ||
from aiobotocore.aiomoto.utils import AWS_SECRET_ACCESS_KEY | ||
|
||
|
||
# | ||
# Asyncio AWS Services | ||
# | ||
|
||
|
||
@pytest.fixture | ||
async def aio_aws_batch_server(): | ||
async with MotoService("batch") as svc: | ||
svc.reset() | ||
yield svc.endpoint_url | ||
|
||
|
||
@pytest.fixture | ||
async def aio_aws_cloudformation_server(): | ||
async with MotoService("cloudformation") as svc: | ||
svc.reset() | ||
yield svc.endpoint_url | ||
|
||
|
||
@pytest.fixture | ||
async def aio_aws_ec2_server(): | ||
async with MotoService("ec2") as svc: | ||
svc.reset() | ||
yield svc.endpoint_url | ||
|
||
|
||
@pytest.fixture | ||
async def aio_aws_ecs_server(): | ||
async with MotoService("ecs") as svc: | ||
svc.reset() | ||
yield svc.endpoint_url | ||
|
||
|
||
@pytest.fixture | ||
async def aio_aws_iam_server(): | ||
async with MotoService("iam") as svc: | ||
yield svc.endpoint_url | ||
|
||
|
||
@pytest.fixture | ||
async def aio_aws_dynamodb2_server(): | ||
async with MotoService("dynamodb2") as svc: | ||
svc.reset() | ||
yield svc.endpoint_url | ||
|
||
|
||
@pytest.fixture | ||
async def aio_aws_logs_server(): | ||
# cloud watch logs | ||
async with MotoService("logs") as svc: | ||
svc.reset() | ||
yield svc.endpoint_url | ||
|
||
|
||
@pytest.fixture | ||
async def aio_aws_s3_server(): | ||
async with MotoService("s3") as svc: | ||
svc.reset() | ||
yield svc.endpoint_url | ||
|
||
|
||
@pytest.fixture | ||
async def aio_aws_sns_server(): | ||
async with MotoService("sns") as svc: | ||
svc.reset() | ||
yield svc.endpoint_url | ||
|
||
|
||
@pytest.fixture | ||
async def aio_aws_sqs_server(): | ||
async with MotoService("sqs") as svc: | ||
svc.reset() | ||
yield svc.endpoint_url | ||
|
||
|
||
# | ||
# Asyncio AWS Clients | ||
# | ||
|
||
|
||
@pytest.fixture | ||
def aio_aws_session(aws_credentials, aws_region, event_loop): | ||
# pytest-asyncio provides and manages the `event_loop` | ||
|
||
session = aiobotocore.get_session(loop=event_loop) | ||
session.user_agent_name = "aiomoto" | ||
|
||
assert session.get_default_client_config() is None | ||
aioconfig = aiobotocore.config.AioConfig( | ||
max_pool_connections=1, region_name=aws_region | ||
) | ||
|
||
# Note: tried to use proxies for the aiobotocore.endpoint, to replace | ||
# 'https://batch.us-west-2.amazonaws.com/v1/describejobqueues', but | ||
# the moto.server does not behave as a proxy server. Leaving this | ||
# here for the record to avoid trying to do it again sometime later. | ||
# proxies = { | ||
# 'http': os.getenv("HTTP_PROXY", "http://127.0.0.1:5000/moto-api/"), | ||
# 'https': os.getenv("HTTPS_PROXY", "http://127.0.0.1:5000/moto-api/"), | ||
# } | ||
# assert aioconfig.proxies is None | ||
# aioconfig.proxies = proxies | ||
|
||
session.set_default_client_config(aioconfig) | ||
assert session.get_default_client_config() == aioconfig | ||
|
||
session.set_credentials(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY) | ||
session.set_debug_logger(logger_name="aiomoto") | ||
|
||
yield session | ||
|
||
|
||
@pytest.fixture | ||
async def aio_aws_client(aio_aws_session): | ||
async def _get_client(service_name): | ||
async with MotoService(service_name) as srv: | ||
async with aio_aws_session.create_client( | ||
service_name, endpoint_url=srv.endpoint_url | ||
) as client: | ||
yield client | ||
|
||
return _get_client | ||
|
||
|
||
@pytest.fixture | ||
async def aio_aws_batch_client(aio_aws_session, aio_aws_batch_server): | ||
async with aio_aws_session.create_client( | ||
"batch", endpoint_url=aio_aws_batch_server | ||
) as client: | ||
yield client | ||
|
||
|
||
@pytest.fixture | ||
async def aio_aws_ec2_client(aio_aws_session, aio_aws_ec2_server): | ||
async with aio_aws_session.create_client( | ||
"ec2", endpoint_url=aio_aws_ec2_server | ||
) as client: | ||
yield client | ||
|
||
|
||
@pytest.fixture | ||
async def aio_aws_ecs_client(aio_aws_session, aio_aws_ecs_server): | ||
async with aio_aws_session.create_client( | ||
"ecs", endpoint_url=aio_aws_ecs_server | ||
) as client: | ||
yield client | ||
|
||
|
||
@pytest.fixture | ||
async def aio_aws_iam_client(aio_aws_session, aio_aws_iam_server): | ||
async with aio_aws_session.create_client( | ||
"iam", endpoint_url=aio_aws_iam_server | ||
) as client: | ||
client.meta.config.region_name = "aws-global" # not AWS_REGION | ||
yield client | ||
|
||
|
||
@pytest.fixture | ||
async def aio_aws_logs_client(aio_aws_session, aio_aws_logs_server): | ||
async with aio_aws_session.create_client( | ||
"logs", endpoint_url=aio_aws_logs_server | ||
) as client: | ||
yield client | ||
|
||
|
||
@pytest.fixture | ||
async def aio_aws_s3_client(aio_aws_session, aio_aws_s3_server): | ||
async with aio_aws_session.create_client( | ||
"s3", endpoint_url=aio_aws_s3_server | ||
) as client: | ||
yield client |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
import asyncio | ||
import functools | ||
import logging | ||
import socket | ||
import threading | ||
import time | ||
import os | ||
|
||
# Third Party | ||
import aiohttp | ||
import moto.backends | ||
import moto.server | ||
import werkzeug.serving | ||
|
||
|
||
HOST = "127.0.0.1" | ||
|
||
_PYCHARM_HOSTED = os.environ.get("PYCHARM_HOSTED") == "1" | ||
CONNECT_TIMEOUT = 90 if _PYCHARM_HOSTED else 10 | ||
|
||
|
||
def get_free_tcp_port(release_socket: bool = False): | ||
sckt = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | ||
sckt.bind(("", 0)) | ||
addr, port = sckt.getsockname() | ||
if release_socket: | ||
sckt.close() | ||
return port | ||
|
||
return sckt, port | ||
|
||
|
||
class MotoService: | ||
""" Will Create MotoService. | ||
Service is ref-counted so there will only be one per process. Real Service will | ||
be returned by `__aenter__`.""" | ||
|
||
_services = dict() # {name: instance} | ||
|
||
def __init__(self, service_name: str, port: int = None): | ||
self._service_name = service_name | ||
|
||
if port: | ||
self._socket = None | ||
self._port = port | ||
else: | ||
self._socket, self._port = get_free_tcp_port() | ||
|
||
self._thread = None | ||
self._logger = logging.getLogger("MotoService") | ||
self._refcount = None | ||
self._ip_address = HOST | ||
self._server = None | ||
|
||
@property | ||
def endpoint_url(self): | ||
return "http://{}:{}".format(self._ip_address, self._port) | ||
|
||
def reset(self): | ||
# each service can have multiple regional backends | ||
service_backends = moto.backends.BACKENDS[self._service_name] | ||
for region_name, backend in service_backends.items(): | ||
backend.reset() | ||
|
||
def __call__(self, func): | ||
async def wrapper(*args, **kwargs): | ||
await self._start() | ||
try: | ||
result = await func(*args, **kwargs) | ||
finally: | ||
await self._stop() | ||
return result | ||
|
||
functools.update_wrapper(wrapper, func) | ||
wrapper.__wrapped__ = func | ||
return wrapper | ||
|
||
async def __aenter__(self): | ||
svc = self._services.get(self._service_name) | ||
if svc is None: | ||
self._services[self._service_name] = self | ||
self._refcount = 1 | ||
await self._start() | ||
return self | ||
else: | ||
svc._refcount += 1 | ||
return svc | ||
|
||
async def __aexit__(self, exc_type, exc_val, exc_tb): | ||
self._refcount -= 1 | ||
|
||
if self._socket: | ||
self._socket.close() | ||
self._socket = None | ||
|
||
if self._refcount == 0: | ||
del self._services[self._service_name] | ||
await self._stop() | ||
|
||
def _server_entry(self): | ||
self._main_app = moto.server.DomainDispatcherApplication( | ||
moto.server.create_backend_app, service=self._service_name | ||
) | ||
self._main_app.debug = True | ||
|
||
if self._socket: | ||
self._socket.close() # release right before we use it | ||
self._socket = None | ||
|
||
self._server = werkzeug.serving.make_server( | ||
self._ip_address, self._port, self._main_app, True | ||
) | ||
self._server.serve_forever() | ||
|
||
async def _start(self): | ||
self._thread = threading.Thread(target=self._server_entry, daemon=True) | ||
self._thread.start() | ||
|
||
async with aiohttp.ClientSession() as session: | ||
start = time.time() | ||
|
||
while time.time() - start < 10: | ||
if not self._thread.is_alive(): | ||
break | ||
|
||
try: | ||
# we need to bypass the proxies due to monkeypatches | ||
async with session.get( | ||
self.endpoint_url + "/static", timeout=CONNECT_TIMEOUT | ||
): | ||
pass | ||
break | ||
except (asyncio.TimeoutError, aiohttp.ClientConnectionError): | ||
await asyncio.sleep(0.5) | ||
else: | ||
await self._stop() # pytest.fail doesn't call stop_process | ||
raise Exception( | ||
"Cannot start MotoService: {}".format(self._service_name) | ||
) | ||
|
||
async def _stop(self): | ||
if self._server: | ||
self._server.shutdown() | ||
|
||
self._thread.join() |
Oops, something went wrong.