Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New release branch #31

Open
wants to merge 22 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
/vendor
composer.lock
.phpstorm.meta.php
phpunit.xml
phpunit.xml
docker-compose.yml
74 changes: 0 additions & 74 deletions .styleci.yml
Original file line number Diff line number Diff line change
@@ -1,75 +1 @@
preset: psr2

enabled:
- alpha_ordered_imports
- binary_operator_spaces
- blank_line_after_opening_tag
- cast_spaces
- concat_with_spaces
- const_visibility_required
- declare_equal_normalize
- function_typehint_space
- hash_to_slash_comment
- heredoc_to_nowdoc
- include
- lowercase_cast
- method_separation
- native_function_casing
- new_with_braces
- no_blank_lines_after_class_opening
- no_blank_lines_after_phpdoc
- no_blank_lines_after_return
- no_blank_lines_after_throw
- no_blank_lines_between_imports
- no_blank_lines_between_traits
- no_empty_statement
- no_extra_consecutive_blank_lines
- no_leading_import_slash
- no_leading_namespace_whitespace
- no_multiline_whitespace_around_double_arrow
- no_short_bool_cast
- no_short_echo_tag
- no_singleline_whitespace_before_semicolons
- no_spaces_inside_offset
- no_spaces_outside_offset
- no_trailing_comma_in_list_call
- no_trailing_comma_in_singleline_array
- no_unneeded_control_parentheses
- no_unreachable_default_argument_value
- no_unused_imports
- no_useless_return
- no_whitespace_before_comma_in_array
- no_whitespace_in_blank_line
- normalize_index_brace
- object_operator_without_whitespace
- phpdoc_add_missing_param_annotation
- phpdoc_indent
- phpdoc_inline_tag
- phpdoc_link_to_see
- phpdoc_no_access
- phpdoc_no_empty_return
- phpdoc_no_package
- phpdoc_order
- phpdoc_property
- phpdoc_scalar
- phpdoc_separation
- phpdoc_single_line_var_spacing
- phpdoc_to_comment
- phpdoc_trim
- phpdoc_type_to_var
- phpdoc_types
- phpdoc_var_without_name
- print_to_echo
- self_accessor
- short_array_syntax
- single_blank_line_before_namespace
- single_quote
- space_after_semicolon
- standardize_not_equals
- ternary_operator_spaces
- trailing_comma_in_multiline_array
- trim_array_spaces
- unalign_double_arrow
- unalign_equals
- unary_operator_spaces
- whitespace_after_comma_in_array
5 changes: 2 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@ language: php
sudo: required

php:
- 5.6
- 7.0
- 7.1
- 7.2
- 7.3
- 7.4
- 8.0
- nightly

install:
Expand Down
72 changes: 36 additions & 36 deletions composer.json
Original file line number Diff line number Diff line change
@@ -1,38 +1,38 @@
{
"name": "rapide/laravel-queue-kafka",
"description": "Kafka driver for Laravel Queue",
"license": "MIT",
"type": "library",
"version": "1.0",
"authors": [
{
"name": "Peter Mein",
"email": "[email protected]"
}
],
"require": {
"php": ">=5.6.4",
"illuminate/database": "5.4.* || 5.5.* || 5.6.* || 5.7.* || 5.8.* || 6.* || 7.*",
"illuminate/support": "5.4.* || 5.5.* || 5.6.* || 5.7.* || 5.8.* || 6.* || 7.*",
"illuminate/queue": "5.4.* || 5.5.* || 5.6.* || 5.7.* || 5.8.* || 6.* || 7.*",
"ext-rdkafka": "*"
},
"require-dev": {
"phpunit/phpunit": "^5.5",
"mockery/mockery": "^1.0"
},
"autoload": {
"psr-4": {
"Rapide\\LaravelQueueKafka\\": "src/"
}
},
"extra": {
"laravel": {
"providers": [
"Rapide\\LaravelQueueKafka\\LaravelQueueKafkaServiceProvider"
]
}
},
"minimum-stability": "dev",
"prefer-stable": true
"name": "rapide/laravel-queue-kafka",
"description": "Kafka driver for Laravel Queue",
"license": "MIT",
"type": "library",
"version": "1.0",
"authors": [
{
"name": "Peter Mein",
"email": "[email protected]"
}
],
"require": {
"php": ">=7.2|>=8.0",
"illuminate/database": "6.* || 7.* || 8.*",
"illuminate/support": "6.* || 7.* || 8.*",
"illuminate/queue": "6.* || 7.* || 8.*",
"ext-rdkafka": "*"
},
"require-dev": {
"phpunit/phpunit": "^5.5",
"mockery/mockery": "^1.0"
},
"autoload": {
"psr-4": {
"Rapide\\LaravelQueueKafka\\": "src/"
}
},
"extra": {
"laravel": {
"providers": [
"Rapide\\LaravelQueueKafka\\LaravelQueueKafkaServiceProvider"
]
}
},
"minimum-stability": "dev",
"prefer-stable": true
}
6 changes: 3 additions & 3 deletions config/kafka.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
/*
* Address of the Kafka broker
*/
'brokers' => env('KAFKA_BROKERS', 'localhost'),
'brokers' => env('KAFKA_BROKERS', '127.0.0.1:9092'),

/*
* Determine the number of seconds to sleep if there's an error communicating with kafka
Expand All @@ -48,12 +48,12 @@
'ssl_ca_location' => '',

/*
* SASL username for use with the PLAIN and SASL-SCRAM-.. mechanisms
* SASL username for use with the PLAIN and SASL-SCRAM-.. mechanisms
*/
'sasl_plain_username' => env('KAFKA_SASL_PLAIN_USERNAME'),

/*
* SASL password for use with the PLAIN and SASL-SCRAM-.. mechanism
* SASL password for use with the PLAIN and SASL-SCRAM-.. mechanism
*/
'sasl_plain_password' => env('KAFKA_SASL_PLAIN_PASSWORD'),
];
7 changes: 4 additions & 3 deletions src/LaravelQueueKafkaServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ class LaravelQueueKafkaServiceProvider extends ServiceProvider
public function register()
{
$this->mergeConfigFrom(
__DIR__ . '/../config/kafka.php', 'queue.connections.kafka'
__DIR__ . '/../config/kafka.php',
'queue.connections.kafka'
);

$this->registerDependencies();
Expand Down Expand Up @@ -43,8 +44,8 @@ protected function registerDependencies()
return new \RdKafka\TopicConf();
});

$this->app->bind('queue.kafka.producer', function () {
return new \RdKafka\Producer();
$this->app->bind('queue.kafka.producer', function ($app, $parameters) {
return new \RdKafka\Producer($parameters['conf'] ?? null);
});

$this->app->bind('queue.kafka.conf', function () {
Expand Down
3 changes: 2 additions & 1 deletion src/LumenQueueKafkaServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ public function boot()
parent::boot();

$this->mergeConfigFrom(
__DIR__ . '/../config/kafka.php', 'queue.connections.kafka'
__DIR__ . '/../config/kafka.php',
'queue.connections.kafka'
);
}
}
31 changes: 18 additions & 13 deletions src/Queue/Connectors/KafkaConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@

use Illuminate\Container\Container;
use Illuminate\Queue\Connectors\ConnectorInterface;
use Illuminate\Support\Arr;
use Rapide\LaravelQueueKafka\Queue\KafkaQueue;
use RdKafka\Conf;
use RdKafka\KafkaConsumer;
use RdKafka\Consumer;
use RdKafka\Producer;
use RdKafka\TopicConf;

Expand Down Expand Up @@ -36,29 +37,33 @@ public function __construct(Container $container)
*/
public function connect(array $config)
{
/** @var Producer $producer */
$producer = $this->container->makeWith('queue.kafka.producer', []);
$producer->addBrokers($config['brokers']);

/** @var TopicConf $topicConf */
$topicConf = $this->container->makeWith('queue.kafka.topic_conf', []);
$topicConf->set('auto.offset.reset', 'largest');

/** @var Conf $conf */
$conf = $this->container->makeWith('queue.kafka.conf', []);
if (true === $config['sasl_enable']) {
if (true === ($config['sasl_enable'] ?? false)) {
$conf->set('sasl.mechanisms', 'PLAIN');
$conf->set('sasl.username', $config['sasl_plain_username']);
$conf->set('sasl.password', $config['sasl_plain_password']);
$conf->set('ssl.ca.location', $config['ssl_ca_location']);
}
$conf->set('group.id', array_get($config, 'consumer_group_id', 'php-pubsub'));
$conf->set('group.id', Arr::get($config, 'consumer_group_id', 'php-pubsub'));
$conf->set('metadata.broker.list', $config['brokers']);
$conf->set('enable.auto.commit', 'false');
$conf->set('offset.store.method', 'broker');
$conf->setDefaultTopicConf($topicConf);
$conf->set('log_level', (string)LOG_DEBUG);
$conf->set('debug', 'all');

/** @var Producer $producer */
$producer = $this->container->makeWith('queue.kafka.producer', ['conf' => $conf]);
$producer->addBrokers($config['brokers']);

/** @var TopicConf $topicConf */
$topicConf = $this->container->makeWith('queue.kafka.topic_conf', []);
$topicConf->set('auto.offset.reset', 'largest');


// $conf->setDefaultTopicConf($topicConf);

/** @var KafkaConsumer $consumer */
/** @var Consumer $consumer */
$consumer = $this->container->makeWith('queue.kafka.consumer', ['conf' => $conf]);

return new KafkaQueue(
Expand Down
19 changes: 11 additions & 8 deletions src/Queue/Jobs/KafkaJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,20 @@
use Exception;
use Illuminate\Container\Container;
use Illuminate\Contracts\Queue\Job as JobContract;
use Illuminate\Database\DetectsDeadlocks;
use Illuminate\Database\DetectsConcurrencyErrors;
use Illuminate\Database\DetectsLostConnections;
use Illuminate\Queue\Jobs\Job;
use Illuminate\Queue\Jobs\JobName;
use Illuminate\Support\Str;
use Rapide\LaravelQueueKafka\Exceptions\QueueKafkaException;
use Rapide\LaravelQueueKafka\Queue\KafkaQueue;
use RdKafka\ConsumerTopic;
use RdKafka\Message;
use RdKafka\Topic;

class KafkaJob extends Job implements JobContract
{
use DetectsDeadlocks;
use DetectsConcurrencyErrors;
use DetectsLostConnections;

/**
* @var KafkaQueue
Expand Down Expand Up @@ -44,9 +46,9 @@ class KafkaJob extends Job implements JobContract
* @param Message $message
* @param string $connectionName
* @param string $queue
* @param ConsumerTopic $topic
* @param Topic $topic
*/
public function __construct(Container $container, KafkaQueue $connection, Message $message, $connectionName, $queue, ConsumerTopic $topic)
public function __construct(Container $container, KafkaQueue $connection, Message $message, $connectionName, $queue, Topic $topic)
{
$this->container = $container;
$this->connection = $connection;
Expand Down Expand Up @@ -90,7 +92,7 @@ public function fire()
*/
public function attempts()
{
return (int) ($this->payload()['attempts']) + 1;
return (int)($this->payload()['attempts']) + 1;
}

/**
Expand Down Expand Up @@ -174,17 +176,18 @@ public function setJobId($id)
*
* @param array $body
*
* @return mixed
* @throws Exception
*
* @return mixed
*/
private function unserialize(array $body)
{
try {
return unserialize($body['data']['command']);
} catch (Exception $exception) {
if (
$this->causedByDeadlock($exception)
$this->causedByConcurrencyError($exception)
|| $this->causedByLostConnection($exception)
|| Str::contains($exception->getMessage(), ['detected deadlock'])
) {
sleep($this->connection->getConfig()['sleep_on_deadlock']);
Expand Down
Loading