Skip to content

Commit

Permalink
Merge pull request #2 from ensi-platform/task-102492
Browse files Browse the repository at this point in the history
#102492 команда проверки что все нужные топики созданы
  • Loading branch information
MadridianFox authored Feb 22, 2023
2 parents 08dc7fb + d9902f5 commit 694a281
Show file tree
Hide file tree
Showing 6 changed files with 273 additions and 177 deletions.
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
"orchestra/testbench": "^6.15",
"phpunit/phpunit": "^9.3",
"spatie/laravel-ray": "^1.9",
"vimeo/psalm": "^4.4"
"vimeo/psalm": "^4.4",
"kwn/php-rdkafka-stubs": "^2.2"
},
"autoload": {
"psr-4": {
Expand Down
66 changes: 35 additions & 31 deletions config/kafka.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,40 @@

// configurattion options can be found here: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
// if an option is set to null it is ignored.
$contour = env('KAFKA_CONTOUR', 'local');
return [
'consumers' => [
'default' => [
'metadata.broker.list' => env('KAFKA_BROKER_LIST'),
'security.protocol' => env('KAFKA_SECURITY_PROTOCOL', 'plaintext'),
'sasl.mechanisms' => env('KAFKA_SASL_MECHANISMS'),
'sasl.username' => env('KAFKA_SASL_USERNAME'),
'sasl.password' => env('KAFKA_SASL_PASSWORD'),
'log_level' => env('KAFKA_DEBUG', false) ? (string) LOG_DEBUG : (string) LOG_INFO,
'debug' => env('KAFKA_DEBUG', false) ? 'all' : null,

// consumer specific options
'group.id' => env('KAFKA_CONSUMER_GROUP_ID', env('APP_NAME')),
'enable.auto.commit' => true,
'auto.offset.reset' => 'beginning',
'allow.auto.create.topics' => true,
],
],
'producers' => [
'default' => [
'metadata.broker.list' => env('KAFKA_BROKER_LIST'),
'security.protocol' => env('KAFKA_SECURITY_PROTOCOL', 'plaintext'),
'sasl.mechanisms' => env('KAFKA_SASL_MECHANISMS'),
'sasl.username' => env('KAFKA_SASL_USERNAME'),
'sasl.password' => env('KAFKA_SASL_PASSWORD'),
'log_level' => env('KAFKA_DEBUG', false) ? (string) LOG_DEBUG : (string) LOG_INFO,
'debug' => env('KAFKA_DEBUG', false) ? 'all' : null,

// producer specific options
'compression.codec' => env('KAFKA_PRODUCER_COMPRESSION_CODEC', 'snappy'),
],
],
'connections' => [
'default' => [
'settings' => [
'metadata.broker.list' => env('KAFKA_BROKER_LIST'),
'security.protocol' => env('KAFKA_SECURITY_PROTOCOL', 'plaintext'),
'sasl.mechanisms' => env('KAFKA_SASL_MECHANISMS'),
'sasl.username' => env('KAFKA_SASL_USERNAME'),
'sasl.password' => env('KAFKA_SASL_PASSWORD'),
'log_level' => env('KAFKA_DEBUG', false) ? (string)LOG_DEBUG : (string)LOG_INFO,
'debug' => env('KAFKA_DEBUG', false) ? 'all' : null,
],
'topics' => [
// 'foobars' => $contour . '.domain.fact.foobars.1'
]
]
],
'consumers' => [
'default' => [
'connection' => 'default',
'additional-settings' => [
'group.id' => env('KAFKA_CONSUMER_GROUP_ID', env('APP_NAME')),
'enable.auto.commit' => true,
'auto.offset.reset' => 'beginning',
],
],
],
'producers' => [
'default' => [
'connection' => 'default',
'additional-settings' => [
'compression.codec' => env('KAFKA_PRODUCER_COMPRESSION_CODEC', 'snappy'),
],
],
],
];
66 changes: 66 additions & 0 deletions src/Commands/CheckTopicsExistsCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
<?php

namespace Ensi\LaravelPhpRdKafka\Commands;

use Ensi\LaravelPhpRdKafka\KafkaFacade;
use Illuminate\Console\Command;

class CheckTopicsExistsCommand extends Command
{
protected $signature = 'kafka:find-not-created-topics
{--validate : вернуть ошибку если есть не созданные топики}';
protected $description = 'Проверить что все топики из kafka.topics существуют';

public function handle(): int
{
$totalDesiredTopics = 0;
$notFoundTopics = [];

$connectionNames = KafkaFacade::availableConnections();

foreach ($connectionNames as $connectionName) {
$existingTopics = $this->getExistingTopics($connectionName);

$desiredTopics = KafkaFacade::allTopics($connectionName);
$totalDesiredTopics += count($desiredTopics);

foreach ($desiredTopics as $topicName) {
if (!in_array($topicName, $existingTopics)) {
$notFoundTopics[] = $topicName;
}
}
}

$notFoundTopics = count($notFoundTopics);
if ($notFoundTopics) {
$this->output->writeln(join("\n", $notFoundTopics));
}

if ($this->option('validate')) {
if ($notFoundTopics) {
$this->output->writeln("\nThere are {$notFoundTopics} not created topics");

return self::FAILURE;
} else {
$this->output->writeln("All {$totalDesiredTopics} desired topics exists");

return self::SUCCESS;
}
}

return self::SUCCESS;
}

private function getExistingTopics(string $connectionName): array
{
$rdKafka = KafkaFacade::rdKafka($connectionName);
$metadata = $rdKafka->getMetadata(true, null, 2000);

$existingTopics = [];
foreach ($metadata->getTopics() as $topicMeta) {
$existingTopics[] = $topicMeta->getTopic();
}

return $existingTopics;
}
}
8 changes: 8 additions & 0 deletions src/KafkaFacade.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@
use Illuminate\Support\Facades\Facade;

/**
* @method static string[] availableConnections()
* @method static string[] allTopics(string $connection)
* @method static string topicName(string $connection, string $topicKey)
* @method static string topicNameByClient(string $clientType, string $clientName, string $topicKey)
* @method static \RdKafka\KafkaConsumer consumer(string $name)
* @method static \RdKafka\Producer producer(string $name)
* @method static \RdKafka\Producer rdKafka(string $connection)
*
* @see \Ensi\LaravelPhpRdKafka\KafkaManager
*/
class KafkaFacade extends Facade
Expand Down
Loading

0 comments on commit 694a281

Please sign in to comment.