Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trunk enhancements branch #100

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
"php-amqplib/php-amqplib": "^3.1.0",
"yiisoft/factory": "^1.0",
"yiisoft/friendly-exception": "^1.0",
"yiisoft/queue": "dev-master"
"yiisoft/queue": "dev-new"
},
"require-dev": {
"maglnet/composer-require-checker": "^4.4",
Expand Down
61 changes: 31 additions & 30 deletions src/Adapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
use Yiisoft\Queue\AMQP\Exception\NotImplementedException;
use Yiisoft\Queue\Cli\LoopInterface;
use Yiisoft\Queue\Enum\JobStatus;
use Yiisoft\Queue\Message\IdEnvelope;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Message\MessageSerializerInterface;

final class Adapter implements AdapterInterface
{
Expand All @@ -23,64 +25,63 @@ public function __construct(

public function withChannel(string $channel): self
{
$instance = clone $this;
$instance->queueProvider = $this->queueProvider->withChannelName($channel);
$new = clone $this;
$new->queueProvider = $this->queueProvider->withChannelName($channel);

return $instance;
return $new;
}

/**
* @param callable(MessageInterface): bool $handlerCallback
* @param callable(MessageInterface): bool $handlerCallback
*/
public function runExisting(callable $handlerCallback): void
{
$channel = $this->queueProvider->getChannel();
(new ExistingMessagesConsumer($channel, $this->queueProvider
->getQueueSettings()
->getName(), $this->serializer))
->consume($handlerCallback);
$queueName = $this->queueProvider->getQueueSettings()->getName();
$consumer = new ExistingMessagesConsumer(
$channel,
$queueName,
$this->serializer
);

$consumer->consume($handlerCallback);
}

/**
* @return never
*/
public function status(string $id): JobStatus
public function status(string|int $id): JobStatus
{
throw new NotImplementedException('Status check is not supported by the adapter ' . self::class . '.');
throw new NotImplementedException(sprintf('Status check is not supported by the adapter %s.', self::class));
}

public function push(MessageInterface $message): void
public function push(MessageInterface $message): MessageInterface
{
$payload = $this->serializer->serialize($message);
$amqpMessage = new AMQPMessage(
$payload,
array_merge(['message_id' => uniqid(more_entropy: true)], $this->queueProvider->getMessageProperties())
);
$exchangeSettings = $this->queueProvider->getExchangeSettings();
$this->queueProvider
->getChannel()
->basic_publish(
$amqpMessage,
$exchangeSettings?->getName() ?? '',
$exchangeSettings ? '' : $this->queueProvider
->getQueueSettings()
->getName()
);
$channel = $this->queueProvider->getChannel();
$channel->basic_publish(
$amqpMessage,
$exchangeSettings?->getName() ?? '',
$exchangeSettings ? '' : $this->queueProvider
->getQueueSettings()
->getName()
);
/** @var string $messageId */
$messageId = $amqpMessage->get('message_id');
$message->setId($messageId);

return new IdEnvelope($message, $messageId);
}

public function subscribe(callable $handlerCallback): void
{
$channel = $this->queueProvider->getChannel();
$queueName = $this->queueProvider->getQueueSettings()->getName();

$channel->basic_consume(
$this->queueProvider
->getQueueSettings()
->getName(),
$this->queueProvider
->getQueueSettings()
->getName(),
$queueName,
$queueName,
false,
false,
false,
Expand Down
54 changes: 0 additions & 54 deletions src/Exception/NoKeyInPayloadException.php

This file was deleted.

1 change: 1 addition & 0 deletions src/ExistingMessagesConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use PhpAmqpLib\Message\AMQPMessage;
use Throwable;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Message\MessageSerializerInterface;

/**
* @internal
Expand Down
64 changes: 0 additions & 64 deletions src/MessageSerializer.php

This file was deleted.

14 changes: 0 additions & 14 deletions src/MessageSerializerInterface.php

This file was deleted.

58 changes: 36 additions & 22 deletions src/Middleware/DelayMiddleware.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,32 @@
use InvalidArgumentException;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\AMQP\Adapter;
use Yiisoft\Queue\AMQP\QueueProviderInterface;
use Yiisoft\Queue\AMQP\Settings\ExchangeSettingsInterface;
use Yiisoft\Queue\AMQP\Settings\QueueSettingsInterface;
use Yiisoft\Queue\Middleware\Push\Implementation\DelayMiddlewareInterface;
use Yiisoft\Queue\Middleware\Push\MessageHandlerPushInterface;
use Yiisoft\Queue\Middleware\Push\PushRequest;
use Yiisoft\Queue\Middleware\DelayMiddlewareInterface;
use Yiisoft\Queue\Middleware\MessageHandlerInterface;
use Yiisoft\Queue\Middleware\MiddlewareInterface;
use Yiisoft\Queue\Middleware\Request;

final class DelayMiddleware implements DelayMiddlewareInterface
final class DelayMiddleware implements MiddlewareInterface, DelayMiddlewareInterface
{
public function __construct(private float $delayInSeconds, private bool $forcePersistentMessages = true)
{
public function __construct(
private AdapterInterface $adapter,
private float $delayInSeconds,
private bool $forcePersistentMessages = true
) {
if (!$adapter instanceof Adapter) {
throw new InvalidArgumentException(
sprintf(
'This middleware works only with the %s. %s given.',
Adapter::class,
get_debug_type($adapter)
)
);
}
}

/**
Expand All @@ -39,28 +53,28 @@
return $this->delayInSeconds;
}

public function processPush(PushRequest $request, MessageHandlerPushInterface $handler): PushRequest
public function process(Request $request, MessageHandlerInterface $handler): Request
{
$adapter = $request->getAdapter();
if (!$adapter instanceof Adapter) {
$type = get_debug_type($adapter);
$class = Adapter::class;
throw new InvalidArgumentException(
"This middleware works only with the $class. $type given."
);
}
$queueProvider = $this->adapter->getQueueProvider();

Check failure on line 58 in src/Middleware/DelayMiddleware.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.1-ubuntu-latest

MixedAssignment

src/Middleware/DelayMiddleware.php:58:9: MixedAssignment: Unable to determine the type that $queueProvider is being assigned to (see https://psalm.dev/032)

Check failure on line 58 in src/Middleware/DelayMiddleware.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.1-ubuntu-latest

UndefinedInterfaceMethod

src/Middleware/DelayMiddleware.php:58:42: UndefinedInterfaceMethod: Method Yiisoft\Queue\Adapter\AdapterInterface::getQueueProvider does not exist (see https://psalm.dev/181)

Check failure on line 58 in src/Middleware/DelayMiddleware.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.2-ubuntu-latest

MixedAssignment

src/Middleware/DelayMiddleware.php:58:9: MixedAssignment: Unable to determine the type that $queueProvider is being assigned to (see https://psalm.dev/032)

Check failure on line 58 in src/Middleware/DelayMiddleware.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.2-ubuntu-latest

UndefinedInterfaceMethod

src/Middleware/DelayMiddleware.php:58:42: UndefinedInterfaceMethod: Method Yiisoft\Queue\Adapter\AdapterInterface::getQueueProvider does not exist (see https://psalm.dev/181)

Check failure on line 58 in src/Middleware/DelayMiddleware.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.3-ubuntu-latest

MixedAssignment

src/Middleware/DelayMiddleware.php:58:9: MixedAssignment: Unable to determine the type that $queueProvider is being assigned to (see https://psalm.dev/032)

Check failure on line 58 in src/Middleware/DelayMiddleware.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.3-ubuntu-latest

UndefinedInterfaceMethod

src/Middleware/DelayMiddleware.php:58:42: UndefinedInterfaceMethod: Method Yiisoft\Queue\Adapter\AdapterInterface::getQueueProvider does not exist (see https://psalm.dev/181)
$originalExchangeSettings = $queueProvider->getExchangeSettings();

Check failure on line 59 in src/Middleware/DelayMiddleware.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.1-ubuntu-latest

MixedAssignment

src/Middleware/DelayMiddleware.php:59:9: MixedAssignment: Unable to determine the type that $originalExchangeSettings is being assigned to (see https://psalm.dev/032)

Check failure on line 59 in src/Middleware/DelayMiddleware.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.1-ubuntu-latest

MixedMethodCall

src/Middleware/DelayMiddleware.php:59:53: MixedMethodCall: Cannot determine the type of $queueProvider when calling method getExchangeSettings (see https://psalm.dev/015)

Check failure on line 59 in src/Middleware/DelayMiddleware.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.2-ubuntu-latest

MixedAssignment

src/Middleware/DelayMiddleware.php:59:9: MixedAssignment: Unable to determine the type that $originalExchangeSettings is being assigned to (see https://psalm.dev/032)

Check failure on line 59 in src/Middleware/DelayMiddleware.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.2-ubuntu-latest

MixedMethodCall

src/Middleware/DelayMiddleware.php:59:53: MixedMethodCall: Cannot determine the type of $queueProvider when calling method getExchangeSettings (see https://psalm.dev/015)

Check failure on line 59 in src/Middleware/DelayMiddleware.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.3-ubuntu-latest

MixedAssignment

src/Middleware/DelayMiddleware.php:59:9: MixedAssignment: Unable to determine the type that $originalExchangeSettings is being assigned to (see https://psalm.dev/032)

Check failure on line 59 in src/Middleware/DelayMiddleware.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.3-ubuntu-latest

MixedMethodCall

src/Middleware/DelayMiddleware.php:59:53: MixedMethodCall: Cannot determine the type of $queueProvider when calling method getExchangeSettings (see https://psalm.dev/015)
$delayedExchangeSettings = $this->getExchangeSettings($originalExchangeSettings);

Check failure on line 60 in src/Middleware/DelayMiddleware.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.1-ubuntu-latest

MixedArgument

src/Middleware/DelayMiddleware.php:60:63: MixedArgument: Argument 1 of Yiisoft\Queue\AMQP\Middleware\DelayMiddleware::getExchangeSettings cannot be mixed, expecting Yiisoft\Queue\AMQP\Settings\ExchangeSettingsInterface|null (see https://psalm.dev/030)

Check failure on line 60 in src/Middleware/DelayMiddleware.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.2-ubuntu-latest

MixedArgument

src/Middleware/DelayMiddleware.php:60:63: MixedArgument: Argument 1 of Yiisoft\Queue\AMQP\Middleware\DelayMiddleware::getExchangeSettings cannot be mixed, expecting Yiisoft\Queue\AMQP\Settings\ExchangeSettingsInterface|null (see https://psalm.dev/030)

Check failure on line 60 in src/Middleware/DelayMiddleware.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.3-ubuntu-latest

MixedArgument

src/Middleware/DelayMiddleware.php:60:63: MixedArgument: Argument 1 of Yiisoft\Queue\AMQP\Middleware\DelayMiddleware::getExchangeSettings cannot be mixed, expecting Yiisoft\Queue\AMQP\Settings\ExchangeSettingsInterface|null (see https://psalm.dev/030)
$queueSettings = $this->getQueueSettings(
$queueProvider->getQueueSettings(),

Check failure on line 62 in src/Middleware/DelayMiddleware.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.1-ubuntu-latest

MixedArgument

src/Middleware/DelayMiddleware.php:62:13: MixedArgument: Argument 1 of Yiisoft\Queue\AMQP\Middleware\DelayMiddleware::getQueueSettings cannot be mixed, expecting Yiisoft\Queue\AMQP\Settings\QueueSettingsInterface (see https://psalm.dev/030)

Check failure on line 62 in src/Middleware/DelayMiddleware.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.1-ubuntu-latest

MixedMethodCall

src/Middleware/DelayMiddleware.php:62:29: MixedMethodCall: Cannot determine the type of $queueProvider when calling method getQueueSettings (see https://psalm.dev/015)

Check failure on line 62 in src/Middleware/DelayMiddleware.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.2-ubuntu-latest

MixedArgument

src/Middleware/DelayMiddleware.php:62:13: MixedArgument: Argument 1 of Yiisoft\Queue\AMQP\Middleware\DelayMiddleware::getQueueSettings cannot be mixed, expecting Yiisoft\Queue\AMQP\Settings\QueueSettingsInterface (see https://psalm.dev/030)

Check failure on line 62 in src/Middleware/DelayMiddleware.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.2-ubuntu-latest

MixedMethodCall

src/Middleware/DelayMiddleware.php:62:29: MixedMethodCall: Cannot determine the type of $queueProvider when calling method getQueueSettings (see https://psalm.dev/015)

Check failure on line 62 in src/Middleware/DelayMiddleware.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.3-ubuntu-latest

MixedArgument

src/Middleware/DelayMiddleware.php:62:13: MixedArgument: Argument 1 of Yiisoft\Queue\AMQP\Middleware\DelayMiddleware::getQueueSettings cannot be mixed, expecting Yiisoft\Queue\AMQP\Settings\QueueSettingsInterface (see https://psalm.dev/030)

Check failure on line 62 in src/Middleware/DelayMiddleware.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.3-ubuntu-latest

MixedMethodCall

src/Middleware/DelayMiddleware.php:62:29: MixedMethodCall: Cannot determine the type of $queueProvider when calling method getQueueSettings (see https://psalm.dev/015)
$originalExchangeSettings
);

$queueProvider = $adapter->getQueueProvider();
$exchangeSettings = $this->getExchangeSettings($queueProvider->getExchangeSettings());
$queueSettings = $this->getQueueSettings($queueProvider->getQueueSettings(), $queueProvider->getExchangeSettings());
$adapter = $adapter->withQueueProvider(
$adapter = $this->adapter->withQueueProvider(

Check failure on line 66 in src/Middleware/DelayMiddleware.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.1-ubuntu-latest

MixedAssignment

src/Middleware/DelayMiddleware.php:66:9: MixedAssignment: Unable to determine the type that $adapter is being assigned to (see https://psalm.dev/032)

Check failure on line 66 in src/Middleware/DelayMiddleware.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.1-ubuntu-latest

UndefinedInterfaceMethod

src/Middleware/DelayMiddleware.php:66:36: UndefinedInterfaceMethod: Method Yiisoft\Queue\Adapter\AdapterInterface::withQueueProvider does not exist (see https://psalm.dev/181)

Check failure on line 66 in src/Middleware/DelayMiddleware.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.2-ubuntu-latest

MixedAssignment

src/Middleware/DelayMiddleware.php:66:9: MixedAssignment: Unable to determine the type that $adapter is being assigned to (see https://psalm.dev/032)

Check failure on line 66 in src/Middleware/DelayMiddleware.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.2-ubuntu-latest

UndefinedInterfaceMethod

src/Middleware/DelayMiddleware.php:66:36: UndefinedInterfaceMethod: Method Yiisoft\Queue\Adapter\AdapterInterface::withQueueProvider does not exist (see https://psalm.dev/181)

Check failure on line 66 in src/Middleware/DelayMiddleware.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.3-ubuntu-latest

MixedAssignment

src/Middleware/DelayMiddleware.php:66:9: MixedAssignment: Unable to determine the type that $adapter is being assigned to (see https://psalm.dev/032)

Check failure on line 66 in src/Middleware/DelayMiddleware.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.3-ubuntu-latest

UndefinedInterfaceMethod

src/Middleware/DelayMiddleware.php:66:36: UndefinedInterfaceMethod: Method Yiisoft\Queue\Adapter\AdapterInterface::withQueueProvider does not exist (see https://psalm.dev/181)
$queueProvider
->withMessageProperties($this->getMessageProperties($queueProvider))

Check failure on line 68 in src/Middleware/DelayMiddleware.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.1-ubuntu-latest

MixedMethodCall

src/Middleware/DelayMiddleware.php:68:19: MixedMethodCall: Cannot determine the type of $queueProvider when calling method withMessageProperties (see https://psalm.dev/015)

Check failure on line 68 in src/Middleware/DelayMiddleware.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.2-ubuntu-latest

MixedMethodCall

src/Middleware/DelayMiddleware.php:68:19: MixedMethodCall: Cannot determine the type of $queueProvider when calling method withMessageProperties (see https://psalm.dev/015)

Check failure on line 68 in src/Middleware/DelayMiddleware.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.3-ubuntu-latest

MixedMethodCall

src/Middleware/DelayMiddleware.php:68:19: MixedMethodCall: Cannot determine the type of $queueProvider when calling method withMessageProperties (see https://psalm.dev/015)
->withExchangeSettings($exchangeSettings)
->withExchangeSettings($delayedExchangeSettings)
->withQueueSettings($queueSettings)
);

return $handler->handlePush($request->withAdapter($adapter));
return $handler->handle(
$request->withQueue(
$request->getQueue()->withAdapter($adapter)
)
);
}

/**
Expand Down Expand Up @@ -104,7 +118,7 @@
/** @noinspection NullPointerExceptionInspection */
return $exchangeSettings
?->withName("{$exchangeSettings->getName()}.dlx")
->withAutoDelete(true)
->withAutoDelete(false)
->withType(AMQPExchangeType::TOPIC);
}
}
1 change: 1 addition & 0 deletions src/QueueProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public function __construct(
public function __destruct()
{
$this->channel?->close();
//unset($this->channel);
}

public function getChannel(): AMQPChannel
Expand Down
2 changes: 1 addition & 1 deletion src/Settings/Exchange.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public function __construct(
private string $type = AMQPExchangeType::DIRECT,
private bool $passive = false,
private bool $durable = false,
private bool $autoDelete = true,
private bool $autoDelete = false,
private bool $internal = false,
private bool $nowait = false,
private AMQPTable|array $arguments = [],
Expand Down
2 changes: 1 addition & 1 deletion src/Settings/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public function __construct(
private bool $passive = false,
private bool $durable = false,
private bool $exclusive = false,
private bool $autoDelete = true,
private bool $autoDelete = false,
private bool $nowait = false,
private AMQPTable|array $arguments = [],
private ?int $ticket = null
Expand Down
Loading
Loading