-
Notifications
You must be signed in to change notification settings - Fork 7
/
sample_tasker.py
92 lines (77 loc) · 2.54 KB
/
sample_tasker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import asyncio
import time
from functools import partial
from aiohttp import web
from navconfig.logging import logging
from navigator_auth import AuthHandler
from navigator import Application
from navigator.responses import HTMLResponse
from navigator.background import (
SERVICE_NAME,
BackgroundQueue,
BackgroundTask,
TaskWrapper
)
# Middleware to print request details
@web.middleware
async def debug_middleware(request, handler):
    """Log the registered routes and the incoming request, then delegate.

    Args:
        request: The incoming request; its ``app``, ``method``, ``path``
            and ``match_info`` attributes are read for logging.
        handler: Awaitable callable invoked with ``request`` to produce
            the response.

    Returns:
        The response produced by ``handler``, unmodified.
    """
    # The full routing table is dumped at debug level on every request.
    for route in request.app.router.routes():
        logging.debug(
            f"Route added: {route.resource}, method: {route.method}, Path: {route.resource.canonical}"
        )
    logging.debug(f"Request received: {request.method} {request.path}")
    logging.debug(f"Matched info: {request.match_info}")
    return await handler(request)
# Application instance used by the whole sample; debug_middleware is
# installed as its only middleware.
app = Application(
middlewares=[debug_middleware]
)
# Create the auth handler and attach it to the application.
auth = AuthHandler()
auth.setup(app) # configure this Auth system into App.
# Background Queue: built against this app with max_workers=5, queue_size=5
# and profiling enabled. The instance is not kept in a variable; handle_post
# later looks the service up through request.app[SERVICE_NAME].
# NOTE(review): presumably the constructor registers itself under
# SERVICE_NAME -- confirm against navigator.background.
BackgroundQueue(app=app, max_workers=5, queue_size=5, enable_profiling=True)
@app.get('/')
async def hola(request: web.Request) -> web.Response:
    """Answer GET ``/`` with a fixed HTML greeting.

    Args:
        request: The incoming request (not inspected).

    Returns:
        An ``HTMLResponse`` whose body is the greeting text.
    """
    greeting = HTMLResponse(body="Hola Mundo")
    return greeting
async def send_email(email, message, *, delay=10):
    """Simulate sending an email by sleeping, then report the result.

    Args:
        email: Recipient address; only interpolated into the output text.
        message: Message text; only interpolated into the output text.
        delay: Seconds to sleep to simulate the send. Keyword-only;
            defaults to 10, the value that used to be hard-coded.

    Returns:
        The confirmation string, which is also printed to stdout.
    """
    print(f' :: Waiting for {delay} seconds to finish task :: ')
    await asyncio.sleep(delay)  # Simulate email sending
    # Build the confirmation once so the printed and returned text
    # cannot drift apart.
    confirmation = f"Email sent to {email} with message: {message}"
    print(confirmation)
    return confirmation
async def sent_message(message, *args, **kwargs):
    """Echo the arguments this coroutine was invoked with to stdout.

    Registered in this file as a callback on a ``TaskWrapper``.

    Args:
        message: First positional value passed in.
        *args: Any further positional values.
        **kwargs: Any keyword values.

    Returns:
        None.
    """
    # Same text print() would emit for (message, args, kwargs): str() of
    # each value joined by single spaces.
    echoed = ' '.join(map(str, (message, args, kwargs)))
    print(':: Called as Callback ::', echoed, sep='\n')
def blocking_code(request, *, delay=10):
    """Block the calling thread with ``time.sleep``, then print a marker.

    Args:
        request: Accepted but not used by this function.
        delay: Seconds to sleep. Keyword-only; defaults to 10, the value
            that used to be hard-coded.

    Returns:
        None.
    """
    time.sleep(delay)
    print(":: Blocking code executed ::")
def blocking_task(request, *, delay=10):
    """Print a start marker, block with ``time.sleep``, print an end marker.

    Args:
        request: Accepted but not used by this function.
        delay: Seconds to sleep. Keyword-only; defaults to 10, the value
            that used to be hard-coded, so ``partial(blocking_task,
            request)`` keeps its previous behaviour.

    Returns:
        None.
    """
    print(':: STARTS BLOCKING CODE ::')
    time.sleep(delay)
    print(":: Blocking TASK executed ::")
async def handle_post(request):
    """Enqueue two demo background jobs described by a JSON request body.

    The body must be a JSON object containing ``email`` and ``message``.
    The payload is validated before anything is queued, so a bad request
    no longer leaves a blocking job enqueued before failing.

    Args:
        request: The incoming request; the queue service is read from
            ``request.app[SERVICE_NAME]``.

    Returns:
        ``{'status': 'Task enqueued'}`` as a JSON response on success, or
        a JSON response with an ``error`` field and status 400 when the
        body is not valid JSON, is not an object, or lacks a required key.
    """
    try:
        data = await request.json()
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        return web.json_response(
            {'error': 'Request body must be valid JSON'}, status=400
        )
    if not isinstance(data, dict):
        return web.json_response(
            {'error': 'Request body must be a JSON object'}, status=400
        )
    missing = [key for key in ('email', 'message') if key not in data]
    if missing:
        return web.json_response(
            {'error': f"Missing required field(s): {', '.join(missing)}"},
            status=400
        )

    tasker = request.app[SERVICE_NAME]
    # Alternative: hand the coroutine function and its arguments to the queue:
    #   await tasker.put(send_email, data['email'], data['message'])
    # Enqueue a plain synchronous callable bound to this request.
    await tasker.put(partial(blocking_task, request))
    # Or using the Task Wrapper, which also accepts a completion callback:
    task = TaskWrapper(
        send_email, data['email'], data['message'],
        jitter=5
    )
    task.add_callback(sent_message)
    await tasker.put(task)
    # Alternative using the Background Task Runner:
    #   task = BackgroundTask(blocking_task, request)
    #   asyncio.create_task(task.run())
    return web.json_response({'status': 'Task enqueued'})
# Expose the task-enqueueing handler on POST /manage_tasks.
app.router.add_post('/manage_tasks', handle_post)

if __name__ == '__main__':
    try:
        app.run()
    except KeyboardInterrupt:
        # Ctrl-C is deliberately swallowed so the script exits without a
        # traceback.
        pass