-
When I use memory as the jobs driver, I have config: jobs:
consume: ["queue1", "queue2"]
pool:
num_workers: 1
pipelines:
queue1:
driver: memory
config:
priority: 10
prefetch: 1
queue2:
driver: memory
config:
priority: 10
prefetch: 1 When I push 2 messages to queue1, I find that there is only one php worker.php process, which is ok. But When I use nats as the jobs driver, I have config: jobs:
consume: ["queue1", "queue2"]
pool:
num_workers: 1
pipelines:
queue1:
driver: nats
config:
priority: 10
prefetch: 1
delete_after_ack: true
stream: default
queue2:
driver: nats
config:
priority: 10
prefetch: 1
delete_after_ack: true
stream: default I find that there are two php worker.php process. Unless I configure different streams for different pipelines. I don't really understand how the pool worker works, but I think it should be explicitly written in the documentation or restricted in the code to not allow the same stream to be used |
Beta Was this translation helpful? Give feedback.
Replies: 3 comments 19 replies
-
Hey @shellphy 👋 |
Beta Was this translation helpful? Give feedback.
-
config is: nats:
addr: nats://nats:4222
jobs:
consume: ["queue1", "queue2"]
num_workers: 1
pipelines:
queue1:
driver: nats
config:
priority: 10
prefetch: 1
delete_after_ack: true
subject: default_queue
stream: default_queue
queue2:
driver: nats
config:
priority: 10
prefetch: 1
delete_after_ack: true
subject: default_queue
stream: default_queue send If I configure different streams for different pipelines, it will be normal: ...
queue1:
...
stream: default_queue1
...
queue2:
...
stream: default_queue2
... So I think we should explain this situation in the documentation, with special attention. |
Beta Was this translation helpful? Give feedback.
-
config is: version: '3'
rpc:
listen: 'tcp://127.0.0.1:6001'
server:
command: 'php worker.php'
nats:
addr: nats://127.0.0.1:4222
logs:
mode: development
level: debug
jobs:
consume: ["queue1", "queue2"]
pool:
num_workers: 1
debug: true
pipelines:
queue1:
driver: nats
config:
priority: 10
prefetch: 1
delete_after_ack: true
subject: default_queue
stream: default_queue
queue2:
driver: nats
config:
priority: 10
prefetch: 1
delete_after_ack: true
subject: default_queue
stream: default_queue producer.php: <?php
use Spiral\Goridge\RPC\RPC;
use Spiral\RoadRunner\Jobs\Jobs;
require "vendor/autoload.php";
$jobs = new Jobs(
RPC::create('tcp://127.0.0.1:6001')
);
$queue = $jobs->connect('queue1');
$task = $queue->create("dummy", 'hello');
$result = $queue->dispatch($task);
var_dump($result); worker.php <?php
use Spiral\Goridge;
ini_set('display_errors', 'stderr');
require "vendor/autoload.php";
$consumer = new Spiral\RoadRunner\Jobs\Consumer();
while ($task = $consumer->waitTask()) {
try {
echo "received " . $task->getPayload();
sleep(20);
} catch (\Throwable $e) {
var_dump($e->getMessage());
}
} |
Beta Was this translation helpful? Give feedback.
Purge
.