Skip to content

Commit

Permalink
Merge pull request #19 from L3o-pold/handle_multiqueue_processing
Browse files Browse the repository at this point in the history
Handle multi queue processing
  • Loading branch information
petermein authored Oct 16, 2019
2 parents eeb63e7 + d3d7975 commit 27f6bdc
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 14 deletions.
2 changes: 1 addition & 1 deletion src/LaravelQueueKafkaServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ protected function registerDependencies()
});

$this->app->bind('queue.kafka.consumer', function ($app, $parameters) {
return new \RdKafka\KafkaConsumer($parameters['conf']);
return new \RdKafka\Consumer($parameters['conf']);
});
}

Expand Down
16 changes: 12 additions & 4 deletions src/Queue/Jobs/KafkaJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use Illuminate\Support\Str;
use Rapide\LaravelQueueKafka\Exceptions\QueueKafkaException;
use Rapide\LaravelQueueKafka\Queue\KafkaQueue;
use RdKafka\ConsumerTopic;
use RdKafka\Message;

class KafkaJob extends Job implements JobContract
Expand All @@ -30,22 +31,29 @@ class KafkaJob extends Job implements JobContract
*/
protected $message;

/**
* @var ConsumerTopic
*/
protected $topic;

/**
* KafkaJob constructor.
*
* @param Container $container
* @param KafkaQueue $connection
* @param Message $message
* @param $connectionName
* @param $queue
* @param string $connectionName
* @param string $queue
* @param ConsumerTopic $topic
*/
public function __construct(Container $container, KafkaQueue $connection, Message $message, $connectionName, $queue)
public function __construct(Container $container, KafkaQueue $connection, Message $message, $connectionName, $queue, ConsumerTopic $topic)
{
$this->container = $container;
$this->connection = $connection;
$this->message = $message;
$this->connectionName = $connectionName;
$this->queue = $queue;
$this->topic = $topic;
}

/**
Expand Down Expand Up @@ -102,7 +110,7 @@ public function delete()
{
try {
parent::delete();
$this->connection->getConsumer()->commitAsync($this->message);
$this->topic->offsetStore($this->message->partition, $this->message->offset);
} catch (\RdKafka\Exception $exception) {
throw new QueueKafkaException('Could not delete job from the queue', 0, $exception);
}
Expand Down
26 changes: 17 additions & 9 deletions src/Queue/KafkaQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,24 @@ class KafkaQueue extends Queue implements QueueContract
*/
private $producer;
/**
* @var \RdKafka\KafkaConsumer
* @var \RdKafka\Consumer
*/
private $consumer;
/**
* @var array
*/
private $subscribedQueueNames = [];
private $topics = [];
/**
* @var array
*/
private $queues = [];

/**
* @param \RdKafka\Producer $producer
* @param \RdKafka\KafkaConsumer $consumer
* @param array $config
*/
public function __construct(\RdKafka\Producer $producer, \RdKafka\KafkaConsumer $consumer, $config)
public function __construct(\RdKafka\Producer $producer, \RdKafka\Consumer $consumer, $config)
{
$this->defaultQueue = $config['queue'];
$this->sleepOnError = isset($config['sleep_on_error']) ? $config['sleep_on_error'] : 5;
Expand Down Expand Up @@ -140,12 +144,16 @@ public function pop($queue = null)
{
try {
$queue = $this->getQueueName($queue);
if (!in_array($queue, $this->subscribedQueueNames)) {
$this->subscribedQueueNames[] = $queue;
$this->consumer->subscribe($this->subscribedQueueNames);
if (!array_key_exists($queue, $this->queues)) {
$this->queues[$queue] = $this->consumer->newQueue();
$topicConf = new \RdKafka\TopicConf();
$topicConf->set('auto.offset.reset', 'largest');

$this->topics[$queue] = $this->consumer->newTopic($queue, $topicConf);
$this->topics[$queue]->consumeQueueStart(0, RD_KAFKA_OFFSET_STORED, $this->queues[$queue]);
}

$message = $this->consumer->consume(1000);
$message = $this->queues[$queue]->consume(1000);

if ($message === null) {
return null;
Expand All @@ -155,7 +163,7 @@ public function pop($queue = null)
case RD_KAFKA_RESP_ERR_NO_ERROR:
return new KafkaJob(
$this->container, $this, $message,
$this->connectionName, $queue ?: $this->defaultQueue
$this->connectionName, $queue ?: $this->defaultQueue, $this->topics[$queue]
);
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
case RD_KAFKA_RESP_ERR__TIMED_OUT:
Expand Down Expand Up @@ -255,7 +263,7 @@ protected function reportConnectionError($action, Exception $e)
}

/**
* @return \RdKafka\KafkaConsumer
* @return \RdKafka\Consumer
*/
public function getConsumer()
{
Expand Down

0 comments on commit 27f6bdc

Please sign in to comment.