Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

某个队列的消费者不足时,再给这个队列添加 work进程 #12

Open
MrKLL opened this issue Aug 15, 2018 · 12 comments
Open

Comments

@MrKLL
Copy link

MrKLL commented Aug 15, 2018

你好,我现在使用tp5队列来做一个风控,买卖股票的订单会随着股票价钱波动产生盈亏,达到某个亏损值的时候我需要进行平仓,可以队列每一秒只能处理15个订单,我想问一下怎么可以加进程,让系统处理更多的订单呢

@coolseven
Copy link
Owner

coolseven commented Aug 15, 2018 via email

@MrKLL
Copy link
Author

MrKLL commented Aug 15, 2018 via email

@coolseven
Copy link
Owner

coolseven commented Aug 15, 2018 via email

@MrKLL
Copy link
Author

MrKLL commented Aug 15, 2018 via email

@coolseven
Copy link
Owner

你可以尝试一下当订单生成之后主动向队列推送一条消息, 然后由队列的消费者去处理这个订单

而不是定时去扫描现有的所有订单

@MrKLL
Copy link
Author

MrKLL commented Aug 15, 2018

我现在的做法貌似就是这样,我是订单任务都推到队列中,然后在消费者方法中取任务进行判断,没有达到某个点就用release(1)隔一秒继续执行,达到了就去平仓,删除任务,但是这样我看了一下,每秒就只能处理15个订单,而且我换成release(),每秒处理的也就是15单

@coolseven
Copy link
Owner

coolseven commented Aug 15, 2018

你可以把需求拆分成两个部分:
第一部分负责从找出达到平仓条件的订单, 并将达到平仓条件的订单推送到队列中
第二个部分负责处理达到平仓条件的订单

第一部分可以不用队列来做, 而是用常驻脚本来做,
该常驻脚本的伪代码如下:

while(true){
     // 从数据库或缓存中按照时间或者其他维度取出一定数量的订单
     $ordersToCheck = $this->queryOrders();
    
     // 从这些订单中过滤出达到平仓条件的订单
     $orders = $this->filterSuitableOrders($ordersToCheck) ;   

     foreach($orders as $order){
           // 将达到平仓条件的订单推送到 'close-position' 队列中, 由消费进程去进行平仓操作
           Queue::push($order, 'close-position');
     }
}

第二部分负责消费 'close-position' 队列中的订单

你可以找一下每秒处理15单的瓶颈在哪里,

  • 如果瓶颈在过滤出达到平仓条件的订单的话, 就优化第一部分的逻辑, 核心思路是并行过滤
    比如:
    每次将订单存储到缓存或者数据库时, 根据订单的某个特征,如 订单号的 前3位数字, 将订单存储到不同的缓存 list 里面 或者 不同的表里面
    然后, 针对每个 缓存 list , 或者针对每个表, 分别启动一个或多个常驻脚本, 这个常驻脚本负责从它对应的 缓存 list 或者 表里面进行扫描和过滤

  • 如果瓶颈在处理达到平仓条件的订单的话, 就增加 'close-position' 队列的消费进程数量

@MrKLL
Copy link
Author

MrKLL commented Aug 16, 2018 via email

@im55cc
Copy link

im55cc commented Apr 29, 2020

你好,我想请教一下,开启多个消费进程,任务不会重复消费的原理是什么?是因为redis的原子性么。

@coolseven
Copy link
Owner

你好,我想请教一下,开启多个消费进程,任务不会重复消费的原理是什么?是因为redis的原子性么。

获取一个待消费的任务的本质是调用 redis 的 pop() 方法得到一个 job payload
由于 redis 服务是单进程的, 所以即使多个消费进程同时调用 pop() 方法, 也不会得到重复的 job payload

@im55cc
Copy link

im55cc commented May 10, 2020 via email

@Caesar0220
Copy link

你好我想请问一下。我创建multiTask的时候,taskA跟taskB都进入队列了,然后运行php think queue:work --queue multiTaskJobQueue的时候,第一次消费了TaskA,第二次消费了TaskB。但是命令行中并无输出。只有Processed: application\index\job\MultiTask@taskB这种提示,请问是什么原因?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

4 participants