forked from stblassitude/ceptjs
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdevserver.py
executable file
·64 lines (48 loc) · 1.53 KB
/
devserver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#!/usr/bin/env python3
import aiohttp
import asyncio
from aiohttp import web
routes = web.RouteTableDef()
# @routes.get('/')
# async def hello(request):
# return web.Response(text="Hello, world")
@routes.get('/btxws')
async def websocket_handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
print('opening connection')
reader, writer = await asyncio.open_connection(
'195.201.94.166', 20000)
async def server_to_websocket():
while True:
try:
data = await reader.read(64)
if not data:
print('socket connection closed')
break
await ws.send_bytes(data)
except ConnectionError:
await ws.close()
break
task = asyncio.get_event_loop().create_task(server_to_websocket())
async for msg in ws:
if msg.type == aiohttp.WSMsgType.BINARY:
try:
writer.write(msg.data)
await writer.drain()
except ConnectionError:
await ws.close()
writer.close()
elif msg.type == aiohttp.WSMsgType.ERROR:
if task:
task.cancel()
task = None
writer.close()
print('ws connection closed with exception %s' %
ws.exception())
writer.close()
return ws
app = web.Application()
app.add_routes([web.static('/web', './', show_index=True)])
app.add_routes(routes)
web.run_app(app)