-
Notifications
You must be signed in to change notification settings - Fork 0
/
cx_skinnyserver.py
274 lines (200 loc) · 7.48 KB
/
cx_skinnyserver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
import gc
class urlparser:
    """
    This class stores static functions that are responsible for encoding and
    decoding dicts of data to and from URL-encoded strings, which can be
    received in the path sent by a client, or sent in a response body.
    """

    # Characters that may appear in a URL component without being
    # percent-encoded (the RFC 3986 "unreserved" set).
    _UNRESERVED = (
        "abcdefghijklmnopqrstuvwxyz"
        "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
        "0123456789"
        "-._~"
    )

    # Characters accepted as digits of a %XX escape.
    _HEX_DIGITS = "0123456789abcdefABCDEF"

    @staticmethod
    def encode(structure):
        """
        This function encodes a dict structure to the URL-encoded string
        format ("?key=value&key=value"). Keys and values are converted with
        str() before encoding. An empty structure gives an empty string.
        :structure: Dict structure
        :return: Same structure as string
        """
        pairs = []
        for key in structure:
            name = __class__.__encode_string(str(key))
            value = __class__.__encode_string(str(structure[key]))
            pairs.append(name + "=" + value)
        if not pairs:
            return ""
        return "?" + "&".join(pairs)

    @staticmethod
    def decode(url):
        """
        This function decodes arguments given in string format, and returns
        the decoded dict. Everything up to the first "?" (when present) is
        ignored. Parts without "=" and parts with malformed escapes are
        skipped, so a broken argument never hides the valid ones.
        :url: URL string to decode
        :return: Decoded dict
        """
        structure = dict()
        marker = url.find("?")
        if marker != -1:
            url = url[marker + 1:]
        for part in url.split("&"):
            # Split on the first "=" only, so a literal "=" inside the value
            # is kept instead of truncating the value.
            separator = part.find("=")
            if separator == -1:
                continue
            try:
                key = __class__.__decode_string(part[:separator])
                value = __class__.__decode_string(part[separator + 1:])
            except ValueError:
                # Malformed escape or invalid UTF-8: skip this argument only.
                continue
            structure[key] = value
        return structure

    @staticmethod
    def __decode_string(content):
        """
        This decodes special characters from url (%ff) to a standard python
        string. The escaped bytes are collected first and decoded as UTF-8
        at the end, so multi-byte characters are restored correctly.
        :content: Content to decode
        :return: Decoded string
        :raises ValueError: When an escape is truncated, is not hexadecimal,
                            or the decoded bytes are not valid UTF-8
        """
        hex_digits = __class__._HEX_DIGITS
        result = bytearray()
        position = 0
        length = len(content)
        while position < length:
            char = content[position]
            if char != "%":
                result.extend(char.encode("UTF-8"))
                position += 1
                continue
            digits = content[position + 1:position + 3]
            if (
                len(digits) != 2
                or digits[0] not in hex_digits
                or digits[1] not in hex_digits
            ):
                raise ValueError("malformed percent escape in " + repr(content))
            result.append(int(digits, 16))
            position += 3
        return bytes(result).decode("UTF-8")

    @staticmethod
    def __encode_string(content):
        """
        This encodes content, from a normal python string to the (%ff) http
        path form.
        :content: Normal python string to encode
        :return: Encoded string
        """
        return "".join([__class__.__encode_char(char) for char in content])

    @staticmethod
    def __encode_char(char):
        """
        This function encodes a single char to its url encoded form.
        Unreserved characters are returned unchanged; every other character
        is replaced by one uppercase %XX escape per byte of its UTF-8 form.
        :char: Char to encode
        :return: Encoded char
        """
        if char in __class__._UNRESERVED:
            return char
        return "".join(["%%%02X" % byte for byte in char.encode("UTF-8")])
class server_response:
    """
    This class is a response from the server for cx_skinnyserver. It stores
    headers, response status, and content. This class also generates the string
    to be returned to the user.
    """

    def __init__(self):
        """
        This function creates a new instance of a response. By default, it
        closes the connection, sets the response status to be successful and
        leaves the body empty.
        :return: New response object
        """
        self.mark_ok()
        self.headers = {
            "Connection": "close",
        }
        # mark_ok() stores "OK" as the body; a fresh response starts empty.
        self.body = ""

    def _mark(self, status_code, status_text):
        """
        This function stores the given status, and uses the status text as
        the response body.
        :status_code: Numeric HTTP status code
        :status_text: Reason phrase, also used as the body
        """
        self.status_code = status_code
        self.status_text = status_text
        self.body = status_text

    def mark_ok(self):
        """
        This function marks the response as OK.
        """
        self._mark(200, "OK")

    def mark_internal_error(self):
        """
        This function marks the response as Internal Server Error.
        """
        self._mark(500, "Internal Server Error")

    def mark_not_found(self):
        """
        This function marks the response as Not Found.
        """
        self._mark(404, "Not Found")

    def mark_bad_request(self):
        """
        This function marks the response as Bad Request.
        """
        self._mark(400, "Bad Request")

    def serialize(self):
        """
        This function serializes the response, and returns a string ready to
        be encoded as UTF-8 and sent to the client socket. It also stores the
        "Content-Length" header in the headers of the response.
        :return: Serialized response
        """
        # Content-Length counts bytes on the wire, not characters, so the
        # body has to be measured after UTF-8 encoding.
        self.headers["Content-Length"] = len(self.body.encode("UTF-8"))
        lines = ["HTTP/1.1 " + str(self.status_code) + " " + self.status_text]
        for header in self.headers:
            lines.append(header + ": " + str(self.headers[header]))
        return "\r\n".join(lines) + "\r\n\r\n" + self.body
class server:
    """
    This class is responsible for an instance of the cx_skinnyserver server.
    It is responsible for a single instance listening on a single port.
    """

    def __init__(self, socket):
        """
        This function creates a REST server on the specified socket. The
        server is empty, with no endpoints.
        :socket: Socket to listen on
        :return: New server instance
        """
        self.socket = socket
        self.pathes = dict()
        self.listening = False

    def close(self):
        """
        This function closes the server. The server exits from the listening
        loop once the request being handled is finished, so this function is
        meant to be run from a service.
        """
        self.listening = False

    def add_path(self, callback, path = "/", method = "GET"):
        """
        This function binds a new service to the server. Adding it requires
        specifying the path, method, and callback (service). A service is a
        function that takes the path and response as arguments. The path is the
        path the user came in on, including any URL-encoded arguments after
        "?". The response is a server_response object that will be serialized
        and sent back to the client.
        :callback: Service to run when client go to specified path
        :path: Path to bind service
        :method: Method to bind service
        """
        self.pathes[method + path] = callback

    @staticmethod
    def _parse_request_line(request):
        """
        This function splits the first line of a HTTP request.
        :request: Request line, as bytes or string
        :return: Tuple (method, path, path without arguments), or None when
                 the line can not be parsed
        """
        if isinstance(request, bytes):
            try:
                request = request.decode("UTF-8")
            except ValueError:
                return None
        parts = request.split()
        if len(parts) < 2:
            return None
        method = parts[0]
        path = parts[1]
        marker = path.find("?")
        if marker == -1:
            clean_path = path
        else:
            clean_path = path[:marker]
        return method, path, clean_path

    def _build_response(self, request):
        """
        This function creates the response for a single request line. It
        marks the response as Bad Request when the line can not be parsed, as
        Not Found when no service is bound to the method and path, and as
        Internal Server Error when the service raises an exception.
        :request: Request line, as bytes or string
        :return: Response ready to serialize
        """
        response = server_response()
        parsed = self._parse_request_line(request)
        if parsed is None:
            response.mark_bad_request()
            return response
        method, path, clean_path = parsed
        if method + clean_path not in self.pathes:
            response.mark_not_found()
            return response
        try:
            self.pathes[method + clean_path](path, response)
        except Exception:
            # A failing service must not stop the server. KeyboardInterrupt
            # and SystemExit are not caught here, so they still propagate.
            response.mark_internal_error()
        return response

    def listen(self):
        """
        This function is the server listening loop. When this function is
        called, the server's listening loop is started. The server will exit
        this loop only when the close() function is called on the server
        object. Every client connection is closed after its response is sent.
        """
        self.socket.listen(10)
        self.listening = True
        while self.listening:
            gc.collect()
            client, address = self.socket.accept()
            reader = None
            try:
                reader = client.makefile("rwb", 0)
                response = self._build_response(reader.readline())
                client.sendall(response.serialize().encode("UTF-8"))
            except OSError:
                # The client went away in the middle of the exchange; drop
                # this connection and keep serving the next ones.
                pass
            finally:
                # makefile() may return the socket itself on some platforms,
                # so avoid closing the same object twice.
                if reader is not None and reader is not client:
                    try:
                        reader.close()
                    except OSError:
                        pass
                try:
                    client.close()
                except OSError:
                    pass