From 637ad96d374d87f0046315592cc07c3764ef7358 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=A9opold=20Jacquot?= Date: Wed, 29 May 2019 06:59:54 +0200 Subject: [PATCH 1/2] Handle multi queue processing --- src/LaravelQueueKafkaServiceProvider.php | 2 +- src/Queue/Jobs/KafkaJob.php | 16 +++++++++++---- src/Queue/KafkaQueue.php | 26 ++++++++++++++++-------- 3 files changed, 30 insertions(+), 14 deletions(-) diff --git a/src/LaravelQueueKafkaServiceProvider.php b/src/LaravelQueueKafkaServiceProvider.php index 42639a5..0970b88 100644 --- a/src/LaravelQueueKafkaServiceProvider.php +++ b/src/LaravelQueueKafkaServiceProvider.php @@ -52,7 +52,7 @@ protected function registerDependencies() }); $this->app->bind('queue.kafka.consumer', function ($app, $parameters) { - return new \RdKafka\KafkaConsumer($parameters['conf']); + return new \RdKafka\Consumer($parameters['conf']); }); } diff --git a/src/Queue/Jobs/KafkaJob.php b/src/Queue/Jobs/KafkaJob.php index 894c24b..245b5c0 100644 --- a/src/Queue/Jobs/KafkaJob.php +++ b/src/Queue/Jobs/KafkaJob.php @@ -11,6 +11,7 @@ use Illuminate\Support\Str; use Rapide\LaravelQueueKafka\Exceptions\QueueKafkaException; use Rapide\LaravelQueueKafka\Queue\KafkaQueue; +use RdKafka\ConsumerTopic; use RdKafka\Message; class KafkaJob extends Job implements JobContract @@ -30,22 +31,29 @@ class KafkaJob extends Job implements JobContract */ protected $message; + /** + * @var ConsumerTopic + */ + protected $topic; + /** * KafkaJob constructor. * * @param Container $container * @param KafkaQueue $connection * @param Message $message - * @param $connectionName - * @param $queue + * @param string $connectionName + * @param string $queue + * @param ConsumerTopic $topic */ - public function __construct(Container $container, KafkaQueue $connection, Message $message, $connectionName, $queue) + public function __construct(Container $container, KafkaQueue $connection, Message $message, $connectionName, $queue, ConsumerTopic $topic) { $this->container = $container; $this->connection = $connection; $this->message = $message; $this->connectionName = $connectionName; $this->queue = $queue; + $this->topic = $topic; } /** @@ -102,7 +110,7 @@ public function delete() { try { parent::delete(); - $this->connection->getConsumer()->commitAsync($this->message); + $this->topic->offsetStore($this->message->partition, $this->message->offset); } catch (\RdKafka\Exception $exception) { throw new QueueKafkaException('Could not delete job from the queue', 0, $exception); } diff --git a/src/Queue/KafkaQueue.php b/src/Queue/KafkaQueue.php index 2b41784..5c2e9ff 100644 --- a/src/Queue/KafkaQueue.php +++ b/src/Queue/KafkaQueue.php @@ -33,20 +33,24 @@ class KafkaQueue extends Queue implements QueueContract */ private $producer; /** - * @var \RdKafka\KafkaConsumer + * @var \RdKafka\Consumer */ private $consumer; /** * @var array */ - private $subscribedQueueNames = []; + private $topics = []; + /** + * @var array + */ + private $queues = []; /** * @param \RdKafka\Producer $producer * @param \RdKafka\KafkaConsumer $consumer * @param array $config */ - public function __construct(\RdKafka\Producer $producer, \RdKafka\KafkaConsumer $consumer, $config) + public function __construct(\RdKafka\Producer $producer, \RdKafka\Consumer $consumer, $config) { $this->defaultQueue = $config['queue']; $this->sleepOnError = isset($config['sleep_on_error']) ? $config['sleep_on_error'] : 5; @@ -140,12 +144,16 @@ public function pop($queue = null) { try { $queue = $this->getQueueName($queue); - if (!in_array($queue, $this->subscribedQueueNames)) { - $this->subscribedQueueNames[] = $queue; - $this->consumer->subscribe($this->subscribedQueueNames); + if (!array_key_exists($queue, $this->queues)) { + $this->queues[$queue] = $this->consumer->newQueue(); + $topicConf = new \RdKafka\TopicConf(); + $topicConf->set('auto.offset.reset', 'largest'); + + $this->topics[$queue] = $this->consumer->newTopic($queue, $topicConf); + $this->topics[$queue]->consumeQueueStart(0, RD_KAFKA_OFFSET_BEGINNING, $this->queues[$queue]); } - $message = $this->consumer->consume(1000); + $message = $this->queues[$queue]->consume(1000); if ($message === null) { return null; @@ -155,7 +163,7 @@ public function pop($queue = null) case RD_KAFKA_RESP_ERR_NO_ERROR: return new KafkaJob( $this->container, $this, $message, - $this->connectionName, $queue ?: $this->defaultQueue + $this->connectionName, $queue ?: $this->defaultQueue, $this->topics[$queue] ); case RD_KAFKA_RESP_ERR__PARTITION_EOF: case RD_KAFKA_RESP_ERR__TIMED_OUT: @@ -255,7 +263,7 @@ protected function reportConnectionError($action, Exception $e) } /** - * @return \RdKafka\KafkaConsumer + * @return \RdKafka\Consumer */ public function getConsumer() { From d3d797584e5a3f2f64a32d7dab4ccafe8d4d33da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=A9opold=20Jacquot?= Date: Mon, 3 Jun 2019 10:27:28 +0200 Subject: [PATCH 2/2] Fix queue consuming from stored offset --- src/Queue/KafkaQueue.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Queue/KafkaQueue.php b/src/Queue/KafkaQueue.php index 5c2e9ff..e653215 100644 --- a/src/Queue/KafkaQueue.php +++ b/src/Queue/KafkaQueue.php @@ -150,7 +150,7 @@ public function pop($queue = null) $topicConf->set('auto.offset.reset', 'largest'); $this->topics[$queue] = $this->consumer->newTopic($queue, $topicConf); - $this->topics[$queue]->consumeQueueStart(0, RD_KAFKA_OFFSET_BEGINNING, $this->queues[$queue]); + $this->topics[$queue]->consumeQueueStart(0, RD_KAFKA_OFFSET_STORED, $this->queues[$queue]); } $message = $this->queues[$queue]->consume(1000);