-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpytest_nameko.py
219 lines (161 loc) · 5.89 KB
/
pytest_nameko.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
from __future__ import absolute_import
# all imports are inline to make sure they happen after eventlet.monkey_patch
# which is called in pytest_load_initial_conftests (calling monkey_patch at
# import time breaks the pytest capturemanager)
import pytest
def pytest_addoption(parser):
parser.addoption(
'--blocking-detection',
action='store_true',
dest='blocking_detection',
default=False,
help='turn on eventlet hub blocking detection')
parser.addoption(
"--log-level", action="store",
default='DEBUG',
help=("The logging-level for the test run."))
parser.addoption(
"--amqp-uri", action="store", dest='AMQP_URI',
default='amqp://guest:guest@localhost:5672/nameko_test',
help=("The AMQP-URI to connect to rabbit with."))
parser.addoption(
"--rabbit-ctl-uri", action="store", dest='RABBIT_CTL_URI',
default='http://guest:guest@localhost:15672',
help=("The URI for rabbit's management API."))
def pytest_load_initial_conftests():
# make sure we monkey_patch before local conftests
import eventlet
eventlet.monkey_patch()
def pytest_configure(config):
import logging
import sys
if config.option.blocking_detection: # pragma: no cover
from eventlet import debug
debug.hub_blocking_detection(True)
log_level = config.getoption('log_level')
if log_level is not None:
log_level = getattr(logging, log_level)
logging.basicConfig(level=log_level, stream=sys.stderr)
@pytest.fixture
def empty_config(request):
from nameko.constants import AMQP_URI_CONFIG_KEY
return {
AMQP_URI_CONFIG_KEY: ""
}
@pytest.fixture(scope='session')
def rabbit_manager(request):
from nameko.testing import rabbit
config = request.config
return rabbit.Client(config.getoption('RABBIT_CTL_URI'))
@pytest.yield_fixture()
def rabbit_config(request, rabbit_manager):
from kombu import pools
from nameko.testing.utils import (
reset_rabbit_vhost, reset_rabbit_connections,
get_rabbit_connections, get_rabbit_config)
amqp_uri = request.config.getoption('AMQP_URI')
conf = get_rabbit_config(amqp_uri)
reset_rabbit_connections(conf['vhost'], rabbit_manager)
reset_rabbit_vhost(conf['vhost'], conf['username'], rabbit_manager)
yield conf
pools.reset() # close connections in pools
# raise a runtime error if the test leaves any connections lying around
connections = get_rabbit_connections(conf['vhost'], rabbit_manager)
open_connections = [
conn for conn in connections if conn['state'] != "closed"
]
if open_connections:
count = len(open_connections)
raise RuntimeError("{} rabbit connection(s) left open.".format(count))
@pytest.fixture
def ensure_cleanup_order(request):
""" Ensure ``rabbit_config`` is invoked early if it's used by any fixture
in ``request``.
"""
if "rabbit_config" in request.funcargnames:
request.getfuncargvalue("rabbit_config")
@pytest.yield_fixture
def container_factory(ensure_cleanup_order):
from nameko.containers import ServiceContainer
all_containers = []
def make_container(service_cls, config, worker_ctx_cls=None):
container = ServiceContainer(service_cls, config, worker_ctx_cls)
all_containers.append(container)
return container
yield make_container
for c in all_containers:
try:
c.stop()
except: # pragma: no cover
pass
@pytest.yield_fixture
def runner_factory(ensure_cleanup_order):
from nameko.runners import ServiceRunner
all_runners = []
def make_runner(config, *service_classes):
runner = ServiceRunner(config)
for service_cls in service_classes:
runner.add_service(service_cls)
all_runners.append(runner)
return runner
yield make_runner
for r in all_runners:
try:
r.stop()
except: # pragma: no cover
pass
@pytest.yield_fixture
def predictable_call_ids(request):
import itertools
from mock import patch
with patch('nameko.containers.new_call_id', autospec=True) as get_id:
get_id.side_effect = (str(i) for i in itertools.count())
yield get_id
@pytest.fixture()
def web_config(empty_config):
import socket
from nameko.constants import WEB_SERVER_CONFIG_KEY
# find a port that's likely to be free
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('127.0.0.1', 0))
port = sock.getsockname()[1]
sock.close()
cfg = empty_config
cfg[WEB_SERVER_CONFIG_KEY] = str(port)
return cfg
@pytest.fixture()
def web_config_port(web_config):
from nameko.constants import WEB_SERVER_CONFIG_KEY
from nameko.web.server import parse_address
return parse_address(web_config[WEB_SERVER_CONFIG_KEY]).port
@pytest.yield_fixture()
def web_session(web_config_port):
from requests import Session
from werkzeug.urls import url_join
class WebSession(Session):
def request(self, method, url, *args, **kwargs):
url = url_join('http://127.0.0.1:%d/' % web_config_port, url)
return Session.request(self, method, url, *args, **kwargs)
sess = WebSession()
with sess:
yield sess
@pytest.yield_fixture()
def websocket(web_config_port):
import eventlet
from nameko.testing.websocket import make_virtual_socket
active_sockets = []
def socket_creator():
ws_app, wait_for_sock = make_virtual_socket(
'127.0.0.1', web_config_port)
gr = eventlet.spawn(ws_app.run_forever)
active_sockets.append((gr, ws_app))
return wait_for_sock()
try:
yield socket_creator
finally:
for gr, ws_app in active_sockets:
try:
ws_app.close()
except Exception: # pragma: no cover
pass
gr.kill()