Skip to content

Commit

Permalink
add queue
Browse files Browse the repository at this point in the history
  • Loading branch information
zhouaini528 committed Dec 8, 2020
1 parent 52ec42e commit 2c6f6a9
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 15 deletions.
39 changes: 27 additions & 12 deletions src/Api/WebSocket/SocketClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ class SocketClient
private $keysecret=[];




function __construct(array $config=[])
{
$this->config=$config;
Expand Down Expand Up @@ -132,22 +134,37 @@ protected function getData($global,$callback=null,$sub=[]){
//默认返回所有数据
if(empty($sub)){
foreach ($all_sub as $k=>$v){
if(is_array($v)) $table=$k;
else $table=$v;

$data=$global->get(strtolower($table));
if(empty($data)) continue;
$temp[$table]=$data;
if(is_array($v)){
if(empty($this->keysecret) || $this->keysecret['key']!=$k) continue;

foreach ($v as $vv){
$data=$global->getQueue($vv);
$temp[$vv]=$data;
}
}else{
$data=$global->get($v);
$temp[$v]=$data;
}
}
}else{
//返回规定的数据
if(!empty($this->keysecret)) {
//是否有私有数据
$all_sub=$global->get('all_sub');
if(isset($all_sub[$this->keysecret['key']])) $sub=array_merge($sub,$all_sub[$this->keysecret['key']]);

}
//print_r($sub);die;
foreach ($sub as $k=>$v){
$data=$global->get($v);
//判断私有数据是否需要走队列数据
$temp_v=explode(self::$USER_DELIMITER,$v);
if(count($temp_v)>1){
//private
$data=$global->getQueue($v);
}else{
//public
$data=$global->get($v);
}

if(empty($data)) continue;

$temp[$v]=$data;
Expand All @@ -171,12 +188,10 @@ function test(){
}

function test2(){
//print_r($this->client->global_key);
$global_key=$this->client->global_key;
foreach ($global_key as $k=>$v){
echo $k.PHP_EOL;
print_r($this->client->$v);

echo count($this->client->$v).'----'.$k.PHP_EOL;
echo json_encode($this->client->$v).PHP_EOL;
}
}
}
5 changes: 4 additions & 1 deletion src/Api/WebSocket/SocketFunction.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

trait SocketFunction
{
//标记分隔符
static $USER_DELIMITER='===';

/**
* @param $key_secret
* @return mixed
Expand Down Expand Up @@ -91,6 +94,6 @@ protected function log($message){
* @param $keysecret
*/
protected function userKey(array $keysecret,string $sub){
return $keysecret['key'].'='.$sub;
return $keysecret['key'].self::$USER_DELIMITER.$sub;
}
}
46 changes: 46 additions & 0 deletions src/Api/WebSocket/SocketGlobal.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,57 @@ protected function get($key){
return $this->client->$key;
}


protected function save($key,$value){
if(!isset($this->client->$key)) $this->add($key,$value);
else $this->client->$key=$value;
}

/**
* 对了获取数据
* @param $key
* @return array
*/
protected function getQueue($key){
if(!isset($this->client->$key) || empty($this->client->$key)) return [];

do{
$old_value=$new_value=$this->client->$key;

if(empty($new_value)) return [];
//队列先进先出。
$value=array_shift($new_value);
}
while(!$this->client->cas($key, $old_value, $new_value));

return $value;
}

/**
* 队列保存数据
* @param $key
* @param $value
*/
protected function saveQueue($key,$value){
//最大存储数据量,超过后保留一条最新的数据,其余数据全部删除。
$max=100;

if(!isset($this->client->$key)) $this->add($key,[$value]);
else {
do{
$old_value=$new_value=$this->client->$key;

//超过最大数据量,保留最新数据
if(count($new_value)>$max){
$new_value=[$value];
}else{
array_push($new_value,$value);
}
}
while(!$this->client->cas($key, $old_value, $new_value));
}
}

protected function addSubUpdate($data=[]){
do{
$old_value=$new_value=$this->client->add_sub;
Expand Down
2 changes: 1 addition & 1 deletion src/Api/WebSocket/SocketServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ private function onMessage($global){
if(isset($data['e']) && $con->tag!='public') {
$table=$this->userKey($con->tag_keysecret,$data['e']);

$global->save($table,$data);
$global->saveQueue($table,$data);

$global->allSubUpdate([$con->tag_keysecret['key']=>[$table]],'add');
return;
Expand Down
2 changes: 1 addition & 1 deletion tests/websocket/client_spot.php
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@
'bchusdt@depth',
]);
print_r(json_encode($data));

die;
//The second way callback
$binance->keysecret($key_secret[0]);
$binance->getSubscribe([
Expand Down

0 comments on commit 2c6f6a9

Please sign in to comment.