forked from zhenbianshu/websocket
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.php
356 lines (320 loc) · 11.2 KB
/
server.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
<?php
error_reporting(E_ALL);
set_time_limit(0);// 设置超时时间为无限,防止超时
date_default_timezone_set('Asia/shanghai');
class WebSocket {
const LOG_PATH = './log/';
const LISTEN_SOCKET_NUM = 9;
/**
* @var array $sockets
* [
* (int)$socket => [
* info
* ]
* ]
* todo 解释socket与file号对应
*/
private $sockets = [];
private $master;
public function __construct($host, $port) {
try {
$this->master = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
// 设置IP和端口重用,在重启服务器后能重新使用此端口;
socket_set_option($this->master, SOL_SOCKET, SO_REUSEADDR, 1);
// 将IP和端口绑定在服务器socket上;
socket_bind($this->master, $host, $port);
// listen函数使用主动连接套接口变为被连接套接口,使得一个进程可以接受其它进程的请求,从而成为一个服务器进程。在TCP服务器编程中listen函数把进程变为一个服务器,并指定相应的套接字变为被动连接,其中的能存储的请求不明的socket数目。
socket_listen($this->master, self::LISTEN_SOCKET_NUM);
} catch (\Exception $e) {
$err_code = socket_last_error();
$err_msg = socket_strerror($err_code);
$this->error([
'error_init_server',
$err_code,
$err_msg
]);
}
$this->sockets[0] = ['resource' => $this->master];
$pid = function_exists('posix_getpwuid')? posix_getpid():"Unknown";
$this->debug(["server: {$this->master} started,pid: {$pid}"]);
while (true) {
try {
$this->doServer();
} catch (\Exception $e) {
$this->error([
'error_do_server',
$e->getCode(),
$e->getMessage()
]);
}
}
}
private function doServer() {
$write = $except = NULL;
$sockets = array_column($this->sockets, 'resource');
$read_num = socket_select($sockets, $write, $except, NULL);
// select作为监视函数,参数分别是(监视可读,可写,异常,超时时间),返回可操作数目,出错时返回false;
if (false === $read_num) {
$this->error([
'error_select',
$err_code = socket_last_error(),
socket_strerror($err_code)
]);
return;
}
foreach ($sockets as $socket) {
// 如果可读的是服务器socket,则处理连接逻辑
if ($socket == $this->master) {
$client = socket_accept($this->master);
// 创建,绑定,监听后accept函数将会接受socket要来的连接,一旦有一个连接成功,将会返回一个新的socket资源用以交互,如果是一个多个连接的队列,只会处理第一个,如果没有连接的话,进程将会被阻塞,直到连接上.如果用set_socket_blocking或socket_set_noblock()设置了阻塞,会返回false;返回资源后,将会持续等待连接。
if (false === $client) {
$this->error([
'err_accept',
$err_code = socket_last_error(),
socket_strerror($err_code)
]);
continue;
} else {
self::connect($client);
continue;
}
} else {
// 如果可读的是其他已连接socket,则读取其数据,并处理应答逻辑
$bytes = @socket_recv($socket, $buffer, 2048, 0);
if ($bytes < 9) {
$recv_msg = $this->disconnect($socket);
} else {
if (!$this->sockets[(int)$socket]['handshake']) {
self::handShake($socket, $buffer);
continue;
} else {
$recv_msg = self::parse($buffer);
}
}
array_unshift($recv_msg, 'receive_msg');
$msg = self::dealMsg($socket, $recv_msg);
$this->broadcast($msg);
}
}
}
/**
* 将socket添加到已连接列表,但握手状态留空;
*
* @param $socket
*/
public function connect($socket) {
socket_getpeername($socket, $ip, $port);
$socket_info = [
'resource' => $socket,
'uname' => '',
'handshake' => false,
'ip' => $ip,
'port' => $port,
];
$this->sockets[(int)$socket] = $socket_info;
$this->debug(array_merge(['socket_connect'], $socket_info));
}
/**
* 客户端关闭连接
*
* @param $socket
*
* @return array
*/
private function disconnect($socket) {
$recv_msg = [
'type' => 'logout',
'content' => $this->sockets[(int)$socket]['uname'],
];
unset($this->sockets[(int)$socket]);
return $recv_msg;
}
/**
* 用公共握手算法握手
*
* @param $socket
* @param $buffer
*
* @return bool
*/
public function handShake($socket, $buffer) {
// 获取到客户端的升级密匙
$line_with_key = substr($buffer, strpos($buffer, 'Sec-WebSocket-Key:') + 18);
$key = trim(substr($line_with_key, 0, strpos($line_with_key, "\r\n")));
// 生成升级密匙,并拼接websocket升级头
$upgrade_key = base64_encode(sha1($key . "258EAFA5-E914-47DA-95CA-C5AB0DC85B11", true));// 升级key的算法
$upgrade_message = "HTTP/1.1 101 Switching Protocols\r\n";
$upgrade_message .= "Upgrade: websocket\r\n";
$upgrade_message .= "Sec-WebSocket-Version: 13\r\n";
$upgrade_message .= "Connection: Upgrade\r\n";
$upgrade_message .= "Sec-WebSocket-Accept:" . $upgrade_key . "\r\n\r\n";
socket_write($socket, $upgrade_message, strlen($upgrade_message));// 向socket里写入升级信息
$this->sockets[(int)$socket]['handshake'] = true;
socket_getpeername($socket, $ip, $port);
$this->debug([
'hand_shake',
$socket,
$ip,
$port
]);
// 向客户端发送握手成功消息,以触发客户端发送用户名动作;
$msg = [
'type' => 'handshake',
'content' => 'done',
];
$msg = $this->build(json_encode($msg));
socket_write($socket, $msg, strlen($msg));
return true;
}
/**
* 解析数据
*
* @param $buffer
*
* @return bool|string
*/
private function parse($buffer) {
$decoded = '';
$len = ord($buffer[1]) & 127;
if ($len === 126) {
$masks = substr($buffer, 4, 4);
$data = substr($buffer, 8);
} else if ($len === 127) {
$masks = substr($buffer, 10, 4);
$data = substr($buffer, 14);
} else {
$masks = substr($buffer, 2, 4);
$data = substr($buffer, 6);
}
for ($index = 0; $index < strlen($data); $index++) {
$decoded .= $data[$index] ^ $masks[$index % 4];
}
return json_decode($decoded, true);
}
/**
* 将普通信息组装成websocket数据帧
*
* @param $msg
*
* @return string
*/
private function build($msg) {
$frame = [];
$frame[0] = '81';
$len = strlen($msg);
if ($len < 126) {
$frame[1] = $len < 16 ? '0' . dechex($len) : dechex($len);
} else if ($len < 65025) {
$s = dechex($len);
$frame[1] = '7e' . str_repeat('0', 4 - strlen($s)) . $s;
} else {
$s = dechex($len);
$frame[1] = '7f' . str_repeat('0', 16 - strlen($s)) . $s;
}
$data = '';
$l = strlen($msg);
for ($i = 0; $i < $l; $i++) {
$data .= dechex(ord($msg{$i}));
}
$frame[2] = $data;
$data = implode('', $frame);
return pack("H*", $data);
}
/**
* 拼装信息
*
* @param $socket
* @param $recv_msg
* [
* 'type'=>user/login
* 'content'=>content
* ]
*
* @return string
*/
private function dealMsg($socket, $recv_msg) {
$msg_type = $recv_msg['type'];
$msg_content = $recv_msg['content'];
$response = [];
switch ($msg_type) {
case 'login':
$this->sockets[(int)$socket]['uname'] = $msg_content;
// 取得最新的名字记录
$user_list = array_column($this->sockets, 'uname');
$response['type'] = 'login';
$response['content'] = $msg_content;
$response['user_list'] = $user_list;
break;
case 'logout':
$user_list = array_column($this->sockets, 'uname');
$response['type'] = 'logout';
$response['content'] = $msg_content;
$response['user_list'] = $user_list;
break;
case 'user':
$uname = $this->sockets[(int)$socket]['uname'];
$response['type'] = 'user';
$response['from'] = $uname;
$response['content'] = $msg_content;
break;
}
return $this->build(json_encode($response));
}
/**
* 广播消息
*
* @param $data
*/
private function broadcast($data) {
foreach ($this->sockets as $socket) {
if ($socket['resource'] == $this->master) {
continue;
}
socket_write($socket['resource'], $data, strlen($data));
}
}
/**
* colorize output
*/
private function dout($text, $color = null, $newLine = true)
{
$styles = array(
'success' => "\033[0;32m%s\033[0m",
'error' => "\033[31;31m%s\033[0m",
'info' => "\033[33;33m%s\033[0m"
);
$format = '%s';
if (isset($styles[$color]) && USE_ANSI) {
$format = $styles[$color];
}
if ($newLine) {
$format .= PHP_EOL;
}
printf($format, $text);
}
/**
* 记录debug信息
*
* @param array $info
*/
private function debug(array $info) {
$time = date('Y-m-d H:i:s');
array_unshift($info, $time);
$info = array_map('json_encode', $info);
$line = implode(' | ', $info) . "\r\n";
self::dout($line,null,false);
file_put_contents(self::LOG_PATH . 'websocket_debug.log', $line, FILE_APPEND);
}
/**
* 记录错误信息
*
* @param array $info
*/
private function error(array $info) {
$time = date('Y-m-d H:i:s');
array_unshift($info, $time);
$info = array_map('json_encode', $info);
file_put_contents(self::LOG_PATH . 'websocket_error.log', implode(' | ', $info) . "\r\n", FILE_APPEND);
}
}
$ws = new WebSocket("127.0.0.1", "8080");