From 2c6f6a94bfe4c5c8f69177349da6b11bd69b0d72 Mon Sep 17 00:00:00 2001 From: lin <465382251@qq.com> Date: Tue, 8 Dec 2020 11:38:58 +0800 Subject: [PATCH] add queue --- src/Api/WebSocket/SocketClient.php | 39 +++++++++++++++-------- src/Api/WebSocket/SocketFunction.php | 5 ++- src/Api/WebSocket/SocketGlobal.php | 46 ++++++++++++++++++++++++++++ src/Api/WebSocket/SocketServer.php | 2 +- tests/websocket/client_spot.php | 2 +- 5 files changed, 79 insertions(+), 15 deletions(-) diff --git a/src/Api/WebSocket/SocketClient.php b/src/Api/WebSocket/SocketClient.php index 2437fe4..e0717e9 100644 --- a/src/Api/WebSocket/SocketClient.php +++ b/src/Api/WebSocket/SocketClient.php @@ -20,6 +20,8 @@ class SocketClient private $keysecret=[]; + + function __construct(array $config=[]) { $this->config=$config; @@ -132,22 +134,37 @@ protected function getData($global,$callback=null,$sub=[]){ //默认返回所有数据 if(empty($sub)){ foreach ($all_sub as $k=>$v){ - if(is_array($v)) $table=$k; - else $table=$v; - - $data=$global->get(strtolower($table)); - if(empty($data)) continue; - $temp[$table]=$data; + if(is_array($v)){ + if(empty($this->keysecret) || $this->keysecret['key']!=$k) continue; + + foreach ($v as $vv){ + $data=$global->getQueue($vv); + $temp[$vv]=$data; + } + }else{ + $data=$global->get($v); + $temp[$v]=$data; + } } }else{ //返回规定的数据 if(!empty($this->keysecret)) { //是否有私有数据 - $all_sub=$global->get('all_sub'); if(isset($all_sub[$this->keysecret['key']])) $sub=array_merge($sub,$all_sub[$this->keysecret['key']]); + } + //print_r($sub);die; foreach ($sub as $k=>$v){ - $data=$global->get($v); + //判断私有数据是否需要走队列数据 + $temp_v=explode(self::$USER_DELIMITER,$v); + if(count($temp_v)>1){ + //private + $data=$global->getQueue($v); + }else{ + //public + $data=$global->get($v); + } + if(empty($data)) continue; $temp[$v]=$data; @@ -171,12 +188,10 @@ function test(){ } function test2(){ - //print_r($this->client->global_key); $global_key=$this->client->global_key; foreach ($global_key as $k=>$v){ - echo $k.PHP_EOL; - print_r($this->client->$v); - + echo count($this->client->$v).'----'.$k.PHP_EOL; + echo json_encode($this->client->$v).PHP_EOL; } } } diff --git a/src/Api/WebSocket/SocketFunction.php b/src/Api/WebSocket/SocketFunction.php index 6c8fbe4..2598c77 100644 --- a/src/Api/WebSocket/SocketFunction.php +++ b/src/Api/WebSocket/SocketFunction.php @@ -8,6 +8,9 @@ trait SocketFunction { + //标记分隔符 + static $USER_DELIMITER='==='; + /** * @param $key_secret * @return mixed @@ -91,6 +94,6 @@ protected function log($message){ * @param $keysecret */ protected function userKey(array $keysecret,string $sub){ - return $keysecret['key'].'='.$sub; + return $keysecret['key'].self::$USER_DELIMITER.$sub; } } diff --git a/src/Api/WebSocket/SocketGlobal.php b/src/Api/WebSocket/SocketGlobal.php index f9597b7..7bfee8a 100644 --- a/src/Api/WebSocket/SocketGlobal.php +++ b/src/Api/WebSocket/SocketGlobal.php @@ -53,11 +53,57 @@ protected function get($key){ return $this->client->$key; } + protected function save($key,$value){ if(!isset($this->client->$key)) $this->add($key,$value); else $this->client->$key=$value; } + /** + * 对了获取数据 + * @param $key + * @return array + */ + protected function getQueue($key){ + if(!isset($this->client->$key) || empty($this->client->$key)) return []; + + do{ + $old_value=$new_value=$this->client->$key; + + if(empty($new_value)) return []; + //队列先进先出。 + $value=array_shift($new_value); + } + while(!$this->client->cas($key, $old_value, $new_value)); + + return $value; + } + + /** + * 队列保存数据 + * @param $key + * @param $value + */ + protected function saveQueue($key,$value){ + //最大存储数据量,超过后保留一条最新的数据,其余数据全部删除。 + $max=100; + + if(!isset($this->client->$key)) $this->add($key,[$value]); + else { + do{ + $old_value=$new_value=$this->client->$key; + + //超过最大数据量,保留最新数据 + if(count($new_value)>$max){ + $new_value=[$value]; + }else{ + array_push($new_value,$value); + } + } + while(!$this->client->cas($key, $old_value, $new_value)); + } + } + protected function addSubUpdate($data=[]){ do{ $old_value=$new_value=$this->client->add_sub; diff --git a/src/Api/WebSocket/SocketServer.php b/src/Api/WebSocket/SocketServer.php index 3be0f9f..7389408 100644 --- a/src/Api/WebSocket/SocketServer.php +++ b/src/Api/WebSocket/SocketServer.php @@ -108,7 +108,7 @@ private function onMessage($global){ if(isset($data['e']) && $con->tag!='public') { $table=$this->userKey($con->tag_keysecret,$data['e']); - $global->save($table,$data); + $global->saveQueue($table,$data); $global->allSubUpdate([$con->tag_keysecret['key']=>[$table]],'add'); return; diff --git a/tests/websocket/client_spot.php b/tests/websocket/client_spot.php index 2212bc9..d7803a4 100644 --- a/tests/websocket/client_spot.php +++ b/tests/websocket/client_spot.php @@ -185,7 +185,7 @@ 'bchusdt@depth', ]); print_r(json_encode($data)); - + die; //The second way callback $binance->keysecret($key_secret[0]); $binance->getSubscribe([