Asynchronous client for the EventStore TCP API
<?php
$eventStore = new \Rxnet\EventStore\EventStore();
// Explicit DSN (this is also the default value)
$eventStore->connect('tcp://admin:changeit@localhost:1113');
$eventStore = new \Rxnet\EventStore\EventStore();
// Lazy way to connect, relying on the default value
$eventStore->connect()
    ->subscribe(function() { echo "connected"; });
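The DSN packs scheme, credentials, host, and port into a single string. As a minimal sketch that does not touch the client itself, PHP's built-in parse_url() shows how such a string breaks down. The helper name describeDsn is hypothetical and is not part of the library.

```php
<?php
// Hypothetical helper (not part of Rxnet\EventStore): split a DSN into its
// parts with PHP's built-in parse_url().
function describeDsn(string $dsn): array
{
    $parts = parse_url($dsn);
    if ($parts === false || !isset($parts['host'])) {
        throw new InvalidArgumentException("Invalid DSN: {$dsn}");
    }

    return [
        'scheme' => $parts['scheme'] ?? null,
        'user'   => $parts['user'] ?? null,
        'pass'   => $parts['pass'] ?? null,
        'host'   => $parts['host'],
        'port'   => $parts['port'] ?? null, // parse_url() returns the port as an int
    ];
}

$dsn = describeDsn('tcp://admin:changeit@localhost:1113');
echo "{$dsn['user']}@{$dsn['host']}:{$dsn['port']}\n";
```

Checking the DSN this way before calling connect() can surface a typo early instead of waiting for a failed connection.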
You can write as many events as you want (max 2000)
<?php
$eventA = new \Rxnet\EventStore\NewEvent\JsonEvent('event_type1', ['data' => 'a'], ['worker'=>'metadata']);
$eventB = new \Rxnet\EventStore\RawEvent('event_type2', 'raw data', 'raw metadata');
$eventStore->write('category-test_stream_id', [$eventA, $eventB])
    ->subscribe(function(\Rxnet\EventStore\Data\WriteEventsCompleted $eventsCompleted) {
        echo "Last event number {$eventsCompleted->getLastEventNumber()} on commit position {$eventsCompleted->getCommitPosition()} \n";
    });
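Because of the 2000-event cap, a larger set has to be split before it is written. The following is a minimal sketch using array_chunk(). The constant MAX_EVENTS_PER_WRITE and the function batchEvents are hypothetical names, and plain strings stand in for event objects so the snippet runs without the library.

```php
<?php
// Hypothetical constant mirroring the documented limit of 2000 events.
const MAX_EVENTS_PER_WRITE = 2000;

// Hypothetical helper: split a list of events into batches within the limit.
function batchEvents(array $events, int $size = MAX_EVENTS_PER_WRITE): array
{
    if ($size < 1) {
        throw new InvalidArgumentException('Batch size must be at least 1');
    }

    return array_chunk($events, $size);
}

// Strings stand in for JsonEvent / RawEvent instances here.
$events = [];
for ($i = 0; $i < 4500; $i++) {
    $events[] = "event-{$i}";
}

$batches = batchEvents($events);
foreach ($batches as $index => $batch) {
    // With the real client, each batch would go to $eventStore->write($streamId, $batch).
    echo "batch {$index}: " . count($batch) . " events\n";
}
```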
<?php
$eventStore->startTransaction('category-test_stream')
    ->subscribe(
        function (\Rxnet\EventStore\Transaction $transaction) {
            $eventA = new \Rxnet\EventStore\NewEvent\JsonEvent('event_type', ['i' => "data"]);
            $eventB = new \Rxnet\EventStore\NewEvent\JsonEvent('event_type', ['i' => "data"]);
            // You can write as many events as you want
            return $transaction->write([$eventA, $eventB])
                // Commit, otherwise the written events are not applied
                ->flatMap([$transaction, 'commit'])
                ->subscribe(
                    function (\Rxnet\EventStore\Data\TransactionCommitCompleted $commitCompleted) {
                        echo "Transaction {$commitCompleted->getTransactionId()} commit completed : events from {$commitCompleted->getFirstEventNumber()} to {$commitCompleted->getLastEventNumber()} \n";
                    }
                );
        }
    );
Connect to a persistent subscription on a projection stream (such as $ce-category) as group my-group, then acknowledge each event or not
<?php
$eventStore->persistentSubscription('projection-name', 'my-group')
    ->subscribe(function(\Rxnet\EventStore\AcknowledgeableEventRecord $event) {
        echo "received {$event->getId()} event {$event->getType()} ({$event->getNumber()}) with id {$event->getId()} on {$event->getStreamId()} \n";
        // Demo only: acknowledge odd-numbered events, ask for a retry of the others
        if ($event->getNumber() % 2) {
            $event->ack();
        } else {
            $event->nack($event::NACK_ACTION_RETRY, 'Explain why');
        }
    });
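In practice the ack/nack decision usually depends on whether processing succeeded rather than on the event number. The sketch below shows one possible pattern, using a stand-in class so it runs without a server. Only ack(), nack() and NACK_ACTION_RETRY come from the example above. The class FakeAcknowledgeableEvent, the constant's value, the $log property and the handleWithAck function are assumptions made for illustration.

```php
<?php
// Stand-in for \Rxnet\EventStore\AcknowledgeableEventRecord (hypothetical).
// The constant's value and the $log property are made up for this sketch.
class FakeAcknowledgeableEvent
{
    const NACK_ACTION_RETRY = 'retry';

    public $log = [];
    private $number;

    public function __construct(int $number)
    {
        $this->number = $number;
    }

    public function getNumber(): int
    {
        return $this->number;
    }

    public function ack(): void
    {
        $this->log[] = 'ack';
    }

    public function nack($action, string $reason): void
    {
        $this->log[] = "nack:{$action}:{$reason}";
    }
}

// Hypothetical wrapper: ack when the handler succeeds, nack with a retry
// request (and the exception message as the reason) when it throws.
function handleWithAck($event, callable $handler): void
{
    try {
        $handler($event);
        $event->ack();
    } catch (Exception $e) {
        $event->nack($event::NACK_ACTION_RETRY, $e->getMessage());
    }
}

$ok = new FakeAcknowledgeableEvent(1);
handleWithAck($ok, function ($event) {
    // Processing succeeded, nothing to do.
});

$failed = new FakeAcknowledgeableEvent(2);
handleWithAck($failed, function ($event) {
    throw new RuntimeException('Explain why');
});
```

With the real client, the same wrapper could be called from inside the subscribe callback, passing the received event and the processing function.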
Watch the given stream for new events.
The subscribe callback is called each time a new event appears
<?php
$eventStore->volatileSubscription('category-test_stream_id')
    ->subscribe(function(\Rxnet\EventStore\EventRecord $event) {
        echo "received {$event->getId()} event {$event->getType()} ({$event->getNumber()}) with id {$event->getId()} on {$event->getStreamId()} \n";
    });
Read all events from position 100; once everything has been read, watch for new events (like a volatile subscription)
<?php
$eventStore->catchUpSubscription('category-test_stream_id', 100)
    ->subscribe(function(\Rxnet\EventStore\EventRecord $event) {
        echo "received {$event->getId()} event {$event->getType()} ({$event->getNumber()}) with id {$event->getId()} on {$event->getStreamId()} \n";
    });
Read up to 100 events forward, starting at event 0 of stream category-test_stream_id, then complete
<?php
$eventStore->readEventsForward('category-test_stream_id', 0, 100)
    ->subscribe(function(\Rxnet\EventStore\EventRecord $event) {
        echo "received {$event->getId()} event {$event->getType()} ({$event->getNumber()}) with id {$event->getId()} on {$event->getStreamId()} \n";
    });
Read up to 10 events backward (latest to oldest), starting at event 100 of stream category-test_stream_id, then complete
<?php
$eventStore->readEventsBackWard('category-test_stream_id', 100, 10)
    ->subscribe(function(\Rxnet\EventStore\EventRecord $event) {
        echo "received {$event->getId()} event {$event->getType()} ({$event->getNumber()}) with id {$event->getId()} on {$event->getStreamId()} \n";
    });
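To make the two numeric arguments concrete, here is a small sketch that lists which event numbers a backward read would touch. It assumes the arguments mean (starting event number, maximum count), which is not confirmed here and should be checked against the method signature. The helper backwardEventNumbers is hypothetical and does not call the library.

```php
<?php
// Hypothetical helper: event numbers covered by a backward read,
// ASSUMING the arguments are (start event number, maximum count).
function backwardEventNumbers(int $start, int $count): array
{
    if ($count < 1 || $start < 0) {
        return [];
    }

    // Never go below event 0, the first event of a stream.
    $last = max(0, $start - $count + 1);

    return range($start, $last);
}

$numbers = backwardEventNumbers(100, 10);
echo implode(', ', $numbers) . "\n";

// A stream shorter than the requested count simply stops at event 0.
$short = backwardEventNumbers(3, 10);
```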
Read the details of the first event (number 0) of category-test_stream_id
<?php
$eventStore->readEvent('category-test_stream_id', 0)
    ->subscribe(function(\Rxnet\EventStore\EventRecord $event) {
        echo "received {$event->getId()} event {$event->getType()} ({$event->getNumber()}) with id {$event->getId()} on {$event->getStreamId()} \n";
    });
- Append event to stream
- Read given stream
- Subscribe to given stream
- Read a huge stream
- Persistent subscription
- Connect to cluster
- Auto re-connect to master if needed
- Reconnect when disconnected from remote
- Transactions
- TLS connect
- Write some specs
- create / update / delete persistent subscription
- create / update / delete projection
- delete stream
If ClientMessageDtos.proto is modified, you must regenerate the PHP Data classes:
./vendor/bin/protobuf --include-descriptors -i . -o ./src ./ClientMessageDtos.proto