-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.py
159 lines (126 loc) · 6.85 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# Built-in library
import asyncio
import logging
import os
import time
# Third-party library
import rx.operators as ops
import prometheus_client
from prometheus_client import Gauge, Summary, Counter, CONTENT_TYPE_LATEST
from rx.subject import Subject
from rx.scheduler.eventloop import AsyncIOScheduler
from aiohttp import web, ClientSession
from aio_pika import connect_robust
# Custom library
from handler import OrderHandler, AccountHandler
# Request-log configuration: one pipe-separated record per line
# (timestamp|level|message), written to the file 'request.log'.
LOG_REQUEST_FORMAT = logging.Formatter(
    fmt='%(asctime)s|%(levelname)s|%(message)s',
)
LOG_REQUEST_HANDLER = logging.FileHandler(filename='request.log')
LOG_REQUEST_HANDLER.setFormatter(fmt=LOG_REQUEST_FORMAT)
class Webserver:
    """aiohttp front end for order/account requests.

    POST ``/order`` and ``/account`` requests are pushed onto an Rx ``Subject``
    and answered through a per-request future stored in ``request['future']``.
    That future is expected to be resolved by the subscribed handlers
    (``OrderHandler`` / ``AccountHandler`` live in ``handler.py``, not visible
    here -- confirm there).

    GET ``/orders/{account}`` and ``/accounts/{account}`` are proxied to local
    HTTP services, and ``/metrics`` exposes the Prometheus registry.
    """

    def __init__(self):
        self.app = web.Application()
        # Both are created in init(): the client session and the loop reference
        # must belong to the event loop that actually serves the application,
        # which is not necessarily the one current at construction time.
        self.webservice = None
        self.loop = None
        self.request = Subject()  # Requests observable object
        self.subscriptions = []  # Disposables returned by the request observers
        self.rmqConn = None
        self.channel = {}  # Request path -> RabbitMQ channel
        self.orderHandler = OrderHandler()
        self.accountHandler = AccountHandler()
        self.requestLogger = logging.getLogger('request')
        self.requestLogger.addHandler(LOG_REQUEST_HANDLER)
        self.requestLogger.setLevel(logging.INFO)

    def logRequest(self, request: web.Request):
        """Write one line to the request log for an incoming request."""
        self.requestLogger.info(
            "Incoming request <- %s|%s|%s",
            request.remote, request.method, request.path,
        )

    def _beginRequest(self, request: web.Request) -> float:
        """Mark the request as in progress and return its start timestamp."""
        startTime = time.time()
        self.app['REQUEST_PROGRESS'].labels(request.path, request.method).inc()
        return startTime

    def _endRequest(self, request: web.Request, startTime=None):
        """Update the metrics for a finished request.

        The latency summary is only fed when ``startTime`` is given, i.e. for
        requests that completed successfully; the in-progress gauge and the
        request counter are always updated.
        """
        labels = (request.path, request.method)
        if startTime is not None:
            self.app['REQUEST_LATENCY'].labels(*labels).observe(time.time() - startTime)
        self.app['REQUEST_PROGRESS'].labels(*labels).dec()
        self.app['REQUEST_COUNT'].labels(*labels).inc()

    # General handler for dispatching request to every request observers
    async def dispatcher(self, request: web.Request) -> web.Response:
        """Publish the request to the observers and return the response they produce.

        Returns a 400 JSON response when the body cannot be prepared for the
        observers. Metrics are updated in a ``finally`` block so a failing or
        cancelled request cannot leave the in-progress gauge incremented.
        """
        startTime = self._beginRequest(request)
        succeeded = False
        try:
            future = self.loop.create_future()
            request['future'] = future
            request['loop'] = self.loop
            try:
                request['body'] = await request.json()
                request['queue'] = request.path.replace('/', '')
                request['channel'] = self.channel[request.path]
            except Exception:
                # Deliberately broad: any failure while preparing the request
                # is reported to the client as a bad request.
                return web.json_response(
                    {'status': 'FAIL', 'order_id': None, 'client_ref': None,
                     'reason': 'Failed to parse request body'},
                    status=400,
                )
            self.request.on_next(request)  # Pass the request to the observers
            result = await future  # Response produced for this request
            succeeded = True
            return result
        finally:
            self._endRequest(request, startTime if succeeded else None)

    async def _proxyQuery(self, request: web.Request, url: str) -> web.Response:
        """GET ``url`` and relay its JSON payload, keeping the metrics consistent.

        Errors from the upstream call propagate to aiohttp unchanged; only the
        metric bookkeeping is guaranteed to run.
        """
        startTime = self._beginRequest(request)
        succeeded = False
        try:
            async with self.webservice.get(url) as resp:
                payload = await resp.json()
            succeeded = True
            return web.json_response(payload)
        finally:
            self._endRequest(request, startTime if succeeded else None)

    async def accountQuery(self, request: web.Request) -> web.Response:
        """Relay the account service's JSON answer for the account named in the URL."""
        account = request.match_info['account']
        return await self._proxyQuery(request, f'http://localhost:8001/accounts/{account}')

    async def orderQuery(self, request: web.Request) -> web.Response:
        """Relay the order service's JSON answer for the account named in the URL."""
        account = request.match_info['account']
        return await self._proxyQuery(request, f'http://localhost:8002/orders/{account}')

    async def metrics(self, request: web.Request) -> web.Response:
        """Serve the current Prometheus metrics."""
        resp = web.Response(body=prometheus_client.generate_latest())
        resp.content_type = CONTENT_TYPE_LATEST
        return resp

    async def on_shutdown(self, app: web.Application):
        """Release the observers, the HTTP client session and the RabbitMQ connection."""
        # An explicit loop is required: map() is lazy in Python 3, so an
        # unconsumed map(...) never calls dispose().
        for subscription in self.subscriptions:
            subscription.dispose()
        self.subscriptions.clear()
        if self.webservice is not None:
            await self.webservice.close()
        if self.rmqConn is not None:
            await self.rmqConn.close()

    async def init(self):
        """Create metrics, connections, observers and routes; return the application.

        RabbitMQ credentials are read from the ``RABBITMQ_LOGIN`` and
        ``RABBITMQ_PASSWORD`` environment variables, falling back to the
        previously hard-coded values.
        """
        # Inside a coroutine this returns the loop that is running the app.
        self.loop = asyncio.get_event_loop()

        self.app['REQUEST_COUNT'] = Counter('request_total', 'Total Incoming Request', ('path', 'method'), unit='requests')
        self.app['REQUEST_LATENCY'] = Summary('request_latency', 'Request Process Time', ('path', 'method'), unit='seconds')
        self.app['REQUEST_PROGRESS'] = Gauge('request_progress', 'Request in Progress', ('path', 'method'), unit='requests')

        # Establish connection to RabbitMQ
        # NOTE(review): the fallback credentials are kept only for backward
        # compatibility; set the environment variables in real deployments.
        self.rmqConn = await connect_robust(
            login=os.environ.get('RABBITMQ_LOGIN', 'ikhwanrnurzaman'),
            password=os.environ.get('RABBITMQ_PASSWORD', '123456'),
        )

        # Establish channel for order request and declare order queue
        self.channel['/order'] = await self.rmqConn.channel()
        await self.channel['/order'].declare_queue('order', durable=True)

        # Establish channel for account request and declare account queue
        self.channel['/account'] = await self.rmqConn.channel()
        await self.channel['/account'].declare_queue('account', durable=True)

        # HTTP client used by the query proxies; created here so that it is
        # bound to the running loop, and closed again in on_shutdown().
        self.webservice = ClientSession()

        # The scheduler must be an instance bound to the loop, not the class.
        scheduler = AsyncIOScheduler(self.loop)

        # Create disposable request observer for handling order request.
        # Only request to /order will be passed to OrderHandler
        dispose = self.request.pipe(
            ops.filter(lambda i: i.path == '/order'),
            ops.do_action(self.logRequest),
            ops.filter(self.orderHandler.orderVerificator),
        ).subscribe(self.orderHandler, scheduler=scheduler)
        self.subscriptions.append(dispose)

        # Create disposable request observer for handling account request.
        # Only request to /account will be passed to AccountHandler
        dispose = self.request.pipe(
            ops.filter(lambda i: i.path == '/account'),
            ops.do_action(self.logRequest),
            ops.filter(self.accountHandler.accountVerificator),
        ).subscribe(self.accountHandler, scheduler=scheduler)
        self.subscriptions.append(dispose)

        self.app.router.add_post('/order', self.dispatcher, name='order')
        self.app.router.add_get('/orders/{account}', self.orderQuery)
        self.app.router.add_post('/account', self.dispatcher, name='account')
        self.app.router.add_get('/accounts/{account}', self.accountQuery)
        self.app.router.add_get('/metrics', self.metrics)
        self.app.on_shutdown.append(self.on_shutdown)
        return self.app

    def run(self):
        """Start the server; init() is passed to aiohttp as a coroutine."""
        web.run_app(self.init())
# Script entry point: build the server at module level and hand control to
# aiohttp through Webserver.run(), which calls web.run_app.
webserver = Webserver()
webserver.run()