Skip to content

Commit

Permalink
Merge pull request #6 from Sin30/all-fix
Browse files Browse the repository at this point in the history
Fix several bugs that stopping this queue from working #6 by Sin30
  • Loading branch information
petermein authored Sep 26, 2017
2 parents f30a279 + c2f11e8 commit b9eb356
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 19 deletions.
24 changes: 9 additions & 15 deletions src/Queue/Jobs/KafkaJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Rapide\LaravelQueueKafka\Queue\Jobs;

use Exception;
use Illuminate\Container\Container;
use Illuminate\Contracts\Queue\Job as JobContract;
use Illuminate\Database\DetectsDeadlocks;
use Illuminate\Queue\Jobs\Job;
Expand All @@ -22,15 +23,19 @@ class KafkaJob extends Job implements JobContract
/**
* KafkaJob constructor.
*
* @param Container $container
* @param KafkaQueue $connection
* @param Message $message
* @param $queue
* @param $connectionName
* @param $queue
*/
public function __construct(KafkaQueue $connection, Message $message, $queue)
public function __construct(Container $container, KafkaQueue $connection, Message $message, $connectionName, $queue)
{
$this->container = $container;
$this->connection = $connection;
$this->queue = $queue;
$this->message = $message;
$this->connectionName = $connectionName;
$this->queue = $queue;
}

/**
Expand Down Expand Up @@ -67,7 +72,7 @@ public function fire()
*/
public function attempts()
{
return 1;
return (int) ($this->payload()['attempts']) + 1;
}

/**
Expand Down Expand Up @@ -101,7 +106,6 @@ public function release($delay = 0)
parent::release($delay);

$this->delete();
$this->setAttempts($this->attempts() + 1);

$body = $this->payload();

Expand All @@ -123,16 +127,6 @@ public function release($delay = 0)
}
}

/**
* Sets the count of attempts at processing this job.
*
* @param int $count
*/
private function setAttempts($count)
{
$this->connection->setAttempts($count);
}

/**
* Get the job identifier.
*
Expand Down
17 changes: 14 additions & 3 deletions src/Queue/KafkaQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ class KafkaQueue extends Queue implements QueueContract
* @var \RdKafka\KafkaConsumer
*/
private $consumer;
/**
* @var array
*/
private $subscribedQueueNames = [];

/**
* @param \RdKafka\Producer $producer
Expand Down Expand Up @@ -122,17 +126,24 @@ public function later($delay, $job, $data = '', $queue = null)
*/
public function pop($queue = null)
{
$this->consumer->subscribe([$this->getQueueName($queue)]);
$queue = $this->getQueueName($queue);
if (!in_array($queue, $this->subscribedQueueNames)) {
$this->subscribedQueueNames[] = $queue;
$this->consumer->subscribe($this->subscribedQueueNames);
}

$message = $this->consumer->consume(30 * 1000);
$message = $this->consumer->consume(1000);

if ($message === null) {
return;
}

switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
return new KafkaJob($this, $message, $queue ?: $this->defaultQueue);
return new KafkaJob(
$this->container, $this, $message,
$this->connectionName, $queue ?: $this->defaultQueue
);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
case RD_KAFKA_RESP_ERR__TIMED_OUT:
Expand Down
3 changes: 3 additions & 0 deletions tests/KafkaQueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
/**
* @property \Mockery\MockInterface producer
* @property \Mockery\MockInterface consumer
* @property \Mockery\MockInterface $container
* @property array config
* @property KafkaQueue queue
*/
Expand All @@ -18,13 +19,15 @@ public function setUp()

$this->producer = Mockery::mock(\RdKafka\Producer::class);
$this->consumer = Mockery::mock(\RdKafka\KafkaConsumer::class);
$this->container = Mockery::mock(\Illuminate\Container\Container::class);

$this->config = [
'queue' => str_random(),
'sleep_error' => true,
];

$this->queue = new KafkaQueue($this->producer, $this->consumer, $this->config);
$this->queue->setContainer($this->container);
}

public function test_size()
Expand Down
2 changes: 1 addition & 1 deletion travis-install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ cd /tmp \

pecl install rdkafka

echo "extension = rdkafka.so" >> ~/.phpenv/versions/$(phpenv version-name)/etc/php.ini
echo "extension=rdkafka.so" >> ~/.phpenv/versions/$(phpenv version-name)/etc/php.ini

0 comments on commit b9eb356

Please sign in to comment.