diff --git a/src/Contracts/MqttClient.php b/src/Contracts/MqttClient.php index 8955a7a..e43ec47 100644 --- a/src/Contracts/MqttClient.php +++ b/src/Contracts/MqttClient.php @@ -122,7 +122,7 @@ public function interrupt(): void; * @throws MqttClientException * @throws ProtocolViolationException */ - public function loop(bool $allowSleep = true, bool $exitWhenQueuesEmpty = false, ?int $queueWaitLimit = null): void; + public function loop(bool $allowSleep = true, bool $exitWhenQueuesEmpty = false, ?int $queueWaitLimit = null): bool; /** * Runs an event loop iteration that handles messages from the server and calls the registered diff --git a/src/MqttClient.php b/src/MqttClient.php index c68ef2c..07c048e 100644 --- a/src/MqttClient.php +++ b/src/MqttClient.php @@ -77,8 +77,7 @@ public function __construct( string $protocol = self::MQTT_3_1, ?Repository $repository = null, ?LoggerInterface $logger = null - ) - { + ) { if (!in_array($protocol, [self::MQTT_3_1, self::MQTT_3_1_1])) { throw new ProtocolNotSupportedException($protocol); } @@ -160,8 +159,8 @@ protected function establishSocketConnection(): void $this->logger->debug('Using TLS for the connection to the broker.'); $shouldVerifyPeer = $this->settings->shouldTlsVerifyPeer() - || $this->settings->getTlsCertificateAuthorityFile() !== null - || $this->settings->getTlsCertificateAuthorityPath() !== null; + || $this->settings->getTlsCertificateAuthorityFile() !== null + || $this->settings->getTlsCertificateAuthorityPath() !== null; if (!$shouldVerifyPeer) { $this->logger->warning('Using TLS without peer verification is discouraged. Are you aware of the security risk?'); @@ -200,7 +199,7 @@ protected function establishSocketConnection(): void if ($this->settings->getTlsAlpn() !== null) { $tlsOptions['alpn_protocols'] = $this->settings->getTlsAlpn(); } - + $contextOptions['ssl'] = $tlsOptions; } @@ -525,8 +524,7 @@ protected function publishMessage( bool $retain, ?int $messageId = null, bool $isDuplicate = false - ): void - { + ): void { $this->logger->debug('Publishing a message on topic [{topic}]: {message}', [ 'topic' => $topic, 'message' => $message, @@ -597,8 +595,9 @@ protected function nextPingAt(): float /** * {@inheritDoc} + * @return bool True if loop was exited because queue was empty, False otherwise */ - public function loop(bool $allowSleep = true, bool $exitWhenQueuesEmpty = false, ?int $queueWaitLimit = null): void + public function loop(bool $allowSleep = true, bool $exitWhenQueuesEmpty = false, ?int $queueWaitLimit = null): bool { $this->logger->debug('Starting client loop to process incoming messages and the resend queue.'); @@ -616,16 +615,17 @@ public function loop(bool $allowSleep = true, bool $exitWhenQueuesEmpty = false, // In any case, there may not be any active subscriptions though. if ($exitWhenQueuesEmpty && $this->repository->countSubscriptions() === 0) { if ($this->allQueuesAreEmpty()) { - break; + return true; } // The time limit is reached. This most likely means the outgoing queues could not be emptied in time. // Probably the server did not respond with an acknowledgement. if ($queueWaitLimit !== null && (microtime(true) - $loopStartedAt) > $queueWaitLimit) { - break; + return false; } } } + return false; } /** @@ -877,7 +877,7 @@ protected function handleMessage(Message $message): void protected function allQueuesAreEmpty(): bool { return $this->repository->countPendingOutgoingMessages() === 0 && - $this->repository->countPendingIncomingMessages() === 0; + $this->repository->countPendingIncomingMessages() === 0; } /**