From 99f906edfab3c30862fc6226c3e1780e9b69a11a Mon Sep 17 00:00:00 2001 From: Ivan Koryukov Date: Fri, 10 Feb 2023 13:39:17 +0300 Subject: [PATCH 1/3] =?UTF-8?q?#102492=20=D0=B4=D0=BE=D0=B1=D0=B0=D0=B2?= =?UTF-8?q?=D0=BB=D0=B5=D0=BD=20=D1=81=D0=BF=D0=B8=D1=81=D0=BE=D0=BA=20?= =?UTF-8?q?=D1=82=D0=BE=D0=BF=D0=B8=D0=BA=D0=BE=D0=B2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * теперь в kafka.topics есть список топиков, и HighLevel(Consumer|Producer) должны использовать не имя топика, а ключ из этого списка * добавлен пакет kwn/php-rdkafka-stubs --- composer.json | 3 ++- config/kafka.php | 61 +++++++++++++++++++++++--------------------- src/KafkaFacade.php | 6 +++++ src/KafkaManager.php | 11 ++++++++ 4 files changed, 51 insertions(+), 30 deletions(-) diff --git a/composer.json b/composer.json index 4d17d36..8fe3483 100644 --- a/composer.json +++ b/composer.json @@ -25,7 +25,8 @@ "orchestra/testbench": "^6.15", "phpunit/phpunit": "^9.3", "spatie/laravel-ray": "^1.9", - "vimeo/psalm": "^4.4" + "vimeo/psalm": "^4.4", + "kwn/php-rdkafka-stubs": "^2.2" }, "autoload": { "psr-4": { diff --git a/config/kafka.php b/config/kafka.php index 58ec6a2..4321f23 100644 --- a/config/kafka.php +++ b/config/kafka.php @@ -2,36 +2,39 @@ // configurattion options can be found here: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md // if an option is set to null it is ignored. +$contour = env('KAFKA_CONTOUR', 'local'); return [ - 'consumers' => [ - 'default' => [ - 'metadata.broker.list' => env('KAFKA_BROKER_LIST'), - 'security.protocol' => env('KAFKA_SECURITY_PROTOCOL', 'plaintext'), - 'sasl.mechanisms' => env('KAFKA_SASL_MECHANISMS'), - 'sasl.username' => env('KAFKA_SASL_USERNAME'), - 'sasl.password' => env('KAFKA_SASL_PASSWORD'), - 'log_level' => env('KAFKA_DEBUG', false) ? (string) LOG_DEBUG : (string) LOG_INFO, - 'debug' => env('KAFKA_DEBUG', false) ? 'all' : null, + 'topics' => [ + // 'foobars' => $contour . '.domain.fact.foobars.1' + ], + 'consumers' => [ + 'default' => [ + 'metadata.broker.list' => env('KAFKA_BROKER_LIST'), + 'security.protocol' => env('KAFKA_SECURITY_PROTOCOL', 'plaintext'), + 'sasl.mechanisms' => env('KAFKA_SASL_MECHANISMS'), + 'sasl.username' => env('KAFKA_SASL_USERNAME'), + 'sasl.password' => env('KAFKA_SASL_PASSWORD'), + 'log_level' => env('KAFKA_DEBUG', false) ? (string)LOG_DEBUG : (string)LOG_INFO, + 'debug' => env('KAFKA_DEBUG', false) ? 'all' : null, - // consumer specific options - 'group.id' => env('KAFKA_CONSUMER_GROUP_ID', env('APP_NAME')), - 'enable.auto.commit' => true, - 'auto.offset.reset' => 'beginning', - 'allow.auto.create.topics' => true, - ], - ], - 'producers' => [ - 'default' => [ - 'metadata.broker.list' => env('KAFKA_BROKER_LIST'), - 'security.protocol' => env('KAFKA_SECURITY_PROTOCOL', 'plaintext'), - 'sasl.mechanisms' => env('KAFKA_SASL_MECHANISMS'), - 'sasl.username' => env('KAFKA_SASL_USERNAME'), - 'sasl.password' => env('KAFKA_SASL_PASSWORD'), - 'log_level' => env('KAFKA_DEBUG', false) ? (string) LOG_DEBUG : (string) LOG_INFO, - 'debug' => env('KAFKA_DEBUG', false) ? 'all' : null, + // consumer specific options + 'group.id' => env('KAFKA_CONSUMER_GROUP_ID', env('APP_NAME')), + 'enable.auto.commit' => true, + 'auto.offset.reset' => 'beginning', + ], + ], + 'producers' => [ + 'default' => [ + 'metadata.broker.list' => env('KAFKA_BROKER_LIST'), + 'security.protocol' => env('KAFKA_SECURITY_PROTOCOL', 'plaintext'), + 'sasl.mechanisms' => env('KAFKA_SASL_MECHANISMS'), + 'sasl.username' => env('KAFKA_SASL_USERNAME'), + 'sasl.password' => env('KAFKA_SASL_PASSWORD'), + 'log_level' => env('KAFKA_DEBUG', false) ? (string)LOG_DEBUG : (string)LOG_INFO, + 'debug' => env('KAFKA_DEBUG', false) ? 'all' : null, - // producer specific options - 'compression.codec' => env('KAFKA_PRODUCER_COMPRESSION_CODEC', 'snappy'), - ], - ], + // producer specific options + 'compression.codec' => env('KAFKA_PRODUCER_COMPRESSION_CODEC', 'snappy'), + ], + ], ]; diff --git a/src/KafkaFacade.php b/src/KafkaFacade.php index d8a3882..f29673e 100644 --- a/src/KafkaFacade.php +++ b/src/KafkaFacade.php @@ -5,6 +5,12 @@ use Illuminate\Support\Facades\Facade; /** + * @method static string|null topicName(string $topicKey) + * @method static \RdKafka\Conf consumerConfig(string $name) + * @method static \RdKafka\Conf producerConfig(string $name) + * @method static \RdKafka\KafkaConsumer consumer(string $name) + * @method static \RdKafka\Producer producer(string $name) + * * @see \Ensi\LaravelPhpRdKafka\KafkaManager */ class KafkaFacade extends Facade diff --git a/src/KafkaManager.php b/src/KafkaManager.php index fa3f5a8..5892feb 100644 --- a/src/KafkaManager.php +++ b/src/KafkaManager.php @@ -118,6 +118,17 @@ public function producer(string $name = 'default'): Producer } + public function topicName(string $topicKey): ?string + { + $topicList = $this->app['config']["kafka.topics"]; + if (!isset($topicList[$topicKey])) { + throw new InvalidArgumentException("Topic with key '{$topicKey}' is not registered in kafka.topics"); + } + + return $topicList[$topicKey]; + } + + protected function makeConfig(string $name, string $type): Conf { $availableValues = $this->app['config']["kafka.{$type}s"]; From 5d04cfb35276a3c76174765e17a0e3fb66af7bbc Mon Sep 17 00:00:00 2001 From: Ivan Koryukov Date: Fri, 10 Feb 2023 15:31:47 +0300 Subject: [PATCH 2/3] =?UTF-8?q?#102492=20=D0=B8=D0=B7=D0=BC=D0=B5=D0=BD?= =?UTF-8?q?=D0=B5=D0=BD=D0=B0=20=D1=81=D1=82=D1=80=D1=83=D0=BA=D1=82=D1=83?= =?UTF-8?q?=D1=80=D0=B0=20=D0=BA=D0=BE=D0=BD=D1=84=D0=B8=D0=B3=D0=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * ломающее изменение: структура конфига kafka сильно изменилась * рефакторинг KafkaManager --- config/kafka.php | 49 ++++---- src/KafkaFacade.php | 7 +- src/KafkaManager.php | 279 ++++++++++++++++++++----------------------- 3 files changed, 159 insertions(+), 176 deletions(-) diff --git a/config/kafka.php b/config/kafka.php index 4321f23..de47ea8 100644 --- a/config/kafka.php +++ b/config/kafka.php @@ -4,37 +4,38 @@ // if an option is set to null it is ignored. $contour = env('KAFKA_CONTOUR', 'local'); return [ - 'topics' => [ - // 'foobars' => $contour . '.domain.fact.foobars.1' + 'connections' => [ + 'default' => [ + 'settings' => [ + 'metadata.broker.list' => env('KAFKA_BROKER_LIST'), + 'security.protocol' => env('KAFKA_SECURITY_PROTOCOL', 'plaintext'), + 'sasl.mechanisms' => env('KAFKA_SASL_MECHANISMS'), + 'sasl.username' => env('KAFKA_SASL_USERNAME'), + 'sasl.password' => env('KAFKA_SASL_PASSWORD'), + 'log_level' => env('KAFKA_DEBUG', false) ? (string)LOG_DEBUG : (string)LOG_INFO, + 'debug' => env('KAFKA_DEBUG', false) ? 'all' : null, + ], + 'topics' => [ + // 'foobars' => $contour . '.domain.fact.foobars.1' + ] + ] ], 'consumers' => [ 'default' => [ - 'metadata.broker.list' => env('KAFKA_BROKER_LIST'), - 'security.protocol' => env('KAFKA_SECURITY_PROTOCOL', 'plaintext'), - 'sasl.mechanisms' => env('KAFKA_SASL_MECHANISMS'), - 'sasl.username' => env('KAFKA_SASL_USERNAME'), - 'sasl.password' => env('KAFKA_SASL_PASSWORD'), - 'log_level' => env('KAFKA_DEBUG', false) ? (string)LOG_DEBUG : (string)LOG_INFO, - 'debug' => env('KAFKA_DEBUG', false) ? 'all' : null, - - // consumer specific options - 'group.id' => env('KAFKA_CONSUMER_GROUP_ID', env('APP_NAME')), - 'enable.auto.commit' => true, - 'auto.offset.reset' => 'beginning', + 'connection' => 'default', + 'additional-settings' => [ + 'group.id' => env('KAFKA_CONSUMER_GROUP_ID', env('APP_NAME')), + 'enable.auto.commit' => true, + 'auto.offset.reset' => 'beginning', + ], ], ], 'producers' => [ 'default' => [ - 'metadata.broker.list' => env('KAFKA_BROKER_LIST'), - 'security.protocol' => env('KAFKA_SECURITY_PROTOCOL', 'plaintext'), - 'sasl.mechanisms' => env('KAFKA_SASL_MECHANISMS'), - 'sasl.username' => env('KAFKA_SASL_USERNAME'), - 'sasl.password' => env('KAFKA_SASL_PASSWORD'), - 'log_level' => env('KAFKA_DEBUG', false) ? (string)LOG_DEBUG : (string)LOG_INFO, - 'debug' => env('KAFKA_DEBUG', false) ? 'all' : null, - - // producer specific options - 'compression.codec' => env('KAFKA_PRODUCER_COMPRESSION_CODEC', 'snappy'), + 'connection' => 'default', + 'additional-settings' => [ + 'compression.codec' => env('KAFKA_PRODUCER_COMPRESSION_CODEC', 'snappy'), + ], ], ], ]; diff --git a/src/KafkaFacade.php b/src/KafkaFacade.php index f29673e..5086f5d 100644 --- a/src/KafkaFacade.php +++ b/src/KafkaFacade.php @@ -5,9 +5,10 @@ use Illuminate\Support\Facades\Facade; /** - * @method static string|null topicName(string $topicKey) - * @method static \RdKafka\Conf consumerConfig(string $name) - * @method static \RdKafka\Conf producerConfig(string $name) + * @method static string[] availableConnections() + * @method static string[] allTopics(string $connection) + * @method static string topicName(string $connection, string $topicKey) + * @method static string topicNameByClient(string $clientType, string $clientName, string $topicKey) * @method static \RdKafka\KafkaConsumer consumer(string $name) * @method static \RdKafka\Producer producer(string $name) * diff --git a/src/KafkaManager.php b/src/KafkaManager.php index 5892feb..c716911 100644 --- a/src/KafkaManager.php +++ b/src/KafkaManager.php @@ -2,166 +2,147 @@ namespace Ensi\LaravelPhpRdKafka; -use Illuminate\Support\Arr; use InvalidArgumentException; -use Illuminate\Contracts\Foundation\Application; use RdKafka\Conf; -use RdKafka\Producer; use RdKafka\KafkaConsumer; +use RdKafka\Producer; class KafkaManager { - /** - * The application instance. - * - * @var \Illuminate\Contracts\Foundation\Application - */ - protected $app; - - /** - * Consumers configs. - * - * @var array - */ - protected $consumersConfigs = []; - - /** - * Producers configs. - * - * @var array - */ - protected $producersConfigs = []; - - /** - * Consumers. - * - * @var array - */ - protected $consumers = []; - - /** - * Producers. - * - * @var array - */ - protected $producers = []; - - /** - * Create a new kafka manager instance. - * - * @param Application $app - * @return void - */ - public function __construct(Application $app) - { - $this->app = $app; - } - - /** - * Get a consumer config instance. - * - * @param string $name - * @return Conf - */ - public function consumerConfig(string $name = 'default'): Conf - { - if (!isset($this->consimersConfigs[$name])) { - $this->consimersConfigs[$name] = $this->makeConfig($name, 'consumer'); - } - - return $this->consimersConfigs[$name]; - } - - /** - * Get a producer config instance. - * - * @param string $name - * @return Conf - */ - public function producerConfig(string $name = 'default'): Conf - { - if (!isset($this->producersConfigs[$name])) { - $this->producersConfigs[$name] = $this->makeConfig($name, 'producer'); - } - - return $this->producersConfigs[$name]; - } - - /** - * Get a consumer instance. - * - * @param string $name - * @return KafkaConsumer - */ - public function consumer(string $name = 'default'): KafkaConsumer - { - if (!isset($this->consumers[$name])) { - $this->consumers[$name] = new KafkaConsumer($this->consumerConfig($name)); - } - - return $this->consumers[$name]; - } - - /** - * Get a producer instance. - * - * @param string $name - * @return Producer - */ - public function producer(string $name = 'default'): Producer - { - if (!isset($this->producers[$name])) { - $this->producers[$name] = new Producer($this->producerConfig($name)); - } - - return $this->producers[$name]; - } - - - public function topicName(string $topicKey): ?string + /** @var array */ + protected $consumers = []; + + /** @var array */ + protected $producers = []; + + protected $config = []; + + public function __construct() + { + $this->config = config('kafka'); + } + + /** + * Get a consumer instance. + * + * @param string $name + * @return KafkaConsumer + */ + public function consumer(string $name = 'default'): KafkaConsumer { - $topicList = $this->app['config']["kafka.topics"]; - if (!isset($topicList[$topicKey])) { + if (!isset($this->consumers[$name])) { + $this->consumers[$name] = new KafkaConsumer($this->makeKafkaConf($name, 'consumer')); + } + + return $this->consumers[$name]; + } + + /** + * Get a producer instance. + * + * @param string $name + * @return Producer + */ + public function producer(string $name = 'default'): Producer + { + if (!isset($this->producers[$name])) { + $this->producers[$name] = new Producer($this->makeKafkaConf($name, 'producer')); + } + + return $this->producers[$name]; + } + + public function availableConnections(): array + { + return array_keys($this->config['connections']); + } + + public function allTopics(string $connection): array + { + $connectionConfig = $this->rawConnectionConfig($connection); + + return $connectionConfig['topics']; + } + + public function topicName(string $connection, string $topicKey): string + { + $connectionConfig = $this->rawConnectionConfig($connection); + + if (!isset($connectionConfig['topics'][$topicKey])) { throw new InvalidArgumentException("Topic with key '{$topicKey}' is not registered in kafka.topics"); } - return $topicList[$topicKey]; + return $connectionConfig['topics'][$topicKey]; } + public function topicNameByClient(string $clientType, string $clientName, string $topicKey): string + { + $clientConfig = $this->rawClientConfig($clientType, $clientName); - protected function makeConfig(string $name, string $type): Conf - { - $availableValues = $this->app['config']["kafka.{$type}s"]; - - if (is_null($configValues = Arr::get($availableValues, $name))) { - throw new InvalidArgumentException("$type config [kafka.{$type}s.{$name}] not found."); - } - - $config = new Conf(); - foreach ($this->cleanupConfigValues($configValues) as $key => $value) { - $config->set($key, $value); - } - - return $config; - } - - protected function cleanupConfigValues(array $configValues) - { - foreach ($configValues as $key => $value) { - if ($value === null) { - unset($configValues[$key]); - } - } - - $booleanToStrings = [ - 'enable.auto.commit', - ]; - foreach ($booleanToStrings as $key) { - if (isset($configValues[$key])) { - $configValues[$key] = Helpers::stringifyBoolean($configValues[$key]); - } - } - - return $configValues; - } + return $this->topicName($clientConfig['connection'], $topicKey); + } + + protected function makeKafkaConf(string $clientName, string $clientType): Conf + { + $rawConnectionSettings = $this->rawKafkaSettings($clientName, $clientType); + $cleanedSettings = $this->cleanupKafkaSettings($rawConnectionSettings); + + $config = new Conf(); + foreach ($cleanedSettings as $key => $value) { + $config->set($key, $value); + } + + return $config; + } + + /** + * Get raw kafka settings array, from merging of client additional-settings and connection settings + * @param string $clientName client name (key in kafka.consumers or kafka.producer) + * @param string $clientType 'consumer' or 'producer' + * @return array + */ + protected function rawKafkaSettings(string $clientName, string $clientType): array + { + $clientConfig = $this->rawClientConfig($clientType, $clientName); + $connectionConfig = $this->rawConnectionConfig($clientConfig['connection']); + + return array_merge($connectionConfig['settings'], $clientConfig['additional-settings']); + } + + protected function rawConnectionConfig(string $connectionName): array + { + if (!isset($this->config['connections'][$connectionName])) { + throw new InvalidArgumentException("connection config [kafka.connections.{$connectionName}] not found."); + } + + return $this->config['connections'][$connectionName]; + } + + protected function rawClientConfig(string $clientType, string $clientName): array + { + $haystack = $clientType . 's'; + if (!isset($this->config[$haystack][$clientName])) { + throw new InvalidArgumentException("$clientType config [kafka.{$clientType}s.{$clientName}] not found."); + } + + return $this->config[$haystack][$clientName]; + } + + /** + * Remove nulls and turn booleans to strings + * @param array $configValues + * @return array + */ + protected function cleanupKafkaSettings(array $configValues): array + { + array_walk($configValues, function ($value, $key) use (&$result) { + if (is_null($value)) { + return; + } + + $result[$key] = Helpers::stringifyBoolean($value); + }); + + return $result; + } } From d9902f5cd2a00f465b3b146c836f0fcb39504e6e Mon Sep 17 00:00:00 2001 From: Ivan Koryukov Date: Fri, 10 Feb 2023 17:58:59 +0300 Subject: [PATCH 3/3] =?UTF-8?q?#102492=20=D0=B4=D0=BE=D0=B1=D0=B0=D0=B2?= =?UTF-8?q?=D0=BB=D0=B5=D0=BD=D0=B0=20=D0=BA=D0=BE=D0=BC=D0=B0=D0=BD=D0=B4?= =?UTF-8?q?=D0=B0=20kafka:find-not-created-topics?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Добавлена консольная команда, которая сверяет список существующих в кафке топиков с теми что указаны в kafka.connections[].topics --- src/Commands/CheckTopicsExistsCommand.php | 66 +++++++++++++++++++++++ src/KafkaFacade.php | 1 + src/KafkaManager.php | 25 +++++++-- src/LaravelPhpRdKafkaServiceProvider.php | 8 +++ 4 files changed, 96 insertions(+), 4 deletions(-) create mode 100644 src/Commands/CheckTopicsExistsCommand.php diff --git a/src/Commands/CheckTopicsExistsCommand.php b/src/Commands/CheckTopicsExistsCommand.php new file mode 100644 index 0000000..a8c5262 --- /dev/null +++ b/src/Commands/CheckTopicsExistsCommand.php @@ -0,0 +1,66 @@ +getExistingTopics($connectionName); + + $desiredTopics = KafkaFacade::allTopics($connectionName); + $totalDesiredTopics += count($desiredTopics); + + foreach ($desiredTopics as $topicName) { + if (!in_array($topicName, $existingTopics)) { + $notFoundTopics[] = $topicName; + } + } + } + + $notFoundTopics = count($notFoundTopics); + if ($notFoundTopics) { + $this->output->writeln(join("\n", $notFoundTopics)); + } + + if ($this->option('validate')) { + if ($notFoundTopics) { + $this->output->writeln("\nThere are {$notFoundTopics} not created topics"); + + return self::FAILURE; + } else { + $this->output->writeln("All {$totalDesiredTopics} desired topics exists"); + + return self::SUCCESS; + } + } + + return self::SUCCESS; + } + + private function getExistingTopics(string $connectionName): array + { + $rdKafka = KafkaFacade::rdKafka($connectionName); + $metadata = $rdKafka->getMetadata(true, null, 2000); + + $existingTopics = []; + foreach ($metadata->getTopics() as $topicMeta) { + $existingTopics[] = $topicMeta->getTopic(); + } + + return $existingTopics; + } +} \ No newline at end of file diff --git a/src/KafkaFacade.php b/src/KafkaFacade.php index 5086f5d..1cc4720 100644 --- a/src/KafkaFacade.php +++ b/src/KafkaFacade.php @@ -11,6 +11,7 @@ * @method static string topicNameByClient(string $clientType, string $clientName, string $topicKey) * @method static \RdKafka\KafkaConsumer consumer(string $name) * @method static \RdKafka\Producer producer(string $name) + * @method static \RdKafka\Producer rdKafka(string $connection) * * @see \Ensi\LaravelPhpRdKafka\KafkaManager */ diff --git a/src/KafkaManager.php b/src/KafkaManager.php index c716911..8cb16c3 100644 --- a/src/KafkaManager.php +++ b/src/KafkaManager.php @@ -3,6 +3,7 @@ namespace Ensi\LaravelPhpRdKafka; use InvalidArgumentException; +use RdKafka; use RdKafka\Conf; use RdKafka\KafkaConsumer; use RdKafka\Producer; @@ -31,7 +32,11 @@ public function __construct() public function consumer(string $name = 'default'): KafkaConsumer { if (!isset($this->consumers[$name])) { - $this->consumers[$name] = new KafkaConsumer($this->makeKafkaConf($name, 'consumer')); + $this->consumers[$name] = new KafkaConsumer( + $this->makeKafkaConf( + $this->rawKafkaSettings($name, 'consumer') + ) + ); } return $this->consumers[$name]; @@ -46,12 +51,25 @@ public function consumer(string $name = 'default'): KafkaConsumer public function producer(string $name = 'default'): Producer { if (!isset($this->producers[$name])) { - $this->producers[$name] = new Producer($this->makeKafkaConf($name, 'producer')); + $this->producers[$name] = new Producer( + $this->makeKafkaConf( + $this->rawKafkaSettings($name, 'producer') + ) + ); } return $this->producers[$name]; } + public function rdKafka(string $connectionName): RdKafka + { + return new Producer( + $this->makeKafkaConf( + $this->rawConnectionConfig($connectionName)['settings'] + ) + ); + } + public function availableConnections(): array { return array_keys($this->config['connections']); @@ -82,9 +100,8 @@ public function topicNameByClient(string $clientType, string $clientName, string return $this->topicName($clientConfig['connection'], $topicKey); } - protected function makeKafkaConf(string $clientName, string $clientType): Conf + protected function makeKafkaConf(array $rawConnectionSettings): Conf { - $rawConnectionSettings = $this->rawKafkaSettings($clientName, $clientType); $cleanedSettings = $this->cleanupKafkaSettings($rawConnectionSettings); $config = new Conf(); diff --git a/src/LaravelPhpRdKafkaServiceProvider.php b/src/LaravelPhpRdKafkaServiceProvider.php index 63f0cfd..cb2d033 100644 --- a/src/LaravelPhpRdKafkaServiceProvider.php +++ b/src/LaravelPhpRdKafkaServiceProvider.php @@ -2,6 +2,10 @@ namespace Ensi\LaravelPhpRdKafka; +use Ensi\LaravelPhpRdKafka\Commands\CheckTopicsExistsCommand; +use Ensi\LaravelPhpRdKafkaConsumer\Commands\KafkaCheckOffsetsCommand; +use Ensi\LaravelPhpRdKafkaConsumer\Commands\KafkaConsumeCommand; +use Ensi\LaravelPhpRdKafkaConsumer\Commands\KafkaSetOffset; use Illuminate\Support\ServiceProvider; class LaravelPhpRdKafkaServiceProvider extends ServiceProvider @@ -26,6 +30,10 @@ public function boot() $this->publishes([ $this->packageBasePath("/../config/kafka.php") => config_path("kafka.php"), ], "kafka-config"); + + $this->commands([ + CheckTopicsExistsCommand::class, + ]); } }