Skip to content

Commit

Permalink
align in memory event store and stream with pdo implementations:
Browse files Browse the repository at this point in the history
- when filtering by no producers whole stream should be returned
- in memory event store did not use producer type in internal streams
  • Loading branch information
alanbem committed Mar 22, 2018
1 parent 68694ea commit e7e7ea1
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 10 deletions.
24 changes: 16 additions & 8 deletions src/Infrastructure/EventStore/InMemoryEventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ public function add(Domain\Id $producerId, ?int $version, Event ...$events) : vo
return;
}

$type = get_class($producerId);
$id = $producerId->toString();
$stream = $type.$id;

$transaction = [
'uuids' => [],
Expand All @@ -78,15 +80,15 @@ public function add(Domain\Id $producerId, ?int $version, Event ...$events) : vo

$uuid = Domain\Id\UUID::create()->toString();

if (!isset($this->streams[$id])) {
$this->streams[$id] = [];
if (!isset($this->streams[$stream])) {
$this->streams[$stream] = [];
}

if (null === $version) {
$version = count($this->streams[$id]);
$version = count($this->streams[$stream]);
} else {
++$version;
if (isset($this->streams[$id][$version])) {
if (isset($this->streams[$stream][$version])) {
throw new Exception\ConcurrentWriteDetected($producerId);
}
}
Expand All @@ -102,7 +104,7 @@ public function add(Domain\Id $producerId, ?int $version, Event ...$events) : vo
}

$this->uuids = array_merge($this->uuids, $transaction['uuids']);
$this->streams[$id] = array_merge($this->streams[$id], $transaction['stream']);
$this->streams[$stream] = array_merge($this->streams[$stream], $transaction['stream']);
$this->all = array_merge($this->all, $transaction['all']);

foreach ($transaction['metadata'] as $pair) {
Expand All @@ -115,11 +117,17 @@ public function add(Domain\Id $producerId, ?int $version, Event ...$events) : vo

public function streamFor(Domain\Id ...$producers) : Event\FilterableStream
{
if (0 === count($producers)) {
return new InMemoryStream(...$this->all);
}

$streams = [];
foreach ($producers as $producer) {
$producer = $producer->toString();
if (isset($this->streams[$producer])) {
$streams = array_merge($streams, $this->streams[$producer]);
$type = get_class($producer);
$id = $producer->toString();
$stream = $type.$id;
if (isset($this->streams[$stream])) {
$streams = array_merge($streams, $this->streams[$stream]);
}
}

Expand Down
25 changes: 23 additions & 2 deletions tests/Infrastructure/EventStore/EventStoreTestCase.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
use Streak\Infrastructure\EventBus\PDOPostgresEventStoreTest\Event3;
use Streak\Infrastructure\EventBus\PDOPostgresEventStoreTest\Event4;
use Streak\Infrastructure\EventBus\PDOPostgresEventStoreTest\ProducerId1;
use Streak\Infrastructure\EventBus\PDOPostgresEventStoreTest\ProducerId2;

/**
* @author Alan Gabriel Bem <[email protected]>
Expand Down Expand Up @@ -83,8 +84,9 @@ public function testObject()
{
$this->assertSame($this->store->log(), $this->store->log());

$producer11 = new ProducerId1('producer1-1');
$producer12 = new ProducerId1('producer1-2');
$producer11 = new ProducerId1('producer1');
$producer12 = new ProducerId1('producer2');
$producer21 = new ProducerId2('producer1');

$event111 = new Event1();
$event112 = new Event2();
Expand All @@ -96,6 +98,11 @@ public function testObject()
$event123 = new Event3();
$event124 = new Event4();

$event211 = new Event1();
$event212 = new Event2();
$event213 = new Event3();
$event214 = new Event4();

$this->assertEquals([], iterator_to_array($this->store->log()));

$stream = $this->store->stream($producer11);
Expand Down Expand Up @@ -210,6 +217,20 @@ public function testObject()
$this->assertFalse($stream->empty());
$this->assertEquals($event111, $stream->first());
$this->assertEquals($event124, $stream->last());

$this->store->add($producer21, 0, $event211, $event212, $event213, $event214);

$stream = $this->store->stream($producer21);
$this->assertEquals([$event211, $event212, $event213, $event214], iterator_to_array($stream));
$this->assertFalse($stream->empty());
$this->assertEquals($event211, $stream->first());
$this->assertEquals($event214, $stream->last());

$stream = $this->store->stream();
$this->assertEquals([$event111, $event112, $event113, $event114, $event121, $event122, $event123, $event124, $event211, $event212, $event213, $event214], iterator_to_array($stream));
$this->assertFalse($stream->empty());
$this->assertEquals($event111, $stream->first());
$this->assertEquals($event214, $stream->last());
}

public function testConcurrentWriting()
Expand Down

0 comments on commit e7e7ea1

Please sign in to comment.