Skip to content

Commit

Permalink
Fixed up publish to work with the new session observable
Browse files Browse the repository at this point in the history
  • Loading branch information
davidwdan committed Jan 19, 2018
1 parent e5c1cd7 commit 41f6ab2
Showing 1 changed file with 12 additions and 9 deletions.
21 changes: 12 additions & 9 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -205,16 +205,19 @@ public function publish(string $uri, $obs, array $options = []): DisposableInter

return $this->session
->takeUntil($completed->delay(1))
->mapTo($obs->finally(function () use ($completed) {
$completed->onNext(0);
}))
->switchFirst()
->map(function ($value) use ($uri, $options) {
return new PublishMessage(Utils::getUniqueId(), (object)$options, $uri, [$value]);
->pluck(1)
->map(function ( $webSocket) use ($obs, $completed, $uri, $options) {
return $obs
->finally(function () use ($completed) {
$completed->onNext(0);
})
->map(function ($value) use ($uri, $options) {
return new PublishMessage(Utils::getUniqueId(), (object)$options, $uri, [$value]);
})
->do([$webSocket, 'onNext']);
})
->subscribe(function ($value) {
$this->webSocket->onNext($value);
});
->switchFirst()
->subscribe();
}

public function onChallenge(callable $challengeCallback)
Expand Down

0 comments on commit 41f6ab2

Please sign in to comment.