-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathmicrodot_websocket.py
177 lines (155 loc) · 5.8 KB
/
microdot_websocket.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import binascii
import hashlib
from microdot import Response
class WebSocket:
CONT = 0
TEXT = 1
BINARY = 2
CLOSE = 8
PING = 9
PONG = 10
def __init__(self, request):
self.request = request
self.closed = False
def handshake(self):
response = self._handshake_response()
self.request.sock.send(b'HTTP/1.1 101 Switching Protocols\r\n')
self.request.sock.send(b'Upgrade: websocket\r\n')
self.request.sock.send(b'Connection: Upgrade\r\n')
self.request.sock.send(
b'Sec-WebSocket-Accept: ' + response + b'\r\n\r\n')
def receive(self):
while True:
opcode, payload = self._read_frame()
send_opcode, data = self._process_websocket_frame(opcode, payload)
if send_opcode: # pragma: no cover
self.send(data, send_opcode)
elif data: # pragma: no branch
return data
def send(self, data, opcode=None):
frame = self._encode_websocket_frame(
opcode or (self.TEXT if isinstance(data, str) else self.BINARY),
data)
self.request.sock.send(frame)
def close(self):
if not self.closed: # pragma: no cover
self.closed = True
self.send(b'', self.CLOSE)
def _handshake_response(self):
connection = False
upgrade = False
websocket_key = None
for header, value in self.request.headers.items():
h = header.lower()
if h == 'connection':
connection = True
if 'upgrade' not in value.lower():
return self.request.app.abort(400)
elif h == 'upgrade':
upgrade = True
if not value.lower() == 'websocket':
return self.request.app.abort(400)
elif h == 'sec-websocket-key':
websocket_key = value
if not connection or not upgrade or not websocket_key:
return self.request.app.abort(400)
d = hashlib.sha1(websocket_key.encode())
d.update(b'258EAFA5-E914-47DA-95CA-C5AB0DC85B11')
return binascii.b2a_base64(d.digest())[:-1]
@classmethod
def _parse_frame_header(cls, header):
fin = header[0] & 0x80
opcode = header[0] & 0x0f
if fin == 0 or opcode == cls.CONT: # pragma: no cover
raise OSError(32, 'Continuation frames not supported')
has_mask = header[1] & 0x80
length = header[1] & 0x7f
if length == 126:
length = -2
elif length == 127:
length = -8
return fin, opcode, has_mask, length
def _process_websocket_frame(self, opcode, payload):
if opcode == self.TEXT:
payload = payload.decode()
elif opcode == self.BINARY:
pass
elif opcode == self.CLOSE:
raise OSError(32, 'Websocket connection closed')
elif opcode == self.PING:
return self.PONG, payload
elif opcode == self.PONG: # pragma: no branch
return None, None
return None, payload
@classmethod
def _encode_websocket_frame(cls, opcode, payload):
frame = bytearray()
frame.append(0x80 | opcode)
if opcode == cls.TEXT:
payload = payload.encode()
if len(payload) < 126:
frame.append(len(payload))
elif len(payload) < (1 << 16):
frame.append(126)
frame.extend(len(payload).to_bytes(2, 'big'))
else:
frame.append(127)
frame.extend(len(payload).to_bytes(8, 'big'))
frame.extend(payload)
return frame
def _read_frame(self):
header = self.request.sock.recv(2)
if len(header) != 2: # pragma: no cover
raise OSError(32, 'Websocket connection closed')
fin, opcode, has_mask, length = self._parse_frame_header(header)
if length < 0:
length = self.request.sock.recv(-length)
length = int.from_bytes(length, 'big')
if has_mask: # pragma: no cover
mask = self.request.sock.recv(4)
payload = self.request.sock.recv(length)
if has_mask: # pragma: no cover
payload = bytes(x ^ mask[i % 4] for i, x in enumerate(payload))
return opcode, payload
def websocket_upgrade(request):
"""Upgrade a request handler to a websocket connection.
This function can be called directly inside a route function to process a
WebSocket upgrade handshake, for example after the user's credentials are
verified. The function returns the websocket object::
@app.route('/echo')
def echo(request):
if not authenticate_user(request):
abort(401)
ws = websocket_upgrade(request)
while True:
message = ws.receive()
ws.send(message)
"""
ws = WebSocket(request)
ws.handshake()
@request.after_request
def after_request(request, response):
return Response.already_handled
return ws
def with_websocket(f):
"""Decorator to make a route a WebSocket endpoint.
This decorator is used to define a route that accepts websocket
connections. The route then receives a websocket object as a second
argument that it can use to send and receive messages::
@app.route('/echo')
@with_websocket
def echo(request, ws):
while True:
message = ws.receive()
ws.send(message)
"""
def wrapper(request, *args, **kwargs):
ws = websocket_upgrade(request)
try:
f(request, ws, *args, **kwargs)
ws.close() # pragma: no cover
except OSError as exc:
if exc.errno not in [32, 54, 104]: # pragma: no cover
raise
return ''
return wrapper