Asynchronous PHP client for EventStore with a ReactiveX flavour

Rxnet/eventstore-client

Event Store Client

Asynchronous client for the EventStore TCP API

Usage

Connect

<?php
$eventStore = new \Rxnet\EventStore\EventStore();
// Default value
$eventStore->connect('tcp://admin:changeit@localhost:1113');

$eventStore = new \Rxnet\EventStore\EventStore();
// Lazy way to connect, relying on the default value
$eventStore->connect()
    ->subscribe(function() { echo "connected"; });
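
The connection string above follows the usual URL layout, so its parts can be inspected with PHP's built-in parse_url(). This is a minimal sketch that does not touch the client itself; the variable names are illustrative only.

```php
<?php
// Break the default DSN from the example above into its components.
$dsn = 'tcp://admin:changeit@localhost:1113';
$parts = parse_url($dsn);

// parse_url() returns the port as an integer and the other parts as strings.
printf(
    "scheme=%s user=%s host=%s port=%d\n",
    $parts['scheme'],
    $parts['user'],
    $parts['host'],
    $parts['port']
);
```

This can be handy for checking a DSN built from configuration before handing it to connect().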

Write

You can write as many events as you want (max 2000)

<?php
$eventA = new \Rxnet\EventStore\NewEvent\JsonEvent('event_type1', ['data' => 'a'], ['worker'=>'metadata']);
$eventB = new \Rxnet\EventStore\RawEvent('event_type2', 'raw data', 'raw metadata');

$eventStore->write('category-test_stream_id', [$eventA, $eventB])
    ->subscribe(function(\Rxnet\EventStore\Data\WriteEventsCompleted $eventsCompleted) {
        echo "Last event number {$eventsCompleted->getLastEventNumber()} on commit position {$eventsCompleted->getCommitPosition()} \n";
    });
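
If you have more events than the limit allows, one option is to split them into batches before writing. The sketch below is plain PHP; chunkEvents is a hypothetical helper, not part of the client, and the 2000 limit is taken from the note above.

```php
<?php
/**
 * Hypothetical helper: split a list of events into batches
 * of at most $max items, preserving their order.
 */
function chunkEvents(array $events, int $max = 2000): array
{
    if ($max < 1) {
        throw new InvalidArgumentException('Batch size must be at least 1');
    }
    return array_chunk($events, $max);
}

// Stand-in payloads; in real code these would be event objects.
$events = range(1, 4500);
$batches = chunkEvents($events);

foreach ($batches as $i => $batch) {
    echo "batch {$i}: " . count($batch) . " events\n";
}
```

Each batch would then go to its own write() call on the target stream.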

Transaction

<?php
// Namespaces assumed from the Write example above
use Rxnet\EventStore\NewEvent\JsonEvent;
use Rxnet\EventStore\Data\TransactionCommitCompleted;

$eventStore->startTransaction('category-test_stream')
    ->subscribe(
        function (\Rxnet\EventStore\Transaction $transaction) {
            $eventA = new JsonEvent('event_type', ['i' => "data"]);
            $eventB = new JsonEvent('event_type', ['i' => "data"]);
            // You can write as many as you want
            return $transaction->write([$eventA, $eventB])
                // Nothing is persisted until the transaction is committed
                ->flatMap([$transaction, 'commit'])
                ->subscribe(
                    function (TransactionCommitCompleted $commitCompleted) {
                        echo "Transaction {$commitCompleted->getTransactionId()} commit completed : events from {$commitCompleted->getFirstEventNumber()} to {$commitCompleted->getLastEventNumber()} \n";
                    }
                );
        }
    );

Subscription

Connect to a persistent subscription on a projection stream (for example $ce-category) as group my-group, then acknowledge each event or not

<?php
$eventStore->persistentSubscription('projection-name', 'my-group')
    ->subscribe(function(\Rxnet\EventStore\AcknowledgeableEventRecord $event) {
        echo "received {$event->getId()} event {$event->getType()} ({$event->getNumber()}) with id {$event->getId()} on {$event->getStreamId()} \n";
        if($event->getNumber() %2) {
            $event->ack();
        }
        else {
            $event->nack($event::NACK_ACTION_RETRY, 'Explain why');
        }
    });
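
In the example above, `$event->getNumber() % 2` is truthy for odd event numbers, so odd-numbered events are acknowledged and even-numbered ones are nacked with a retry. The sketch below isolates that decision in plain PHP; shouldAck is an illustrative name, not part of the client.

```php
<?php
// Illustrative helper mirroring the condition used in the callback above.
function shouldAck(int $eventNumber): bool
{
    // A non-zero remainder is truthy in PHP, so odd numbers are acknowledged.
    return (bool) ($eventNumber % 2);
}

foreach ([0, 1, 2, 3] as $number) {
    echo $number . ' => ' . (shouldAck($number) ? 'ack' : 'nack') . "\n";
}
```

In a real consumer the decision would more likely depend on whether the event was processed successfully than on its number.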

Watch a given stream for new events.
The subscribe callback is called each time a new event appears

<?php
$eventStore->volatileSubscription('category-test_stream_id')
    ->subscribe(function(\Rxnet\EventStore\EventRecord $event) {
        echo "received {$event->getId()} event {$event->getType()} ({$event->getNumber()}) with id {$event->getId()} on {$event->getStreamId()} \n";
    });

Read all events from position 100; once everything has been read, watch for new events (like a volatile subscription)

<?php
$eventStore->catchUpSubscription('category-test_stream_id', 100)
    ->subscribe(function(\Rxnet\EventStore\EventRecord $event) {
        echo "received {$event->getId()} event {$event->getType()} ({$event->getNumber()}) with id {$event->getId()} on {$event->getStreamId()} \n";
    });
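
The catch-up behaviour described above (replay history from a position, then continue with new events) can be pictured with a small generator. This is a conceptual model in plain PHP, not the client's implementation; catchUp and the sample arrays are made up for illustration.

```php
<?php
/**
 * Conceptual model only: replay stored events starting at $from,
 * then continue with events that arrive afterwards.
 */
function catchUp(array $stored, int $from, iterable $live): Generator
{
    // Phase 1: replay history starting at the requested position.
    foreach (array_slice($stored, $from) as $event) {
        yield $event;
    }
    // Phase 2: hand over to the events that arrive later.
    foreach ($live as $event) {
        yield $event;
    }
}

$stored = ['e0', 'e1', 'e2', 'e3'];
$live = ['e4', 'e5'];

$seen = iterator_to_array(catchUp($stored, 2, $live), false);
echo implode(', ', $seen) . "\n";
```

The subscriber sees one continuous sequence and does not need to know where history ends and live events begin.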

Read

Read forward from event 0, up to 100 events, on stream category-test_stream_id, then complete

<?php
$eventStore->readEventsForward('category-test_stream_id', 0, 100)
    ->subscribe(function(\Rxnet\EventStore\EventRecord $event) {
        echo "received {$event->getId()} event {$event->getType()} ({$event->getNumber()}) with id {$event->getId()} on {$event->getStreamId()} \n";
    });

Read backward (latest to oldest) 10 events starting from event 100 on stream category-test_stream_id, then complete

<?php
$eventStore->readEventsBackWard('category-test_stream_id', 100, 10)
    ->subscribe(function(\Rxnet\EventStore\EventRecord $event) {
        echo "received {$event->getId()} event {$event->getType()} ({$event->getNumber()}) with id {$event->getId()} on {$event->getStreamId()} \n";
    });

Read the details of the first event (number 0) of category-test_stream_id

<?php
$eventStore->readEvent('category-test_stream_id', 0)
    ->subscribe(function(\Rxnet\EventStore\EventRecord $event) {
        echo "received {$event->getId()} event {$event->getType()} ({$event->getNumber()}) with id {$event->getId()} on {$event->getStreamId()} \n";
    });

Contribute

TODO

  • Append event to stream
  • Read given stream
  • Subscribe to given stream
  • Read a huge stream
  • Persistent subscription
  • Connect to cluster
  • Auto re-connect to master if needed
  • Reconnect when disconnected from remote
  • Transactions
  • TLS connect
  • Write some specs
  • create / update / delete persistent subscription
  • create / update / delete projection
  • delete stream

Protocol buffer

If ClientMessageDtos.proto is modified, you must regenerate the PHP Data classes:

./vendor/bin/protobuf --include-descriptors -i . -o ./src ./ClientMessageDtos.proto

License

FOSSA Status