Skip to content
This repository has been archived by the owner on Jan 27, 2022. It is now read-only.

Commit

Permalink
Merge pull request #2 from Zenith-Kim-Light/master
Browse files Browse the repository at this point in the history
обновил до состояния основной ветки
  • Loading branch information
mshumakov authored Jun 10, 2020
2 parents 3fdda3b + 5de3820 commit 5e4aea5
Show file tree
Hide file tree
Showing 13 changed files with 319 additions and 39 deletions.
3 changes: 3 additions & 0 deletions src/DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,13 @@ public function getConfigTreeBuilder(): TreeBuilder
->scalarNode('dsn')->isRequired()->end()
->scalarNode('client_id')->isRequired()->end()
->scalarNode('cluster_id')->isRequired()->end()
->scalarNode('user')->defaultValue(null)->end()
->scalarNode('pass')->defaultValue(null)->end()
->booleanNode('verbose')->defaultFalse()->end()
->integerNode('connection_timeout')->defaultValue(30)->end()
->integerNode('write_timeout')->defaultValue(5)->end()
->booleanNode('debug')->defaultValue(false)->end()
->booleanNode('is_random_client_id')->defaultValue(false)->end()
->arrayNode('context')
->children()
->arrayNode('tls')
Expand Down
2 changes: 2 additions & 0 deletions src/Events/CloudEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class CloudEvent extends Event
* @var string
* @Serializer\Expose()
* @Serializer\Type("string")
* @Serializer\SerializedName("specversion")
*/
private $specVersion = '0.3';

Expand Down Expand Up @@ -68,6 +69,7 @@ class CloudEvent extends Event
* @var string
* @Serializer\Expose()
* @Serializer\Type("string")
* @Serializer\SerializedName("datacontenttype")
*/
private $dataContentType = 'application/json';

Expand Down
25 changes: 25 additions & 0 deletions src/Events/Event.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,32 @@
namespace LeNats\Events;

use Symfony\Component\EventDispatcher\Event as BaseEvent;
use JMS\Serializer\Annotation as Serializer;

/**
* @Serializer\ExclusionPolicy("ALL")
*/
abstract class Event extends BaseEvent
{
/**
* @var bool
* @Serializer\Exclude()
*/
protected $propagationStopped = false;

/**
* @return bool
*/
public function isPropagationStopped(): bool
{
return $this->propagationStopped;
}

/**
* @deprecated since Symfony 4.3, use "Symfony\Contracts\EventDispatcher\Event" instead
*/
public function stopPropagation(): void
{
$this->propagationStopped = true;
}
}
236 changes: 236 additions & 0 deletions src/Events/Fake/CloudEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
<?php

namespace LeNats\Events\Fake;

use DateTimeImmutable;
use DateTimeInterface;
use LeNats\Subscription\Subscription;
use JMS\Serializer\Annotation as Serializer;

/**
* @Serializer\ExclusionPolicy("ALL")
*/
class CloudEvent
{
/**
* @var mixed
* @Serializer\Expose()
* @Serializer\Type("array")
*/
protected $data;

/**
* @var string
* @Serializer\Expose()
* @Serializer\Type("string")
* @Serializer\SerializedName("specversion")
*/
private $specVersion = '0.3';

/**
* @var string
* @Serializer\Expose()
* @Serializer\Type("string")
*/
private $type;

/**
* @var string|null
* @Serializer\Expose()
* @Serializer\Type("string")
*/
private $source;

/**
* @var string
* @Serializer\Expose()
* @Serializer\Type("string")
*/
private $id;

/**
* @var int|string
*/
private $sequenceId;

/**
* @var Subscription
*/
private $subscription;

/**
* @var DateTimeInterface
* @Serializer\Expose()
* @Serializer\Type("DateTimeImmutable")
*/
private $time;

/**
* @var string
* @Serializer\Expose()
* @Serializer\Type("string")
* @Serializer\SerializedName("datacontenttype")
*/
private $dataContentType = 'application/json';

public function __construct()
{
$this->time = new DateTimeImmutable();
}

/**
* @return string
*/
public function getSpecVersion(): string
{
return $this->specVersion;
}

/**
* @param string $specVersion
*/
public function setSpecVersion(string $specVersion): void
{
$this->specVersion = $specVersion;
}

/**
* @return string
*/
public function getType(): string
{
return $this->type;
}

/**
* @param string $type
*/
public function setType(string $type): void
{
$this->type = $type;
}

/**
* @return string
*/
public function getSource(): ?string
{
return $this->source;
}

/**
* @param string $source
*/
public function setSource(?string $source): void
{
$this->source = $source;
}

/**
* @return string
*/
public function getId(): string
{
return $this->id;
}

/**
* @param string $id
*/
public function setId(string $id): void
{
$this->id = $id;
}

/**
* @return Subscription
*/
public function getSubscription(): Subscription
{
return $this->subscription;
}

/**
* @param Subscription $subscription
*/
public function setSubscription(Subscription $subscription): void
{
$this->subscription = $subscription;
}

/**
* @return DateTimeInterface
*/
public function getTime(): DateTimeInterface
{
return $this->time;
}

/**
* @param DateTimeInterface $time
*/
public function setTime(DateTimeInterface $time): void
{
$this->time = $time;
}

/**
* @return string
*/
public function getDataContentType(): string
{
return $this->dataContentType;
}

/**
* @param string $dataContentType
*/
public function setDataContentType(string $dataContentType): void
{
$this->dataContentType = $dataContentType;
}

/**
* @return mixed
*/
public function getData()
{
return $this->data;
}

/**
* @param mixed $data
*/
public function setData($data): void
{
$this->data = $data;
}

/**
* @return int|string
*/
public function getSequenceId()
{
return $this->sequenceId;
}

/**
* @param int|string $sequenceId
*/
public function setSequenceId($sequenceId): void
{
$this->sequenceId = $sequenceId;
}

/**
* @Serializer\PreSerialize()
*/
public function setIdFromData(): void
{
if (is_object($this->data) && method_exists($this->data, 'getId')) {
$this->id = $this->data->getId();
} elseif (is_array($this->data) && isset($this->data['id'])) {
$this->id = $this->data['id'];
}
}
}
12 changes: 8 additions & 4 deletions src/Listeners/MessageProcessorSubscriber.php
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ public function processBuffer(): void
$handled = false;
$commands = Protocol::getServerMethods();

while (!$handled && $command = array_shift($commands)) {
while (!$handled && !empty($commands)) {
$command = array_shift($commands);

if (strpos($line, $command) !== 0) {
continue;
}
Expand Down Expand Up @@ -177,11 +179,13 @@ private function getFullMessage(string $rawMessage): ?UndefinedMessageReceived
{
$buffer = $this->buffer;
$message = explode(Protocol::SPC, $rawMessage, 5);
array_shift($message);

if (count($message) < 3) {
throw new StreamException('Wrong message format: ' . $rawMessage);
if (empty($message) || count($message) < 4) {
return null;
}

array_shift($message);

$length = (int)array_pop($message);
$message = array_pad($message, 3, null);
[$subject, $sid, $replay] = $message;
Expand Down
23 changes: 21 additions & 2 deletions src/Listeners/SubscriptionListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
use LeNats\Subscription\Subscriber;
use LeNats\Support\Dispatcherable;
use NatsStreamingProtocol\MsgProto;
use LeNats\Events\Fake\CloudEvent as FakeCloudEvent;

class SubscriptionListener implements EventDispatcherAwareInterface
{
Expand Down Expand Up @@ -67,13 +68,31 @@ public function handle(SubscriptionMessageReceived $event): void
$eventType = $data['type'];
unset($data);

$eventClass = $this->typeResolver->getClass($eventType) ?? CloudEvent::class;
$defaultClass = CloudEvent::class;

if (!class_exists('Symfony\Contracts\EventDispatcher\Event')) {
$defaultClass = FakeCloudEvent::class;
}

$eventClass = $this->typeResolver->getClass($eventType) ?? $defaultClass;

$cloudEvent = $this->serializer->deserialize($message->getData(), $eventClass, 'json');
if (!($cloudEvent instanceof CloudEvent)) {
if (!($cloudEvent instanceof CloudEvent) && !($cloudEvent instanceof FakeCloudEvent)) {
throw new SubscriptionException($eventClass . ' must be instance of CloudEvent');
}

if ($cloudEvent instanceof FakeCloudEvent) {
$realCloudEvent = new CloudEvent();
$realCloudEvent->setData($cloudEvent->getData());
$realCloudEvent->setSpecVersion($cloudEvent->getSpecVersion());
$realCloudEvent->setType($cloudEvent->getType());
$realCloudEvent->setSource($cloudEvent->getSource());
$realCloudEvent->setId($cloudEvent->getId());
$realCloudEvent->setTime($cloudEvent->getTime());

$cloudEvent = $realCloudEvent;
}

$subscription = $event->subscription;
$cloudEvent->setSubscription($subscription);
$cloudEvent->setSequenceId($message->getSequence());
Expand Down
13 changes: 13 additions & 0 deletions src/Services/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

namespace LeNats\Services;

use LeNats\Support\RandomGenerator;
use NatsStreamingProtocol\ConnectResponse;
use RandomLib\Factory;

class Configuration
{
Expand Down Expand Up @@ -84,6 +86,17 @@ public function __construct(?array $config = null)
$this->{$method}($value);
}
}

if (!empty($config['is_random_client_id']) && $config['is_random_client_id']) {
if (PHP_VERSION_ID > 70000) {
$generator = new RandomGenerator();
} else {
$randomFactory = new Factory();
$generator = $randomFactory->getLowStrengthGenerator();
}

$this->clientId .= '_' . $generator->generateString(16);
}
}

public function configureConnection(ConnectResponse $response): void
Expand Down
Loading

0 comments on commit 5e4aea5

Please sign in to comment.